|
@@ -222,6 +222,7 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
|
|
|
unsigned flags, lastMs;
|
|
|
IHThorLoopArg *helper;
|
|
|
bool eof, finishedLooping;
|
|
|
+ Owned<IBarrier> barrier;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
@@ -239,6 +240,8 @@ public:
|
|
|
if (container.queryOwner().isGlobal())
|
|
|
global = true;
|
|
|
}
|
|
|
+ if (!container.queryLocalOrGrouped())
|
|
|
+ barrier.setown(container.queryJob().createBarrier(mpTag));
|
|
|
}
|
|
|
virtual void kill()
|
|
|
{
|
|
@@ -246,6 +249,12 @@ public:
|
|
|
loopPending.clear();
|
|
|
curInput.clear();
|
|
|
}
|
|
|
+ virtual void abort()
|
|
|
+ {
|
|
|
+ CLoopSlaveActivityBase::abort();
|
|
|
+ if (barrier)
|
|
|
+ barrier->cancel();
|
|
|
+ }
|
|
|
// IThorDataLink
|
|
|
virtual void start()
|
|
|
{
|
|
@@ -364,7 +373,7 @@ public:
|
|
|
unsigned doLoopAgain = (flags & IHThorLoopArg::LFnewloopagain) ? helper->loopAgainResult() : 0;
|
|
|
ownedResults.setown(queryGraph().createThorGraphResults(3));
|
|
|
// ensures remote results are available, via owning activity (i.e. this loop act)
|
|
|
- // so that when aggreagate result is fetched from the master, it will retreive from the act, not the (already cleaned) graph localresults
|
|
|
+ // so that when aggregate result is fetched from the master, it will retrieve from the act, not the (already cleaned) graph localresults
|
|
|
ownedResults->setOwner(container.queryId());
|
|
|
|
|
|
boundGraph->prepareLoopResults(*this, ownedResults);
|
|
@@ -380,6 +389,11 @@ public:
|
|
|
|
|
|
if (flags & IHThorLoopArg::LFnewloopagain)
|
|
|
{
|
|
|
+ if (!container.queryLocalOrGrouped())
|
|
|
+ {
|
|
|
+ if (!barrier->wait(false))
|
|
|
+ return NULL; // aborted
|
|
|
+ }
|
|
|
Owned<IThorResult> loopAgainResult = ownedResults->getResult(helper->loopAgainResult(), !queryGraph().isLocalChild());
|
|
|
assertex(loopAgainResult);
|
|
|
Owned<IRowStream> loopAgainRows = loopAgainResult->getRowStream();
|