瀏覽代碼

HPCC-20864 Avoid race condition fetching loop input dataset

If a child query activity asked for the entire global loop
input dataset, a race could happen where the result on some
slaves was not prepared at the time the result was requested,
causing an assert to be hit.

The fix is to move the synchronization point to after the
input is prepared. Doing this required moving some of the
code out of a couple of 'execute' helper functions.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 6 年之前
父節點
當前提交
335cf6dd96
共有 4 個文件被更改,包括 23 次插入和 50 次删除
  1. 11 2
      thorlcr/activities/loop/thloop.cpp
  2. 11 3
      thorlcr/activities/loop/thloopslave.cpp
  3. 1 43
      thorlcr/graph/thgraph.cpp
  4. 0 2
      thorlcr/graph/thgraph.hpp

+ 11 - 2
thorlcr/activities/loop/thloop.cpp

@@ -248,7 +248,8 @@ public:
                 if (sync(loopCounter))
                     break;
 
-                boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);
+
                 ++loopCounter;
                 if (barrier) // barrier passed once all slave graphs have completed
                 {
@@ -320,9 +321,17 @@ public:
         unsigned loopCounter = 1;
         for (;;)
         {
+            Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
+            unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
+            IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
+            if (condLoopCounter)
+                boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
+
             if (sync(loopCounter))
                 break;
-            queryContainer().queryLoopGraph()->execute(*this, (helper->getFlags() & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults.get(), extractBuilder.size(), extractBuilder.getbytes());
+
+            boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), results, loopResults);
+
             ++loopCounter;
         }
     }

+ 11 - 3
thorlcr/activities/loop/thloopslave.cpp

@@ -376,6 +376,10 @@ public:
                 if (loopAgain) // cannot be 0
                     boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
 
+                loopPending->flush();
+                Owned<IThorResult> inputResult = ownedResults->getResult(1);
+                inputResult->setResultStream(loopPending.getClear(), loopPendingCount);
+
                 // ensure results prepared before graph begins
                 if (syncIterations)
                 {
@@ -394,8 +398,7 @@ public:
                     return NULL;
                 }
 
-                loopPending->flush();
-                boundGraph->execute(*this, condLoopCounter, ownedResults, loopPending.getClear(), loopPendingCount, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);
 
                 Owned<IThorResult> result0 = ownedResults->getResult(0);
                 curInput.setown(result0->getRowStream());
@@ -506,10 +509,15 @@ public:
                 resultWriter->putRow(row.getClear());
             }
 
+            IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
             for (; loopCounter<=maxIterations; loopCounter++)
             {
+                unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
+                Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
+                if (condLoopCounter)
+                    boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
                 sendLoopingCount(loopCounter, 0);
-                queryContainer().queryLoopGraph()->execute(*this, (flags & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults, extractBuilder.size(), extractBuilder.getbytes());
+                boundGraph->queryGraph()->executeChild(parentExtractSz, parentExtract, results, loopResults);
             }
             int iNumResults = loopResults->count();
             Owned<IThorResult> finalResult = loopResults->getResult(iNumResults-1); //Get the last result, which isnt necessarily 'maxIterations'

+ 1 - 43
thorlcr/graph/thgraph.cpp

@@ -244,6 +244,7 @@ public:
         IThorResult *counterResult = results->createResult(activity, pos, countRowIf, thorgraphresult_nul, SPILL_PRIORITY_DISABLE);
         Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
         counterResultWriter->putRow(counterRowFinal.getClear());
+        graph->setLoopCounter(loopCounter);
     }
     virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
     {
@@ -259,49 +260,6 @@ public:
         IThorResult *loopResult =  activity.queryGraph().createResult(activity, 0, results, resultRowIf, resultType); // loop output
         IThorResult *inputResult = activity.queryGraph().createResult(activity, 1, results, resultRowIf, resultType); // loop input
     }
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
-    {
-        if (counter)
-            graph->setLoopCounter(counter);
-        Owned<IThorResult> inputResult = results->getResult(1);
-        if (inputStream)
-            inputResult->setResultStream(inputStream, rowStreamCount);
-        graph->executeChild(parentExtractSz, parentExtract, results, NULL);
-    }
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
-    {
-        Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
-        if (counter)
-        {
-            prepareCounterResult(activity, results, counter, 0);
-            graph->setLoopCounter(counter);
-        }
-        try
-        {
-            graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
-        }
-        catch (IException *e)
-        {
-            IThorException *te = QUERYINTERFACE(e, IThorException);
-            if (!te)
-            {
-                Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
-                e->Release();
-                te = e2.getClear();
-            }
-            else if (!te->queryActivityId())
-                setExceptionActivityInfo(activity.queryContainer(), te);
-            try { graph->abort(te); }
-            catch (IException *abortE)
-            {
-                Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
-                abortE->Release();
-                EXCLOG(e2, NULL);
-            }
-            graph->queryJobChannel().fireException(te);
-            throw te;
-        }
-    }
     virtual CGraphBase *queryGraph() { return graph; }
 };
 

+ 0 - 2
thorlcr/graph/thgraph.hpp

@@ -158,8 +158,6 @@ interface IThorBoundLoopGraph : extends IInterface
     virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results) = 0;
     virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos) = 0;
     virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos) = 0;
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *rowStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
-    virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults * graphLoopResults, size32_t parentExtractSz, const byte * parentExtract) = 0;
     virtual CGraphBase *queryGraph() = 0;
 };