Sfoglia il codice sorgente

HPCC-12282 Move roxie global stats into new mechanism

Refactor slave stats so they actually work

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 anni fa
parent
commit
579537b875

+ 10 - 0
common/thorhelper/roxiehelper.cpp

@@ -1482,3 +1482,13 @@ StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fn
     }
     return logicalName;
 }
+
+//----------------------------------------------------------------------------------
+
+void IRoxieContextLogger::CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
+{
+    va_list args;
+    va_start(args, format);
+    CTXLOGaeva(E, file, line, prefix, format, args);
+    va_end(args);
+}

+ 18 - 1
common/thorhelper/roxiehelper.ipp

@@ -65,13 +65,30 @@ enum TracingCategory
 class LogItem;
 interface IRoxieContextLogger : extends IContextLogger
 {
+    // Override base interface with versions that add prefix
+    // We could consider moving some or all of these down into IContextLogger
+    virtual void CTXLOGva(const char *format, va_list args) const
+    {
+        StringBuffer text, prefix;
+        getLogPrefix(prefix);
+        text.valist_appendf(format, args);
+        CTXLOGa(LOG_TRACING, prefix.str(), text.str());
+    }
+    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
+    {
+        StringBuffer prefix;
+        getLogPrefix(prefix);
+        CTXLOGaeva(E, file, line, prefix.str(), format, args);
+    }
+
     virtual StringBuffer &getLogPrefix(StringBuffer &ret) const = 0;
     virtual bool isIntercepted() const = 0;
     virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const = 0;
-    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const __attribute__((format(printf, 6, 7))) = 0;
+    void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const __attribute__((format(printf, 6, 7)));
     virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const = 0;
     virtual void CTXLOGl(LogItem *) const = 0;
     virtual bool isBlind() const = 0;
+    virtual const CRuntimeStatisticCollection &queryStats() const = 0;
 };
 
 //===================================================================================

+ 91 - 88
roxie/ccd/ccd.hpp

@@ -547,6 +547,84 @@ public:
 
 };
 
+// Used as a base class by classes that want to intercept statistics but pass on other logging (i.e. slave or server activity classes)
+// Note that the stats are upmerged on destruction
+// Also note that we assume single-threaded access to stats.
+
+class IndirectContextLogger : public CInterface, implements IRoxieContextLogger
+{
+protected:
+    const IRoxieContextLogger *logctx;
+    bool aborted;
+    mutable CRuntimeStatisticCollection stats;
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    IndirectContextLogger(const IRoxieContextLogger *_logctx, const StatisticsMapping & _statsMapping)
+    : logctx(_logctx), stats(_statsMapping)
+    {
+        aborted = false;
+    }
+    ~IndirectContextLogger()
+    {
+        if (logctx)
+            logctx->mergeStats(stats);
+    }
+    inline void abort()
+    {
+        aborted = true;
+    }
+    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
+    {
+        if (aborted)
+            throw MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
+        stats.addStatistic(kind, value);
+    }
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
+    {
+        stats.merge(from);
+    }
+    virtual const CRuntimeStatisticCollection &queryStats() const
+    {
+        return stats;
+    }
+
+    // The rest are passthroughs
+
+    virtual unsigned queryTraceLevel() const
+    {
+        return logctx ? logctx->queryTraceLevel() : traceLevel;
+    }
+    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
+    {
+        return logctx ? logctx->getLogPrefix(ret) : ret;
+    }
+    virtual bool isIntercepted() const
+    {
+        return logctx ? logctx->isIntercepted() : false;
+    }
+    virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
+    {
+        if (logctx)
+            logctx->CTXLOGa(category, prefix, text);
+    }
+    virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
+    {
+        if (logctx)
+            logctx->CTXLOGaeva(E, file, line, prefix, format, args);
+    }
+    virtual void CTXLOGl(LogItem *log) const
+    {
+        if (logctx)
+            logctx->CTXLOGl(log);
+    }
+    virtual bool isBlind() const
+    {
+        return logctx ? logctx->isBlind() : blindLogging;
+    }
+};
+
 // MORE - this code probably should be commoned up with some of the new stats code
 extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val);
 extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val);
@@ -558,12 +636,11 @@ protected:
     unsigned start;
     unsigned ctxTraceLevel;
     mutable CRuntimeStatisticCollection stats;
-    mutable ITimeReporter *timeReporter;
     unsigned channel;
 public: // Not very clean but I don't care
     bool intercept;
     bool blind;
-    bool aborted;
+    mutable bool aborted;
     mutable CIArrayOf<LogItem> log;
 private:
     ContextLogger(const ContextLogger &);  // Disable copy constructor
@@ -575,15 +652,10 @@ public:
         ctxTraceLevel = traceLevel;
         intercept = false;
         blind = false;
-        timeReporter = createStdTimeReporter();
         start = msTick();
         channel = 0;
         aborted = false;
     }
-    ~ContextLogger()
-    {
-        ::Release(timeReporter);
-    }
 
     void outputXML(IXmlStreamFlusher &out)
     {
@@ -594,26 +666,6 @@ public:
         }
     };
 
