Преглед на файлове

Merge pull request #7164 from richardkchapman/sort-join-group

HPCC-9228 Roxie/hthor join activities should sort and group their inputs

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday преди 10 години
родител
ревизия
805cef10e3

+ 903 - 0
common/thorhelper/roxiehelper.cpp

@@ -19,6 +19,8 @@
 #include "thorherror.h"
 #include "roxiehelper.hpp"
 #include "roxielmj.hpp"
+#include "roxierow.hpp"
+#include "roxierowbuff.hpp"
 
 #include "jmisc.hpp"
 #include "jfile.hpp"
@@ -411,8 +413,909 @@ bool CRHLimitedCompareHelper::getGroup(OwnedRowArray &group, const void *left)
     }
     return group.ordinality()>0;
 }
+
+//=========================================================================================
+
+// default implementations - can be overridden for efficiency...
+bool ISimpleInputBase::nextGroup(ConstPointerArray & group)
+{
+    // MORE - this should be replaced with a version that reads to a builder
+    const void * next;
+    while ((next = nextInGroup()) != NULL)
+        group.append(next);
+    if (group.ordinality())
+        return true;
+    return false;
+}
+
+void ISimpleInputBase::readAll(RtlLinkedDatasetBuilder &builder)
+{
+    loop
+    {
+        const void *nextrec = nextInGroup();
+        if (!nextrec)
+        {
+            nextrec = nextInGroup();
+            if (!nextrec)
+                break;
+            builder.appendEOG();
+        }
+        builder.appendOwn(nextrec);
+    }
+}
+
+
+//=========================================================================================
+
+// Ability to read an input stream and group and/or sort it on-the-fly
+
+using roxiemem::OwnedConstRoxieRow;
+
+class InputReaderBase  : public CInterfaceOf<IGroupedInput>
+{
+protected:
+    IInputBase *input;
+public:
+    InputReaderBase(IInputBase *_input)
+    : input(_input)
+    {
+    }
+
+    virtual IOutputMetaData * queryOutputMeta() const
+    {
+        return input->queryOutputMeta();
+    }
+};
+
+class GroupedInputReader : public InputReaderBase
+{
+protected:
+    bool firstRead;
+    bool eof;
+    bool endGroupPending;
+    OwnedConstRoxieRow next;
+    const ICompare *compare;
+public:
+    GroupedInputReader(IInputBase *_input, const ICompare *_compare)
+    : InputReaderBase(_input), compare(_compare)
+    {
+        firstRead = false;
+        eof = false;
+        endGroupPending = false;
+    }
+
+    virtual const void *nextInGroup()
+    {
+        if (!firstRead)
+        {
+            firstRead = true;
+            next.setown(input->nextInGroup());
+        }
+
+        if (eof || endGroupPending)
+        {
+            endGroupPending = false;
+            return NULL;
+        }
+
+        OwnedConstRoxieRow prev(next.getClear());
+        next.setown(input->nextUngrouped());  // skip incoming grouping if present
+
+        if (next)
+        {
+            dbgassertex(prev);  // If this fails, you have an initial empty group. That is not legal.
+            if (compare && compare->docompare(prev, next) != 0)
+                endGroupPending = true;
+        }
+        else
+            eof = true;
+        return prev.getClear();
+    }
+};
+
+class DegroupedInputReader : public InputReaderBase
+{
+public:
+    DegroupedInputReader(IInputBase *_input) : InputReaderBase(_input)
+    {
+    }
+    virtual const void *nextInGroup()
+    {
+        return input->nextUngrouped();
+    }
+};
+
+class SortedInputReader : public InputReaderBase
+{
+protected:
+    DegroupedInputReader degroupedInput;
+    Owned<ISortAlgorithm> sorter;
+    bool firstRead;
+public:
+    SortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
+      : InputReaderBase(_input), degroupedInput(_input), sorter(_sorter), firstRead(false)
+    {
+        sorter->reset();
+    }
+
+    virtual const void *nextInGroup()
+    {
+        if (!firstRead)
+        {
+            firstRead = true;
+            sorter->prepare(&degroupedInput);
+        }
+        return sorter->next();
+    }
+};
+
+class SortedGroupedInputReader : public SortedInputReader
+{
+protected:
+    bool eof;
+    bool endGroupPending;
+    OwnedConstRoxieRow next;
+    const ICompare *compare;
+public:
+    SortedGroupedInputReader(IInputBase *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
+      : SortedInputReader(_input, _sorter), compare(_compare), eof(false), endGroupPending(false)
+    {
+    }
+
+    virtual const void *nextInGroup()
+    {
+        if (!firstRead)
+        {
+            firstRead = true;
+            sorter->prepare(&degroupedInput);
+            next.setown(sorter->next());
+        }
+
+        if (eof || endGroupPending)
+        {
+            endGroupPending = false;
+            return NULL;
+        }
+
+        OwnedConstRoxieRow prev(next.getClear());
+        next.setown(sorter->next());
+
+        if (next)
+        {
+            dbgassertex(prev);  // If this fails, you have an initial empty group. That is not legal.
+            if (compare->docompare(prev, next) != 0) // MORE - could assert >=0, as input is supposed to be sorted
+                 endGroupPending = true;
+        }
+        else
+            eof = true;
+        return prev.getClear();
+    }
+};
+
+extern IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare)
+{
+    dbgassertex(_input && _groupCompare);
+    return new GroupedInputReader(_input, _groupCompare);
+}
+
+extern IGroupedInput *createDegroupedInputReader(IInputBase *_input)
+{
+    dbgassertex(_input);
+    return new DegroupedInputReader(_input);
+}
+
+extern IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
+{
+    dbgassertex(_input && _sorter);
+    return new SortedInputReader(_input, _sorter);
+}
+
+extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
+{
+    dbgassertex(_input && _groupCompare && _sorter);
+    return new SortedGroupedInputReader(_input, _groupCompare, _sorter);
+}
+
 //========================================================================================= 
 
