Browse Source

HPCC-23975 Convert act. stats to CThorStatsCollection

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith, 5 năm trước
mục cha
commit
e55cf02473
39 tập tin đã thay đổi với 356 bổ sung và 790 xóa
  1. 11 11
      roxie/ccd/ccdserver.cpp
  2. 4 4
      system/jlib/jstatcodes.h
  3. 3 31
      system/jlib/jstats.cpp
  4. 41 7
      system/jlib/jstats.h
  5. 13 0
      system/jlib/jutil.hpp
  6. 1 27
      thorlcr/activities/group/thgroup.cpp
  7. 4 4
      thorlcr/activities/group/thgroupslave.cpp
  8. 6 29
      thorlcr/activities/hashdistrib/thhashdistrib.cpp
  9. 15 13
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  10. 1 35
      thorlcr/activities/indexread/thindexread.cpp
  11. 35 36
      thorlcr/activities/indexread/thindexreadslave.cpp
  12. 2 20
      thorlcr/activities/indexwrite/thindexwrite.cpp
  13. 5 5
      thorlcr/activities/indexwrite/thindexwriteslave.cpp
  14. 1 37
      thorlcr/activities/join/thjoin.cpp
  15. 25 13
      thorlcr/activities/join/thjoinslave.cpp
  16. 1 41
      thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp
  17. 1 30
      thorlcr/activities/keyedjoin/thkeyedjoin.cpp
  18. 11 4
      thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp
  19. 29 21
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  20. 1 31
      thorlcr/activities/lookupjoin/thlookupjoin.cpp
  21. 7 5
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  22. 1 15
      thorlcr/activities/loop/thloop.cpp
  23. 5 5
      thorlcr/activities/loop/thloopslave.cpp
  24. 15 11
      thorlcr/activities/msort/thgroupsortslave.cpp
  25. 1 13
      thorlcr/activities/msort/thmsort.cpp
  26. 12 8
      thorlcr/activities/msort/thmsortslave.cpp
  27. 11 9
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  28. 5 51
      thorlcr/activities/thdiskbase.cpp
  29. 0 10
      thorlcr/activities/thdiskbase.ipp
  30. 18 15
      thorlcr/activities/thdiskbaseslave.cpp
  31. 0 1
      thorlcr/activities/thdiskbaseslave.ipp
  32. 8 178
      thorlcr/graph/thgraphmaster.cpp
  33. 4 48
      thorlcr/graph/thgraphmaster.ipp
  34. 14 10
      thorlcr/graph/thgraphslave.cpp
  35. 6 5
      thorlcr/graph/thgraphslave.hpp
  36. 2 1
      thorlcr/slave/slave.cpp
  37. 1 1
      thorlcr/slave/slave.ipp
  38. 17 2
      thorlcr/thorutil/thormisc.cpp
  39. 19 3
      thorlcr/thorutil/thormisc.hpp

+ 11 - 11
roxie/ccd/ccdserver.cpp

@@ -381,26 +381,26 @@ static const StatisticsMapping actStatistics({StWhenFirstRow, StTimeElapsed, StT
                                               StNumRowsProcessed, StNumSlaves, StNumStarts, StNumStops, StNumStrands,
                                               StNumScansPerRow, StNumAllocations, StNumAllocationScans,
                                               StTimeFirstExecute, StCycleLocalExecuteCycles, StCycleTotalExecuteCycles});
-static const StatisticsMapping joinStatistics(&actStatistics, {StNumAtmostTriggered});
-static const StatisticsMapping keyedJoinStatistics(&joinStatistics, { StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
+static const StatisticsMapping joinStatistics({StNumAtmostTriggered}, actStatistics);
+static const StatisticsMapping keyedJoinStatistics({ StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
                                                     StNumIndexSkips, StNumIndexNullSkips, StNumIndexMerges, StNumIndexMergeCompares,
                                                     StNumPreFiltered, StNumPostFiltered, StNumIndexAccepted, StNumIndexRejected,
                                                     StNumIndexRowsRead, StNumDiskRowsRead, StNumDiskSeeks, StNumDiskAccepted,
                                                     StNumBlobCacheHits, StNumLeafCacheHits, StNumNodeCacheHits,
                                                     StNumBlobCacheAdds, StNumLeafCacheAdds, StNumNodeCacheAdds,
-                                                    StNumDiskRejected});
-static const StatisticsMapping indexStatistics(&actStatistics, {StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
+                                                    StNumDiskRejected}, joinStatistics);
+static const StatisticsMapping indexStatistics({StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
                                                 StNumIndexSkips, StNumIndexNullSkips, StNumIndexMerges, StNumIndexMergeCompares,
                                                 StNumPreFiltered, StNumPostFiltered, StNumIndexAccepted, StNumIndexRejected,
                                                 StNumBlobCacheHits, StNumLeafCacheHits, StNumNodeCacheHits,
                                                 StNumBlobCacheAdds, StNumLeafCacheAdds, StNumNodeCacheAdds,
-                                                StNumIndexRowsRead});
-static const StatisticsMapping diskStatistics(&actStatistics, {StNumServerCacheHits, StNumDiskRowsRead, StNumDiskSeeks, StNumDiskAccepted,
-                                               StNumDiskRejected });
-static const StatisticsMapping soapStatistics(&actStatistics, { StTimeSoapcall });
-static const StatisticsMapping groupStatistics(&actStatistics, { StNumGroups, StNumGroupMax });
-static const StatisticsMapping sortStatistics(&actStatistics, { StTimeSortElapsed });
-static const StatisticsMapping indexWriteStatistics(&actStatistics, { StNumDuplicateKeys });
+                                                StNumIndexRowsRead}, actStatistics);
+static const StatisticsMapping diskStatistics({StNumServerCacheHits, StNumDiskRowsRead, StNumDiskSeeks, StNumDiskAccepted,
+                                               StNumDiskRejected }, actStatistics);
+static const StatisticsMapping soapStatistics({ StTimeSoapcall }, actStatistics);
+static const StatisticsMapping groupStatistics({ StNumGroups, StNumGroupMax }, actStatistics);
+static const StatisticsMapping sortStatistics({ StTimeSortElapsed }, actStatistics);
+static const StatisticsMapping indexWriteStatistics({ StNumDuplicateKeys }, actStatistics);
 
 //=================================================================================
 

+ 4 - 4
system/jlib/jstatcodes.h

@@ -116,11 +116,11 @@ enum StatisticKind
 {
     StKindNone,
     StKindAll,
-  StWhenGraphStarted,                   // Deprecated use StWhenStarted
-  StWhenGraphFinished,                  // Deprecated use StWhenFinished
+    StWhenGraphStarted,                 // Deprecated use StWhenStarted
+    StWhenGraphFinished,                // Deprecated use StWhenFinished
     StWhenFirstRow,                     // When the first row is processed by slave activity
-  StWhenQueryStarted,                   // Deprecated use StWhenStarted
-  StWhenQueryFinished,                  // Deprecated use StWhenFinished
+    StWhenQueryStarted,                 // Deprecated use StWhenStarted
+    StWhenQueryFinished,                // Deprecated use StWhenFinished
     StWhenCreated,
     StWhenCompiled,
     StWhenWorkunitModified,             // Not sure this is very useful

+ 3 - 31
system/jlib/jstats.cpp

@@ -1189,36 +1189,6 @@ static int compareUnsigned(unsigned const * left, unsigned const * right)
     return (*left < *right) ? -1 : (*left > *right) ? +1 : 0;
 }
 
-StatisticsMapping::StatisticsMapping(const std::initializer_list<StatisticKind> &kinds)
-{
-    for (auto kind : kinds)
-    {
-        assert(kind != StKindNone);
-        assert(!indexToKind.contains(kind));
-        indexToKind.append(kind);
-    }
-    createMappings();
-}
-
-StatisticsMapping::StatisticsMapping(const StatisticsMapping * from, const std::initializer_list<StatisticKind> &kinds)
-{
-    ForEachItemIn(idx, from->indexToKind)
-        indexToKind.append(from->indexToKind.item(idx));
-    for (auto kind : kinds)
-    {
-        assert(kind != StKindNone);
-        assert(!indexToKind.contains(kind));
-        indexToKind.append(kind);
-    }
-    createMappings();
-}
-
-StatisticsMapping::StatisticsMapping()
-{
-    for (int i = StKindAll+1; i < StMax; i++)
-        indexToKind.append(i);
-    createMappings();
-}
 
 void StatisticsMapping::createMappings()
 {
@@ -1236,7 +1206,7 @@ void StatisticsMapping::createMappings()
     }
 }
 
-const StatisticsMapping allStatistics;
+const StatisticsMapping allStatistics(StKindAll);
 const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
 const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDiskRead, StNumDiskReads, StCycleDiskWriteIOCycles, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
 const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
@@ -3690,3 +3660,5 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     return true;
 }
 #endif
+
+

+ 41 - 7
system/jlib/jstats.h

@@ -400,13 +400,32 @@ protected:
 class jlib_decl StatisticsMapping
 {
 public:
-    //Takes a list of StatisticKind
-    StatisticsMapping(const std::initializer_list<StatisticKind> &kinds);
-    //Takes an existing Mapping, and extends it with a list of StatisticKind
-    StatisticsMapping(const StatisticsMapping * from, const std::initializer_list<StatisticKind> &kinds);
-    //Accepts all StatisticKind values
-    StatisticsMapping();
-
+    //Takes a list of StatisticKind and a variable number of existing mappings and combines
+    template <typename... Mappings>
+    StatisticsMapping(const std::initializer_list<StatisticKind> &kinds, const Mappings &... mappings) : StatisticsMapping(&mappings...)
+    {
+        for (auto kind : kinds)
+        {
+            assert((kind != StKindNone) && (kind != StKindAll));
+            assert(!indexToKind.contains(kind));
+            indexToKind.append(kind);
+        }
+        createMappings();
+    }
+    StatisticsMapping(StatisticKind kind)
+    {
+        if (StKindAll == kind)
+        {
+            for (int i = StKindAll+1; i < StMax; i++)
+                indexToKind.append(i);
+        }
+        else
+        {
+            assert(kind != StKindNone);
+            indexToKind.append(kind);
+        }
+        createMappings();
+    }
     inline unsigned getIndex(StatisticKind kind) const
     {
         dbgassertex(kind >= StKindNone && kind < StMax);
@@ -416,11 +435,26 @@ public:
     inline unsigned numStatistics() const { return indexToKind.ordinality(); }
 
 protected:
+    StatisticsMapping() { }
+    template <typename Mapping>
+    StatisticsMapping(const Mapping *mapping)
+    {
+        ForEachItemIn(idx, mapping->indexToKind)
+            indexToKind.append(mapping->indexToKind.item(idx));
+    }
+    template <typename Mapping, typename... Mappings>
+    StatisticsMapping(const Mapping *mapping, const Mappings * ... mappings) : StatisticsMapping(mappings...)
+    {
+        ForEachItemIn(idx, mapping->indexToKind)
+            indexToKind.append(mapping->indexToKind.item(idx));
+    }
     void createMappings();
 
 protected:
     UnsignedArray kindToIndex;
     UnsignedArray indexToKind;
+private:
+    StatisticsMapping& operator=(const StatisticsMapping&) =delete;
 };
 
 extern const jlib_decl StatisticsMapping allStatistics;

+ 13 - 0
system/jlib/jutil.hpp

@@ -26,6 +26,7 @@
 
 #include <algorithm> 
 #include <iterator>
+#include <functional>
 
 #if defined (__APPLE__)
 #include <mach/mach_time.h>
@@ -565,5 +566,17 @@ inline bool stdContains(Container&& container, Value &&v)
     return container.end() != std::find(container.begin(), container.end(), std::forward<Value>(v));
 }
 