-    virtual void CTXLOG(const char *format, ...) const  __attribute__((format(printf, 2, 3)))
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGva(format, args);
-        va_end(args);
-    }
-    virtual void CTXLOGva(const char *format, va_list args) const
-    {
-        StringBuffer prefix, text;
-        getLogPrefix(prefix);
-        text.valist_appendf(format, args);
-        DBGLOG("[%s] %s", prefix.str(), text.str());
-        if (intercept)
-        {
-            CriticalBlock b(crit);
-            log.append(* new LogItem(LOG_TRACING, prefix, msTick() - start, channel, text));
-            flush(false, false);
-        }
-    }
     virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
     {
         if (category == LOG_TRACING)
@@ -624,29 +676,8 @@ public:
         {
             CriticalBlock b(crit);
             log.append(* new LogItem(category, prefix, msTick() - start, channel, text));
-            flush(false, false);
         }
     }
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const  __attribute__((format(printf, 5, 6)))
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGaeva(E, file, line, 0, format, args);
-        va_end(args);
-    }
-    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
-    {
-        CTXLOGaeva(E, file, line, 0, format, args);
-    }
-
-    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const __attribute__((format(printf, 6, 7)))
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGaeva(E, file, line, prefix, format, args);
-        va_end(args);
-    }
-
     virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
     {
         StringBuffer text;
@@ -666,7 +697,6 @@ public:
         {
             CriticalBlock b(crit);
             log.append(* new LogItem(LOG_ERROR, prefix, msTick() - start, channel, text));
-            flush(false, false);
         }
     }
     virtual void CTXLOGl(LogItem *logItem) const
@@ -674,7 +704,6 @@ public:
         // NOTE - we don't actually print anything to logfile here - was already printed on slave
         CriticalBlock b(crit);
         log.append(*logItem);
-        flush(false, false);
     }
 
     void setIntercept(bool _intercept)
@@ -692,21 +721,11 @@ public:
         ctxTraceLevel = _traceLevel;
     }
 
-    virtual void flush(bool closing, bool aborted) const
-    {
-    }
-
     StringBuffer &getStats(StringBuffer &s) const
     {
         return stats.toStr(s);
     }
 
-    virtual void dumpStats(IWorkUnit *wu) const
-    {
-        Owned<IStatisticGatherer> gatherer = createGlobalStatisticGatherer(wu);
-        stats.recordStatistics(*gatherer);
-    }
-
     virtual bool isIntercepted() const
     {
         return intercept;
@@ -724,18 +743,22 @@ public:
         stats.addStatistic(kind, value);
     }
 
-    virtual unsigned queryTraceLevel() const
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
     {
-        return ctxTraceLevel;
+        stats.merge(from);
     }
-    inline ITimeReporter *queryTimer() const
+    virtual const CRuntimeStatisticCollection &queryStats() const
     {
-        return timeReporter;
+        return stats;
+    }
+
+    virtual unsigned queryTraceLevel() const
+    {
+        return ctxTraceLevel;
     }
     void reset()
     {
         stats.reset();
-        queryActiveTimer()->reset();
     }
 };
 
@@ -760,23 +783,10 @@ public:
     }
 };
 
