Browse Source

Merge pull request #6710 from jakesmith/hpcc-11928

HPCC-11928 Smart Join failing over to full join too early.

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
af8046fb95

+ 5 - 2
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -22,7 +22,7 @@
 
 class CLookupJoinActivityMaster : public CMasterActivity
 {
-    mptag_t broadcast2MpTag, lhsDistributeTag, rhsDistributeTag;
+    mptag_t broadcast2MpTag, broadcast3MpTag, lhsDistributeTag, rhsDistributeTag;
 
     bool isAll() const
     {
@@ -39,13 +39,14 @@ public:
     CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
     {
         if (container.queryLocal())
-            broadcast2MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
+            broadcast2MpTag = broadcast3MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
         else
         {
             mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
             if (!isAll())
             {
                 broadcast2MpTag = container.queryJob().allocateMPTag();
+                broadcast3MpTag = container.queryJob().allocateMPTag();
                 lhsDistributeTag = container.queryJob().allocateMPTag();
                 rhsDistributeTag = container.queryJob().allocateMPTag();
             }
@@ -56,6 +57,7 @@ public:
         if (!container.queryLocal() && !isAll())
         {
             container.queryJob().freeMPTag(broadcast2MpTag);
+            container.queryJob().freeMPTag(broadcast3MpTag);
             container.queryJob().freeMPTag(lhsDistributeTag);
             container.queryJob().freeMPTag(rhsDistributeTag);
             // NB: if mpTag is allocated, the activity base class frees
@@ -69,6 +71,7 @@ public:
             if (!isAll())
             {
                 serializeMPtag(dst, broadcast2MpTag);
+                serializeMPtag(dst, broadcast3MpTag);
                 serializeMPtag(dst, lhsDistributeTag);
                 serializeMPtag(dst, rhsDistributeTag);
             }

+ 321 - 173
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -39,7 +39,7 @@ enum join_t { JT_Undefined, JT_Inner, JT_LeftOuter, JT_RightOuter, JT_LeftOnly,
 #define MAX_QUEUE_BLOCKS 5
 
 enum broadcast_code { bcast_none, bcast_send, bcast_sendStopping, bcast_stop };
-enum broadcast_flags { bcastflag_spilt=0x100 };
+enum broadcast_flags { bcastflag_null=0, bcastflag_spilt=0x100, bcastflag_standardjoin=0x200 };
 #define BROADCAST_CODE_MASK 0x00FF
 #define BROADCAST_FLAG_MASK 0xFF00
 class CSendItem : public CSimpleInterface
@@ -331,7 +331,7 @@ class CBroadcaster : public CSimpleInterface
                     CriticalBlock b(allDoneLock);
                     if (slaveStop(sendItem->queryOrigin()-1) || allDone)
                     {
-                        recvInterface->bCastReceive(NULL); // signal last
+                        recvInterface->bCastReceive(sendItem.getClear());
                         ActPrintLog(&activity, "recvLoop, received last slaveStop");
                         // NB: this slave has nothing more to receive.
                         // However the sender will still be re-broadcasting some packets, including these stop packets
@@ -372,7 +372,7 @@ public:
     {
         stopping = _stopping;
         if (stopping)
-            slavesStopping->set(myNode-1, true);
+            setStopping();
         recvInterface = _recvInterface;
         stopRecv = false;
         mpTag = _mpTag;
@@ -431,6 +431,14 @@ public:
         broadcastToOthers(sendItem);
         return !allRequestStop;
     }
+    bool isStopping()
+    {
+        return slavesStopping->test(myNode-1);
+    }
+    void setStopping()
+    {
+        slavesStopping->set(myNode-1, true);
+    }
 };
 
 /* CMarker processes a sorted set of rows, comparing every adjacent row.
@@ -663,6 +671,7 @@ class CInMemJoinBase : public CSlaveActivity, public CThorDataLink, public CAllO
     Owned<IException> leftexception;
 
     bool eos, eog, someSinceEog;
+    SpinLock rHSRowSpinLock;
 
 protected:
     typedef CAllOrLookupHelper<HELPER> HELPERBASE;
@@ -711,7 +720,12 @@ protected:
                 blockQueue.enqueue(NULL);
             }
         }
-        void wait() { threaded.join(); }
+        void wait()
+        {
+            threaded.join();
+            if (exception)
+                throw exception.getClear();
+        }
         void addBlock(CSendItem *sendItem)
         {
             if (exception)
@@ -835,6 +849,10 @@ protected:
     }
     void processRHSRows(unsigned slave, MemoryBuffer &mb)
     {
+        /* JCSMORE - I wonder if this should be done asynchronously on a few threads (<=1 per target)
+         * It also might be better to use hash the rows now and assign to rhsNodeRows arrays, it's a waste of hash() calls
+         * if it never spills, but will make flushing non-locals simpler if spilling occurs.
+         */
         CThorSpillableRowArray &rows = *rhsNodeRows.item(slave-1);
         RtlDynamicRowBuilder rowBuilder(rightAllocator);
         CThorStreamDeserializerSource memDeserializer(mb.length(), mb.toByteArray());
@@ -842,13 +860,13 @@ protected:
         {
             size32_t sz = rightDeserializer->deserialize(rowBuilder, memDeserializer);
             OwnedConstThorRow fRow = rowBuilder.finalizeRowClear(sz);
-            // NB: If spilt, addLocalRHSRow will filter out non-locals
-            addLocalRHSRow(rows, fRow);
+            // NB: If spilt, addRHSRow will filter out non-locals
+            if (!addRHSRow(rows, fRow)) // NB: in SMART case, must succeed
+                throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
         }
     }
     void broadcastRHS() // broadcasting local rhs
     {
-        bool stopRHSBroadcast = false;
         Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_send);
         MemoryBuffer mb;
         try
@@ -863,21 +881,21 @@ protected:
                     if (!row)
                         break;
 
-                    if (!addLocalRHSRow(localRhsRows, row))
-                        stopRHSBroadcast = true;
+                    if (!addRHSRow(localRhsRows, row)) // may cause broadcaster to be told to stop (for isStopping() to become true)
+                        throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
 
                     rightSerializer->serialize(mbser, (const byte *)row.get());
-                    if (mb.length() >= MAX_SEND_SIZE || stopRHSBroadcast)
+                    if (mb.length() >= MAX_SEND_SIZE || broadcaster.isStopping())
                         break;
                 }
                 if (0 == mb.length())
                     break;
-                if (stopRHSBroadcast)
+                if (broadcaster.isStopping())
                     sendItem->setFlag(bcastflag_spilt);
                 ThorCompress(mb, sendItem->queryMsg());
                 if (!broadcaster.send(sendItem))
                     break;
-                if (stopRHSBroadcast)
+                if (broadcaster.isStopping())
                     break;
                 mb.clear();
                 broadcaster.resetSendItem(sendItem);
@@ -890,7 +908,7 @@ protected:
         }
 
         sendItem.setown(broadcaster.newSendItem(bcast_stop));
-        if (stopRHSBroadcast)
+        if (broadcaster.isStopping())
             sendItem->setFlag(bcastflag_spilt);
         ActPrintLog("Sending final RHS broadcast packet");
         broadcaster.send(sendItem); // signals stop to others
@@ -1124,6 +1142,8 @@ public:
         allocator.set(queryRowAllocator());
         leftAllocator.set(::queryRowAllocator(leftITDL));
         outputMeta.set(leftITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
+        for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
+            rhsNodeRows.item(s)->setup(queryRowInterfaces(rightITDL));
 
         eos = eog = someSinceEog = false;
 
@@ -1198,6 +1218,15 @@ public:
         broadcaster.end();
         rowProcessor.wait();
     }
+    void doBroadcastStop(mptag_t tag, broadcast_flags flag)
+    {
+        broadcaster.start(this, tag, false);
+        Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_stop);
+        if (flag)
+            sendItem->setFlag(flag);
+        broadcaster.send(sendItem); // signals stop to others
+        broadcaster.end();
+    }
     rowidx_t getGlobalRHSTotal()
     {
         rowcount_t rhsRows = 0;
@@ -1211,12 +1240,16 @@ public:
         }
         return (rowidx_t)rhsRows;
     }
-    virtual bool addLocalRHSRow(CThorSpillableRowArray &localRhsRows, const void *row)
+    virtual bool addRHSRow(CThorSpillableRowArray &rhsRows, const void *row)
     {
         LinkThorRow(row);
-        if (!localRhsRows.append(row))
-            throw MakeActivityException(this, 0, "Out of memory: Cannot append local rhs row");
-        return true;
+        {
+            SpinBlock b(rHSRowSpinLock);
+            if (rhsRows.append(row))
+                return true;
+        }
+        ReleaseThorRow(row);
+        return false;
     }
 // ISmartBufferNotify
     virtual void onInputStarted(IException *except)
@@ -1289,6 +1322,7 @@ protected:
     using PARENT::fuzzyMatch;
     using PARENT::keepLimit;
     using PARENT::doBroadcastRHS;
+    using PARENT::doBroadcastStop;
     using PARENT::getGlobalRHSTotal;
     using PARENT::getOptBool;
     using PARENT::broadcaster;
@@ -1310,32 +1344,27 @@ protected:
     unsigned abortLimit, atMost;
     bool dedup, stable;
 
-    mptag_t lhsDistributeTag, rhsDistributeTag, broadcast2MpTag;
+    mptag_t lhsDistributeTag, rhsDistributeTag, broadcast2MpTag, broadcast3MpTag;
 
     // Handling failover to a) hashed local lookupjoin b) hash distributed standard join
+    bool smart;
     bool rhsCollated;
-    bool failoverToLocalLookupJoin, failoverToStdJoin;
     Owned<IHashDistributor> lhsDistributor, rhsDistributor;
     ICompare *compareLeft;
     UnsignedArray flushedRowMarkers;
-
-    atomic_t performLocalLookup;
+    CriticalSection overflowCrit;
+    Owned<CFileOwner> overflowWriteFile;
+    Owned<IRowWriter> overflowWriteStream;
+    rowcount_t overflowWriteCount;
+    atomic_t localLookup, standardJoin;
     CriticalSection broadcastSpillingLock;
     Owned<IJoinHelper> joinHelper;
 
-    inline bool isSmart() const
-    {
-        switch (container.getKind())
-        {
-            case TAKsmartjoin:
-            case TAKsmartdenormalize:
-            case TAKsmartdenormalizegroup:
-                return true;
-        }
-        return false;
-    }
-    inline bool doPerformLocalLookup() const { return 0 != atomic_read(&performLocalLookup); }
-    inline void setPerformLocalLookup(bool tf) { atomic_set(&performLocalLookup, (int)tf); }
+    inline bool isSmart() const { return smart; }
+    inline bool isLocalLookup() const { return 0 != atomic_read(&localLookup); }
+    inline void setLocalLookup(bool tf) { atomic_set(&localLookup, (int)tf); }
+    inline bool isStandardJoin() const { return 0 != atomic_read(&standardJoin); }
+    inline void setStandardJoin(bool tf) { atomic_set(&standardJoin, (int)tf); }
     rowidx_t clearNonLocalRows(CThorSpillableRowArray &rows, rowidx_t startPos)
     {
         CThorArrayLockBlock block(rows);
@@ -1373,11 +1402,13 @@ protected:
         // This is likely to free memory, so block others (threads) until done
         // NB: This will not block appends
         CriticalBlock b(broadcastSpillingLock);
-        if (doPerformLocalLookup())
+        if (isLocalLookup())
             return false;
-        setPerformLocalLookup(true);
+        setLocalLookup(true);
         ActPrintLog("Clearing non-local rows - cause: %s", msg);
 
+        broadcaster.setStopping(); // signals to broadcast to start stopping
+
         rowidx_t clearedRows = 0;
         if (rhsCollated)
         {
@@ -1403,16 +1434,6 @@ protected:
         ActPrintLog("handleLowMem: clearedRows = %"RIPF"d", clearedRows);
         return 0 != clearedRows;
     }
-    void setupDistributors()
-    {
-        if (!rhsDistributor)
-        {
-            rhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), rhsDistributeTag, false, NULL, "RHS"));
-            right.setown(rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL));
-            lhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), lhsDistributeTag, false, NULL, "LHS"));
-            left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left.getClear(), leftHash, NULL));
-        }
-    }
     bool setupHT(rowidx_t size)
     {
         if (size < 10)
@@ -1469,23 +1490,27 @@ protected:
  * 10) The LHS side is loaded and spilt and sorted if necessary
  * 11) A regular join helper is created to perform a local join against the two hash distributed sorted sides.
  */
