Explorar el Código

HPCC-15136 Add a statistic to include the number of strands

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 9 años
padre
commit
40d50cd363

+ 1 - 1
roxie/ccd/ccd.hpp

@@ -721,7 +721,7 @@ public:
     SlaveContextLogger();
     SlaveContextLogger(IRoxieQueryPacket *packet);
     void set(IRoxieQueryPacket *packet);
-    void putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed) const;
+    void putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const;
     void putStats(unsigned subGraphId, unsigned actId, const CRuntimeStatisticCollection &stats) const;
     void flush();
     inline bool queryDebuggerActive() const { return debuggerActive; }

+ 5 - 3
roxie/ccd/ccdcontext.cpp

@@ -2414,10 +2414,10 @@ public:
         throwUnexpected();
     }
 
-    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed, unsigned _strands) const
     {
         const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
-        slaveLogCtx.putStatProcessed(subgraphId, activityId, _idx, _processed);
+        slaveLogCtx.putStatProcessed(subgraphId, activityId, _idx, _processed, _strands);
     }
 
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, cycle_t _localCycles) const
@@ -2761,7 +2761,7 @@ public:
         workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
     }
 
-    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed, unsigned _strands) const
     {
         if (_processed)
         {
@@ -2770,6 +2770,8 @@ public:
                 IStatisticGatherer & builder = graphStats->queryStatsBuilder();
                 StatsSubgraphScope graphScope(builder, subgraphId);
                 StatsEdgeScope scope(builder, activityId, _idx);
+                if (_strands)
+                    builder.addStatistic(StNumStrands, _strands);
                 builder.addStatistic(StNumRowsProcessed, _processed);
                 builder.addStatistic(StNumStarted, 1);
                 builder.addStatistic(StNumStopped, 1);

+ 1 - 1
roxie/ccd/ccdcontext.hpp

@@ -58,7 +58,7 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters) = 0;
     virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal) = 0;
     virtual IActivityGraph * getLibraryGraph(const LibraryCallFactoryExtra &extra, IRoxieServerActivity *parentActivity) = 0;
-    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const = 0;
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed, unsigned _strands) const = 0;
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, cycle_t _localCycles) const = 0;
     virtual IProbeManager *queryProbeManager() const = 0;
     virtual IDebuggableContext *queryDebugContext() const = 0;

+ 2 - 1
roxie/ccd/ccdqueue.cpp

@@ -539,7 +539,7 @@ void SlaveContextLogger::set(IRoxieQueryPacket *packet)
 }
 
 
-void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed) const
+void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, unsigned idx, unsigned processed, unsigned strands) const
 {
     if (output && mergeSlaveStatistics)
     {
@@ -549,6 +549,7 @@ void SlaveContextLogger::putStatProcessed(unsigned subGraphId, unsigned actId, u
         buf.append(actId);
         buf.append(idx);
         buf.append(processed);
+        buf.append(strands);
     }
 }
 

+ 24 - 10
roxie/ccd/ccdserver.cpp

@@ -277,9 +277,9 @@ public:
     {
         return ctx->getLibraryGraph(extra, parentActivity);
     }
-    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
+    virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed, unsigned _strands) const
     {
-        ctx->noteProcessed(subgraphId, activityId, _idx, _processed);
+        ctx->noteProcessed(subgraphId, activityId, _idx, _processed, _strands);
     }
     virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, cycle_t _localCycles) const
     {
@@ -348,7 +348,7 @@ protected:
 // General activity statistics
 
 static const StatisticsMapping actStatistics(StWhenFirstRow, StTimeElapsed, StTimeLocalExecute, StTimeTotalExecute, StSizeMaxRowSize,
-                                              StNumRowsProcessed, StNumSlaves, StNumStarted, StNumStopped, StKindNone);
+                                              StNumRowsProcessed, StNumSlaves, StNumStarted, StNumStopped, StNumStrands, StKindNone);
 static const StatisticsMapping joinStatistics(&actStatistics, StNumAtmostTriggered, StKindNone);
 static const StatisticsMapping keyedJoinStatistics(&joinStatistics, StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
                                                     StNumIndexSkips, StNumIndexNullSkips, StNumIndexMerges, StNumIndexMergeCompares,
@@ -1013,7 +1013,7 @@ public:
         if (ctx)
         {
             if (processed)
-                ctx->noteProcessed(factory->querySubgraphId(), activityId, 0, processed);
+                ctx->noteProcessed(factory->querySubgraphId(), activityId, 0, processed, 0);
             ctx->mergeActivityStats(stats, factory->querySubgraphId(), activityId, totalCycles, localCycles);
         }
         basehelper.Release();
@@ -1674,6 +1674,18 @@ public:
         active = 0;
     }
 
+    ~CRoxieServerStrandedActivity()
+    {
+        if (strands.ordinality() > 1)
+        {
+            if (factory && !debugging)
+                factory->noteProcessed(0, processed);
+            if (ctx)
+                ctx->noteProcessed(factory->querySubgraphId(), activityId, 0, processed, strands.ordinality());
+            processed = 0;  // To avoid reprocessing in base destructor
+        }
+    }
+
     virtual void onCreate(IHThorArg *_colocalArg)
     {
         CRoxieServerActivity::_onCreate(_colocalArg, strands.ordinality());
@@ -4655,13 +4667,15 @@ public:
                                     buf.read(childId);
                                     if (*logInfo == LOG_CHILDCOUNT)
                                     {
-                                        unsigned childProcessed;
                                         unsigned idx;
-                                        buf.read(childProcessed);
+                                        unsigned childProcessed;
+                                        unsigned childStrands;
                                         buf.read(idx);
+                                        buf.read(childProcessed);
+                                        buf.read(childStrands);
                                         if (traceLevel > 5)
-                                            activity.queryLogCtx().CTXLOG("Processing ChildCount %d idx %d for child %d subgraph %d", childProcessed, idx, childId, graphId);
-                                        activity.queryContext()->noteProcessed(graphId, childId, idx, childProcessed);
+                                            activity.queryLogCtx().CTXLOG("Processing ChildCount %d idx %d strands %d for child %d subgraph %d", childProcessed, idx, childStrands, childId, graphId);
+                                        activity.queryContext()->noteProcessed(graphId, childId, idx, childProcessed, childStrands);
                                     }
                                     else
                                     {
@@ -8555,7 +8569,7 @@ public:
             {
                 parent->factory->noteProcessed(oid, processed);
                 if (parent->ctx)
-                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed);
+                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed, 0);
             }
         }
 
@@ -15695,7 +15709,7 @@ class CRoxieServerLibraryCallActivity : public CRoxieServerActivity
             {
                 parent->factory->noteProcessed(oid, processed);
                 if (parent->ctx)
-                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed);
+                    parent->ctx->noteProcessed(parent->querySubgraphId(), parent->activityId, oid, processed, 0);
             }
         }
 

+ 3 - 0
system/jlib/jstatcodes.h

@@ -176,6 +176,9 @@ enum StatisticKind
     StCycleSpillElapsedCycles,
     StCycleSortElapsedCycles,
 
+    // Stranding stats - on edge
+    StNumStrands,
+
     StMax,
 
     //For any quantity there is potentially the following variants.

+ 1 - 0
system/jlib/jstats.cpp

@@ -581,6 +581,7 @@ static const StatisticMeta statsMetaData[StMax] = {
     { SIZESTAT(SpillFile) },
     { CYCLESTAT(SpillElapsedCycles) },
     { CYCLESTAT(SortElapsedCycles) },
+    { NUMSTAT(Strands) },
 };