瀏覽代碼

HPCC-12282 Move roxie query stats to use new mechanism

Fix serialization of child activity stats from slaves

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
2d172da101

+ 2 - 0
common/thorhelper/roxiehelper.ipp

@@ -58,6 +58,8 @@ enum TracingCategory
     LOG_TRACING,
     LOG_ERROR,
     LOG_STATVALUES,
+    LOG_CHILDSTATS,
+    LOG_CHILDCOUNT,
 };
 
 class LogItem;

+ 88 - 79
roxie/ccd/ccd.hpp

@@ -512,6 +512,8 @@ public:
         case LOG_TRACING: return "TRACE";
         case LOG_ERROR: return "ERROR";
         case LOG_STATVALUES: return "STATVALUES";
+        case LOG_CHILDSTATS: return "CHILDSTATS";
+        case LOG_CHILDCOUNT: return "CHILDCOUNT";
         default: return "UNKNOWN";
         }
     }
@@ -545,84 +547,6 @@ public:
 
 };
 
-// Used as a base class by classes that want to intercept statistics but pass on other logging (i.e. slave or server activity classes)
-// Note that the stats are upmerged on destruction
-// Also note that we assume single-threaded access to stats.
-
-class IndirectContextLogger : public CInterface, implements IRoxieContextLogger
-{
-protected:
-    const IRoxieContextLogger *logctx;
-    bool aborted;
-    mutable CRuntimeStatisticCollection stats;
-
-public:
-    IMPLEMENT_IINTERFACE;
-
-    IndirectContextLogger(const IRoxieContextLogger *_logctx, const StatisticsMapping & _statsMapping)
-    : logctx(_logctx), stats(_statsMapping)
-    {
-        aborted = false;
-    }
-    ~IndirectContextLogger()
-    {
-        if (logctx)
-            logctx->mergeStats(stats);
-    }
-    inline void abort()
-    {
-        aborted = true;
-    }
-    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
-    {
-        if (aborted)
-            throw MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
-        stats.addStatistic(kind, value);
-    }
-    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
-    {
-        stats.merge(from);
-    }
-    virtual const CRuntimeStatisticCollection &queryStats() const
-    {
-        return stats;
-    }
-
-    // The rest are passthroughs
-
-    virtual unsigned queryTraceLevel() const
-    {
-        return logctx ? logctx->queryTraceLevel() : traceLevel;
-    }
-    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
-    {
-        return logctx ? logctx->getLogPrefix(ret) : ret;
-    }
-    virtual bool isIntercepted() const
-    {
-        return logctx ? logctx->isIntercepted() : false;
-    }
-    virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
-    {
-        if (logctx)
-            logctx->CTXLOGa(category, prefix, text);
-    }
-    virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
-    {
-        if (logctx)
-            logctx->CTXLOGaeva(E, file, line, prefix, format, args);
-    }
-    virtual void CTXLOGl(LogItem *log) const
-    {
-        if (logctx)
-            logctx->CTXLOGl(log);
-    }
-    virtual bool isBlind() const
-    {
-        return logctx ? logctx->isBlind() : blindLogging;
-    }
-};
-
 // MORE - this code probably should be commoned up with some of the new stats code
 extern void putStatsValue(IPropertyTree *node, const char *statName, const char *statType, unsigned __int64 val);
 extern void putStatsValue(StringBuffer &reply, const char *statName, const char *statType, unsigned __int64 val);
@@ -784,7 +708,7 @@ public:
 class SlaveContextLogger : public StringContextLogger
 {
     Owned<IMessagePacker> output;
-    bool anyOutput;
+    mutable bool anyOutput; // messy
     bool debuggerActive;
     bool checkingHeap;
     IpAddress ip;
@@ -793,6 +717,8 @@ public:
     SlaveContextLogger();
     SlaveContextLogger(IRoxieQueryPacket *packet);
     void set(IRoxieQueryPacket *packet);
+    void putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed) const;
+    void putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const;
     void flush();
     inline bool queryDebuggerActive() const { return debuggerActive; }
     inline const CRuntimeStatisticCollection &queryStats() const
@@ -804,4 +730,87 @@ public:
         return wuid.get();
     }
 };
