Jelajahi Sumber

HPCC-8245 - Smart Lookup Join

Implementation of Lookup Join that, on OOM, fails over to a
hash-distributed lookup join; if that also hits OOM, it fails over to
a regular, fully distributed (both sides) local join.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 12 tahun lalu
induk
melakukan
e40a95784a

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -97,7 +97,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     Owned<ISmartRowBuffer> piperd;
 
     /*
-     * CSendBucket - a collection of rows destinate for a particular destination target(slave)
+     * CSendBucket - a collection of rows destined for a particular destination target(slave)
      */
     class CSendBucket : public CSimpleInterface, implements IRowStream
     {

+ 24 - 2
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -22,14 +22,36 @@
 
 class CLookupJoinActivityMaster : public CMasterActivity
 {
+    mptag_t lhsDistributeTag, rhsDistributeTag;
+
 public:
     CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
     {
-        mpTag = container.queryJob().allocateMPTag();
+        if (container.queryLocal())
+            lhsDistributeTag = rhsDistributeTag = TAG_NULL;
+        else
+        {
+	        mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
+            lhsDistributeTag = container.queryJob().allocateMPTag();
+            rhsDistributeTag = container.queryJob().allocateMPTag();
+        }
+    }
+    ~CLookupJoinActivityMaster()
+    {
+        if (!container.queryLocal())
+        {
+            container.queryJob().freeMPTag(lhsDistributeTag);
+            container.queryJob().freeMPTag(rhsDistributeTag);
+        }
     }
     void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
-        dst.append((int)mpTag);
+        if (!container.queryLocal())
+        {
+            serializeMPtag(dst, mpTag);
+            serializeMPtag(dst, lhsDistributeTag);
+            serializeMPtag(dst, rhsDistributeTag);
+        }
     }
     void process()
     {

+ 484 - 136
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -28,6 +28,8 @@
 #include "jisem.hpp"
 
 #include "thorxmlwrite.hpp"
+#include "../hashdistrib/thhashdistribslave.ipp"
+#include "thsortu.hpp"
 
 #ifdef _DEBUG
 #define _TRACEBROADCAST
@@ -41,29 +43,38 @@ enum joinkind_t { join_lookup, join_all, denormalize_lookup, denormalize_all };
 #define MAX_QUEUE_BLOCKS 5
 
 enum broadcast_code { bcast_none, bcast_send, bcast_sendStopping, bcast_stop };
+enum broadcast_flags { bcastflag_spilt=0x01 };
+#define BROADCAST_CODE_MASK 0x00FF
+#define BROADCAST_FLAG_MASK 0xFF00
 class CSendItem : public CSimpleInterface
 {
     CMessageBuffer msg;
-    broadcast_code code;
+    unsigned info;
     unsigned origin, headerLen;
 public:
-    CSendItem(broadcast_code _code, unsigned _origin) : code(_code), origin(_origin)
+    CSendItem(broadcast_code _code, unsigned _origin) : info((unsigned)_code), origin(_origin)
     {
-        msg.append((unsigned)code);
+        msg.append(info);
         msg.append(origin);
         headerLen = msg.length();
     }
     CSendItem(CMessageBuffer &_msg)
     {
         msg.swapWith(_msg);
-        msg.read((unsigned &)code);
+        msg.read((unsigned &)info);
         msg.read(origin);
     }
     unsigned length() const { return msg.length(); }
     void reset() { msg.setLength(headerLen); }
     CMessageBuffer &queryMsg() { return msg; }
-    broadcast_code queryCode() const { return code; }
+    broadcast_code queryCode() const { return (broadcast_code)(info & BROADCAST_CODE_MASK); }
     unsigned queryOrigin() const { return origin; }
+    broadcast_flags queryFlags() const { return (broadcast_flags)((info & BROADCAST_FLAG_MASK)>>8); }
+    void setFlag(broadcast_flags _flag)
+    {
+        info = (info & ~BROADCAST_FLAG_MASK) | ((byte)_flag << 8);
+        msg.writeDirect(0, sizeof(info), &info); // update
+    }
 };
 
 
@@ -383,6 +394,10 @@ public:
             code = bcast_sendStopping;
         return new CSendItem(code, myNode);
     }
+    void resetSendItem(CSendItem *sendItem)
+    {
+        sendItem->reset();
+    }
     void waitReceiverDone()
     {
         {
@@ -419,12 +434,6 @@ public:
         broadcastToOthers(sendItem);
         return !allRequestStop;
     }
-    void final()
-    {
-        ActPrintLog(&activity, "CBroadcaster::final()");
-        Owned<CSendItem> sendItem = newSendItem(bcast_stop);
-        send(sendItem);
-    }
 };
 
 
@@ -436,6 +445,7 @@ public:
 
 */
 
+
 /*
  * The main activity class
  * It's intended to be used when the RHS globally, is small enough to fit within the memory of a single node.
@@ -444,7 +454,7 @@ public:
  * It also handles match conditions where there is no hard match (, ALL), in those cases no hash table is needed.
  * TODO: right outer/only joins
  */
-class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, implements ISmartBufferNotify, implements IBCastReceive
+class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, implements ISmartBufferNotify, implements IBCastReceive, implements roxiemem::IBufferedRowCallback
 {
     IHThorHashJoinArg *hashJoinHelper;
     IHThorAllJoinArg *allJoinHelper;
@@ -454,8 +464,8 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
     IHash *leftHash, *rightHash;
     ICompare *compareRight, *compareLeftRight;
 
-    Owned<IThorDataLink> right;
-    Owned<IThorDataLink> left;
+    IThorDataLink *leftITDL, *rightITDL;
+    Owned<IRowStream> left, right;
     Owned<IEngineRowAllocator> rightAllocator;
     Owned<IEngineRowAllocator> leftAllocator;
     Owned<IEngineRowAllocator> allocator;
@@ -477,6 +487,7 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
     const void *rhsNext;
     Owned<IOutputMetaData> outputMeta;
     rowcount_t rhsTotalCount;
+    mptag_t lhsDistributeTag, rhsDistributeTag;
 
     PointerArrayOf<CThorExpandingRowArray> rhsNodeRows;
     CBroadcaster broadcaster;
@@ -494,6 +505,16 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
     Semaphore gotOtherROs;
     bool waitForOtherRO, fuzzyMatch, returnMany, dedup;
 
+    // Handling failover to a) hashed local lookupjoin b) hash distributed standard join
+    CriticalSection localHashCrit;
+    bool spiltBroadcastingRHS, localLookupJoin;
+    bool failoverToLocalLookupJoin, failoverToStdJoin;
+    rank_t myNode;
+    unsigned numNodes;
+    Owned<IHashDistributor> lhsDistributor, rhsDistributor;
+    ICompare *compareLeft;
+    Owned<IJoinHelper> joinHelper;
+
     inline bool isLookup() { return (joinKind==join_lookup)||(joinKind==denormalize_lookup); }
     inline bool isAll() { return (joinKind==join_all)||(joinKind==denormalize_all); }
     inline bool isDenormalize() { return (joinKind==denormalize_all)||(joinKind==denormalize_lookup); }
@@ -622,7 +643,12 @@ public:
         returnMany = false;
         candidateMatches = 0;
         atMost = 0;
+        spiltBroadcastingRHS = localLookupJoin = false;
+        myNode = queryJob().queryMyRank();
+        numNodes = queryJob().querySlaves();
         needGlobal = !container.queryLocal() && (container.queryJob().querySlaves() > 1);
+        lhsDistributeTag = rhsDistributeTag = TAG_NULL;
+        failoverToLocalLookupJoin = failoverToStdJoin = false;
     }
     ~CLookupJoinActivity()
     {
@@ -633,13 +659,42 @@ public:
                 rows->Release();
         }
     }