-        bool rhsAlreadySorted = helper->isRightAlreadyLocallySorted();
+        bool rhsAlreadySorted = false;
         CMarker marker(*this);
         if (needGlobal)
         {
+            rhsAlreadySorted = helper->isRightAlreadySorted();
             doBroadcastRHS(stopping);
             rowidx_t rhsRows;
             {
                 CriticalBlock b(broadcastSpillingLock);
-                rhsRows = getGlobalRHSTotal();
+                rhsRows = getGlobalRHSTotal(); // total count of flushed rows
             }
-            if (!doPerformLocalLookup())
+            overflowWriteStream.clear(); // if created, would imply already localLookup=true, there cannot be anymore after broadcast complete
+            if (!isLocalLookup())
             {
                 if (stable && !rhsAlreadySorted)
                     rhs.setup(NULL, false, stableSort_earlyAlloc);
+                bool success=false;
                 try
                 {
-                    rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
+                    success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
                 }
                 catch (IException *e)
                 {
@@ -1500,9 +1525,16 @@ protected:
                     default:
                         throw;
                     }
+                    success = false;
+                }
+                if (!success)
+                {
                     ActPrintLog("Out of memory trying to size the global RHS row table for a SMART join, will now attempt a distributed local lookup join");
-                    clearAllNonLocalRows("OOM on sizing global row table"); // NB: someone else could have provoked callback already
-                    dbgassertex(doPerformLocalLookup());
+                    if (!isLocalLookup())
+                    {
+                        clearAllNonLocalRows("OOM on sizing global row table"); // NB: someone else could have provoked callback already
+                        dbgassertex(isLocalLookup());
+                    }
                 }
             }
 