+
+class jlib_decl COnScopeExit
+{
+    const std::function<void()> exitFunc;
+public:
+    inline COnScopeExit(const std::function<void()> &_exitFunc) : exitFunc(_exitFunc) { }
+    inline ~COnScopeExit()
+    {
+        exitFunc();
+    }
+};
+
 #endif
 

+ 1 - 27
thorlcr/activities/group/thgroup.cpp

@@ -21,35 +21,9 @@
 
 class CGroupBaseActivityMaster : public CMasterActivity
 {
-    Owned<CThorStats> statNumGroups;
-    Owned<CThorStats> statNumGroupMax;
 public:
-    CGroupBaseActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    CGroupBaseActivityMaster(CMasterGraphElement *info) : CMasterActivity(info, groupActivityStatistics)
     {
-        statNumGroups.setown(new CThorStats(queryJob(), StNumGroups));
-        statNumGroupMax.setown(new CThorStats(queryJob(), StNumGroupMax));
-    }
-    virtual void init()
-    {
-        CMasterActivity::init();
-    }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-
-        rowcount_t numGroups;
-        mb.read(numGroups);
-        statNumGroups->set(node, numGroups);
-
-        rowcount_t numGroupMax;
-        mb.read(numGroupMax);
-        statNumGroupMax->set(node, numGroupMax);
-    }
-    virtual void getActivityStats(IStatisticGatherer & stats)
-    {
-        CMasterActivity::getActivityStats(stats);
-        statNumGroups->getStats(stats, false);
-        statNumGroupMax->getStats(stats, false);
     }
 };
 

+ 4 - 4
thorlcr/activities/group/thgroupslave.cpp

@@ -53,7 +53,7 @@ class GroupSlaveActivity : public CSlaveActivity
     }
 public:
     GroupSlaveActivity(CGraphElementBase *_container)
-        : CSlaveActivity(_container)
+        : CSlaveActivity(_container, groupActivityStatistics)
     {
         helper = static_cast <IHThorGroupArg *> (queryHelper());
         rolloverEnabled = false;
@@ -189,9 +189,9 @@ public:
     virtual bool isGrouped() const override{ return true; }
     virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-        mb.append(numGroups);
-        mb.append(numGroupMax);
+        stats.setStatistic(StNumGroups, numGroups);
+        stats.setStatistic(StNumGroupMax, numGroupMax);
+        PARENT::serializeStats(mb);
     }
 };
 

+ 6 - 29
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -38,8 +38,8 @@ class HashDistributeMasterBase : public CMasterActivity
     mptag_t mptag;
     mptag_t mptag2; // for tag 2
 public:
-    HashDistributeMasterBase(DistributeMode _mode, CMasterGraphElement *info) 
-        : CMasterActivity(info), mode(_mode) 
+    HashDistributeMasterBase(DistributeMode _mode, CMasterGraphElement *info, const StatisticsMapping &actStatsMapping = basicActivityStatistics) 
+        : CMasterActivity(info, actStatsMapping), mode(_mode) 
     {
         mptag = TAG_NULL;
         mptag2 = TAG_NULL;
@@ -75,35 +75,12 @@ public:
     HashDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeMasterBase(mode, info) { }
 };
 
-class HashJoinDistributeActivityMaster : public HashDistributeActivityMaster
+class HashJoinDistributeActivityMaster : public HashDistributeMasterBase
 {
-    Owned<ProgressInfo> lhsProgress, rhsProgress;
-
 public:
-    HashJoinDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeActivityMaster(mode, info)
-    {
-        lhsProgress.setown(new ProgressInfo(queryJob()));
-        rhsProgress.setown(new ProgressInfo(queryJob()));
-    }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        HashDistributeActivityMaster::deserializeStats(node, mb);
-        rowcount_t lhsProgressCount, rhsProgressCount;
-        mb.read(lhsProgressCount);
-        mb.read(rhsProgressCount);
-        lhsProgress->set(node, lhsProgressCount);
-        rhsProgress->set(node, rhsProgressCount);
-    }
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
+    HashJoinDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info)
+        : HashDistributeMasterBase(mode, info, hashJoinActivityStatistics)
     {
-        //This should be an activity stats
-        HashDistributeActivityMaster::getEdgeStats(stats, idx);
-        assertex(0 == idx);
-        lhsProgress->processInfo();
-        rhsProgress->processInfo();
-
-        stats.addStatistic(StNumLeftRows, lhsProgress->queryTotal());
-        stats.addStatistic(StNumRightRows, rhsProgress->queryTotal());
     }
 };
 
@@ -163,7 +140,7 @@ class ReDistributeActivityMaster : public HashDistributeMasterBase
     mptag_t statstag;
 
 public:
-    ReDistributeActivityMaster(CMasterGraphElement *info) : HashDistributeMasterBase(DM_redistribute, info) 
+    ReDistributeActivityMaster(CMasterGraphElement *info) : HashDistributeMasterBase(DM_redistribute, info)
     { 
         statstag = container.queryJob().allocateMPTag();
     }

+ 15 - 13
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3861,7 +3861,7 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
 
 public:
     HashJoinSlaveActivity(CGraphElementBase *_container)
-        : CSlaveActivity(_container)
+        : CSlaveActivity(_container, hashJoinActivityStatistics)
     {
         lhsProgressCount = rhsProgressCount = 0;
         mptag = TAG_NULL;
@@ -4008,20 +4008,22 @@ public:
         info.canStall = true;
         info.unknownRowsOutput = true;
     }
-    void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-        CriticalBlock b(joinHelperCrit);
-        if (!joinhelper) // bit odd, but will leave as was for now.
-        {
-            mb.append(lhsProgressCount);
-            mb.append(rhsProgressCount);
-        }
-        else
         {
-            mb.append(joinhelper->getLhsProgress());
-            mb.append(joinhelper->getRhsProgress());
-        }
+            CriticalBlock b(joinHelperCrit);
+            if (!joinhelper) // bit odd, but will leave as was for now.
+            {
+                stats.setStatistic(StNumLeftRows, lhsProgressCount);
+                stats.setStatistic(StNumRightRows, rhsProgressCount);
+            }
+            else
+            {
+                stats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress());
+                stats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
+            }
+        }    
+        PARENT::serializeStats(mb);
     }
 };
 #ifdef _MSC_VER

+ 1 - 35
thorlcr/activities/indexread/thindexread.cpp

@@ -35,9 +35,6 @@ protected:
     bool nofilter = false;
     bool localKey = false;
     bool partitionKey = false;
-    ProgressInfoArray progressInfoArr;
-    UnsignedArray progressKinds;
-    Owned<ProgressInfo> inputProgress;
     StringBuffer fileName;
 
     rowcount_t aggregateToLimit()
@@ -208,16 +205,11 @@ protected:
     }
 
 public:
-    CIndexReadBase(CMasterGraphElement *info) : CMasterActivity(info)
+    CIndexReadBase(CMasterGraphElement *info) : CMasterActivity(info, indexReadActivityStatistics)
     {
         indexBaseHelper = (IHThorIndexReadBaseArg *)queryHelper();
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJob().allocateMPTag();
-        progressKinds.append(StNumIndexSeeks);
-        progressKinds.append(StNumIndexScans);
-        ForEachItemIn(l, progressKinds)
-            progressInfoArr.append(*new ProgressInfo(queryJob()));
-        inputProgress.setown(new ProgressInfo(queryJob()));
         reInit = 0 != (indexBaseHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     virtual void init() override
@@ -287,32 +279,6 @@ public:
         if (partNumbers.ordinality())
             fileDesc->serializeParts(dst, partNumbers);
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        rowcount_t progress;
-        mb.read(progress);
-        inputProgress->set(node, progress);
-        ForEachItemIn(p, progressKinds)
-        {
-            unsigned __int64 st;
-            mb.read(st);
-            progressInfoArr.item(p).set(node, st);
-        }
-    }
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx) override
-    {
-        //This should be an activity stats
-        CMasterActivity::getEdgeStats(stats, idx);
-
-        stats.addStatistic(StNumIndexRowsRead, inputProgress->queryTotal());
-        ForEachItemIn(p, progressInfoArr)
-        {
-            ProgressInfo &progress = progressInfoArr.item(p);
-            progress.processInfo();
-            stats.addStatistic((StatisticKind)progressKinds.item(p), progress.queryTotal());
-        }
-    }
     virtual void abort() override
     {
         CMasterActivity::abort();

+ 35 - 36
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -53,10 +53,6 @@ protected:
     rowcount_t remoteLimit = RCMAX;
     bool localKey = false;
     size32_t seekGEOffset = 0;
-    __int64 lastSeeks = 0, lastScans = 0;
-    UInt64Array _statsArr;
-    SpinLock statLock;  // MORE: Can this be avoided by passing in the delta?
-    unsigned __int64 *statsArr = nullptr;
     size32_t fixedDiskRecordSize = 0;
     rowcount_t progress = 0;
     bool eoi = false;
@@ -80,6 +76,24 @@ protected:
     rowcount_t rowLimit = RCMAX;
     bool useRemoteStreaming = false;
 
+    template<class StatProvider>
+    class CCaptureIndexStats
+    {
+        CRuntimeStatisticCollection &stats;
+        StatProvider &statProvider;
+        unsigned __int64 startSeeks = 0, startScans = 0;
+    public:
+        inline CCaptureIndexStats(CRuntimeStatisticCollection &_stats, StatProvider &_statProvider) : stats(_stats), statProvider(_statProvider)
+        {
+            startSeeks = statProvider.querySeeks();
+            startScans = statProvider.queryScans();
+        }
+        inline ~CCaptureIndexStats()
+        {
+            stats.mergeStatistic(StNumIndexSeeks, statProvider.querySeeks() - startSeeks);
+            stats.mergeStatistic(StNumIndexScans, statProvider.queryScans() - startScans);
+        }
+    };
 
     class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
     {
@@ -364,7 +378,6 @@ public:
             return;
         resetManager(manager);
         callback.prepareManager(manager);
-        resetLastStats();
         helper->createSegmentMonitors(manager);
         manager->finishSegmentMonitors();
         manager->reset();
@@ -394,10 +407,12 @@ public:
         const void *ret = nullptr;
         while (true)
         {
-            ret = currentInput->nextKey();
-            noteStats(currentInput->querySeeks(), currentInput->queryScans());
-            if (ret)
-                break;
+            {
+                CCaptureIndexStats<IIndexLookup> scoped(stats, *currentInput);
+                ret = currentInput->nextKey();
+                if (ret)
+                    break;
+            }
             configureNextInput();
             if (!currentInput)
                 break;
@@ -482,8 +497,8 @@ public:
             keyedLimitCount = getLocalCount(keyedLimit, true);
     }
 public:
-    CIndexReadSlaveBase(CGraphElementBase *container) 
-        : CSlaveActivity(container), callback(*this)
+    CIndexReadSlaveBase(CGraphElementBase *container)
+        : CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
     {
         helper = (IHThorIndexReadBaseArg *)container->queryHelper();
         limitTransformExtra = nullptr;
@@ -492,9 +507,6 @@ public:
         deserializer.set(queryRowDeserializer());
         serializer.set(queryRowSerializer());
         helper->setCallback(&callback);
-        _statsArr.append(0);
-        _statsArr.append(0);
-        statsArr = _statsArr.getArray();
         reInit = 0 != (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     rowcount_t getLocalCount(const rowcount_t keyedLimit, bool hard)
@@ -513,11 +525,11 @@ public:
                 break;
             if (keyManager)
                 prepareManager(keyManager);
+            CCaptureIndexStats<IIndexLookup> scoped(stats, *indexInput);
             if (hard) // checkCount checks hard key count only.
                 count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
             else
                 count += indexInput->getCount();
-            noteStats(indexInput->querySeeks(), indexInput->queryScans());
             bool limitHit = count > keyedLimit;
             if (keyManager)
                 resetManager(keyManager);
@@ -535,18 +547,6 @@ public:
         sendPartialCount(*this, count);
         return getFinalCount(*this);
     }
-    inline void resetLastStats()
-    {
-        lastSeeks = lastScans = 0;
-    }
-    inline void noteStats(unsigned seeks, unsigned scans)
-    {
-        SpinBlock b(statLock);
-        statsArr[AS_Seeks] += seeks-lastSeeks;
-        statsArr[AS_Scans] += scans-lastScans;
-        lastSeeks = seeks;
-        lastScans = scans;
-    }
 
 // IThorSlaveActivity
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
@@ -570,7 +570,6 @@ public:
         if (parts)
             deserializePartFileDescriptors(data, partDescs);
         localKey = partDescs.ordinality() ? partDescs.item(0).queryOwner().queryProperties().getPropBool("@local", false) : false;
-        lastSeeks = lastScans = 0;
         localMerge = (localKey && partDescs.ordinality()>1) || seekGEOffset;
 
         if (parts)
@@ -684,12 +683,10 @@ public:
             currentManager = nullptr;
         }
     }
-    void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-        mb.append(progress);
-        ForEachItemIn(s, _statsArr)
-            mb.append(_statsArr.item(s));
+        stats.setStatistic(StNumRowsProcessed, progress);
+        PARENT::serializeStats(mb);
     }
 };
 