-    void stopRightInput()
+    bool clearNonLocalRows(const char *msg)
     {
-        if (right)
+        rowidx_t clearedRows = 0;
         {
-            stopInput(right, "(R)");
-            right.clear();
+            CriticalBlock b(localHashCrit);
+            if (spiltBroadcastingRHS)
+                return false;
+            ActPrintLog("Clearing non-local rows - cause: %s", msg);
+            spiltBroadcastingRHS = true;
+            ForEachItemIn(a, rhsNodeRows)
+            {
+                CThorExpandingRowArray &rows = *rhsNodeRows.item(a);
+                ActPrintLog("pre-hash-clear - rows[%d] had %"RIPF"d rows", a, rows.ordinality());
+                rowidx_t numRows = rows.ordinality();
+                for (unsigned r=0; r<numRows; r++)
+                {
+                    unsigned hv = rightHash->hash(rows.query(r));
+                    if ((myNode-1) != (hv % numNodes))
+                    {
+                        OwnedConstThorRow row = rows.getClear(r); // dispose of
+                        ++clearedRows;
+                    }
+                }
+                rowidx_t chkCount = 0;
+                for (unsigned r=0; r<numRows; r++)
+                {
+                    if (NULL != rows.query(r))
+                        ++chkCount;
+                }
+                rows.compact();
+                ActPrintLog("post-hash-clear - rows[%d] has %"RIPF"d rows (chkCount=%"RIPF"d)", a, rows.ordinality(), chkCount);
+            }
         }
+
+        ActPrintLog("handleLowMem: clearedRows = %"RIPF"d", clearedRows);
+        return 0 != clearedRows;
     }
 
 // IThorSlaveActivity overloaded methods
@@ -684,6 +739,7 @@ public:
                 leftHash = hashJoinHelper->queryHashLeft();
                 rightHash = hashJoinHelper->queryHashRight();
                 compareRight = hashJoinHelper->queryCompareRight();
+                compareLeft = hashJoinHelper->queryCompareLeft();
                 compareLeftRight = hashJoinHelper->queryCompareLeftRight();
                 flags = hashJoinHelper->getJoinFlags();
                 if (JFmanylookup & flags)
@@ -696,6 +752,14 @@ public:
                 bool maySkip = 0 != (flags & JFtransformMaySkip);
                 dedup = compareRight && !maySkip && !fuzzyMatch && !returnMany;
 
+#if 0 // JCS->GH - Need new flag (HPCC-10068)
+                if (0 != (flags & JFnoorderedlhs))
+                {
+                    failoverToLocalLookupJoin = true;
+                    failoverToStdJoin = true;
+                }
+#endif
+
                 // code gen should spot invalid constants on KEEP with LOOKUP (without MANY)
                 break;
             }
@@ -724,7 +788,11 @@ public:
             joinType = JT_Inner;
 
         if (!container.queryLocal())
+        {
             mpTag = container.queryJob().deserializeMPTag(data);
+            lhsDistributeTag = container.queryJob().deserializeMPTag(data);
+            rhsDistributeTag = container.queryJob().deserializeMPTag(data);
+        }
 
         unsigned slaves = container.queryJob().querySlaves();
         rhsNodeRows.ensure(slaves);
@@ -752,26 +820,40 @@ public:
         assertex(inputs.ordinality() == 2);
         gotRHS = false;
         nextRhsRow = 0;
+        joined = 0;
+        leftMatch = false;
         rhsNext = NULL;
         candidateMatches = 0;
         rhsTotalCount = RCUNSET;
         htCount = htDedupCount = 0;
-        eos = false;
-        grouped = inputs.item(0)->isGrouped();
-        left.set(inputs.item(0));
+        rhsTableLen = 0;
+        rhsRows = RIUNSET;
+        eos = eog = someSinceEog = false;
+        leftITDL = inputs.item(0);
+        rightITDL = inputs.item(1);
+        grouped = leftITDL->isGrouped();
         allocator.set(queryRowAllocator());