+class CQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+{
+protected:
+    unsigned curIndex;
+    ConstPointerArray sorted;
+    ICompare *compare;
+
+public:
+    CQuickSortAlgorithm(ICompare *_compare) : compare(_compare)
+    {
+        curIndex = 0;
+    }
+
+    virtual void prepare(IInputBase *input)
+    {
+        curIndex = 0;
+        if (input->nextGroup(sorted))
+            qsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
+    }
+
+    virtual const void *next()
+    {
+        if (sorted.isItem(curIndex))
+            return sorted.item(curIndex++);
+        return NULL;
+    }
+
+    virtual void reset()
+    {
+        while (sorted.isItem(curIndex))
+            ReleaseRoxieRow(sorted.item(curIndex++));
+        curIndex = 0;
+        sorted.kill();
+    }
+};
+
+class CStableQuickSortAlgorithm : public CQuickSortAlgorithm
+{
+public:
+    CStableQuickSortAlgorithm(ICompare *_compare) : CQuickSortAlgorithm(_compare)
+    {
+    }
+    virtual void prepare(IInputBase *input)
+    {
+        curIndex = 0;
+        if (input->nextGroup(sorted))
+        {
+            unsigned numRows = sorted.ordinality();
+            void **rows = const_cast<void * *>(sorted.getArray());
+            MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
+            void **temp = (void **) tempAttr.bufferBase();
+            memcpy(temp, rows, numRows*sizeof(void **));
+            qsortvecstable(temp, numRows, *compare, (void ***)rows);
+            for (unsigned i = 0; i < numRows; i++)
+            {
+                *rows = **((void ***)rows);
+                rows++;
+            }
+        }
+    }
+};
+
+#define INSERTION_SORT_BLOCKSIZE 1024
+
+class SortedBlock : public CInterface, implements IInterface
+{
+    unsigned sequence;
+    const void **rows;
+    unsigned length;
+    unsigned pos;
+
+    SortedBlock(const SortedBlock &);
+public:
+    IMPLEMENT_IINTERFACE;
+
+    SortedBlock(unsigned _sequence, roxiemem::IRowManager *rowManager, unsigned activityId) : sequence(_sequence)
+    {
+        rows = (const void **) rowManager->allocate(INSERTION_SORT_BLOCKSIZE * sizeof(void *), activityId);
+        length = 0;
+        pos = 0;
+    }
+
+    ~SortedBlock()
+    {
+        while (pos < length)
+            ReleaseRoxieRow(rows[pos++]);
+        ReleaseRoxieRow(rows);
+    }
+
+    int compareTo(SortedBlock *r, ICompare *compare)
+    {
+        int rc = compare->docompare(rows[pos], r->rows[r->pos]);
+        if (!rc)
+            rc = sequence - r->sequence;
+        return rc;
+    }
+
+    const void *next()
+    {
+        if (pos < length)
+            return rows[pos++];
+        else
+            return NULL;
+    }
+
+    inline bool eof()
+    {
+        return pos==length;
+    }
+
+    bool insert(const void *next, ICompare *_compare )
+    {
+        unsigned b = length;
+        if (b == INSERTION_SORT_BLOCKSIZE)
+            return false;
+        else if (b < 7)
+        {
+            while (b)
+            {
+                if (_compare->docompare(next, rows[b-1]) >= 0)
+                    break;
+                b--;
+            }
+            if (b != length)
+                memmove(&rows[b+1], &rows[b], (length - b) * sizeof(void *));
+            rows[b] = next;
+            length++;
+            return true;
+        }
+        else
+        {
+            unsigned int a = 0;
+            while ((int)a<b)
+            {
+                int i = (a+b)/2;
+                int rc = _compare->docompare(next, rows[i]);
+                if (rc>=0)
+                    a = i+1;
+                else
+                    b = i;
+            }
+            if (a != length)
+                memmove(&rows[a+1], &rows[a], (length - a) * sizeof(void *));
+            rows[a] = next;
+            length++;
+            return true;
+        }
+    }
+};
+
+class CInsertionSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+{
+    SortedBlock *curBlock;
+    unsigned blockNo;
+    IArrayOf<SortedBlock> blocks;
+    unsigned activityId;
+    roxiemem::IRowManager *rowManager;
+    ICompare *compare;
+
+    void newBlock()
+    {
+        blocks.append(*curBlock);
+        curBlock = new SortedBlock(blockNo++, rowManager, activityId);
+    }
+
+    inline static int doCompare(SortedBlock &l, SortedBlock &r, ICompare *compare)
+    {
+        return l.compareTo(&r, compare);
+    }
+
+    void makeHeap()
+    {
+        /* Permute blocks to establish the heap property
+           For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
+           The children of p must both be greater than or equal to p
+           The parent of a child c is given by p = (c-1)/2
+        */
+        unsigned i;
+        unsigned n = blocks.length();
+        SortedBlock **s = blocks.getArray();
+        for (i=1; i<n; i++)
+        {
+            SortedBlock * r = s[i];
+            int c = i; /* child */
+            while (c > 0)
+            {
+                int p = (c-1)/2; /* parent */
+                if ( doCompare( blocks.item(c), blocks.item(p), compare ) >= 0 )
+                    break;
+                s[c] = s[p];
+                s[p] = r;
+                c = p;
+            }
+        }
+    }
+
+    void remakeHeap()
+    {
+        /* The row associated with block[0] will have changed
+           This code restores the heap property
+        */
+        unsigned p = 0; /* parent */
+        unsigned n = blocks.length();
+        SortedBlock **s = blocks.getArray();
+        while (1)
+        {
+            unsigned c = p*2 + 1; /* child */
+            if ( c >= n )
+                break;
+            /* Select smaller child */
+            if ( c+1 < n && doCompare( blocks.item(c+1), blocks.item(c), compare ) < 0 ) c += 1;
+            /* If child is greater or equal than parent then we are done */
+            if ( doCompare( blocks.item(c), blocks.item(p), compare ) >= 0 )
+                break;
+            /* Swap parent and child */
+            SortedBlock *r = s[c];
+            s[c] = s[p];
+            s[p] = r;
+            /* child becomes parent */
+            p = c;
+        }
+    }
+
+public:
+    CInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId)
+        : compare(_compare)
+    {
+        rowManager = _rowManager;
+        activityId = _activityId;
+        curBlock = NULL;
+        blockNo = 0;
+    }
+
+    virtual void reset()
+    {
+        blocks.kill();
+        delete curBlock;
+        curBlock = NULL;
+        blockNo = 0;
+    }
+
+    virtual void prepare(IInputBase *input)
+    {
+        blockNo = 0;
+        curBlock = new SortedBlock(blockNo++, rowManager, activityId);
+        loop
+        {
+            const void *next = input->nextInGroup();
+            if (!next)
+                break;
+            if (!curBlock->insert(next, compare))
+            {
+                newBlock();
+                curBlock->insert(next, compare);
+            }
+        }
+        if (blockNo > 1)
+        {
+            blocks.append(*curBlock);
+            curBlock = NULL;
+            makeHeap();
+        }
+    }
+
+    virtual const void * next()
+    {
+        const void *ret;
+        if (blockNo==1) // single block case..
+        {
+            ret = curBlock->next();
+        }
+        else if (blocks.length())
+        {
+            SortedBlock &top = blocks.item(0);
+            ret = top.next();
+            if (top.eof())
+                blocks.replace(blocks.popGet(), 0);
+            remakeHeap();
+        }
+        else
+            ret = NULL;
+        return ret;
+    }
+};
+
+class CHeapSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+{
+    unsigned curIndex;
+    ConstPointerArray sorted;
+    bool inputAlreadySorted;
+    IntArray sequences;
+    bool eof;
+    ICompare *compare;
+
+#ifdef _CHECK_HEAPSORT
+    void checkHeap() const
+    {
+        unsigned n = sorted.ordinality();
+        if (n)
+        {
+            ICompare *_compare = compare;
+            void **s = sorted.getArray();
+            int *sq = sequences.getArray();
+            unsigned p;
+#if 0
+            CTXLOG("------------------------%d entries-----------------", n);
+            for (p = 0; p < n; p++)
+            {
+                CTXLOG("HEAP %d: %d %.10s", p, sq[p], s[p] ? s[p] : "..");
+            }
+#endif
+            for (p = 0; p < n; p++)
+            {
+                unsigned c = p*2+1;
+                if (c<n)
+                    assertex(!s[c] || (docompare(p, c, _compare, s, sq) <= 0));
+                c++;
+                if (c<n)
+                    assertex(!s[c] || (docompare(p, c, _compare, s, sq) <= 0));
+            }
+        }
+    }
+#else
+    inline void checkHeap() const {}
+#endif
+
+    const void *removeHeap()
+    {
+        unsigned n = sorted.ordinality();
+        if (n)
+        {
+            const void *ret = sorted.item(0);
+            if (n > 1 && ret)
+            {
+                ICompare *_compare = compare;
+                const void **s = sorted.getArray();
+                int *sq = sequences.getArray();
+                unsigned v = 0; // vacancy
+                loop
+                {
+                    unsigned c = 2*v + 1;
+                    if (c < n)
+                    {
+                        unsigned f = c; // favourite to fill it
+                        c++;
+                        if (c < n && s[c] && (!s[f] || (docompare(f, c, _compare, s, sq) > 0))) // is the smaller of the children
+                            f = c;
+                        sq[v] = sq[f];
+                        if ((s[v] = s[f]) != NULL)
+                            v = f;
+                        else
+                            break;
+                    }
+                    else
+                    {
+                        s[v] = NULL;
+                        break;
+                    }
+                }
+            }
+            checkHeap();
+            return ret;
+        }
+        else
+            return NULL;
+    }
+
+    static inline int docompare(unsigned l, unsigned r, ICompare *_compare, const void **s, int *sq)
+    {
+        int rc = _compare->docompare(s[l], s[r]);
+        if (!rc)
+            rc = sq[l] - sq[r];
+        return rc;
+    }
+
+    void insertHeap(const void *next)
+    {
+        // Upside-down heap sort
+        // Maintain a heap where every parent is lower than each of its children
+        // Root (at node 0) is lowest record seen, nodes 2n+1, 2n+2 are the children
+        // To insert a row, add it at end then keep swapping with parent as long as parent is greater
+        // To remove a row, take row 0, then recreate heap by replacing it with smaller of two children and so on down the tree
+        // Nice features:
+        // 1. Deterministic
+        // 2. Sort time can be overlapped with upstream/downstream processes - there is no delay between receiving last record from input and deliveriing first to output
+        // 3. Already sorted case can be spotted at zero cost while reading.
+        // 4. If you don't read all the results, you don't have to complete the sort
+        // BUT it is NOT stable, so we have to use a parallel array of sequence numbers
+
+        unsigned n = sorted.ordinality();
+        sorted.append(next);
+        sequences.append(n);
+        if (!n)
+            return;
+        ICompare *_compare = compare;
+        const void **s = sorted.getArray();
+        if (inputAlreadySorted)
+        {
+            if (_compare->docompare(next, s[n-1]) >= 0)
+                return;
+            else
+            {
+                // MORE - could delay creating sequences until now...
+                inputAlreadySorted = false;
+            }
+        }
+        int *sq = sequences.getArray();
+        unsigned q = n;
+        while (n)
+        {
+            unsigned parent = (n-1) / 2;
+            const void *p = s[parent];
+            if (_compare->docompare(p, next) <= 0)
+                break;
+            s[n] = p;
+            sq[n] = sq[parent];
+            s[parent] = next;
+            sq[parent] = q;
+            n = parent;
+        }
+    }
+
+public:
+    CHeapSortAlgorithm(ICompare *_compare) : compare(_compare)
+    {
+        inputAlreadySorted = true;
+        curIndex = 0;
+        eof = false;
+    }
+
+    virtual void reset()
+    {
+        eof = false;
+        if (inputAlreadySorted)
+        {
+            while (sorted.isItem(curIndex))
+                ReleaseRoxieRow(sorted.item(curIndex++));
+            sorted.kill();
+        }
+        else
+        {
+            roxiemem::ReleaseRoxieRows(sorted);
+        }
+        inputAlreadySorted = true;
+        sequences.kill();
+    }
+
+    virtual void prepare(IInputBase *input)
+    {
+        inputAlreadySorted = true;
+        curIndex = 0;
+        eof = false;
+        assertex(sorted.ordinality()==0);
+        const void *next = input->nextInGroup();
+        if (!next)
+        {
+            eof = true;
+            return;
+        }
+        loop
+        {
+            insertHeap(next);
+            next = input->nextInGroup();
+            if (!next)
+                break;
+        }
+        checkHeap();
+    }
+
+    virtual const void * next()
+    {
+        if (inputAlreadySorted)
+        {
+            if (sorted.isItem(curIndex))
+            {
+                return sorted.item(curIndex++);
+            }
+            else
+                return NULL;
+        }
+        else
+            return removeHeap();
+    }
+};
+
+class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, implements roxiemem::IBufferedRowCallback
+{
+    enum {
+        InitialSortElements = 0,
+        //The number of rows that can be added without entering a critical section, and therefore also the number
+        //of rows that might not get freed when memory gets tight.
+        CommitStep=32
+    };
+    roxiemem::DynamicRoxieOutputRowArray rowsToSort;
+    roxiemem::RoxieSimpleInputRowArray sorted;
+    ICompare *compare;
+    roxiemem::IRowManager &rowManager;
+    Owned<IDiskMerger> diskMerger;
+    Owned<IRowStream> diskReader;
+    IOutputMetaData *rowMeta;
+    StringAttr tempDirectory;
+    ICodeContext *ctx;
+    unsigned activityId;
+    bool stable;
+
+public:
+    CSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
+        : rowsToSort(&_rowManager, InitialSortElements, CommitStep, _activityId),
+          rowManager(_rowManager), compare(_compare), rowMeta(_rowMeta), ctx(_ctx), tempDirectory(_tempDirectory), activityId(_activityId), stable(_stable)
+    {
+        rowManager.addRowBuffer(this);
+    }
+    ~CSpillingQuickSortAlgorithm()
+    {
+        rowManager.removeRowBuffer(this);
+        diskReader.clear();
+    }
+
+    virtual void prepare(IInputBase *input)
+    {
+        loop
+        {
+            const void * next = input->nextInGroup();
+            if (!next)
+                break;
+            if (!rowsToSort.append(next))
+            {
+                {
+                    roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
+                    //We should have been called back to free any committed rows, but occasionally it may not (e.g., if
+                    //the problem is global memory is exhausted) - in which case force a spill here (but add any pending
+                    //rows first).
+                    if (rowsToSort.numCommitted() != 0)
+                    {
+                        rowsToSort.flush();
+                        spillRows();
+                    }
+                    //Ensure new rows are written to the head of the array.  It needs to be a separate call because
+                    //spillRows() cannot shift active row pointer since it can be called from any thread
+                    rowsToSort.flush();
+                }
+
+                if (!rowsToSort.append(next))
+                {
+                    ReleaseRoxieRow(next);
+                    throw MakeStringException(ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Insufficient memory to append sort row");
+                }
+            }
+        }
+        rowsToSort.flush();
+
+        roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
+        if (diskMerger)
+        {
+            spillRows();
+            rowsToSort.kill();
+            diskReader.setown(diskMerger->merge(compare));
+        }
+        else
+        {
+            unsigned numRows = rowsToSort.numCommitted();
+            if (numRows)
+            {
+                void ** rows = const_cast<void * *>(rowsToSort.getBlock(numRows));
+                //MORE: Should this be parallel?  Should that be dependent on whether it is grouped?  Should be a hint.
+                if (stable)
+                {
+                    MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
+                    void **temp = (void **) tempAttr.bufferBase();
+                    memcpy(temp, rows, numRows*sizeof(void **));
+                    qsortvecstable(temp, numRows, *compare, (void ***)rows);
+                    for (unsigned i = 0; i < numRows; i++)
+                    {
+                        *rows = **((void ***)rows);
+                        rows++;
+                    }
+                }
+                else
+                    qsortvec(rows, numRows, *compare);
+            }
+            sorted.transferFrom(rowsToSort);
+        }
+    }
+
+    virtual const void *next()
+    {
+        if(diskReader)
+            return diskReader->nextRow();
+        return sorted.dequeue();
+    }
+
+    virtual void reset()
+    {
+        //MORE: This could transfer any row pointer from sorted back to rowsToSort. It would trade
+        //fewer heap allocations with not freeing up the memory from large group sorts.
+        rowsToSort.clearRows();
+        sorted.kill();
+        //Disk reader must be cleared before the merger - or the files may still be locked.
+        diskReader.clear();
+        diskMerger.clear();
+    }
+
+//interface roxiemem::IBufferedRowCallback
+    virtual unsigned getSpillCost() const
+    {
+        //Spill global sorts before grouped sorts
+        if (rowMeta->isGrouped())
+            return 20;
+        return 10;
+    }
+    virtual bool freeBufferedRows(bool critical)
+    {
+        roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
+        return spillRows();
+    }
+
+protected:
+    bool spillRows()
+    {
+        unsigned numRows = rowsToSort.numCommitted();
+        if (numRows == 0)
+            return false;
+
+        const void * * rows = rowsToSort.getBlock(numRows);
+        qsortvec(const_cast<void * *>(rows), numRows, *compare);
+
+        Owned<IRowWriter> out = queryMerger()->createWriteBlock();
+        for (unsigned i= 0; i < numRows; i++)
+        {
+            out->putRow(rows[i]);
+        }
+        rowsToSort.noteSpilled(numRows);
+        return true;
+    }
+
+    IDiskMerger * queryMerger()
+    {
+        if (!diskMerger)
+        {
+            unsigned __int64 seq = (memsize_t)this ^ get_cycles_now();
+            StringBuffer spillBasename;
+            spillBasename.append(tempDirectory).append(PATHSEPCHAR).appendf("spill_sort_%" I64F "u", seq);
+            Owned<IRowLinkCounter> linker = new RoxieRowLinkCounter();
+            Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(rowMeta, activityId, ctx);
+            diskMerger.setown(createDiskMerger(rowInterfaces, linker, spillBasename));
+        }
+        return diskMerger;
+    }
+};
+
+extern ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CQuickSortAlgorithm(_compare);
+}
+
+extern ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare)
+{
+    return new CStableQuickSortAlgorithm(_compare);
+}
+
+extern ISortAlgorithm *createInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId)
+{
+    return new CInsertionSortAlgorithm(_compare, _rowManager, _activityId);
+}
+
+extern ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare)
+{
+    return new CHeapSortAlgorithm(_compare);
+}
+
+extern ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
+{
+    return new CSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _stable);
+}
+
+extern ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm _algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId)
+{
+    switch (_algorithm)
+    {
+    case heapSortAlgorithm:
+        return createHeapSortAlgorithm(_compare);
+    case insertionSortAlgorithm:
+        return createInsertionSortAlgorithm(_compare, &_rowManager, _activityId);
+    case quickSortAlgorithm:
+        return createQuickSortAlgorithm(_compare);
+    case stableQuickSortAlgorithm:
+        return createStableQuickSortAlgorithm(_compare);
+    case spillingQuickSortAlgorithm:
+    case stableSpillingQuickSortAlgorithm:
+        return createSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _algorithm==stableSpillingQuickSortAlgorithm);
+    default:
+        break;
+    }
+    throwUnexpected();
+}
+
+//===================================================
+
 CSafeSocket::CSafeSocket(ISocket *_sock)
 {
     httpMode = false;

+ 31 - 1
common/thorhelper/roxiehelper.hpp

@@ -77,7 +77,37 @@ public:
     IProperties *queryUrlParameters(){return parameters;}
 };
 