@@ -758,9 +755,9 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
             helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
             rawSeek = (byte *)temp;
         }
+        CCaptureIndexStats<IKeyManager> scoped(stats, *currentManager);
         if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
             return NULL;
-        noteStats(currentManager->querySeeks(), currentManager->queryScans());
         const byte *row = currentManager->queryKeyBuffer();
 #ifdef _DEBUG
         if (memcmp(row + seekGEOffset, rawSeek, seekSize) < 0)
@@ -1067,13 +1064,14 @@ public:
                         break;
                     if (keyManager)
                         prepareManager(keyManager);
+
+                    CCaptureIndexStats<IIndexLookup> scoped(stats, *indexInput);
                     while (true)
                     {
                         const void *key = indexInput->nextKey();
                         if (!key)
                             break;
                         ++progress;
-                        noteStats(indexInput->querySeeks(), indexInput->queryScans());
                         helper->processRow(key, this);
                         callback.finishedRow();
                     }
@@ -1211,10 +1209,11 @@ public:
                             break;
                         if (keyManager)
                             prepareManager(keyManager);
+
+                        CCaptureIndexStats<IIndexLookup> scoped(stats, *indexInput);
                         while (true)
                         {
                             const void *key = indexInput->nextKey();
-                            noteStats(indexInput->querySeeks(), indexInput->queryScans());
                             if (!key)
                                 break;
                             if (incKeyedExceedsLimit())

+ 2 - 20
thorlcr/activities/indexwrite/thindexwrite.cpp

@@ -37,18 +37,14 @@ class IndexWriteActivityMaster : public CMasterActivity
     StringArray clusters;
     mptag_t mpTag2;
     bool refactor;
-    Owned<ProgressInfo> replicateProgress;
-    bool publishReplicatedDone;
     CDfsLogicalFileName dlfn;
     IHThorIndexWriteArg *helper;
     StringAttr fileName;
 
 public:
-    IndexWriteActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    IndexWriteActivityMaster(CMasterGraphElement *info) : CMasterActivity(info, indexWriteActivityStatistics)
     {
         helper = (IHThorIndexWriteArg *)queryHelper();
-        replicateProgress.setown(new ProgressInfo(queryJob()));
-        publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
         recordsProcessed = 0;
         refactor = singlePartKey = isLocal = false;
         mpTag2 = TAG_NULL;
@@ -345,25 +341,11 @@ public:
         }
         duplicateKeyCount = 0;
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        unsigned repPerc;
-        mb.read(repPerc);
-        replicateProgress->set(node, repPerc);
-    }
-    virtual void getActivityStats(IStatisticGatherer & stats)
+    virtual void getActivityStats(IStatisticGatherer & stats) override
     {
         CMasterActivity::getActivityStats(stats);
         stats.addStatistic(StNumDuplicateKeys, cummulativeDuplicateKeyCount);
-        if (publishReplicatedDone)
-        {
-            replicateProgress->processInfo();
-            //GC->JCS An average of percentages doesn't give you a very accurate answer..
-            stats.addStatistic(StPerReplicated, replicateProgress->queryAverage() * 10000);
-        }
     }
-
 };
 
 CActivityBase *createIndexWriteActivityMaster(CMasterGraphElement *container)

+ 5 - 5
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -75,7 +75,7 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
 public:
     IMPLEMENT_IINTERFACE_USING(PARENT);
 
-    IndexWriteSlaveActivity(CGraphElementBase *_container) : ProcessSlaveActivity(_container)
+    IndexWriteSlaveActivity(CGraphElementBase *_container) : ProcessSlaveActivity(_container, indexWriteActivityStatistics)
     {
         helper = static_cast <IHThorIndexWriteArg *> (queryHelper());
         init();
@@ -639,7 +639,7 @@ public:
             fireException(e);
         }
     }
-    virtual void onInputFinished(rowcount_t finalcount)
+    virtual void onInputFinished(rowcount_t finalcount) override
     {
         if (!sizeSignalled)
         {
@@ -647,14 +647,14 @@ public:
             ActPrintLog("finished input %" RCPF "d", finalcount);
         }
     }
-    virtual void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
+        stats.setStatistic(StPerReplicated, replicateDone);
         PARENT::serializeStats(mb);
-        mb.append(replicateDone);
     }
 
 // ICopyFileProgress
-    CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
+    virtual CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize) override
     {
         replicateDone = sizeDone ? ((unsigned)(sizeDone*100/totalSize)) : 0;
         return abortSoon?CFPstop:CFPcontinue;

+ 1 - 37
thorlcr/activities/join/thjoin.cpp

@@ -39,8 +39,6 @@ class JoinActivityMaster : public CMasterActivity
     unsigned selfJoinWarnLevel, lastMsgTime;
     mptag_t mpTagRPC, barrierMpTag;
     Owned<IBarrier> barrier;
-    Owned<ProgressInfo> lhsProgress, rhsProgress;
-    CThorStatsCollection extraStats;
 
     bool nosortPrimary()
     {
@@ -78,11 +76,9 @@ class JoinActivityMaster : public CMasterActivity
         }
     } *climitedcmp;
 public:
-    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info), extraStats(spillStatistics)
+    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info, joinActivityStatistics)
     {
         ActPrintLog("JoinActivityMaster");
-        lhsProgress.setown(new ProgressInfo(queryJob()));
-        rhsProgress.setown(new ProgressInfo(queryJob()));
         helper = (IHThorJoinArg *)queryHelper();
         islocal = local;
         imaster = NULL;
@@ -99,11 +95,6 @@ public:
         container.queryJob().freeMPTag(barrierMpTag);
         delete climitedcmp;
     }
-    virtual void getActivityStats(IStatisticGatherer & stats)
-    {
-        CMasterActivity::getActivityStats(stats);
-        extraStats.getStats(stats);
-    }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
         if (!islocal)
@@ -335,34 +326,7 @@ public:
         }
         ActPrintLog("process exit");
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        rowcount_t lhsProgressCount, rhsProgressCount;
-        mb.read(lhsProgressCount);
-        lhsProgress->set(node, lhsProgressCount);
-        if (TAKselfjoin != container.getKind() && TAKselfjoinlight != container.getKind())
-        {
-            mb.read(rhsProgressCount);
-            rhsProgress->set(node, rhsProgressCount);
-        }
 
-        extraStats.deserialize(node, mb);
-    }
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
-    {
-        //This should be an activity stats
-        CMasterActivity::getEdgeStats(stats, idx);
-        assertex(0 == idx);
-
-        lhsProgress->processInfo();
-        stats.addStatistic(StNumLeftRows, lhsProgress->queryTotal());
-        if (TAKselfjoin != container.getKind() && TAKselfjoinlight != container.getKind())
-        {
-            rhsProgress->processInfo();
-            stats.addStatistic(StNumRightRows, rhsProgress->queryTotal());
-        }
-    }
 #define MSGTIME (5*60*1000)
     virtual bool fireException(IException *_e)
     {

+ 25 - 13
thorlcr/activities/join/thjoinslave.cpp

@@ -64,7 +64,6 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
     Owned<IJoinHelper> joinhelper;
     rowcount_t lhsProgressCount = 0, rhsProgressCount = 0;
     CriticalSection joinHelperCrit;
-    CRuntimeStatisticCollection spillStats;
     IHThorJoinBaseArg *helper;
     IHThorJoinArg *helperjn;
     IHThorDenormalizeArg *helperdn;
@@ -136,7 +135,7 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
 
 public:
     JoinSlaveActivity(CGraphElementBase *_container, bool local)
-        : CSlaveActivity(_container), spillStats(spillStatistics)
+        : CSlaveActivity(_container, joinActivityStatistics)
     {
         islocal = local;
         switch (container.getKind())
@@ -458,7 +457,10 @@ public:
             leftStream.setown(iLoaderL->load(leftInputStream, abortSoon));
             isemptylhs = 0 == iLoaderL->numRows();
             stopLeftInput();
+
+            CRuntimeStatisticCollection spillStats(spillStatistics);
             mergeStats(spillStats, iLoaderL);
+            stats.merge(spillStats);
         }
         IEngineRowStream *rightInputStream = queryInputStream(1);
         if (isemptylhs&&((helper->getJoinFlags()&JFrightouter)==0))
@@ -478,7 +480,10 @@ public:
         {
             rightStream.setown(iLoaderR->load(rightInputStream, abortSoon));
             stopRightInput();
+
+            CRuntimeStatisticCollection spillStats(spillStatistics);
             mergeStats(spillStats, iLoaderR);
+            stats.merge(spillStats);
         }
     }
     bool doglobaljoin()
