|
@@ -946,7 +946,9 @@ public:
|
|
|
Owned<IBitSet> slavesStopping = createBitSet();
|
|
|
slavesDone->testSet(broadcastSlave-1, true);
|
|
|
slavesStopping->testSet(broadcastSlave-1, true);
|
|
|
-
|
|
|
+ // loop, requesting data from all other slaves (in chunks)
|
|
|
+ // track slaves which have finished sending in slavesDone (signalled via 0 len. packet)
|
|
|
+ // NB: collates from slaves serially, probably should be in parallel
|
|
|
MemoryBuffer tmp;
|
|
|
CMessageBuffer msg;
|
|
|
bool allDone = false;
|
|
@@ -958,7 +960,7 @@ public:
|
|
|
if (fromNode == broadcastSlave)
|
|
|
++fromNode;
|
|
|
|
|
|
- {
|
|
|
+ { // request more
|
|
|
BooleanOnOff onOff(receiving);
|
|
|
msg.append(allRequestStop);
|
|
|
container.queryJob().queryJobComm().sendRecv(msg, fromNode, mpTag);
|
|
@@ -966,16 +968,16 @@ public:
|
|
|
#ifdef _TRACEBROADCAST
|
|
|
ActPrintLog("getRHS got %d from %d",msg.length(),(int)fromNode);
|
|
|
#endif
|
|
|
- bool slaveStopRequest;
|
|
|
+ bool slaveStopRequest; // only true if slave stopping without needing RHS
|
|
|
msg.read(slaveStopRequest);
|
|
|
- // can only stop if all are
|
|
|
slavesStopping->testSet(fromNode-1, slaveStopRequest);
|
|
|
if (stopping)
|
|
|
{
|
|
|
+ // can only stop if I'm stopping and all other slaves are
|
|
|
allRequestStop = slavesStopping->scan(0, false) == container.queryJob().querySlaves();
|
|
|
- // send next time around
|
|
|
+ // if true, next request will tell slaves to stop sending
|
|
|
}
|
|
|
- if (0 == msg.remaining())
|
|
|
+ if (0 == msg.remaining()) // slave signalled no more data
|
|
|
{
|
|
|
msg.clear();
|
|
|
bool done = slavesDone->testSet(fromNode-1, true);
|
|
@@ -987,7 +989,10 @@ public:
|
|
|
else
|
|
|
{
|
|
|
if (allRequestStop)
|
|
|
+ {
|
|
|
+ // only here, if all stopping, 1st packet from slave and signalled stopping
|
|
|
msg.clear(); // no longer wanted
|
|
|
+ }
|
|
|
else
|
|
|
{
|
|
|
ThorExpand(msg, tmp);
|
|
@@ -1004,7 +1009,7 @@ public:
|
|
|
}
|
|
|
if (!allRequestStop)
|
|
|
{
|
|
|
- // now all (global) rows in this (broadcast node) rhs HT.
|
|
|
+ // now all (global) RHS rows on this (broadcast) node
|
|
|
CMemoryRowSerializer mbs(tmp.clear());
|
|
|
allDone = false;
|
|
|
unsigned r=0;
|
|
@@ -1068,7 +1073,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
else if (!stopping)
|
|
|
- {
|
|
|
+ { // single node or local
|
|
|
gatherLocal();
|
|
|
prepareRHS();
|
|
|
}
|