Selaa lähdekoodia

Merge pull request #6555 from jakesmith/hpcc-12443

HPCC-12443 Improve distributor tracing

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 vuotta sitten
vanhempi
commit
5806448f75

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -174,7 +174,7 @@ public:
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
-        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, false, this);
+        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, false, this, "FetchStream");
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
     }
     virtual IRowStream *queryOutput() { return this; }

+ 90 - 55
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -459,10 +459,10 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     if (postCount<preCount*9/10)
                         dedupSuccesses++;
                     dedupSamples++;
-                    ActPrintLog(owner.activity, "pre-dedup sample %d : %d unique out of %d, took: %d ms", dedupSamples, postCount, preCount, tookMs);
+                    owner.ActPrintLog("pre-dedup sample %d : %d unique out of %d, took: %d ms", dedupSamples, postCount, preCount, tookMs);
                     if ((10 == dedupSamples) && (0 == dedupSuccesses))
                     {
-                        ActPrintLog(owner.activity, "disabling distribute pre-dedup");
+                        owner.ActPrintLog("disabling distribute pre-dedup");
                         doDedup = false;
                     }
                 }
@@ -489,7 +489,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             if (owner.sendBlock(i, msg))
                 return true;
             atomic_inc(&numFinished);
-            ActPrintLog(owner.activity, "CSender::sendBlock stopped slave %d (finished=%d)", i+1, atomic_read(&numFinished));
+            owner.ActPrintLog("CSender::sendBlock stopped slave %d (finished=%d)", i+1, atomic_read(&numFinished));
             return false;
         }
         inline bool selfPush(unsigned i) const
@@ -511,7 +511,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     }
                     catch (IException *e)
                     {
-                        ActPrintLog(owner.activity, e, "HDIST: closeWrite");
+                        owner.ActPrintLog(e, "HDIST: closeWrite");
                         owner.fireException(e);
                         e->Release();
                     }
@@ -578,7 +578,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         }
         void process(IRowStream *input)
         {
-            ActPrintLog(owner.activity, "Distribute send start");
+            owner.ActPrintLog("Distribute send start");
             reinit();
             CCycleTimer timer;
             rowcount_t totalSent = 0;
@@ -688,7 +688,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                         loop
                         {
                             if (timer.elapsedCycles() >= queryOneSecCycles()*10)
-                                ActPrintLog(owner.activity, "HD sender, waiting for space, active writers = %d", queryInactiveWriters());
+                                owner.ActPrintLog("HD sender, waiting for space, active writers = %d", queryInactiveWriters());
                             timer.reset();
 
                             if (senderFullSem.wait(10000))
@@ -726,12 +726,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             }
             catch (IException *e)
             {
-                ActPrintLog(owner.activity, e, "HDIST: sender.process");
+                owner.ActPrintLog(e, "HDIST: sender.process");
                 owner.fireException(e);
                 e->Release();
             }
 
-            ActPrintLog(owner.activity, "Distribute send finishing");
+            owner.ActPrintLog("Distribute send finishing");
             if (!aborted)
             {
                 // send remainder
@@ -748,12 +748,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     }
                 }
             }
-            ActPrintLog(owner.activity, "HDIST: waiting for threads");
+            owner.ActPrintLog("HDIST: waiting for threads");
             writerPool->joinAll();
-            ActPrintLog(owner.activity, "HDIST: calling closeWrite()");
+            owner.ActPrintLog("HDIST: calling closeWrite()");
             closeWrite();
 
-            ActPrintLog(owner.activity, "HDIST: Send loop %s %"RCPF"d rows sent", exception.get()?"aborted":"finished", totalSent);
+            owner.ActPrintLog("HDIST: Send loop %s %"RCPF"d rows sent", exception.get()?"aborted":"finished", totalSent);
         }
         void abort()
         {
@@ -770,7 +770,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     // IExceptionHandler impl.
         virtual bool fireException(IException *e)
         {
-            ActPrintLog(owner.activity, e, "HDIST: CSender");
+            owner.ActPrintLog(e, "HDIST: CSender");
             if (!aborted)
             {
                 abort();
@@ -840,6 +840,40 @@ protected:
             pipewr->putRow(row);
         }
     }
+    void ActPrintLog(const char *format, ...)  __attribute__((format(printf, 2, 3)))
+    {
+        const char *pdFormat;
+        StringBuffer dFormat;
+        if (id.get())
+        {
+            dFormat.appendf("[ %s ] : ", id.get());
+            dFormat.append(format);
+            pdFormat = dFormat.str();
+        }
+        else
+            pdFormat = format;
+        va_list args;
+        va_start(args, format);
+        ::ActPrintLogArgs(&activity->queryContainer(), thorlog_null, MCdebugProgress, pdFormat, args);
+        va_end(args);
+    }
+    void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)))
+    {
+        const char *pdFormat;
+        StringBuffer dFormat;
+        if (id.get())
+        {
+            dFormat.appendf("[ %s ] : ", id.get());
+            dFormat.append(format);
+            pdFormat = dFormat.str();
+        }
+        else
+            pdFormat = format;
+        va_list args;
+        va_start(args, format);
+        ::ActPrintLogArgs(&activity->queryContainer(), e, thorlog_all, MCexception(e), format, args);
+        va_end(args);
+    }
 protected:
     CActivityBase *activity;
     size32_t inputBufferSize, pullBufferSize;