+
+// Used as a base class by classes that want to intercept statistics but pass on other logging (i.e. slave or server activity classes)
+// Note that the stats are upmerged on destruction
+// Also note that we assume single-threaded access to stats.
+
+class IndirectContextLogger : public CInterface, implements IRoxieContextLogger
+{
+protected:
+    SlaveContextLogger *logctx;
+    bool aborted;
+    mutable CRuntimeStatisticCollection stats;
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    IndirectContextLogger(SlaveContextLogger *_logctx, const StatisticsMapping & _statsMapping)
+    : logctx(_logctx), stats(_statsMapping)
+    {
+        aborted = false;
+    }
+    ~IndirectContextLogger()
+    {
+        if (logctx)
+            logctx->mergeStats(stats);
+    }
+    inline SlaveContextLogger & querySlaveLogContext()
+    {
+        return *logctx;
+    }
+    inline void abort()
+    {
+        aborted = true;
+    }
+    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
+    {
+        if (aborted)
+            throw MakeStringException(ROXIE_ABORT_ERROR, "Roxie server requested abort for running activity");
+        stats.addStatistic(kind, value);
+    }
+    virtual void mergeStats(const CRuntimeStatisticCollection &from) const
+    {
+        stats.merge(from);
+    }
+    virtual const CRuntimeStatisticCollection &queryStats() const
+    {
+        return stats;
+    }
+
+    // The rest are passthroughs
+
+    virtual unsigned queryTraceLevel() const
+    {
+        return logctx ? logctx->queryTraceLevel() : traceLevel;
+    }
+    virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
+    {
+        return logctx ? logctx->getLogPrefix(ret) : ret;
+    }
+    virtual bool isIntercepted() const
+    {
+        return logctx ? logctx->isIntercepted() : false;
+    }
+    virtual void CTXLOGa(TracingCategory category, const char *prefix, const char *text) const
+    {
+        if (logctx)
+            logctx->CTXLOGa(category, prefix, text);
+    }
+    virtual void CTXLOGaeva(IException *E, const char *file, unsigned line, const char *prefix, const char *format, va_list args) const
+    {
+        if (logctx)
+            logctx->CTXLOGaeva(E, file, line, prefix, format, args);
+    }
+    virtual void CTXLOGl(LogItem *log) const
+    {
+        if (logctx)
+            logctx->CTXLOGl(log);
+    }
+    virtual bool isBlind() const
+    {
+        return logctx ? logctx->isBlind() : blindLogging;
+    }
+};
+
 #endif

+ 57 - 63
roxie/ccd/ccdactivities.cpp

@@ -200,7 +200,7 @@ public:
     {
         CActivityFactory::getActivityMetrics(reply);
     }
-    IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return queryFactory.createSlaveContext(logctx, packet, childQueries.length()!=0);
     }
@@ -209,11 +209,10 @@ public:
         if (datafile)
             addXrefFileInfo(reply, datafile);
     }
-    IRoxieSlaveContext *createChildQueries(IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs, IProbeManager *_probeManager, const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IHThorArg *colocalArg, IProbeManager *_probeManager, IRoxieSlaveContext *queryContext, const SlaveContextLogger &logctx) const
     {
-        if (meta.needsDestruct() || meta.needsSerializeDisk() || childQueries.length())
+        if (childQueries.length())
         {
-            Owned<IRoxieSlaveContext> queryContext = queryFactory.createSlaveContext(logctx, packet, childQueries.length()!=0);
             ForEachItemIn(idx, childQueries)
             {
                 if (!_probeManager) // MORE - the probeAllRows is a hack!
@@ -223,9 +222,7 @@ public:
                 queryContext->noteChildGraph(childQueryIndexes.item(idx), childGraph);
                 childGraph->onCreate(queryContext, colocalArg);             //NB: onCreate() on helper for activities in child graph are delayed, otherwise this would go wrong.
             }
-            return queryContext.getClear();
         }
-        return NULL;
     }
 
     Owned<const IResolvedFile> datafile;
@@ -290,7 +287,7 @@ protected:
 class CRoxieSlaveActivity : public CInterface, implements IRoxieSlaveActivity, implements ICodeContext
 {
 protected:
-    Owned<IndirectContextLogger> logctx;
+    Owned <IndirectContextLogger> logctx;
     Linked<IRoxieQueryPacket> packet;
     mutable Owned<IRoxieSlaveContext> queryContext; // bit of a hack but easier than changing the ICodeContext callback interface to remove const
     const CSlaveActivityFactory *basefactory;
@@ -332,18 +329,15 @@ protected:
 
     virtual void onCreate()
     {
+        queryContext.setown(basefactory->createSlaveContext(logctx->querySlaveLogContext(), packet));
 #ifdef _DEBUG
         // MORE - need to consider debugging....
         if (probeAllRows)
-        {
             probeManager.setown(createProbeManager());
-            queryContext.setown(basefactory->createChildQueries(basehelper, childGraphs, probeManager, *logctx, packet));
-        }
-        else
+        basefactory->createChildQueries(childGraphs, basehelper, probeManager, queryContext, logctx->querySlaveLogContext());
+#else
+        basefactory->createChildQueries(childGraphs, basehelper, NULL, queryContext, logctx->querySlaveLogContext());
 #endif
-            queryContext.setown(basefactory->createChildQueries(basehelper, childGraphs, NULL, *logctx, packet));
-        if (!queryContext)
-            queryContext.setown(basefactory->createSlaveContext(*logctx, packet));
         if (meta.needsSerializeDisk())
             serializer.setown(meta.createDiskSerializer(queryContext->queryCodeContext(), basefactory->queryId()));
         if (needsRowAllocator())
@@ -369,7 +363,7 @@ protected:
     {
     }
 
-    CRoxieSlaveActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory)
+    CRoxieSlaveActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory)
         : packet(_packet), basefactory(_factory)
     {
         logctx.setown(new IndirectContextLogger(&_logctx, allStatistics));
@@ -842,7 +836,7 @@ protected:
     CriticalSection pcrit;
 
 public:
-    CRoxieDiskReadBaseActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskReadBaseActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager, 
         unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
         : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
@@ -1040,7 +1034,7 @@ protected:
     IHThorDiskReadArg *helper;
 
 public:
-    CRoxieDiskReadActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
     {
@@ -1084,7 +1078,7 @@ protected:
     const IResolvedFile *datafile;
 
 public:
-    CRoxieCsvReadActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
+    CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
                           const CSlaveActivityFactory *_aFactory, IInMemoryIndexManager *_manager, const IResolvedFile *_datafile)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true), datafile(_datafile)
     {
@@ -1129,7 +1123,7 @@ protected:
     IHThorXmlReadArg *helper;
 
 public:
-    CRoxieXmlReadActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true)
     {
@@ -1171,7 +1165,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieDiskReadActivity(logctx, packet, helperFactory, this, manager);
     }
