Преглед на файлове

HPCC-8686 - Allow global join merge stream to spill

Global join used to merge the partitioned side to disk; change it to
merge into memory and spill conditionally, on demand.
Also refactor the global join code to remove the duplicated code shared
between the rightPartition/leftPartition implementations.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith преди 11 години
родител
ревизия
e7ae91af8a
променени са 6 файла, в които са добавени 259 реда и са изтрити 363 реда
  1. 1 1
      ecl/hqlcpp/hqlresource.cpp
  2. 237 345
      thorlcr/activities/join/thjoinslave.cpp
  3. 3 3
      thorlcr/thorutil/thbuf.cpp
  4. 1 1
      thorlcr/thorutil/thbufdef.hpp
  5. 16 12
      thorlcr/thorutil/thmem.cpp
  6. 1 1
      thorlcr/thorutil/thmem.hpp

+ 1 - 1
ecl/hqlcpp/hqlresource.cpp

@@ -101,7 +101,7 @@ void getResources(IHqlExpression * expr, CResources & resources, const CResource
         }
         else
         {
-            resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+SORT_BUFFER_TOTAL+JOINR_SMART_BUFFER_SIZE);
+            resources.setHeavyweight().set(RESslavememory, MEM_Const_Minimal+SORT_BUFFER_TOTAL+JOIN_SMART_BUFFER_SIZE);
             if (!isLocal)
             {
 #ifndef SORT_USING_MP

+ 237 - 345
thorlcr/activities/join/thjoinslave.cpp

@@ -40,31 +40,24 @@
 
 class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implements ISmartBufferNotify
 {
-    Owned<IThorDataLink> input1;
-    Owned<IThorDataLink> input2;
+    Owned<IThorDataLink> leftInput, rightInput;
+    Owned<IThorDataLink> secondaryInput, primaryInput;
     IHThorJoinBaseArg *helper;
     IHThorJoinArg *helperjn;
     IHThorDenormalizeArg *helperdn;
     Owned<IThorSorter> sorter;
     unsigned portbase;
     mptag_t mpTagRPC;
-    ICompare *compare1;
-    ICompare *compare2;
-    ISortKeySerializer *keyserializer1;
-    ISortKeySerializer *keyserializer2;
+    ICompare *leftCompare;
+    ICompare *rightCompare;
+    ISortKeySerializer *leftKeySerializer;
+    ISortKeySerializer *rightKeySerializer;
     ICompare *collate;
     ICompare *collateupper; // if non-null then between join
 
-    Owned<IRowStream> strm1;  // reads from disk
-    Owned<IRowStream> strm2;  // from merge 
-    void *temprec;
-    Semaphore in2startsem;
-    Owned<IException> in2exception;
-    Semaphore in1startsem;
-    Owned<IException> in1exception;
-
-    int infh;
-    StringBuffer tempname;
+    Owned<IRowStream> leftStream, rightStream;
+    Semaphore secondaryStartSem;
+    Owned<IException> secondaryStartException;
 
     bool islocal;
     Owned<IBarrier> barrier;
@@ -77,19 +70,19 @@ class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implement
     Owned<IJoinHelper> joinhelper;
     rowcount_t lhsProgressCount, rhsProgressCount;
     CriticalSection joinHelperCrit;
-    bool in1stopped;
-    bool in2stopped;
+    bool leftInputStopped;
+    bool rightInputStopped;
     bool rightpartition;
 
 
-    bool nosortPrimary()
+    bool noSortPartitionSide()
     {
         if (ALWAYS_SORT_PRIMARY)
             return false;
         return (rightpartition?helper->isRightAlreadySorted():helper->isLeftAlreadySorted());
     }
 
-    bool nosortSecondary()
+    bool noSortOtherSide()
     {
         return (rightpartition?helper->isLeftAlreadySorted():helper->isRightAlreadySorted());
     }
@@ -141,9 +134,8 @@ public:
 #ifdef _TESTING
         started = false;
 #endif
-        in1stopped = true;
-        in2stopped = true;
-        infh = 0;
+        leftInputStopped = true;
+        rightInputStopped = true;
         lhsProgressCount = 0;
         rhsProgressCount = 0;
         mpTagRPC = TAG_NULL;
@@ -155,15 +147,6 @@ public:
             freePort(portbase,NUMSLAVEPORTS);
     }
 
-    struct cCollateReverse: public ICompare
-    {
-        ICompare *collate;
-        int docompare(const void *a,const void *b) const 
-        {
-            return -collate->docompare(b,a);
-        }
-    } collaterev, collaterevupper;
-
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         if (!islocal) {
@@ -197,10 +180,10 @@ public:
             default:
                 throwUnexpected();
         }
-        compare1 = helper->queryCompareLeft();
-        compare2 = helper->queryCompareRight();
-        keyserializer1 = helper->querySerializeLeft();
-        keyserializer2 = helper->querySerializeRight();
+        leftCompare = helper->queryCompareLeft();
+        rightCompare = helper->queryCompareRight();
+        leftKeySerializer = helper->querySerializeLeft();
+        rightKeySerializer = helper->querySerializeRight();
         if (helper->getJoinFlags()&JFslidingmatch) {
             collate = helper->queryCompareLeftRightLower();
             collateupper = helper->queryCompareLeftRightUpper();
@@ -212,14 +195,8 @@ public:
     }
     virtual void onInputStarted(IException *except)
     {
-        if (rightpartition) {
-            in1exception.set(except);
-            in1startsem.signal();
-        }
-        else {
-            in2exception.set(except);
-            in2startsem.signal();
-        }
+        secondaryStartException.set(except);
+        secondaryStartSem.signal();
     }
     virtual bool startAsync() { return true; }
     virtual void onInputFinished(rowcount_t count)
@@ -227,17 +204,25 @@ public:
         ActPrintLog("JOIN: %s input finished, %"RCPF"d rows read", rightpartition?"LHS":"RHS", count);
     }
 
