|
@@ -84,6 +84,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
|
|
|
bool writeBlocked = false, pagedOut = false;
|
|
|
CriticalSection connectLock, prepareInputLock, writeAheadCrit;
|
|
|
PointerArrayOf<Semaphore> stalledWriters;
|
|
|
+ UnsignedArray stalledWriterIdxs;
|
|
|
unsigned stoppedOutputs = 0;
|
|
|
unsigned activeOutputs = 0;
|
|
|
rowcount_t recsReady = 0;
|
|
@@ -155,6 +156,7 @@ public:
|
|
|
recsReady = 0;
|
|
|
writeBlocked = false;
|
|
|
stalledWriters.kill();
|
|
|
+ stalledWriterIdxs.kill();
|
|
|
ForEachItemIn(o, outputs)
|
|
|
{
|
|
|
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
|
|
@@ -240,8 +242,20 @@ public:
|
|
|
else if (writeBlocked) // NB: only used by 'balanced' splitter, which blocks write when too far ahead
|
|
|
{
|
|
|
stalledWriters.append(&writeBlockSem);
|
|
|
- CriticalUnblock ub(writeAheadCrit);
|
|
|
- writeBlockSem.wait(); // when active writer unblocks, signals all stalledWriters
|
|
|
+ stalledWriterIdxs.append(outIdx);
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ {
|
|
|
+ CriticalUnblock ub(writeAheadCrit);
|
|
|
+ if (writeBlockSem.wait(60000)) // when active writer unblocks, signals all stalledWriters
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ ForEachItemIn(i, stalledWriterIdxs)
|
|
|
+ {
|
|
|
+ unsigned idx = stalledWriterIdxs.item(i);
|
|
|
+ ActPrintLog("Splitter output(%d) stalled on writeahead", idx);
|
|
|
+ }
|
|
|
+ }
|
|
|
// recsReady or eofHit will have been updated by the blocking thread by now, loop and re-check
|
|
|
}
|
|
|
else
|