|
@@ -38,32 +38,6 @@ protected:
|
|
|
StringAttr resultName;
|
|
|
unsigned resultSeq;
|
|
|
|
|
|
-public:
|
|
|
- CWorkUnitWriteMasterBase(CMasterGraphElement * info) : CMasterActivity(info)
|
|
|
- {
|
|
|
- numResults = 0;
|
|
|
- totalSize = 0;
|
|
|
- resultSeq = 0;
|
|
|
- appendOutput = false;
|
|
|
- flushThreshold = -1;
|
|
|
- workunitWriteLimit = 0;
|
|
|
- mpTag = container.queryJob().allocateMPTag(); // used by local too
|
|
|
- activityMaxSize = 0;
|
|
|
- }
|
|
|
- virtual void init()
|
|
|
- {
|
|
|
- CMasterActivity::init();
|
|
|
- // In absense of OPT_OUTPUTLIMIT check pre 5.2 legacy name OPT_OUTPUTLIMIT_LEGACY
|
|
|
- workunitWriteLimit = activityMaxSize ? activityMaxSize : getOptInt(OPT_OUTPUTLIMIT, getOptInt(OPT_OUTPUTLIMIT_LEGACY, defaultDaliResultLimit));
|
|
|
- if (workunitWriteLimit>defaultDaliResultOutputMax)
|
|
|
- throw MakeActivityException(this, 0, "Configured max result size, %d MB, exceeds absolute max limit of %d MB. A huge Dali result usually indicates the ECL needs altering.", workunitWriteLimit, defaultDaliResultOutputMax);
|
|
|
- assertex(workunitWriteLimit<=0x1000); // 32bit limit because MemoryBuffer/CMessageBuffers involved etc.
|
|
|
- workunitWriteLimit *= 0x100000;
|
|
|
- }
|
|
|
- virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
|
|
|
- {
|
|
|
- dst.append((int)mpTag);
|
|
|
- }
|
|
|
void addResult(rowcount_t resultCount, MemoryBuffer &resultData, bool complete)
|
|
|
{
|
|
|
Owned<IWorkUnit> wu = &container.queryJob().queryWorkUnit().lock();
|
|
@@ -89,7 +63,33 @@ public:
|
|
|
ActPrintLog("result flushed");
|
|
|
}
|
|
|
}
|
|
|
- virtual void abort()
|
|
|
+public:
|
|
|
+ CWorkUnitWriteMasterBase(CMasterGraphElement * info) : CMasterActivity(info)
|
|
|
+ {
|
|
|
+ numResults = 0;
|
|
|
+ totalSize = 0;
|
|
|
+ resultSeq = 0;
|
|
|
+ appendOutput = false;
|
|
|
+ flushThreshold = getOptInt(THOROPT_OUTPUT_FLUSH_THRESHOLD, -1);
|
|
|
+ workunitWriteLimit = 0;
|
|
|
+ mpTag = container.queryJob().allocateMPTag(); // used by local too
|
|
|
+ activityMaxSize = 0;
|
|
|
+ }
|
|
|
+ virtual void init() override
|
|
|
+ {
|
|
|
+ CMasterActivity::init();
|
|
|
+ // In absense of OPT_OUTPUTLIMIT check pre 5.2 legacy name OPT_OUTPUTLIMIT_LEGACY
|
|
|
+ workunitWriteLimit = activityMaxSize ? activityMaxSize : getOptInt(OPT_OUTPUTLIMIT, getOptInt(OPT_OUTPUTLIMIT_LEGACY, defaultDaliResultLimit));
|
|
|
+ if (workunitWriteLimit>defaultDaliResultOutputMax)
|
|
|
+ throw MakeActivityException(this, 0, "Configured max result size, %d MB, exceeds absolute max limit of %d MB. A huge Dali result usually indicates the ECL needs altering.", workunitWriteLimit, defaultDaliResultOutputMax);
|
|
|
+ assertex(workunitWriteLimit<=0x1000); // 32bit limit because MemoryBuffer/CMessageBuffers involved etc.
|
|
|
+ workunitWriteLimit *= 0x100000;
|
|
|
+ }
|
|
|
+ virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
|
|
|
+ {
|
|
|
+ dst.append((int)mpTag);
|
|
|
+ }
|
|
|
+ virtual void abort() override
|
|
|
{
|
|
|
CMasterActivity::abort();
|
|
|
cancelReceiveMsg(RANK_ALL, mpTag);
|
|
@@ -102,7 +102,7 @@ public:
|
|
|
CWorkUnitWriteGlobalMasterBase(CMasterGraphElement * info) : CWorkUnitWriteMasterBase(info)
|
|
|
{
|
|
|
}
|
|
|
- void process()
|
|
|
+ virtual void process() override
|
|
|
{
|
|
|
CWorkUnitWriteMasterBase::process();
|
|
|
|
|
@@ -152,13 +152,12 @@ public:
|
|
|
{
|
|
|
helper = (IHThorWorkUnitWriteArg *)queryHelper();
|
|
|
appendOutput = 0 != (POFextend & helper->getFlags());
|
|
|
- flushThreshold = getOptInt(THOROPT_OUTPUT_FLUSH_THRESHOLD, -1);
|
|
|
resultName.set(helper->queryName());
|
|
|
resultSeq = helper->getSequence();
|
|
|
if (POFmaxsize & helper->getFlags())
|
|
|
activityMaxSize = helper->getMaxSize();
|
|
|
}
|
|
|
- virtual void init()
|
|
|
+ virtual void init() override
|
|
|
{
|
|
|
CWorkUnitWriteGlobalMasterBase::init();
|
|
|
if (appendOutput)
|
|
@@ -237,7 +236,6 @@ public:
|
|
|
{
|
|
|
helper = (IHThorWorkUnitWriteArg *)queryHelper();
|
|
|
appendOutput = 0 != (POFextend & helper->getFlags());
|
|
|
- flushThreshold = globals->getPropInt("@output_flush_threshold", -1);
|
|
|
resultName.set(helper->queryName());
|
|
|
resultSeq = helper->getSequence();
|
|
|
sent = 0;
|
|
@@ -252,21 +250,21 @@ public:
|
|
|
replyMsg.swapWith(msg);
|
|
|
unsigned numGot;
|
|
|
msg.read(numGot);
|
|
|
+ dbgassertex(numGot); // slave never sends 0
|
|
|
unsigned l=msg.remaining();
|
|
|
if (workunitWriteLimit && totalSize+resultData.length()+l > workunitWriteLimit)
|
|
|
throw MakeThorException(TE_WorkUnitWriteLimitExceeded, "Dataset too large to output to workunit (limit %d megabytes)", workunitWriteLimit/0x100000);
|
|
|
resultData.append(l, msg.readDirect(l));
|
|
|
numResults += numGot;
|
|
|
|
|
|
- // NB if 0 == numGot - then final packet from sender
|
|
|
- if (0 == numGot || (-1 != flushThreshold && resultData.length() >= (unsigned)flushThreshold))
|
|
|
+ if (-1 != flushThreshold && resultData.length() >= (unsigned)flushThreshold)
|
|
|
{
|
|
|
totalSize += resultData.length();
|
|
|
flushResults(0 == numGot);
|
|
|
}
|
|
|
queryJobChannel().queryJobComm().reply(replyMsg); // ack
|
|
|
}
|
|
|
- virtual void handleSlaveMessage(CMessageBuffer &msg)
|
|
|
+ virtual void handleSlaveMessage(CMessageBuffer &msg) override
|
|
|
{
|
|
|
++sent;
|
|
|
rank_t sender = container.queryJob().queryJobGroup().rank(msg.getSender());
|
|
@@ -276,11 +274,20 @@ public:
|
|
|
msg.clear();
|
|
|
queryJobChannel().queryJobComm().reply(msg);
|
|
|
}
|
|
|
+ virtual void done() override
|
|
|
+ {
|
|
|
+ // NB: This is called when the parent graph is complete
|
|
|
+
|
|
|
+ CWorkUnitWriteMasterBase::done();
|
|
|
+
|
|
|
+ // published any unpublished result or force if 0 results
|
|
|
+ flushResults(0 == numResults);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
CActivityBase *createWorkUnitWriteActivityMaster(CMasterGraphElement *container)
|
|
|
{
|
|
|
- if (container->queryLocalOrGrouped())
|
|
|
+ if (container->queryOwner().isLocalChild())
|
|
|
return new CWorkUnitWriteLocalActivityMaster(container);
|
|
|
else
|
|
|
return new CWorkUnitWriteActivityMaster(container);
|
|
@@ -298,7 +305,7 @@ public:
|
|
|
resultName.set(helper->queryName());
|
|
|
resultSeq = helper->getSequence();
|
|
|
}
|
|
|
- virtual void flushResults(bool complete=false)
|
|
|
+ virtual void flushResults(bool complete=false) override
|
|
|
{
|
|
|
assertex(complete);
|
|
|
ActPrintLog("dictionary result");
|