Ver código fonte

HPCC-9244 - Improved LOOKUP,MANY scheme

LOOKUP MANY was horrendously slow if there were truly many
duplicates. New scheme avoids pathogenic case and also improves
efficiency of ATMOST processing.

Refactored All vs LOOKUP vs LOOKUP MANY implementations

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 11 anos atrás
pai
commit
056ec23835

+ 2 - 0
system/jlib/jthread.hpp

@@ -138,6 +138,8 @@ public:
 interface IThreaded
 {
     virtual void main() = 0;
+protected:
+    virtual ~IThreaded() {}
 };
 
 // utility class, useful for containing a thread

+ 35 - 5
testing/ecl/key/smartjoin.xml

@@ -32,33 +32,63 @@
  <Row><i>2</i><idl>Y</idl><idr>Y</idr></Row>
 </Dataset>
 <Dataset name='Result 4'>
+ <Row><i>1</i><idl>A</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>A</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>A</idl><idr>C</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>C</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>C</idr></Row>
+ <Row><i>2</i><idl>X</idl><idr>X</idr></Row>
+ <Row><i>2</i><idl>X</idl><idr>Y</idr></Row>
+ <Row><i>2</i><idl>Y</idl><idr>X</idr></Row>
+ <Row><i>2</i><idl>Y</idl><idr>Y</idr></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><i>1</i><idl>A</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>A</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>A</idl><idr>C</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>B</idl><idr>C</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>A</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>B</idr></Row>
+ <Row><i>1</i><idl>C</idl><idr>C</idr></Row>
+ <Row><i>2</i><idl>X</idl><idr>X</idr></Row>
+ <Row><i>2</i><idl>X</idl><idr>Y</idr></Row>
+ <Row><i>2</i><idl>Y</idl><idr>X</idr></Row>
+ <Row><i>2</i><idl>Y</idl><idr>Y</idr></Row>
+</Dataset>
+<Dataset name='Result 6'>
  <Row><i>1</i><idl>A</idl><idr>abc</idr></Row>
  <Row><i>1</i><idl>B</idl><idr>abc</idr></Row>
  <Row><i>1</i><idl>C</idl><idr>abc</idr></Row>
  <Row><i>2</i><idl>X</idl><idr>xy</idr></Row>
  <Row><i>2</i><idl>Y</idl><idr>xy</idr></Row>
 </Dataset>
-<Dataset name='Result 5'>
+<Dataset name='Result 7'>
  <Row><cnt>5</cnt></Row>
 </Dataset>
-<Dataset name='Result 6'>
+<Dataset name='Result 8'>
  <Row><i>1</i><idl>A</idl><idr>ABC</idr></Row>
  <Row><i>1</i><idl>B</idl><idr>ABC</idr></Row>
  <Row><i>1</i><idl>C</idl><idr>ABC</idr></Row>
  <Row><i>2</i><idl>X</idl><idr>XY</idr></Row>
  <Row><i>2</i><idl>Y</idl><idr>XY</idr></Row>
 </Dataset>
-<Dataset name='Result 7'>
+<Dataset name='Result 9'>
  <Row><i>1</i><idl>A</idl><idr>abc</idr></Row>
  <Row><i>1</i><idl>B</idl><idr>abc</idr></Row>
  <Row><i>1</i><idl>C</idl><idr>abc</idr></Row>
  <Row><i>2</i><idl>X</idl><idr>xy</idr></Row>
  <Row><i>2</i><idl>Y</idl><idr>xy</idr></Row>
 </Dataset>
-<Dataset name='Result 8'>
+<Dataset name='Result 10'>
  <Row><cnt>5</cnt></Row>
 </Dataset>
-<Dataset name='Result 9'>
+<Dataset name='Result 11'>
  <Row><i>1</i><idl>A</idl><idr>ABC</idr></Row>
  <Row><i>1</i><idl>B</idl><idr>ABC</idr></Row>
  <Row><i>1</i><idl>C</idl><idr>ABC</idr></Row>