@@ -1193,7 +1187,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, datafile);
     }
@@ -1215,7 +1209,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieXmlReadActivity(logctx, packet, helperFactory, this, manager);
     }
@@ -1724,7 +1718,7 @@ protected:
     IHThorDiskNormalizeArg *helper;
 
 public:
-    CRoxieDiskNormalizeActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
     {
@@ -1769,7 +1763,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieDiskNormalizeActivity(logctx, packet, helperFactory, this, manager);
     }
@@ -1968,7 +1962,7 @@ protected:
     IHThorDiskCountArg *helper;
 
 public:
-    CRoxieDiskCountActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, false)
     {
@@ -2004,7 +1998,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieDiskCountActivity(logctx, packet, helperFactory, this, manager);
     }
@@ -2236,7 +2230,7 @@ protected:
     IHThorDiskAggregateArg *helper;
 
 public:
-    CRoxieDiskAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager,
         unsigned _parallelPartNo, unsigned _numParallel, bool _forceUnkeyed)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, _parallelPartNo, _numParallel, _forceUnkeyed)
@@ -2277,7 +2271,7 @@ protected:
     Owned<IOutputRowDeserializer> deserializer;
 
 public:
-    CParallelRoxieActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory, unsigned _numParallel)
+    CParallelRoxieActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_factory, unsigned _numParallel)
         : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _factory), numParallel(_numParallel)
     {
         assertex(numParallel > 1);
@@ -2369,7 +2363,7 @@ protected:
     IHThorDiskAggregateArg *helper;
     OwnedConstRoxieRow finalRow;
 public:
-    CParallelRoxieDiskAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CParallelRoxieDiskAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager, unsigned _numParallel) :
         CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel)
     {
@@ -2495,7 +2489,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         if (parallelAggregate > 1)
             return new CParallelRoxieDiskAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
@@ -2715,7 +2709,7 @@ protected:
     }
 
 public:
-    CRoxieDiskGroupAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager,
         unsigned partNo, unsigned numParts, bool _forceUnkeyed)
         : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, partNo, numParts, _forceUnkeyed),
@@ -2761,7 +2755,7 @@ protected:
     Owned<IRowManager> rowManager;
 
 public:
-    CParallelRoxieDiskGroupAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
+    CParallelRoxieDiskGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
         IInMemoryIndexManager *_manager, unsigned _numParallel) :
         CParallelRoxieActivity(_logctx, _packet, _hFactory, _aFactory, _numParallel),
         helper((IHThorDiskGroupAggregateArg *) basehelper),
@@ -2868,7 +2862,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         if (parallelAggregate > 1)
             return new CParallelRoxieDiskGroupAggregateActivity(logctx, packet, helperFactory, this, manager, parallelAggregate);
@@ -3140,7 +3134,7 @@ protected:
         keyArray.setown(varFileInfo->getKeyArray(activityMeta, layoutTranslators, isOpt, packet->queryHeader().channel, allowFieldTranslation));
     }
 
-    CRoxieKeyedActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedActivityFactory *_aFactory)
+    CRoxieKeyedActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedActivityFactory *_aFactory)
         : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), 
         keyArray(_aFactory->queryKeyArray()),
         layoutTranslators(_aFactory->queryLayoutTranslators()),
@@ -3208,7 +3202,7 @@ protected:
     }
 
 public:
-    CRoxieIndexActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
+    CRoxieIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
         : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory), 
         factory(_aFactory),
         steppingOffset(_steppingOffset),
@@ -3385,7 +3379,7 @@ protected:
     IHThorCompoundReadExtra * readHelper;
 
 public:
-    CRoxieIndexReadActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
+    CRoxieIndexReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, unsigned _steppingOffset)
         : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, _steppingOffset)
     {
         onCreate();
@@ -3667,7 +3661,7 @@ public:
         }
     }
     
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieIndexReadActivity(logctx, packet, helperFactory, this, steppingOffset);
     }
@@ -3692,7 +3686,7 @@ protected:
     IHThorCompoundNormalizeExtra * normalizeHelper;
 
 public:
-    CRoxieIndexNormalizeActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
+    CRoxieIndexNormalizeActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
         : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0) //MORE - stepping?
     {
         onCreate();
@@ -3825,7 +3819,7 @@ public:
         init(helper, graphNode);
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieIndexNormalizeActivity(logctx, packet, helperFactory, this);
     }
@@ -3853,7 +3847,7 @@ protected:
     unsigned __int64 keyedLimit;
 
 public:
-    CRoxieIndexCountActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
+    CRoxieIndexCountActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
         : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
     {
         onCreate();
@@ -3961,7 +3955,7 @@ public:
         init(helper, graphNode);
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieIndexCountActivity(logctx, packet, helperFactory, this);
     }
