Browse Source

Merge pull request #9044 from jakesmith/hpcc-10132

HPCC-10132 Implement constant rhs for lookup/smart join

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 years ago
parent
commit
fe6ebcbedb

+ 5 - 0
testing/regress/ecl/key/looplookup.xml

@@ -0,0 +1,5 @@
+<Dataset name='Result 1'>
+ <Row><id>201</id></Row>
+ <Row><id>202</id></Row>
+ <Row><id>203</id></Row>
+</Dataset>

+ 32 - 0
testing/regress/ecl/looplookup.ecl

@@ -0,0 +1,32 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//noroxie TBD
+//nohthor TBD
+
+rec := RECORD  
+ unsigned id;
+END;
+
+rhs := DATASET(10000, TRANSFORM(rec, SELF.id := COUNTER) , DISTRIBUTED) : INDEPENDENT;
+lhs := DATASET([{1}, {2}, {3}], rec);
+
+
+loopBody(DATASET(rec) currentlhs) := JOIN(currentlhs, rhs, LEFT.id=RIGHT.id, TRANSFORM(rec, SELF.id := RIGHT.id+1), SMART, HINT(lookupRhsConstant(true)));
+doloop := LOOP(lhs, 200, loopBody(ROWS(LEFT)));
+
+doloop;

+ 94 - 63
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -921,7 +921,8 @@ protected:
     OwnedConstThorRow leftRow;
 
     IThorDataLink *leftITDL, *rightITDL;
-    Owned<IRowStream> left, right;
+    Owned<IRowStream> left;
+    IRowStream *right = nullptr;
     IThorAllocator *rightThorAllocator;
     roxiemem::IRowManager *rightRowManager;
     Owned<IThorRowInterfaces> sharedRightRowInterfaces;
@@ -942,6 +943,7 @@ protected:
     IOutputMetaData *rightOutputMeta;
     PointerArrayOf<CThorRowArrayWithFlushMarker> rhsSlaveRows;
     IArrayOf<IRowStream> gatheredRHSNodeStreams;
+    bool rhsConstant = false;
 
     rowidx_t nextRhsRow;
     unsigned keepLimit;
@@ -1354,6 +1356,7 @@ public:
         broadcastLock = NULL;
         if (!isGlobal())
             setRequireInitData(false);
+        rhsConstant = getOptBool("lookupRhsConstant", false); // for testing purposes only
         appendOutputLinked(this);
     }
     ~CInMemJoinBase()
@@ -1387,6 +1390,7 @@ public:
             leftexception.setown(e);
         }
     }
+    virtual bool isRhsConstant() const { return rhsConstant; }
 
 // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
@@ -1394,6 +1398,28 @@ public:
         StringBuffer str;
         ActPrintLog("Join type is %s", getJoinTypeStr(str).str());
 
+        assertex(inputs.ordinality() == 2);
+
+        leftITDL = queryInput(0);
+        rightITDL = queryInput(1);
+        rightOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
+        rightAllocator.setown(rightThorAllocator->getRowAllocator(rightOutputMeta, container.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique)));
+
+        allocator.set(queryRowAllocator());
+        leftAllocator.set(::queryRowAllocator(leftITDL));
+        outputMeta.set(leftITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
+
+        rightSerializer.set(::queryRowSerializer(rightITDL));
+        rightDeserializer.set(::queryRowDeserializer(rightITDL));
+
+        if ((flags & JFonfail) || (flags & JFleftouter))
+        {
+            RtlDynamicRowBuilder rr(rightAllocator);
+            rr.ensureRow();
+            size32_t rrsz = helper->createDefaultRight(rr);
+            defaultRight.setown(rr.finalizeRowClear(rrsz));
+        }
+
         if (isGlobal())
         {
             mpTag = container.queryJobChannel().deserializeMPTag(data);
@@ -1409,6 +1435,11 @@ public:
                 rowProcessor = new CRowProcessor(*this);
                 broadcastLock = new CriticalSection;
             }
+            sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), queryHeapFlags(), &queryJobChannel().querySharedMemCodeContext()));
+            rhs.setup(sharedRightRowInterfaces);
+            // NB: use sharedRightRowInterfaces, so that expanding ptr array is using shared allocator
+            for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
+                rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, true);
         }
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
@@ -1416,28 +1447,32 @@ public:
         PARENT::setInputStream(index, _input, consumerOrdered);
         if (0 == index)
             setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), LOOKUPJOINL_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(input->queryFromActivity()), grouped, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
+        else
+        {
+            dbgassertex(1 == index);
+            right = queryInputStream(1);
+        }
+    }
+    virtual void reset() override
+    {
+        PARENT::reset();
+        if (!isRhsConstant())
+        {
+            gotRHS = false;
+            rhsTableLen = 0;
+        }
     }
     virtual void start() override
     {
-        assertex(inputs.ordinality() == 2);
-
-        gotRHS = false;
         nextRhsRow = 0;
         joined = 0;
         joinCounter = 0;
         leftMatch = false;
         rhsNext = NULL;
-        rhsTableLen = 0;
-        leftITDL = queryInput(0);
-        rightITDL = queryInput(1);
-        rightOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
-        rightAllocator.setown(rightThorAllocator->getRowAllocator(rightOutputMeta, container.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique)));
 
+        left.set(inputStream); // can be replaced by loader stream
         if (isGlobal())
         {
-            sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), queryHeapFlags(), &queryJobChannel().querySharedMemCodeContext()));
-            rhs.setup(sharedRightRowInterfaces);
-
             // It is not until here, that it is guaranteed all channel slave activities have been initialized.
             if (!channelActivitiesAssigned)
             {
@@ -1448,46 +1483,34 @@ public:
                     channels[c] = &channel;
                 }
                 broadcaster->setBroadcastLock(channels[0]->queryBroadcastLock());