@@ -585,7 +590,9 @@ public:
             sorter->Gather(secondaryRowIf, secondaryInputStream, secondaryCompare, primarySecondaryCompare, primarySecondaryUpperCompare, primaryKeySerializer, primaryCompare, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, primaryRowIf); // primaryKeySerializer *is* correct
         else
             sorter->Gather(secondaryRowIf, secondaryInputStream, secondaryCompare, nullptr, nullptr, nullptr, nullptr, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, nullptr);
+        CRuntimeStatisticCollection spillStats(spillStatistics);
         mergeStats(spillStats, sorter);
+        stats.merge(spillStats);
         //MORE: Stats from spilling the primaryStream??
         partitionRow.clear();
         stopOtherInput();
@@ -613,19 +620,24 @@ public:
     }
     virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-        CriticalBlock b(joinHelperCrit);
-        if (!joinhelper)
-        {
-            mb.append(lhsProgressCount);
-            mb.append(rhsProgressCount);
-        }
-        else
         {
-            mb.append(joinhelper->getLhsProgress());
-            mb.append(joinhelper->getRhsProgress());
+            bool isSelfJoin = (TAKselfjoin == container.getKind() || TAKselfjoinlight == container.getKind()); // self-join kinds publish left-row progress only
+
+            CriticalBlock b(joinHelperCrit);
+            if (!joinhelper) // bit odd, but will leave as was for now.
+            {
+                stats.setStatistic(StNumLeftRows, lhsProgressCount);
+                if (!isSelfJoin)
+                    stats.setStatistic(StNumRightRows, rhsProgressCount);
+            }
+            else
+            {
+                stats.setStatistic(StNumLeftRows, joinhelper->getLhsProgress());
+                if (!isSelfJoin)
+                    stats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
+            }
         }
-        spillStats.serialize(mb);
+        PARENT::serializeStats(mb);
     }
 };
 

+ 1 - 41
thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp

@@ -36,29 +36,11 @@ class CKeyedJoinMaster : public CMasterActivity
     bool localKey, remoteDataFiles;
     unsigned numTags;
     mptag_t tags[4];
-    ProgressInfoArray progressInfoArr;
-    UnsignedArray progressKinds;
-
 
 public:
-    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info, keyedJoinActivityStatistics)
     {
         helper = (IHThorKeyedJoinArg *) queryHelper();
-        //GH->JCS a bit wasteful creating this array each time.
-        progressKinds.append(StNumIndexSeeks);
-        progressKinds.append(StNumIndexScans);
-        progressKinds.append(StNumIndexAccepted);
-        progressKinds.append(StNumPostFiltered);
-        progressKinds.append(StNumPreFiltered);
-
-        if (helper->diskAccessRequired())
-        {
-            progressKinds.append(StNumDiskSeeks);
-            progressKinds.append(StNumDiskAccepted);
-            progressKinds.append(StNumDiskRejected);
-        }
-        ForEachItemIn(l, progressKinds)
-            progressInfoArr.append(*new ProgressInfo(queryJob()));
         localKey = false;
         numTags = 0;
         tags[0] = tags[1] = tags[2] = tags[3] = TAG_NULL;
@@ -317,28 +299,6 @@ public:
             }
         }
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        ForEachItemIn(p, progressKinds)
-        {
-            unsigned __int64 st;
-            mb.read(st);
-            progressInfoArr.item(p).set(node, st);
-        }
-    }
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
-    {
-        //This should be an activity stats
-        CMasterActivity::getEdgeStats(stats, idx);
-        assertex(0 == idx);
-        ForEachItemIn(p, progressInfoArr)
-        {
-            ProgressInfo &progress = progressInfoArr.item(p);
-            progress.processInfo();
-            stats.addStatistic((StatisticKind)progressKinds.item(p), progress.queryTotal());
-        }
-    }
 };
 
 

+ 1 - 30
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -28,7 +28,6 @@
 #include "thkeyedjoin.ipp"
 #include "jhtree.hpp"
 
-static const std::array<StatisticKind, 8> progressKinds{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected };
 
 class CKeyedJoinMaster : public CMasterActivity
 {
@@ -37,8 +36,6 @@ class CKeyedJoinMaster : public CMasterActivity
     MemoryBuffer initMb;
     unsigned numTags = 0;
     std::vector<mptag_t> tags;
-    ProgressInfoArray progressInfoArr;
-
     bool local = false;
     bool remoteKeyedLookup = false;
     bool remoteKeyedFetch = false;
@@ -250,14 +247,10 @@ class CKeyedJoinMaster : public CMasterActivity
 
 
 public:
-    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info, keyedJoinActivityStatistics)
     {
         helper = (IHThorKeyedJoinArg *) queryHelper();
-        unsigned numStats = helper->diskAccessRequired() ? 8 : 5; // see progressKinds array
-        for (unsigned s=0; s<numStats; s++)
-            progressInfoArr.append(*new ProgressInfo(queryJob()));
         reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
-
         // NB: force options are there to force all parts to be remote, even if local to slave (handled on slave)
         remoteKeyedLookup = getOptBool(THOROPT_REMOTE_KEYED_LOOKUP, true);
         if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_LOOKUP))
@@ -516,28 +509,6 @@ public:
             }
         }
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        ForEachItemIn(p, progressInfoArr)
-        {
-            unsigned __int64 st;
-            mb.read(st);
-            progressInfoArr.item(p).set(node, st);
-        }
-    }
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
-    {
-        //This should be an activity stats
-        CMasterActivity::getEdgeStats(stats, idx);
-        assertex(0 == idx);
-        ForEachItemIn(p, progressInfoArr)
-        {
-            ProgressInfo &progress = progressInfoArr.item(p);
-            progress.processInfo();
-            stats.addStatistic(progressKinds[p], progress.queryTotal());
-        }
-    }
 };
 
 

+ 11 - 4
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -1645,7 +1645,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
-    CKeyedJoinSlave(CGraphElementBase *_container) : CSlaveActivity(_container)
+    CKeyedJoinSlave(CGraphElementBase *_container) : CSlaveActivity(_container, keyedJoinActivityStatistics)
     {
 #ifdef TRACE_JOINGROUPS
         groupsPendsNoted = fetchReadBack = groupPendsEnded = doneGroupsDeQueued = wroteToFetchPipe = groupsComplete = 0;
@@ -2418,11 +2418,18 @@ public:
         info.unknownRowsOutput = true;
     }
 
-    void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
+        constexpr StatisticKind mapping[] = { StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected };
         ForEachItemIn(s, _statsArr)
-            mb.append(_statsArr.item(s));
+        {
+            unsigned __int64 v = _statsArr.item(s);
+            if (0 == v)
+                continue;
+            stats.setStatistic(mapping[s], v);
+        }
+
+        CSlaveActivity::serializeStats(mb);
     }
 
 friend class CKeyedFetchHandler;

+ 29 - 21
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -770,8 +770,6 @@ public:
     }
 };
 
-enum AdditionStats { AS_Seeks, AS_Scans, AS_Accepted, AS_PostFiltered, AS_PreFiltered,  AS_DiskSeeks, AS_DiskAccepted, AS_DiskRejected };
-
 struct PartIO
 {
     PartIO() {}
@@ -1719,8 +1717,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             unsigned partNo = partCopy & partMask;
             unsigned copy = partCopy >> partBits;
 
-            ScopedAtomic<unsigned __int64> diskRejected(activity.statsArr[AS_DiskRejected]);
-            ScopedAtomic<unsigned __int64> diskSeeks(activity.statsArr[AS_DiskSeeks]);
+            unsigned __int64 diskAccepted = 0;
+            unsigned __int64 diskRejected = 0;
+            unsigned __int64 diskSeeks = 0;
+            auto onScopeExitFunc = [&]()
+            {
+                activity.stats.mergeStatistic(StNumDiskAccepted, diskAccepted);
+                activity.stats.mergeStatistic(StNumDiskRejected, diskRejected);
+                activity.stats.mergeStatistic(StNumDiskSeeks, diskSeeks);
+            };
+            COnScopeExit scoped(onScopeExitFunc);
             for (unsigned r=0; r<processing.ordinality() && !stopped; r++)
             {
                 OwnedConstThorRow row = processing.getClear(r);
@@ -1757,7 +1763,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                     // If !preserverOrder, right rows added to single array in jg, so pass 0
                     joinGroup->addRightMatchCompletePending(activity.preserveOrder ? indexPartNo : 0, sequence, joinFieldsSz, fetchRow);
 
-                    if (++activity.statsArr[AS_DiskAccepted] > activity.rowLimit)
+                    if (++diskAccepted > activity.rowLimit)
                         helper->onLimitExceeded();
                 }
                 else
@@ -1872,7 +1878,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             CMessageBuffer msg;
             prepAndSend(msg, processing, selected, partNo, copy);
 
-            ScopedAtomic<unsigned __int64> diskSeeks(activity.statsArr[AS_DiskSeeks]);
+            unsigned __int64 diskSeeks = 0;
+            auto onScopeExitFunc = [&]()
+            {
+                activity.stats.mergeStatistic(StNumDiskSeeks, diskSeeks);
+            };
+            COnScopeExit scoped(onScopeExitFunc);
+
             unsigned numRows = processing.ordinality();
             // read back results and feed in to appropriate join groups.
 
@@ -1947,8 +1959,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                     unsigned rejected = 0;
                     mb.read(accepted);
                     mb.read(rejected);
-                    activity.statsArr[AS_DiskAccepted] += accepted;
-                    activity.statsArr[AS_DiskRejected] += rejected;
+                    activity.stats.mergeStatistic(StNumDiskAccepted, accepted);
+                    activity.stats.mergeStatistic(StNumDiskRejected, rejected);
                 }
                 if (received == numRows)
                     break;
@@ -2079,8 +2091,6 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     bool onFailTransform = false;
     bool keyHasTlk = false;
     std::vector<mptag_t> tags;
-    std::vector<RelaxedAtomic<unsigned __int64>> statsArr; // (seeks, scans, accepted, prefiltered, postfiltered, diskSeeks, diskAccepted, diskRejected)
-    unsigned numStats = 0;
 
     enum HandlerType { ht_remotekeylookup, ht_localkeylookup, ht_localfetch, ht_remotefetch };
     CHandlerContainer keyLookupHandlers;
