Procházet zdrojové kódy

HPCC-8245 - Clear up a few things

1) bug, race between 1st/2nd broadcasters on different slaves
2) bug, some denormalizelookup cases missing from joinhelper
changes
3) Ensure all members initialized in constructor
(move some things from init() to constructor in the process)
4) Clear up tabs vs spaces
5) Unify on one createLookupJoinSlave() method and use TAK type
6) Add #TEST_FAILOVER_ defines
7) Comment in JFsmart usage

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith před 11 roky
rodič
revize
be4533f65c

+ 6 - 2
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -22,7 +22,7 @@
 
 class CLookupJoinActivityMaster : public CMasterActivity
 {
-    mptag_t lhsDistributeTag, rhsDistributeTag;
+    mptag_t broadcast2MpTag, lhsDistributeTag, rhsDistributeTag;
 
 public:
     CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
@@ -31,7 +31,8 @@ public:
             lhsDistributeTag = rhsDistributeTag = TAG_NULL;
         else
         {
-	        mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
+            mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
+            broadcast2MpTag = container.queryJob().allocateMPTag();
             lhsDistributeTag = container.queryJob().allocateMPTag();
             rhsDistributeTag = container.queryJob().allocateMPTag();
         }
@@ -40,8 +41,10 @@ public:
     {
         if (!container.queryLocal())
         {
+            container.queryJob().freeMPTag(broadcast2MpTag);
             container.queryJob().freeMPTag(lhsDistributeTag);
             container.queryJob().freeMPTag(rhsDistributeTag);
+            // NB: if mpTag is allocated, the activity base class frees
         }
     }
     void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
@@ -49,6 +52,7 @@ public:
         if (!container.queryLocal())
         {
             serializeMPtag(dst, mpTag);
+            serializeMPtag(dst, broadcast2MpTag);
             serializeMPtag(dst, lhsDistributeTag);
             serializeMPtag(dst, rhsDistributeTag);
         }

+ 250 - 214
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -15,7 +15,6 @@
     limitations under the License.
 ############################################################################## */
 
-#include "thlookupjoinslave.ipp"
 #include "thactivityutil.ipp"
 #include "javahash.hpp"
 #include "javahash.tpp"
@@ -35,8 +34,11 @@
 #define _TRACEBROADCAST
 #endif
 
+//#define TEST_FAILOVER_DISTRIBUTED_LOOKUPJOIN
+//#define TEST_FAILOVER_HASHJOIN
+
+
 enum join_t { JT_Undefined, JT_Inner, JT_LeftOuter, JT_RightOuter, JT_LeftOnly, JT_RightOnly, JT_LeftOnlyTransform };
-enum joinkind_t { join_lookup, join_all, denormalize_lookup, denormalize_all };
 
 
 #define MAX_SEND_SIZE 0x100000 // 1MB
@@ -487,7 +489,7 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
     const void *rhsNext;
     Owned<IOutputMetaData> outputMeta;
     rowcount_t rhsTotalCount;
-    mptag_t lhsDistributeTag, rhsDistributeTag;
+    mptag_t lhsDistributeTag, rhsDistributeTag, broadcast2MpTag;
 
     PointerArrayOf<CThorExpandingRowArray> rhsNodeRows;
     CBroadcaster broadcaster;
@@ -515,10 +517,54 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
     ICompare *compareLeft;
     Owned<IJoinHelper> joinHelper;
 
-    inline bool isLookup() { return (joinKind==join_lookup)||(joinKind==denormalize_lookup); }
-    inline bool isAll() { return (joinKind==join_all)||(joinKind==denormalize_all); }
-    inline bool isDenormalize() { return (joinKind==denormalize_all)||(joinKind==denormalize_lookup); }
-    inline bool isGroupOp() { return (TAKlookupdenormalizegroup == container.getKind() || TAKsmartdenormalizegroup == container.getKind() || TAKalldenormalizegroup == container.getKind()); }
+    inline bool isLookup() const
+    {
+        switch (container.getKind())
+        {
+            case TAKlookupjoin:
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
+            case TAKsmartjoin:
+                return true;
+        }
+        return false;
+    }
+    inline bool isAll() const
+    {
+        switch (container.getKind())
+        {
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
+                return true;
+        }
+        return false;
+    }
+    inline bool isDenormalize() const
+    {
+        switch (container.getKind())
+        {
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
+            case TAKsmartdenormalizegroup:
+            case TAKsmartdenormalize:
+                return true;
+        }
+        return false;
+    }
+    inline bool isGroupOp() const
+    {
+        switch (container.getKind())
+        {
+            case (TAKlookupdenormalizegroup:
+            case TAKsmartdenormalizegroup:
+            case TAKalldenormalizegroup:
+                return true;
+        }
+        return false;
+    }
     StringBuffer &getJoinTypeStr(StringBuffer &str)
     {
         switch(joinType)
@@ -627,13 +673,11 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
                 rows->kill();
         }
     }
-protected:
-    joinkind_t joinKind;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CLookupJoinActivity(CGraphElementBase *_container, joinkind_t _joinKind) 
-        : CSlaveActivity(_container), CThorDataLink(this), joinKind(_joinKind), broadcaster(*this), rhs(*this, NULL), ht(*this, NULL, true),
+    CLookupJoinActivity(CGraphElementBase *_container)
+        : CSlaveActivity(_container), CThorDataLink(this), broadcaster(*this), rhs(*this, NULL), ht(*this, NULL, true),
           rowProcessor(*this)
     {
         gotRHS = false;
@@ -647,83 +691,40 @@ public:
         myNode = queryJob().queryMyRank();
         numNodes = queryJob().querySlaves();
         needGlobal = !container.queryLocal() && (container.queryJob().querySlaves() > 1);
-        lhsDistributeTag = rhsDistributeTag = TAG_NULL;
-        failoverToLocalLookupJoin = failoverToStdJoin = false;
-    }
-    ~CLookupJoinActivity()
-    {
-        ForEachItemIn(a, rhsNodeRows)
-        {
-            CThorExpandingRowArray *rows = rhsNodeRows.item(a);
-            if (rows)
-                rows->Release();
-        }
-    }
-    bool clearNonLocalRows(const char *msg)
-    {
-        rowidx_t clearedRows = 0;
-        {
-            CriticalBlock b(localHashCrit);
-            if (spiltBroadcastingRHS)
-                return false;
-            ActPrintLog("Clearing non-local rows - cause: %s", msg);
-            spiltBroadcastingRHS = true;
-            ForEachItemIn(a, rhsNodeRows)
-            {
-                CThorExpandingRowArray &rows = *rhsNodeRows.item(a);
-                ActPrintLog("pre-hash-clear - rows[%d] had %"RIPF"d rows", a, rows.ordinality());
-                rowidx_t numRows = rows.ordinality();
-                for (unsigned r=0; r<numRows; r++)
-                {
-                    unsigned hv = rightHash->hash(rows.query(r));
-                    if ((myNode-1) != (hv % numNodes))
-                    {
-                        OwnedConstThorRow row = rows.getClear(r); // dispose of
-                        ++clearedRows;
-                    }
-                }
-                rowidx_t chkCount = 0;
-                for (unsigned r=0; r<numRows; r++)
-                {
-                    if (NULL != rows.query(r))
-                        ++chkCount;
-                }
-                rows.compact();
-                ActPrintLog("post-hash-clear - rows[%d] has %"RIPF"d rows (chkCount=%"RIPF"d)", a, rows.ordinality(), chkCount);
-            }
-        }
-
-        ActPrintLog("handleLowMem: clearedRows = %"RIPF"d", clearedRows);
-        return 0 != clearedRows;
-    }
+        broadcast2MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
 
-// IThorSlaveActivity overloaded methods
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
-        ActPrintLog("init");
-        appendOutputLinked(this);
+        rhsTable = NULL;
+        rhsTableLen = htCount = htDedupCount = 0;
+        rhsRows = RIUNSET;
+        rhsTotalCount = RCUNSET;
+        leftITDL = rightITDL = NULL;
+        candidateIndex = 0;
 
         eos = false;
-        someSinceEog = false;
-        eog = false;
+        eog = someSinceEog = false;
         joined = 0;
         doRightOuter = false;
         leftMatch = false;
         grouped = false;
         lastRightOuter = 0;
+        fuzzyMatch = returnMany = dedup = false;
         waitForOtherRO = true;
+
         hashJoinHelper = NULL;
         allJoinHelper = NULL;
-        abortLimit = 0;
-        compareRight = NULL;
-        leftHash = NULL;
-        rightHash = NULL;
-        compareLeftRight = NULL;
-        keepLimit = 0;
-        switch (joinKind)
+        exclude = false;
+        abortLimit = keepLimit = 0;
+        allJoinHelper = NULL;
+        hashJoinHelper = NULL;
+        leftHash = rightHash = NULL;
+        hashJoinHelper = NULL;
+        compareLeft = compareRight = compareLeftRight = NULL;
+
+        switch (container.getKind())
         {
-            case join_all:
-            case denormalize_all:
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
             {
                 allJoinHelper = (IHThorAllJoinArg *)queryHelper();
                 flags = allJoinHelper->getJoinFlags();
@@ -732,16 +733,17 @@ public:
                 fuzzyMatch = 0 != (JFmatchrequired & flags);
                 break;
             }
-            case join_lookup:
-            case denormalize_lookup:
+            case TAKlookupjoin:
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
             {
                 hashJoinHelper = (IHThorHashJoinArg *)queryHelper();
+                flags = hashJoinHelper->getJoinFlags();
                 leftHash = hashJoinHelper->queryHashLeft();
                 rightHash = hashJoinHelper->queryHashRight();
                 compareRight = hashJoinHelper->queryCompareRight();
                 compareLeft = hashJoinHelper->queryCompareLeft();
                 compareLeftRight = hashJoinHelper->queryCompareLeftRight();
-                flags = hashJoinHelper->getJoinFlags();
                 if (JFmanylookup & flags)
                     returnMany = true;
                 keepLimit = hashJoinHelper->getKeepLimit();
@@ -752,14 +754,6 @@ public:
                 bool maySkip = 0 != (flags & JFtransformMaySkip);
                 dedup = compareRight && !maySkip && !fuzzyMatch && !returnMany;
 
-#if 0 // JCS->GH - Need new flag (HPCC-10068)
-                if (0 != (flags & JFnoorderedlhs))
-                {
-                    failoverToLocalLookupJoin = true;
-                    failoverToStdJoin = true;
-                }
-#endif
-
                 // code gen should spot invalid constants on KEEP with LOOKUP (without MANY)
                 break;
             }
@@ -776,8 +770,8 @@ public:
         if (abortLimit < atMost)
             atMost = abortLimit;
 
-        if (flags & JFleftouter)        
-            joinType = exclude ? JT_LeftOnly : JT_LeftOuter;        
+        if (flags & JFleftouter)
+            joinType = exclude ? JT_LeftOnly : JT_LeftOuter;
         else if (flags & JFrightouter)
         {
             UNIMPLEMENTED;
@@ -786,10 +780,67 @@ public:
         }
         else
             joinType = JT_Inner;
+        StringBuffer str;
+        ActPrintLog("Join type is %s", getJoinTypeStr(str).str());
+
+        failoverToLocalLookupJoin = failoverToStdJoin = (0 != (flags & JFsmart));
+    }
+    ~CLookupJoinActivity()
+    {
+        ForEachItemIn(a, rhsNodeRows)
+        {
+            CThorExpandingRowArray *rows = rhsNodeRows.item(a);
+            if (rows)
+                rows->Release();
+        }
+    }
+    bool clearNonLocalRows(const char *msg)
+    {
+        rowidx_t clearedRows = 0;
+        {
+            CriticalBlock b(localHashCrit);
+            if (spiltBroadcastingRHS)
+                return false;
+            ActPrintLog("Clearing non-local rows - cause: %s", msg);
+            spiltBroadcastingRHS = true;
+            ForEachItemIn(a, rhsNodeRows)
+            {
+                CThorExpandingRowArray &rows = *rhsNodeRows.item(a);
+                ActPrintLog("pre-hash-clear - rows[%d] had %"RIPF"d rows", a, rows.ordinality());
+                rowidx_t numRows = rows.ordinality();
+                for (unsigned r=0; r<numRows; r++)
+                {
+                    unsigned hv = rightHash->hash(rows.query(r));
+                    if ((myNode-1) != (hv % numNodes))
+                    {
+                        OwnedConstThorRow row = rows.getClear(r); // dispose of
+                        ++clearedRows;
+                    }
+                }
+                rowidx_t chkCount = 0;
+                for (unsigned r=0; r<numRows; r++)
+                {
+                    if (NULL != rows.query(r))
+                        ++chkCount;
+                }
+                rows.compact();
+                ActPrintLog("post-hash-clear - rows[%d] has %"RIPF"d rows (chkCount=%"RIPF"d)", a, rows.ordinality(), chkCount);
+            }
+        }
+
+        ActPrintLog("handleLowMem: clearedRows = %"RIPF"d", clearedRows);
+        return 0 != clearedRows;
+    }
+
+// IThorSlaveActivity overloaded methods
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
+    {
+        appendOutputLinked(this);
 
         if (!container.queryLocal())
         {
             mpTag = container.queryJob().deserializeMPTag(data);
+            broadcast2MpTag = container.queryJob().deserializeMPTag(data);
             lhsDistributeTag = container.queryJob().deserializeMPTag(data);
             rhsDistributeTag = container.queryJob().deserializeMPTag(data);
         }
@@ -798,8 +849,6 @@ public:
         rhsNodeRows.ensure(slaves);
         while (slaves--)
             rhsNodeRows.append(new CThorExpandingRowArray(*this, NULL, true)); // true, nulls not needed?
-        StringBuffer str;
-        ActPrintLog("Join type is %s", getJoinTypeStr(str).str());
     }
     virtual void onInputStarted(IException *except)
     {
@@ -847,8 +896,8 @@ public:
 
         if (failoverToLocalLookupJoin && hashJoinHelper) // only for LOOKUP not ALL
         {
-        	if (needGlobal)
-        		queryJob().queryRowManager()->addRowBuffer(this);
+            if (needGlobal)
+                queryJob().queryRowManager()->addRowBuffer(this);
         }
 
         try
@@ -876,10 +925,11 @@ public:
             rl.ensureRow();
         size32_t rrsz=0;
         size32_t rlsz=0;
-        switch(joinKind)
+        switch (container.getKind())
         {
-            case join_all:
-            case denormalize_all:
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
             {
                 if (rr.exists()) 
                     rrsz = allJoinHelper->createDefaultRight(rr);
@@ -887,8 +937,9 @@ public:
                     rlsz = allJoinHelper->createDefaultLeft(rl);
                 break;
             }
-            case join_lookup:
-            case denormalize_lookup:
+            case TAKlookupjoin:
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
             {
                 if (rr.exists()) 
                     rrsz = hashJoinHelper->createDefaultRight(rr);
@@ -920,7 +971,7 @@ public:
         if (!gotRHS && needGlobal)
             getRHS(true); // If global, need to handle RHS until all are slaves stop
 
-        // JCS->GH - if in a child query, it would be to preserve RHS.. would need tip/flag from codegen that constant
+        // JCS->GH - if in a child query, it would be good to preserve RHS.. would need tip/flag from codegen that constant
         clearRHS();
 
         if (right)
@@ -946,13 +997,15 @@ public:
     }
     inline bool match(const void *lhs, const void *rhsrow)
     {
-        switch (joinKind)
+        switch (container.getKind())
         {
-            case join_all:
-            case denormalize_all:
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
                 return allJoinHelper->match(lhs, rhsrow);
-            case join_lookup:
-            case denormalize_lookup:
+            case TAKlookupjoin:
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
                 return hashJoinHelper->match(lhs, rhsrow);
             default:
                 throwUnexpected();
@@ -962,14 +1015,16 @@ public:
     {
         RtlDynamicRowBuilder row(allocator);
         size32_t thisSize;
-        switch (joinKind)
+        switch (container.getKind())
         {
-            case join_all:
-            case denormalize_all:
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
                 thisSize = allJoinHelper->transform(row, lhs, rhsrow);
                 break;
-            case join_lookup:
-            case denormalize_lookup:
+            case TAKlookupjoin:
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
                 thisSize = hashJoinHelper->transform(row, lhs, rhsrow);
                 break;
             default:
@@ -1350,10 +1405,7 @@ public:
                 {
                     OwnedConstThorRow row = right->ungroupedNextRow();
                     if (!row)
-                    {
-                        ActPrintLog("broadcasting last row (!row)");
                         break;
-                    }
 
                     {
                         CriticalBlock b(localHashCrit);
@@ -1382,23 +1434,14 @@ public:
                         break;
                 }
                 if (0 == mb.length())
-                {
-                    ActPrintLog("Stopped broadcasting because 0 == mb.length()");
                     break;
-                }
                 if (stopRHSBroadcast)
                     sendItem->setFlag(bcastflag_spilt);
                 ThorCompress(mb, sendItem->queryMsg());
                 if (!broadcaster.send(sendItem))
-                {
-                    ActPrintLog("Stopped broadcasting because !broadcaster.send(sendItem)");
                     break;
-                }
                 if (stopRHSBroadcast)
-                {
-                    ActPrintLog("Stopped broadcasting because stopRHSBroadcast");
                     break;
-                }
                 mb.clear();
                 broadcaster.resetSendItem(sendItem);
             }
@@ -1489,20 +1532,18 @@ public:
             if (!receiveMsg(msg, 0, mpTag))
                 return;
             msg.read(rhsTotalCount);
-        }
-		if (needGlobal)
-		{
-			rowProcessor.start();
-			broadcaster.start(this, mpTag, stopping);
-			broadcastRHS();
-			broadcaster.end();
-			rowProcessor.wait();
-
-			if (stopping)
-			{
-			    queryJob().queryRowManager()->removeRowBuffer(this);
-			    return;
-			}
+
+            rowProcessor.start();
+            broadcaster.start(this, mpTag, stopping);
+            broadcastRHS();
+            broadcaster.end();
+            rowProcessor.wait();
+
+            if (stopping)
+            {
+                queryJob().queryRowManager()->removeRowBuffer(this);
+                return;
+            }
 
             rhsRows = 0;
             {
@@ -1528,27 +1569,33 @@ public:
             else
                 rhs.ensure(rhsRows);
 
-			/* NB: Potentially one of the slaves spilt late after broadcast and rowprocessor finished
-			 * Need to remove spill callback and broadcast one last message to know.
-			 */
+            /* NB: Potentially one of the slaves spilt late after broadcast and rowprocessor finished
+             * Need to remove spill callback and broadcast one last message to know.
+             */
+
+            queryJob().queryRowManager()->removeRowBuffer(this);
 
-			queryJob().queryRowManager()->removeRowBuffer(this);
+            ActPrintLog("Broadcasting final split status");
+            broadcaster.reset();
+            // NB: using a different tag from 1st broadcast, as 2nd on other nodes can start sending before 1st on this has quit receiving
+            broadcaster.start(this, broadcast2MpTag, false);
+            Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_stop);
+            if (spiltBroadcastingRHS)
+                sendItem->setFlag(bcastflag_spilt);
+            broadcaster.send(sendItem); // signals stop to others
+            broadcaster.end();
 
-			broadcaster.reset();
-			broadcaster.start(this, mpTag, false);
-	        Owned<CSendItem> sendItem = broadcaster.newSendItem(bcast_stop);
-	        if (spiltBroadcastingRHS)
-	            sendItem->setFlag(bcastflag_spilt);
-	        ActPrintLog("Sending final RHS broadcast packet");
-	        broadcaster.send(sendItem); // signals stop to others
-			broadcaster.end();
+            /* All slaves now know whether any one spilt or not, i.e. whether to perform local hash join or not
+             * If any have, still need to distribute rest of RHS..
+             */
 
-			/* All slaves now know whether any one spilt or not, i.e. whether to perform local hash join or not
-			 * If any have, still need to distribute rest of RHS..
-			 */
+#if defined(TEST_FAILOVER_DISTRIBUTED_LOOKUPJOIN) || defined (TEST_FAILOVER_HASHJOIN)
+            if (failoverToLocalLookupJoin && isLookup())
+                clearNonLocalRows("testing");
+#endif
 
-			if (spiltBroadcastingRHS) // NB: Can only be active for LOOKUP (not ALL)
-			{
+            if (spiltBroadcastingRHS) // NB: Can only be active for LOOKUP (not ALL)
+            {
                 ActPrintLog("Spilt whilst broadcasting, will attempt distribute local lookup join");
                 localLookupJoin = true;
 
@@ -1558,9 +1605,9 @@ public:
                     throw MakeActivityException(this, 0, "Degraded to distributed lookup join, LHS order cannot be preserved");
 
                 // If HT sized already (due to total from meta) and now spilt, too big clear and size later
-			    ht.kill();
+                ht.kill();
 
-				setupDistributors();
+                setupDistributors();
 
                 IArrayOf<IRowStream> streams;
                 streams.append(*right.getLink()); // what remains of 'right' will be read through distributor
@@ -1570,50 +1617,54 @@ public:
                     streams.append(*rowArray.createRowStream(0, (rowidx_t)-1, true, true)); // NB: will kill array when stream exhausted
                 }
                 right.setown(createConcatRowStream(streams.ordinality(), streams.getArray()));
-			}
-			else
-			{
-	            if (rhsTotalCount != RCUNSET) // verify matches meta if set/calculated (and haven't spilt)
-	                assertex(rhsRows == rhsTotalCount);
-			}
-		}
-		else
-		{
+            }
+            else
+            {
+                if (rhsTotalCount != RCUNSET) // verify matches meta if set/calculated (and haven't spilt)
+                    assertex(rhsRows == rhsTotalCount);
+            }
+        }
+        else
+        {
             if (isLookup())
                 localLookupJoin = true;
-		    else
-		    {
-	            if (RCUNSET != rhsTotalCount)
-	            {
-	                rhsRows = (rowidx_t)rhsTotalCount;
-	                rhs.ensure(rhsRows);
-	            }
-		        // local ALL join, must fit into memory
-	            while (!abortSoon)
-	            {
-	                OwnedConstThorRow row = right->ungroupedNextRow();
-	                if (!row)
-	                    break;
-	                rhs.append(row.getClear());
-	            }
-	            if (RIUNSET == rhsRows)
-	                rhsRows = rhs.ordinality();
-		    }
-		}
-
-		if (localLookupJoin) // NB: Can only be active for LOOKUP (not ALL)
-		{
-		    Owned<IThorRowLoader> rowLoader;
-		    if (failoverToStdJoin)
-		    {
-		        rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
+            else
+            {
+                if (RCUNSET != rhsTotalCount)
+                {
+                    rhsRows = (rowidx_t)rhsTotalCount;
+                    rhs.ensure(rhsRows);
+                }
+                // local ALL join, must fit into memory
+                while (!abortSoon)
+                {
+                    OwnedConstThorRow row = right->ungroupedNextRow();
+                    if (!row)
+                        break;
+                    rhs.append(row.getClear());
+                }
+                if (RIUNSET == rhsRows)
+                    rhsRows = rhs.ordinality();
+            }
+        }
+
+        if (localLookupJoin) // NB: Can only be active for LOOKUP (not ALL)
+        {
+            Owned<IThorRowLoader> rowLoader;
+            if (failoverToStdJoin)
+            {
+#ifdef TEST_FAILOVER_HASHJOIN
+                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_allDisk, SPILL_PRIORITY_LOOKUPJOIN));
+#else
+                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
                 rowLoader->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it sorted
-		    }
-		    else
-		    {
-		        // i.e. will fire OOM if runs out of memory loading local right
+#endif
+            }
+            else
+            {
+                // i.e. will fire OOM if runs out of memory loading local right
                 rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), compareRight, false, rc_allMem, SPILL_PRIORITY_DISABLE));
-		    }
+            }
             Owned<IRowStream> rightStream = rowLoader->load(right, abortSoon, false, &rhs);
 
             if (!rightStream)
@@ -1684,9 +1735,9 @@ public:
                 joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL), &abortSoon);
                 return;
             }