-//========================================================================================= 
+//==============================================================================================================
+
+typedef enum {heapSortAlgorithm, insertionSortAlgorithm, quickSortAlgorithm, stableQuickSortAlgorithm, spillingQuickSortAlgorithm, stableSpillingQuickSortAlgorithm, unknownSortAlgorithm } RoxieSortAlgorithm;
+
+interface ISortAlgorithm : extends IInterface
+{
+    virtual void prepare(IInputBase *input) = 0;
+    virtual const void *next() = 0;
+    virtual void reset() = 0;
+};
+
+extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId);
+extern THORHELPER_API ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare);
+extern THORHELPER_API ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable);
+
+extern THORHELPER_API ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId);
+
+//=========================================================================================
+
+interface IGroupedInput : extends IInterface, extends IInputBase
+{
+};
+
+extern THORHELPER_API IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare);
+extern THORHELPER_API IGroupedInput *createDegroupedInputReader(IInputBase *_input);
+extern THORHELPER_API IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter);
+extern THORHELPER_API IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter);
+
+//=========================================================================================
 
 interface SafeSocket : extends IInterface
 {

+ 10 - 1
common/thorhelper/roxiehelper.ipp

@@ -19,7 +19,7 @@
 #define ROXIEHELPER_IPP
 
 #include "thorhelper.hpp"
-
+#include "rtlds_imp.hpp"
 #include "jlog.hpp"
 
 extern THORHELPER_API unsigned traceLevel;
@@ -30,6 +30,15 @@ extern THORHELPER_API unsigned traceLevel;
 struct ISimpleInputBase //base for IInputBase and IHThorSimpleInput
 {
     virtual const void * nextInGroup() = 0;     // return NULL for eog/eof
+    virtual bool nextGroup(ConstPointerArray & group);      // note: default implementation can be overridden for efficiency...
+    virtual void readAll(RtlLinkedDatasetBuilder &builder); // note: default implementation can be overridden for efficiency...
+    inline const void * nextUngrouped()
+    {
+        const void * ret = nextInGroup();
+        if (!ret)
+            ret = nextInGroup();
+        return ret;
+    };
 };
 
 interface IOutputMetaData;

+ 0 - 58
ecl/hqlcpp/hqlttcpp.cpp

@@ -2569,42 +2569,6 @@ IHqlExpression * ThorHqlTransformer::normalizeCoGroup(IHqlExpression * expr)
     return expr->cloneAllAnnotations(grouped);
 }
 
