Browse Source

Merge pull request #11883 from jakesmith/hpcc-20864

HPCC-20864 Avoid race condition fetching loop input dataset

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 6 years ago
parent
commit
d760b3dcec

+ 11 - 2
thorlcr/activities/loop/thloop.cpp

@@ -248,7 +248,8 @@ public:
                 if (sync(loopCounter))
                     break;
 
-                boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);
+
                 ++loopCounter;
                 if (barrier) // barrier passed once all slave graphs have completed
                 {
@@ -320,9 +321,17 @@ public:
         unsigned loopCounter = 1;
         for (;;)
         {
+            Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
+            unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
+            IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
+            if (condLoopCounter)
+                boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
+
             if (sync(loopCounter))
                 break;
-            queryContainer().queryLoopGraph()->execute(*this, (helper->getFlags() & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults.get(), extractBuilder.size(), extractBuilder.getbytes());
+
+            boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), results, loopResults);
+
             ++loopCounter;
         }
     }

+ 11 - 3
thorlcr/activities/loop/thloopslave.cpp

@@ -376,6 +376,10 @@ public:
                 if (loopAgain) // cannot be 0
                     boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
 
+                loopPending->flush();
+                Owned<IThorResult> inputResult = ownedResults->getResult(1);
+                inputResult->setResultStream(loopPending.getClear(), loopPendingCount);
+
                 // ensure results prepared before graph begins
                 if (syncIterations)
                 {
@@ -394,8 +398,7 @@ public:
                     return NULL;
                 }
 
-                loopPending->flush();
-                boundGraph->execute(*this, condLoopCounter, ownedResults, loopPending.getClear(), loopPendingCount, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);
 
                 Owned<IThorResult> result0 = ownedResults->getResult(0);
                 curInput.setown(result0->getRowStream());
@@ -506,10 +509,15 @@ public:
                 resultWriter->putRow(row.getClear());
             }
 
+            IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
             for (; loopCounter<=maxIterations; loopCounter++)
             {
+                unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
+                Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
+                if (condLoopCounter)
+                    boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
                 sendLoopingCount(loopCounter, 0);
-                queryContainer().queryLoopGraph()->execute(*this, (flags & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(parentExtractSz, parentExtract, results, loopResults);
             }
             int iNumResults = loopResults->count();
             Owned<IThorResult> finalResult = loopResults->getResult(iNumResults-1); //Get the last result, which isnt necessarily 'maxIterations'

+ 1 - 43
thorlcr/graph/thgraph.cpp

@@ -244,6 +244,7 @@ public:
         IThorResult *counterResult = results->createResult(activity, pos, countRowIf, thorgraphresult_nul, SPILL_PRIORITY_DISABLE);
         Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
         counterResultWriter->putRow(counterRowFinal.getClear());
+        graph->setLoopCounter(loopCounter);
     }
     virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
     {
@@ -259,49 +260,6 @@ public:
         IThorResult *loopResult =  activity.queryGraph().createResult(activity, 0, results, resultRowIf, resultType); // loop output
         IThorResult *inputResult = activity.queryGraph().createResult(activity, 1, results, resultRowIf, resultType); // loop input
     }
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
-    {
-        if (counter)
-            graph->setLoopCounter(counter);
-        Owned<IThorResult> inputResult = results->getResult(1);
-        if (inputStream)
-            inputResult->setResultStream(inputStream, rowStreamCount);
-        graph->executeChild(parentExtractSz, parentExtract, results, NULL);
-    }
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
-    {
-        Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
-        if (counter)
-        {
-            prepareCounterResult(activity, results, counter, 0);
-            graph->setLoopCounter(counter);
-        }
-        try
-        {
-            graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
-        }
-        catch (IException *e)
-        {
-            IThorException *te = QUERYINTERFACE(e, IThorException);
-            if (!te)
-            {
-                Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
-                e->Release();
-                te = e2.getClear();
-            }
-            else if (!te->queryActivityId())
-                setExceptionActivityInfo(activity.queryContainer(), te);
-            try { graph->abort(te); }
-            catch (IException *abortE)
-            {
-                Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
-                abortE->Release();
-                EXCLOG(e2, NULL);
-            }
-            graph->queryJobChannel().fireException(te);
-            throw te;
-        }
-    }
     virtual CGraphBase *queryGraph() { return graph; }
 };
 

+ 0 - 2
thorlcr/graph/thgraph.hpp

@@ -158,8 +158,6 @@ interface IThorBoundLoopGraph : extends IInterface
     virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results) = 0;
     virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos) = 0;
     virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos) = 0;
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *rowStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults * graphLoopResults, size32_t parentExtractSz, const byte * parentExtract) = 0;
     virtual CGraphBase *queryGraph() = 0;
 };