@@ -2503,6 +2513,13 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     {
         endOfInput = false;
         CJoinGroup *groupStart = nullptr;
+        unsigned __int64 preFiltered = 0;
+
+        auto onScopeExitFunc = [&]()
+        {
+            stats.mergeStatistic(StNumPreFiltered, preFiltered);
+        };
+        COnScopeExit scoped(onScopeExitFunc);
         do
         {
             if (queryAbortSoon())
@@ -2519,7 +2536,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
             if (helper->leftCanMatch(lhsRow))
                 jg.setown(queueLookup(lhsRow)); // NB: will block if excessive amount queued
             else
-                statsArr[AS_PreFiltered]++;
+                preFiltered++;
             if (!jg && ((joinFlags & JFleftonly) || (joinFlags & JFleftouter)))
             {
                 size32_t maxSz;
@@ -2795,7 +2812,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 public:
     IMPLEMENT_IINTERFACE_USING(PARENT);
 
-    CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container), readAheadThread(*this), statsArr(8)
+    CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this)
     {
         helper = static_cast <IHThorKeyedJoinArg *> (queryHelper());
         reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
@@ -2866,7 +2883,6 @@ public:
             initialized = true;
             joinFlags = helper->getJoinFlags();
             needsDiskRead = helper->diskAccessRequired();
-            numStats = needsDiskRead ? 8 : 5;
             fixedRecordSize = helper->queryIndexRecordSize()->getFixedSize(); // 0 if variable and unused
             onFailTransform = (0 != (joinFlags & JFonfail)) && (0 == (joinFlags & JFmatchAbortLimitSkips));
 
@@ -2896,8 +2912,6 @@ public:
             tlkKeyManagers.kill();
             partitionKey = false;
         }
-        for (auto &a : statsArr)
-            a = 0;
         // decode data from master. NB: can be resent and differ if in global loop
         data.read(indexName);
         data.read(totalIndexParts);
@@ -3248,12 +3262,6 @@ public:
         info.canStall = true;
         info.unknownRowsOutput = true;
     }
-    virtual void serializeStats(MemoryBuffer &mb) override
-    {
-        PARENT::serializeStats(mb);
-        for (unsigned s=0; s<numStats; s++)
-            mb.append(statsArr[s]);
-    }
     // IJoinProcessor
     virtual void onComplete(CJoinGroup *joinGroup) override
     {

+ 1 - 31
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -26,7 +26,6 @@ class CLookupJoinActivityMaster : public CMasterActivity
 {
     mptag_t broadcast2MpTag, broadcast3MpTag, lhsDistributeTag, rhsDistributeTag;
     unsigned failoversToLocal = 0;
-    Owned<CThorStats> localFailoverToStd;
     bool isGlobal = false;
 
     bool isAll() const
@@ -43,7 +42,7 @@ class CLookupJoinActivityMaster : public CMasterActivity
         return false;
     }
 public:
-    CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
+    CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info, lookupJoinActivityStatistics)
     {
         isGlobal = !container.queryLocal() && (queryJob().querySlaves()>1);
         if (isGlobal)
@@ -57,7 +56,6 @@ public:
                 rhsDistributeTag = container.queryJob().allocateMPTag();
             }
         }
-        localFailoverToStd.setown(new CThorStats(queryJob(), StNumSmartJoinSlavesDegradedToStd));
     }
     ~CLookupJoinActivityMaster()
     {
@@ -83,34 +81,6 @@ public:
             serializeMPtag(dst, rhsDistributeTag);
         }
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
-    {
-        CMasterActivity::deserializeStats(node, mb);
-
-        if (isSmartJoin(*this))
-        {
-            if (isGlobal)
-            {
-                unsigned _failoversToLocal;
-                mb.read(_failoversToLocal);
-                dbgassertex(0 == failoversToLocal || (_failoversToLocal == failoversToLocal)); // i.e. sanity check, all slaves must have agreed.
-                failoversToLocal = _failoversToLocal;
-            }
-            unsigned failoversToStd;
-            mb.read(failoversToStd);
-            localFailoverToStd->set(node, failoversToStd);
-        }
-    }
-    virtual void getActivityStats(IStatisticGatherer & stats) override
-    {
-        CMasterActivity::getActivityStats(stats);
-        if (isSmartJoin(*this))
-        {
-            if (isGlobal)
-                stats.addStatistic(StNumSmartJoinDegradedToLocal, failoversToLocal);
-            localFailoverToStd->getTotalStat(stats);
-        }
-    }
 };
 
 CActivityBase *createLookupJoinActivityMaster(CMasterGraphElement *container)

+ 7 - 5
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1312,7 +1312,8 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
-    CInMemJoinBase(CGraphElementBase *_container) : CSlaveActivity(_container), HELPERBASE((HELPER *)queryHelper()), rhs(*this)
+    CInMemJoinBase(CGraphElementBase *_container, const StatisticsMapping &statsMapping = basicActivityStatistics)
+        : CSlaveActivity(_container, statsMapping), HELPERBASE((HELPER *)queryHelper()), rhs(*this)
     {
         gotRHS = false;
         rhsNext = NULL;
@@ -1719,6 +1720,7 @@ protected:
     using PARENT::queryInput;
     using PARENT::rhsRowLock;
     using PARENT::hasStarted;
+    using PARENT::stats;
 
     IHash *leftHash, *rightHash;
     ICompare *compareRight, *compareLeftRight;
@@ -2614,7 +2616,7 @@ public:
         }
         return dedup;
     }
-    CLookupJoinActivityBase(CGraphElementBase *_container) : PARENT(_container)
+    CLookupJoinActivityBase(CGraphElementBase *_container) : PARENT(_container, lookupJoinActivityStatistics)
     {
         rhsCollated = rhsCompacted = false;
         broadcast2MpTag = broadcast3MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
@@ -2925,13 +2927,13 @@ public:
     }
     virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
         if (isSmart())
         {
             if (isGlobal())
-                mb.append(aggregateFailoversToLocal); // NB: is going to be same for all slaves.
-            mb.append(aggregateFailoversToStandard);
+                stats.setStatistic(StNumSmartJoinDegradedToLocal, aggregateFailoversToLocal); // NB: is going to be same for all slaves.
+            stats.setStatistic(StNumSmartJoinSlavesDegradedToStd, aggregateFailoversToStandard);
         }
+        PARENT::serializeStats(mb);
     }
 };
 

+ 1 - 15
thorlcr/activities/loop/thloop.cpp

@@ -34,7 +34,6 @@ protected:
     CGraphBase *loopGraph = nullptr;
     unsigned emptyIterations = 0;
     unsigned maxEmptyLoopIterations = 1000;
-    Owned<CThorStats> loopCounterProgress;
     bool syncIterations = false;
     bool loopIsInGlobalGraph = false;
     mptag_t syncMpTag = TAG_NULL;
@@ -89,9 +88,8 @@ protected:
         return final;
     }
 public:
-    CLoopActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info)
+    CLoopActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info, loopActivityStatistics)
     {
-        loopCounterProgress.setown(new CThorStats(queryJob(), StNumIterations));
         maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
         loopIsInGlobalGraph = container.queryOwner().isGlobal();
         loopGraph = nullptr;
@@ -144,18 +142,6 @@ public:
         if (loopIsInGlobalGraph)
             cancelReceiveMsg(RANK_ALL, syncMpTag);
     }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        unsigned loopCounter;
-        mb.read(loopCounter);
-        loopCounterProgress->set(node, loopCounter);
-    }
-    virtual void getActivityStats(IStatisticGatherer & stats) override
-    {
-        CMasterActivity::getActivityStats(stats);
-        loopCounterProgress->getStats(stats, false);
-    }
 };
 
 

+ 5 - 5
thorlcr/activities/loop/thloopslave.cpp

@@ -78,7 +78,7 @@ protected:
         sendLoopingCount(0, 0);
     }
 public:
-    CLoopSlaveActivityBase(CGraphElementBase *_container) : CSlaveActivity(_container)
+    CLoopSlaveActivityBase(CGraphElementBase *_container) : CSlaveActivity(_container, loopActivityStatistics)
     {
         maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
         loopIsInGlobalGraph = container.queryOwner().isGlobal();
@@ -115,15 +115,15 @@ public:
     {
         initMetaInfo(info);
     }
-    void processDone(MemoryBuffer &mb)
+    virtual void processDone(MemoryBuffer &mb) override
     {
         CSlaveActivity::processDone(mb);
         ((CSlaveGraph *)queryContainer().queryLoopGraph()->queryGraph())->serializeDone(mb);
     }
-    virtual void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-        mb.append(loopCounter);
+        stats.setStatistic(StNumIterations, loopCounter);
+        PARENT::serializeStats(mb);
     }
 };
 

+ 15 - 11
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -40,11 +40,10 @@ class CLocalSortSlaveActivity : public CSlaveActivity
     Owned<IRowStream> out;
     bool unstable, eoi;
     CriticalSection statsCs;
-    CRuntimeStatisticCollection spillStats;
 
 public:
     CLocalSortSlaveActivity(CGraphElementBase *_container)
-        : CSlaveActivity(_container), spillStats(spillStatistics)
+        : CSlaveActivity(_container, sortActivityStatistics)
     {
         helper = (IHThorSortArg *)queryHelper();
         iCompare = helper->queryCompare();
@@ -66,14 +65,15 @@ public:
         if (0 == iLoader->numRows())
             eoi = true;
     }
-    void serializeStats(MemoryBuffer &mb)
+    virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-
-        CriticalBlock block(statsCs);
-        CRuntimeStatisticCollection mergedStats(spillStats);
-        mergeStats(mergedStats, iLoader);
-        mergedStats.serialize(mb);
+        CRuntimeStatisticCollection spillStats(spillStatistics);
+        {
+            CriticalBlock block(statsCs);
+            mergeStats(spillStats, iLoader);
+        }
+        stats.merge(spillStats);
+        PARENT::serializeStats(mb);
     }
 
     virtual void stop()
@@ -81,8 +81,12 @@ public:
         out.clear();
         if (hasStarted())
         {
-            CriticalBlock block(statsCs);
-            mergeStats(spillStats, iLoader);
+            CRuntimeStatisticCollection spillStats(spillStatistics);
+            {
+                CriticalBlock block(statsCs);
+                mergeStats(spillStats, iLoader);
+            }
+            stats.merge(spillStats);
             iLoader.clear();
         }
         PARENT::stop();

+ 1 - 13
thorlcr/activities/msort/thmsort.cpp

@@ -34,20 +34,8 @@
 
 class CSortBaseActivityMaster : public CMasterActivity
 {
-    CThorStatsCollection extraStats;
 public:
-    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info), extraStats(spillStatistics) { }
-
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
-    {
-        CMasterActivity::deserializeStats(node, mb);
-        extraStats.deserialize(node, mb);
-    }
-    virtual void getActivityStats(IStatisticGatherer & stats)
-    {
-        CMasterActivity::getActivityStats(stats);
-        extraStats.getStats(stats);
-    }
+    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info, sortActivityStatistics) { }
 };
 
 class CGroupSortActivityMaster : public CSortBaseActivityMaster