-static IHqlExpression * getNonThorSortedJoinInput(IHqlExpression * joinExpr, IHqlExpression * dataset, const HqlExprArray & sorts, bool implicitSubSort)
-{
-    if (!sorts.length())
-        return LINK(dataset);
-
-    LinkedHqlExpr expr = dataset;
-    if (isGrouped(expr))
-    {
-        expr.setown(createDataset(no_group, LINK(expr), NULL));
-        expr.setown(cloneInheritedAnnotations(joinExpr, expr));
-    }
-
-    // if already sorted or grouped, use it!
-    OwnedHqlExpr groupOrder = createValueSafe(no_sortlist, makeSortListType(NULL), sorts);
-    groupOrder.setown(replaceSelector(groupOrder, queryActiveTableSelector(), expr->queryNormalizedSelector()));
-
-    //not used for thor, so sort can be local
-    OwnedHqlExpr table = ensureSorted(expr, groupOrder, joinExpr, false, true, true, implicitSubSort, false);
-    if (table != expr)
-        table.setown(cloneInheritedAnnotations(joinExpr, table));
-
-    OwnedHqlExpr group = createDatasetF(no_group, table.getClear(), LINK(groupOrder), NULL);
-    return cloneInheritedAnnotations(joinExpr, group);
-}
-
-
-static bool sameOrGrouped(IHqlExpression * newLeft, IHqlExpression * oldLeft)
-{
-    if (newLeft->queryBody() == oldLeft->queryBody())
-        return true;
-    if (newLeft->getOperator() != no_group)
-        return false;
-    newLeft = newLeft->queryChild(0);
-    return (newLeft->queryBody() == oldLeft->queryBody());
-}
-
 static bool canReorderMatchExistingLocalSort(HqlExprArray & newElements1, HqlExprArray & newElements2, IHqlExpression * ds1, Shared<IHqlExpression> & ds2, const HqlExprArray & elements1, const HqlExprArray & elements2, bool canSubSort, bool isLocal, bool alwaysLocal)
 {
     newElements1.kill();
@@ -3048,28 +3012,6 @@ IHqlExpression * ThorHqlTransformer::normalizeJoinOrDenormalize(IHqlExpression *
                 return expr->clone(args);
             }
         }
