浏览代码

HPCC-15088 Suppress misleading skew stats where small row count

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 年之前
父节点
当前提交
bad6519696

+ 2 - 2
thorlcr/activities/group/thgroup.cpp

@@ -30,8 +30,8 @@ public:
     virtual void init()
     {
         CMasterActivity::init();
-        statNumGroups.setown(new CThorStats(StNumGroups));
-        statNumGroupMax.setown(new CThorStats(StNumGroupMax));
+        statNumGroups.setown(new CThorStats(queryJob(), StNumGroups));
+        statNumGroupMax.setown(new CThorStats(queryJob(), StNumGroupMax));
     }
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {

+ 2 - 2
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -85,8 +85,8 @@ class HashJoinDistributeActivityMaster : public HashDistributeActivityMaster
 public:
     HashJoinDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeActivityMaster(mode, info)
     {
-        lhsProgress.setown(new ProgressInfo);
-        rhsProgress.setown(new ProgressInfo);
+        lhsProgress.setown(new ProgressInfo(queryJob()));
+        rhsProgress.setown(new ProgressInfo(queryJob()));
     }
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {

+ 2 - 2
thorlcr/activities/indexread/thindexread.cpp

@@ -187,8 +187,8 @@ public:
         progressKinds.append(StNumIndexSeeks);
         progressKinds.append(StNumIndexScans);
         ForEachItemIn(l, progressKinds)
-            progressInfoArr.append(*new ProgressInfo);
-        inputProgress.setown(new ProgressInfo);
+            progressInfoArr.append(*new ProgressInfo(queryJob()));
+        inputProgress.setown(new ProgressInfo(queryJob()));
         reInit = 0 != (indexBaseHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename));
     }
     virtual void init()

+ 1 - 1
thorlcr/activities/indexwrite/thindexwrite.cpp

@@ -44,7 +44,7 @@ public:
     IndexWriteActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
         helper = (IHThorIndexWriteArg *)queryHelper();
-        replicateProgress.setown(new ProgressInfo);
+        replicateProgress.setown(new ProgressInfo(queryJob()));
         publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
         recordsProcessed = 0;
         refactor = singlePartKey = isLocal = false;

+ 3 - 3
thorlcr/activities/join/thjoin.cpp

@@ -78,11 +78,11 @@ class JoinActivityMaster : public CMasterActivity
         }
     } *climitedcmp;
 public:
-    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info), extraStats(spillStatistics)
+    JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info), extraStats(info->queryJob(), spillStatistics)
     {
         ActPrintLog("JoinActivityMaster");
-        lhsProgress.setown(new ProgressInfo);
-        rhsProgress.setown(new ProgressInfo);
+        lhsProgress.setown(new ProgressInfo(queryJob()));
+        rhsProgress.setown(new ProgressInfo(queryJob()));
         helper = NULL;
         islocal = local;
         imaster = NULL;

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -54,7 +54,7 @@ public:
             progressKinds.append(StNumDiskRejected);
         }
         ForEachItemIn(l, progressKinds)
-            progressInfoArr.append(*new ProgressInfo);
+            progressInfoArr.append(*new ProgressInfo(queryJob()));
         localKey = false;
         numTags = 0;
         tags[0] = tags[1] = tags[2] = tags[3] = TAG_NULL;

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -89,7 +89,7 @@ protected:
 public:
     CLoopActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info)
     {
-        loopCounterProgress.setown(new CThorStats(StNumIterations));
+        loopCounterProgress.setown(new CThorStats(queryJob(), StNumIterations));
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJob().allocateMPTag();
         loopGraph = NULL;

+ 1 - 1
thorlcr/activities/msort/thmsort.cpp

@@ -36,7 +36,7 @@ class CSortBaseActivityMaster : public CMasterActivity
 {
     CThorStatsCollection extraStats;
 public:
-    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info), extraStats(spillStatistics) { }
+    CSortBaseActivityMaster(CMasterGraphElement * info) : CMasterActivity(info), extraStats(info->queryJob(), spillStatistics) { }
 
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {

+ 4 - 4
thorlcr/activities/thdiskbase.cpp

@@ -29,10 +29,10 @@
 #include "eclhelper.hpp" // tmp for IHThorArg interface
 #include "thdiskbase.ipp"
 
-CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskReadRemoteStatistics)
+CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskReadRemoteStatistics)
 {
     hash = NULL;
-    inputProgress.setown(new ProgressInfo);
+    inputProgress.setown(new ProgressInfo(queryJob()));
 }
 
 void CDiskReadMasterBase::init()