-    void doDataLinkStart(bool denorm)
+    void doDataLinkStart()
     {
         dataLinkStart();
         CriticalBlock b(joinHelperCrit);
-        if (denorm)
-            joinhelper.setown(createDenormalizeHelper(*this, helperdn, this));
-        else
+        switch(container.getKind())
         {
-            bool hintunsortedoutput = getOptBool(THOROPT_UNSORTED_OUTPUT, JFreorderable & helper->getJoinFlags());
-            bool hintparallelmatch = getOptBool(THOROPT_PARALLEL_MATCH, hintunsortedoutput); // i.e. unsorted, implies use parallel by default, otherwise no point
-            joinhelper.setown(createJoinHelper(*this, helperjn, this, hintparallelmatch, hintunsortedoutput));
+            case TAKjoin:
+            {
+                bool hintunsortedoutput = getOptBool(THOROPT_UNSORTED_OUTPUT, JFreorderable & helper->getJoinFlags());
+                bool hintparallelmatch = getOptBool(THOROPT_PARALLEL_MATCH, hintunsortedoutput); // i.e. unsorted, implies use parallel by default, otherwise no point
+                joinhelper.setown(createJoinHelper(*this, helperjn, this, hintparallelmatch, hintunsortedoutput));
+                break;
+            }
+            case TAKdenormalize:
+            case TAKdenormalizegroup:
+            {
+                joinhelper.setown(createDenormalizeHelper(*this, helperdn, this));
+                break;
+            }
         }
     }
 