+ 12 - 8
thorlcr/activities/msort/thmsortslave.cpp

@@ -49,7 +49,6 @@ class MSortSlaveActivity : public CSlaveActivity
     Owned<IBarrier> barrier;
     SocketEndpoint server;
     CriticalSection statsCs;
-    CRuntimeStatisticCollection spillStats;
 
     bool isUnstable()
     {
@@ -57,7 +56,7 @@ class MSortSlaveActivity : public CSlaveActivity
     }
 
 public:
-    MSortSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), spillStats(spillStatistics)
+    MSortSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container, sortActivityStatistics)
     {
         portbase = 0;
         totalrows = RCUNSET;
@@ -176,11 +175,13 @@ public:
     {
         ActPrintLog("MSortSlaveActivity::kill");
 
+        CRuntimeStatisticCollection spillStats(spillStatistics);
         {
             CriticalBlock block(statsCs);
             mergeStats(spillStats, sorter);
             sorter.clear();
         }
+        stats.merge(spillStats);
         if (portbase)
         {
             queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
@@ -190,12 +191,15 @@ public:
     }
     virtual void serializeStats(MemoryBuffer &mb) override
     {
-        CSlaveActivity::serializeStats(mb);
-
-        CriticalBlock block(statsCs);
-        CRuntimeStatisticCollection mergedStats(spillStats);
-        mergeStats(mergedStats, sorter);
-        mergedStats.serialize(mb);
+        {
+            CRuntimeStatisticCollection spillStats(spillStatistics);
+            {
+                CriticalBlock block(statsCs);
+                mergeStats(spillStats, sorter);
+            }
+            stats.merge(spillStats);
+        }
+        PARENT::serializeStats(mb);    
     }
     CATCH_NEXTROW()
     {

+ 11 - 9
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -49,7 +49,6 @@ private:
     CriticalSection joinHelperCrit;
     Owned<IBarrier> barrier;
     SocketEndpoint server;
-    CRuntimeStatisticCollection spillStats;
 
     bool isUnstable()
     {
@@ -64,7 +63,9 @@ private:
 #endif
         Owned<IThorRowLoader> iLoader = createThorRowLoader(*this, ::queryRowInterfaces(input), compare, isUnstable() ? stableSort_none : stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> rs = iLoader->load(inputStream, abortSoon);
+        CRuntimeStatisticCollection spillStats(spillStatistics);
         mergeStats(spillStats, iLoader);  // Not sure of the best policy if rs spills later on.
+        stats.merge(spillStats);
         PARENT::stopInput(0);
         return rs.getClear();
     }
@@ -92,7 +93,7 @@ private:
 
 public:
     SelfJoinSlaveActivity(CGraphElementBase *_container, bool _isLocal, bool _isLightweight)
-        : CSlaveActivity(_container), spillStats(spillStatistics)
+        : CSlaveActivity(_container, joinActivityStatistics)
     {
         helper = static_cast <IHThorJoinArg *> (queryHelper());
         isLocal = _isLocal||_isLightweight;
@@ -234,14 +235,15 @@ public:
     }
     virtual void serializeStats(MemoryBuffer &mb) override
     {
+        {
+            CriticalBlock b(joinHelperCrit);
+            rowcount_t p = joinhelper?joinhelper->getLhsProgress():0;
+            stats.setStatistic(StNumLeftRows, p);
+            CRuntimeStatisticCollection spillStats(spillStatistics);
+            mergeStats(spillStats, sorter);    // No danger of a race with reset() because that never replaces a valid sorter
+            stats.merge(spillStats);
+        }
         CSlaveActivity::serializeStats(mb);
-        CriticalBlock b(joinHelperCrit);
-        rowcount_t p = joinhelper?joinhelper->getLhsProgress():0;
-        mb.append(p);
-
-        CRuntimeStatisticCollection mergedStats(spillStats);
-        mergeStats(mergedStats, sorter);    // No danger of a race with reset() because that never replaces a valid sorter
-        mergedStats.serialize(mb);
     }
 };
 

+ 5 - 51
thorlcr/activities/thdiskbase.cpp

@@ -29,10 +29,11 @@
 #include "eclhelper.hpp" // tmp for IHThorArg interface
 #include "thdiskbase.ipp"
 
-CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskReadRemoteStatistics)
+
+CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) :
+    CMasterActivity(info, diskReadActivityStatistics)
 {
     hash = NULL;
-    inputProgress.setown(new ProgressInfo(queryJob()));
 }
 
 void CDiskReadMasterBase::init()
@@ -115,30 +116,6 @@ void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
         CSlavePartMapping::serializeNullMap(dst);
 }
 
-void CDiskReadMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
-{
-    CMasterActivity::deserializeStats(node, mb);
-    rowcount_t progress;
-    mb.read(progress);
-    inputProgress->set(node, progress);
-
-    diskStats.deserialize(node, mb);
-}
-
-void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats)
-{
-    CMasterActivity::getActivityStats(stats);
-    diskStats.getStats(stats);
-}
-
-void CDiskReadMasterBase::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
-{
-    //This should be an activity stats
-    CMasterActivity::getEdgeStats(stats, idx);
-    inputProgress->processInfo();
-    stats.addStatistic(StNumDiskRowsRead, inputProgress->queryTotal());
-}
-
 /////////////////
 
 void CWriteMasterBase::init()
@@ -324,36 +301,13 @@ void CWriteMasterBase::publish()
     }
 }
 
-CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskWriteRemoteStatistics)
+CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) // master-side base for disk write activities
+     : CMasterActivity(info, diskWriteActivityStatistics) // base class now owns the stats; mapping includes StPerReplicated and the write I/O kinds
 {
-    publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
-    replicateProgress.setown(new ProgressInfo(queryJob()));
-
     diskHelperBase = (IHThorDiskWriteArg *)queryHelper();
     targetOffset = 0;
 }
 
-void CWriteMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
-{
-    CMasterActivity::deserializeStats(node, mb);
-    unsigned repPerc;
-    mb.read(repPerc);
-    replicateProgress->set(node, repPerc);
-
-    diskStats.deserialize(node, mb);
-}
-
-void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
-{
-    CMasterActivity::getActivityStats(stats);
-    if (publishReplicatedDone)
-    {
-        replicateProgress->processInfo();
-        stats.addStatistic(StPerReplicated, replicateProgress->queryAverage() * 10000);
-    }
-    diskStats.getStats(stats);
-}
-
 void CWriteMasterBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
 {
     CMasterActivity::preStart(parentExtractSz, parentExtract);

+ 0 - 10
thorlcr/activities/thdiskbase.ipp

@@ -31,8 +31,6 @@ protected:
     Owned<IFileDescriptor> fileDesc;
     Owned<CSlavePartMapping> mapping;
     IHash *hash;
-    Owned<ProgressInfo> inputProgress;
-    CThorStatsCollection diskStats;
     StringAttr fileName;
 
 public:
@@ -40,16 +38,10 @@ public:
     virtual void init();
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
     virtual void validateFile(IDistributedFile *file) { }
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb);
-    virtual void getActivityStats(IStatisticGatherer & stats);
-    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx);
 };
 
 class CWriteMasterBase : public CMasterActivity
 {
-    bool publishReplicatedDone;
-    Owned<ProgressInfo> replicateProgress;
-    CThorStatsCollection diskStats;
     __int64 recordsProcessed;
     bool published;
     StringAttr fileName;
@@ -63,8 +55,6 @@ protected:
     void publish();
 public:
     CWriteMasterBase(CMasterGraphElement *info);
-    virtual void deserializeStats(unsigned node, MemoryBuffer &mb);
-    virtual void getActivityStats(IStatisticGatherer & stats);
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
     virtual void init();
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);

+ 18 - 15
thorlcr/activities/thdiskbaseslave.cpp

@@ -202,7 +202,7 @@ const char * CDiskPartHandlerBase::queryLogicalFilename(const void * row)
 
 //////////////////////////////////////////////
 
-CDiskReadSlaveActivityBase::CDiskReadSlaveActivityBase(CGraphElementBase *_container, IHThorArg *_helper) : CSlaveActivity(_container)
+CDiskReadSlaveActivityBase::CDiskReadSlaveActivityBase(CGraphElementBase *_container, IHThorArg *_helper) : CSlaveActivity(_container, diskReadActivityStatistics)
 {
     if (_helper)
         baseHelper.set(_helper);
@@ -292,13 +292,14 @@ IThorRowInterfaces * CDiskReadSlaveActivityBase::queryProjectedDiskRowInterfaces
 
 void CDiskReadSlaveActivityBase::serializeStats(MemoryBuffer &mb)
 {
-    CSlaveActivity::serializeStats(mb);
-    mb.append(diskProgress);
-
     CRuntimeStatisticCollection activeStats(diskReadRemoteStatistics);
     if (partHandler)
+    {
         partHandler->gatherStats(activeStats);
-    activeStats.serialize(mb);
+        stats.merge(activeStats); // NOTE(review): merges into the long-lived stats member on every call, whereas the removed code serialized the temporary directly - confirm repeated calls cannot double count
+    }
+    stats.setStatistic(StNumDiskRowsRead, diskProgress); // diskProgress is now published as a statistic rather than appended raw to the buffer
+    PARENT::serializeStats(mb); // presumably PARENT is CSlaveActivity, whose serializeStats writes the stats member - TODO confirm
 }
 
 
@@ -431,13 +432,15 @@ void CDiskWriteSlaveActivityBase::close()
         }
 
         Owned<IFileIO> tmpFileIO;
+        CRuntimeStatisticCollection activeStats(diskWriteRemoteStatistics); // write mapping, matching serializeStats() and the activity's diskWriteActivityStatistics
         {
             CriticalBlock block(statsCs);
-            mergeStats(fileStats, outputIO);
+            mergeStats(activeStats, outputIO);
 
             // ensure it is released/destroyed after releasing crit, since the IFileIO might involve a final copy and take considerable time.
             tmpFileIO.setown(outputIO.getClear());
         }
+        stats.merge(activeStats);
 
         if (!rfsQueryParallel && dlfn.isExternal() && !lastNode())
         {
@@ -461,7 +464,7 @@ void CDiskWriteSlaveActivityBase::close()
 }
 
 CDiskWriteSlaveActivityBase::CDiskWriteSlaveActivityBase(CGraphElementBase *container)
