Browse Source

HPCC-23961 Refactor stats to use CRuntimeSummaryStatisticCollection's

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 years ago
parent
commit
ee5460e0a2

+ 2 - 2
thorlcr/activities/join/thjoin.cpp

@@ -78,7 +78,7 @@ class JoinActivityMaster : public CMasterActivity
         }
     } *climitedcmp;
 public:
-    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info), extraStats(info->queryJob(), spillStatistics)
+    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info), extraStats(spillStatistics)
     {
         ActPrintLog("JoinActivityMaster");
         lhsProgress.setown(new ProgressInfo(queryJob()));
@@ -347,7 +347,7 @@ public:
             rhsProgress->set(node, rhsProgressCount);
         }
 
-        extraStats.deserializeMerge(node, mb);
+        extraStats.deserialize(node, mb);
     }
     virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
     {

+ 2 - 3
thorlcr/activities/msort/thmsort.cpp

@@ -36,13 +36,12 @@ class CSortBaseActivityMaster : public CMasterActivity
 {
     CThorStatsCollection extraStats;
 public:
-    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info), extraStats(info->queryJob(), spillStatistics) { }
+    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info), extraStats(spillStatistics) { }
 
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {
         CMasterActivity::deserializeStats(node, mb);
-
-        extraStats.deserializeMerge(node, mb);
+        extraStats.deserialize(node, mb);
     }
     virtual void getActivityStats(IStatisticGatherer & stats)
     {

+ 4 - 4
thorlcr/activities/thdiskbase.cpp

@@ -29,7 +29,7 @@
 #include "eclhelper.hpp" // tmp for IHThorArg interface
 #include "thdiskbase.ipp"
 
-CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskReadRemoteStatistics)
+CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskReadRemoteStatistics)
 {
     hash = NULL;
     inputProgress.setown(new ProgressInfo(queryJob()));
@@ -122,7 +122,7 @@ void CDiskReadMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
     mb.read(progress);
     inputProgress->set(node, progress);
 
-    diskStats.deserializeMerge(node, mb);
+    diskStats.deserialize(node, mb);
 }
 
 void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats)
@@ -324,7 +324,7 @@ void CWriteMasterBase::publish()
     }
 }
 
-CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskWriteRemoteStatistics)
+CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskWriteRemoteStatistics)
 {
     publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
     replicateProgress.setown(new ProgressInfo(queryJob()));
@@ -340,7 +340,7 @@ void CWriteMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
     mb.read(repPerc);
     replicateProgress->set(node, repPerc);
 
-    diskStats.deserializeMerge(node, mb);
+    diskStats.deserialize(node, mb);
 }
 
 void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)

+ 22 - 15
thorlcr/graph/thgraphmaster.cpp

@@ -362,20 +362,20 @@ void CSlaveMessageHandler::threadmain()
 
 //////////////////////
 
-CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this), timingInfo(_container->queryJob()),
-                                                                  blockedTime(queryJob(), StTimeBlocked)
+static const StatisticsMapping activityStatsMapping({StTimeLocalExecute, StTimeBlocked});
+
+CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this), actStats(activityStatsMapping)
 {
     notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     asyncStart = false;
     if (container.isSink())
-        progressInfo.append(*new ProgressInfo(queryJob()));
+        edgeStatsVector.push_back(new CThorEdgeCollection);
     else
     {
-        unsigned o=0;
-        for (; o<container.getOutputs(); o++)
-            progressInfo.append(*new ProgressInfo(queryJob()));
+        for (unsigned o=0; o<container.getOutputs(); o++)
+            edgeStatsVector.push_back(new CThorEdgeCollection);
     }
 }
 
@@ -518,10 +518,10 @@ void CMasterActivity::deserializeStats(unsigned node, MemoryBuffer &mb)
     CriticalBlock b(progressCrit); // don't think needed
     deserializeActivityStats(node, mb);
     rowcount_t count;
-    ForEachItemIn(p, progressInfo)
+    for (auto &collection: edgeStatsVector)
     {
         mb.read(count);
-        progressInfo.item(p).set(node, count);
+        collection->set(node, count);
     }
 }
 
@@ -530,21 +530,20 @@ void CMasterActivity::deserializeActivityStats(unsigned node, MemoryBuffer &mb)
     unsigned __int64 localTimeNs, blockedTimeNs;
     mb.read(localTimeNs);
     mb.read(blockedTimeNs);
-    timingInfo.set(node, localTimeNs);
-    blockedTime.set(node, blockedTimeNs);
+    actStats.setStatistic(node, StTimeLocalExecute, localTimeNs);
+    actStats.setStatistic(node, StTimeBlocked, blockedTimeNs);
 }
 
 void CMasterActivity::getActivityStats(IStatisticGatherer & stats)
 {
-    timingInfo.getStats(stats);
-    blockedTime.getStats(stats,false);
+    actStats.getStats(stats);
 }
 
 void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
 {
     CriticalBlock b(progressCrit);
-    if (progressInfo.isItem(idx))
-        progressInfo.item(idx).getStats(stats);
+    if (idx < edgeStatsVector.size())
+        edgeStatsVector[idx]->getStats(stats);
 }
 
 void CMasterActivity::done()