+                channel0Broadcaster = channels[0]->broadcaster;
             }
-            channel0Broadcaster = channels[0]->broadcaster;
-            // NB: use sharedRightRowInterfaces, so that expanding ptr array is using shared allocator
-            for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
-                rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, true);
         }
-        allocator.set(queryRowAllocator());
-        leftAllocator.set(::queryRowAllocator(leftITDL));
-        outputMeta.set(leftITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
 
         eos = eog = someSinceEog = false;
         atomic_set(&interChannelToNotifyCount, 0);
         currentHashEntry.index = 0;
         currentHashEntry.count = 0;
 
-        rightSerializer.set(::queryRowSerializer(rightITDL));
-        rightDeserializer.set(::queryRowDeserializer(rightITDL));
-
-        if ((flags & JFonfail) || (flags & JFleftouter))
-        {
-            RtlDynamicRowBuilder rr(rightAllocator);
-            rr.ensureRow();
-            size32_t rrsz = helper->createDefaultRight(rr);
-            defaultRight.setown(rr.finalizeRowClear(rrsz));
-        }
-
-        CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this));
-        try
+        if (hasStarted() && isRhsConstant()) // if this is the 2nd+ iteration and the RHS is constant, don't both restarting right, it will not be used.
         {
-            startInput(1);
+            startLeftInput();
         }
-        catch (CATCHALL)
+        else
         {
+            CAsyncCallStart asyncLeftStart(std::bind(&CInMemJoinBase::startLeftInput, this));
+            try
+            {
+                startInput(1);
+            }
+            catch (CATCHALL)
+            {
+                asyncLeftStart.wait();
+                left->stop();
+                throw;
+            }
             asyncLeftStart.wait();
-            left->stop();
-            throw;
         }
-        asyncLeftStart.wait();
-        left.set(inputStream);
-        right.set(queryInputStream(1));
         if (leftexception)
         {
             right->stop();
@@ -1509,13 +1532,12 @@ public:
     }
     virtual void stop()
     {
-        // JCS->GH - if in a child query, it would be good to preserve RHS.. would need tip/flag from codegen that constant
-        clearRHS();
-        clearHT();
-        if (right)
+        if (!isRhsConstant()) // if rhs is a constant dataset, or if failed over
         {
-            stopInput(1, "(R)");
-            right.clear();
+            clearRHS();
+            clearHT();
+            if (right)
+                stopInput(1, "(R)");
         }
         if (broadcaster)
             broadcaster->reset();
@@ -2266,7 +2288,7 @@ protected:
             if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
                 channelDistributor.spill(false);
 
-            Owned<IRowStream> distChannelStream = rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL);
+            Owned<IRowStream> distChannelStream = rhsDistributor->connect(queryRowInterfaces(rightITDL), right, rightHash, NULL);
             channelDistributor.processDistRight(distChannelStream);
         }
         catch (IException *e)
@@ -2408,7 +2430,7 @@ protected:
                     }
 
                     // start LHS distributor, needed by local lookup or full join
-                    left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left.getClear(), leftHash, NULL));
+                    left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left, leftHash, NULL));
 
                     // NB: Some channels in this or other slave processes may have fallen over to hash join
                 }
@@ -2440,7 +2462,10 @@ protected:
                     if (hasFailedOverToLocal())
                         marker.reset();
                     if (!prepareLocalHT(marker, *rightCollector)) // can cause others to spill, but must not be allowed to spill channel rows I'm working on.
+                    {
+                        setFailoverToStandard(true);
                         ActPrintLog("Out of memory trying to prepare [LOCAL] hashtable for a SMART join (%" RIPF "d rows), will now failover to a std hash join", rhs.ordinality());
+                    }
                     rightStream.setown(rightCollector->getStream(false, &rhs));
                 }
             }
@@ -2587,6 +2612,8 @@ public:
         }
         return false;
     }
+    virtual bool isRhsConstant() const { return PARENT::isRhsConstant() && !hasFailedOverToStandard(); }
+
 // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
@@ -2603,26 +2630,35 @@ public:
             rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), rhsDistributeTag, false, NULL, "RHS"));
             lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), lhsDistributeTag, false, NULL, "LHS"));
         }
-    }
-    virtual void start() override
-    {
-        ActivityTimer s(totalCycles, timeActivities);
-        PARENT::start();
         if (isSmart())
         {
             if (isGlobal())
             {
                 for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
                     rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, false);
-                setFailoverToLocal(false);
-                rhsCollated = rhsCompacted = false;
             }
-            setFailoverToStandard(false);
         }
-        else
+    }
+    virtual void start() override
+    {
+        PARENT::start();
+        if (!isSmart())
+            dbgassertex(leftITDL->isGrouped() == grouped); // std. lookup join expects these to match
+    }
+    virtual void reset() override
+    {
+        PARENT::reset();
+        if (isSmart())
         {
-            bool inputGrouped = leftITDL->isGrouped();
-            dbgassertex(inputGrouped == grouped); // std. lookup join expects these to match
+            if (!isRhsConstant())
+            {
+                if (isGlobal())
+                {
+                    setFailoverToLocal(false);
+                    rhsCollated = rhsCompacted = false;
+                }
+            }
+            setFailoverToStandard(false);
         }
     }
     CATCH_NEXTROW()
@@ -3203,11 +3239,6 @@ public:
         returnMany = true;
     }
 // IThorSlaveActivity overloaded methods
-    virtual void start()
-    {
-        ActivityTimer s(totalCycles, timeActivities);
-        PARENT::start();
-    }
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities);