瀏覽代碼

HPCC-12391 Refactor activity timers to track start/end time too

More refactoring, getting splitters right.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
5d63caa79c
共有 4 個文件被更改,包括 91 次插入57 次删除
  1. 28 18
      roxie/ccd/ccdcontext.cpp
  2. 2 1
      roxie/ccd/ccdcontext.hpp
  3. 59 37
      roxie/ccd/ccdserver.cpp
  4. 2 1
      roxie/ccd/ccdserver.hpp

+ 28 - 18
roxie/ccd/ccdcontext.cpp

@@ -1210,34 +1210,44 @@ public:
         return logctx.queryTraceLevel();
     }
 
-    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const
+    {
+        if (_processed)
+        {
+            if (graphStats)
+            {
+                IStatisticGatherer & builder = graphStats->queryStatsBuilder();
+                StatsSubgraphScope graphScope(builder, activity->querySubgraphId());
+                StatsEdgeScope scope(builder, activity->queryId(), _idx);
+                builder.addStatistic(StNumRowsProcessed, _processed);
+            }
+            logctx.noteStatistic(StNumRowsProcessed, _processed);
+        }
+    }
+
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
     {
         if (graphStats)
         {
             IStatisticGatherer & builder = graphStats->queryStatsBuilder();
             StatsSubgraphScope graphScope(builder, activity->querySubgraphId());
-
+            StatsActivityScope scope(builder, activity->queryId());
+            if (_totalCycles.totalCycles)
             {
-                StatsActivityScope scope(builder, activity->queryId());
-                //MORE: This is potentially problematic if noteProcessed is called for multiple edges => check if totalCycles is 0.
-                //There should really be two separate functions - one to update activity statistics, and another for edge statistics
-                if (_totalCycles.totalCycles)
-                {
-                    builder.addStatistic(StWhenFirstRow, (_totalCycles.firstRow));
-                    builder.addStatistic(StTimeElapsed, (_totalCycles.elapsed()));
-                    builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(_totalCycles.totalCycles));
-                    builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
-                }
+                builder.addStatistic(StWhenFirstRow, (_totalCycles.firstRow));
+                builder.addStatistic(StTimeElapsed, (_totalCycles.elapsed()));
+                builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(_totalCycles.totalCycles));
+                builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
             }
-
-            if (_processed)
+            ForEachItemIn(i, fromStats)
             {
-                StatsEdgeScope scope(builder, activity->queryId(), _idx);
-                builder.addStatistic(StNumRowsProcessed, _processed);
+                StatisticKind kind = fromStats.getKind(i);
+                unsigned __int64 value = fromStats.getStatisticValue(kind);
+                if (value)
+                    builder.addStatistic(kind, value);
             }
         }
-        if (_processed)
-            logctx.noteStatistic(StNumRowsProcessed, _processed);
+        logctx.mergeStats(fromStats);
     }
 
     virtual void checkAbort()

+ 2 - 1
roxie/ccd/ccdcontext.hpp

@@ -58,7 +58,8 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters) = 0;
     virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal) = 0;
     virtual IActivityGraph * getLibraryGraph(const LibraryCallFactoryExtra &extra, IRoxieServerActivity *parentActivity) = 0;
-    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed,  const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const = 0;
+    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const = 0;
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const = 0;
     virtual IProbeManager *queryProbeManager() const = 0;
     virtual IDebuggableContext *queryDebugContext() const = 0;
     virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) = 0;

+ 59 - 37
roxie/ccd/ccdserver.cpp

@@ -314,9 +314,13 @@ public:
     {
         return ctx->getLibraryGraph(extra, parentActivity);
     }
-    virtual void noteProcessed(const IRoxieServerActivity *_activity, unsigned _idx, unsigned _processed,  const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    virtual void noteProcessed(const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed) const
     {
-        ctx->noteProcessed(_activity, _idx, _processed, _totalCycles, _localCycles);
+        ctx->noteProcessed(activity, _idx, _processed);
+    }
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const IRoxieServerActivity *_activity, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    {
+        ctx->mergeActivityStats(fromStats, _activity, _totalCycles, _localCycles);
     }
     virtual IProbeManager *queryProbeManager() const
     {
@@ -354,10 +358,6 @@ public:
     {
         return ctx->getWorkunitRowReader(wuid, name, sequence, xmlTransformer, rowAllocator, isGrouped);
     }
-    virtual void mergeStats(const CRuntimeStatisticCollection &from)
-    {
-        ctx->mergeStats(from);
-    }
 protected:
     IRoxieSlaveContext * ctx;
 };
@@ -487,16 +487,26 @@ public:
         CActivityFactory::mergeStats(from);
     }
 
