浏览代码

Merge pull request #13681 from ghalliday/mergestats

HPCC-23948 Support merging to create summary statistics

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父节点
当前提交
22a099c448
共有 2 个文件被更改,包括 30 次插入和 36 次删除
  1. +23 -31
      system/jlib/jstats.cpp
  2. +7 -5
      system/jlib/jstats.h

+ 23 - 31
system/jlib/jstats.cpp

@@ -2173,34 +2173,20 @@ unsigned __int64 CRuntimeStatisticCollection::getSerialStatisticValue(StatisticK
     return value + convertMeasure(rawKind, kind, rawValue);
 }
 
-void CRuntimeStatisticCollection::merge(const CRuntimeStatisticCollection & other)
+void CRuntimeStatisticCollection::merge(const CRuntimeStatisticCollection & other, unsigned node)
 {
-    if (&mapping == &other.mapping)
+    ForEachItemIn(i, other)
     {
-        ForEachItemIn(i, other)
-        {
-            unsigned __int64 value = other.values[i].get();
-            if (value)
-            {
-                StatisticKind kind = getKind(i);
-                values[i].merge(value, queryMergeMode(kind));
-            }
-        }
-    }
-    else
-    {
-        ForEachItemIn(i, other)
-        {
-            StatisticKind kind = other.getKind(i);
-            unsigned __int64 value = other.getStatisticValue(kind);
-            if (value)
-                mergeStatistic(kind, value);
-        }
+        StatisticKind kind = other.getKind(i);
+        unsigned __int64 value = other.getStatisticValue(kind);
+        if (value)
+            mergeStatistic(kind, value, node);
     }
+
     CNestedRuntimeStatisticMap *otherNested = other.queryNested();
     if (otherNested)
     {
-        ensureNested().merge(*otherNested);
+        ensureNested().merge(*otherNested, node);
     }
 }
 
@@ -2237,6 +2223,11 @@ void CRuntimeStatisticCollection::mergeStatistic(StatisticKind kind, unsigned __
     queryStatistic(kind).merge(value, queryMergeMode(kind));
 }
 
+void CRuntimeStatisticCollection::mergeStatistic(StatisticKind kind, unsigned __int64 value, unsigned node)
+{
+    mergeStatistic(kind, value);
+}
+
 void CRuntimeStatisticCollection::reset()
 {
     unsigned num = mapping.numStatistics();
@@ -2468,11 +2459,11 @@ CNestedRuntimeStatisticMap * CRuntimeSummaryStatisticCollection::createNested()
     return new CNestedSummaryRuntimeStatisticMap;
 }
 
-void CRuntimeSummaryStatisticCollection::mergeStatistic(StatisticKind kind, unsigned __int64 value)
+void CRuntimeSummaryStatisticCollection::mergeStatistic(StatisticKind kind, unsigned __int64 value, unsigned node)
 {
     CRuntimeStatisticCollection::mergeStatistic(kind, value);
     unsigned index = queryMapping().getIndex(kind);
-    derived[index].mergeStatistic(value, 0);
+    derived[index].mergeStatistic(value, node);
 }
 
 static bool isSignificantSkew(StatisticKind kind, unsigned __int64 range, unsigned __int64 count)
@@ -2506,13 +2497,14 @@ void CRuntimeSummaryStatisticCollection::recordStatistics(IStatisticGatherer & t
                 double minSkew = (10000.0 * ((mean-minValue)/mean));
                 unsigned __int64 range = maxValue - minValue;
 
+                //MORE: Add some level of control over which derived stats are reported for a given kind(/scope?)
                 target.addStatistic((StatisticKind)(serialKind|StMinX), minValue);
                 target.addStatistic((StatisticKind)(serialKind|StMaxX), maxValue);
                 target.addStatistic((StatisticKind)(serialKind|StAvgX), (unsigned __int64)mean);
                 target.addStatistic((StatisticKind)(serialKind|StDeltaX), range);
                 target.addStatistic((StatisticKind)(serialKind|StStdDevX), (unsigned __int64)stdDev);
-                //If all nodes are the same then we re actually merging results from multiple runs
-                //if the range is less than the count then
+
+                //If min and max nodes are the same then all nodes have the same values, so no benefit in reporting skews.
                 if ((cur.minNode != cur.maxNode) && isSignificantSkew(serialKind, range, cur.count))
                 {
                     target.addStatistic((StatisticKind)(serialKind|StSkewMin), (unsigned __int64)minSkew);
@@ -2558,12 +2550,12 @@ void CNestedRuntimeStatisticCollection::deserialize(MemoryBuffer & in)
 
 void CNestedRuntimeStatisticCollection::deserializeMerge(MemoryBuffer& in)
 {
-    stats->deserialize(in);
+    stats->deserializeMerge(in);
 }
 
-void CNestedRuntimeStatisticCollection::merge(const CNestedRuntimeStatisticCollection & other)
+void CNestedRuntimeStatisticCollection::merge(const CNestedRuntimeStatisticCollection & other, unsigned node)
 {
-    stats->merge(other.queryStats());
+    stats->merge(other.queryStats(), node);
 }
 
 bool CNestedRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
@@ -2662,14 +2654,14 @@ void CNestedRuntimeStatisticMap::deserializeMerge(MemoryBuffer& in)
     }
 }
 