-        leftAllocator.set(::queryRowAllocator(left));
-        outputMeta.set(left->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
-        left.setown(createDataLinkSmartBuffer(this,left,LOOKUPJOINL_SMART_BUFFER_SIZE,isSmartBufferSpillNeeded(left->queryFromActivity()),grouped,RCUNBOUND,this,false,&container.queryJob().queryIDiskUsage()));       
-        startInput(left);
-        right.set(inputs.item(1));
-        rightAllocator.set(::queryRowAllocator(right));
-        rightSerializer.set(::queryRowSerializer(right));
-        rightDeserializer.set(::queryRowDeserializer(right));
+        leftAllocator.set(::queryRowAllocator(leftITDL));
+        outputMeta.set(leftITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
+        leftITDL = createDataLinkSmartBuffer(this,leftITDL,LOOKUPJOINL_SMART_BUFFER_SIZE,isSmartBufferSpillNeeded(leftITDL->queryFromActivity()),grouped,RCUNBOUND,this,false,&container.queryJob().queryIDiskUsage());
+        left.setown(leftITDL);
+        startInput(leftITDL);
+        right.set(rightITDL);
+        rightAllocator.set(::queryRowAllocator(rightITDL));
+        rightSerializer.set(::queryRowSerializer(rightITDL));
+        rightDeserializer.set(::queryRowDeserializer(rightITDL));
+
+        spiltBroadcastingRHS = localLookupJoin = false;
+
+        if (failoverToLocalLookupJoin && hashJoinHelper) // only for LOOKUP not ALL
+        {
+        	if (needGlobal)
+        		queryJob().queryRowManager()->addRowBuffer(this);
+        }
 
         try
         {
-            startInput(right); 
+            startInput(rightITDL);
         }
         catch (CATCHALL)
         {
@@ -828,18 +910,39 @@ public:
         cancelReceiveMsg(RANK_ALL, mpTag);
         broadcaster.cancel();
         rowProcessor.abort();
+        if (rhsDistributor)
+            rhsDistributor->abort();
+        if (lhsDistributor)
+            lhsDistributor->abort();
     }
     virtual void stop()
     {
-        if (!gotRHS)
-            getRHS(true);
+        if (!gotRHS && needGlobal)
+            getRHS(true); // If global, need to handle RHS until all are slaves stop
+
+        // JCS->GH - if in a child query, it would be to preserve RHS.. would need tip/flag from codegen that constant
         clearRHS();
-        stopRightInput();
-        stopInput(left);
-        dataLinkStop();
-        left.clear();
-        right.clear();
+
+        if (right)
+        {
+            stopInput(right, "(R)");
+            right.clear();
+        }
         broadcaster.reset();
+        stopInput(left, "(L)");
+        left.clear();
+        if (rhsDistributor)
+        {
+            rhsDistributor->disconnect(true);
+            rhsDistributor->join();
+        }
+        if (lhsDistributor)
+        {
+            lhsDistributor->disconnect(true);
+            lhsDistributor->join();
+        }
+        joinHelper.clear();
+        dataLinkStop();
     }
     inline bool match(const void *lhs, const void *rhsrow)
     {
@@ -967,16 +1070,25 @@ public:
         ActivityTimer t(totalCycles, timeActivities, NULL);
         if (!gotRHS)
             getRHS(false);
+        OwnedConstThorRow row;
+        if (joinHelper) // regular join (hash join)
+            row.setown(joinHelper->nextRow());
+        else
+            row.setown(lookupNextRow());
+        if (!row.get())
+            return NULL;
+        dataLinkIncrement();
+        return row.getClear();
+    }
+    const void *lookupNextRow()
+    {
         if (!abortSoon && !eos)
         {
             if (doRightOuter)
             {
                 OwnedConstThorRow row = handleRightOnly();
                 if (row)
-                {
-                    dataLinkIncrement();
                     return row.getClear();
-                }
                 return NULL;
             }
             loop
@@ -1036,7 +1148,6 @@ public:
                                                 if (transformedSize)
                                                 {
                                                     candidateMatches = 0;
-                                                    dataLinkIncrement();
                                                     return ret.finalizeRowClear(transformedSize);
                                                 }
                                             }
@@ -1070,10 +1181,7 @@ public:
                                 prepareRightOnly();
                                 OwnedConstThorRow row = handleRightOnly();
                                 if (row)
-                                {
-                                    dataLinkIncrement();
                                     return row.getClear();
-                                }
                             }
                             else
                                 eos = true;
@@ -1153,7 +1261,6 @@ public:
                     if (ret)
                     {
                         someSinceEog = true;
-                        dataLinkIncrement();
                         return ret.getClear();
                     }
                 }
@@ -1176,7 +1283,6 @@ public:
                                         rhsNext = NULL;
                                     else
                                         nextRhs();
-                                    dataLinkIncrement();
                                     return row.getClear();
                                 }
                             }
@@ -1191,7 +1297,6 @@ public:
                         if (row)
                         {
                             someSinceEog = true;
-                            dataLinkIncrement();
                             return row.getClear();
                         }
                     }
@@ -1230,49 +1335,85 @@ public:
                 h = 0;
         }
     }
-    // Add to HT if one has been created, otherwise to row array and HT will be created later
-    void addRow(const void *p)
-    {
-        if (rhsTableLen)
-            addRowHt(p);
-        else
-            rhs.append(p);
-    }
-    void sendRHS() // broadcasting local rhs
+    void broadcastRHS() // broadcasting local rhs
     {
+        bool stopRHSBroadcast = false;
+        Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_send);
+        MemoryBuffer mb;
         try
         {
             CThorExpandingRowArray &localRhsRows = *rhsNodeRows.item(queryJob().queryMyRank()-1);
-            Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_send);
-            MemoryBuffer mb;
-            CMemoryRowSerializer mbs(mb);
+            CMemoryRowSerializer mbser(mb);
             while (!abortSoon)
             {
                 while (!abortSoon)
                 {
                     OwnedConstThorRow row = right->ungroupedNextRow();
                     if (!row)
+                    {
+                        ActPrintLog("broadcasting last row (!row)");
                         break;
-                    localRhsRows.append(row.getLink());
-                    rightSerializer->serialize(mbs, (const byte *)row.get());
-                    if (mb.length() >= MAX_SEND_SIZE)
+                    }
+
+                    {
+                        CriticalBlock b(localHashCrit);
+                        if (spiltBroadcastingRHS)
+                        {
+                            // keep it only if it hashes to my node
+                            unsigned hv = rightHash->hash(row.get());
+                            if ((myNode-1) == (hv % numNodes))
+                                localRhsRows.append(row.getLink());
+
+                            // ok so switch tactics.
+                            // clearNonLocalRows() will have cleared out non-locals by now
+                            // but I may be half way through serializing rows here, which are mixed and this row
+                            // may still need to be sent.
+                            // The destination rowProcessor will take care of any that need post-filtering,
+                            // so ensure last buffer is sent, below before exiting broadcastRHS broadcast
+
+                            stopRHSBroadcast = true;
+                        }
+                        else
+                            localRhsRows.append(row.getLink());
+                    }
+
+                    rightSerializer->serialize(mbser, (const byte *)row.get());
+                    if (mb.length() >= MAX_SEND_SIZE || stopRHSBroadcast)
                         break;
                 }
                 if (0 == mb.length())
+                {
+                    ActPrintLog("Stopped broadcasting because 0 == mb.length()");
                     break;
+                }
+                if (stopRHSBroadcast)
+                    sendItem->setFlag(bcastflag_spilt);
                 ThorCompress(mb, sendItem->queryMsg());
                 if (!broadcaster.send(sendItem))
+                {
+                    ActPrintLog("Stopped broadcasting because !broadcaster.send(sendItem)");
                     break;
+                }
+                if (stopRHSBroadcast)
+                {
+                    ActPrintLog("Stopped broadcasting because stopRHSBroadcast");
+                    break;
+                }
                 mb.clear();
-                sendItem->reset();
+                broadcaster.resetSendItem(sendItem);
             }
         }
         catch (IException *e)
         {
-            ActPrintLog(e, "CLookupJoinActivity::sendRHS: exception");
+            ActPrintLog(e, "CLookupJoinActivity::broadcastRHS: exception");
             throw;
         }
