Browse Source

HPCC-20152 Fix global loop result issues.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 years ago
parent
commit
784c513358

+ 5 - 4
thorlcr/activities/loop/thloop.cpp

@@ -229,9 +229,6 @@ public:
             unsigned loopCounter = 1;
             for (;;)
             {
-                if (sync(loopCounter))
-                    break;
-
                 // NB: This is exactly the same as the slave implementation up until the execute().
                 IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
                 unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter : 0;
@@ -247,9 +244,13 @@ public:
                 if (loopAgain) // cannot be 0
                     boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
 
+                // ensure results prepared before graph begins
+                if (sync(loopCounter))
+                    break;
+
                 boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
                 ++loopCounter;
-                if (barrier)
+                if (barrier) // barrier passed once loopAgain result used
                 {
                     if (!barrier->wait(false))
                         break;

+ 20 - 20
thorlcr/activities/loop/thloopslave.cpp

@@ -362,6 +362,21 @@ public:
                     }
                 }
 
+                IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
+                unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter : 0;
+                unsigned loopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
+                ownedResults.setown(queryGraph().createThorGraphResults(3));
+                // ensures remote results are available, via owning activity (i.e. this loop act)
+                // so that when aggregate result is fetched from the master, it will retrieve from the act, not the (already cleaned) graph localresults
+                ownedResults->setOwner(container.queryId());
+
+                boundGraph->prepareLoopResults(*this, ownedResults);
+                if (condLoopCounter) // cannot be 0
+                    boundGraph->prepareCounterResult(*this, ownedResults, condLoopCounter, 2);
+                if (loopAgain) // cannot be 0
+                    boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
+
+                // ensure results prepared before graph begins
                 if (syncIterations)
                 {
                     // 0 signals this slave has finished, but don't stop until all have
@@ -380,21 +395,6 @@ public:
                 }
 
                 loopPending->flush();
-
-                IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
-                unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter:0;
-                unsigned loopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
-                ownedResults.setown(queryGraph().createThorGraphResults(3));
-                // ensures remote results are available, via owning activity (i.e. this loop act)
-                // so that when aggregate result is fetched from the master, it will retrieve from the act, not the (already cleaned) graph localresults
-                ownedResults->setOwner(container.queryId());
-
-                boundGraph->prepareLoopResults(*this, ownedResults);
-                if (condLoopCounter) // cannot be 0
-                    boundGraph->prepareCounterResult(*this, ownedResults, condLoopCounter, 2);
-                if (loopAgain) // cannot be 0
-                    boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
-
                 boundGraph->execute(*this, condLoopCounter, ownedResults, loopPending.getClear(), loopPendingCount, extractBuilder.size(), extractBuilder.getbytes());
 
                 Owned<IThorResult> result0 = ownedResults->getResult(0);
@@ -402,11 +402,6 @@ public:
 
                 if (flags & IHThorLoopArg::LFnewloopagain)
                 {
-                    if (barrier)
-                    {
-                        if (!barrier->wait(false))
-                            return NULL; // aborted
-                    }
                     Owned<IThorResult> loopAgainResult = ownedResults->getResult(helper->loopAgainResult(), !queryGraph().isLocalChild());
                     assertex(loopAgainResult);
                     Owned<IRowStream> loopAgainRows = loopAgainResult->getRowStream();
@@ -415,6 +410,11 @@ public:
                     //Result is a row which contains a single boolean field.
                     if (!((const bool *)row.get())[0])
                         finishedLooping = true; // NB: will finish when loopPending has been consumed
+                    if (barrier) // barrier passed once loopAgain result used
+                    {
+                        if (!barrier->wait(false))
+                            return NULL; // aborted
+                    }
                 }
                 loopPending.setown(createOverflowableBuffer(*this, this, ers_forbidden, true));
                 loopPendingCount = 0;

+ 2 - 2
thorlcr/graph/thgraph.cpp

@@ -256,8 +256,8 @@ public:
         if (!resultRowIf)
             resultRowIf.setown(activity.createRowInterfaces(resultMeta));
         ThorGraphResultType resultType = activity.queryGraph().isLocalChild() ? thorgraphresult_nul : thorgraphresult_distributed;
-        IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, resultType); // loop output
-        IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, resultType); // loop input
+        IThorResult *loopResult =  activity.queryGraph().createResult(activity, 0, results, resultRowIf, resultType); // loop output
+        IThorResult *inputResult = activity.queryGraph().createResult(activity, 1, results, resultRowIf, resultType); // loop input
     }
     virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
     {