@@ -851,11 +885,12 @@ protected:
     CSender sender;
     unsigned candidateLimit;
     unsigned targetWriterLimit;
+    StringAttr id; // for tracing
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CDistributorBase(CActivityBase *_activity, bool _doDedup, IStopInput *_istop)
-        : activity(_activity), recvthread(this), sendthread(this), sender(*this)
+    CDistributorBase(CActivityBase *_activity, bool _doDedup, IStopInput *_istop, const char *_id)
+        : activity(_activity), recvthread(this), sendthread(this), sender(*this), id(_id)
     {
         aborted = connected = false;
         doDedup = _doDedup;
@@ -874,16 +909,16 @@ public:
 
         allowSpill = activity->getOptBool(THOROPT_HDIST_SPILL, true);
         if (allowSpill)
-            ActPrintLog(activity, "Using spilling buffer (will spill if overflows)");
+            ActPrintLog("Using spilling buffer (will spill if overflows)");
         writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
         if (writerPoolSize>(numnodes*2))
             writerPoolSize = numnodes*2; // limit to 2 per target
-        ActPrintLog(activity, "Writer thread pool size : %d", writerPoolSize);
+        ActPrintLog("Writer thread pool size : %d", writerPoolSize);
         candidateLimit = activity->getOptUInt(THOROPT_HDIST_CANDIDATELIMIT);
-        ActPrintLog(activity, "candidateLimit : %d", candidateLimit);
-        ActPrintLog(activity, "inputBufferSize : %d, bucketSendSize = %d", inputBufferSize, bucketSendSize);
+        ActPrintLog("candidateLimit : %d", candidateLimit);
+        ActPrintLog("inputBufferSize : %d, bucketSendSize = %d", inputBufferSize, bucketSendSize);
         targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
-        ActPrintLog(activity, "targetWriterLimit : %d", targetWriterLimit);
+        ActPrintLog("targetWriterLimit : %d", targetWriterLimit);
     }
 
     ~CDistributorBase()
@@ -894,7 +929,7 @@ public:
         }
         catch (IException *e)
         {
-            ActPrintLog(activity, e, "HDIST: CDistributor");
+            ActPrintLog(e, "HDIST: CDistributor");
             e->Release();
         }
     }
@@ -917,7 +952,7 @@ public:
 
     virtual IRowStream *connect(IRowInterfaces *_rowIf, IRowStream *_input, IHash *_ihash, ICompare *_iCompare)
     {
-        ActPrintLog(activity, "HASHDISTRIB: connect");
+        ActPrintLog("HASHDISTRIB: connect");
 
         rowIf.set(_rowIf);
         allocator = _rowIf->queryRowAllocator();
@@ -947,7 +982,7 @@ public:
         sendException.clear();
         recvException.clear();
         start();
-        ActPrintLog(activity, "HASHDISTRIB: connected");
+        ActPrintLog("HASHDISTRIB: connected");
         return piperd.getLink();
     }
 
@@ -997,7 +1032,7 @@ public:
         MemoryBuffer tempMb;
         try
         {
-            ActPrintLog(activity, "Read loop start");
+            ActPrintLog("Read loop start");
             CMessageBuffer recvMb;
             Owned<ISerialStream> stream = createMemoryBufferSerialStream(tempMb);
             CThorStreamDeserializerSource rowSource;
@@ -1012,7 +1047,7 @@ public:
                 if (n==(unsigned)-1)
                     break;
 #ifdef _FULL_TRACE
-                ActPrintLog(activity, "HDIST: Received block %d from slave %d",recvMb.length(),n+1);
+                ActPrintLog("HDIST: Received block %d from slave %d",recvMb.length(),n+1);
 #endif
                 if (recvMb.length())
                 {
@@ -1047,10 +1082,10 @@ public:
                 else
                 {
                     left--;
-                    ActPrintLog(activity, "HDIST: finished slave %d, %d left",n+1,left);
+                    ActPrintLog("HDIST: finished slave %d, %d left",n+1,left);
                 }
 #ifdef _FULL_TRACE
-                ActPrintLog(activity, "HDIST: Put block %d from slave %d",recvMb.length(),n+1);
+                ActPrintLog("HDIST: Put block %d from slave %d",recvMb.length(),n+1);
 #endif
             }
         }