+ 4 - 0
testing/ecl/smartjoin.ecl

@@ -41,6 +41,8 @@ gr1 := GROUP(ds1, i, id);
 j1 := JOIN(ds1, ds2, LEFT.i = RIGHT.i, trans(LEFT, RIGHT), SMART);
 j2 := JOIN(gr1, ds2, LEFT.i = RIGHT.i, trans(LEFT, RIGHT), SMART);
 j3 := JOIN(gr1, gr1, LEFT.i = RIGHT.i, trans(LEFT, RIGHT), SMART);
+j4 := JOIN(gr1, gr1, LEFT.i = RIGHT.i, trans(LEFT, RIGHT), SMART, HINT(lkjoin_localfailover));
+j5 := JOIN(gr1, gr1, LEFT.i = RIGHT.i, trans(LEFT, RIGHT), SMART, HINT(lkjoin_hashjoinfailover));
 
 p1 := PROJECT(ds1, createOut(LEFT.i, LEFT.id, ''));
 gp1 := GROUP(p1, i, idL);
@@ -75,6 +77,8 @@ sequential(
     output(SORT(j1, i)),
     output(SORT(TABLE(j2, { cnt := COUNT(GROUP) }), cnt)),  // check output is not grouped
     output(SORT(j3, i)),
+    output(SORT(j4, i)),
+    output(SORT(j5, i)),
     output(SORT(d1, i)),
     output(SORT(TABLE(d2, { cnt := COUNT(GROUP) }), cnt)),  // check output is not grouped
     output(SORT(d3, i)),

+ 1 - 1
thorlcr/activities/filter/thfilterslave.cpp

@@ -248,7 +248,7 @@ public:
 
     CFilterGroupSlaveActivity(CGraphElementBase *container) : CFilterSlaveActivityBase(container), CThorSteppable(this)
     {
-        groupLoader.setown(createThorRowLoader(*this, NULL, false, rc_allMem));
+        groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 1 - 1
thorlcr/activities/group/thgroupslave.cpp

@@ -88,7 +88,7 @@ public:
 
         if (rolloverEnabled && !firstNode())  // 1st node can have nothing to send
         {
-            Owned<IThorRowCollector> collector = createThorRowCollector(*this, NULL, false, rc_mixed, SPILL_PRIORITY_SPILLABLE_STREAM);
+            Owned<IThorRowCollector> collector = createThorRowCollector(*this, NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_SPILLABLE_STREAM);
             Owned<IRowWriter> writer = collector->getWriter();
             if (next)
             {

+ 2 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3351,7 +3351,7 @@ public:
         if (!lhsDistributor)
             lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
-        Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
+        Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, stableSort_earlyAlloc, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
         loaderL.clear();
         reader.clear();
@@ -3362,7 +3362,7 @@ public:
         if (!rhsDistributor)
             rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, false, this));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
-        Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
+        Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
         loaderR.clear();
         reader.clear();

+ 2 - 2
thorlcr/activities/join/thjoinslave.cpp

@@ -414,8 +414,8 @@ public:
     void dolocaljoin()
     {
         // NB: old version used to force both sides all to disk
-        Owned<IThorRowLoader> iLoaderL = createThorRowLoader(*this, ::queryRowInterfaces(input1), compare1, true, rc_mixed, SPILL_PRIORITY_JOIN);
-        Owned<IThorRowLoader> iLoaderR = createThorRowLoader(*this, ::queryRowInterfaces(input2), compare2, true, rc_mixed, SPILL_PRIORITY_JOIN);
+        Owned<IThorRowLoader> iLoaderL = createThorRowLoader(*this, ::queryRowInterfaces(input1), compare1, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
+        Owned<IThorRowLoader> iLoaderR = createThorRowLoader(*this, ::queryRowInterfaces(input2), compare2, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_JOIN);
         bool isemptylhs = false;
         if (helper->isLeftAlreadySorted()) {
             ThorDataLinkMetaInfo info;

+ 22 - 31
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -24,6 +24,17 @@ class CLookupJoinActivityMaster : public CMasterActivity
 {
     mptag_t broadcast2MpTag, lhsDistributeTag, rhsDistributeTag;
 
+    bool isAll() const
+    {
+        switch (container.getKind())
+        {
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
+                return true;
+        }
+        return false;
+    }
 public:
     CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
     {
@@ -32,14 +43,17 @@ public:
         else
         {
             mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
-            broadcast2MpTag = container.queryJob().allocateMPTag();
-            lhsDistributeTag = container.queryJob().allocateMPTag();
-            rhsDistributeTag = container.queryJob().allocateMPTag();
+            if (!isAll())
+            {
+                broadcast2MpTag = container.queryJob().allocateMPTag();
+                lhsDistributeTag = container.queryJob().allocateMPTag();
+                rhsDistributeTag = container.queryJob().allocateMPTag();
+            }
         }
     }
     ~CLookupJoinActivityMaster()
     {
-        if (!container.queryLocal())
+        if (!container.queryLocal() && !isAll())
         {
             container.queryJob().freeMPTag(broadcast2MpTag);
             container.queryJob().freeMPTag(lhsDistributeTag);
@@ -52,35 +66,12 @@ public:
         if (!container.queryLocal())
         {
             serializeMPtag(dst, mpTag);
-            serializeMPtag(dst, broadcast2MpTag);
-            serializeMPtag(dst, lhsDistributeTag);
-            serializeMPtag(dst, rhsDistributeTag);
-        }
-    }
-    void process()
-    {
-        if (!container.queryLocal() && container.queryJob().querySlaves() > 1)
-        {
-            CMessageBuffer msg;
-            unsigned nslaves = container.queryJob().querySlaves();
-            unsigned s = 1;
-            rowcount_t totalCount = 0, slaveCount;
-            for (; s<=nslaves; s++)
+            if (!isAll())
             {
-                if (!receiveMsg(msg, s, mpTag))
-                    return;
-                msg.read(slaveCount);
-                if (RCUNSET == slaveCount)
-                {
-                    totalCount = RCUNSET;
-                    break; // unknown
-                }
-                totalCount += slaveCount;
+                serializeMPtag(dst, broadcast2MpTag);
+                serializeMPtag(dst, lhsDistributeTag);
+                serializeMPtag(dst, rhsDistributeTag);
             }
-            s=1;
-            msg.clear().append(totalCount);
-            for (; s<=nslaves; s++)
-                container.queryJob().queryJobComm().send(msg, s, mpTag);
         }
     }
 };

Diferenças do arquivo suprimidas por serem muito extensas
+ 1440 - 1025
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp


+ 1 - 1
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -60,7 +60,7 @@ public:
         dataLinkStart();
         input = inputs.item(0);
         unsigned spillPriority = container.queryGrouped() ? SPILL_PRIORITY_GROUPSORT : SPILL_PRIORITY_LARGESORT;
-        iLoader.setown(createThorRowLoader(*this, queryRowInterfaces(input), iCompare, !unstable, rc_mixed, spillPriority));
+        iLoader.setown(createThorRowLoader(*this, queryRowInterfaces(input), iCompare, unstable ? stableSort_none : stableSort_earlyAlloc, rc_mixed, spillPriority));
         startInput(input);
         eoi = false;
         if (container.queryGrouped())

+ 1 - 1
thorlcr/activities/msort/thsortu.cpp

@@ -1730,7 +1730,7 @@ public:
         }
         for (unsigned i=0;i<numworkers;i++) 
             delete workers[i];
-        delete workers;
+        delete [] workers;
         ::Release(jhelper);
     }
 

+ 2 - 2
thorlcr/activities/rollup/throllupslave.cpp

@@ -87,7 +87,7 @@ public:
         in = NULL;
         helper = NULL;
         abort = NULL;
-        rowLoader.setown(createThorRowLoader(*activity, NULL, false, rc_allMem));
+        rowLoader.setown(createThorRowLoader(*activity, NULL, stableSort_none, rc_allMem));
     }
 
     void init(IThorDataLink * _in, IHThorDedupArg * _helper, bool _keepLeft, bool * _abort, IStopInput *_iStopInput)
@@ -573,7 +573,7 @@ public:
     {
         helper = (IHThorRollupGroupArg *)queryHelper();
         appendOutputLinked(this);   // adding 'me' to outputs array
-        groupLoader.setown(createThorRowLoader(*this, NULL, false, rc_allMem));
+        groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
     }
     virtual void start()
     {

+ 1 - 1
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -63,7 +63,7 @@ private:
 #if THOR_TRACE_LEVEL > 5
         ActPrintLog("SELFJOIN: Performing local self-join");
 #endif
-        Owned<IThorRowLoader> iLoader = createThorRowLoader(*this, ::queryRowInterfaces(input), compare, !isUnstable(), rc_mixed, SPILL_PRIORITY_SELFJOIN);
+        Owned<IThorRowLoader> iLoader = createThorRowLoader(*this, ::queryRowInterfaces(input), compare, isUnstable() ? stableSort_none : stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> rs = iLoader->load(input, abortSoon);
         stopInput(input);
         input = NULL;

+ 2 - 2
thorlcr/msort/tsorts.cpp

@@ -472,7 +472,7 @@ public:
         }
         else
         {
-            collector->setup(&iCompare, isStable, rc_allMem, SPILL_PRIORITY_DISABLE); // must not spill
+            collector->setup(&iCompare, isStable ? stableSort_earlyAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE); // must not spill
             collector->transferRowsIn(localRows);
             collector->ensure((rowidx_t)globalTotal); // pre-expand row array for efficiency
 
@@ -1198,7 +1198,7 @@ public:
         icollate = _icollate?_icollate:_icompare;
         icollateupper = _icollateupper?_icollateupper:icollate;
 
-        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:icompare, isstable, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
+        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:icompare, isstable ? stableSort_earlyAlloc : stableSort_none, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> overflowstream;
         memsize_t inMemUsage = 0;
         try

+ 6 - 3
thorlcr/slave/slave.cpp

@@ -232,6 +232,7 @@ void ProcessSlaveActivity::done()
 #include "xmlwrite/thxmlwriteslave.ipp"
 
 CActivityBase *createLookupJoinSlave(CGraphElementBase *container);
+CActivityBase *createAllJoinSlave(CGraphElementBase *container);
 CActivityBase *createXmlParseSlave(CGraphElementBase *container);
 CActivityBase *createKeyDiffSlave(CGraphElementBase *container);
 CActivityBase *createKeyPatchSlave(CGraphElementBase *container);
@@ -440,14 +441,16 @@ public:
             case TAKlookupjoin:
             case TAKlookupdenormalize:
             case TAKlookupdenormalizegroup:
-            case TAKalljoin:
-            case TAKalldenormalize:
-            case TAKalldenormalizegroup:
             case TAKsmartjoin:
             case TAKsmartdenormalize:
             case TAKsmartdenormalizegroup:
                 ret = createLookupJoinSlave(this);
                 break;
+            case TAKalljoin:
+            case TAKalldenormalize:
+            case TAKalldenormalizegroup:
+                ret = createAllJoinSlave(this);
+                break;
             case TAKselfjoin:
                 if (queryLocalOrGrouped())
                     ret = createLocalSelfJoinSlave(this);

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -632,7 +632,7 @@ public:
     COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _grouped, bool _shared, unsigned spillPriority)
         : activity(_activity), rowIf(_rowIf), grouped(_grouped), shared(_shared)
     {
-        collector.setown(createThorRowCollector(activity, rowIf, NULL, false, rc_mixed, spillPriority, grouped));
+        collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
         writer.setown(collector->getWriter());
         eoi = false;
     }

+ 66 - 27
thorlcr/thorutil/thmem.cpp

@@ -524,8 +524,8 @@ void CThorExpandingRowArray::doSort(rowidx_t n, void **const rows, ICompare &com
             stableTablePtr = stableTable;
         }
         void **_rows = rows;
-        memcpy(stableTable, _rows, n*sizeof(void **));
-        parqsortvecstable(stableTable, n, compare, (void ***)_rows, maxCores);
+        memcpy(stableTablePtr, _rows, n*sizeof(void **));
+        parqsortvecstable(stableTablePtr, n, compare, (void ***)_rows, maxCores);
         while (n--)
         {
             *_rows = **((void ***)_rows);
@@ -713,6 +713,23 @@ bool CThorExpandingRowArray::appendRows(CThorExpandingRowArray &inRows, bool tak
     return true;
 }
 
+bool CThorExpandingRowArray::appendRows(CThorSpillableRowArray &inRows, bool takeOwnership)
+{
+    rowidx_t num = inRows.numCommitted();
+    if (0 == num)
+        return true;
+    if (numRows+num >= maxRows)
+    {
+        if (!ensure(numRows + num))
+            return false;
+    }
+    const void **newRows = rows+numRows;
+    inRows.transferRowsCopy(newRows, takeOwnership);
+
+    numRows += num;
+    return true;
+}
+
 void CThorExpandingRowArray::clearUnused()
 {
     if (rows)
@@ -1260,6 +1277,27 @@ void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
     commitRows = otherCommitRows;
 }
 
+void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
+{
+    if (0 == numRows)
+        return;
+    assertex(numRows == commitRows);
+    memcpy(outRows, rows, numRows*sizeof(void **));
+    if (takeOwnership)
+        firstRow = commitRows = numRows = 0;
+    else
+    {
+        const void **lastNewRow = outRows+numRows-1;
+        loop
+        {
+            LinkThorRow(*outRows);
+            if (outRows == lastNewRow)
+                break;
+            outRows++;
+        }
+    }
+}
+
 IRowStream *CThorSpillableRowArray::createRowStream()
 {
     return new CSpillableStream(activity, *this, rowIf, allowNulls);
@@ -1281,9 +1319,10 @@ protected:
     unsigned maxCores;
     unsigned outStreams;
     ICompare *iCompare;
-    bool isStable, preserveGrouping;
+    StableSortFlag stableSort;
+    bool preserveGrouping;
     IRowInterfaces *rowIf;
-    SpinLock readerLock;
+    CriticalSection readerLock;
     bool mmRegistered;
     Owned<CSharedSpillableRowSet> spillableRowSet;
     unsigned options;
@@ -1356,7 +1395,7 @@ protected:
     }
     IRowStream *getStream(CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool shared)
     {
-        SpinBlock b(readerLock);
+        CriticalBlock b(readerLock);
         if (0 == outStreams)
         {
             spillableRows.flush();
@@ -1480,9 +1519,9 @@ protected:
         }
     }
 public:
-    CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, bool _isStable, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
+    CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
         : activity(_activity),
-          rowIf(_rowIf), iCompare(_iCompare), isStable(_isStable), diskMemMix(_diskMemMix), spillPriority(_spillPriority),
+          rowIf(_rowIf), iCompare(_iCompare), stableSort(_stableSort), diskMemMix(_diskMemMix), spillPriority(_spillPriority),
           spillableRows(_activity, _rowIf)
     {
         preserveGrouping = false;
@@ -1495,7 +1534,7 @@ public:
             enableSpillingCallback();
         maxCores = activity.queryMaxCores();
         options = 0;
-        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
+        spillableRows.setup(rowIf, false, stableSort);
     }
     ~CThorRowCollectorBase()
     {
@@ -1533,10 +1572,10 @@ public:
         spillableRows.transferFrom(src);
         enableSpillingCallback();
     }
-    virtual void setup(ICompare *_iCompare, bool _isStable, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
+    virtual void setup(ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
     {
         iCompare = _iCompare;
-        isStable = _isStable;
+        stableSort = _stableSort;
         diskMemMix = _diskMemMix;
         spillPriority = _spillPriority;
         if (rc_allMem == diskMemMix)
@@ -1546,7 +1585,7 @@ public:
             mmRegistered = false;
             activity.queryJob().queryRowManager()->removeRowBuffer(this);
         }
-        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
+        spillableRows.setup(rowIf, false, stableSort);
     }
     virtual void ensure(rowidx_t max)
     {
@@ -1602,8 +1641,8 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
-        : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
+    CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
+        : CThorRowCollectorBase(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority)
     {
     }
 // IThorRowCollectorCommon
@@ -1612,9 +1651,9 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
-        CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
+        CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
     }
     virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
     virtual void setOptions(unsigned options)  { CThorRowCollectorBase::setOptions(options); }
@@ -1630,14 +1669,14 @@ public:
     }
 };
 
-IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
+IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
 {
-    return new CThorRowLoader(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
+    return new CThorRowLoader(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority);
 }
 
-IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
+IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
 {
-    return createThorRowLoader(activity, &activity, iCompare, isStable, diskMemMix, spillPriority);
+    return createThorRowLoader(activity, &activity, iCompare, stableSort, diskMemMix, spillPriority);
 }
 
 
@@ -1647,8 +1686,8 @@ class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowColle
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
-        : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
+    CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
+        : CThorRowCollectorBase(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority)
     {
     }
 // IThorRowCollectorCommon
@@ -1662,9 +1701,9 @@ public:
     virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
     virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
+    virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
     {
-        CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
+        CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
     }
     virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
     virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
@@ -1706,16 +1745,16 @@ public:
     }
 };
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
+IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
 {
-    Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
+    Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority);
     collector->setPreserveGrouping(preserveGrouping);
     return collector.getClear();
 }
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
+IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
 {
-    return createThorRowCollector(activity, &activity, iCompare, isStable, diskMemMix, spillPriority, preserveGrouping);
+    return createThorRowCollector(activity, &activity, iCompare, stableSort, diskMemMix, spillPriority, preserveGrouping);
 }
 
 

+ 7 - 5
thorlcr/thorutil/thmem.hpp

@@ -349,6 +349,7 @@ public:
     void transferFrom(CThorSpillableRowArray &src);
     void removeRows(rowidx_t start, rowidx_t n);
     bool appendRows(CThorExpandingRowArray &inRows, bool takeOwnership);
+    bool appendRows(CThorSpillableRowArray &inRows, bool takeOwnership);
     void clearUnused();
     void sort(ICompare &compare, unsigned maxCores);
     void reorder(rowidx_t start, rowidx_t num, rowidx_t *neworder);
@@ -481,6 +482,7 @@ public:
     void deserialize(size32_t sz, const void *buf, bool hasNulls){ CThorExpandingRowArray::deserialize(sz, buf); }
     void deserializeRow(IRowDeserializerSource &in) { CThorExpandingRowArray::deserializeRow(in); }
     bool ensure(rowidx_t requiredRows) { return CThorExpandingRowArray::ensure(requiredRows); }
+    void transferRowsCopy(const void **outRows, bool takeOwnership);
 
     virtual IThorArrayLock &queryLock() { return *this; }
 // IThorArrayLock
@@ -498,7 +500,7 @@ interface IThorRowCollectorCommon : extends IInterface
     virtual unsigned overflowScale() const = 0;
     virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort=true) = 0;
     virtual void transferRowsIn(CThorExpandingRowArray &src) = 0;
-    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
+    virtual void setup(ICompare *iCompare, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50) = 0;
     virtual void ensure(rowidx_t max) = 0;
     virtual void setOptions(unsigned options) = 0;
 };
@@ -517,10 +519,10 @@ interface IThorRowCollector : extends IThorRowCollectorCommon
     virtual IRowStream *getStream(bool shared=false, CThorExpandingRowArray *allMemRows=NULL) = 0;
 };
 
-extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
-extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare=NULL, bool isStable=false, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
+extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
+extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
+extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
+extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);