-void CNestedRuntimeStatisticMap::merge(const CNestedRuntimeStatisticMap & other)
+void CNestedRuntimeStatisticMap::merge(const CNestedRuntimeStatisticMap & other, unsigned node)
 {
     ReadLockBlock b(other.lock);
     ForEachItemIn(i, other.map)
     {
         CNestedRuntimeStatisticCollection & cur = other.map.item(i);
         CNestedRuntimeStatisticCollection & target = addNested(cur.scope, cur.queryMapping());
-        target.merge(cur);
+        target.merge(cur, node);
     }
 }
 

+ 7 - 5
system/jlib/jstats.h

@@ -508,7 +508,7 @@ public:
     {
         queryStatistic(kind).addAtomic(value);
     }
-    virtual void mergeStatistic(StatisticKind kind, unsigned __int64 value);
+    void mergeStatistic(StatisticKind kind, unsigned __int64 value);
     void setStatistic(StatisticKind kind, unsigned __int64 value)
     {
         queryStatistic(kind).set(value);
@@ -528,7 +528,7 @@ public:
     inline StatisticKind getKind(unsigned i) const { return mapping.getKind(i); }
     inline unsigned __int64 getValue(unsigned i) const { return values[i].get(); }
 
-    void merge(const CRuntimeStatisticCollection & other);
+    void merge(const CRuntimeStatisticCollection & other, unsigned node = 0);
     void updateDelta(CRuntimeStatisticCollection & target, const CRuntimeStatisticCollection & source);
     void rollupStatistics(IContextLogger * target) { rollupStatistics(1, &target); }
     void rollupStatistics(unsigned num, IContextLogger * const * targets) const;
@@ -542,6 +542,7 @@ public:
     // Print out collected stats to string as XML
     StringBuffer &toXML(StringBuffer &str) const;
     // Serialize/deserialize
+    virtual void mergeStatistic(StatisticKind kind, unsigned __int64 value, unsigned node);
     virtual bool serialize(MemoryBuffer & out) const;  // Returns true if any non-zero
     virtual void deserialize(MemoryBuffer & in);
     virtual void deserializeMerge(MemoryBuffer& in);
@@ -572,12 +573,13 @@ public:
     CRuntimeSummaryStatisticCollection(const StatisticsMapping & _mapping);
     ~CRuntimeSummaryStatisticCollection();
 
-    virtual void mergeStatistic(StatisticKind kind, unsigned __int64 value) override;
     virtual void recordStatistics(IStatisticGatherer & target) const override;
     virtual bool serialize(MemoryBuffer & out) const override;  // Returns true if any non-zero
     virtual void deserialize(MemoryBuffer & in) override;
     virtual void deserializeMerge(MemoryBuffer& in) override;
 
+    void mergeStatistic(StatisticKind kind, unsigned __int64 value, unsigned node);
+
 protected:
     struct DerivedStats
     {
@@ -617,7 +619,7 @@ public:
     bool serialize(MemoryBuffer & out) const;  // Returns true if any non-zero
     void deserialize(MemoryBuffer & in);
     void deserializeMerge(MemoryBuffer& in);
-    void merge(const CNestedRuntimeStatisticCollection & other);
+    void merge(const CNestedRuntimeStatisticCollection & other, unsigned node);
     void recordStatistics(IStatisticGatherer & target) const;
     StringBuffer & toStr(StringBuffer &str) const;
     StringBuffer & toXML(StringBuffer &str) const;
@@ -638,7 +640,7 @@ public:
     bool serialize(MemoryBuffer & out) const;  // Returns true if any non-zero
     void deserialize(MemoryBuffer & in);
     void deserializeMerge(MemoryBuffer& in);
-    void merge(const CNestedRuntimeStatisticMap & other);
+    void merge(const CNestedRuntimeStatisticMap & other, unsigned node);
     void recordStatistics(IStatisticGatherer & target) const;
     StringBuffer & toStr(StringBuffer &str) const;
     StringBuffer & toXML(StringBuffer &str) const;