@@ -1059,7 +1094,7 @@ public:
             setRecvExc(e);
         }
 #ifdef _FULL_TRACE
-        ActPrintLog(activity, "HDIST: waiting localFinishedSem");
+        ActPrintLog("HDIST: waiting localFinishedSem");
 #endif
     }
 
@@ -1070,7 +1105,7 @@ public:
         if (piperd)
             piperd->stop();
         pipewr.clear();
-        ActPrintLog(activity, "HDIST: Read loop done");
+        ActPrintLog("HDIST: Read loop done");
     }
 
     void sendloop()
@@ -1095,7 +1130,7 @@ public:
                     sendException.setown(e);
                 else
                 {
-                    ActPrintLog(activity, e, "HDIST: follow on");
+                    ActPrintLog(e, "HDIST: follow on");
                     e->Release();
                 }
             }
@@ -1139,7 +1174,7 @@ public:
 
     void setRecvExc(IException *e)
     {
-        ActPrintLog(activity, e, "HDIST: recvloop");
+        ActPrintLog(e, "HDIST: recvloop");
         abort();
         if (recvException.get())
             e->Release();
@@ -1200,8 +1235,8 @@ public:
 
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
+    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
+        : CDistributorBase(activity, doDedup, istop, id), comm(_comm), tag(_tag)
     {
         stopping = false;
     }
@@ -1214,18 +1249,18 @@ Restart:
         CMessageBuffer ack;
 #ifdef TRACE_MP
         unsigned waiting = comm.probe(RANK_ALL,tag,NULL);
-        ActPrintLog(activity, "HDIST MP recv(%d) waiting %d",(int)tag, waiting);
+        ActPrintLog("HDIST MP recv(%d) waiting %d",(int)tag, waiting);
 #endif
         if (!comm.recv(msg, RANK_ALL, tag, &sender))
         {
 #ifdef TRACE_MP
-            ActPrintLog(activity, "HDIST MP recv failed");
+            ActPrintLog("HDIST MP recv failed");
 #endif
             return (unsigned)-1;
         }
 #ifdef TRACE_MP
         waiting = comm.probe(RANK_ALL,tag,NULL);
-        ActPrintLog(activity, "HDIST MP received %d from %d reply tag %d, waiting %d",msg.length(), (int)sender, (int)msg.getReplyTag(),waiting);
+        ActPrintLog("HDIST MP received %d from %d reply tag %d, waiting %d",msg.length(), (int)sender, (int)msg.getReplyTag(),waiting);
 #endif
         size32_t sz=msg.length();
         while (sz)
@@ -1263,7 +1298,7 @@ Restart:
     virtual bool sendBlock(unsigned i, CMessageBuffer &msg)
     {
 #ifdef TRACE_MP
-        ActPrintLog(activity, "HDIST MP send(%d,%d,%d)",i+1,(int)tag,msg.length());
+        ActPrintLog("HDIST MP send(%d,%d,%d)",i+1,(int)tag,msg.length());
 #endif
         byte flag=0;
 
@@ -1275,14 +1310,14 @@ Restart:
             flag = 1;               // want ack
             rts.append(flag);
 #ifdef _FULL_TRACE
-            ActPrintLog(activity, "HDIST MP sending RTS to %d",i+1);
+            ActPrintLog("HDIST MP sending RTS to %d",i+1);
 #endif
 
             if (!sendRecv(comm, rts, i+1, tag))
                 return false;
             rts.read(flag);
 #ifdef _FULL_TRACE
-            ActPrintLog(activity, "HDIST MP got CTS from %d, %d",i+1,(int)flag);
+            ActPrintLog("HDIST MP got CTS from %d, %d",i+1,(int)flag);
 #endif
             if (flag==0)
                 return false;           // other end stopped
@@ -1305,7 +1340,7 @@ Restart:
     virtual void stopRecv()
     {
 #ifdef TRACE_MP
-        ActPrintLog(activity, "HDIST MP stopRecv");
+        ActPrintLog("HDIST MP stopRecv");
 #endif
         stopping = true;
     }
@@ -1460,8 +1495,8 @@ class CRowPullDistributor: public CDistributorBase
         selfdone.reinit();
     }
 public:
-    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
+    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
+        : CDistributorBase(activity, doDedup, istop, id), comm(_comm), tag(_tag)
     {
         pull = true;
         targetWriterLimit = 1; // >1 target writer can cause packets to be received out of order
@@ -1507,7 +1542,7 @@ public:
         try
         {
             Owned<cSortedDistributeMerger> merger = new cSortedDistributeMerger(*this, numnodes, queryCompare(), queryAllocator(), deserializer);
-            ActPrintLog(activity, "Read loop start");
+            ActPrintLog("Read loop start");
             while (!aborted)
             {
                 const void *row = merger->merged().nextRow();
@@ -1518,7 +1553,7 @@ public:
         }
         catch (IException *e)
         {
-            ActPrintLog(activity, e, "HDIST: recvloop");
+            ActPrintLog(e, "HDIST: recvloop");
             setRecvExc(e);
         }
     }
@@ -1557,7 +1592,7 @@ public:
                     if (!cachefileio)
                         throw MakeStringException(-1,"CRowPullDistributor: Could not create disk cache");
                     diskpos = 0;
-                    ActPrintLog(activity, "CRowPullDistributor spilling to %s",tempname.str());
+                    ActPrintLog("CRowPullDistributor spilling to %s",tempname.str());
                 }
                 cachefileio->write(diskpos,sz,msg.bufferBase());
             }
