|
@@ -362,6 +362,21 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
|
|
|
+ unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter : 0;
|
|
|
+ unsigned loopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
|
|
|
+ ownedResults.setown(queryGraph().createThorGraphResults(3));
|
|
|
+ // ensures remote results are available, via owning activity (i.e. this loop act)
|
|
|
+ // so that when aggregate result is fetched from the master, it will retrieve from the act, not the (already cleaned) graph localresults
|
|
|
+ ownedResults->setOwner(container.queryId());
|
|
|
+
|
|
|
+ boundGraph->prepareLoopResults(*this, ownedResults);
|
|
|
+ if (condLoopCounter) // cannot be 0
|
|
|
+ boundGraph->prepareCounterResult(*this, ownedResults, condLoopCounter, 2);
|
|
|
+ if (loopAgain) // cannot be 0
|
|
|
+ boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
|
|
|
+
|
|
|
+ // ensure results prepared before graph begins
|
|
|
if (syncIterations)
|
|
|
{
|
|
|
// 0 signals this slave has finished, but don't stop until all have
|
|
@@ -380,21 +395,6 @@ public:
|
|
|
}
|
|
|
|
|
|
loopPending->flush();
|
|
|
-
|
|
|
- IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
|
|
|
- unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter:0;
|
|
|
- unsigned loopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
|
|
|
- ownedResults.setown(queryGraph().createThorGraphResults(3));
|
|
|
- // ensures remote results are available, via owning activity (i.e. this loop act)
|
|
|
- // so that when aggregate result is fetched from the master, it will retrieve from the act, not the (already cleaned) graph localresults
|
|
|
- ownedResults->setOwner(container.queryId());
|
|
|
-
|
|
|
- boundGraph->prepareLoopResults(*this, ownedResults);
|
|
|
- if (condLoopCounter) // cannot be 0
|
|
|
- boundGraph->prepareCounterResult(*this, ownedResults, condLoopCounter, 2);
|
|
|
- if (loopAgain) // cannot be 0
|
|
|
- boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
|
|
|
-
|
|
|
boundGraph->execute(*this, condLoopCounter, ownedResults, loopPending.getClear(), loopPendingCount, extractBuilder.size(), extractBuilder.getbytes());
|
|
|
|
|
|
Owned<IThorResult> result0 = ownedResults->getResult(0);
|
|
@@ -402,11 +402,6 @@ public:
|
|
|
|
|
|
if (flags & IHThorLoopArg::LFnewloopagain)
|
|
|
{
|
|
|
- if (barrier)
|
|
|
- {
|
|
|
- if (!barrier->wait(false))
|
|
|
- return NULL; // aborted
|
|
|
- }
|
|
|
Owned<IThorResult> loopAgainResult = ownedResults->getResult(helper->loopAgainResult(), !queryGraph().isLocalChild());
|
|
|
assertex(loopAgainResult);
|
|
|
Owned<IRowStream> loopAgainRows = loopAgainResult->getRowStream();
|
|
@@ -415,6 +410,11 @@ public:
|
|
|
//Result is a row which contains a single boolean field.
|
|
|
if (!((const bool *)row.get())[0])
|
|
|
finishedLooping = true; // NB: will finish when loopPending has been consumed
|
|
|
+ if (barrier) // barrier passed once loopAgain result used
|
|
|
+ {
|
|
|
+ if (!barrier->wait(false))
|
|
|
+ return NULL; // aborted
|
|
|
+ }
|
|
|
}
|
|
|
loopPending.setown(createOverflowableBuffer(*this, this, ers_forbidden, true));
|
|
|
loopPendingCount = 0;
|