@@ -1518,13 +1550,13 @@ protected:
                  * and need to protect rhsNodeRows access
                  */
                 CriticalBlock b(broadcastSpillingLock);
-                if (!doPerformLocalLookup())
+                if (!isLocalLookup())
                 {
                     // If spilt, don't size ht table now, if local rhs fits, ht will be sized later
                     ForEachItemIn(a, rhsNodeRows)
                     {
                         CThorSpillableRowArray &rows = *rhsNodeRows.item(a);
-                        rhs.appendRows(rows, true);
+                        rhs.appendRows(rows, true); // NB: This should not cause spilling, rhs is already sized and we are only copying ptrs in
                         rows.kill(); // free up ptr table asap
                     }
                     // Have to keep broadcastSpillingLock locked until sort and calculate are done
@@ -1533,16 +1565,16 @@ protected:
                     ActPrintLog("Collated all RHS rows");
                 }
             }
-            if (!doPerformLocalLookup()) // check again after processing above
+            if (!isLocalLookup()) // check again after processing above
             {
-                if (!setupHT(uniqueKeys)) // NB: Sizing could cause spilling callback to be triggered and/or OOM
+                if (!setupHT(uniqueKeys)) // NB: Sizing can cause spilling callback to be triggered or OOM in case of !smart
                 {
                     ActPrintLog("Out of memory trying to size the global hash table for a SMART join, will now attempt a distributed local lookup join");
-                    clearAllNonLocalRows("OOM on sizing global hash table"); // NB: someone else could have provoked callback already
-                    dbgassertex(doPerformLocalLookup());
+                    clearAllNonLocalRows("OOM on sizing global hash table"); // NB: setupHT should have provoked callback already
+                    dbgassertex(isLocalLookup());
                 }
             }