-
-        //Ensure that inputs to the activities in hthor/roxie are sorted and grouped.  (Should really be done in the engines)
-        OwnedHqlExpr newLeft = getNonThorSortedJoinInput(expr, leftDs, joinInfo.queryLeftSort(), options.implicitSubSort);
-        OwnedHqlExpr newRight = getNonThorSortedJoinInput(expr, rightDs, joinInfo.queryRightSort(), options.implicitSubSort);
-        try
-        {
-            if ((leftDs != newLeft) || (rightDs != newRight))
-            {
-                HqlExprArray args;
-                args.append(*newLeft.getClear());
-                args.append(*newRight.getClear());
-                unwindChildren(args, expr, 2);
-                args.append(*createAttribute(_normalized_Atom));
-                return expr->clone(args);
-            }
-        }
-        catch (IException * e)
-        {
-            //Couldn't work out the sort orders - shouldn't be fatal because may constant fold later.
-            EXCLOG(e, "Transform");
-            e->Release();
-        }
     }
 
     //Convert hash selfjoin to self-join(distribute)

+ 48 - 26
ecl/hthor/hthor.cpp

@@ -2758,7 +2758,7 @@ const void * CHThorFilterActivity::nextGE(const void * seek, unsigned numFields)
         return ret.getClear();
     }
 
-    return nextUngrouped(this);
+    return nextUngrouped();
 }
 
 bool CHThorFilterActivity::gatherConjunctions(ISteppedConjunctionCollector & collector) 
@@ -2869,7 +2869,7 @@ const void * CHThorFilterGroupActivity::nextGE(const void * seek, unsigned numFi
     else
         eof = true;
 
-    return nextUngrouped(this);
+    return nextUngrouped();
 }
 
 
@@ -3664,7 +3664,7 @@ CHThorDegroupActivity::CHThorDegroupActivity(IAgentContext &_agent, unsigned _ac
 
 const void * CHThorDegroupActivity::nextInGroup()
 {
-    const void * ret = nextUngrouped(input);
+    const void * ret = input->nextUngrouped();
     if (ret)
         processed++;
     return ret;
@@ -4245,7 +4245,19 @@ void CHThorJoinActivity::ready()
 {
     CHThorActivityBase::ready();
     input1->ready();
-
+    bool isStable = (helper.getJoinFlags() & JFunstable) == 0;
+    RoxieSortAlgorithm sortAlgorithm = isStable ? stableSpillingQuickSortAlgorithm : spillingQuickSortAlgorithm;
+    StringBuffer tempBase;
+    agent.getTempfileBase(tempBase);
+    if (helper.isLeftAlreadySorted())
+        sortedLeftInput.setown(createDegroupedInputReader(input));
+    else
+        sortedLeftInput.setown(createSortedInputReader(input, createSortAlgorithm(sortAlgorithm, helper.queryCompareLeft(), *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
+    ICompare *compareRight = helper.queryCompareRight();
+    if (helper.isRightAlreadySorted())
+        groupedSortedRightInput.setown(createGroupedInputReader(input1, compareRight));
+    else
+        groupedSortedRightInput.setown(createSortedGroupedInputReader(input1, compareRight, createSortAlgorithm(sortAlgorithm, compareRight, *queryRowManager(), input1->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
     outBuilder.setAllocator(rowAllocator);
     leftOuterJoin = (helper.getJoinFlags() & JFleftouter) != 0;
     rightOuterJoin = (helper.getJoinFlags() & JFrightouter) != 0;
@@ -4289,7 +4301,7 @@ void CHThorJoinActivity::ready()
     if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) 
     {   //Limited Match Join (s[1..n])
         limitedhelper.setown(createRHLimitedCompareHelper());
-        limitedhelper->init( helper.getJoinLimit(), input1, collate, helper.queryPrefixCompare() );
+        limitedhelper->init( helper.getJoinLimit(), groupedSortedRightInput, collate, helper.queryPrefixCompare() );
     }
 }
 
@@ -4299,6 +4311,8 @@ void CHThorJoinActivity::done()
     right.clear();
     left.clear();
     pendingRight.clear();
+    sortedLeftInput.clear();
+    groupedSortedRightInput.clear();
     CHThorActivityBase::done();
     input1->done();
 }
@@ -4341,9 +4355,7 @@ void CHThorJoinActivity::createDefaultRight()
 void CHThorJoinActivity::fillLeft()
 {
     matchedLeft = false;
-    left.setown(input->nextInGroup());
-    if (!left)
-        left.setown(input->nextInGroup());
+    left.setown(sortedLeftInput->nextInGroup()); // NOTE: already degrouped
     if(betweenjoin && left && pendingRight && (collate->docompare(left, pendingRight) >= 0))
         fillRight();
     if (limitedhelper && 0==rightIndex)
@@ -4388,12 +4400,12 @@ void CHThorJoinActivity::fillRight()
         }
         else
         {
-            next.setown(input1->nextInGroup());
+            next.setown(groupedSortedRightInput->nextInGroup());
         }
         if(!rightOuterJoin && next && (!left || (collateupper->docompare(left, next) > 0))) // if right is less than left, and not right outer, can skip group
         {
             while(next) 
-                next.setown(input1->nextInGroup());
+                next.setown(groupedSortedRightInput->nextInGroup());
             continue;
         }
         while(next)
@@ -4420,7 +4432,7 @@ void CHThorJoinActivity::fillRight()
                 right.append(next.getClear());
                 do
                 {
-                    next.setown(input1->nextInGroup());
+                    next.setown(groupedSortedRightInput->nextInGroup());
                 } while(next);
                 break;
             }
@@ -4430,7 +4442,7 @@ void CHThorJoinActivity::fillRight()
                 groupCount = 0;
                 while(next) 
                 {
-                    next.setown(input1->nextInGroup());
+                    next.setown(groupedSortedRightInput->nextInGroup());
                 }
             }
             else
@@ -4438,13 +4450,13 @@ void CHThorJoinActivity::fillRight()
                 right.append(next.getClear());
                 groupCount++;
             }
-            next.setown(input1->nextInGroup());
+            next.setown(groupedSortedRightInput->nextInGroup());
             
         }
         // normally only want to read one right group, but if is between join and next right group is in window for left, need to continue
         if(betweenjoin && left)
         {
-            pendingRight.setown(input1->nextInGroup());
+            pendingRight.setown(groupedSortedRightInput->nextInGroup());
             if(!pendingRight || (collate->docompare(left, pendingRight) < 0))
                 break;
         }
@@ -4845,13 +4857,24 @@ bool CHThorJoinActivity::isGrouped()
 CHThorSelfJoinActivity::CHThorSelfJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorJoinArg &_arg, ThorActivityKind _kind) 
         : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), outBuilder(NULL)
 {
+    dualCacheInput = NULL;
 }
 
 void CHThorSelfJoinActivity::ready()
 {
     CHThorActivityBase::ready();
     outBuilder.setAllocator(rowAllocator);
-
+    ICompare *compareLeft = helper.queryCompareLeft();
+    if (helper.isLeftAlreadySorted())
+        groupedInput.setown(createGroupedInputReader(input, compareLeft));
+    else
+    {
+        bool isStable = (helper.getJoinFlags() & JFunstable) == 0;
+        RoxieSortAlgorithm sortAlgorithm = isStable ? stableSpillingQuickSortAlgorithm : spillingQuickSortAlgorithm;
+        StringBuffer tempBase;
+        agent.getTempfileBase(tempBase);
+        groupedInput.setown(createSortedGroupedInputReader(input, compareLeft, createSortAlgorithm(sortAlgorithm, compareLeft, *queryRowManager(), input->queryOutputMeta(), agent.queryCodeContext(), tempBase, activityId)));
+    }
     leftOuterJoin = (helper.getJoinFlags() & JFleftouter) != 0;
     rightOuterJoin = (helper.getJoinFlags() & JFrightouter) != 0;
     exclude = (helper.getJoinFlags() & JFexclude) != 0;
@@ -4899,8 +4922,8 @@ void CHThorSelfJoinActivity::ready()
     if ((helper.getJoinFlags() & JFlimitedprefixjoin) && helper.getJoinLimit()) 
     {   //Limited Match Join (s[1..n])
         dualcache.setown(new CRHDualCache());
-        dualcache->init(input);
-        setInput(0, (IHThorInput *)dualcache->queryOut1());
+        dualcache->init(groupedInput);
+        dualCacheInput = dualcache->queryOut1();
         failingOuterAtmost = false;
         matchedLeft = false;
         leftIndex = 0;
@@ -4916,8 +4939,7 @@ void CHThorSelfJoinActivity::done()
 {
     outBuilder.clear();
     group.clear();
-    if (limitedhelper)
-        input = (IHThorInput*)dualcache->input();
+    groupedInput.clear();
     CHThorActivityBase::done();
 }
 
@@ -4929,7 +4951,7 @@ bool CHThorSelfJoinActivity::fillGroup()
     failingOuterAtmost = false;
     OwnedConstRoxieRow next;
     unsigned groupCount = 0;
-    next.setown(input->nextInGroup());
+    next.setown(groupedInput->nextInGroup());
     while(next)
     {
         if(groupCount==abortLimit)
@@ -4957,7 +4979,7 @@ bool CHThorSelfJoinActivity::fillGroup()
             group.clear();
             groupCount = 0;
             while(next) 
-                next.setown(input->nextInGroup());
+                next.setown(groupedInput->nextInGroup());
         }
         else if(groupCount==atmostLimit)
         {
@@ -4973,7 +4995,7 @@ bool CHThorSelfJoinActivity::fillGroup()
                 group.clear();
                 groupCount = 0;
                 while(next) 
-                    next.setown(input->nextInGroup());
+                    next.setown(groupedInput->nextInGroup());
             }
         }
         else
@@ -4981,7 +5003,7 @@ bool CHThorSelfJoinActivity::fillGroup()
             group.append(next.getClear());
             groupCount++;
         }