-        broadcaster.final(); // signal stop to others
+
+        sendItem.setown(broadcaster.newSendItem(bcast_stop));
+        if (stopRHSBroadcast)
+            sendItem->setFlag(bcastflag_spilt);
+        ActPrintLog("Sending final RHS broadcast packet");
+        broadcaster.send(sendItem); // signals stop to others
     }
     void processRHSRows(unsigned slave, MemoryBuffer &mb)
     {
@@ -1283,17 +1424,61 @@ public:
         {
             size32_t sz = rightDeserializer->deserialize(rowBuilder, memDeserializer);
             OwnedConstThorRow fRow = rowBuilder.finalizeRowClear(sz);
-            rows.append(fRow.getClear());
+            CriticalBlock b(localHashCrit);
+            if (spiltBroadcastingRHS)
+            {
+                /* NB: recvLoop should be winding down, a slave signal spilt and communicated to all
+                 * So these will be the last few broadcast rows, when broadcaster is complete, the rest will be hash distributed
+                 */
+
+                // hash row and discard unless for this node
+                unsigned hv = rightHash->hash(fRow.get());
+                if ((myNode-1) == (hv % numNodes))
+                    rows.append(fRow.getClear());
+            }
+            else
+                rows.append(fRow.getClear());
+        }
+    }
+    void setupDistributors()
+    {
+        if (!rhsDistributor)
+        {
+            rhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), rhsDistributeTag, false, NULL));
+            right.setown(rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL));
+            lhsDistributor.setown(createHashDistributor(this, queryJob().queryJobComm(), lhsDistributeTag, false, NULL));
+            left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left.getClear(), leftHash, NULL));
         }
     }
     void getRHS(bool stopping)
     {
+/*
+ * This handles LOOKUP and ALL, but most of the complexity is for LOOKUP handling OOM
+ * Global LOOKUP:
+ * 1) distributes RHS (using broadcaster)
+ * 2) sizes the hash table
+ * 3) If there is no OOM event, it is done and the RHS hash table is built.
+ *    ELSE -
+ * 4) If during 1) or 2) an OOM event occurs, all other slaves are notified.
+ *    If in the middle of broadcasting, it will stop sending RHS
+ *    The spill event will flush out all rows that do not hash to local slave
+ * 5) Hash distributor streams are setup for the [remaining] RHS and unread LHS.
+ * 6) The broadcast rows + the remaining RHS distributed stream are consumed into a single row array.
+ * 7) When done if it has not spilt, the RHS hash table is sized.
+ * 8) If there is no OOM event, the RHS is done and the RHS hash table is built
+ *    The distributed LHS stream is used to perform a local lookup join.
+ *    ELSE -
+ * 9) If during 6) or 7) an OOM event occurs, the stream loader, will spill and sort as necessary.
+ * 10) The LHS side is loaded and spilt and sorted if necessary
+ * 11) A regular join helper is created to perform a local join against the two hash distributed sorted sides.
+ */
+
         if (gotRHS)
             return;
         gotRHS = true;
         // if input counts known, get global aggregate and pre-allocate HT
         ThorDataLinkMetaInfo rightMeta;
-        right->getMetaInfo(rightMeta);
+        rightITDL->getMetaInfo(rightMeta);
         if (rightMeta.totalRowsMin == rightMeta.totalRowsMax)
             rhsTotalCount = rightMeta.totalRowsMax;
         if (needGlobal)
@@ -1305,99 +1490,244 @@ public:
                 return;
             msg.read(rhsTotalCount);
         }
-        if (RCUNSET==rhsTotalCount)
-            rhsTableLen = 0; // set later after gather
-        else
-        {
+		if (needGlobal)
+		{
+			rowProcessor.start();
+			broadcaster.start(this, mpTag, stopping);
+			broadcastRHS();
+			broadcaster.end();
+			rowProcessor.wait();
+
+			if (stopping)
+			{
+			    queryJob().queryRowManager()->removeRowBuffer(this);
+			    return;
+			}
+
+            rhsRows = 0;
+            {
+                CriticalBlock b(localHashCrit);
+                ForEachItemIn(a, rhsNodeRows)
+                {
+                    CThorExpandingRowArray &rows = *rhsNodeRows.item(a);
+                    rhsRows += rows.ordinality();
+                }
+            }
+
             if (isLookup())
             {
-                rhsTableLen = getHTSize(rhsTotalCount);
-                ht.ensure(rhsTableLen);
+                rhsTableLen = getHTSize(rhsRows);
+                // NB: This sizing could cause spilling callback to be triggered
+                ht.ensure(rhsTableLen); // Pessimistic if LOOKUP,KEEP(1)
+                /* JCSMORE - failure to size should not be failure condition
+                 * It will mark spiltBroadcastingRHS and try to degrade
+                 * JCS->GH: However, need to catch OOM somehow..
+                 */
                 ht.clearUnused();
-                // NB: 'rhs' row array will not be used
             }
             else
+                rhs.ensure(rhsRows);
+
+			/* NB: Potentially one of the slaves spilt late after broadcast and rowprocessor finished
+			 * Need to remove spill callback and broadcast one last message to know.
+			 */
+
+			queryJob().queryRowManager()->removeRowBuffer(this);
+
+			broadcaster.reset();
+			broadcaster.start(this, mpTag, false);
+	        Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_stop);
+	        if (spiltBroadcastingRHS)
+	            sendItem->setFlag(bcastflag_spilt);
+	        ActPrintLog("Sending final RHS broadcast packet");
+	        broadcaster.send(sendItem); // signals stop to others
+			broadcaster.end();
+
+			/* All slaves now know whether any one spilt or not, i.e. whether to perform local hash join or not
+			 * If any have, still need to distribute rest of RHS..
+			 */
+
+			if (spiltBroadcastingRHS) // NB: Can only be active for LOOKUP (not ALL)
+			{
+                ActPrintLog("Spilt whilst broadcasting, will attempt distribute local lookup join");
+                localLookupJoin = true;
+
+                // NB: lhs ordering and grouping lost from here on..
+                // JCS->GH - I hope it never hits this.. i.e. you prevent such forms being generated..
+                if (grouped)
+                    throw MakeActivityException(this, 0, "Degraded to distributed lookup join, LHS order cannot be preserved");
+
+                // If HT sized already (due to total from meta) and now spilt, too big clear and size later
+			    ht.kill();
+
+				setupDistributors();
+
+                IArrayOf<IRowStream> streams;
+                streams.append(*right.getLink()); // what remains of 'right' will be read through distributor
+                ForEachItemIn(a, rhsNodeRows)
+                {
+                    CThorExpandingRowArray &rowArray = *rhsNodeRows.item(a);
+                    streams.append(*rowArray.createRowStream(0, (rowidx_t)-1, true, true)); // NB: will kill array when stream exhausted
+                }
+                right.setown(createConcatRowStream(streams.ordinality(), streams.getArray()));
+			}
+			else
+			{
+	            if (rhsTotalCount != RCUNSET) // verify matches meta if set/calculated (and haven't spilt)
+	                assertex(rhsRows == rhsTotalCount);
+			}
+		}
+		else
+		{
+            if (isLookup())
+                localLookupJoin = true;
+		    else
+		    {
+	            if (RCUNSET != rhsTotalCount)
+	            {
+	                rhsRows = (rowidx_t)rhsTotalCount;
+	                rhs.ensure(rhsRows);
+	            }
+		        // local ALL join, must fit into memory
+	            while (!abortSoon)
+	            {
+	                OwnedConstThorRow row = right->ungroupedNextRow();
+	                if (!row)
+	                    break;
+	                rhs.append(row.getClear());
+	            }
+	            if (RIUNSET == rhsRows)
+	                rhsRows = rhs.ordinality();
+		    }
+		}
+
+		if (localLookupJoin) // NB: Can only be active for LOOKUP (not ALL)
+		{
+		    Owned<IThorRowLoader> rowLoader;
+		    if (failoverToStdJoin)
+		    {
+		        rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
+                rowLoader->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it sorted
+		    }
+		    else
+		    {
+		        // i.e. will fire OOM if runs out of memory loading local right
+                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_allMem, SPILL_PRIORITY_DISABLE));
+		    }
+            Owned<IRowStream> rightStream = rowLoader->load(right, abortSoon, false, &rhs);
+
+            if (!rightStream)
             {
-                rhsTableLen = 0;
-                rhs.ensure((rowidx_t)rhsTotalCount);
+                ActPrintLog("RHS local rows fitted in memory, count: %"RIPF"d", rhs.ordinality());
+                // all fitted in memory, rows were transferred out back into 'rhs'
+                // Will be unsorted because of rcflag_noAllInMemSort
+
+                /* Now need to size HT.
+                 * transfer rows back into a spillable container
+                 * If HT sizing DOESN'T cause spill, then, row will be transferred back into 'rhs'
+                 * If HT sizing DOES cause spill, sorted rightStream will be created.
+                 */
+
+                rowLoader.clear();
+                Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), compareRight,false, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
+                collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it sorted
+                rhsRows = rhs.ordinality();
+                collector->transferRowsIn(rhs);
+                rhsTableLen = getHTSize(rhsRows);
+
+                // could cause spilling of 'rhs'
+                ht.ensure(rhsTableLen); // Pessimistic if LOOKUP,KEEP(1)
+                /* JCSMORE - failure to size should not be failure condition
+                 * If it failed, the 'collector' will have spilt and it will not need HT
+                 * JCS->GH: However, need to catch OOM somehow..
+                 */
+
+                ht.clearUnused();
+                rightStream.setown(collector->getStream(false, &rhs));
             }