@@ -3985,7 +3979,7 @@ protected:
     IHThorCompoundAggregateExtra * aggregateHelper;
 
 public:
-    CRoxieIndexAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
+    CRoxieIndexAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory)
         : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0)
     {
         onCreate();
@@ -4059,7 +4053,7 @@ public:
         init(helper, graphNode);
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieIndexAggregateActivity(logctx, packet, helperFactory, this);
     }
@@ -4087,7 +4081,7 @@ protected:
 
 public:
     IMPLEMENT_IINTERFACE;
-    CRoxieIndexGroupAggregateActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, ThorActivityKind _kind)
+    CRoxieIndexGroupAggregateActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieIndexActivityFactory *_aFactory, ThorActivityKind _kind)
         : CRoxieIndexActivity(_logctx, _packet, _hFactory, _aFactory, 0),
           aggregateHelper((IHThorIndexGroupAggregateArg *) basehelper),
           results(*aggregateHelper, *aggregateHelper), kind(_kind)
@@ -4241,7 +4235,7 @@ public:
         init(helper, graphNode);
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieIndexGroupAggregateActivity(logctx, packet, helperFactory, this, kind);
     }
@@ -4281,7 +4275,7 @@ public:
         }
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const;
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
 
     virtual StringBuffer &toString(StringBuffer &s) const
     {
@@ -4312,7 +4306,7 @@ protected:
     virtual size32_t doFetch(ARowBuilder & rowBuilder, offset_t pos, offset_t rawpos, void *inputData) = 0;
 
 public:
-    CRoxieFetchActivityBase(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
+    CRoxieFetchActivityBase(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
         : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory)
     {
         helper = (IHThorFetchBaseArg *) basehelper;
@@ -4405,7 +4399,7 @@ class CRoxieFetchActivity : public CRoxieFetchActivityBase
     Owned<IEngineRowAllocator> diskAllocator;
     Owned<IOutputRowDeserializer> rowDeserializer;
 public:
-    CRoxieFetchActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
+    CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
         : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
     {
         IHThorFetchContext * fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
@@ -4426,7 +4420,7 @@ public:
     }
 };
 
-IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
 {
     return new CRoxieFetchActivity(logctx, packet, helperFactory, this);
 }
@@ -4438,7 +4432,7 @@ class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
     CSVSplitter csvSplitter;    
 
 public:
-    CRoxieCSVFetchActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns)
+    CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns)
         : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
     {
         const char * quotes = NULL;
@@ -4496,7 +4490,7 @@ class CRoxieXMLFetchActivity : public CRoxieFetchActivityBase, implements IXMLSe
 
 public:
     IMPLEMENT_IINTERFACE;
-    CRoxieXMLFetchActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _streamBufferSize)
+    CRoxieXMLFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _streamBufferSize)
         : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory),
           streamBufferSize(_streamBufferSize)
     {
@@ -4559,7 +4553,7 @@ public:
         assertex(!csvInfo->queryEBCDIC());
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, maxColumns);
     }
@@ -4573,7 +4567,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieXMLFetchActivity(logctx, packet, helperFactory, this, 4096);
     }
@@ -4626,7 +4620,7 @@ public:
         }
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const;
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
 
     virtual StringBuffer &toString(StringBuffer &s) const
     {
@@ -4652,7 +4646,7 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
     unsigned inputDone;
 
 public:
-    CRoxieKeyedJoinIndexActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
+    CRoxieKeyedJoinIndexActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
         : factory(_aFactory), CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory)
     {
         helper = (IHThorKeyedJoinArg *) basehelper;
@@ -4713,7 +4707,7 @@ public:
             Owned<IRoxieQueryPacket> indexPacket = createRoxiePacket(indexPacketData);
             Owned<ISlaveActivityFactory> indexActivityFactory = factory->queryQueryFactory().getSlaveActivityFactory(indexActivityId.activityId);
             assertex(indexActivityFactory != NULL);
-            rootIndexActivity.setown(indexActivityFactory->createActivity(*logctx, indexPacket));
+            rootIndexActivity.setown(indexActivityFactory->createActivity(logctx->querySlaveLogContext(), indexPacket));
             rootIndex = rootIndexActivity->queryIndexReadActivity();
     
             varFileInfo.setown(rootIndex->getVarFileInfo());
@@ -4745,7 +4739,7 @@ public:
     }
 };
 
-IRoxieSlaveActivity *CRoxieKeyedJoinIndexActivityFactory::createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+IRoxieSlaveActivity *CRoxieKeyedJoinIndexActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
 {
     return new CRoxieKeyedJoinIndexActivity(logctx, packet, helperFactory, this);
 }
@@ -4968,7 +4962,7 @@ public:
 
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const;
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const;
 
     virtual StringBuffer &toString(StringBuffer &s) const
     {
@@ -5001,7 +4995,7 @@ class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
     }
 
 public:
-    CRoxieKeyedJoinFetchActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory)
+    CRoxieKeyedJoinFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinFetchActivityFactory *_aFactory)
         : factory(_aFactory), 
           CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory)
     {
@@ -5114,7 +5108,7 @@ IMessagePacker *CRoxieKeyedJoinFetchActivity::process()
         return output.getClear();
 }
 
-IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+IRoxieSlaveActivity *CRoxieKeyedJoinFetchActivityFactory::createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
 {
     return new CRoxieKeyedJoinFetchActivity(logctx, packet, helperFactory, this);
 }
@@ -5135,7 +5129,7 @@ protected:
     unsigned remoteId;
 
 public:
-    CRoxieRemoteActivity(IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory, unsigned _remoteId)
+    CRoxieRemoteActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory, unsigned _remoteId)
         : CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory), 
         remoteId(_remoteId)
     {
@@ -5249,7 +5243,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         return new CRoxieRemoteActivity(logctx, packet, helperFactory, this, remoteId);
     }
@@ -5317,7 +5311,7 @@ public:
         }
     }
 
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
     {
         throwUnexpected();  // don't actually want to create an activity
     }

+ 1 - 1
roxie/ccd/ccdactivities.hpp

@@ -62,7 +62,7 @@ interface IRoxieSlaveActivity : extends IInterface
 
 interface ISlaveActivityFactory : extends IActivityFactory
 {
-    virtual IRoxieSlaveActivity *createActivity(IRoxieContextLogger &logctx, IRoxieQueryPacket *packet) const = 0;
+    virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const = 0;
     virtual StringBuffer &toString(StringBuffer &ret) const = 0;
     virtual const char *queryQueryName() const = 0;
     virtual void addChildQuery(unsigned id, ActivityArray *childQuery) = 0;

+ 134 - 107
roxie/ccd/ccdcontext.cpp

@@ -1013,7 +1013,7 @@ public:
 
 //---------------------------------------------------------------------------------------
 
-class CSlaveContext : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
+class CRoxieContextBase : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
 {
 protected:
     Owned<IConstWUGraphProgress> progress;  // These need to be destroyed very late (particularly, after the childgraphs)
@@ -1027,7 +1027,6 @@ protected:
     Owned<IActivityGraph> graph;
     StringBuffer authToken;
     Owned<IPropertyTree> probeQuery;
-    RoxiePacketHeader *header;
     unsigned lastWuAbortCheck;
     unsigned startTime;
     unsigned totSlavesReplyLen;
@@ -1098,34 +1097,15 @@ protected:
 
 public:
     IMPLEMENT_IINTERFACE;
-    CSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
+    CRoxieContextBase(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
         : factory(_factory), logctx(_logctx), options(factory->queryOptions())
     {
-        bool debuggerActive = false;
-        if (_packet)
-        {
-            header = &_packet->queryHeader();
-            const byte *traceInfo = _packet->queryTraceInfo();
-            options.setFromSlaveLoggingFlags(*traceInfo);
-            debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren;  // No option to debug simple remote activity
-        }
-        else
-            header = NULL;
         startTime = lastWuAbortCheck = msTick();
         persists = NULL;
         temporaries = NULL;
         deserializedResultStore = NULL;
         rereadResults = NULL;
         xmlStoredDatasetReadFlags = ptr_none;
-        if (debuggerActive)
-        {
-            assertex(header);
-            CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
-            slaveDebugContext->init(_packet);
-            debugContext.setown(slaveDebugContext);
-            probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
-        }
-
         aborted = false;
         exceptionLogged = false;
         totSlavesReplyLen = 0;
@@ -1135,7 +1115,7 @@ public:
         //MORE: If checking heap required then should have
         //rowManager.setown(createCheckingHeap(rowManager)) or something similar.
     }
-    ~CSlaveContext()
+    ~CRoxieContextBase()
     {
         ::Release(rereadResults);
         ::Release(persists);
@@ -1210,46 +1190,6 @@ public:
         return logctx.queryTraceLevel();
     }
 
-    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const
-    {
-        if (_processed)
-        {
-            if (graphStats)
-            {
-                IStatisticGatherer & builder = graphStats->queryStatsBuilder();
-                StatsSubgraphScope graphScope(builder, activity->querySubgraphId());
-                StatsEdgeScope scope(builder, activity->queryId(), _idx);
-                builder.addStatistic(StNumRowsProcessed, _processed);
-            }
-            logctx.noteStatistic(StNumRowsProcessed, _processed);
-        }
-    }
-
-    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
-    {
-        if (graphStats)
-        {
-            IStatisticGatherer & builder = graphStats->queryStatsBuilder();
-            StatsSubgraphScope graphScope(builder, activity->querySubgraphId());
-            StatsActivityScope scope(builder, activity->queryId());
-            if (_totalCycles.totalCycles)
-            {
-                builder.addStatistic(StWhenFirstRow, (_totalCycles.firstRow));
-                builder.addStatistic(StTimeElapsed, (_totalCycles.elapsed()));
-                builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(_totalCycles.totalCycles));
-                builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
-            }
-            ForEachItemIn(i, fromStats)
-            {
-                StatisticKind kind = fromStats.getKind(i);
-                unsigned __int64 value = fromStats.getStatisticValue(kind);
-                if (value)
-                    builder.addStatistic(kind, value);
-            }
-        }
-        logctx.mergeStats(fromStats);
-    }
-
     virtual void checkAbort()
     {
         // MORE - really should try to apply limits at slave end too
@@ -1519,47 +1459,11 @@ public:
         return (const char *) dll->getResource(id);
     }
 
-    virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
-    {
-        CDateTime cacheDate; // Note - this is empty meaning we don't know...
-        return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
-    }
-
-    virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
-    {
-        throwUnexpected(); // only support writing on the server
-    }
-
-    virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
-    {
-        // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
-        // (possibly we could get just local if the channel matches but not sure there is any point)
-        Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
-        if (dFile)
-        {
-            MemoryBuffer mb;
-            mb.append(sizeof(RoxiePacketHeader), &header);
-            mb.append(lfn);
-            dFile->serializePartial(mb, header.channel, isLocal);
-            ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
-            Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
-            reply->queryHeader().retries = 0;
-            ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
-            return;
-        }
-        ROQ->sendAbortCallback(header, lfn, *this);
-        throwUnexpected();
-    }
     virtual ICodeContext *queryCodeContext()
     {
         return this;
     }
 
-    virtual IRoxieServerContext *queryServerContext()
-    {
-        return NULL;
-    }
-
     virtual IProbeManager *queryProbeManager() const
     {
         return probeManager;
@@ -1590,7 +1494,7 @@ public:
     virtual IEngineContext *queryEngineContext() { return NULL; }
     virtual char *getDaliServers() { throwUnexpected(); }
 
-    // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server (or in child query on slave), they will be implemented by more derived CRoxieServerContext class
+    // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server, they will be implemented by more derived CRoxieServerContext class
     virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
     virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
     virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
@@ -2357,11 +2261,94 @@ protected:
     }
 };
 
-IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
+//-----------------------------------------------------------------------------------------------
+
+class CSlaveContext : public CRoxieContextBase
+{
+protected:
+    RoxiePacketHeader *header;
+
+public:
+    CSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
+    : CRoxieContextBase(_factory, _logctx)
+    {
+        header = &_packet->queryHeader();
+        const byte *traceInfo = _packet->queryTraceInfo();
+        options.setFromSlaveLoggingFlags(*traceInfo);
+        bool debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren;  // No option to debug simple remote activity
+        if (debuggerActive)
+        {
+            CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
+            slaveDebugContext->init(_packet);
+            debugContext.setown(slaveDebugContext);
+            probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
+        }
+    }
+    virtual void beforeDispose()
+    {
+        // NOTE: This is needed to ensure that owned activities are destroyed BEFORE I am,
+        // to avoid pure virtual calls when they come to call noteProcessed()
+        childGraphs.kill();
+    }
+
+    virtual IRoxieServerContext *queryServerContext()
+    {
+        return NULL;
+    }
+
+    virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
+    {
+        CDateTime cacheDate; // Note - this is empty meaning we don't know...
+        return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
+    }
+
+    virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
+    {
+        throwUnexpected(); // only support writing on the server
+    }
+
+    virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
+    {
+        // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
+        // (possibly we could get just local if the channel matches but not sure there is any point)
+        Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
+        if (dFile)
+        {
+            MemoryBuffer mb;
+            mb.append(sizeof(RoxiePacketHeader), &header);
+            mb.append(lfn);
+            dFile->serializePartial(mb, header.channel, isLocal);
+            ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
+            Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
+            reply->queryHeader().retries = 0;
+            ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
+            return;
+        }
+        ROQ->sendAbortCallback(header, lfn, *this);
+        throwUnexpected();
+    }
+
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
+    {
+        const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
+        slaveLogCtx.putStatProcessed(subgraphId, activityId, _idx, _processed);
+    }
+
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    {
+        const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
+        slaveLogCtx.putStats(subgraphId, activityId, fromStats);
+    }
+
+};
+
+IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
 {
     return new CSlaveContext(_factory, _logctx, packet, hasChildren);
 }
 
+//-----------------------------------------------------------------------------------------------
+
 class CRoxieServerDebugContext : extends CBaseServerDebugContext
 {
     // Some questions:
@@ -2500,7 +2487,7 @@ public:
 
 };
 
-class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext, implements IGlobalCodeContext
+class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext
 {
     const IQueryFactory *serverQueryFactory;
     CriticalSection daliUpdateCrit;
@@ -2630,7 +2617,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
-        : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
     {
         init();
         rowManager->setMemoryLimit(options.memoryLimit);
@@ -2639,7 +2626,7 @@ public:
     }
 
     CRoxieServerContext(IConstWorkUnit *_workUnit, const IQueryFactory *_factory, const ContextLogger &_logctx)
-        : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
     {
         init();
         workUnit.set(_workUnit);
@@ -2654,7 +2641,7 @@ public:
     }
 
     CRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, TextMarkupFormat _mlFmt, bool _isRaw, bool _isBlocked, HttpHelper &httpHelper, bool _trim, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
-        : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory), querySetName(_querySetName)
+        : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), querySetName(_querySetName)
     {
         init();
         context.set(_context);
@@ -2695,6 +2682,46 @@ public:
         workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
     }
 
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
+    {
+        if (_processed)
+        {
+            if (graphStats)
+            {
+                IStatisticGatherer & builder = graphStats->queryStatsBuilder();
+                StatsSubgraphScope graphScope(builder, subgraphId);
+                StatsEdgeScope scope(builder, activityId, _idx);
+                builder.addStatistic(StNumRowsProcessed, _processed);
+            }
+            logctx.noteStatistic(StNumRowsProcessed, _processed);
+        }
+    }
+
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    {
+        if (graphStats)
+        {
+            IStatisticGatherer & builder = graphStats->queryStatsBuilder();
+            StatsSubgraphScope graphScope(builder, subgraphId);
+            StatsActivityScope scope(builder, activityId);
+            if (_totalCycles.totalCycles)
+            {
+                builder.addStatistic(StWhenFirstRow, (_totalCycles.firstRow));
+                builder.addStatistic(StTimeElapsed, (_totalCycles.elapsed()));
+                builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(_totalCycles.totalCycles));
+                builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
+            }
+            ForEachItemIn(i, fromStats)
+            {
+                StatisticKind kind = fromStats.getKind(i);
+                unsigned __int64 value = fromStats.getStatisticValue(kind);
+                if (value)
+                    builder.addStatistic(kind, value);
+            }
+        }
+        logctx.mergeStats(fromStats);
+    }
+
     virtual roxiemem::IRowManager &queryRowManager()
     {
         return *rowManager;
@@ -2710,7 +2737,7 @@ public:
 
     virtual void checkAbort()
     {
-        CSlaveContext::checkAbort();
+        CRoxieContextBase::checkAbort();
         unsigned ticksNow = msTick();
         if (options.warnTimeLimit)
         {
@@ -3473,7 +3500,7 @@ public:
     virtual void endGraph(cycle_t startCycles, bool aborting)
     {
         fileCache.kill();
-        CSlaveContext::endGraph(startCycles, aborting);
+        CRoxieContextBase::endGraph(startCycles, aborting);
     }
 
     virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)

+ 3 - 3
roxie/ccd/ccdcontext.hpp

@@ -58,8 +58,8 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters) = 0;
     virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal) = 0;
     virtual IActivityGraph * getLibraryGraph(const LibraryCallFactoryExtra &extra, IRoxieServerActivity *parentActivity) = 0;