@@ -2965,8 +2964,16 @@ void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqu
 
 ///////////////////////////////////////////////////
 
-CTimingInfo::CTimingInfo(CJobBase &ctx) : CThorStats(ctx, StTimeLocalExecute)
+const StatisticsMapping CThorEdgeCollection::edgeStatsMapping({StNumRowsProcessed, StNumStarts, StNumStops});
+
+void CThorEdgeCollection::set(unsigned node, unsigned __int64 value)
 {
+    unsigned __int64 numRows = value & THORDATALINK_COUNT_MASK;
+    byte stop = (value & THORDATALINK_STOPPED) ? 1 : 0;
+    byte start = (value & THORDATALINK_STARTED) ? 1 : 0;
+    setStatistic(node, StNumRowsProcessed, numRows);
+    setStatistic(node, StNumStops, stop);
+    setStatistic(node, StNumStarts, start);
 }
 
 ///////////////////////////////////////////////////

+ 20 - 33
thorlcr/graph/thgraphmaster.ipp

@@ -71,50 +71,38 @@ protected:
 
 class graphmaster_decl CThorStatsCollection : public CInterface
 {
+    std::vector<OwnedMalloc<CRuntimeStatisticCollection>> nodeStats;
+    const StatisticsMapping & mapping;
 public:
-    CThorStatsCollection(CJobBase &ctx, const StatisticsMapping & _mapping) : mapping(_mapping)
+    CThorStatsCollection(const StatisticsMapping & _mapping) : mapping(_mapping)
     {
-        unsigned num = mapping.numStatistics();
-        stats = new Owned<CThorStats>[num];
-        for (unsigned i=0; i < num; i++)
-            stats[i].setown(new CThorStats(ctx, mapping.getKind(i)));
+        unsigned c = queryClusterWidth();
+        while (c--)
+            nodeStats.push_back(new CRuntimeStatisticCollection(mapping));
     }
-    ~CThorStatsCollection()
+    void deserialize(unsigned node, MemoryBuffer & mb)
     {
-        delete [] stats;
+        nodeStats[node]->deserialize(mb);
     }
-
-    void deserializeMerge(unsigned node, MemoryBuffer & mb)
-    {
-        CRuntimeStatisticCollection nodeStats(mapping);
-        nodeStats.deserialize(mb);
-        extract(node, nodeStats);
-    }
-
-    void extract(unsigned node, const CRuntimeStatisticCollection & source)
+    void setStatistic(unsigned node, StatisticKind kind, unsigned __int64 value)
     {
-        for (unsigned i=0; i < mapping.numStatistics(); i++)
-            stats[i]->extract(node, source);
+        nodeStats[node]->setStatistic(kind, value);
     }
-
     void getStats(IStatisticGatherer & result)
     {
-        for (unsigned i=0; i < mapping.numStatistics(); i++)
-        {
-            stats[i]->getStats(result, false);
-        }
+        CRuntimeSummaryStatisticCollection summary(mapping);
+        for (unsigned n=0; n < nodeStats.size(); n++) // NB: size is = queryClusterWidth()
+            summary.merge(*nodeStats[n], n);
+        summary.recordStatistics(result);
     }
-
-private:
-    Owned<CThorStats> * stats;
-    const StatisticsMapping & mapping;
 };
 
-class graphmaster_decl CTimingInfo : public CThorStats
+class graphmaster_decl CThorEdgeCollection : public CThorStatsCollection
 {
+    static const StatisticsMapping edgeStatsMapping;
 public:
-    CTimingInfo(CJobBase &ctx);
-    void getStats(IStatisticGatherer & stats) { CThorStats::getStats(stats, false); }
+    CThorEdgeCollection() : CThorStatsCollection(edgeStatsMapping) { }
+    void set(unsigned node, unsigned __int64 value);
 };
 
 class graphmaster_decl ProgressInfo : public CThorStats
@@ -291,9 +279,8 @@ class graphmaster_decl CMasterActivity : public CActivityBase, implements IThrea
     IArrayOf<IDistributedFile> readFiles;
 
 protected:
-    ProgressInfoArray progressInfo;
-    CTimingInfo timingInfo;
-    CThorStats blockedTime;
+    std::vector<OwnedMalloc<CThorEdgeCollection>> edgeStatsVector;
+    CThorStatsCollection actStats;
     IBitSet *notedWarnings;
 
     void addReadFile(IDistributedFile *file, bool temp=false);