-        next.setown(input->nextInGroup());
+        next.setown(groupedInput->nextInGroup());
     }
     if(group.ordinality()==0)
     {
@@ -5005,7 +5027,7 @@ const void * CHThorSelfJoinActivity::nextInGroup()
         {
             if (!group.isItem(rightIndex))
             {
-                lhs.setown(input->nextInGroup());   //get from dualcache
+                lhs.setown(dualCacheInput->nextInGroup());
                 if (lhs)
                 {
                     rightIndex = 0;
@@ -5069,7 +5091,7 @@ const void * CHThorSelfJoinActivity::nextInGroup()
         {
             if(failingLimit || failingOuterAtmost)
             {
-                OwnedConstRoxieRow lhs(input->nextInGroup());
+                OwnedConstRoxieRow lhs(groupedInput->nextInGroup());  // dualCache never active here
                 while(lhs)
                 {
                     const void * ret = joinRecords(lhs, defaultRight, 0, failingLimit);
@@ -5078,7 +5100,7 @@ const void * CHThorSelfJoinActivity::nextInGroup()
                         processed++;
                         return ret;
                     }
-                    lhs.setown(input->nextInGroup());
+                    lhs.setown(groupedInput->nextInGroup());
                 }
                 failingLimit.clear();
             }

+ 5 - 0
ecl/hthor/hthor.ipp

@@ -1235,6 +1235,8 @@ class CHThorJoinActivity : public CHThorActivityBase
     Owned<IException> failingLimit;
     ConstPointerArray filteredRight;
     Owned<IRHLimitedCompareHelper> limitedhelper;
+    Owned<IGroupedInput> sortedLeftInput;
+    Owned<IGroupedInput> groupedSortedRightInput;
 
 //MORE: Following are good candidates for a join base class + others
     OwnedConstRoxieRow defaultLeft;
@@ -1248,6 +1250,7 @@ private:
     void * cloneOrReturnOutput(memsize_t thisSize);
     void fillLeft();
     void fillRight();
+    const void *nextRightInGroup();
     //bool getMatchingRecords();
     //bool queryAdvanceCursors();
     const void * joinRecords(const void * curLeft, const void * curRight, unsigned counter);
@@ -1313,6 +1316,8 @@ class CHThorSelfJoinActivity : public CHThorActivityBase
     Owned<IEngineRowAllocator> defaultAllocator;    
     Owned<IRHLimitedCompareHelper> limitedhelper;
     Owned<CRHDualCache> dualcache;
+    Owned<IGroupedInput> groupedInput;
+    IInputBase *dualCacheInput;
 private:
     bool fillGroup();
     const void * joinRecords(const void * curLeft, const void * curRight, unsigned counter, IException * except);

+ 0 - 2
roxie/ccd/ccdmain.cpp

@@ -75,8 +75,6 @@ bool defaultTimeActivities = true;
 unsigned watchActivityId = 0;
 unsigned testSlaveFailure = 0;
 unsigned restarts = 0;
-bool heapSort = false;
-bool insertionSort = false;
 bool fieldTranslationEnabled = false;
 bool useTreeCopy = true;
 bool mergeSlaveStatistics = true;

+ 3 - 3
roxie/ccd/ccdquery.cpp

@@ -504,7 +504,7 @@ protected:
         case TAKalljoin:
         case TAKalldenormalize:
         case TAKalldenormalizegroup:
-            return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKapply:
             return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
         case TAKaggregate:
@@ -649,7 +649,7 @@ protected:
         case TAKjoinlight:
         case TAKdenormalize:
         case TAKdenormalizegroup:
-            return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKkeyeddistribute:
             throwUnexpected();  // Code generator should have removed or transformed
         case TAKkeyedjoin:
@@ -709,7 +709,7 @@ protected:
             return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKselfjoin:
         case TAKselfjoinlight:
-            return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKskiplimit:
         case TAKcreaterowlimit:
             return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);

Файловите разлики са ограничени, защото са твърде много
+ 237 - 1099
roxie/ccd/ccdserver.cpp


+ 3 - 5
roxie/ccd/ccdserver.hpp

@@ -98,8 +98,6 @@ interface IRoxieInput : extends IInterface, extends IInputBase
     virtual void checkAbort() = 0;
     virtual unsigned queryId() const = 0;
 
-    virtual bool nextGroup(ConstPointerArray & group);
-    virtual void readAll(RtlLinkedDatasetBuilder &builder);
     virtual unsigned __int64 queryTotalCycles() const = 0;
     virtual unsigned __int64 queryLocalCycles() const = 0;
     virtual const void * nextSteppedGE(const void * seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra & stepExtra) { throwUnexpected(); }  // can only be called on stepping fields.
@@ -364,7 +362,7 @@ extern IRoxieServerActivityFactory *createRoxieServerDegroupActivityFactory(unsi
 extern IRoxieServerActivityFactory *createRoxieServerSpillReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerIndexWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerDenormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerConcatActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerMergeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
@@ -382,9 +380,9 @@ extern IRoxieServerActivityFactory *createRoxieServerProcessActivityFactory(unsi
 extern IRoxieServerActivityFactory *createRoxieServerGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerFirstNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSelectNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSelfJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerSelfJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerAllJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerAllJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerTopNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSkipLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);

+ 8 - 0
roxie/roxiemem/roxiemem.hpp

@@ -299,6 +299,14 @@ public:
 #define RoxieRowAllocatorId(row) roxiemem::HeapletBase::getAllocatorId(row)
 #define RoxieRowIsShared(row)  roxiemem::HeapletBase::isShared(row)
 
+inline void ReleaseRoxieRows(ConstPointerArray &data)
+{
+    ForEachItemIn(idx, data)
+        ReleaseRoxieRow(data.item(idx));
+    data.kill();
+}
+
+
 class OwnedRoxieRow;
 class OwnedConstRoxieRow
 {