-    virtual void noteProcessed(unsigned idx, unsigned _processed,  const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
+    virtual void noteProcessed(unsigned _idx, unsigned _processed) const
+    {
+        dbgassertex(!_idx);
+        if (_processed)
+        {
+            CriticalBlock b(statsCrit);
+            processed += processed;
+        }
+    }
+
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
     {
-        if (_processed || _totalCycles.totalCycles || _localCycles)
+        if (_totalCycles.totalCycles || _localCycles)
         {
             CriticalBlock b(statsCrit);
             dbgassertex(_totalCycles.totalCycles >= _localCycles);
-            processed += _processed;
             totalCycles += _totalCycles.totalCycles;
             localCycles += _localCycles;
         }
+        CActivityFactory::mergeStats(fromStats);
     }
 
     virtual void noteStarted() const
@@ -800,14 +810,12 @@ protected:
         }
     }
 
-    virtual void noteProcessed(unsigned idx, unsigned _processed,  const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
-    {
-        assertex(numOutputs ? idx < numOutputs : idx==0);
-        CriticalBlock b(statsCrit);
-        processedArray[idx] += _processed;
-        totalCycles += _totalCycles.totalCycles;
-        localCycles += _localCycles;
-    }
+    virtual void noteProcessed(unsigned idx, unsigned _processed) const
+     {
+         assertex(numOutputs ? idx < numOutputs : idx==0);
+         CriticalBlock b(statsCrit);
+         processedArray[idx] += _processed;
+     }
 
     virtual void noteStarted(unsigned idx) const
     {
@@ -966,13 +974,15 @@ public:
         }
         if (factory && !debugging)
         {
-            factory->noteProcessed(0, processed, totalCycles, localCycles);
-            factory->mergeStats(stats);
+            if (processed)
+                factory->noteProcessed(0, processed);
+            factory->mergeActivityStats(stats, totalCycles, localCycles);
         }
         if (ctx)
         {
-            ctx->noteProcessed(this, 0, processed, totalCycles, localCycles);
-            ctx->mergeStats(stats);
+            if (processed)
+                ctx->noteProcessed(this, 0, processed);
+            ctx->mergeActivityStats(stats, this, totalCycles, localCycles);
         }
         basehelper.Release();
         ::Release(rowAllocator);
@@ -3485,12 +3495,14 @@ public:
         memset(buffers, 0, (numChannels+1)*sizeof(ChannelBuffer *));
         parentExtractSize = 0;
         parentExtract = NULL;
+        debugContext = NULL;
         owner = NULL;
         mergeOrder = NULL;
         deferredStart = false;
         processed = 0;
         sentSequence = 0;
         resendSequence = 0;
+        totalCycles = 0;
         serverSideCache = activity.queryServerSideCache();
         bufferStream.setown(createMemoryBufferSerialStream(tempRowBuffer));
         rowSource.setStream(bufferStream);
@@ -8308,7 +8320,7 @@ public:
         unsigned idx;
         unsigned oid;
         unsigned processed;
-        ActivityTimeAccumulator totalCycles;
+        unsigned __int64 totalCycles;  // We track this per output so that the pullers get a meaningful value to use when calculating their localtime
 
     public:
         IMPLEMENT_IINTERFACE;
@@ -8319,6 +8331,7 @@ public:
             oid = 0;
             idx = 0;
             processed = 0;
+            totalCycles = 0;
             eofpending = false;
             eof = false;
             stopped = false;
@@ -8328,6 +8341,12 @@ public:
         {
             if (traceStartStop)
                 DBGLOG("%p ~OutputAdaptor %d", this, oid);
+            if (processed && parent && parent->factory)
+            {
+                parent->factory->noteProcessed(oid, processed);
+                if (parent->ctx)
+                    parent->ctx->noteProcessed(parent, oid, processed);
+            }
         }
 
         void init()
@@ -8336,7 +8355,6 @@ public:
                 DBGLOG("%p init Input adaptor %d", this, oid);
             idx = 0;
             processed = 0;
-            totalCycles.totalCycles = 0;
             eofpending = false;
             eof = false;
             stopped = false;
@@ -8359,12 +8377,12 @@ public:
         
         virtual unsigned __int64 queryTotalCycles() const
         {
-            return totalCycles.totalCycles;
+            return totalCycles;
         }
 
         virtual unsigned __int64 queryLocalCycles() const
         {
-            return 0;
+            return 0;  // Should never be called
         }
 
         virtual IRoxieInput *queryInput(unsigned idx) const
@@ -8374,7 +8392,7 @@ public:
 
         virtual const void * nextInGroup()
         {
-            ActivityTimer t(totalCycles, parent->timeActivities);
+            SimpleActivityTimer t(totalCycles, parent->timeActivities);
             if (eof)
                 return NULL;
             const void *ret = parent->readBuffered(idx, oid);
@@ -8428,8 +8446,6 @@ public:
             if (traceStartStop)
                 parent->CTXLOG("%p reset Input adaptor %d stopped = %d", this, oid, stopped);
             parent->reset(oid);
-            parent->processed += processed;
-            processed = 0;
             idx = 0; // value should not be relevant really but this is the safest...
             stopped = false;
         };
@@ -8508,7 +8524,7 @@ public:
     const void *readBuffered(unsigned idx, unsigned oid)
     {
         CriticalBlock b(crit);
-        ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. Is that right?
+        ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
         if (idx == headIdx) // test once without getting the crit2 sec
         {
             CriticalUnblock b1(crit);
@@ -8667,11 +8683,6 @@ public:
             CRoxieServerActivity::reset();
     };
 
-    virtual unsigned __int64 queryLocalCycles() const
-    {
-        return 0;
-    }
-
     virtual const void *nextInGroup()
     {
         throwUnexpected(); // Internal logic error - we are not anybody's input
@@ -15164,6 +15175,18 @@ class CRoxieServerLibraryCallActivity : public CRoxieServerActivity
             init();
         }
 
+        ~OutputAdaptor()
+        {
+            if (traceStartStop)
+                DBGLOG("%p ~OutputAdaptor %d", this, oid);
+            if (processed && parent && parent->factory)
+            {
+                parent->factory->noteProcessed(oid, processed);
+                if (parent->ctx)
+                    parent->ctx->noteProcessed(parent, oid, processed);
+            }
+        }
+
         void init()
         {
             processed = 0;
@@ -15193,7 +15216,7 @@ class CRoxieServerLibraryCallActivity : public CRoxieServerActivity
 
         virtual void reset()
         {
-            parent->reset(oid, processed);
+            parent->reset(oid);
             CExtractMapperInput::reset();
             init();
         };
@@ -15352,9 +15375,8 @@ public:
         }
     }
 
-    void reset(unsigned oid, unsigned _processed)
+    void reset(unsigned oid)
     {
-        processed += _processed;
         started = false;
         error.clear();
         numActiveOutputs = numOutputs;

+ 2 - 1
roxie/ccd/ccdserver.hpp

@@ -200,7 +200,8 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual IOutputMetaData *queryOutputMeta() const = 0;
     virtual IHThorArg &getHelper() const = 0;
     virtual IRoxieServerActivity *createFunction(IHThorArg &helper, IProbeManager *_probeManager) const = 0;
-    virtual void noteProcessed(unsigned idx, unsigned processed, const ActivityTimeAccumulator &totalCycles, unsigned __int64 localCycles) const = 0;
+    virtual void noteProcessed(unsigned idx, unsigned processed) const = 0;
+    virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &totalCycles, unsigned __int64 localCycles) const = 0;
     virtual void onCreateChildQueries(IRoxieSlaveContext *ctx, IHThorArg *colocalArg, IArrayOf<IActivityGraph> &childGraphs) const = 0;
     virtual void createChildQueries(IArrayOf<IActivityGraph> &childGraphs, IRoxieServerActivity *parentActivity, IProbeManager *_probeManager, const IRoxieContextLogger &_logctx) const = 0;
     virtual void noteStarted() const = 0;