Browse Source

Merge pull request #8316 from ghalliday/issue14871b

HPCC-14871 Initialise strands variables after activity has been initialised

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
8488cae577
2 changed files with 59 additions and 28 deletions
  1. 49 22
      roxie/ccd/ccdserver.cpp
  2. 10 6
      roxie/roxiemem/roxiemem.cpp

+ 49 - 22
roxie/ccd/ccdserver.cpp

@@ -1653,6 +1653,7 @@ protected:
     IArrayOf<StrandProcessor> strands;
     Owned<IStrandBranch> branch;
     Owned<IStrandJunction> splitter;
+    Owned<IStrandJunction> sourceJunction; // A junction applied to the output of a source activity
     std::atomic<unsigned> active;
 public:
     CRoxieServerStrandedActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
@@ -1662,16 +1663,10 @@ public:
         active = 0;
     }
 
-    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
-    {
-        ForEachItemIn(idx, strands)
-        {
-            strands.item(idx).start();
-            active++;
-        }
-        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
-        startJunction(splitter);
-    }
+    //This function is pure (But also implemented out of line) to force the derived classes to implement it.
+    //After calling the base class start method, and initialising any values from the helper they must call onStartStrands(),
+    //this must also happen before any rows are read from the strands (e.g., by a source junction)
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
 
     virtual void reset()
     {
@@ -1680,6 +1675,7 @@ public:
             strands.item(idx).reset();
         resetJunction(splitter);
         CRoxieServerActivity::reset();
+        resetJunction(sourceJunction);
     }
 
     virtual void stop()
@@ -1766,14 +1762,14 @@ public:
                     //If the output activities are also stranded then need to create a version of the branch
                     bool isGrouped = queryOutputMeta()->isGrouped();
                     branch.setown(createStrandBranch(ctx->queryRowManager(), strandOptions.numStrands, strandOptions.blockSize, true, isGrouped, true));
-                    splitter.set(branch->queryInputJunction());
+                    sourceJunction.set(branch->queryInputJunction());
                     recombiner.set(branch->queryOutputJunction());
 
                     //This is different from the branch above.  The first "junction" has the source activity as the input, and the outputs as the result of the activity
                     for (unsigned strandNo = 0; strandNo < strandOptions.numStrands; strandNo++)
                     {
-                        splitter->setInput(strandNo, &strands.item(strandNo));
-                        streams.append(splitter->queryOutput(strandNo));
+                        sourceJunction->setInput(strandNo, &strands.item(strandNo));
+                        streams.append(sourceJunction->queryOutput(strandNo));
                     }
 #ifdef TRACE_STRANDS
                     if (traceLevel > 2)
@@ -1806,8 +1802,25 @@ public:
     }
 
     inline unsigned numStrands() const { return strands.ordinality(); }
+
+protected:
+
+    void onStartStrands()
+    {
+        ForEachItemIn(idx, strands)
+            strands.item(idx).start();
+    }
 };
 