-class SimpleContextLogger : public ContextLogger
-{
-    unsigned instanceId;
-public:
-    SimpleContextLogger(unsigned _instanceId) : instanceId(_instanceId)
-    {
-    }
-    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
-    {
-        return ret.append(instanceId);  
-    }
-};
-
 class SlaveContextLogger : public StringContextLogger
 {
-    mutable Owned<IMessagePacker> output;
-    mutable bool anyOutput;
+    Owned<IMessagePacker> output;
+    bool anyOutput;
     bool traceActivityTimes;
     bool debuggerActive;
     bool checkingHeap;
@@ -786,19 +796,12 @@ public:
     SlaveContextLogger();
     SlaveContextLogger(IRoxieQueryPacket *packet);
     void set(IRoxieQueryPacket *packet);
-    virtual void flush(bool closing, bool aborted) const;
-    inline bool queryTraceActivityTimes() const { return traceActivityTimes; }
+    void flush();
     inline bool queryDebuggerActive() const { return debuggerActive; }
-    inline bool queryCheckingHeap() const { return checkingHeap; }
-    inline void setDebuggerActive(bool _active) { debuggerActive = _active; }
     inline const CRuntimeStatisticCollection &queryStats() const
     {
         return stats;
     }
-    inline void requestAbort()
-    {
-        aborted = true;
-    }
     inline const char *queryWuid()
     {
         return wuid.get();

File diff suppressed because it is too large
+ 206 - 275
roxie/ccd/ccdactivities.cpp


+ 3 - 3
roxie/ccd/ccdactivities.hpp

@@ -46,11 +46,12 @@ interface IActivityFactory : extends IInterface
     virtual ThorActivityKind getKind() const = 0;
     virtual void getActivityMetrics(StringBuffer &reply) const = 0;
     virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const = 0;
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const = 0;
 };
 
 interface IRoxieSlaveActivity : extends IInterface
 {
-    virtual bool process() = 0;
+    virtual IMessagePacker *process() = 0;
     virtual bool check() = 0;
     virtual void abort() = 0;
     virtual IRoxieQueryPacket *queryPacket() const = 0;
@@ -61,11 +62,10 @@ interface IRoxieSlaveActivity : extends IInterface
 
 interface ISlaveActivityFactory : extends IActivityFactory
 {
-    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const = 0;
+    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const = 0;
     virtual StringBuffer &toString(StringBuffer &ret) const = 0;
     virtual const char *queryQueryName() const = 0;
     virtual void addChildQuery(unsigned id, ActivityArray *childQuery) = 0;
-    virtual void noteStatistics(const CRuntimeStatisticCollection &stats) = 0;
 };
 
 typedef const void * cvp;

+ 23 - 45
roxie/ccd/ccdcontext.cpp

@@ -1036,7 +1036,7 @@ protected:
     Owned<IDistributedFileTransaction> superfileTransaction;
 
     CriticalSection statsCrit;
-    const ContextLogger &logctx;
+    const IRoxieContextLogger &logctx;
 
 protected:
     CriticalSection resultsCrit;
@@ -1096,11 +1096,17 @@ protected:
 
 public:
     IMPLEMENT_IINTERFACE;
-    CSlaveContext(const IQueryFactory *_factory, const ContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _debuggerActive)
+    CSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
         : factory(_factory), logctx(_logctx), options(factory->queryOptions())
     {
+        bool debuggerActive = false;
         if (_packet)
+        {
             header = &_packet->queryHeader();
+            const byte *traceInfo = _packet->queryTraceInfo();
+            options.setFromSlaveLoggingFlags(*traceInfo);
+            debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren;  // No option to debug simple remote activity
+        }
         else
             header = NULL;
         startTime = lastWuAbortCheck = msTick();
@@ -1109,7 +1115,7 @@ public:
         deserializedResultStore = NULL;
         rereadResults = NULL;
         xmlStoredDatasetReadFlags = ptr_none;
-        if (_debuggerActive)
+        if (debuggerActive)
         {
             assertex(header);
             CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
@@ -1141,17 +1147,14 @@ public:
         logctx.noteStatistic(kind, value);
     }
 
-    virtual void CTXLOG(const char *format, ...) const
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
     {
-        va_list args;
-        va_start(args, format);
-        logctx.CTXLOGva(format, args);
-        va_end(args);
+        logctx.mergeStats(from);
     }
 
-    virtual void CTXLOGva(const char *format, va_list args) const
+    virtual const CRuntimeStatisticCollection &queryStats() const
     {
-        logctx.CTXLOGva(format, args);
+        return logctx.queryStats();
     }
 
     virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
@@ -1159,27 +1162,6 @@ public:
         logctx.CTXLOGa(category, prefix, text);
     }
 
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        logctx.logOperatorExceptionVA(E, file, line, format, args);
-        va_end(args);
-    }
-
-    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
-    {
-        logctx.logOperatorExceptionVA(E, file, line, format, args);
-    }
-
-    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        logctx.CTXLOGaeva(E, file, line, prefix, format, args);
-        va_end(args);
-    }
-
     virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
     {
         logctx.CTXLOGaeva(E, file, line, prefix, format, args);
@@ -1261,11 +1243,6 @@ public:
         }
     }
 
-    void setOptions(const SlaveContextLogger &ctx)
-    {
-        options.setFromSlaveContextLogger(ctx);
-    }
-
     virtual void checkAbort()
     {
         // MORE - really should try to apply limits at slave end too
@@ -2376,11 +2353,9 @@ protected:
     }
 };
 
-IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *packet)
+IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
 {
-    CSlaveContext *ret = new CSlaveContext(_factory, _logctx, packet, _logctx.queryDebuggerActive());
-    ret->setOptions(_logctx);
-    return ret;
+    return new CSlaveContext(_factory, _logctx, packet, hasChildren);
 }
 
 class CRoxieServerDebugContext : extends CBaseServerDebugContext
@@ -2535,6 +2510,7 @@ class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext
 
 protected:
     Owned<CRoxieWorkflowMachine> workflow;
+    Owned<ITimeReporter> myTimer;
     mutable MapStringToMyClass<IResolvedFile> fileCache;
     SafeSocket *client;
     bool isBlocked;
@@ -2593,6 +2569,7 @@ protected:
 
         lastSocketCheckTime = startTime;
         lastHeartBeat = startTime;
+        myTimer.setown(createStdTimeReporter());
     }
 
     void startWorkUnit()
