|
@@ -147,7 +147,6 @@ public:
|
|
|
class CLoopActivityMaster : public CLoopActivityMasterBase
|
|
|
{
|
|
|
IHThorLoopArg *helper;
|
|
|
- IThorBoundLoopGraph *boundGraph;
|
|
|
unsigned flags;
|
|
|
Owned<IBarrier> barrier;
|
|
|
|
|
@@ -191,7 +190,6 @@ public:
|
|
|
void init()
|
|
|
{
|
|
|
helper = (IHThorLoopArg *) queryHelper();
|
|
|
- boundGraph = queryContainer().queryLoopGraph();
|
|
|
flags = helper->getFlags();
|
|
|
if (TAKloopdataset == container.getKind())
|
|
|
assertex(flags & IHThorLoopArg::LFnewloopagain);
|
|
@@ -201,25 +199,6 @@ public:
|
|
|
if (container.queryOwner().isGlobal())
|
|
|
global = true;
|
|
|
}
|
|
|
- if (!global)
|
|
|
- return;
|
|
|
- initLoopResults(1);
|
|
|
- }
|
|
|
- void initLoopResults(unsigned loopCounter)
|
|
|
- {
|
|
|
- unsigned doLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter : 0;
|
|
|
- unsigned doLoopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
|
|
|
- ownedResults.setown(queryGraph().createThorGraphResults(3)); // will not be cleared until next sync
|
|
|
- // ensures remote results are available, via owning activity (i.e. this loop act)
|
|
|
- // so that when the master/slave result parts are fetched, it will retreive from the act, not the (already cleaed) graph localresults
|
|
|
- ownedResults->setOwner(container.queryId());
|
|
|
-
|
|
|
- boundGraph->prepareLoopResults(*this, ownedResults);
|
|
|
- if (doLoopCounter) // cannot be 0
|
|
|
- boundGraph->prepareCounterResult(*this, ownedResults, loopCounter, 2);
|
|
|
- if (doLoopAgain) // cannot be 0
|
|
|
- boundGraph->prepareLoopAgainResult(*this, ownedResults, helper->loopAgainResult());
|
|
|
-
|
|
|
}
|
|
|
void process()
|
|
|
{
|
|
@@ -235,9 +214,23 @@ public:
|
|
|
{
|
|
|
if (sync(loopCounter))
|
|
|
break;
|
|
|
- if (loopCounter > 1)
|
|
|
- initLoopResults(loopCounter);
|
|
|
- boundGraph->execute(*this, (flags & IHThorLoopArg::LFcounter)?loopCounter:0, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
|
|
|
+
|
|
|
+ // NB: This is exactly the same as the slave implementation up until the execute().
|
|
|
+ IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
|
|
|
+ unsigned condLoopCounter = (flags & IHThorLoopArg::LFcounter) ? loopCounter : 0;
|
|
|
+ unsigned loopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
|
|
|
+ ownedResults.setown(queryGraph().createThorGraphResults(3)); // will not be cleared until next sync
|
|
|
+ // ensures remote results are available, via owning activity (i.e. this loop act)
|
|
|
+ // so that when the master/slave result parts are fetched, it will retreive from the act, not the (already cleaed) graph localresults
|
|
|
+ ownedResults->setOwner(container.queryId());
|
|
|
+
|
|
|
+ boundGraph->prepareLoopResults(*this, ownedResults);
|
|
|
+ if (condLoopCounter)
|
|
|
+ boundGraph->prepareCounterResult(*this, ownedResults, condLoopCounter, 2);
|
|
|
+ if (loopAgain) // cannot be 0
|
|
|
+ boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);
|
|
|
+
|
|
|
+ boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
|
|
|
++loopCounter;
|
|
|
if (flags & IHThorLoopArg::LFnewloopagain)
|
|
|
{
|