-		}
-		if (!stopping)
-			prepareRHS();
+        }
+        if (!stopping)
+            prepareRHS();
     }
     void prepareRHS()
     {
@@ -1759,27 +1810,12 @@ public:
     }
     virtual bool freeBufferedRows(bool critical)
     {
-    	// NB: only installed if lookup join and global
+        // NB: only installed if lookup join and global
         return clearNonLocalRows("Out of memory callback");
     }
 };
 
 CActivityBase *createLookupJoinSlave(CGraphElementBase *container) 
 { 
-    return new CLookupJoinActivity(container, join_lookup); 
-}
-
-CActivityBase *createAllJoinSlave(CGraphElementBase *container) 
-{ 
-    return new CLookupJoinActivity(container, join_all); 
-}
-
-CActivityBase *createLookupDenormalizeSlave(CGraphElementBase *container) 
-{ 
-    return new CLookupJoinActivity(container, denormalize_lookup); 
-}
-
-CActivityBase *createAllDenormalizeSlave(CGraphElementBase *container) 
-{ 
-    return new CLookupJoinActivity(container, denormalize_all); 
+    return new CLookupJoinActivity(container);
 }

+ 0 - 28
thorlcr/activities/lookupjoin/thlookupjoinslave.ipp

@@ -1,28 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#ifndef _thlookupjoinslave_ipp
-#define _thlookupjoinslave_ipp
-
-#include "slave.ipp"
-
-activityslaves_decl CActivityBase *createLookupJoinSlave(CGraphElementBase *container);
-activityslaves_decl CActivityBase *createAllJoinSlave(CGraphElementBase *container);
-activityslaves_decl CActivityBase *createLookupDenormalizeSlave(CGraphElementBase *container);
-activityslaves_decl CActivityBase *createAllDenormalizeSlave(CGraphElementBase *container);
-
-#endif