-            if (failoverToLocalLookupJoin)
+            if (isSmart())
             {
                 /* NB: Potentially one of the slaves spilt late after broadcast and rowprocessor finished
                  * NB2: This is also to let others know of this slaves spill state.
@@ -1553,24 +1585,25 @@ protected:
 
                 ActPrintLog("Broadcasting final split status");
                 broadcaster.reset();
-                // NB: using a different tag from 1st broadcast, as 2nd on other nodes can start sending before 1st on this has quit receiving
-                broadcaster.start(this, broadcast2MpTag, false);
-                Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_stop);
-                if (doPerformLocalLookup())
-                    sendItem->setFlag(bcastflag_spilt);
-                broadcaster.send(sendItem); // signals stop to others
-                broadcaster.end();
+                doBroadcastStop(broadcast2MpTag, isLocalLookup() ? bcastflag_spilt : bcastflag_null);
             }
 
             /* All slaves now know whether any one spilt or not, i.e. whether to perform local hash join or not
              * If any have, still need to distribute rest of RHS..
              */
 
-            // flush spillable row arrays, and clear any non-locals if performLocalLookup and compact
-            if (doPerformLocalLookup())
+            if (!isLocalLookup())
+            {
+                if (stopping) // broadcast done and no-one spilt, this node can now stop
+                    return;
+            }
+            else
             {
                 ActPrintLog("Spilt whilst broadcasting, will attempt distribute local lookup join");
 
+                // Either it has collated already and remaining rows are sorted, or if didn't get that far, can't rely on previous state of rhsAlreadySorted
+                rhsAlreadySorted = rhsCollated;
+
                 // NB: lhs ordering and grouping lost from here on..
                 if (grouped)
                     throw MakeActivityException(this, 0, "Degraded to distributed lookup join, LHS order cannot be preserved");
@@ -1578,8 +1611,6 @@ protected:
                 // If HT sized already and now spilt, too big clear and size when local size known
                 clearHT();
 
-                setupDistributors();
-
                 if (stopping)
                 {
                     ActPrintLog("getRHS stopped");
@@ -1589,103 +1620,169 @@ protected:
                      */
                     return;
                 }
-
-                /* NB: The collected broadcast rows thus far (in rhsNodeRows or rhs) were ordered/deterministic.
-                 * However, the rest of the rows received via the distributor are non-deterministic.
-                 * Therefore the order overall is non-deterministic from this point on.
-                 * For that reason, the rest of the RHS (distributed) rows will be processed ahead of the
-                 * collected [broadcast] rows in the code below for efficiency reasons.
-                 */
-                IArrayOf<IRowStream> streams;
-                streams.append(*right.getLink()); // what remains of 'right' will be read through distributor
-
-                if (rhsCollated)
-                {
-                    /* JCSMORE - I think 'right' must be empty if here (if rhsCollated=true)
-                     * so above streams.append(*right.getLink()); should go in else block below only AFAICS
-                     */
-
-                    // NB: If spilt after rhsCollated, callback will have cleared and compacted
-                    streams.append(*rhs.createRowStream()); // NB: will kill array when stream exhausted
-                }
-                else
-                {
-                    // NB: If cleared before rhsCollated, then need to clear non-locals that were added after spill
-                    ForEachItemIn(a, rhsNodeRows)
-                    {
-                        CThorSpillableRowArray &sRowArray = *rhsNodeRows.item(a);
-                        CThorExpandingRowArray rowArray(*this, NULL);
-                        rowArray.transferFrom(sRowArray);
-                        clearNonLocalRows(rowArray, flushedRowMarkers.item(a));
-                        rowArray.compact();
-                        streams.append(*rowArray.createRowStream()); // NB: will kill array when stream exhausted
-                    }
-                }
-                right.setown(createConcatRowStream(streams.ordinality(), streams.getArray()));
-            }
-            else
-            {
-                if (stopping) // broadcast done and no-one spilt, this node can now stop
-                    return;
             }
         }
         else
         {
+            rhsAlreadySorted = helper->isRightAlreadyLocallySorted();
             if (stopping) // if local can stop now
                 return;
-            setPerformLocalLookup(true);
+            setLocalLookup(true);
         }
 
-        if (doPerformLocalLookup())
+        if (isLocalLookup())
         {
             Owned<IThorRowLoader> rowLoader;
-            ICompare *cmp = rhsAlreadySorted ? NULL : compareRight;
-            if (failoverToStdJoin)
+            if (!needGlobal || !rhsCollated) // If global && rhsCollated, then no need for rowLoader
             {
-                dbgassertex(!stable);
-                if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
-                    rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_allDisk, SPILL_PRIORITY_LOOKUPJOIN));
+                ICompare *cmp = rhsAlreadySorted ? NULL : compareRight;
+                if (isSmart())
+                {
+                    dbgassertex(!stable);
+                    if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
+                        rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_allDisk, SPILL_PRIORITY_LOOKUPJOIN));
+                    else
+                        rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
+                }
                 else
-                    rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
+                {
+                    // i.e. will fire OOM if runs out of memory loading local right
+                    rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stable ? stableSort_lateAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE));
+                }
             }