@@ -1638,7 +1673,7 @@ public:
             }
         }
 #ifdef TRACE_MP
-        ActPrintLog(activity, "HDIST MPpull recv done(%d)",i);
+        ActPrintLog("HDIST MPpull recv done(%d)",i);
 #endif
         return i;
     }
@@ -1648,7 +1683,7 @@ public:
         if (hasbuf[target]&&(waiting[target]!=TAG_NULL))
         {
 #ifdef TRACE_MP
-            ActPrintLog(activity, "HDIST MP dosend(%d,%d)",i,bufs[target].length());
+            ActPrintLog("HDIST MP dosend(%d,%d)",i,bufs[target].length());
 #endif
             size32_t sz = bufs[target].length();
             // TBD compress here?
@@ -1734,7 +1769,7 @@ public:
     virtual void stopRecv()
     {
 #ifdef TRACE_MP
-        ActPrintLog(activity, "HDIST MPpull stopRecv");
+        ActPrintLog("HDIST MPpull stopRecv");
 #endif
         stopping = true;
         selfready.signal();
@@ -1751,14 +1786,14 @@ public:
 //==================================================================================================
 
 
-IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
+IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id)
 {
-    return new CRowDistributor(activity, comm, tag, doDedup, istop);
+    return new CRowDistributor(activity, comm, tag, doDedup, istop, id);
 }
 
-IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
+IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL)
 {
-    return new CRowPullDistributor(activity, comm, tag, doDedup, istop);
+    return new CRowPullDistributor(activity, comm, tag, doDedup, istop, id);
 }
 
 
@@ -3419,7 +3454,7 @@ public:
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareR = joinargs->queryCompareRight();
         if (!lhsDistributor)
-            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this));
+            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this, "LHS"));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, stableSort_earlyAlloc, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
@@ -3430,7 +3465,7 @@ public:
         lhsDistributor->join();
         leftdone = true;
         if (!rhsDistributor)
-            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, false, this));
+            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, false, this, "RHS"));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
@@ -3613,7 +3648,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
             }
         } nodeCompare(helperExtra.queryHashElement());
         if (!distributor)
-            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
+            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
         strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
         loop
         {
@@ -3647,7 +3682,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
         };
         Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(localAggTable);
         if (!distributor)
-            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
+            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
         Owned<IRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
         strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
         loop

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -39,7 +39,7 @@ IHashDistributor *createHashDistributor(
     ICommunicator &comm, 
     mptag_t tag, 
     bool dedup,
-    IStopInput *istop);
+    IStopInput *istop, const char *id=NULL); // id optional, used for tracing to identify which distributor if >1 in activity
 
 CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, CThorRowAggregator *localAggTable, mptag_t mptag, bool ordered);
 

+ 7 - 2
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1407,9 +1407,9 @@ protected:
     {
         if (!rhsDistributor)
         {
-            rhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), rhsDistributeTag, false, NULL));
+            rhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), rhsDistributeTag, false, NULL, "RHS"));
             right.setown(rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL));
-            lhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), lhsDistributeTag, false, NULL));
+            lhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), lhsDistributeTag, false, NULL, "LHS"));
             left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left.getClear(), leftHash, NULL));
         }
     }
@@ -1530,6 +1530,7 @@ protected:
                     // Have to keep broadcastSpillingLock locked until sort and calculate are done
                     uniqueKeys = marker.calculate(rhs, compareRight, !rhsAlreadySorted);
                     rhsCollated = true;
+                    ActPrintLog("Collated all RHS rows");
                 }
             }
             if (!doPerformLocalLookup()) // check again after processing above
@@ -1600,6 +1601,10 @@ protected:
 
                 if (rhsCollated)
                 {
+                    /* JCSMORE - I think 'right' must be empty if here (if rhsCollated=true)
+                     * so above streams.append(*right.getLink()); should go in else block below only AFAICS
+                     */
+
                     // NB: If spilt after rhsCollated, callback will have cleared and compacted
                     streams.append(*rhs.createRowStream()); // NB: will kill array when stream exhausted
                 }