-: ProcessSlaveActivity(container), fileStats(diskWriteRemoteStatistics)
+    : ProcessSlaveActivity(container, diskWriteActivityStatistics)
 {
     diskHelperBase = static_cast <IHThorDiskWriteArg *> (queryHelper());
     grouped = false;
@@ -511,14 +514,14 @@ void CDiskWriteSlaveActivityBase::abort()
 
 void CDiskWriteSlaveActivityBase::serializeStats(MemoryBuffer &mb)
 {
-    CriticalBlock block(statsCs);
-
-    ProcessSlaveActivity::serializeStats(mb);
-    mb.append(replicateDone);
-
-    CRuntimeStatisticCollection activeStats(fileStats);
-    mergeStats(activeStats, outputIO);
-    activeStats.serialize(mb);
+    CRuntimeStatisticCollection activeStats(diskWriteRemoteStatistics); // temporary holding the statistics read from the output IO
+    {
+        CriticalBlock block(statsCs); // statsCs is now held only while outputIO is read, not for the whole serialization
+        mergeStats(activeStats, outputIO);
+    }
+    stats.merge(activeStats); // NOTE(review): merges into the persistent stats member on every call; the removed code merged into a temporary copy - confirm repeated calls cannot double count
+    stats.setStatistic(StPerReplicated, replicateDone); // NOTE(review): the removed master code published StPerReplicated only when publishReplicatedDone was set, as an average scaled by 10000 - confirm this raw value is intended
+    PARENT::serializeStats(mb); // presumably PARENT is ProcessSlaveActivity, as called by the removed code - TODO confirm
 }
 
 // ICopyFileProgress

+ 0 - 1
thorlcr/activities/thdiskbaseslave.ipp

@@ -126,7 +126,6 @@ protected:
     unsigned usageCount;
     CDfsLogicalFileName dlfn;
     StringBuffer tempExternalName;
-    CRuntimeStatisticCollection fileStats;
     CriticalSection statsCs;
 
     void open();

+ 8 - 178
thorlcr/graph/thgraphmaster.cpp

@@ -362,9 +362,8 @@ void CSlaveMessageHandler::threadmain()
 
 //////////////////////
 
-static const StatisticsMapping activityStatsMapping({StTimeLocalExecute, StTimeBlocked});
-
-CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this), actStats(activityStatsMapping)
+CMasterActivity::CMasterActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
+    : CActivityBase(_container), threaded("CMasterActivity", this), statsCollection(statsMapping)
 {
     notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
@@ -516,7 +515,7 @@ void CMasterActivity::reset()
 void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
 {
     CriticalBlock b(progressCrit); // don't think needed
-    deserializeActivityStats(node, mb);
+    statsCollection.deserialize(node, mb);
     rowcount_t count;
     for (auto &collection: edgeStatsVector)
     {
@@ -525,18 +524,9 @@ void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
     }
 }
 
-void CMasterActivity::deserializeActivityStats(unsigned node, MemoryBuffer &mb)
-{
-    unsigned __int64 localTimeNs, blockedTimeNs;
-    mb.read(localTimeNs);
-    mb.read(blockedTimeNs);
-    actStats.setStatistic(node, StTimeLocalExecute, localTimeNs);
-    actStats.setStatistic(node, StTimeBlocked, blockedTimeNs);
-}
-
 void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
 {
-    actStats.getStats(stats);
+    statsCollection.getStats(stats); // statsCollection is filled per node by deserializeStats() using the mapping passed to the constructor
 }
 
 void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
@@ -2159,7 +2149,7 @@ public:
 // CMasterGraph impl.
 //
 
-CMasterGraph::CMasterGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
+CMasterGraph::CMasterGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel), graphStats(graphStatistics)
 {
     jobM = (CJobMaster *)&jobChannel.queryJob();
     mpTag = queryJob().allocateMPTag();
@@ -2167,7 +2157,6 @@ CMasterGraph::CMasterGraph(CJobChannel &jobChannel) : CGraphBase(jobChannel)
     waitBarrierTag = queryJob().allocateMPTag();
     startBarrier = jobChannel.createBarrier(startBarrierTag);
     waitBarrier = jobChannel.createBarrier(waitBarrierTag);
-    statNumExecutions.setown(new CThorStats(queryJob(), StNumExecutions));
 }
 
 
@@ -2720,10 +2709,7 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
 {
     CriticalBlock b(createdCrit);
 
-    unsigned numExecutions;
-    mb.read(numExecutions);
-    statNumExecutions->set(node, numExecutions);
-
+    graphStats.deserialize(node, mb);
     unsigned count;
     mb.read(count);
     if (count)
@@ -2790,7 +2776,7 @@ void CMasterGraph::getStats(IStatisticGatherer &stats)
 {
     // graph specific stats
 
-    statNumExecutions->getStats(stats, false);
+    graphStats.getStats(stats);
 
     Owned<IThorActivityIterator> iter;
     if (queryOwner() && !isGlobal())
@@ -2844,133 +2830,13 @@ IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IThorR
 
 ///////////////////////////////////////////////////
 
-static bool suppressStatisticIfZero(StatisticKind kind)
-{
-    switch (kind)
-    {
-    case StNumSpills:
-    case StSizeSpillFile:
-    case StTimeSpillElapsed:
-    case StNumDiskRetries:
-        return true;
-    default:
-        break;
-    }
-    return false;
-}
-
-
-///////////////////////////////////////////////////
-
-CThorStats::CThorStats(CJobBase &_ctx, StatisticKind _kind) : ctx(_ctx), kind(_kind)
-{
-    unsigned c = queryClusterWidth();
-    while (c--) counts.append(0);
-    reset();
-}
-
-void CThorStats::extract(unsigned node, const CRuntimeStatisticCollection & stats)
-{
-    set(node, stats.getStatisticValue(kind));
-}
-
-void CThorStats::set(unsigned node, unsigned __int64 count)
-{
-    counts.replace(count, node);
-}
-
-void CThorStats::reset()
-{
-    tot = max = avg = 0;
-    min = (unsigned __int64) -1;
-    minNode = maxNode = maxSkew = minSkew = 0;
-}
-
-void CThorStats::calculateSkew()
-{
-    unsigned count = counts.ordinality();
-    double _avg = (double)tot/count;
-    if (_avg)
-    {
-        if (max > ctx.querySlaves()) // i.e. if small count, suppress skew stats.
-        {
-            //MORE: Range protection on maxSkew?
-            maxSkew = (unsigned)(10000.0 * (((double)max-_avg)/_avg));
-            minSkew = (unsigned)(10000.0 * ((_avg-(double)min)/_avg));
-        }
-        avg = (unsigned __int64)_avg;
-    }
-}
-
-void CThorStats::tallyValue(unsigned __int64 thiscount, unsigned n)
-{
-    tot += thiscount;
-    if (thiscount > max)
-    {
-        max = thiscount;
-        maxNode = n;
-    }
-    if (thiscount < min)
-    {
-        min = thiscount;
-        minNode = n;
-    }
-}
-
-void CThorStats::processTotal()
-{
-    reset();
-    ForEachItemIn(n, counts)
-    {
-        unsigned __int64 thiscount = counts.item(n);
-        tallyValue(thiscount, n+1);
-    }
-}
-
-void CThorStats::processInfo()
-{
-    processTotal();
-    calculateSkew();
-}
-
-void CThorStats::getTotalStat(IStatisticGatherer & stats)
-{
-    processTotal();
-    stats.addStatistic(kind, tot);
-}
-
-void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual)
-{
-    processInfo();
-    if ((0 == tot) && suppressStatisticIfZero(kind))
-        return;
-
-    //MORE: For most measures (not time stamps etc.) it would be sensible to output the total here....
-    if (!suppressMinMaxWhenEqual || (maxSkew != minSkew))
-    {
-        stats.addStatistic((StatisticKind)(kind|StMinX), min);
-        stats.addStatistic((StatisticKind)(kind|StMaxX), max);
-        stats.addStatistic((StatisticKind)(kind|StAvgX), avg);
-    }
-
-    if (maxSkew != minSkew)
-    {
-        stats.addStatistic((StatisticKind)(kind|StSkewMin), -(__int64)minSkew); // Save minimum as a negative value so consistent
-        stats.addStatistic((StatisticKind)(kind|StSkewMax), maxSkew);
-        stats.addStatistic((StatisticKind)(kind|StNodeMin), minNode);
-        stats.addStatistic((StatisticKind)(kind|StNodeMax), maxNode);
-    }
-}
-
-///////////////////////////////////////////////////
-
 const StatisticsMapping CThorEdgeCollection::edgeStatsMapping({StNumRowsProcessed, StNumStarts, StNumStops});
 
 void CThorEdgeCollection::set(unsigned node, unsigned __int64 value)
 {
     unsigned __int64 numRows = value & THORDATALINK_COUNT_MASK;
     byte stop = (value & THORDATALINK_STOPPED) ? 1 : 0;
-    byte start = (value & THORDATALINK_STARTED) ? 1 : 0;
+    byte start = stop || (value & THORDATALINK_STARTED) ? 1 : 0; // start implied if stopped, keeping previous semantics
     setStatistic(node, StNumRowsProcessed, numRows);
     setStatistic(node, StNumStops, stop);
     setStatistic(node, StNumStarts, start);
@@ -2978,42 +2844,6 @@ void CThorEdgeCollection::set(unsigned node, unsigned __int64 value)
 
 ///////////////////////////////////////////////////
 
-ProgressInfo::ProgressInfo(CJobBase &ctx) : CThorStats(ctx, StNumRowsProcessed)
-{
-    startCount = stopCount = 0;
-}
-void ProgressInfo::processInfo() // reimplement as counts have special flags (i.e. stop/start)
-{
-    reset();
-    startCount = stopCount = 0;
-    ForEachItemIn(n, counts)
-    {
-        unsigned __int64 thiscount = counts.item(n);
-        if (thiscount & THORDATALINK_STOPPED)
-        {
-            startCount++;
-            stopCount++;
-        }
-        else if (thiscount & THORDATALINK_STARTED)
-            startCount++;
-        thiscount = thiscount & THORDATALINK_COUNT_MASK;
-        tallyValue(thiscount, n+1);
-    }
-    calculateSkew();
-}
-
-void ProgressInfo::getStats(IStatisticGatherer & stats)
-{
-    CThorStats::getStats(stats, true);
-    stats.addStatistic(kind, tot);
-    stats.addStatistic(StNumSlaves, counts.ordinality());
-    stats.addStatistic(StNumStarts, startCount);
-    stats.addStatistic(StNumStops, stopCount);
-}
-
-
-///////////////////////////////////////////////////
-
 CJobMaster *createThorGraph(const char *graphName, IConstWorkUnit &workunit, ILoadedDllEntry *querySo, bool sendSo, const SocketEndpoint &agentEp)
 {
     Owned<CJobMaster> jobMaster = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);

+ 4 - 48
thorlcr/graph/thgraphmaster.ipp

@@ -35,39 +35,6 @@
 interface ILoadedDllEntry;
 interface IConstWUGraphProgress;
 
-class graphmaster_decl CThorStats : public CInterface
-{
-protected:
-    CJobBase &ctx;
-    unsigned __int64 max, min, tot, avg;
-    unsigned maxSkew, minSkew, minNode, maxNode;
-    UInt64Array counts;
-    StatisticKind kind;
-
-public:
-    CThorStats(CJobBase &ctx, StatisticKind _kind);
-    void reset();
-    virtual void processInfo();
-
-    unsigned __int64 queryTotal() { return tot; }
-    unsigned __int64 queryAverage() { return avg; }
-    unsigned __int64 queryMin() { return min; }
-    unsigned __int64 queryMax() { return max; }
-    unsigned queryMinSkew() { return minSkew; }
-    unsigned queryMaxSkew() { return maxSkew; }
-    unsigned queryMaxNode() { return maxNode; }
-    unsigned queryMinNode() { return minNode; }
-
-    void extract(unsigned node, const CRuntimeStatisticCollection & stats);
-    void set(unsigned node, unsigned __int64 count);
-    void getTotalStat(IStatisticGatherer & stats);
-    void getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual);
-
-protected:
-    void processTotal();
-    void calculateSkew();
-    void tallyValue(unsigned __int64 value, unsigned node);
-};
 
 class graphmaster_decl CThorStatsCollection : public CInterface
 {
@@ -105,17 +72,6 @@ public:
     void set(unsigned node, unsigned __int64 value);
 };
 
-class graphmaster_decl ProgressInfo : public CThorStats
-{
-    unsigned startCount, stopCount;
-public:
-    ProgressInfo(CJobBase &ctx);
-
-    virtual void processInfo();
-    void getStats(IStatisticGatherer & stats);
-};
-typedef CIArrayOf<ProgressInfo> ProgressInfoArray;
-
 class CJobMaster;
 class CMasterGraphElement;
 class graphmaster_decl CMasterGraph : public CGraphBase
@@ -125,7 +81,8 @@ class graphmaster_decl CMasterGraph : public CGraphBase
     Owned<IFatalHandler> fatalHandler;
     CriticalSection exceptCrit;
     bool sentGlobalInit = false;
-    Owned<CThorStats> statNumExecutions;
+    CThorStatsCollection graphStats;
+
 
     CReplyCancelHandler activityInitMsgHandler, bcastMsgHandler, executeReplyMsgHandler;
 
@@ -280,7 +237,7 @@ class graphmaster_decl CMasterActivity : public CActivityBase, implements IThrea
 
 protected:
     std::vector<OwnedMalloc<CThorEdgeCollection>> edgeStatsVector;
-    CThorStatsCollection actStats;
+    CThorStatsCollection statsCollection;
     IBitSet *notedWarnings;
 
     void addReadFile(IDistributedFile *file, bool temp=false);
@@ -289,11 +246,10 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CActivityBase)
 