-            else
+            Owned<IRowStream> rightStream;
+            if (needGlobal)
             {
-                // i.e. will fire OOM if runs out of memory loading local right
-                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stable ? stableSort_lateAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE));
+                if (!rhsCollated) // NB: If spilt after rhsCollated, callback will have cleared and compacted, rows will still be sorted
+                {
+                    /* NB: If cleared before rhsCollated, then need to clear non-locals that were added after spill
+                     * There should not be many, as broadcast starts to stop as soon as a slave notifies it is spilling
+                     * and ignores all non-locals.
+                     */
+
+                    // First identify largest row set from nodes, clear (non-locals) and compact.
+                    rowidx_t largestRowCount = 0;
+                    CThorSpillableRowArray *largestRhsNodeRows = NULL;
+                    ForEachItemIn(a, rhsNodeRows)
+                    {
+                        CThorSpillableRowArray &rows = *rhsNodeRows.item(a);
+                        clearNonLocalRows(rows, flushedRowMarkers.item(a));
+
+                        rows.compact(); // JCS->GH - really we want to resize rhsNodeRows now to free up as much space as possible (see HPCC-12511)
+
+                        rowidx_t c = rows.numCommitted();
+                        if (c > largestRowCount)
+                        {
+                            largestRowCount = c;
+                            largestRhsNodeRows = &rows;
+                        }
+                    }
+                    /* NB: The collected broadcast rows thus far (in rhsNodeRows or rhs) were ordered/deterministic.
+                     * However, the rest of the rows received via the distributor are non-deterministic.
+                     * Therefore the order overall is non-deterministic from this point on.
+                     * For that reason, the rest of the RHS (distributed) rows will be processed ahead of the
+                     * collected [broadcast] rows in the code below for efficiency reasons.
+                     */
+                    IArrayOf<IRowStream> rightStreams;
+                    if (largestRhsNodeRows) // unlikely, but could happen if 0 local rhs rows left
+                    {
+                        // add largest directly into loader, so it can alleviate some memory immediately. NB: doesn't allocate
+                        rowLoader->transferRowsIn(*largestRhsNodeRows);
+
+                        /* the others + the rest of the RHS (distributed) are streamed into loader
+                         * the streams have a lower spill priority than the loader, IOW, they will spill before the loader spills.
+                         * If the loader has to spill, then we have to failover to standard join
+                         */
+                        ForEachItemIn(a, rhsNodeRows)
+                        {
+                            CThorSpillableRowArray &rows = *rhsNodeRows.item(a);
+                            if (&rows == largestRhsNodeRows)
+                                continue;
+                            /* NB: will kill array when stream exhausted or if spilt
+                             * Ensure spill priority of these spillable streams is lower than the stream in the loader in the next stage
+                             */
+                            rightStreams.append(*rows.createRowStream()); // NB: default SPILL_PRIORITY_SPILLABLE_STREAM is lower than SPILL_PRIORITY_LOOKUPJOIN
+                        }
+                    }
+                    // NB: 'right' deliberately added after rhsNodeRow streams, so that rhsNodeRow can be consumed into loader 1st
+                    right.setown(rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL));
+                    rightStreams.append(*right.getLink()); // what remains of 'right' will be read through distributor
+                    if (overflowWriteFile)
+                    {
+                        unsigned rwFlags = DEFAULT_RWFLAGS;
+                        if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                            rwFlags |= rw_compress;
+                        ActPrintLog("Reading overflow RHS broadcast rows : %"RCPF"d", overflowWriteCount);
+                        Owned<IRowStream> overflowStream = createRowStream(&overflowWriteFile->queryIFile(), queryRowInterfaces(rightITDL), rwFlags);
+                        rightStreams.append(* overflowStream.getClear());
+                    }
+                    if (rightStreams.ordinality()>1)
+                        right.setown(createConcatRowStream(rightStreams.ordinality(), rightStreams.getArray()));
+                    else
+                        right.set(&rightStreams.item(0));
+                    rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs, NULL, false));
+                }
             }
+            else
+                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs, NULL, false));
 
-            Owned<IRowStream> rightStream = rowLoader->load(right, abortSoon, false, &rhs);
             if (!rightStream)
             {
                 ActPrintLog("RHS local rows fitted in memory, count: %"RIPF"d", rhs.ordinality());
                 // all fitted in memory, rows were transferred out back into 'rhs' and sorted
 
                 /* Now need to size HT.
-                 * transfer rows back into a spillable container
-                 * If HT sizing DOESN'T cause spill, then, row will be transferred back into 'rhs'
+                 * If HT sizing DOESN'T cause spill, then rows will be transferred back into 'rhs'
                  * If HT sizing DOES cause spill, sorted rightStream will be created.
+                 * transfer rows back into a spillable container
                  */
 
                 rowLoader.clear();
 
-                // If stable (and sort needed), already sorted by rowLoader
-                rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, !rhsAlreadySorted && !stable);
-
-                Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
-                collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
-                collector->transferRowsIn(rhs); // can spill after this
+                // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
+                rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
 
-                if (!setupHT(uniqueKeys))
+                /* Although HT is allocated with a low spill priority, it can still cause callbacks
+                 * so try to allocate before rhs is transferred to spillable collector
+                 */
+                bool htAllocated = setupHT(uniqueKeys);
+                if (!htAllocated)
                 {
-                    ActPrintLog("Out of memory trying to allocate the [LOCAL] hash table for a SMART join, will now failover to a std hash join");
+                    ActPrintLog("Out of memory trying to allocate the [LOCAL] hash table for a SMART join (%"RIPF"d rows), will now failover to a std hash join", uniqueKeys);
+                    Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
+                    collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
+                    collector->transferRowsIn(rhs); // can spill after this
                     rightStream.setown(collector->getStream());
                 }
-                else
-                    rightStream.setown(collector->getStream(false, &rhs));
             }