-        }
-        if (needGlobal)
-        {
-            rowProcessor.start();
-            broadcaster.start(this, mpTag, stopping);
-            sendRHS();
-            broadcaster.end();
-            rowProcessor.wait();
-        }
-        else if (!stopping)
-        {
-            while (!abortSoon)
+            if (rightStream) // NB: returned stream, implies spilt AND sorted, if not, 'rhs' is filled
             {
-                OwnedConstThorRow row = right->ungroupedNextRow();
-                if (!row)
-                    break;
-                addRow(row.getClear());
+                ht.kill(); // no longer needed
+
+                ActPrintLog("RHS spilt to disk. Standard Join will be used.");
+                ActPrintLog("Loading/Sorting LHS");
+
+                // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
+                // JCS->GH - I hope it never hits this.. i.e. you prevent such a form being generated..
+                if (grouped)
+                    throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
+
+                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(leftITDL), compareLeft));
+                left.setown(rowLoader->load(left, abortSoon, false));
+                leftITDL = inputs.item(0); // reset
+                ActPrintLog("LHS loaded/sorted");
+
+                // rightStream is sorted
+                // so now going to do a std. join on distributed sorted streams
+                switch(container.getKind())
+                {
+                    case TAKlookupjoin:
+                    {
+                        // JCS->GH - are you going to generate JFreorderable flag?
+                        bool hintunsortedoutput = getOptBool(THOROPT_UNSORTED_OUTPUT, JFreorderable & flags);
+                        bool hintparallelmatch = getOptBool(THOROPT_PARALLEL_MATCH, hintunsortedoutput); // i.e. unsorted, implies use parallel by default, otherwise no point
+                        joinHelper.setown(createJoinHelper(*this, hashJoinHelper, this, hintparallelmatch, hintunsortedoutput));
+                        break;
+                    }
+                    case TAKlookupdenormalize:
+                    case TAKlookupdenormalizegroup:
+                        joinHelper.setown(createDenormalizeHelper(*this, hashJoinHelper, this));
+                        break;
+                    default:
+                        throwUnexpected();
+                }
+                joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL), &abortSoon);
+                return;
             }