@@ -2648,7 +2625,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    CRoxieServerContext(const IQueryFactory *_factory, const ContextLogger &_logctx)
+    CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
         : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory)
     {
         init();
@@ -2799,6 +2776,7 @@ public:
 
     virtual void process()
     {
+        MTIME_SECTION(myTimer, "Process");
         EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
         Owned<IEclProcess> p = pf();
         try
@@ -2852,9 +2830,9 @@ public:
             WorkunitUpdate w(&workUnit->lock());
             w->setState(aborted ? WUStateAborted : (failed ? WUStateFailed : WUStateCompleted));
             addTimeStamp(w, SSTglobal, NULL, StWhenQueryFinished);
-            ITimeReporter *timer = logctx.queryTimer();
-            updateWorkunitTimings(w, timer);
-            logctx.dumpStats(w);
+            updateWorkunitTimings(w, myTimer);
+            Owned<IStatisticGatherer> gatherer = createGlobalStatisticGatherer(w);
+            logctx.queryStats().recordStatistics(*gatherer);
 
             WuStatisticTarget statsTarget(w, "roxie");
             rowManager->reportPeakStatistics(statsTarget, 0);
@@ -3868,7 +3846,7 @@ IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQue
         return new CRoxieServerContext(context, factory, client, isXml ? MarkupFmt_XML : MarkupFmt_Unknown, isRaw, isBlocked, httpHelper, trim, _logctx, readFlags, querySetName);
 }
 
-IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const ContextLogger &_logctx)
+IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx)
 {
     return new CRoxieServerContext(factory, _logctx);
 }

+ 2 - 2
roxie/ccd/ccdcontext.hpp

@@ -105,9 +105,9 @@ typedef IEclProcess* (* EclProcessFactory)();
 class CRoxieWorkflowMachine;
 
 extern IDeserializedResultStore *createDeserializedResultStore();
-extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const SlaveContextLogger &logctx, IRoxieQueryPacket *packet);
+extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasRemoteChildren);
 extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
-extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const ContextLogger &_logctx);
+extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx);
 extern IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &logctx);
 extern CRoxieWorkflowMachine *createRoxieWorkflowMachine(IPropertyTree *_workflowInfo, IConstWorkUnit *wu, bool doOnce, const IRoxieContextLogger &_logctx);
 

+ 1 - 6
roxie/ccd/ccdlistener.cpp

@@ -1205,10 +1205,7 @@ public:
             Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
             try
             {
-                {
-                    MTIME_SECTION(logctx.queryTimer(), "Process");
-                    ctx->process();
-                }
+                ctx->process();
                 memused = ctx->getMemoryUsage();
                 slavesReplyLen = ctx->getSlavesReplyLen();
                 ctx->done(false);
@@ -1247,7 +1244,6 @@ public:
             logctx.getStats(s);
             logctx.CTXLOG("COMPLETE: %s complete in %d msecs memory=%d Mb priority=%d slavesreply=%d%s", wuid.get(), elapsed, memused, priority, slavesReplyLen, s.str());
         }
-        logctx.flush(true, false);
     }
 
 private:
@@ -1893,7 +1889,6 @@ readAnother:
         }
         else
         {
-            logctx.flush(true, false);
             try
             {
                 if (client && !isHTTP && !isStatus)

+ 11 - 11
roxie/ccd/ccdquery.cpp

@@ -218,14 +218,14 @@ public:
         return *onceResultStore;
     }
 
-    virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const ContextLogger &logctx) const
+    virtual IPropertyTree &queryOnceContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
     {
         checkOnceDone(factory, logctx);
         assertex(onceContext != NULL);
         return *onceContext;
     }
 
-    virtual void checkOnceDone(const IQueryFactory *factory, const ContextLogger &logctx) const
+    virtual void checkOnceDone(const IQueryFactory *factory, const IRoxieContextLogger &logctx) const
     {
         CriticalBlock b(onceCrit);
         if (!onceContext)
@@ -433,11 +433,11 @@ void QueryOptions::updateFromContext(bool &value, const IPropertyTree *ctx, cons
         value = ctx->getPropBool(name);
 }
 
-void QueryOptions::setFromSlaveContextLogger(const SlaveContextLogger &logctx)
+void QueryOptions::setFromSlaveLoggingFlags(unsigned loggingFlags)
 {
     // MORE - priority/timelimit ?
-    checkingHeap = logctx.queryCheckingHeap();
-    traceActivityTimes = logctx.queryTraceActivityTimes();
+    checkingHeap = (loggingFlags & LOGGING_CHECKINGHEAP) != 0;
+    traceActivityTimes = (loggingFlags & LOGGING_TIMEACTIVITIES) != 0;
 }
 
 //----------------------------------------------------------------------------------------------
@@ -1226,7 +1226,7 @@ public:
         return sharedOnceContext->queryOnceResultStore();
     }
 
-    virtual IPropertyTree &queryOnceContext(const ContextLogger &logctx) const
+    virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const
     {
         assertex(sharedOnceContext);
         return sharedOnceContext->queryOnceContext(this, logctx);
@@ -1425,7 +1425,7 @@ public:
     {
         return package;
     }
-    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
     {
         throwUnexpected();  // only on server...
     }
@@ -1448,7 +1448,7 @@ public:
         return strdup(result ? result : defaultValue);
     }
 
-    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
     {
         throwUnexpected();   // only implemented in derived slave class
     }
@@ -1589,7 +1589,7 @@ public:
         return createWorkUnitServerContext(wu, this, _logctx);
     }
 
-    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
     {
         IPropertyTree *workflow = queryWorkflowTree();
         if (workflow)
@@ -1845,9 +1845,9 @@ public:
     {
     }
 
-    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
     {
-        return ::createSlaveContext(this, logctx, packet);
+        return ::createSlaveContext(this, logctx, packet, hasChildren);
     }
 
     virtual ActivityArray *loadGraph(IPropertyTree &graph, const char *graphName)

+ 8 - 9
roxie/ccd/ccdquery.hpp

@@ -74,9 +74,9 @@ class CRoxieWorkflowMachine;
 
 interface ISharedOnceContext : extends IInterface
 {
-    virtual IPropertyTree &queryOnceContext(const IQueryFactory *queryFactory, const ContextLogger &_logctx) const = 0;
+    virtual IPropertyTree &queryOnceContext(const IQueryFactory *queryFactory, const IRoxieContextLogger &_logctx) const = 0;
     virtual IDeserializedResultStore &queryOnceResultStore() const = 0;
-    virtual void checkOnceDone(const IQueryFactory *queryFactory, const ContextLogger &_logctx) const = 0;
+    virtual void checkOnceDone(const IQueryFactory *queryFactory, const IRoxieContextLogger &_logctx) const = 0;
 };
 
 //----------------------------------------------------------------------------------------------
@@ -92,7 +92,7 @@ public:
 
     void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo);
     void setFromContext(const IPropertyTree *ctx);
-    void setFromSlaveContextLogger(const SlaveContextLogger &logctx);
+    void setFromSlaveLoggingFlags(unsigned loggingFlags);
 
 
     unsigned priority;
@@ -130,7 +130,7 @@ private:
 
 interface IQueryFactory : extends IInterface
 {
-    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const = 0;
+    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const = 0;
     virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
     virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const = 0;
     virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const = 0;
@@ -151,13 +151,13 @@ interface IQueryFactory : extends IInterface
     virtual IConstWorkUnit *queryWorkUnit() const = 0;
     virtual ISharedOnceContext *querySharedOnceContext() const = 0;
     virtual IDeserializedResultStore &queryOnceResultStore() const = 0;
-    virtual IPropertyTree &queryOnceContext(const ContextLogger &logctx) const = 0;
+    virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const = 0;
 
     virtual const IRoxiePackage &queryPackage() const = 0;
     virtual void getActivityMetrics(StringBuffer &reply) const = 0;
 
     virtual IPropertyTree *cloneQueryXGMML() const = 0;
-    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const ContextLogger &logctx) const = 0;
+    virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const = 0;
     virtual char *getEnv(const char *name, const char *defaultValue) const = 0;
 
     virtual IRoxieServerContext *createContext(IPropertyTree *xml, SafeSocket &client, TextMarkupFormat mlFmt, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const = 0;
@@ -243,10 +243,9 @@ public:
     virtual IQueryFactory &queryQueryFactory() const { return queryFactory; }
     virtual ThorActivityKind getKind() const { return kind; }
 
-    virtual void noteStatistics(const CRuntimeStatisticCollection &fromStats)
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
     {
-        // Merge in the stats from this instance
-        mystats.merge(fromStats);
+        mystats.merge(from);
     }
 
     virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const

+ 41 - 47
roxie/ccd/ccdqueue.cpp

@@ -162,7 +162,7 @@ size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
     return minwrote;
 }
 
-#define TEST_SLAVE_FAILURE
+// #define TEST_SLAVE_FAILURE
 
 //============================================================================================
 
@@ -467,6 +467,7 @@ void SlaveContextLogger::set(IRoxieQueryPacket *packet)
     debuggerActive = false;
     checkingHeap = false;
     traceActivityTimes = false;
+    stats.reset();
     start = msTick();
     if (packet)
     {
@@ -532,47 +533,39 @@ void SlaveContextLogger::set(IRoxieQueryPacket *packet)
     }
 }
 
-void SlaveContextLogger::flush(bool closing, bool aborted) const
+void SlaveContextLogger::flush()
 {
     if (output)
     {
         CriticalBlock b(crit);
-        if (aborted)
-            output->abort();
-        else
+        if (mergeSlaveStatistics)
         {
-            if (closing && mergeSlaveStatistics)
-            {
-                MemoryBuffer buf;
-                buf.append((char) LOG_STATVALUES); // A special log entry for the stats
-                stats.serialize(buf);
-                unsigned len = buf.length();
-                void *ret = output->getBuffer(len, true);
-                memcpy(ret, buf.toByteArray(), len);
-                output->putBuffer(ret, len, true);
-                anyOutput = true;
-            }
-            ForEachItemIn(idx, log) // typically just one, as currently coded...
+            MemoryBuffer buf;
+            buf.append((char) LOG_STATVALUES); // A special log entry for the stats
+            if (stats.serialize(buf))
             {
-                MemoryBuffer buf;
-                LogItem &logItem = log.item(idx);
-                logItem.serialize(buf);
                 unsigned len = buf.length();
                 void *ret = output->getBuffer(len, true);
                 memcpy(ret, buf.toByteArray(), len);
                 output->putBuffer(ret, len, true);
                 anyOutput = true;
             }
-            log.kill();
-            if (closing)
-            {
-                if (anyOutput)
-                    output->flush(true);
-                else
-                    output->abort();
-                output.clear();
-            }
         }
+        ForEachItemIn(idx, log)
+        {
+            MemoryBuffer buf;
+            LogItem &logItem = log.item(idx);
+            logItem.serialize(buf);
+            unsigned len = buf.length();
+            void *ret = output->getBuffer(len, true);
+            memcpy(ret, buf.toByteArray(), len);
+            output->putBuffer(ret, len, true);
+            anyOutput = true;
+        }
+        log.kill();
+        if (anyOutput)
+            output->flush(true);
+         output.clear();
     }
 }
 
@@ -705,16 +698,6 @@ public:
     void enqueue(IRoxieQueryPacket *x)
     {
         {
-#ifdef _DEBUG
-            RoxiePacketHeader &header = x->queryHeader();
-            if (traceLevel > 10)
-            {
-                StringBuffer xx;
-                SlaveContextLogger l(x);
-                l.CTXLOG("enqueued %s", header.toString(xx).str());
-                l.flush(true, false);
-            }
-#endif
             CriticalBlock qc(qcrit);
 #ifdef TIME_PACKETS
             header.tick = msTick();
@@ -1118,8 +1101,8 @@ public:
             Owned <ISlaveActivityFactory> factory = queryFactory->getSlaveActivityFactory(activityId);
             assertex(factory);
             setActivity(factory->createActivity(logctx, packet));
-            bool skip = false;
 #ifdef TEST_SLAVE_FAILURE
+            bool skip = false;
             if (testSlaveFailure) 
             {
                 // Meaning of each byte in testSlaveFailure
@@ -1163,15 +1146,26 @@ public:
                     }
                 }
             }
-#endif
-            if (!skip && activity->process())
-                atomic_inc(&activitiesCompleted);
-            factory->noteStatistics(logctx.queryStats());
-            if (logctx.queryTraceLevel() > 5)
+            if (!skip)
             {
-                StringBuffer x;
-                logctx.CTXLOG("done processing %s", header.toString(x).str());
+#endif
+                Owned<IMessagePacker> output = activity->process();
+                if (logctx.queryTraceLevel() > 5)
+                {
+                    StringBuffer x;
+                    logctx.CTXLOG("done processing %s", header.toString(x).str());
+                }
+                if (output)
+                {
+                    atomic_inc(&activitiesCompleted);
+                    busy = false; // Keep order - before setActivity below
+                    setActivity(NULL);  // Ensures all stats are merged from child queries etc
+                    logctx.flush();
+                    output->flush(true);
+                }
+#ifdef TEST_SLAVE_FAILURE
             }
+#endif
         }
         catch (IUserException *E)
         {

+ 0 - 1
roxie/ccd/ccdqueue.ipp

@@ -103,7 +103,6 @@ public:
     }
 
     virtual void flush(bool last_message) { data.setLength(lastput); }
-    virtual void abort() {}
     virtual void sendMetaInfo(const void *buf, unsigned len) { throwUnexpected(); }
     virtual unsigned size() const { return lastput; }
 };

+ 30 - 71
roxie/ccd/ccdserver.cpp

@@ -242,12 +242,13 @@ public:
     {
         ctx->noteStatistic(kind, value);
     }
-    virtual void CTXLOG(const char *format, ...) const
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
     {
-        va_list args;
-        va_start(args, format);
-        ctx->CTXLOGva(format, args);
-        va_end(args);
+        ctx->mergeStats(from);
+    }
+    virtual const CRuntimeStatisticCollection &queryStats() const
+    {
+        return ctx->queryStats();
     }
     virtual void CTXLOGva(const char *format, va_list args) const
     {
@@ -257,24 +258,10 @@ public:
     {
         ctx->CTXLOGa(category, prefix, text);
     }
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        ctx->logOperatorExceptionVA(E, file, line, format, args);
-        va_end(args);
-    }
     virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
     {
         ctx->logOperatorExceptionVA(E, file, line, format, args);
     }
-    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        ctx->CTXLOGaeva(E, file, line, prefix, format, args);
-        va_end(args);
-    }
     virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
     {
         ctx->CTXLOGaeva(E, file, line, prefix, format, args);
@@ -367,6 +354,10 @@ public:
     {
         return ctx->getWorkunitRowReader(wuid, name, sequence, xmlTransformer, rowAllocator, isGrouped);
     }
+    virtual void mergeStats(const CRuntimeStatisticCollection &from)
+    {
+        ctx->mergeStats(from);
+    }
 protected:
     IRoxieSlaveContext * ctx;
 };
@@ -468,6 +459,11 @@ public:
         throwUnexpected();
     }
 
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
+    {
+        CActivityFactory::mergeStats(from);
+    }
+
     virtual void noteProcessed(unsigned idx, unsigned _processed, unsigned __int64 _totalCycles, unsigned __int64 _localCycles) const
     {
         if (_processed || _totalCycles || _localCycles)
@@ -596,11 +592,6 @@ public:
         throwUnexpected(); // only implemented by index-related subclasses
     }
 
-    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
-    {
-        mystats.addStatistic(kind, value);
-    }
-
     virtual void getXrefInfo(IPropertyTree &reply, const IRoxieContextLogger &logctx) const
     {
         // Most activities have nothing to say...
@@ -949,9 +940,15 @@ public:
             state = STATEreset;  // bit pointless but there you go... 
         }
         if (factory && !debugging)