-            if (rightStream) // NB: returned stream, implies (spilt or setupHT OOM'd) AND sorted, if not, 'rhs' is filled
+            if (rightStream)
             {
-                ActPrintLog("RHS spilt to disk. Standard Join will be used");
+                ActPrintLog("Local RHS spilt to disk. Standard Join will be used");
+                setStandardJoin(true);
+            }
+
+            // Barrier point, did all slaves succeed in building local RHS HT?
+            if (needGlobal && isSmart())
+            {
+                bool localRequiredStandardJoin = isStandardJoin();
+                ActPrintLog("Checking other slaves for local RHS lookup status, this slaves RHS %s", localRequiredStandardJoin?"did NOT fit" : "DID fit");
+                broadcaster.reset();
+                broadcast_flags flag = localRequiredStandardJoin ? bcastflag_standardjoin : bcastflag_null;
+                doBroadcastStop(broadcast3MpTag, flag);
+                if (!localRequiredStandardJoin && isStandardJoin())
+                {
+                    ActPrintLog("Other slaves did NOT fit, consequently this slave must fail over to standard join as well");
+                    dbgassertex(!rightStream);
+                    rightStream.setown(rhs.createRowStream());
+                }
+
+                // start LHS distributor, needed either way, by local lookup or full join
+                left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left.getClear(), leftHash, NULL));
+            }
+
+            if (isStandardJoin()) // NB: returned stream, implies (spilt or setupHT OOM'd) AND sorted, if not, 'rhs' is filled
+            {
+                ActPrintLog("Performing standard join");
 
                 // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
                 if (grouped)
@@ -1751,8 +1848,9 @@ public:
     CLookupJoinActivityBase(CGraphElementBase *_container) : PARENT(_container)
     {
         rhsCollated = false;
-        broadcast2MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
-        setPerformLocalLookup(false);
+        broadcast2MpTag = broadcast3MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
+        setLocalLookup(false);
+        setStandardJoin(false);
 
         leftHash = helper->queryHashLeft();
         rightHash = helper->queryHashRight();
@@ -1770,9 +1868,19 @@ public:
         if (abortLimit < atMost)
             atMost = abortLimit;
 
-        failoverToLocalLookupJoin = failoverToStdJoin = isSmart();
-        ActPrintLog("failoverToLocalLookupJoin=%s, failoverToStdJoin=%s",
-                failoverToLocalLookupJoin?"true":"false", failoverToStdJoin?"true":"false");
+        switch (container.getKind())
+        {
+            case TAKsmartjoin:
+            case TAKsmartdenormalize:
+            case TAKsmartdenormalizegroup:
+                smart = true;
+                break;
+            default:
+                smart = false;
+                break;
+        }
+        overflowWriteCount = 0;
+        ActPrintLog("Smart join = %s", smart?"true":"false");
     }
     bool exceedsLimit(rowidx_t count, const void *left, const void *right, const void *&failRow)
     {
@@ -1821,30 +1929,38 @@ public:
         if (!container.queryLocal())
         {
             broadcast2MpTag = container.queryJob().deserializeMPTag(data);
+            broadcast3MpTag = container.queryJob().deserializeMPTag(data);
             lhsDistributeTag = container.queryJob().deserializeMPTag(data);
             rhsDistributeTag = container.queryJob().deserializeMPTag(data);
+            rhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), rhsDistributeTag, false, NULL, "RHS"));
+            lhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), lhsDistributeTag, false, NULL, "LHS"));
         }
     }
     virtual void start()
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
+        if (isSmart())
+        {
+            setLocalLookup(false);
+            setStandardJoin(false);
+            rhsCollated = false;
+            flushedRowMarkers.kill();
 
-        if (!isSmart())
+            if (needGlobal)
+            {
+                overflowWriteCount = 0;
+                overflowWriteFile.clear();
+                overflowWriteStream.clear();
+                queryJob().queryRowManager()->addRowBuffer(this);
+            }
+        }
+        else
         {
             bool inputGrouped = leftITDL->isGrouped();
             dbgassertex(inputGrouped == grouped); // std. lookup join expects these to match
         }
 
-        setPerformLocalLookup(false);
-        rhsCollated = false;
-        flushedRowMarkers.kill();
-
-        if (failoverToLocalLookupJoin)
-        {
-            if (needGlobal)
-                queryJob().queryRowManager()->addRowBuffer(this);
-        }
     }
     CATCH_NEXTROW()
     {
@@ -1856,9 +1972,9 @@ public:
             if (isSmart())
             {
                 msg.append("SmartJoin - ");
-                if (joinHelper)
+                if (isStandardJoin())
                     msg.append("Failed over to standard join");
-                else if (needGlobal && doPerformLocalLookup())
+                else if (needGlobal && isLocalLookup())
                     msg.append("Failed over to hash distributed local lookup join");
                 else
                     msg.append("All in memory lookup join");
@@ -1912,15 +2028,22 @@ public:
 // IBCastReceive
     virtual void bCastReceive(CSendItem *sendItem)
     {
-        if (sendItem)
+        if (0 != (sendItem->queryFlags() & bcastflag_spilt))
         {
-            if (0 != (sendItem->queryFlags() & bcastflag_spilt))
-            {
-                VStringBuffer msg("Notification that slave %d spilt", sendItem->queryOrigin());
-                clearAllNonLocalRows(msg.str());
-            }
+            VStringBuffer msg("Notification that slave %d spilt", sendItem->queryOrigin());
+            clearAllNonLocalRows(msg.str());
+        }
+        else if (0 != (sendItem->queryFlags() & bcastflag_standardjoin))
+        {
+            ActPrintLog("Notification that slave %d required standard join", sendItem->queryOrigin());
+            setStandardJoin(true);
         }
-        rowProcessor.addBlock(sendItem); // NB: NULL indicates end
+        if (bcast_stop == sendItem->queryCode())
+        {
+            sendItem->Release();
+            sendItem = NULL; // NB: NULL indicates end
+        }
+        rowProcessor.addBlock(sendItem);
     }
 // IBufferedRowCallback
     virtual unsigned getSpillCost() const
@@ -1932,27 +2055,47 @@ public:
         // NB: only installed if lookup join and global
         return clearAllNonLocalRows("Out of memory callback");
     }
-    virtual bool addLocalRHSRow(CThorSpillableRowArray &localRhsRows, const void *row)
+    virtual bool addRHSRow(CThorSpillableRowArray &rhsRows, const void *row)
     {
-        if (doPerformLocalLookup())
+        /* NB: If PARENT::addRHSRow fails, it will cause clearAllNonLocalRows() to have been triggered and localLookup to be set
+         * When all is done, a last pass is needed to clear out non-locals
+         */
+        if (!overflowWriteFile)
         {
+            if (!isLocalLookup() && PARENT::addRHSRow(rhsRows, row))
+                return true;
+            dbgassertex(isLocalLookup());
             // keep it only if it hashes to my node
             unsigned hv = rightHash->hash(row);
-            if ((myNode-1) == (hv % numNodes))
-                PARENT::addLocalRHSRow(localRhsRows, row);
-            // ok so switch tactics.
-            // clearAllNonLocalRows() will have cleared out non-locals by now
-            // Returning false here, will stop the broadcaster
+            if ((myNode-1) != (hv % numNodes))
+                return true; // throw away non-local row
+            if (PARENT::addRHSRow(rhsRows, row))
+                return true;
 
-            return false;
-        }
-        else
-        {
-            /* NB: It could still spill here, i.e. before appending a non-local row
-             * When all is done, a last pass is needed to clear out non-locals
+            /* Could OOM whilst still failing over to local lookup again, dealing with last row, or trailing
+             * few rows being received. Unlikely since all local rows will have been cleared, but possible,
+             * particularly if last rows end up causing row ptr table expansion here.
+             *
+             * Need to stash away somewhere to allow it continue.
              */
-            PARENT::addLocalRHSRow(localRhsRows, row);
+            CriticalBlock b(overflowCrit); // could be coming from broadcaster or receiver
+            if (!overflowWriteFile)
+            {
+                unsigned rwFlags = DEFAULT_RWFLAGS;
+                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
+                    rwFlags |= rw_compress;
+                StringBuffer tempFilename;
+                GetTempName(tempFilename, "lookup_local", true);
+                ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
+                OwnedIFile iFile = createIFile(tempFilename.str());
+                overflowWriteFile.setown(new CFileOwner(iFile.getLink()));
+                overflowWriteStream.setown(createRowWriter(iFile, queryRowInterfaces(rightITDL), rwFlags));
+            }
         }
+        CriticalBlock b(overflowCrit); // could be coming from broadcaster or receiver
+        ++overflowWriteCount;
+        LinkThorRow(row);
+        overflowWriteStream->putRow(row);
         return true;
     }
 };
@@ -2341,7 +2484,12 @@ public:
 // IBCastReceive
     virtual void bCastReceive(CSendItem *sendItem)
     {
-        rowProcessor.addBlock(sendItem); // NB: NULL indicates end
+        if (bcast_stop == sendItem->queryCode())
+        {
+            sendItem->Release();
+            sendItem = NULL; // NB: NULL indicates end
+        }
+        rowProcessor.addBlock(sendItem);
     }
 };
 

+ 60 - 28
thorlcr/thorutil/thmem.cpp

@@ -173,15 +173,15 @@ StringBuffer &getRecordString(const void *key, IOutputRowSerializer *serializer,
 
 //====
 
+// NB: rows are transferred into derivatives of CSpillableStreamBase and read or spilt, but are never written to
 class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
 {
 protected:
     CActivityBase &activity;
     IRowInterfaces *rowIf;
-    bool preserveNulls, ownsRows, useCompression;
+    bool preserveNulls, ownsRows, useCompression;
+    unsigned spillPriority;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
-    Owned<IRowStream> spillStream;
 
     bool spillRows()
     {
@@ -197,22 +197,26 @@ protected:
 
         VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
         rows.save(*spillFile, useCompression, spillPrefixStr.str()); // saves committed rows
+        rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
     }
-
+    void clearSpillingCallback()
+    {
+        activity.queryJob().queryRowManager()->removeRowBuffer(this);
+    }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
-        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls)
+    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
+        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriority)
     {
+        assertex(inRows.isFlushed());
         rows.swap(inRows);
         useCompression = false;
     }
     ~CSpillableStreamBase()
     {
-        activity.queryJob().queryRowManager()->removeRowBuffer(this);
-        spillStream.clear();
+        clearSpillingCallback();
         if (spillFile)
             spillFile->remove();
     }
@@ -220,7 +224,7 @@ public:
 // IBufferedRowCallback
     virtual unsigned getSpillCost() const
     {
-        return SPILL_PRIORITY_SPILLABLE_STREAM;
+        return spillPriority;
     }
     virtual bool freeBufferedRows(bool critical)
     {
@@ -262,8 +266,8 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
             CThorArrayLockBlock block(owner->rows);
             if (owner->spillFile) // i.e. has spilt
             {
+                owner->clearSpillingCallback();
                 assertex(((offset_t)-1) != outputOffset);
-                owner->rows.kill(); // no longer needed, frees pointer array
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (owner->preserveNulls)
                     rwFlags |= rw_grouped;
@@ -272,7 +276,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
                 return spillStream->nextRow();
             }
             else if (pos == owner->rows.numCommitted())
+            {
+                owner->clearSpillingCallback();
                 return NULL;
+            }
             return owner->rows.get(pos++);
         }
         virtual void stop() { }
@@ -291,8 +298,8 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
-        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
+    CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
+        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
         activity.queryJob().queryRowManager()->addRowBuffer(this);
     }
@@ -303,6 +310,7 @@ public:
             CThorArrayLockBlock block(rows);
             if (spillFile)
             {
+                clearSpillingCallback();
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
@@ -318,12 +326,13 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 {
     rowidx_t pos, numReadRows, granularity;
     const void **readRows;
+    Owned<IRowStream> spillStream;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
-        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
+    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
+        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
         useCompression = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
         pos = numReadRows = 0;
@@ -336,6 +345,7 @@ public:
     }
     ~CSpillableStream()
     {
+        spillStream.clear();
         while (pos < numReadRows)
         {
             ReleaseThorRow(readRows[pos++]);
@@ -353,7 +363,7 @@ public:
             CThorArrayLockBlock block(rows);
             if (spillFile)
             {
-                rows.kill(); // no longer needed, frees pointer array
+                clearSpillingCallback();
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
@@ -362,13 +372,17 @@ public:
                 spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
                 return spillStream->nextRow();
             }
-            rowidx_t fetch = rows.numCommitted();
-            if (0 == fetch)
+            rowidx_t available = rows.numCommitted();
+            if (0 == available)
                 return NULL;
-            if (fetch >= granularity)
-                fetch = granularity;
+            rowidx_t fetch = (available >= granularity) ? granularity : available;
             // consume 'fetch' rows
             rows.readBlock(readRows, fetch);
+            if (available == fetch)
+            {
+                clearSpillingCallback();
+                rows.kill();
+            }
             numReadRows = fetch;
             pos = 0;
         }
@@ -632,6 +646,7 @@ void CThorExpandingRowArray::compact()
         }
     }
     numRows = freeFinger-rows;