@@ -245,114 +230,115 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities, NULL);
         rightpartition = (container.getKind()==TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
-        input1.set(inputs.item(0));
-        input2.set(inputs.item(1));
-        if (rightpartition) {
-#define JOINL_SMART_BUFFER_SIZE JOINR_SMART_BUFFER_SIZE
-            input1.setown(createDataLinkSmartBuffer(this, input1,JOINL_SMART_BUFFER_SIZE,isSmartBufferSpillNeeded(inputs.item(0)->queryFromActivity()),false,RCUNBOUND,this,false,&container.queryJob().queryIDiskUsage()));
-            ActPrintLog("JOIN: Starting L then R");
-            startInput(input1); 
-            in1stopped = false;
-            try { 
-                startInput(input2);
-                in2stopped = false;
-            }
-            catch (IException *e)
-            {
-                fireException(e);
-                barrier->cancel();
-                in1startsem.wait();
-                stopInput1();
-                throw;
-            }
-            in1startsem.wait();
-            if (in1exception) 
-            {
-                IException *e=in1exception.getClear();
-                fireException(e);
-                barrier->cancel();
-                stopInput2();
-                throw e;
-            }
+
+        Linked<IRowInterfaces> primaryRowIf, secondaryRowIf;
+
+        StringAttr primaryInputStr, secondaryInputStr;
+        bool &secondaryInputStopped = rightInputStopped;
+        bool &primaryInputStopped = leftInputStopped;
+        if (rightpartition)
+        {
+            primaryInput.set(inputs.item(1));
+            secondaryInput.set(inputs.item(0));
+            secondaryInputStopped = leftInputStopped;
+            primaryInputStopped = rightInputStopped;
+            primaryInputStr.set("R");
+            secondaryInputStr.set("L");
         }
-        else {
-            input2.setown(createDataLinkSmartBuffer(this, input2,JOINR_SMART_BUFFER_SIZE,isSmartBufferSpillNeeded(inputs.item(1)->queryFromActivity()),false,RCUNBOUND,this,false,&container.queryJob().queryIDiskUsage()));
-            ActPrintLog("JOIN: Starting R then L");
-            startInput(input2); 
-            in2stopped = false;
-            try { 
-                startInput(input1); 
-                in1stopped = false;
-            }
-            catch (IException *e)
-            {
-                fireException(e);
-                if (barrier) barrier->cancel();
-                in2startsem.wait();
-                stopInput2();
-                throw;
-            }
-            in2startsem.wait();
-            if (in2exception) 
-            {
-                IException *e=in2exception.getClear();
-                fireException(e);
-                if (barrier) barrier->cancel();
-                stopInput1();
-                throw e;
-            }
-        }               
-        
-        switch(container.getKind()) {
-        case TAKjoin:
-            doDataLinkStart(false);
-            if (islocal)
-                dolocaljoin();
-            else
-            {
-                if (!doglobaljoin()) {
-                    Sleep(1000); // let original error through
-                    throw MakeThorException(TE_BarrierAborted,"JOIN: Barrier Aborted");
-                }
-            }
-            break;
-        case TAKdenormalize:
-        case TAKdenormalizegroup:
-            doDataLinkStart(true);
-            if (islocal)
-                dolocaljoin();
-            else
+        else
+        {
+            primaryInput.set(inputs.item(0));
+            secondaryInput.set(inputs.item(1));
+            primaryInputStr.set("L");
+            secondaryInputStr.set("R");
+        }
+        ActPrintLog("JOIN partition: %s", primaryInputStr.get());
+
+        secondaryInput.setown(createDataLinkSmartBuffer(this, secondaryInput, JOIN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(secondaryInput->queryFromActivity()),
+                                                    false, RCUNBOUND, this, false, &container.queryJob().queryIDiskUsage()));
+        ActPrintLog("JOIN: Starting %s then %s", secondaryInputStr.get(), primaryInputStr.get());
+        startInput(secondaryInput);
+        secondaryInputStopped = false;
+        try
+        {
+            startInput(primaryInput);
+            primaryInputStopped = false;
+        }
+        catch (IException *e)
+        {
+            fireException(e);
+            barrier->cancel();
+            secondaryStartSem.wait();
+            stopOtherInput();
+            throw;
+        }
+        secondaryStartSem.wait();
+        if (secondaryStartException)
+        {
+            IException *e=secondaryStartException.getClear();
+            fireException(e);
+            barrier->cancel();
+            stopPartitionInput();
+            throw e;
+        }
+        if (rightpartition)
+        {
+            leftInput.set(secondaryInput);
+            rightInput.set(primaryInput);
+        }
+        else
+        {
+            leftInput.set(primaryInput);
+            rightInput.set(secondaryInput);
+        }
+
+        doDataLinkStart();
+        if (islocal)
+            dolocaljoin();
+        else
+        {
+            if (!doglobaljoin())
             {
-                if (!doglobaljoin()) {
-                    Sleep(1000); // let original error through first
-                    throw MakeThorException(TE_BarrierAborted,"DENORMALIZE: Barrier Aborted");
-                }
+                Sleep(1000); // let original error through
+                throw MakeActivityException(this, TE_BarrierAborted, "JOIN: Barrier Aborted");
             }
-            break;
         }
-        if (!strm1.get()||!strm2.get()) {
-            throw MakeThorException(TE_FailedToStartJoinStreams, "Failed to start join streams");
-        }
-        joinhelper->init(strm1, strm2, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)), &abortSoon);
+        if (!leftStream.get()||!rightStream.get())
+            throw MakeActivityException(this, TE_FailedToStartJoinStreams, "Failed to start join streams");
+        joinhelper->init(leftStream, rightStream, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)), &abortSoon);
     }