-        }
-        if (!stopping)
-            prepareRHS();
+		}
+		if (!stopping)
+			prepareRHS();
     }
     void prepareRHS()
     {
-        if (needGlobal)
+        // NB: this method is not used if we've failed over to a regular join in getRHS()
+
+        // JCSMORE - would be nice to make this multi-core, clashes and compares can be expensive
+        if (localLookupJoin)
         {
-            rowidx_t maxRows = 0;
-            ForEachItemIn(a, rhsNodeRows)
+            // If got this far, without turning into a standard fully distributed join, then all rows are in rhs
+            if (isLookup()) // if isAll(), want to leave them in rhs as is.
             {
-                CThorExpandingRowArray &rows = *rhsNodeRows.item(a);
-                maxRows += rows.ordinality();
-            }
-            if (rhsTotalCount != RCUNSET)
-            { // ht pre-expanded already
-                assertex(maxRows == rhsTotalCount);
+                rowidx_t r=0;
+                for (; r<rhs.ordinality(); r++)
+                    addRowHt(rhs.getClear(r));
+                rhs.kill(); // free up ptr table asap
             }
-            rhsRows = maxRows;
         }
-        else // local
-        {
-            if (RCUNSET != rhsTotalCount)
-                rhsRows = (rowidx_t)rhsTotalCount;
-            else // all join, or lookup if total count unkown
-                rhsRows = rhs.ordinality();
-        }
-        if (RCUNSET == rhsTotalCount) //NB: if rhsTotalCount known, will have been sized earlier
+        else if (needGlobal)
         {
+            // If global and !localLookupJoin, then rows are in 'rhsNodeRows' arrays
             if (isAll())
             {
-                if (needGlobal) // otherwise (local), it expanded as rows added
-                    rhs.ensure(rhsRows);
+                ForEachItemIn(a2, rhsNodeRows)
+                {
+                    CThorExpandingRowArray &rows = *rhsNodeRows.item(a2);
+                    rowidx_t r=0;
+                    for (; r<rows.ordinality(); r++)
+                        rhs.append(rows.getClear(r));
+                    rows.kill(); // free up ptr table asap
+                }
             }
             else
             {
-                rhsTableLen = getHTSize(rhsRows);
-                ht.ensure(rhsTableLen); // Pessimistic if LOOKUP,KEEP(1)
-                ht.clearUnused();
-                if (!needGlobal)
+                ForEachItemIn(a2, rhsNodeRows)
                 {
+                    CThorExpandingRowArray &rows = *rhsNodeRows.item(a2);
                     rowidx_t r=0;
-                    for (; r<rhs.ordinality(); r++)
-                        addRowHt(rhs.getClear(r));
-                    rhs.kill(); // free up ptr table asap
+                    for (; r<rows.ordinality(); r++)
+                        addRowHt(rows.getClear(r));
+                    rows.kill(); // free up ptr table asap
                 }
-                // else built up from rhsNodeRows
-            }
-        }
-        if (needGlobal)
-        {
-            // JCSMORE - would be nice to make this multi-core, clashes and compares can be expensive
-            ForEachItemIn(a2, rhsNodeRows)
-            {
-                CThorExpandingRowArray &rows = *rhsNodeRows.item(a2);
-                rowidx_t r=0;
-                for (; r<rows.ordinality(); r++)
-                    addRow(rows.getClear(r));
-                rows.kill(); // free up ptr table asap
             }
         }
         ActPrintLog("rhs table: %d elements", rhsRows);
@@ -1412,7 +1742,25 @@ public:
 // IBCastReceive
     virtual void bCastReceive(CSendItem *sendItem) // Broadcast-receive callback: reacts to a 'spilt' notification (if flagged), then always queues the item
     {
-        rowProcessor.addBlock(sendItem);
+        if (sendItem) // NULL is the end marker (see addBlock below), so there are no flags to inspect
+        {
+            if (0 != (sendItem->queryFlags() & bcastflag_spilt)) // item carries the bcastflag_spilt bit
+            {
+                VStringBuffer msg("Notification that slave %d spilt", sendItem->queryOrigin()); // message names the item's origin
+                clearNonLocalRows(msg.str()); // message is handed to clearNonLocalRows() as its argument
+            }
+        }
+        rowProcessor.addBlock(sendItem); // NB: NULL indicates end; the item is passed on whether or not the spilt flag was set
+    }
+// IBufferedRowCallback
+    virtual unsigned getPriority() const // IBufferedRowCallback: priority reported for this activity's memory callback
+    {
+        return SPILL_PRIORITY_LOOKUPJOIN; // fixed constant (defined as 10 in thmem.hpp)
+    }
+    virtual bool freeBufferedRows(bool critical) // IBufferedRowCallback: 'critical' is not consulted by this implementation
+    {
+        // NB: this callback is only installed if this is a lookup join and it is global
+        return clearNonLocalRows("Out of memory callback"); // result of clearNonLocalRows() is returned unchanged
+    }
 };
 

+ 12 - 0
thorlcr/activities/msort/thsortu.cpp

@@ -569,6 +569,8 @@ public:
                         }
                         case TAKdenormalizegroup:
                         case TAKhashdenormalizegroup:
+                        case TAKlookupdenormalize:
+                        case TAKlookupdenormalizegroup:
                             assertex(!denormRows.ordinality());
                             do {
                                 denormRows.append(nextright.getLink());
@@ -582,6 +584,7 @@ public:
                         case TAKhashjoin:
                         case TAKselfjoin:
                         case TAKselfjoinlight:
+                        case TAKlookupjoin:
                             gotsz = helper->transform(ret, defaultLeft, nextright);
                             nextR();
                             break;
@@ -614,6 +617,8 @@ public:
                     }
                     case TAKdenormalizegroup:
                     case TAKhashdenormalizegroup:
+                    case TAKlookupdenormalize:
+                    case TAKlookupdenormalizegroup:
 
                         assertex(!denormRows.ordinality());
                         do {
@@ -633,6 +638,7 @@ public:
                     case TAKhashjoin:
                     case TAKselfjoin:
                     case TAKselfjoinlight:
+                    case TAKlookupjoin:
                         if (!rightgroupmatched[rightidx]) 
                             gotsz = helper->transform(ret, defaultLeft, rightgroup.query(rightidx));
                         rightidx++;
@@ -652,12 +658,15 @@ public:
                         break;
                     case TAKdenormalizegroup:
                     case TAKhashdenormalizegroup:
+                    case TAKlookupdenormalize:
+                    case TAKlookupdenormalizegroup:
                         gotsz = helper->transform(ret, nextleft, NULL, 0, (const void **)NULL);
                         break;
                     case TAKjoin:
                     case TAKhashjoin:
                     case TAKselfjoin:
                     case TAKselfjoinlight:
+                    case TAKlookupjoin:
                         gotsz = helper->transform(ret, nextleft, defaultRight);
                         break;
                     default:
@@ -704,6 +713,8 @@ public:
                         }
                         case TAKdenormalizegroup:
                         case TAKhashdenormalizegroup:
+                        case TAKlookupdenormalize:
+                        case TAKlookupdenormalizegroup:
                         {
                             const void *rhsRow = rightgroup.query(rightidx);
                             LinkThorRow(rhsRow);
@@ -715,6 +726,7 @@ public:
                         case TAKhashjoin:
                         case TAKselfjoin:
                         case TAKselfjoinlight:
+                        case TAKlookupjoin:
                             gotsz = helper->transform(ret,nextleft,rightgroup.query(rightidx));
                             break;
                         default:

+ 0 - 3
thorlcr/activities/thactivityutil.ipp

@@ -38,9 +38,6 @@
 #define OUTPUT_RECORDSIZE
 
 
-//void startInput(CActivityBase *activity, IThorDataLink * i, const char *extra=NULL);
-//void stopInput(IThorDataLink * i, const char * activityName = NULL, activity_id activitiyId = 0);
-
 class CPartHandler : public CSimpleInterface, implements IRowStream
 {
 public:

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -163,7 +163,7 @@ void CSlaveActivity::startInput(IThorDataLink *itdl, const char *extra)
 #endif
 }
 
-void CSlaveActivity::stopInput(IThorDataLink *itdl, const char *extra)
+void CSlaveActivity::stopInput(IRowStream *itdl, const char *extra)
 {
     StringBuffer s("Stopping input for");
     if (extra)

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -66,7 +66,7 @@ public:
     void appendOutput(IThorDataLink *itdl) { outputs.append(itdl); };
     void appendOutputLinked(IThorDataLink *itdl) { itdl->Link(); appendOutput(itdl); };
     void startInput(IThorDataLink *itdl, const char *extra=NULL);
-    void stopInput(IThorDataLink *itdl, const char *extra=NULL);
+    void stopInput(IRowStream *itdl, const char *extra=NULL);
 
     unsigned __int64 &getTotalCyclesRef() { return totalCycles; }
     unsigned __int64 queryLocalCycles() const;

+ 1 - 0
thorlcr/shared/thor.hpp

@@ -35,6 +35,7 @@ typedef size32_t rowidx_t;
 #define RCIDXMAX ((rowidx_t)(size32_t)-1)
 #define RIPF ""
 #define RIMAX ((rowidx_t)-1)
+#define RIUNSET RIMAX
 
 #include "jexcept.hpp"
 

+ 90 - 23
thorlcr/thorutil/thmem.cpp

@@ -577,6 +577,41 @@ void CThorExpandingRowArray::clearRows()
     numRows = 0;
 }
 
+void CThorExpandingRowArray::compact() // Squeeze out NULL slots: shift non-NULL row pointers down (relative order kept) and reduce numRows to match
+{
+    const void **freeFinger = rows; // destination cursor: next slot to be checked / filled
+    const void **filledFinger = NULL; // source cursor: scans ahead for non-NULL entries; initialised when the first NULL slot is met
+    const void **rowEnd = rows+numRows; // one past the last slot currently in use
+    rowidx_t newCount = 0; // running count of non-NULL entries kept
+    while (freeFinger != rowEnd)
+    {
+        if (NULL == *freeFinger)
+        {
+            if (!filledFinger)
+                filledFinger = freeFinger+1; // first gap found: start scanning just past it
+            while (filledFinger != rowEnd)
+            {
+                if (*filledFinger)
+                {
+                    ++newCount;
+                    *freeFinger = *filledFinger; // move the pointer down into the gap
+                    *filledFinger = NULL; // vacate the source slot; if !sparse, would prob. be better to memset at end
+                    ++filledFinger;
+                    break;
+                }
+                ++filledFinger;
+            }
+            if (filledFinger == rowEnd) // no more filled elements, so stop
+                break;
+        }
+        else
+            ++newCount; // slot already occupied, leave it in place
+        ++freeFinger;
+    }
+    numRows = newCount;
+    // NB: only numRows is reduced here, the row array itself is not resized. Should this [optionally] shrink the row array too?
+}
+
 void CThorExpandingRowArray::kill()
 {
     clearRows();
@@ -789,19 +824,19 @@ bool CThorExpandingRowArray::checkSorted(ICompare *icmp)
     return true;
 }
 
-IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num, bool streamOwns)
+IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num, bool streamOwns, bool clearAtEos)
 {
     class CStream : public CSimpleInterface, implements IRowStream
     {
         CThorExpandingRowArray &parent;
         rowidx_t pos, lastRow;
-        bool owns;
+        bool owns, clearAtEos;
 
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStream(CThorExpandingRowArray &_parent, rowidx_t firstRow, rowidx_t _lastRow, bool _owns)
-            : parent(_parent), pos(firstRow), lastRow(_lastRow), owns(_owns)
+        CStream(CThorExpandingRowArray &_parent, rowidx_t firstRow, rowidx_t _lastRow, bool _owns, bool _clearAtEos)
+            : parent(_parent), pos(firstRow), lastRow(_lastRow), owns(_owns), clearAtEos(_clearAtEos)
         {
         }
 
@@ -809,13 +844,21 @@ IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num
         virtual const void *nextRow()
         {
             if (pos >= lastRow)
+            {
+                if (owns && clearAtEos)
+                    parent.kill();
                 return NULL;
+            }
             if (owns)
                 return parent.getClear(pos++);
             else
                 return parent.get(pos++);
         }
-        virtual void stop() { }
+        virtual void stop()
+        {
+            if (owns && clearAtEos)
+                parent.kill();
+        }
     };
 
     if (start>ordinality())
@@ -826,7 +869,7 @@ IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num
     else
         lastRow = start+num;
 
-    return new CStream(*this, start, lastRow, streamOwns);
+    return new CStream(*this, start, lastRow, streamOwns, clearAtEos);
 }
 
 void CThorExpandingRowArray::partition(ICompare &compare, unsigned num, UnsignedArray &out)
@@ -1216,7 +1259,7 @@ protected:
     CThorSpillableRowArray spillableRows;
     PointerIArrayOf<CFileOwner> spillFiles;
     Owned<IOutputRowSerializer> serializer;
-    RowCollectorFlags diskMemMix;
+    RowCollectorSpillFlags diskMemMix;
     rowcount_t totalRows;
     unsigned spillPriority;
     unsigned overflowCount;
@@ -1228,6 +1271,7 @@ protected:
     SpinLock readerLock;
     bool mmRegistered;
     Owned<CSharedSpillableRowSet> spillableRowSet;
+    unsigned options;
 
     bool spillRows()
     {
@@ -1347,9 +1391,16 @@ protected:
             {
                 totalRows += spillableRows.numCommitted();
                 if (iCompare && (1 == outStreams))
-                    spillableRows.sort(*iCompare, maxCores);
-                // NB: if rc_allDiskOrAllMem and some disk already, will have been spilt already (see above) and not reach here
-                if (rc_allDiskOrAllMem == diskMemMix || (NULL!=allMemRows && (rc_allMem == diskMemMix)))
+                {
+                    // Option(rcflag_noAllInMemSort) - avoid sorting allMemRows
+                    if ((NULL == allMemRows) || (0 == (options & rcflag_noAllInMemSort)))
+                        spillableRows.sort(*iCompare, maxCores);
+                }
+
+                if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
+                    (NULL!=allMemRows && (rc_allMem == diskMemMix)) ||
+                    (NULL!=allMemRows && (rc_mixed == diskMemMix) && 0 == overflowCount) // if allMemRows given, only if no spilling
+                   )
                 {
                     assertex(allMemRows);
                     assertex(1 == outStreams);
@@ -1396,8 +1447,16 @@ protected:
             mmRegistered = false;
         }
     }
+    void enableSpillingCallback() // Registers this object with the job's row manager, at most once, and only when spillingEnabled()
+    {
+        if (!mmRegistered && spillingEnabled()) // mmRegistered guards against registering twice
+        {
+            activity.queryJob().queryRowManager()->addRowBuffer(this);
+            mmRegistered = true; // remember that the registration has been made
+        }
+    }
 public:
-    CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, bool _isStable, RowCollectorFlags _diskMemMix, unsigned _spillPriority)
+    CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, bool _isStable, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
         : activity(_activity),
           rowIf(_rowIf), iCompare(_iCompare), isStable(_isStable), diskMemMix(_diskMemMix), spillPriority(_spillPriority),
           spillableRows(_activity, _rowIf)
@@ -1414,7 +1473,7 @@ public:
             mmRegistered = true;
         }
         maxCores = activity.queryMaxCores();
-
+        options = 0;
         spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
     }
     ~CThorRowCollectorBase()
@@ -1451,8 +1510,9 @@ public:
     {
         reset();
         spillableRows.transferFrom(src);
+        enableSpillingCallback();
     }
-    virtual void setup(ICompare *_iCompare, bool _isStable, RowCollectorFlags _diskMemMix, unsigned _spillPriority)
+    virtual void setup(ICompare *_iCompare, bool _isStable, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
     {
         iCompare = _iCompare;
         isStable = _isStable;
@@ -1471,6 +1531,10 @@ public:
     {
         spillableRows.ensure(max);
     }
+    virtual void setOptions(unsigned _options) // Sets the collector's option bits, e.g. rcflag_noAllInMemSort
+    {
+        options = _options; // plain assignment: replaces any previously set bits rather than OR-ing them in
+    }
 // IBufferedRowCallback
     virtual unsigned getPriority() const
     {
@@ -1491,6 +1555,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
     IRowStream *load(IRowStream *in, const bool &abort, TRLGroupFlag grouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
     {
         reset();
+        enableSpillingCallback();
         setPreserveGrouping(trl_preserveGrouping == grouping);
         while (!abort)
         {
@@ -1516,7 +1581,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
+    CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
         : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
     {
     }
@@ -1526,11 +1591,12 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
         CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
     }
     virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
+    virtual void setOptions(unsigned options)  { CThorRowCollectorBase::setOptions(options); }
 // IThorRowLoader
     virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
     {
@@ -1543,12 +1609,12 @@ public:
     }
 };
 
-IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
+IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
 {
     return new CThorRowLoader(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
 }
 
-IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
+IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
 {
     return createThorRowLoader(activity, &activity, iCompare, isStable, diskMemMix, spillPriority);
 }
@@ -1560,7 +1626,7 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
+    CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
         : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
     {
     }
@@ -1575,11 +1641,12 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
         CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
     }
     virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
+    virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
 // IThorRowCollector
     virtual IRowWriter *getWriter()
     {
@@ -1612,20 +1679,20 @@ public:
     {
         CThorRowCollectorBase::reset();
     }
-    virtual IRowStream *getStream(bool shared)
+    virtual IRowStream *getStream(bool shared, CThorExpandingRowArray *allMemRows) // IThorRowCollector: allMemRows defaults to NULL in the interface declaration
     {
-        return CThorRowCollectorBase::getStream(NULL, NULL, shared);
+        return CThorRowCollectorBase::getStream(allMemRows, NULL, shared); // forwards allMemRows (previously always NULL) to the base implementation
     }
 };
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
+IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
 {
     Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
     collector->setPreserveGrouping(preserveGrouping);
     return collector.getClear();
 }
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
+IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
 {
     return createThorRowCollector(activity, &activity, iCompare, isStable, diskMemMix, spillPriority, preserveGrouping);
 }

+ 13 - 9
thorlcr/thorutil/thmem.hpp

@@ -212,6 +212,7 @@ graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *
 #define SPILL_PRIORITY_OVERFLOWABLE_BUFFER SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_SPILLABLE_STREAM SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_RESULT SPILL_PRIORITY_DEFAULT
+#define SPILL_PRIORITY_LOOKUPJOIN 10
 
 enum StableSortFlag { stableSort_none, stableSort_earlyAlloc, stableSort_lateAlloc };
 class CThorSpillableRowArray;
@@ -357,7 +358,7 @@ public:
     bool equal(ICompare *icmp, CThorExpandingRowArray &other);
     bool checkSorted(ICompare *icmp);
 
-    IRowStream *createRowStream(rowidx_t start=0, rowidx_t num=(rowidx_t)-1, bool streamOwns=true);
+    IRowStream *createRowStream(rowidx_t start=0, rowidx_t num=(rowidx_t)-1, bool streamOwns=true, bool clearAtEos=false);
 
     void partition(ICompare &compare, unsigned num, UnsignedArray &out); // returns num+1 points
 
@@ -370,6 +371,7 @@ public:
     void deserialize(size32_t sz, const void *buf);
     void deserializeExpand(size32_t sz, const void *data);
     bool ensure(rowidx_t requiredRows);
+    void compact();
     virtual IThorArrayLock &queryLock() { return dummyLock; }
 
 friend class CThorSpillableRowArray;
@@ -460,7 +462,7 @@ public:
         kill();
         swap(from);
     }
-    void transferFrom(CThorExpandingRowArray &src); 
+    void transferFrom(CThorExpandingRowArray &src);
 
     IRowStream *createRowStream();
 
@@ -488,7 +490,8 @@ public:
 };
 
 
-enum RowCollectorFlags { rc_mixed, rc_allMem, rc_allDisk, rc_allDiskOrAllMem };
+enum RowCollectorSpillFlags { rc_mixed, rc_allMem, rc_allDisk, rc_allDiskOrAllMem };
+#define rcflag_noAllInMemSort 0x01
 interface IThorRowCollectorCommon : extends IInterface
 {
     virtual rowcount_t numRows() const = 0;
@@ -496,8 +499,9 @@ interface IThorRowCollectorCommon : extends IInterface
     virtual unsigned overflowScale() const = 0;
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort=true) = 0;
     virtual void transferRowsIn(CThorExpandingRowArray &src) = 0;
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
+    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
     virtual void ensure(rowidx_t max) = 0;
+    virtual void setOptions(unsigned options) = 0;
 };
 
 interface IThorRowLoader : extends IThorRowCollectorCommon
@@ -511,13 +515,13 @@ interface IThorRowCollector : extends IThorRowCollectorCommon
     virtual void setPreserveGrouping(bool tf) = 0;
     virtual IRowWriter *getWriter() = 0;
     virtual void reset() = 0;
-    virtual IRowStream *getStream(bool shared=false) = 0;
+    virtual IRowStream *getStream(bool shared=false, CThorExpandingRowArray *allMemRows=NULL) = 0;
 };
 
-extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
-extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
+extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
+extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
+extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
+extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);