-    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const = 0;
-    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const = 0;
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const = 0;
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const = 0;
     virtual IProbeManager *queryProbeManager() const = 0;
     virtual IDebuggableContext *queryDebugContext() const = 0;
     virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) = 0;
@@ -106,7 +106,7 @@ typedef IEclProcess* (* EclProcessFactory)();
 class CRoxieWorkflowMachine;
 
 extern IDeserializedResultStore *createDeserializedResultStore();
-extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasRemoteChildren);
+extern IRoxieSlaveContext *createSlaveContext(const IQueryFactory *factory, const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasRemoteChildren);
 extern IRoxieServerContext *createRoxieServerContext(IPropertyTree *context, const IQueryFactory *factory, SafeSocket &client, bool isXml, bool isRaw, bool isBlocked, HttpHelper &httpHelper, bool trim, const ContextLogger &logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName);
 extern IRoxieServerContext *createOnceServerContext(const IQueryFactory *factory, const IRoxieContextLogger &_logctx);
 extern IRoxieServerContext *createWorkUnitServerContext(IConstWorkUnit *wu, const IQueryFactory *factory, const ContextLogger &logctx);

+ 2 - 2
roxie/ccd/ccdquery.cpp

@@ -1445,7 +1445,7 @@ public:
         return strdup(result ? result : defaultValue);
     }
 