-    void stopInput1()
+    void stopLeftInput()
     {
-        if (!in1stopped) {
-            stopInput(input1, "(L)");
-            in1stopped = true;
+        if (!leftInputStopped) {
+            stopInput(leftInput, "(L)");
+            leftInputStopped = true;
         }
     }
-    void stopInput2()
+    void stopRightInput()
     {
-        if (!in2stopped) {
-            stopInput(input2, "(R)");
-            in2stopped = true;
+        if (!rightInputStopped) {
+            stopInput(rightInput, "(R)");
+            rightInputStopped = true;
         }
     }
+    void stopPartitionInput()
+    {
+        if (rightpartition)
+            stopRightInput();
+        else
+            stopLeftInput();
+    }
+    void stopOtherInput()
+    {
+        if (rightpartition)
+            stopLeftInput();
+        else
+            stopRightInput();
+    }
     void stop() 
     {
-        stopInput1();
-        stopInput2();
+        stopLeftInput();
+        stopRightInput();
         lhsProgressCount = joinhelper->getLhsProgress();
         rhsProgressCount = joinhelper->getRhsProgress();
         {
@@ -360,22 +346,18 @@ public:
             joinhelper.clear();
         }
         ActPrintLog("SortJoinSlaveActivity::stop");
-        strm2.clear();
+        rightStream.clear();
         if (!islocal) {
-            unsigned bn=nosortPrimary()?2:4;
+            unsigned bn=noSortPartitionSide()?2:4;
             ActPrintLog("JOIN waiting barrier.%d",bn);
             barrier->wait(false);
             ActPrintLog("JOIN barrier.%d raised",bn);
             sorter->stopMerge();
         }
-        strm1.clear();
-        if (infh)
-            _close(infh);
-        if (tempname.length())
-            remove(tempname.toCharArray());
+        leftStream.clear();
         dataLinkStop();
-        input1.clear();
-        input2.clear();
+        leftInput.clear();
+        rightInput.clear();
     }
     void reset()
     {
@@ -386,8 +368,8 @@ public:
     void kill()
     {
         sorter.clear();
-        input1.clear();
-        input2.clear();
+        leftInput.clear();
+        rightInput.clear();
         CSlaveActivity::kill();
     }
 
@@ -414,228 +396,138 @@ public:
     void dolocaljoin()
     {
         // NB: old version used to force both sides all to disk
-        Owned<IThorRowLoader> iLoaderL = createThorRowLoader(*this, ::queryRowInterfaces(input1), compare1, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
-        Owned<IThorRowLoader> iLoaderR = createThorRowLoader(*this, ::queryRowInterfaces(input2), compare2, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
+        Owned<IThorRowLoader> iLoaderL = createThorRowLoader(*this, ::queryRowInterfaces(leftInput), leftCompare, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
+        Owned<IThorRowLoader> iLoaderR = createThorRowLoader(*this, ::queryRowInterfaces(rightInput), rightCompare, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
         bool isemptylhs = false;
         if (helper->isLeftAlreadySorted()) {
             ThorDataLinkMetaInfo info;
-            input1->getMetaInfo(info);      
+            leftInput->getMetaInfo(info);
             if (info.totalRowsMax==0) 
                 isemptylhs = true;
             if (rightpartition)
-                strm1.set(input1.get()); // already ungrouped
+                leftStream.set(leftInput.get()); // already ungrouped
             else
-                strm1.setown(createUngroupStream(input1));
+                leftStream.setown(createUngroupStream(leftInput));
         }
         else {
             StringBuffer tmpStr;
-            strm1.setown(iLoaderL->load(input1, abortSoon));
+            leftStream.setown(iLoaderL->load(leftInput, abortSoon));
             isemptylhs = 0 == iLoaderL->numRows();
-            stopInput1();
+            stopLeftInput();
         }
         if (isemptylhs&&((helper->getJoinFlags()&JFrightouter)==0)) {
             ActPrintLog("ignoring RHS as LHS empty");
-            strm2.setown(createNullRowStream());
-            stopInput2();
+            rightStream.setown(createNullRowStream());
+            stopRightInput();
         }
         else if (helper->isRightAlreadySorted()) 
         {
             if (rightpartition)
-                strm2.set(createUngroupStream(input2));
+                rightStream.set(createUngroupStream(rightInput));
             else
-                strm2.set(input2.get()); // already ungrouped
+                rightStream.set(rightInput.get()); // already ungrouped
         }
         else {
-            strm2.setown(iLoaderR->load(input2, abortSoon));
-            stopInput2();
+            rightStream.setown(iLoaderR->load(rightInput, abortSoon));
+            stopRightInput();
         }
     }
     bool doglobaljoin()
     {
-        Linked<IRowInterfaces> rowif1 = queryRowInterfaces(input1);
-        Linked<IRowInterfaces> rowif2 = queryRowInterfaces(input2);
-        // NB two near identical branches here - should be parameterized at some stage
-        if (rightpartition)
+        Linked<IRowInterfaces> primaryRowIf, secondaryRowIf;
+        ICompare *primaryCompare, *secondaryCompare;
+        ISortKeySerializer *primaryKeySerializer, *secondaryKeySerializer;
+        ICompare *primaryCollate, *primaryCollateUpper;
+        struct cCollateReverse: public ICompare
         {
-            ActPrintLog("JOIN partition right");
-            rowcount_t totalrows;
-            collaterev.collate = collate;
-            collaterevupper.collate = collateupper;
-            if (nosortPrimary())
-            {
-                OwnedConstThorRow partitionrow  = input2->ungroupedNextRow();
-                strm2.set(new cRowStreamPlus1Adaptor(input2,partitionrow));
-                sorter->Gather(rowif1,input1,compare1,&collaterev,collateupper?&collaterevupper:NULL,keyserializer2,partitionrow,nosortSecondary(),isUnstable(),abortSoon, rowif2); // keyserializer2 *is* correct
-                partitionrow.clear();
-                stopInput1();
-                if (abortSoon)
-                {
-                    barrier->cancel();
-                    return false;
-                }
-                ActPrintLog("JOIN waiting barrier.1");
-                if (!barrier->wait(false))
-                    return false;
-                ActPrintLog("JOIN barrier.1 raised");
-                strm1.setown(sorter->startMerge(totalrows));
-                return true;
-            }
-            else
+            ICompare *collate;
+            cCollateReverse(ICompare *_collate) : collate(_collate) { }
+            int docompare(const void *a,const void *b) const
             {
-                strm2.set(input2);
-                sorter->Gather(rowif2,input2,compare2,NULL,NULL,keyserializer2,NULL,false,isUnstable(),abortSoon, NULL); 
-                stopInput2();
-                if (abortSoon)
-                {
-                    barrier->cancel();
-                    return false;
-                }
-                ActPrintLog("JOIN waiting barrier.1");
-                if (!barrier->wait(false))
-                    return false;
-                ActPrintLog("JOIN barrier.1 raised"); 
-                Owned<IRowStream> rstrm2 =sorter->startMerge(totalrows);
-
-                GetTempName(tempname.clear(),"joinspill",false); // don't use alt temp dir
-                Owned<IFile> tempf = createIFile(tempname.str());
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
-                    rwFlags |= rw_compress;
-                Owned<IRowWriter> tmpstrm = createRowWriter(tempf, rowif2, rwFlags);
-                if (!tmpstrm)
-                {
-                    ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Cannot open %s", tempname.toCharArray());
-                    throw MakeErrnoException("JoinSlaveActivity::doglobaljoin");
-                }
-                copyRowStream(rstrm2,tmpstrm); 
-                tmpstrm->flush();
-                tmpstrm.clear();
-                rstrm2.clear();
-                try
-                {
-                    strm2.setown(createRowStream(tempf, rowif2, rwFlags));
-
-                    ActPrintLog("JOIN waiting barrier.2");
-                    if (!barrier->wait(false))
-                        return false;
-                    ActPrintLog("JOIN barrier.2 raised");
-                    sorter->stopMerge();
-                    sorter->Gather(rowif1,input1,compare1,&collaterev,collateupper?&collaterevupper:NULL,keyserializer2,NULL,nosortSecondary(),isUnstable(),abortSoon,rowif2); // keyserializer2 *is* correct
-                    stopInput1();
-                    if (abortSoon)
-                    {
-                        barrier->cancel();
-                        return false;
-                    }
-                    ActPrintLog("JOIN waiting barrier.3");
-                    if (!barrier->wait(false))
-                        return false;
-                    ActPrintLog("JOIN barrier.3 raised");
-                    strm1.setown(sorter->startMerge(totalrows));
-                    return true;
-                }
-                catch (IException *)
-                {
-                    if (infh)
-                    {
-                        _close(infh);
-                        infh = 0;
-                    }
-                    throw;
-                }
+                return -collate->docompare(b,a);
             }
+        } collateRev(collate), collateRevUpper(collateupper);
+
+        Owned<IRowStream> secondaryStream, primaryStream;
+        if (rightpartition)
+        {
+            primaryCompare = rightCompare;
+            primaryKeySerializer = rightKeySerializer;
+            secondaryCompare = leftCompare;
+            secondaryKeySerializer = leftKeySerializer;
+            primaryCollate = &collateRev;
+            primaryCollateUpper = collateupper?&collateRevUpper:NULL;
         }
         else
         {
-            rowcount_t totalrows;
-            if (nosortPrimary())
-            {
-                OwnedConstThorRow partitionrow  = input1->ungroupedNextRow();
-                strm1.set(new cRowStreamPlus1Adaptor(input1,partitionrow));
-                sorter->Gather(rowif2,input2,compare2,collate,collateupper,keyserializer1,partitionrow,nosortSecondary(),isUnstable(),abortSoon, rowif1); // keyserializer1 *is* correct
-                partitionrow.clear();
-                stopInput2();
-                if (abortSoon)
-                {
-                    barrier->cancel();
-                    return false;
-                }
-                ActPrintLog("JOIN waiting barrier.1");
-                if (!barrier->wait(false))
-                    return false;
-                ActPrintLog("JOIN barrier.1 raised");
-                strm2.setown(sorter->startMerge(totalrows));
-                return true;
-            }
-            else
+            primaryCompare = leftCompare;
+            primaryKeySerializer = leftKeySerializer;
+            secondaryCompare = rightCompare;
+            secondaryKeySerializer = rightKeySerializer;
+            primaryCollate = collate;
+            primaryCollateUpper = collateupper;
+        }
+        primaryRowIf.set(queryRowInterfaces(primaryInput));
+        secondaryRowIf.set(queryRowInterfaces(secondaryInput));
+
+        OwnedConstThorRow partitionRow;
+        rowcount_t totalrows;
+
+        if (noSortPartitionSide())
+        {
+            partitionRow.setown(primaryInput->ungroupedNextRow());
+            primaryStream.set(new cRowStreamPlus1Adaptor(primaryInput, partitionRow));
+        }
+        else
+        {
+            sorter->Gather(primaryRowIf,primaryInput,primaryCompare,NULL,NULL,primaryKeySerializer,NULL,false,isUnstable(),abortSoon,NULL);
+            stopPartitionInput();
+            if (abortSoon)
             {
-                strm1.set(input1);
-                sorter->Gather(rowif1,input1,compare1,NULL,NULL,keyserializer1,NULL,false,isUnstable(),abortSoon, NULL);
-                stopInput1();
-                if (abortSoon)
-                {
-                    barrier->cancel();
-                    return false;
-                }
-                ActPrintLog("JOIN waiting barrier.1");
-                if (!barrier->wait(false))
-                    return false;
-                ActPrintLog("JOIN barrier.1 raised");
-                Owned<IRowStream> rstrm1 = sorter->startMerge(totalrows);
-
-                // JCSMORE - spill whole of sorted input1 to disk.
-                // it could keep in memory until needed to spill..
-
-                GetTempName(tempname.clear(),"joinspill",false); // don't use alt temp dir
-                Owned<IFile> tempf = createIFile(tempname.str());
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
-                    rwFlags |= rw_compress;
-                Owned<IRowWriter> tmpstrm = createRowWriter(tempf, rowif1, rwFlags);
-                if (!tmpstrm)
-                {
-                    ActPrintLogEx(&queryContainer(), thorlog_null, MCerror, "Cannot open %s", tempname.toCharArray());
-                    throw MakeErrnoException("JoinSlaveActivity::doglobaljoin");
-                }
-                copyRowStream(rstrm1,tmpstrm); 
-                tmpstrm->flush();
-                tmpstrm.clear();
-                rstrm1.clear();
-                try
-                {
-                    strm1.setown(createRowStream(tempf, rowif1, rwFlags));
-
-                    ActPrintLog("JOIN waiting barrier.2");
-                    if (!barrier->wait(false))
-                        return false;
-                    ActPrintLog("JOIN barrier.2 raised");
-                    sorter->stopMerge();
-                    sorter->Gather(rowif2,input2,compare2,collate,collateupper,keyserializer1,NULL,nosortSecondary(),isUnstable(),abortSoon,rowif1); // keyserializer1 *is* correct
-                    stopInput2();
-                    if (abortSoon)
-                    {
-                        barrier->cancel();
-                        return false;
-                    }
-                    ActPrintLog("JOIN waiting barrier.3");
-                    if (!barrier->wait(false))
-                        return false;
-                    ActPrintLog("JOIN barrier.3 raised");
-                    strm2.setown(sorter->startMerge(totalrows));
-                    return true;
-                }
-                catch (IException *)
-                {
-                    if (infh)
-                    {
-                        _close(infh);
-                        infh = 0;
-                    }
-                    throw;
-                }
+                barrier->cancel();
+                return false;
             }
+            ActPrintLog("JOIN waiting barrier.1");
+            if (!barrier->wait(false))
+                return false;
+            ActPrintLog("JOIN barrier.1 raised");
+
+            // primaryWriter will keep as much in memory as possible.
+            Owned<IRowWriterMultiReader> primaryWriter = createOverflowableBuffer(*this, this, false);
+            primaryStream.setown(sorter->startMerge(totalrows));
+            copyRowStream(primaryStream, primaryWriter);
+            primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point
+
+            ActPrintLog("JOIN waiting barrier.2");
+            if (!barrier->wait(false))
+                return false;
+            ActPrintLog("JOIN barrier.2 raised");
+            sorter->stopMerge();
+        }
+        sorter->Gather(secondaryRowIf,secondaryInput,secondaryCompare,primaryCollate,primaryCollateUpper,primaryKeySerializer,partitionRow,noSortOtherSide(),isUnstable(),abortSoon,primaryRowIf); // primaryKeySerializer *is* correct
+        partitionRow.clear();
+        stopOtherInput();
+        if (abortSoon)
+        {
+            barrier->cancel();
+            return false;
+        }
+        ActPrintLog("JOIN waiting barrier.3");
+        if (!barrier->wait(false))
+            return false;
+        ActPrintLog("JOIN barrier.3 raised");
+        secondaryStream.setown(sorter->startMerge(totalrows));
+        if (rightpartition)
+        {
+            leftStream.setown(secondaryStream.getClear());
+            rightStream.setown(primaryStream.getClear());
+        }
+        else
+        {
+            leftStream.setown(primaryStream.getClear());
+            rightStream.setown(secondaryStream.getClear());
         }
-        return false;
+        return true;
     }
     virtual void serializeStats(MemoryBuffer &mb)
     {

+ 3 - 3
thorlcr/thorutil/thbuf.cpp

@@ -624,13 +624,13 @@ class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiR
     IRowInterfaces *rowIf;
     Owned<IThorRowCollector> collector;
     Owned<IRowWriter> writer;
-    bool eoi, grouped, shared;
+    bool eoi, shared;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _grouped, bool _shared, unsigned spillPriority)
-        : activity(_activity), rowIf(_rowIf), grouped(_grouped), shared(_shared)
+    COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool grouped, bool _shared, unsigned spillPriority)
+        : activity(_activity), rowIf(_rowIf), shared(_shared)
     {
         collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
         writer.setown(collector->getWriter());

+ 1 - 1
thorlcr/thorutil/thbufdef.hpp

@@ -32,7 +32,7 @@
 #define INDEXWRITE_SMART_BUFFER_SIZE            (0x100000*12)           // 12MB
 #define COUNTPROJECT_SMART_BUFFER_SIZE          (0x100000*12)           // 12MB
 #define ENTH_SMART_BUFFER_SIZE                  (0x100000*12)           // 12MB
-#define JOINR_SMART_BUFFER_SIZE                 (0x100000*12)           // 12MB
+#define JOIN_SMART_BUFFER_SIZE                 (0x100000*12)            // 12MB
 #define LOOKUPJOINL_SMART_BUFFER_SIZE           (0x100000*12)           // 12MB
 #define CATCH_BUFFER_SIZE                       (0x100000*12)           // 12MB
 #define SKIPLIMIT_BUFFER_SIZE                   (0x100000*12)           // 12MB

+ 16 - 12
thorlcr/thorutil/thmem.cpp

@@ -190,11 +190,13 @@ protected:
         if (0 == numRows)
             return false;
 
-        StringBuffer tempname;
-        GetTempName(tempname,"streamspill", true);
-        spillFile.setown(createIFile(tempname.str()));
+        StringBuffer tempName;
+        VStringBuffer tempPrefix("streamspill_%d", activity.queryActivityId());
+        GetTempName(tempName, tempPrefix.str(), true);
+        spillFile.setown(createIFile(tempName.str()));
 
-        rows.save(*spillFile, useCompression); // saves committed rows
+        VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
+        rows.save(*spillFile, useCompression, spillPrefixStr.str()); // saves committed rows
         rows.noteSpilled(numRows);
         return true;
     }
@@ -1181,12 +1183,12 @@ static int callbackSortRev(IInterface **cb2, IInterface **cb1)
     return 1;
 }
 
-rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
+rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression, const char *tracingPrefix)
 {
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
-    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RIPF"d rows", n);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save %"RIPF"d rows", tracingPrefix, n);
 
     if (useCompression)
         assertex(0 == writeCallbacks.ordinality()); // incompatible
@@ -1237,7 +1239,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
     writer->flush();
     offset_t bytesWritten = writer->getPosition();
     writer.clear();
-    ActPrintLog(&activity, "CThorSpillableRowArray::save done, bytes = %"I64F"d", (__int64)bytesWritten);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, bytes = %"I64F"d", tracingPrefix, (__int64)bytesWritten);
     return n;
 }
 
@@ -1366,19 +1368,21 @@ protected:
             return false;
 
         totalRows += numRows;
+        StringBuffer tempPrefix, tempName;
         if (iCompare)
         {
             ActPrintLog(&activity, "Sorting %"RIPF"d rows", spillableRows.numCommitted());
             CCycleTimer timer;
             spillableRows.sort(*iCompare, maxCores); // sorts committed rows
             ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
+            tempPrefix.append("srt");
         }
-
-        StringBuffer tempname;
-        GetTempName(tempname,"srtspill",true);
-        Owned<IFile> iFile = createIFile(tempname.str());
+        tempPrefix.appendf("spill_%d", activity.queryActivityId());
+        GetTempName(tempName, tempPrefix.str(), true);
+        Owned<IFile> iFile = createIFile(tempName.str());
         spillFiles.append(new CFileOwner(iFile.getLink()));
-        spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true)); // saves committed rows
+        VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
+        spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
         spillableRows.noteSpilled(numRows);
 
         ++overflowCount;

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -449,7 +449,7 @@ public:
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
-    rowidx_t save(IFile &file, bool useCompression);
+    rowidx_t save(IFile &file, bool useCompression, const char *tracingPrefix);
     const void **getBlock(rowidx_t readRows);
     inline void noteSpilled(rowidx_t spilledRows)
     {