|
@@ -31,16 +31,18 @@ class CLoopActivityMasterBase : public CMasterActivity
|
|
|
{
|
|
|
protected:
|
|
|
rtlRowBuilder extractBuilder;
|
|
|
- CGraphBase *loopGraph;
|
|
|
- unsigned emptyIterations;
|
|
|
- unsigned maxEmptyLoopIterations;
|
|
|
+ CGraphBase *loopGraph = nullptr;
|
|
|
+ unsigned emptyIterations = 0;
|
|
|
+ unsigned maxEmptyLoopIterations = 1000;
|
|
|
Owned<CThorStats> loopCounterProgress;
|
|
|
- bool global;
|
|
|
+ bool syncIterations = false;
|
|
|
+ bool loopIsInGlobalGraph = false;
|
|
|
+ mptag_t syncMpTag = TAG_NULL;
|
|
|
|
|
|
bool sync(unsigned loopCounter)
|
|
|
{
|
|
|
unsigned loopEnds = 0;
|
|
|
- unsigned nodes = container.queryJob().querySlaves();
|
|
|
+ unsigned nodes = queryJob().querySlaves();
|
|
|
unsigned n = nodes;
|
|
|
bool allEmptyIterations = true;
|
|
|
CMessageBuffer msg;
|
|
@@ -49,7 +51,7 @@ protected:
|
|
|
for (;;)
|
|
|
{
|
|
|
rank_t sender;
|
|
|
- if (receiveMsg(msg, RANK_ALL, mpTag, &sender, SYNC_TIMEOUT))
|
|
|
+ if (receiveMsg(msg, RANK_ALL, syncMpTag, &sender, SYNC_TIMEOUT))
|
|
|
break;
|
|
|
if (abortSoon)
|
|
|
return true; // NB: returning true, denotes end of loop
|
|
@@ -79,7 +81,7 @@ protected:
|
|
|
msg.append(ok && !final); // This is to tell slave whether it should continue or not
|
|
|
n = nodes;
|
|
|
while (n--) // a barrier really
|
|
|
- queryJobChannel().queryJobComm().send(msg, n+1, mpTag, LONGTIMEOUT);
|
|
|
+ queryJobChannel().queryJobComm().send(msg, n+1, syncMpTag, LONGTIMEOUT);
|
|
|
|
|
|
if (!ok)
|
|
|
throw MakeActivityException(this, 0, "Executed LOOP with empty input and output %u times", emptyIterations);
|
|
@@ -90,11 +92,16 @@ public:
|
|
|
CLoopActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info)
|
|
|
{
|
|
|
loopCounterProgress.setown(new CThorStats(queryJob(), StNumIterations));
|
|
|
- if (!container.queryLocalOrGrouped())
|
|
|
- mpTag = container.queryJob().allocateMPTag();
|
|
|
- loopGraph = NULL;
|
|
|
+ maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
|
|
|
+ loopIsInGlobalGraph = container.queryOwner().isGlobal();
|
|
|
+ loopGraph = nullptr;
|
|
|
+ }
|
|
|
+ ~CLoopActivityMasterBase()
|
|
|
+ {
|
|
|
+ if (TAG_NULL != syncMpTag)
|
|
|
+ queryJob().freeMPTag(syncMpTag);
|
|
|
}
|
|
|
- virtual bool fireException(IException *e)
|
|
|
+ virtual bool fireException(IException *e) override
|
|
|
{
|
|
|
EXCLOG(e, "Loop master passed exception, aborting loop graph(s)");
|
|
|
try
|
|
@@ -108,48 +115,47 @@ public:
|
|
|
}
|
|
|
return CMasterActivity::fireException(e);
|
|
|
}
|
|
|
- virtual void init()
|
|
|
+ virtual void init() override
|
|
|
{
|
|
|
CMasterActivity::init();
|
|
|
loopGraph = queryContainer().queryLoopGraph()->queryGraph();
|
|
|
- global = !loopGraph->isLocalOnly();
|
|
|
- if (container.queryLocalOrGrouped())
|
|
|
- return;
|
|
|
- maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
|
|
|
+ syncIterations = !loopGraph->isLocalOnly();
|
|
|
+ if (loopIsInGlobalGraph)
|
|
|
+ syncMpTag = queryJob().allocateMPTag();
|
|
|
}
|
|
|
- virtual void process()
|
|
|
+ virtual void process() override
|
|
|
{
|
|
|
CMasterActivity::process();
|
|
|
emptyIterations = 0;
|
|
|
}
|
|
|
- virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
|
|
|
+ virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
|
|
|
{
|
|
|
- if (!container.queryLocalOrGrouped())
|
|
|
- serializeMPtag(dst, mpTag);
|
|
|
+ if (loopIsInGlobalGraph)
|
|
|
+ serializeMPtag(dst, syncMpTag);
|
|
|
}
|
|
|
- virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
|
|
|
+ virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb) override
|
|
|
{
|
|
|
CMasterGraph *graph = (CMasterGraph *)loopGraph;
|
|
|
graph->handleSlaveDone(slaveIdx, mb);
|
|
|
}
|
|
|
- virtual void abort()
|
|
|
+ virtual void abort() override
|
|
|
{
|
|
|
CMasterActivity::abort();
|
|
|
- cancelReceiveMsg(RANK_ALL, mpTag);
|
|
|
+ if (loopIsInGlobalGraph)
|
|
|
+ cancelReceiveMsg(RANK_ALL, syncMpTag);
|
|
|
}
|
|
|
- virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
|
|
|
+ virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
|
|
|
{
|
|
|
CMasterActivity::deserializeStats(node, mb);
|
|
|
unsigned loopCounter;
|
|
|
mb.read(loopCounter);
|
|
|
loopCounterProgress->set(node, loopCounter);
|
|
|
}
|
|
|
- virtual void getActivityStats(IStatisticGatherer & stats)
|
|
|
+ virtual void getActivityStats(IStatisticGatherer & stats) override
|
|
|
{
|
|
|
CMasterActivity::getActivityStats(stats);
|
|
|
loopCounterProgress->getStats(stats, false);
|
|
|
}
|
|
|
-
|
|
|
};
|
|
|
|
|
|
|
|
@@ -164,16 +170,16 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
|
|
|
// similar to sync, but continiously listens for messages from slaves
|
|
|
// slave only sends if above threashold, or if was at threshold and non empty
|
|
|
// this routine is here to spot when all are whirling around processing nothing for > threshold
|
|
|
- Owned<IBitSet> emptyIterations = createThreadSafeBitSet();
|
|
|
+ Owned<IBitSet> emptyIterationSlaves = createThreadSafeBitSet();
|
|
|
unsigned loopEnds = 0;
|
|
|
- unsigned nodes = container.queryJob().querySlaves();
|
|
|
+ unsigned nodes = queryJob().querySlaves();
|
|
|
unsigned n = nodes;
|
|
|
bool allEmptyIterations = true;
|
|
|
CMessageBuffer msg;
|
|
|
for (;;)
|
|
|
{
|
|
|
rank_t sender;
|
|
|
- if (!receiveMsg(msg, RANK_ALL, mpTag, &sender, LONGTIMEOUT))
|
|
|
+ if (!receiveMsg(msg, RANK_ALL, syncMpTag, &sender, LONGTIMEOUT))
|
|
|
return;
|
|
|
unsigned slaveLoopCounterReq, slaveEmptyIterations;
|
|
|
msg.read(slaveLoopCounterReq);
|
|
@@ -185,37 +191,39 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
|
|
|
break; // all done
|
|
|
}
|
|
|
bool overLimit = slaveEmptyIterations > maxEmptyLoopIterations;
|
|
|
- emptyIterations->set(sender-1, overLimit);
|
|
|
- if (emptyIterations->scan(0, 0) >= nodes) // all empty
|
|
|
+ emptyIterationSlaves->set(sender-1, overLimit);
|
|
|
+ if (emptyIterationSlaves->scan(0, 0) >= nodes) // all empty
|
|
|
throw MakeActivityException(this, 0, "Executed LOOP with empty input and output > %d maxEmptyLoopIterations times on all nodes", maxEmptyLoopIterations);
|
|
|
}
|
|
|
}
|
|
|
public:
|
|
|
CLoopActivityMaster(CMasterGraphElement *info) : CLoopActivityMasterBase(info)
|
|
|
{
|
|
|
- if (!container.queryLocalOrGrouped())
|
|
|
- barrier.setown(container.queryJobChannel().createBarrier(mpTag));
|
|
|
}
|
|
|
- virtual void init()
|
|
|
+ virtual void init() override
|
|
|
{
|
|
|
CLoopActivityMasterBase::init();
|
|
|
helper = (IHThorLoopArg *) queryHelper();
|
|
|
flags = helper->getFlags();
|
|
|
if (TAKloopdataset == container.getKind())
|
|
|
assertex(flags & IHThorLoopArg::LFnewloopagain);
|
|
|
- if (!global && (flags & IHThorLoopArg::LFnewloopagain))
|
|
|
+ if (flags & IHThorLoopArg::LFnewloopagain)
|
|
|
{
|
|
|
- if (container.queryOwner().isGlobal())
|
|
|
- global = true;
|
|
|
+ if (loopIsInGlobalGraph)
|
|
|
+ {
|
|
|
+ mpTag = queryJob().allocateMPTag();
|
|
|
+ barrier.setown(queryJobChannel().createBarrier(mpTag));
|
|
|
+ syncIterations = true;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- virtual void process()
|
|
|
+ virtual void process() override
|
|
|
{
|
|
|
CLoopActivityMasterBase::process();
|
|
|
- if (container.queryLocalOrGrouped())
|
|
|
+ if (!loopIsInGlobalGraph)
|
|
|
return;
|
|
|
|
|
|
- if (global)
|
|
|
+ if (syncIterations)
|
|
|
{
|
|
|
helper->createParentExtract(extractBuilder);
|
|
|
unsigned loopCounter = 1;
|
|
@@ -241,7 +249,7 @@ public:
|
|
|
|
|
|
boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
|
|
|
++loopCounter;
|
|
|
- if (flags & IHThorLoopArg::LFnewloopagain)
|
|
|
+ if (barrier)
|
|
|
{
|
|
|
if (!barrier->wait(false))
|
|
|
break;
|
|
@@ -251,7 +259,13 @@ public:
|
|
|
else
|
|
|
checkEmpty();
|
|
|
}
|
|
|
- virtual void abort()
|
|
|
+ virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
|
|
|
+ {
|
|
|
+ CLoopActivityMasterBase::serializeSlaveData(dst, slave);
|
|
|
+ if (barrier)
|
|
|
+ serializeMPtag(dst, mpTag);
|
|
|
+ }
|
|
|
+ virtual void abort() override
|
|
|
{
|
|
|
CLoopActivityMasterBase::abort();
|
|
|
if (barrier)
|
|
@@ -272,10 +286,10 @@ public:
|
|
|
CGraphLoopActivityMaster(CMasterGraphElement *info) : CLoopActivityMasterBase(info)
|
|
|
{
|
|
|
}
|
|
|
- virtual void init()
|
|
|
+ virtual void init() override
|
|
|
{
|
|
|
CLoopActivityMasterBase::init();
|
|
|
- if (!global)
|
|
|
+ if (!syncIterations)
|
|
|
return;
|
|
|
IHThorGraphLoopArg *helper = (IHThorGraphLoopArg *) queryHelper();
|
|
|
Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
|
|
@@ -283,16 +297,13 @@ public:
|
|
|
queryContainer().queryLoopGraph()->prepareCounterResult(*this, results, 1, 0);
|
|
|
loopGraph->setResults(results);
|
|
|
}
|
|
|
- virtual void process()
|
|
|
+ virtual void process() override
|
|
|
{
|
|
|
CLoopActivityMasterBase::process();
|
|
|
- if (container.queryLocalOrGrouped())
|
|
|
+ if (!loopIsInGlobalGraph)
|
|
|
return;
|
|
|
|
|
|
IHThorGraphLoopArg *helper = (IHThorGraphLoopArg *) queryHelper();
|
|
|
- bool global = !loopGraph->isLocalOnly();
|
|
|
- if (!global)
|
|
|
- return;
|
|
|
unsigned maxIterations = helper->numIterations();
|
|
|
if ((int)maxIterations < 0) maxIterations = 0;
|
|
|
Owned<IThorGraphResults> loopResults = queryGraph().createThorGraphResults(maxIterations);
|
|
@@ -327,13 +338,13 @@ public:
|
|
|
CLocalResultActivityMasterBase(CMasterGraphElement *info) : CMasterActivity(info)
|
|
|
{
|
|
|
}
|
|
|
- virtual void init()
|
|
|
+ virtual void init() override
|
|
|
{
|
|
|
CMasterActivity::init();
|
|
|
reset();
|
|
|
}
|
|
|
virtual void createResult() = 0;
|
|
|
- virtual void process()
|
|
|
+ virtual void process() override
|
|
|
{
|
|
|
IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
|
|
|
inputRowIf.setown(createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta()));
|
|
@@ -386,7 +397,7 @@ public:
|
|
|
CDictionaryResultActivityMaster(CMasterGraphElement *info) : CLocalResultActivityMasterBase(info)
|
|
|
{
|
|
|
}
|
|
|
- virtual void createResult()
|
|
|
+ virtual void createResult() override
|
|
|
{
|
|
|
IHThorDictionaryResultWriteArg *helper = (IHThorDictionaryResultWriteArg *)queryHelper();
|
|
|
CGraphBase *graph = container.queryResultsGraph();
|