+    // JCSMORE - this may be a good time to call IRowManager::compactRows
 }
 
 void CThorExpandingRowArray::kill()
@@ -1321,6 +1336,13 @@ void CThorSpillableRowArray::transferFrom(CThorExpandingRowArray &src)
     commitRows = numRows;
 }
 
+void CThorSpillableRowArray::transferFrom(CThorSpillableRowArray &src)
+{
+    CThorArrayLockBlock block(*this);
+    CThorExpandingRowArray::transferFrom(src);
+    commitRows = numRows;
+}
+
 void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
 {
     CThorArrayLockBlock block(*this);
@@ -1365,9 +1387,10 @@ void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwn
     }
 }
 
-IRowStream *CThorSpillableRowArray::createRowStream()
+IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority)
 {
-    return new CSpillableStream(activity, *this, rowIf, allowNulls);
+    assertex(rowIf);
+    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority);
 }
 
 
@@ -1534,10 +1557,10 @@ protected:
                     return NULL;
                 }
                 if (!shared)
-                    instrms.append(*spillableRows.createRowStream()); // NB: stream will take ownership of rows in spillableRows
+                    instrms.append(*spillableRows.createRowStream(spillPriority)); // NB: stream will take ownership of rows in spillableRows
                 else
                 {
-                    spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping));
+                    spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping, spillPriority));
                     instrms.append(*spillableRowSet->createRowStream());
                 }
             }
@@ -1638,6 +1661,12 @@ public:
         spillableRows.transferFrom(src);
         enableSpillingCallback();
     }
+    virtual void transferRowsIn(CThorSpillableRowArray &src)
+    {
+        reset();
+        spillableRows.transferFrom(src);
+        enableSpillingCallback();
+    }
     virtual void setup(ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
     {
         iCompare = _iCompare;
@@ -1678,9 +1707,10 @@ public:
 enum TRLGroupFlag { trl_ungroup, trl_preserveGrouping, trl_stopAtEog };
 class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
 {
-    IRowStream *load(IRowStream *in, const bool &abort, TRLGroupFlag grouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
+    IRowStream *load(IRowStream *in, const bool &abort, TRLGroupFlag grouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
     {
-        reset();
+        if (doReset)
+            reset();
         enableSpillingCallback();
         setPreserveGrouping(trl_preserveGrouping == grouping);
         while (!abort)
@@ -1717,6 +1747,7 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual void transferRowsIn(CThorSpillableRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
     virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
         CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
@@ -1724,14 +1755,14 @@ public:
     virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
     virtual void setOptions(unsigned options)  { CThorRowCollectorBase::setOptions(options); }
 // IThorRowLoader
-    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
+    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
     {
         assertex(!iCompare || !preserveGrouping); // can't sort if group preserving
-        return load(in, abort, preserveGrouping?trl_preserveGrouping:trl_ungroup, allMemRows, memUsage);
+        return load(in, abort, preserveGrouping?trl_preserveGrouping:trl_ungroup, allMemRows, memUsage, doReset);
     }
-    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
+    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
     {
-        return load(in, abort, trl_stopAtEog, allMemRows, memUsage);
+        return load(in, abort, trl_stopAtEog, allMemRows, memUsage, doReset);
     }
 };
 
@@ -1767,6 +1798,7 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
+    virtual void transferRowsIn(CThorSpillableRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
     virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
         CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);

+ 6 - 3
thorlcr/thorutil/thmem.hpp

@@ -422,6 +422,7 @@ public:
     void kill();
     void compact();
     void flush();
+    inline bool isFlushed() const { return numRows == numCommitted(); }
     inline bool append(const void *row) __attribute__((warn_unused_result))
     {
         //GH->JCS Should this really be inline?
@@ -471,8 +472,9 @@ public:
         swap(from);
     }
     void transferFrom(CThorExpandingRowArray &src);
+    void transferFrom(CThorSpillableRowArray &src);
 
-    IRowStream *createRowStream();
+    IRowStream *createRowStream(unsigned spillPriority=SPILL_PRIORITY_SPILLABLE_STREAM);
 
     offset_t serializedSize()
     {
@@ -513,6 +515,7 @@ interface IThorRowCollectorCommon : extends IInterface
     virtual unsigned overflowScale() const = 0;
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort=true) = 0;
     virtual void transferRowsIn(CThorExpandingRowArray &src) = 0;
+    virtual void transferRowsIn(CThorSpillableRowArray &src) = 0;
     virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
     virtual void ensure(rowidx_t max) = 0;
     virtual void setOptions(unsigned options) = 0;
@@ -520,8 +523,8 @@ interface IThorRowCollectorCommon : extends IInterface
 
 interface IThorRowLoader : extends IThorRowCollectorCommon
 {
-    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping=false, CThorExpandingRowArray *allMemRows=NULL, memsize_t *memUsage=NULL) = 0;
-    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows=NULL, memsize_t *memUsage=NULL) = 0;
+    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping=false, CThorExpandingRowArray *allMemRows=NULL, memsize_t *memUsage=NULL, bool doReset=true) = 0;
+    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows=NULL, memsize_t *memUsage=NULL, bool doReset=true) = 0;
 };
 
 interface IThorRowCollector : extends IThorRowCollectorCommon