-    CMasterActivity(CGraphElementBase *container);
+    CMasterActivity(CGraphElementBase *container, const StatisticsMapping &actStatsMapping = basicActivityStatistics);
     ~CMasterActivity();
 
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb);
-    virtual void deserializeActivityStats(unsigned node, MemoryBuffer &mb);
     virtual void getActivityStats(IStatisticGatherer & stats);
     virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx);
     virtual void init();

+ 14 - 10
thorlcr/graph/thgraphslave.cpp

@@ -126,7 +126,8 @@ bool CThorInput::isFastThrough() const
 }
 // 
 
-CSlaveActivity::CSlaveActivity(CGraphElementBase *_container) : CActivityBase(_container), CEdgeProgress(this)
+CSlaveActivity::CSlaveActivity(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
+    : CActivityBase(_container), CEdgeProgress(this), stats(statsMapping) // bases listed before members to match the actual initialization order (avoids -Wreorder)
 {
     data = NULL;
 }
@@ -550,16 +551,15 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
     return localCycles-blockedCycles;
 }
 
-void CSlaveActivity::serializeActivityStats(MemoryBuffer &mb) const
-{
-    mb.append((unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
-    mb.append((unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
-}
-
 void CSlaveActivity::serializeStats(MemoryBuffer &mb)
 {
-    CriticalBlock b(crit);
-    serializeActivityStats(mb);
+    CriticalBlock b(crit); // JCSMORE not sure what this is protecting..
+
+    // JCS->GH - should these be serialized as cycles, and a different mapping used on master?
+    stats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
+    stats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
+
+    stats.serialize(mb);
     ForEachItemIn(i, outputs)
     {
         IThorDataLink *output = queryOutput(i);
@@ -1229,7 +1229,11 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
 {
     unsigned beginPos = mb.length();
     mb.append(queryGraphId());
-    mb.append(numExecuted);
+
+    CRuntimeStatisticCollection stats(graphStatistics);
+    stats.setStatistic(StNumExecutions, numExecuted);
+    stats.serialize(mb);
+
     unsigned cPos = mb.length();
     unsigned count = 0;
     mb.append(count);

+ 6 - 5
thorlcr/graph/thgraphslave.hpp

@@ -203,6 +203,8 @@ protected:
     CThorInputArray inputs;
     IPointerArrayOf<IThorDataLink> outputs;
     IPointerArrayOf<IEngineRowStream> outputStreams;
+    CRuntimeStatisticCollection stats;
+
     IThorDataLink *input = nullptr;
     bool inputStopped = false;
     unsigned inputSourceIdx = 0;
@@ -224,7 +226,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CActivityBase)
 
-    CSlaveActivity(CGraphElementBase *container);
+    CSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping = basicActivityStatistics);
     ~CSlaveActivity();
     void setRequireInitData(bool tf)
     {
@@ -254,7 +256,6 @@ public:
     void stopInput(unsigned index, const char *extra=NULL);
     void stopAllInputs();
     virtual void serializeStats(MemoryBuffer &mb);
-    virtual void serializeActivityStats(MemoryBuffer &mb) const;
     void debugRequest(unsigned edgeIdx, MemoryBuffer &msg);
     bool canStall() const;
     bool isFastThrough() const;
@@ -309,7 +310,7 @@ protected:
     void lateStart(bool any);
 
 public:
-    CSlaveLateStartActivity(CGraphElementBase *container) : CSlaveActivity(container)
+    CSlaveLateStartActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping) : CSlaveActivity(container, statsMapping) // forwards the mapping to CSlaveActivity; note statsMapping has no default here, unlike the CSlaveActivity constructor
     {
     }
     virtual void start() override;
@@ -392,8 +393,8 @@ protected:
 protected:
     void onStartStrands();
 public:
-    CThorStrandedActivity(CGraphElementBase *container)
-        : CSlaveActivity(container), strandOptions(*container), active(0)
+    CThorStrandedActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping = basicActivityStatistics) // defaults to the basic mapping so existing single-argument callers still compile
+        : CSlaveActivity(container, statsMapping), strandOptions(*container), active(0) // statsMapping is forwarded unchanged to the base activity
     {
     }
 

+ 2 - 1
thorlcr/slave/slave.cpp

@@ -49,7 +49,8 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 
 // ProcessSlaveActivity
 
-ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), threaded("ProcessSlaveActivity", this)
+ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping) // statsMapping defaults to basicActivityStatistics in the declaration
+    : CSlaveActivity(container, statsMapping), threaded("ProcessSlaveActivity", this) // the mapping is handed on to CSlaveActivity, which owns the stats collection
 {
 }
 

+ 1 - 1
thorlcr/slave/slave.ipp

@@ -42,7 +42,7 @@ protected:
     virtual void process() { }
 
 public:
-    ProcessSlaveActivity(CGraphElementBase *container);
+    ProcessSlaveActivity(CGraphElementBase *container, const StatisticsMapping &statsMapping = basicActivityStatistics);
     virtual void beforeDispose();
 
     virtual void startProcess(bool async=true);

+ 17 - 2
thorlcr/thorutil/thormisc.cpp

@@ -70,6 +70,23 @@ mptag_t kjServiceMpTag;
 Owned<IPropertyTree> globals;
 static Owned<IMPtagAllocator> ClusterMPAllocator;
 
+// Statistics mappings shared between master and slave activities. Every mapping below other than spillStatistics and basicActivityStatistics itself is built on top of basicActivityStatistics.
+const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
+const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked}); // default mapping for activities that pass no mapping of their own
+const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
+const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
+const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed, StNumIndexSeeks, StNumIndexScans}, basicActivityStatistics);
+const StatisticsMapping indexWriteActivityStatistics({StPerReplicated}, basicActivityStatistics);
+const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected }, basicActivityStatistics);
+const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
+const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics);
+const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics);
+const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicActivityStatistics, diskReadRemoteStatistics); // used by both CDiskReadMasterBase and CDiskReadSlaveActivityBase
+const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); // used by both CWriteMasterBase and CDiskWriteSlaveActivityBase
+const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); // no sort-specific kinds: basic plus spill statistics only
+const StatisticsMapping graphStatistics({StNumExecutions}, basicActivityStatistics); // used by CSlaveGraph::serializeStats and CMasterGraph::graphStats
+
+
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
     ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
@@ -1404,8 +1421,6 @@ IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int maxLevel, IPerfMo
     return new CPerfMonHook(job, maxLevel, chain);
 }
 
-const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
-
 bool isOOMException(IException *_e)
 {
     if (_e)

+ 19 - 3
thorlcr/thorutil/thormisc.hpp

@@ -116,6 +116,25 @@ enum RegistryCode:unsigned { rc_register, rc_deregister };
 #define destroyThorRow(ptr)         free(ptr)
 #define reallocThorRow(ptr, size)   realloc(ptr, size)
 
+
+// Statistics mappings gathered by the different activities; declared here so master and slave activity code can share the same mapping objects.
+extern graph_decl const StatisticsMapping spillStatistics;
+extern graph_decl const StatisticsMapping basicActivityStatistics;
+extern graph_decl const StatisticsMapping groupActivityStatistics;
+extern graph_decl const StatisticsMapping hashJoinActivityStatistics;
+extern graph_decl const StatisticsMapping indexReadActivityStatistics;
+extern graph_decl const StatisticsMapping indexWriteActivityStatistics;
+extern graph_decl const StatisticsMapping joinActivityStatistics;
+extern graph_decl const StatisticsMapping keyedJoinActivityStatistics;
+extern graph_decl const StatisticsMapping lookupJoinActivityStatistics;
+extern graph_decl const StatisticsMapping loopActivityStatistics;
+extern graph_decl const StatisticsMapping diskReadActivityStatistics;
+extern graph_decl const StatisticsMapping diskWriteActivityStatistics;
+extern graph_decl const StatisticsMapping sortActivityStatistics;
+// Graph-level statistics, serialized by CSlaveGraph::serializeStats and collected by CMasterGraph.
+extern graph_decl const StatisticsMapping graphStatistics;
+
+
 class BooleanOnOff
 {
     bool &tf;
@@ -525,9 +544,6 @@ extern graph_decl void logDiskSpace();
 class CJobBase;
 extern graph_decl IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int minLevel, IPerfMonHook *chain=NULL); // for passing to jdebug startPerformanceMonitor
 
-//statistics gathered by the different activities
-extern const graph_decl StatisticsMapping spillStatistics;
-
 extern graph_decl bool isOOMException(IException *e);
 extern graph_decl IThorException *checkAndCreateOOMContextException(CActivityBase *activity, IException *e, const char *msg, rowcount_t numRows, IOutputMetaData *meta, const void *row);