@@ -238,10 +238,10 @@ void CWriteMasterBase::publish()
         queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL, targetOffset);
 }
 
-CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(diskWriteRemoteStatistics)
+CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskWriteRemoteStatistics)
 {
     publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
-    replicateProgress.setown(new ProgressInfo);
+    replicateProgress.setown(new ProgressInfo(queryJob()));
 
     diskHelperBase = (IHThorDiskWriteArg *)queryHelper();
     targetOffset = 0;

+ 7 - 7
thorlcr/graph/thgraphmaster.cpp

@@ -357,19 +357,19 @@ void CSlaveMessageHandler::main()
 
 //////////////////////
 
-CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
+CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this), timingInfo(_container->queryJob())
 {
     notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     asyncStart = false;
     if (container.isSink())
-        progressInfo.append(*new ProgressInfo);
+        progressInfo.append(*new ProgressInfo(queryJob()));
     else
     {
         unsigned o=0;
         for (; o<container.getOutputs(); o++)
-            progressInfo.append(*new ProgressInfo);
+            progressInfo.append(*new ProgressInfo(queryJob()));
     }
 }
 
@@ -2798,7 +2798,7 @@ static bool suppressStatisticIfZero(StatisticKind kind)
 
 ///////////////////////////////////////////////////
 
-CThorStats::CThorStats(StatisticKind _kind) : kind(_kind)
+CThorStats::CThorStats(CJobBase &_ctx, StatisticKind _kind) : ctx(_ctx), kind(_kind)
 {
     unsigned c = queryClusterWidth();
     while (c--) counts.append(0);
@@ -2824,7 +2824,7 @@ void CThorStats::reset()
 
 void CThorStats::calculateSkew()
 {
-    if (max)
+    if (max > ctx.querySlaves()) // i.e. if small count, suppress skew stats.
     {
         unsigned count = counts.ordinality();
         double _avg = (double)tot/count;
@@ -2889,13 +2889,13 @@ void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqu
 
 ///////////////////////////////////////////////////
 
-CTimingInfo::CTimingInfo() : CThorStats(StTimeLocalExecute)
+CTimingInfo::CTimingInfo(CJobBase &ctx) : CThorStats(ctx, StTimeLocalExecute)
 {
 }
 
 ///////////////////////////////////////////////////
 
-ProgressInfo::ProgressInfo() : CThorStats(StNumRowsProcessed)
+ProgressInfo::ProgressInfo(CJobBase &ctx) : CThorStats(ctx, StNumRowsProcessed)
 {
     startcount = stopcount = 0;
 }

+ 6 - 5
thorlcr/graph/thgraphmaster.ipp

@@ -189,6 +189,7 @@ public:
 class graphmaster_decl CThorStats : public CInterface, implements IInterface
 {
 protected:
+    CJobBase &ctx;
     unsigned __int64 max, min, tot, avg;
     unsigned maxSkew, minSkew, minNode, maxNode;
     UInt64Array counts;
@@ -197,7 +198,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    CThorStats(StatisticKind _kind);
+    CThorStats(CJobBase &ctx, StatisticKind _kind);
     void reset();
     virtual void processInfo();
 
@@ -222,12 +223,12 @@ protected:
 class graphmaster_decl CThorStatsCollection : public CInterface
 {
 public:
-    CThorStatsCollection(const StatisticsMapping & _mapping) : mapping(_mapping)
+    CThorStatsCollection(CJobBase &ctx, const StatisticsMapping & _mapping) : mapping(_mapping)
     {
         unsigned num = mapping.numStatistics();
         stats = new Owned<CThorStats>[num];
         for (unsigned i=0; i < num; i++)
-            stats[i].setown(new CThorStats(mapping.getKind(i)));
+            stats[i].setown(new CThorStats(ctx, mapping.getKind(i)));
     }
     ~CThorStatsCollection()
     {
@@ -263,7 +264,7 @@ private:
 class graphmaster_decl CTimingInfo : public CThorStats
 {
 public:
-    CTimingInfo();
+    CTimingInfo(CJobBase &ctx);
     void getStats(IStatisticGatherer & stats) { CThorStats::getStats(stats, false); }
 };
 
@@ -271,7 +272,7 @@ class graphmaster_decl ProgressInfo : public CThorStats
 {
     unsigned startcount, stopcount;
 public:
-    ProgressInfo();
+    ProgressInfo(CJobBase &ctx);
 
     virtual void processInfo();
     void getStats(IStatisticGatherer & stats);