+
+//For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time.
+void CRoxieServerStrandedActivity::start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+{
+    CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+    startJunction(splitter);
+}
+
+
 class CRoxieServerStrandedLateStartActivity : public CRoxieServerStrandedActivity
 {
 protected:
@@ -1820,10 +1833,11 @@ protected:
         eof = prefiltered;
         if (!prefiltered)
         {
-            ForEachItemIn(idx, strands)
-                strands.item(idx).start();
+            //At this point
             input->start(parentExtractSize, parentExtract, false);
             startJunction(splitter);
+            onStartStrands();  // Initialise the strands - the helper will have been correctly initialized by this point.
+            startJunction(sourceJunction);
         }
         else
         {
@@ -5720,11 +5734,11 @@ class CRoxieServerStrandedInlineTableActivity : public CRoxieServerStrandedActiv
             : StrandProcessor(_parent, NULL, true), helper(_helper), curRow(0), maxRows(0)
         {
         }
-        virtual void start()
+        virtual void start() override
         {
+            StrandProcessor::start();
             curRow = 0;
             maxRows = queryParent().numRows;
-            StrandProcessor::start();
         }
         virtual const void * nextRow()
         {
@@ -5766,14 +5780,14 @@ class CRoxieServerStrandedInlineTableActivity : public CRoxieServerStrandedActiv
             sectionSize = queryParent().strandOptions.blockSize;
             eosPending = false;
         }
-        virtual void start()
+        virtual void start() override
         {
+            StrandProcessor::start();
             numStrands = queryParent().numStrands();
             maxRows = queryParent().numRows;
             curRow = std::min(whichStrand * sectionSize, maxRows);
             sectionMaxRows = std::min(curRow + sectionSize, maxRows);
             eosPending = isOrdered && (curRow != maxRows);
-            StrandProcessor::start();
         }
         virtual const void * nextRow()
         {
@@ -5829,10 +5843,11 @@ public:
     }
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
-        numRows = helper.numRows();
         CRoxieServerStrandedActivity::start(parentExtractSize, parentExtract, paused);
+        numRows = helper.numRows();
+        onStartStrands();
+        startJunction(sourceJunction); // This must be started *after* all the strands have been initialised
     }
-
     virtual void setInput(unsigned idx, unsigned _sourceIdx, IFinalRoxieInput *_in)
     {
         throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() called for source activity");
@@ -13795,6 +13810,12 @@ public:
     {
     }
 
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        CRoxieServerStrandedActivity::start(parentExtractSize, parentExtract, paused);
+        onStartStrands();
+    }
+
     virtual StrandProcessor *createStrandProcessor(IEngineRowStream *instream)
     {
         return new ProjectProcessor(*this, instream, (IHThorProjectArg &) basehelper);
@@ -20217,12 +20238,12 @@ class CRoxieServerStrandedParseActivity : public CRoxieServerStrandedActivity
         }
         virtual void start()
         {
+            StrandProcessor::start();
             numProcessedLastGroup = 0;
             curSearchTextLen = 0;
             curSearchText = NULL;
             in = NULL;
             parser->reset();
-            StrandProcessor::start();
         }
         virtual void reset()
         {
@@ -20285,6 +20306,12 @@ public:
     {
     }
 
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        CRoxieServerStrandedActivity::start(parentExtractSize, parentExtract, paused);
+        onStartStrands();
+    }
+
     virtual StrandProcessor *createStrandProcessor(IEngineRowStream *instream)
     {
         return new ParseProcessor(*this, instream, (IHThorParseArg &) basehelper, algorithm);

+ 10 - 6
roxie/roxiemem/roxiemem.cpp

@@ -24,6 +24,7 @@
 #include "tbb/task.h"
 #include "tbb/task_scheduler_init.h"
 #endif
+#include <atomic>
 #include <utility>
 
 #ifndef _WIN32
@@ -3444,7 +3445,7 @@ void initAllocSizeMappings(const unsigned * sizes)
 
 //---------------------------------------------------------------------------------------------------------------------
 
-
+static std::atomic<unsigned> activeRowManagers;
 class CChunkingRowManager : public CRowManager
 {
     friend class CRoxieFixedRowHeap;
@@ -3525,7 +3526,8 @@ public:
         trackMemoryByActivity = false;
 #endif
         if (memTraceLevel >= 2)
-            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager c-tor memLimit=%" I64F "u pageLimit=%u rowMgr=%p", (unsigned __int64)_memLimit, maxPageLimit, this);
+            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager c-tor memLimit=%" I64F "u pageLimit=%u rowMgr=%p num=%u",
+                          (unsigned __int64)_memLimit, maxPageLimit, this, activeRowManagers.load());
         if (timeLimit)
         {
             cyclesChecked = get_cycles_now();
@@ -3536,13 +3538,15 @@ public:
             cyclesChecked = 0;
             cyclesCheckInterval = 0;
         }
+        activeRowManagers++;
     }
 
     ~CChunkingRowManager()
     {
+        activeRowManagers--;
         if (memTraceLevel >= 2)
-            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager d-tor pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p", 
-                    maxPageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this);
+            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager d-tor pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p num=%u",
+                    maxPageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this, activeRowManagers.load());
 
         if (!ignoreLeaks)
             reportLeaks(2);
@@ -4249,8 +4253,8 @@ protected:
         }
         else
         {
-            logctx.CTXLOG("RoxieMemMgr: pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p",
-                          maxPageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this);
+            logctx.CTXLOG("RoxieMemMgr: pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p cnt(%u)",
+                          maxPageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this, activeRowManagers.load());
             Owned<IActivityMemoryUsageMap> map = getActivityUsage();
             map->report(logctx, allocatorCache);
         }