+        {
             factory->noteProcessed(0, processed, totalCycles, localCycles);
+            factory->mergeStats(stats);
+        }
         if (ctx)
+        {
             ctx->noteProcessed(*this, this, 0, processed, totalCycles, localCycles);
+            ctx->mergeStats(stats);
+        }
         basehelper.Release();
         ::Release(rowAllocator);
     }
@@ -973,22 +970,6 @@ public:
     }
 
     // MORE - most of this is copied from ccd.hpp - can't we refactor?
-    virtual void CTXLOG(const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGva(format, args);
-        va_end(args);
-    }
-
-    virtual void CTXLOGva(const char *format, va_list args) const
-    {
-        StringBuffer text, prefix;
-        getLogPrefix(prefix);
-        text.valist_appendf(format, args);
-        CTXLOGa(LOG_TRACING, prefix.str(), text.str());
-    }
-
     virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
     {
         if (ctx)
@@ -997,31 +978,6 @@ public:
             DBGLOG("[%s] %s", prefix, text);
     }
 
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        StringBuffer prefix;
-        getLogPrefix(prefix);
-        CTXLOGaeva(E, file, line, prefix.str(), format, args);
-        va_end(args);
-    }
-
-    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
-    {
-        StringBuffer prefix;
-        getLogPrefix(prefix);
-        CTXLOGaeva(E, file, line, prefix.str(), format, args);
-    }
-
-    virtual void CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGaeva(E, file, line, prefix, format, args);
-        va_end(args);
-    }
-
     virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
     {
         if (ctx)
@@ -1054,15 +1010,18 @@ public:
             log->Release(); // Should never happen
         }
     }
-
     virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
     {
-        if (factory)
-            factory->noteStatistic(kind, value);
-        if (ctx)
-            ctx->noteStatistic(kind, value);
         stats.addStatistic(kind, value);
     }
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
+    {
+        stats.merge(from);
+    }
+    virtual const CRuntimeStatisticCollection &queryStats() const
+    {
+        return stats;
+    }
 
     virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
     {
@@ -26538,7 +26497,7 @@ protected:
         queryDll.setown(createExeQueryDll("roxie"));
         stateInfo.setown(createPTreeFromXMLString("<test memoryLimit='50000000'/>"));
         queryFactory.setown(createServerQueryFactory("test", queryDll.getLink(), *package, stateInfo, false, false));
-        ctx.setown(createSlaveContext(queryFactory, logctx, NULL));
+        ctx.setown(createSlaveContext(queryFactory, logctx, NULL, false));
         queryActiveTimer()->reset();
     }
 

+ 0 - 1
roxie/ccd/ccdserver.hpp

@@ -211,7 +211,6 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual bool isGraphInvariant() const = 0;
     virtual IRoxieServerSideCache *queryServerSideCache() const = 0;
     virtual IDefRecordMeta *queryActivityMeta() const = 0;
-    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const = 0;
     virtual unsigned numInputs() const = 0;
 };
 interface IGraphResult : public IInterface

+ 0 - 1
roxie/udplib/udplib.hpp

@@ -48,7 +48,6 @@ interface IMessagePacker : extends IInterface
     virtual void putBuffer(const void *buf, unsigned len, bool variable) = 0;
     virtual void flush(bool last_message = false) = 0;
     virtual bool dataQueued() = 0;
-    virtual void abort() = 0;
     virtual void sendMetaInfo(const void *buf, unsigned len) = 0;
 
     virtual unsigned size() const = 0;  // Total amount written via putBuffer plus any overhead from record length prefixes

+ 0 - 8
roxie/udplib/udptrs.cpp

@@ -455,7 +455,6 @@ class CSendManager : public CInterface, implements ISendManager
         unsigned        requested_size;
         MemoryBuffer    metaInfo;
         bool            last_message_done;
-        bool            aborted;
         int             queue_number;
         
     public:
@@ -476,7 +475,6 @@ class CSendManager : public CInterface, implements ISendManager
             package_header.msgSeq = _msgSeq;
             package_header.udpSequence = 0; // these are allocated when transmitted
             
-            aborted = false;
             packed_request = false;
             part_buffer = bufferManager->allocate();
             data_buffer_size = DATA_PAYLOAD - sizeof(UdpPacketHeader);
@@ -507,11 +505,6 @@ class CSendManager : public CInterface, implements ISendManager
             }
         }
 