-    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
+    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
     {
         throwUnexpected();   // only implemented in derived slave class
     }
@@ -1842,7 +1842,7 @@ public:
     {
     }
 
-    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
+    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
     {
         return ::createSlaveContext(this, logctx, packet, hasChildren);
     }

+ 1 - 1
roxie/ccd/ccdquery.hpp

@@ -129,7 +129,7 @@ private:
 
 interface IQueryFactory : extends IInterface
 {
-    virtual IRoxieSlaveContext *createSlaveContext(const IRoxieContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const = 0;
+    virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const = 0;
     virtual IActivityGraph *lookupGraph(const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const = 0;
     virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const = 0;
     virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const = 0;

+ 33 - 0
roxie/ccd/ccdqueue.cpp

@@ -530,6 +530,39 @@ void SlaveContextLogger::set(IRoxieQueryPacket *packet)
     }
 }
 
+
+void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed) const
+{
+    if (output && mergeSlaveStatistics)
+    {
+        MemoryBuffer buf;
+        buf.append((char) LOG_CHILDCOUNT); // A special log entry for the stats
+        buf.append(subGraphId);
+        buf.append(actId);
+        buf.append(idx);
+        buf.append(processed);
+    }
+}
+
+void SlaveContextLogger::putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const
+{
+    if (output && mergeSlaveStatistics)
+    {
+        MemoryBuffer buf;
+        buf.append((char) LOG_CHILDSTATS); // A special log entry for the stats
+        buf.append(subGraphId);
+        buf.append(actId);
+        if (stats.serialize(buf))
+        {
+            unsigned len = buf.length();
+            void *ret = output->getBuffer(len, true);
+            memcpy(ret, buf.toByteArray(), len);
+            output->putBuffer(ret, len, true);
+            anyOutput = true;
+        }
+    }
+}
+
 void SlaveContextLogger::flush()
 {
     if (output)

+ 44 - 38
roxie/ccd/ccdserver.cpp

@@ -314,13 +314,13 @@ public:
     {
         return ctx->getLibraryGraph(extra, parentActivity);
     }
-    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
     {
-        ctx->noteProcessed(activity, _idx, _processed);
+        ctx->noteProcessed(subgraphId, activityId, _idx, _processed);
     }
-    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *_activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
     {
-        ctx->mergeActivityStats(fromStats, _activity, _totalCycles, _localCycles);
+        ctx->mergeActivityStats(fromStats, subgraphId, activityId, _totalCycles, _localCycles);
     }
     virtual IProbeManager *queryProbeManager() const
     {
@@ -981,8 +981,8 @@ public:
         if (ctx)
         {
             if (processed)
-                ctx->noteProcessed(this, 0, processed);
-            ctx->mergeActivityStats(stats, this, totalCycles, localCycles);
+                ctx->noteProcessed(factory->querySubgraphId(), activityId, 0, processed);
+            ctx->mergeActivityStats(stats, factory->querySubgraphId(), activityId, totalCycles, localCycles);
         }
         basehelper.Release();
         ::Release(rowAllocator);
@@ -4076,33 +4076,7 @@ public:
                                 throwUnexpected();
                             break;
                         }
-                            // MORE - ROXIE_ALIVE perhaps should go here too
-                        case ROXIE_TRACEINFO:
-                        {
-                            Owned<IMessageUnpackCursor> extra = mr->getCursor(rowManager);
-                            loop
-                            {
-                                RecordLengthType *rowlen = (RecordLengthType *) extra->getNext(sizeof(RecordLengthType));
-                                if (rowlen)
-                                {
-                                    char *logInfo = (char *) extra->getNext(*rowlen);
-                                    MemoryBuffer buf;
-                                    buf.setBuffer(*rowlen, logInfo, false);
-                                    if (*logInfo == LOG_STATVALUES)
-                                    {
-                                        buf.skip(1);
-                                        activity.mergeStats(buf);
-                                    }
-                                    else
-                                        activity.queryLogCtx().CTXLOGl(new LogItem(buf));
-                                    ReleaseRoxieRow(rowlen);
-                                    ReleaseRoxieRow(logInfo);
-                                }
-                                else
-                                    break;
-                            }
-                            break;
-                        }
+                        // MORE - ROXIE_ALIVE perhaps should go here too
                         default:
                             if (ctxTraceLevel > 3)
                                 activity.queryLogCtx().CTXLOG("Discarding packet %p - original %p is NULL or has result already", mr.get(), original);
@@ -4198,13 +4172,45 @@ public:
                                 char *logInfo = (char *) extra->getNext(*rowlen);
                                 MemoryBuffer buf;
                                 buf.setBuffer(*rowlen, logInfo, false);
-                                if (*logInfo == LOG_STATVALUES)
+                                switch ((TracingCategory) *logInfo)
                                 {
+                                case LOG_TRACING:
+                                case LOG_ERROR:
+                                    activity.queryLogCtx().CTXLOGl(new LogItem(buf));
+                                    break;
+                                case LOG_STATVALUES:
                                     buf.skip(1);
                                     activity.mergeStats(buf);
+                                    break;
+                                case LOG_CHILDCOUNT:
+                                case LOG_CHILDSTATS:
+                                    unsigned graphId, childId;
+                                    buf.skip(1);
+                                    buf.read(graphId);
+                                    buf.read(childId);
+                                    if (*logInfo == LOG_CHILDCOUNT)
+                                    {
+                                        unsigned childProcessed;
+                                        unsigned idx;
+                                        buf.read(childProcessed);
+                                        buf.read(idx);
+                                        if (traceLevel > 5)
+                                            activity.queryLogCtx().CTXLOG("Processing ChildCount %d idx %d for child %d subgraph %d", childProcessed, idx, childId, graphId);
+                                        activity.queryContext()->noteProcessed(graphId, childId, idx, childProcessed);
+                                    }
+                                    else
+                                    {
+                                        ActivityTimeAccumulator dummy; // We could serialize from slave? Would get confusing though
+                                        CRuntimeStatisticCollection childStats(allStatistics);
+                                        childStats.deserialize(buf);
+                                        if (traceLevel > 5)
+                                        {
+                                            StringBuffer s;
+                                            activity.queryLogCtx().CTXLOG("Processing ChildStats for child %d subgraph %d: %s", childId, graphId, childStats.toStr(s).str());
+                                        }
+                                        activity.queryContext()->mergeActivityStats(childStats, graphId, childId, dummy, 0);
+                                    }
                                 }
-                                else
-                                    activity.queryLogCtx().CTXLOGl(new LogItem(buf));
                                 ReleaseRoxieRow(rowlen);
                                 ReleaseRoxieRow(logInfo);
                             }
@@ -8345,7 +8351,7 @@ public:
             {
                 parent->factory->noteProcessed(oid, processed);
                 if (parent->ctx)
-                    parent->ctx->noteProcessed(parent, oid, processed);
+                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed);
             }
         }
 
@@ -15183,7 +15189,7 @@ class CRoxieServerLibraryCallActivity : public CRoxieServerActivity
             {
                 parent->factory->noteProcessed(oid, processed);
                 if (parent->ctx)
-                    parent->ctx->noteProcessed(parent, oid, processed);
+                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed);
             }
         }