|
@@ -462,6 +462,8 @@ public:
|
|
|
}
|
|
|
virtual void stop()
|
|
|
{
|
|
|
+ if (!gotRHS)
|
|
|
+ getRHS(true);
|
|
|
rhs.reset(false);
|
|
|
stopRightInput();
|
|
|
stopInput(left);
|
|
@@ -594,10 +596,7 @@ public:
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities, NULL);
|
|
|
if (!gotRHS)
|
|
|
- {
|
|
|
- getRHS();
|
|
|
- gotRHS = true;
|
|
|
- }
|
|
|
+ getRHS(false);
|
|
|
if (!abortSoon && !eos)
|
|
|
{
|
|
|
if (doRightOuter)
|
|
@@ -838,13 +837,15 @@ public:
|
|
|
info.unknownRowsOutput = true;
|
|
|
info.canStall = true;
|
|
|
}
|
|
|
- void sendToBroadcastSlave()
|
|
|
+ bool sendToBroadcastSlave(bool stopping)
|
|
|
{
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
ActPrintLog("%s: sendToBroadcastSlave sending to slave %d", joinStr.get(), broadcastSlave);
|
|
|
#endif
|
|
|
+ bool allRequestStop = false;
|
|
|
try
|
|
|
{
|
|
|
+ bool sentStop = false;
|
|
|
MemoryBuffer mb;
|
|
|
CMemoryRowSerializer mbs(mb);
|
|
|
|
|
@@ -860,19 +861,30 @@ public:
|
|
|
}
|
|
|
CMessageBuffer msg;
|
|
|
if (!receiveMsg(msg, broadcastSlave, mpTag))
|
|
|
- return;
|
|
|
- if (0 != mb.length())
|
|
|
+ return false;
|
|
|
+ msg.read(allRequestStop);
|
|
|
+ msg.clear();
|
|
|
+ if (!allRequestStop && 0 != mb.length())
|
|
|
{
|
|
|
- ThorCompress(mb.toByteArray(), mb.length(), msg.clear());
|
|
|
+ msg.append(stopping);
|
|
|
+ ThorCompress(mb.toByteArray(), mb.length(), msg);
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
ActPrintLog("sendToBroadcastSlave Compressing buf from %d to %d",mb.length(),msg.length());
|
|
|
#endif
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
ActPrintLog("sendToBroadcastSlave sending reply to %d on tag %d",(int)broadcastSlave,(int)mpTag);
|
|
|
#endif
|
|
|
+ if (stopping) // kept outside the trace-only #ifdef: sentStop feeds the final (empty) reply's stop flag in every build
|
|
|
+ sentStop = true;
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // prevent stop at this point unless have already sent stop,
|
|
|
+ // to prevent allRequestStop at broadcaster, before this slave knows about it.
|
|
|
+ msg.append(sentStop && stopping);
|
|
|
+ }
|
|
|
if (!container.queryJob().queryJobComm().reply(msg))
|
|
|
- return;
|
|
|
+ return false;
|
|
|
if (0 == mb.length())
|
|
|
break;
|
|
|
mb.clear();
|
|
@@ -886,6 +898,7 @@ public:
|
|
|
ActPrintLog(e, "CLookupJoinActivity::sendToBroadcastSlave: exception");
|
|
|
throw;
|
|
|
}
|
|
|
+ return !allRequestStop;
|
|
|
}
|
|
|
void processRows(MemoryBuffer &mb)
|
|
|
{
|
|
@@ -913,20 +926,29 @@ public:
|
|
|
stopRightInput();
|
|
|
#endif
|
|
|
}
|
|
|
- void getRHS()
|
|
|
+ void getRHS(bool stopping)
|
|
|
{
|
|
|
+ if (gotRHS)
|
|
|
+ return;
|
|
|
+ gotRHS = true;
|
|
|
Owned<IException> exception;
|
|
|
try
|
|
|
{
|
|
|
- gatherLocal();
|
|
|
if (!container.queryLocal() && container.queryJob().querySlaves() > 1)
|
|
|
{
|
|
|
+ bool allRequestStop = false;
|
|
|
+ gatherLocal();
|
|
|
broadcaster.init(container.queryJob().queryMyRank(), container.queryJob().querySlaves(), &container.queryJob().queryJobComm(), mpTag, broadcastSlave);
|
|
|
if (container.queryJob().queryMyRank()==broadcastSlave)
|
|
|
{
|
|
|
unsigned fromNode = 1;
|
|
|
Owned<IBitSet> slavesDone = createBitSet();
|
|
|
-
|
|
|
+ Owned<IBitSet> slavesStopping = createBitSet();
|
|
|
+ slavesDone->testSet(broadcastSlave-1, true);
|
|
|
+ slavesStopping->testSet(broadcastSlave-1, true);
|
|
|
+ // loop, requesting data from all other slaves (in chunks)
|
|
|
+ // track slaves which have finished sending in slavesDone (signalled via 0 len. packet)
|
|
|
+ // NB: collates from slaves serially, probably should be in parallel
|
|
|
MemoryBuffer tmp;
|
|
|
CMessageBuffer msg;
|
|
|
bool allDone = false;
|
|
@@ -936,19 +958,28 @@ public:
|
|
|
ActPrintLog("getRHS Receiving");
|
|
|
#endif
|
|
|
if (fromNode == broadcastSlave)
|
|
|
- {
|
|
|
- slavesDone->testSet(broadcastSlave-1, true);
|
|
|
++fromNode;
|
|
|
- }
|
|
|
- {
|
|
|
+
|
|
|
+ { // request more
|
|
|
BooleanOnOff onOff(receiving);
|
|
|
+ msg.append(allRequestStop);
|
|
|
container.queryJob().queryJobComm().sendRecv(msg, fromNode, mpTag);
|
|
|
}
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
ActPrintLog("getRHS got %d from %d",msg.length(),(int)fromNode);
|
|
|
#endif
|
|
|
- if (0 == msg.length())
|
|
|
+ bool slaveStopRequest; // only true if slave stopping without needing RHS
|
|
|
+ msg.read(slaveStopRequest);
|
|
|
+ slavesStopping->testSet(fromNode-1, slaveStopRequest);
|
|
|
+ if (stopping)
|
|
|
+ {
|
|
|
+ // can only stop if I'm stopping and all other slaves are
|
|
|
+ allRequestStop = slavesStopping->scan(0, false) == container.queryJob().querySlaves();
|
|
|
+ // if true, next request will tell slaves to stop sending
|
|
|
+ }
|
|
|
+ if (0 == msg.remaining()) // slave signalled no more data
|
|
|
{
|
|
|
+ msg.clear();
|
|
|
bool done = slavesDone->testSet(fromNode-1, true);
|
|
|
assertex(false == done);
|
|
|
if (slavesDone->scan(0, false) == container.queryJob().querySlaves()) // i.e. got all
|
|
@@ -957,73 +988,95 @@ public:
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ThorExpand(msg, tmp);
|
|
|
+ if (allRequestStop)
|
|
|
+ {
|
|
|
+ // only here, if all stopping, 1st packet from slave and signalled stopping
|
|
|
+ msg.clear(); // no longer wanted
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ ThorExpand(msg, tmp);
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog("getRHS expanding.1 %d to %d",msg.length(), tmp.length());
|
|
|
+ ActPrintLog("getRHS expanding.1 %d to %d",msg.length(), tmp.length());
|
|
|
#endif
|
|
|
- msg.clear();
|
|
|
- processRows(tmp);
|
|
|
- tmp.clear();
|
|
|
+ msg.clear();
|
|
|
+ processRows(tmp);
|
|
|
+ tmp.clear();
|
|
|
+ }
|
|
|
}
|
|
|
if (allDone)
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
- // now all (global) rows in this (broadcast node) rhs HT.
|
|
|
- CMemoryRowSerializer mbs(tmp.clear());
|
|
|
- allDone = false;
|
|
|
- unsigned r=0;
|
|
|
- while (!abortSoon)
|
|
|
+ if (!allRequestStop)
|
|
|
{
|
|
|
- loop
|
|
|
+ // now all (global) RHS rows on this (broadcast) node
|
|
|
+ CMemoryRowSerializer mbs(tmp.clear());
|
|
|
+ allDone = false;
|
|
|
+ unsigned r=0;
|
|
|
+ while (!abortSoon)
|
|
|
{
|
|
|
- if (r == rhs.ordinality())
|
|
|
+ loop
|
|
|
{
|
|
|
- allDone = true;
|
|
|
- break;
|
|
|
+ if (r == rhs.ordinality())
|
|
|
+ {
|
|
|
+ allDone = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ const byte *row = (const byte *)rhs.item(r++);
|
|
|
+ rightSerializer->serialize(mbs, row);
|
|
|
+ if (tmp.length() > 0x80000)
|
|
|
+ break;
|
|
|
}
|
|
|
- const byte *row = (const byte *)rhs.item(r++);
|
|
|
- rightSerializer->serialize(mbs, row);
|
|
|
- if (tmp.length() > 0x80000)
|
|
|
- break;
|
|
|
- }
|
|
|
- if (0 != tmp.length())
|
|
|
- {
|
|
|
- ThorCompress(tmp, msg);
|
|
|
+ if (0 != tmp.length())
|
|
|
+ {
|
|
|
+ ThorCompress(tmp, msg);
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog("getRHS compress.1 %d to %d",tmp.length(), msg.length());
|
|
|
+ ActPrintLog("getRHS compress.1 %d to %d",tmp.length(), msg.length());
|
|
|
#endif
|
|
|
- tmp.clear();
|
|
|
+ tmp.clear();
|
|
|
|
|
|
- broadcaster.broadcast(msg);
|
|
|
- msg.clear();
|
|
|
+ broadcaster.broadcast(msg);
|
|
|
+ msg.clear();
|
|
|
+ }
|
|
|
+ if (allDone)
|
|
|
+ break;
|
|
|
}
|
|
|
- if (allDone)
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- sendToBroadcastSlave();
|
|
|
- rhs.clear();
|
|
|
- MemoryBuffer buf;
|
|
|
- MemoryBuffer expBuf;
|
|
|
- while (broadcaster.receive(buf))
|
|
|
+ if (!sendToBroadcastSlave(stopping))
|
|
|
+ allRequestStop = true;
|
|
|
+ else
|
|
|
{
|
|
|
- ThorExpand(buf, expBuf);
|
|
|
+ rhs.clear();
|
|
|
+ MemoryBuffer buf;
|
|
|
+ MemoryBuffer expBuf;
|
|
|
+ while (broadcaster.receive(buf))
|
|
|
+ {
|
|
|
+ ThorExpand(buf, expBuf);
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
- ActPrintLog("Expanding received buf from %d to %d",buf.length(),expBuf.length());
|
|
|
+ ActPrintLog("Expanding received buf from %d to %d",buf.length(),expBuf.length());
|
|
|
#endif
|
|
|
- processRows(expBuf);
|
|
|
- expBuf.clear();
|
|
|
- broadcaster.broadcast(buf); // will swap buf
|
|
|
- buf.clear();
|
|
|
+ processRows(expBuf);
|
|
|
+ expBuf.clear();
|
|
|
+ broadcaster.broadcast(buf); // will swap buf
|
|
|
+ buf.clear();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- broadcaster.endBroadcast(); // send final
|
|
|
- broadcaster.clear();
|
|
|
+ if (!allRequestStop)
|
|
|
+ {
|
|
|
+ broadcaster.endBroadcast(); // send final
|
|
|
+ broadcaster.clear();
|
|
|
+ prepareRHS();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (!stopping)
|
|
|
+ { // single node or local
|
|
|
+ gatherLocal();
|
|
|
+ prepareRHS();
|
|
|
}
|
|
|
- prepareRHS();
|
|
|
}
|
|
|
catch (IOutOfMemException *e) { exception.setown(e); }
|
|
|
catch (IThorRowArrayException *e) { exception.setown(e); }
|