-        virtual void abort() 
-        {
-            aborted = true;
-        }
-        
         virtual void *getBuffer(unsigned len, bool variable) 
         {
             if (variable)
@@ -591,7 +584,6 @@ class CSendManager : public CInterface, implements ISendManager
 
         virtual void flush(bool last_msg = false) 
         {
-            assert(!aborted);
             if (!last_message_done && last_msg) 
             {
                 last_message_done = true;

+ 20 - 25
system/jlib/jlog.cpp

@@ -2540,6 +2540,24 @@ void SysLogMsgHandler::addToPTree(IPropertyTree * tree) const
     tree->addPropTree("handler", handlerTree);
 }
 
+// Default implementations of the functions in IContextLogger interface
+
+void IContextLogger::CTXLOG(const char *format, ...) const
+{
+    va_list args;
+    va_start(args, format);
+    CTXLOGva(format, args);
+    va_end(args);
+}
+
+void IContextLogger::logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
+{
+    va_list args;
+    va_start(args, format);
+    logOperatorExceptionVA(E, file, line, format, args);
+    va_end(args);
+}
+
 class DummyLogCtx : implements IContextLogger
 {
 public:
@@ -2547,34 +2565,12 @@ public:
     virtual void Link() const {}
     virtual bool Release() const { return false; }
 
-    virtual void CTXLOG(const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGva(format, args);
-        va_end(args);
-    }
     virtual void CTXLOGva(const char *format, va_list args) const
     {
         StringBuffer ss;
         ss.valist_appendf(format, args);
         DBGLOG("%s", ss.str());
     }
-    virtual void CTXLOGa(unsigned activityId, const char *text) const
-    {
-        DBGLOG("[%d] %s", activityId, text);
-    }
-    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const 
-    {
-        return ret;
-    }
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        logOperatorExceptionVA(E, file, line, format, args);
-        va_end(args);
-    }
     virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
     {
         StringBuffer ss;
@@ -2589,11 +2585,10 @@ public:
             ss.append(": ").valist_appendf(format, args);
         LOG(MCoperatorProgress, unknownJob, "%s", ss.str());
     }
-    virtual bool isIntercepted() const
+    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
     {
-        return false;
     }
-    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
     {
     }
     virtual unsigned queryTraceLevel() const

+ 3 - 2
system/jlib/jlog.hpp

@@ -999,11 +999,12 @@ extern jlib_decl void AuditSystemAccess(const char *userid, bool success, char c
 
 interface IContextLogger : extends IInterface
 {
-    virtual void CTXLOG(const char *format, ...) const  __attribute__((format(printf, 2, 3))) = 0;
+    void CTXLOG(const char *format, ...) const  __attribute__((format(printf, 2, 3)));
     virtual void CTXLOGva(const char *format, va_list args) const = 0;
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const  __attribute__((format(printf, 5, 6))) = 0;
+    void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const  __attribute__((format(printf, 5, 6)));
     virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const = 0;
     virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const = 0;
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const = 0;
     virtual unsigned queryTraceLevel() const = 0;
 };
 

+ 2 - 2
system/jlib/jstats.cpp

@@ -1411,7 +1411,7 @@ void CRuntimeStatisticCollection::deserializeMerge(MemoryBuffer& in)
     }
 }
 
-MemoryBuffer& CRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
+bool CRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
 {
     unsigned numValid = 0;
     ForEachItem(i1)
@@ -1430,7 +1430,7 @@ MemoryBuffer& CRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
             out.append(value);
         }
     }
-    return out;
+    return numValid != 0;
 }
 
 //---------------------------------------------------

+ 6 - 3
system/jlib/jstats.h

@@ -429,7 +429,7 @@ extern const jlib_decl StatisticsMapping allStatistics;
 
 //---------------------------------------------------------------------------------------------------------------------
 
-//MORE: We probably want to have functions that peform the atomic equivalents
+//MORE: We probably want to have functions that perform the atomic equivalents
 class CRuntimeStatistic
 {
 public:
@@ -487,6 +487,10 @@ public:
     {
         queryStatistic(kind).add(value);
     }
+    void addStatisticAtomic(StatisticKind kind, unsigned __int64 value)
+    {
+        queryStatistic(kind).addAtomic(value);
+    }
     void mergeStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction)
     {
         queryStatistic(kind).merge(value, mergeAction);
@@ -504,7 +508,6 @@ public:
         unsigned num = mapping.numStatistics();
         for (unsigned i = 0; i <= num; i++)
             values[i].clear();
-        memset(values, 0, sizeof(unsigned __int64) * num);
     }
 
     inline const StatisticsMapping & queryMapping() const { return mapping; };
@@ -523,7 +526,7 @@ public:
     // Print out collected stats to string as XML
     StringBuffer &toXML(StringBuffer &str) const;
     // Serialize/deserialize
-    MemoryBuffer &serialize(MemoryBuffer & out) const;
+    bool serialize(MemoryBuffer & out) const;  // Returns true if any non-zero
     void deserialize(MemoryBuffer & in);
     void deserializeMerge(MemoryBuffer& in);
 protected:

+ 3 - 14
thorlcr/graph/thgraph.cpp

@@ -2427,26 +2427,12 @@ public:
     {
         traceLevel = 1;
     }
-    virtual void CTXLOG(const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        CTXLOGva(format, args);
-        va_end(args);
-    }
     virtual void CTXLOGva(const char *format, va_list args) const
     {
         StringBuffer ss;
         ss.valist_appendf(format, args);
         LOG(MCdebugProgress, thorJob, "%s", ss.str());
     }
-    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
-    {
-        va_list args;
-        va_start(args, format);
-        logOperatorExceptionVA(E, file, line, format, args);
-        va_end(args);
-    }
     virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
     {
         StringBuffer ss;
@@ -2464,6 +2450,9 @@ public:
     virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
     {
     }
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
+    {
+    }
     virtual unsigned queryTraceLevel() const
     {
         return traceLevel;