+ 24 - 10
thorlcr/activities/msort/thsortu.cpp

@@ -474,7 +474,7 @@ public:
             abortlimit = (unsigned)-1;
         keepremaining = keepmax;
         outputmetaL = _outputmeta;
-        if (TAKdenormalize == kind || TAKhashdenormalize == kind)
+        if (TAKdenormalize == kind || TAKhashdenormalize == kind || TAKlookupdenormalize == kind)
             denormTmp.setAllocator(allocator).ensureRow();
         return true;
     }
@@ -553,6 +553,7 @@ public:
                     switch (kind) {
                         case TAKdenormalize:
                         case TAKhashdenormalize:
+                        case TAKlookupdenormalize:
                         {
                             const void *lhs = defaultLeft;
                             do {
@@ -569,7 +570,6 @@ public:
                         }
                         case TAKdenormalizegroup:
                         case TAKhashdenormalizegroup:
-                        case TAKlookupdenormalize:
                         case TAKlookupdenormalizegroup:
                             assertex(!denormRows.ordinality());
                             do {
@@ -599,6 +599,7 @@ public:
                 switch (kind) {
                     case TAKdenormalize:
                     case TAKhashdenormalize:
+                    case TAKlookupdenormalize:
                     {
                         const void *lhs = defaultLeft;
                         do {
@@ -617,7 +618,6 @@ public:
                     }
                     case TAKdenormalizegroup:
                     case TAKhashdenormalizegroup:
-                    case TAKlookupdenormalize:
                     case TAKlookupdenormalizegroup:
 
                         assertex(!denormRows.ordinality());
@@ -654,11 +654,11 @@ public:
                 switch (kind) {
                     case TAKdenormalize:
                     case TAKhashdenormalize:
+                    case TAKlookupdenormalize:
                         fret.set(nextleft);
                         break;
                     case TAKdenormalizegroup:
                     case TAKhashdenormalizegroup:
-                    case TAKlookupdenormalize:
                     case TAKlookupdenormalizegroup:
                         gotsz = helper->transform(ret, nextleft, NULL, 0, (const void **)NULL);
                         break;
@@ -676,12 +676,24 @@ public:
             else
             {
                 // output group if needed before advancing
-                if ((TAKdenormalize == kind || TAKhashdenormalize == kind) && outSz)
-                    fret.setown(denormLhs.getClear()); // denormLhs holding transform progress
-                else if ((TAKdenormalizegroup == kind || TAKhashdenormalizegroup == kind) && denormRows.ordinality())
+
+                switch (kind)
                 {
-                    gotsz = helper->transform(ret, nextleft, denormRows.query(0), denormRows.ordinality(), denormRows.getRowArray());
-                    denormRows.kill();
+                    case TAKdenormalize:
+                    case TAKhashdenormalize:
+                    case TAKlookupdenormalize:
+                        if (outSz)
+                            fret.setown(denormLhs.getClear()); // denormLhs holding transform progress
+                        break;
+                    case TAKdenormalizegroup:
+                    case TAKhashdenormalizegroup:
+                    case TAKlookupdenormalizegroup:
+                        if (denormRows.ordinality())
+                        {
+                            gotsz = helper->transform(ret, nextleft, denormRows.query(0), denormRows.ordinality(), denormRows.getRowArray());
+                            denormRows.kill();
+                        }
+                        break;
                 }
             }
             nextL();            // output outer once
@@ -701,6 +713,7 @@ public:
                     switch (kind) {
                         case TAKdenormalize:
                         case TAKhashdenormalize:
+                        case TAKlookupdenormalize:
                         {
                             size32_t sz = helper->transform(ret, denormLhs, rightgroup.query(rightidx), ++denormCount);
                             if (sz)
@@ -713,7 +726,6 @@ public:
                         }
                         case TAKdenormalizegroup:
                         case TAKhashdenormalizegroup:
-                        case TAKlookupdenormalize:
                         case TAKlookupdenormalizegroup:
                         {
                             const void *rhsRow = rightgroup.query(rightidx);
@@ -752,9 +764,11 @@ public:
         {
         case TAKdenormalizegroup:
         case TAKhashdenormalizegroup:
+        case TAKlookupdenormalizegroup:
             denormRows.kill(); // fall through
         case TAKdenormalize:
         case TAKhashdenormalize:
+        case TAKlookupdenormalize:
             outSz = 0;
             denormLhs.clear();
             denormCount = 0;

+ 9 - 4
thorlcr/slave/slave.cpp

@@ -206,7 +206,6 @@ void ProcessSlaveActivity::done()
 #include "join/thjoinslave.ipp"
 #include "keyedjoin/thkeyedjoinslave.ipp"
 #include "limit/thlimitslave.ipp"
-#include "lookupjoin/thlookupjoinslave.ipp"
 #include "merge/thmergeslave.ipp"
 #include "msort/thgroupsortslave.ipp"
 #include "msort/thmsortslave.ipp"
@@ -232,6 +231,7 @@ void ProcessSlaveActivity::done()
 #include "wuidwrite/thwuidwriteslave.ipp"
 #include "xmlwrite/thxmlwriteslave.ipp"
 
+CActivityBase *createLookupJoinSlave(CGraphElementBase *container);
 CActivityBase *createXmlParseSlave(CGraphElementBase *container);
 CActivityBase *createKeyDiffSlave(CGraphElementBase *container);
 CActivityBase *createKeyPatchSlave(CGraphElementBase *container);
@@ -439,10 +439,12 @@ public:
                 break;
             case TAKlookupjoin:
             case TAKsmartjoin:
-                ret = createLookupJoinSlave(this);
-                break;
+            case TAKlookupdenormalize:
+            case TAKlookupdenormalizegroup:
             case TAKalljoin:
-                ret = createAllJoinSlave(this);
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
+                ret = createLookupJoinSlave(this);
                 break;
             case TAKselfjoin:
                 if (queryLocalOrGrouped())
@@ -535,6 +537,7 @@ public:
             case TAKnwayjoin:
                 ret = createNWayMergeJoinActivity(this);
                 break;
+<<<<<<< HEAD
             case TAKalldenormalize:
             case TAKalldenormalizegroup:
                 ret = createAllDenormalizeSlave(this);
@@ -545,6 +548,8 @@ public:
             case TAKsmartdenormalizegroup:
                 ret = createLookupDenormalizeSlave(this);
                 break;
+=======
+>>>>>>> HPCC-8245 - Clear up a few things
             case TAKchilddataset:
                 UNIMPLEMENTED;
             case TAKchilditerator:

+ 9 - 0
thorlcr/thorutil/thmem.cpp

@@ -1418,6 +1418,15 @@ protected:
                     instrms.append(*spillableRowSet->createRowStream());
                 }
             }
+            else
+            {
+                // 0 rows, no overflow and candidate for allMemRows
+                if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
+                    (NULL!=allMemRows && (rc_allMem == diskMemMix)) ||
+                    (NULL!=allMemRows && (rc_mixed == diskMemMix) && 0 == overflowCount) // if allMemRows given, only if no spilling
+                   )
+                    return NULL;
+            }
         }
         if (0 == instrms.ordinality())
             return createNullRowStream();