Przeglądaj źródła

HPCC-17181 Refactor aggregate HT to use Roxiemem+merge sort locals

Previously, hash aggregates created a rowbuilder object on the heap for
each new row. With a large number of unique values, this created a
lot of heap objects that would be randomly accessed as the hash table
was used. This was particularly expensive (very variable time) during
the merge phase. In the most extreme cases it was 60x slower than the
fastest run.

This change instead pre-sorts the local hash tables before performing
a distributed merge. This has the secondary advantage of being able to
stream out the aggregated values on each node, instead of having to
wait for the whole global aggregate to finish.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 lat temu
rodzic
commit
a8d8a35597

+ 1 - 0
rtl/eclrtl/rtlds_imp.hpp

@@ -267,6 +267,7 @@ public:
     }
     inline size32_t getMaxLength() const { return maxLength; }
     inline void * getUnfinalizedClear() { void * ret = self; self = NULL; return ret; }
+    inline void * getUnfinalized() { return self; } // peek at the row being built; unlike getUnfinalizedClear(), 'self' is left set
 
     inline void clear() { if (self) { rowAllocator->releaseRow(self); self = NULL; } }
     inline RtlDynamicRowBuilder & setAllocator(IEngineRowAllocator * _rowAllocator) { rowAllocator = _rowAllocator; return *this; }

+ 10 - 7
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -963,7 +963,8 @@ class CDiskGroupAggregateSlave
 
     IHThorDiskGroupAggregateArg *helper;
     bool gathered, eoi;
-    Owned<RowAggregator> localAggTable;
+    Owned<IAggregateTable> localAggTable;
+    Owned<IRowStream> aggregateStream;
     Owned<IEngineRowAllocator> allocator;
     bool merging;
     Owned<IHashDistributor> distributor;
@@ -1004,8 +1005,8 @@ public:
         ActivityTimer s(totalCycles, timeActivities);
         CDiskReadSlaveActivityRecord::start();
         gathered = eoi = false;
-        localAggTable.setown(new RowAggregator(*helper, *helper));
-        localAggTable->start(queryRowAllocator());
+        localAggTable.setown(createRowAggregator(*this, *helper, *helper));
+        localAggTable->init(queryRowAllocator());
     }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
@@ -1048,10 +1049,12 @@ public:
 
                 if (!container.queryLocalOrGrouped() && container.queryJob().querySlaves()>1)
                 {
+                    Owned<IRowStream> localAggStream = localAggTable->getRowStream(true);
                     BooleanOnOff onOff(merging);
-                    bool ordered = 0 != (TDRorderedmerge & helper->getFlags());
-                    localAggTable.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggTable, mpTag, ordered));
+                    aggregateStream.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggStream, mpTag));
                 }
+                else
+                    aggregateStream.setown(localAggTable->getRowStream(false));
             }
             catch (IException *e)
             {
@@ -1060,11 +1063,11 @@ public:
                 throw checkAndCreateOOMContextException(this, e, "aggregating using hash table", localAggTable->elementCount(), queryDiskRowInterfaces()->queryRowMetaData(), NULL);
             }
         }
-        Owned<AggregateRowBuilder> next = localAggTable->nextResult();
+        const void *next = aggregateStream->nextRow();
         if (next)
         {
             dataLinkIncrement();
-            return next->finalizeRowClear();
+            return next;
         }
         eoi = true;
         return NULL;

+ 338 - 112
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3737,119 +3737,342 @@ public:
 
 //===========================================================================
 
-RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, RowAggregator *localAggTable, mptag_t mptag, bool ordered)
+/*
+ * Implements an IAggregateTable as a hash table.
+ * Creates a HT (in roxiemem) that holds {row, hash, size} entries.
+ * Rows being added that match existing elements will be merged via the supplied IHThorRowAggregator methods.
+ * The resulting aggregated rows can be retrieved as an unsorted IRowStream or
+ * as a sorted IRowStream, in which case the implementation will re-purpose the HT memory as a linear row
+ * array, sort it in place and return an IRowStream based on the sorted rows.
+ */
+class CAggregateHT : public CSimpleInterfaceOf<IAggregateTable>, implements IRowStream
 {
-    Owned<IRowStream> strm;
-    Owned<RowAggregator> globalAggTable = new RowAggregator(helperExtra, helper);
-    globalAggTable->start(activity.queryRowAllocator());
-    __int64 readCount = 0;
-    if (ordered)
-    {
-        class CRowAggregatedStream : implements IRowStream, public CInterface
-        {
-            CActivityBase &activity;
-            IThorRowInterfaces *rowIf;
-            Linked<RowAggregator> localAggregated;
-            RtlDynamicRowBuilder outBuilder;
-            size32_t node;
-        public:
-            IMPLEMENT_IINTERFACE;
-            CRowAggregatedStream(CActivityBase &_activity, IThorRowInterfaces *_rowIf, RowAggregator *_localAggregated) : activity(_activity), rowIf(_rowIf), localAggregated(_localAggregated), outBuilder(_rowIf->queryRowAllocator())
-            {
-                node = activity.queryContainer().queryJobChannel().queryMyRank();
-            }
-            // IRowStream impl.
-            virtual const void *nextRow()
+    struct HTEntry
+    {
+        void *row;
+        unsigned hash;
+        size32_t size;
+    };
+    CActivityBase &activity;
+    IEngineRowAllocator *rowAllocator = nullptr;
+    IHash * hasher = nullptr;
+    IHash * elementHasher = nullptr;
+    ICompare * comparer = nullptr;
+    ICompare * elementComparer = nullptr;
+    IHThorRowAggregator & helper;
+
+    unsigned htn = 0;
+    unsigned n = 0;
+    unsigned iPos = 0;
+    HTEntry *table = nullptr;
+
+    void expand()
+    {
+        HTEntry *t = table;
+        HTEntry *endT = table+htn;
+        htn += htn;
+        HTEntry *newTable = (HTEntry *)activity.queryRowManager()->allocate(((memsize_t)htn)*sizeof(HTEntry), activity.queryContainer().queryId());
+        // could check capacity and see if higher pow2
+        memset(newTable, 0, sizeof(HTEntry)*htn);
+        while (t != endT)
+        {
+            if (t->row)
             {
-                Owned<AggregateRowBuilder> next = localAggregated->nextResult();
-                if (!next) return NULL;
-                byte *outPtr = outBuilder.getSelf();
-                memcpy(outPtr, &node, sizeof(node));
-                const void *nextRow = next->finalizeRowClear();
-                memcpy(outPtr+sizeof(node), &nextRow, sizeof(const void *));
-                return outBuilder.finalizeRowClear(sizeof(node)+sizeof(const void *));
+                unsigned i = t->hash & (htn - 1);
+                while (newTable[i].row)
+                {
+                    i++;
+                    if (i==htn)
+                        i = 0;
+                }
+                newTable[i] = *t;
             }
-            virtual void stop() { }
-        };
-        Owned<IOutputMetaData> nodeRowMeta = createOutputMetaDataWithChildRow(activity.queryRowAllocator(), sizeof(size32_t));
-        Owned<IThorRowInterfaces> nodeRowMetaRowIf = activity.createRowInterfaces(nodeRowMeta);
-        Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(activity, nodeRowMetaRowIf, localAggTable);
-        class CNodeCompare : implements ICompare, implements IHash
+            ++t;
+        }
+        ReleaseThorRow(table);
+        table = newTable;
+    }
+    inline unsigned find(const void *row, unsigned h, ICompare *cmp)
+    {
+        unsigned i = h & (htn - 1);
+        while (true)
         {
-            IHash *baseHash;
-        public:
-            CNodeCompare(IHash *_baseHash) : baseHash(_baseHash) { }
-            virtual int docompare(const void *l,const void *r) const
+            HTEntry &ht = table[i];
+            if (nullptr == ht.row)
+                return i;
+            if ((table[i].hash==h) && (0 == cmp->docompare(row, ht.row)))
+                return i;
+            if (++i==htn)
+                i = 0;
+        }
+    }
+    inline void addNew(unsigned i, unsigned h, void *row, size32_t sz)
+    {
+        HTEntry *ht;
+        if (n >= ((htn * 3) / 4)) // if over 75% full
+        {
+            expand();
+            // re-find empty slot
+            i = h & (htn - 1);
+            while (true)
             {
-                size32_t lNode, rNode;
-                memcpy(&lNode, l, sizeof(size32_t));
-                memcpy(&rNode, r, sizeof(size32_t));
-                return (int)lNode-(int)rNode;
+                ht = &table[i];
+                if (nullptr == ht->row)
+                    break;
+                if (++i==htn)
+                    i = 0;
             }
-            virtual unsigned hash(const void *rowMeta)
+        }
+        else
+            ht = &table[i];
+        ht->row = row;
+        ht->hash = h;
+        ht->size = sz;
+        n++;
+    }
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IAggregateTable>);
+
+    CAggregateHT(CActivityBase &_activity, IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : activity(_activity), helper(_helper)
+    { // caches the hash/compare helpers from _extra and allocates an empty, zero-filled table of 8 slots
+        hasher = _extra.queryHash(); // hashes incoming rows in addRow()
+        elementHasher = _extra.queryHashElement(); // stored, but not referenced by the visible members of this class
+        comparer = _extra.queryCompareRowElement(); // passed to find() to compare an incoming row with a stored row
+        elementComparer = _extra.queryCompareElements(); // orders stored rows when a sorted stream is requested
+        htn = 8; // slot count; must stay a power of two because slots are selected with 'hash & (htn - 1)'
+        n = 0; // number of occupied slots
+        table = (HTEntry *)activity.queryRowManager()->allocate(((memsize_t)htn)*sizeof(HTEntry), activity.queryContainer().queryId());
+        // could check capacity and see if higher pow2
+        memset(table, 0, sizeof(HTEntry)*htn); // a null 'row' member marks a slot as empty
+    }
+    ~CAggregateHT()
+    { // releases any rows still held, then the table memory itself
+        reset(); // NB: if 'table' is null (ownership was passed to a sorted stream), reset() allocates a fresh 8 slot table
+        ReleaseThorRow(table); // table is non-null here, because reset() either kept or re-created it
+    }
+    virtual void reset() override
+    {
+        n = 0;
+        iPos = 0;
+        if (table)
+        {
+            HTEntry *t = table;
+            HTEntry *endT = table+htn;
+            while (t != endT)
             {
-                const void *row;
-                memcpy(&row, ((const byte *)rowMeta)+sizeof(size32_t), sizeof(const void *));
-                return baseHash->hash(row);
+                if (t->row)
+                {
+                    ReleaseRoxieRow(t->row);
+                    t->row = nullptr;
+                }
+                ++t;
             }
-        } nodeCompare(helperExtra.queryHashElement());
-        if (!distributor)
-            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJobChannel().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
-        strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
-        for (;;)
+        }
+        else
         {
-            OwnedConstThorRow rowMeta = strm->nextRow();
-            if (!rowMeta)
-                break;
-            readCount++;
-            const void *row;
-            memcpy(&row, ((const byte *)rowMeta.get())+sizeof(size32_t), sizeof(const void *));
-            globalAggTable->mergeElement(row);
+            htn = 8;
+            table = (HTEntry *)activity.queryRowManager()->allocate(((memsize_t)htn)*sizeof(HTEntry), activity.queryContainer().queryId());
+            // could check capacity and see if higher pow2
+            memset(table, 0, sizeof(HTEntry)*htn);
         }
     }
-    else
+    virtual void init(IEngineRowAllocator *_rowAllocator) override
+    { // records the allocator used by addRow() to build rows and by the row streams to finalize them
+        rowAllocator = _rowAllocator; // plain pointer copy, no link is taken - presumably the caller keeps the allocator alive
+    }
+    // Creates or merges new rows into HT entry as unfinalized rows
+    virtual void addRow(const void *row) override
     {
-        class CRowAggregatedStream : implements IRowStream, public CInterface
+        unsigned h = hasher->hash(row);
+        unsigned i = find(row, h, comparer);
+        HTEntry *ht = &table[i];
+        if (ht->row)
         {
-            Linked<RowAggregator> localAggregated;
-        public:
-            IMPLEMENT_IINTERFACE;
-            CRowAggregatedStream(RowAggregator *_localAggregated) : localAggregated(_localAggregated)
+            RtlDynamicRowBuilder rowBuilder(rowAllocator, ht->size, ht->row);
+            ht->size = helper.processNext(rowBuilder, row);
+            ht->row = rowBuilder.getUnfinalizedClear();
+        }
+        else
+        {
+            RtlDynamicRowBuilder rowBuilder(rowAllocator);
+            helper.clearAggregate(rowBuilder);
+            size32_t sz = helper.processFirst(rowBuilder, row);
+            addNew(i, h, rowBuilder.getUnfinalizedClear(), sz);
+        }
+    }
+    virtual unsigned elementCount() const override
+    { // number of distinct aggregate rows added so far
+        return n; // NB: zeroed by reset() and when getRowStream(true) hands the rows to a sorted stream
+    }
+    /* Returns a row stream of the HT rows; if sorted=false, the stream is this table itself (linked).
+     * If sorted=true:
+     *  1) Uses the existing HT memory to avoid allocating new memory
+     *  2) Compacts HT row pointers to the front of the HT memory space (re-purposed as a row array)
+     *  3) Sorts the row array by aggregate key field(s)
+     *  4) Clears CAggregateHT's hash table members and passes ownership of the row array memory
+     *     to an IRowStream implementation.
+     */
+    IRowStream *getRowStream(bool sorted) override
+    {
+        if (sorted)
+        {
+            unsigned iPos = 0;
+            void **rowTable = (void **)table;
+            // re-purposing table memory as row ptr table
+            // 1st compact rows in table to the front.
+            while (iPos != htn)
             {
+                HTEntry &ht = table[iPos];
+                if (ht.row)
+                {
+                    *rowTable = rowAllocator->finalizeRow(ht.size, ht.row, ht.size);
+                    ++rowTable;
+                }
+                ++iPos;
             }
-            // IRowStream impl.
-            virtual const void *nextRow()
+            size32_t remainingSz = ((byte *)&table[htn]) - ((byte *)rowTable);
+            memset(rowTable, 0, remainingSz);
+
+            // sort elements
+            unsigned elems = ((byte *)rowTable-(byte *)table)/sizeof(void *);
+            rowTable = (void **)table;
+            parqsortvec(rowTable, elems, *elementComparer, 0);
+
+            // Implements a IRowStream to walk sorted rows that were previously in HT.
+            class CSortedLocalAggregateStream : public CSimpleInterfaceOf<IRowStream>
             {
-                Owned<AggregateRowBuilder> next = localAggregated->nextResult();
-                if (!next) return NULL;
-                return next->finalizeRowClear();
-            }
-            virtual void stop() { }
-        };
-        Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(localAggTable);
-        if (!distributor)
-            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJobChannel().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
-        Owned<IThorRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
-        strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
-        for (;;)
+                const void **rows;
+                const void **rowsPtr;
+                const void **endRows;
+                unsigned numRows;
+            public:
+                CSortedLocalAggregateStream(const void **_rows, unsigned _numRows) : rows(_rows), numRows(_numRows)
+                {
+                    rowsPtr = rows;
+                    endRows = rowsPtr+numRows;
+                }
+                ~CSortedLocalAggregateStream()
+                {
+                    roxiemem::ReleaseRoxieRowArray(numRows, rows);
+                    ::ReleaseRoxieRow(rows);
+                }
+                // IRowStream
+                virtual const void *nextRow() override
+                {
+                    if (rowsPtr == endRows)
+                        return nullptr;
+                    const void *row = *rowsPtr;
+                    *rowsPtr++ = nullptr;
+                    return row;
+                }
+                virtual void stop() override { }
+            };
+            // NB: CSortedLocalAggregateStream takes ownership of rowTable
+            table = nullptr;
+            n = 0;
+            htn = 0;
+            return new CSortedLocalAggregateStream((const void **)rowTable, elems);
+        }
+        else
+            return LINK(this);
+    }
+    // IRowStream
+    virtual const void *nextRow() override
+    {
+        while (iPos != htn)
         {
-            OwnedConstThorRow row = strm->nextRow();
-            if (!row)
-                break;
-            readCount++;
-            globalAggTable->mergeElement(row);
+            HTEntry &ht = table[iPos++];
+            if (ht.row)
+            {
+                const void * row = rowAllocator->finalizeRow(ht.size, ht.row, ht.size);
+                ht.row = nullptr;
+                return row;
+            }
         }
+        return nullptr;
     }
+    virtual void stop() override { iPos = 0; }
+};
 
-    strm->stop();
-    strm.clear();
-    distributor->disconnect(true);
-    distributor->join();
+IAggregateTable *createRowAggregator(CActivityBase &activity, IHThorHashAggregateExtra &extra, IHThorRowAggregator &helper)
+{ // factory for the hash table based IAggregateTable implementation
+    return new CAggregateHT(activity, extra, helper); // caller owns the returned object (the visible callers wrap it with setown)
+}
 
-    activity.ActPrintLog("HASHAGGREGATE: Read %" RCPF "d records to build hash table", readCount);
-    StringBuffer str("HASHAGGREGATE: After distribution merge contains ");
-    activity.ActPrintLog("%s", str.append(globalAggTable->elementCount()).append("entries").str());
-    return globalAggTable.getClear();
+IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggStream, mptag_t mptag)
+{ // connects localAggStream to a (lazily created) pull hash distributor and returns a stream that merges adjacent rows with equal keys
+    Owned<IRowStream> strm;
+    ICompare *elementComparer = helperExtra.queryCompareElements();
+    if (!distributor) // created on first use and left in the caller's Owned<> so that the caller can reach it later
+        distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJobChannel().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
+    Owned<IThorRowInterfaces> rowIf = activity.getRowInterfaces(); // create a new rowIf rather than using the activity's own IRowInterface, which would otherwise cause a circular link
+    IHash *hasher = helperExtra.queryHashElement();
+    strm.setown(distributor->connect(rowIf, localAggStream, hasher, elementComparer)); // presumably the comparer lets the distributor deliver rows in key order - TODO confirm
+    IEngineRowAllocator *rowAllocator = activity.queryRowAllocator();
+    // Wraps the distributed stream; consecutive rows that compare equal are folded into one row via helper.mergeAggregate()
+    class CAggregatingStream : public CSimpleInterfaceOf<IRowStream>
+    {
+        size32_t sz = 0; // size of the pending aggregate in rowBuilder; 0 means nothing is pending
+        IEngineRowAllocator &rowAllocator;
+        RtlDynamicRowBuilder rowBuilder; // holds the aggregate row currently being accumulated
+        Owned<IRowStream> input; // distributed stream, owned from start() until stop()
+        ICompare &cmp;
+        IHThorRowAggregator &helper;
+        IHashDistributor &distributor; // NOTE(review): plain reference, no link taken - relies on the caller's Owned<IHashDistributor> outliving this stream
+    public:
+        CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor)
+            : helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator) // NB: members are initialized in declaration order, not in the order written here
+        {
+        }
+        void start(IRowStream *_input)
+        {
+            input.setown(_input); // takes ownership of _input
+        }
+        // IRowStream
+        virtual const void *nextRow() override
+        {
+            for (;;)
+            {
+                OwnedConstThorRow row = input->nextRow();
+                if (!row)
+                {
+                    if (sz) // end of input: flush the pending aggregate, if there is one
+                    {
+                        const void *row = rowBuilder.finalizeRowClear(sz); // NB: shadows the outer 'row', which is null in this branch
+                        sz = 0;
+                        return row;
+                    }
+                    return nullptr;
+                }
+                else if (sz)
+                {
+                    if (0 == cmp.docompare(row, rowBuilder.getUnfinalized())) // same key as the pending aggregate
+                        sz = helper.mergeAggregate(rowBuilder, row);
+                    else
+                    {
+                        const void *ret = rowBuilder.finalizeRowClear(sz); // key changed: emit the pending aggregate ...
+                        sz = cloneRow(rowBuilder, row, rowAllocator.queryOutputMeta()); // ... and start the next one from a copy of this row
+                        return ret;
+                    }
+                }
+                else
+                    sz = cloneRow(rowBuilder, row, rowAllocator.queryOutputMeta()); // first row of a new aggregate
+            }
+            return nullptr; // not reached: the loop above only exits via return
+        }
+        virtual void stop() override
+        {
+            sz = 0;
+            rowBuilder.clear(); // discards any partially accumulated aggregate
+            input->stop(); // NOTE(review): 'input' is cleared on the next line, so a second stop() call would dereference null - confirm callers stop only once
+            input.clear();
+            distributor.disconnect(true);
+            distributor.join();
+        }
+    };
+    CAggregatingStream *mergeStrm = new CAggregatingStream(helper, *rowAllocator, *elementComparer, *distributor.get());
+    mergeStrm->start(strm.getClear()); // hands ownership of the connected stream to the aggregating stream
+    return mergeStrm; // caller owns the returned stream
 }
 
 #ifdef _MSC_VER
@@ -3862,15 +4085,15 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
 
     IHThorHashAggregateArg *helper;
     mptag_t mptag;
-    Owned<RowAggregator> localAggTable;
+    Owned<IAggregateTable> localAggTable;
     bool eos;
     Owned<IHashDistributor> distributor;
+    Owned<IRowStream> aggregateStream;
 
     bool doNextGroup()
     {
         try
         {
-            localAggTable->start(queryRowAllocator());
             while (!abortSoon)
             {
                 OwnedConstThorRow row = inputStream->nextRow();
@@ -3906,16 +4129,17 @@ public:
             setRequireInitData(false);
         appendOutputLinked(this);
     }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
     {
         if (!container.queryLocalOrGrouped())
         {
             mptag = container.queryJobChannel().deserializeMPTag(data);
             ActPrintLog("HASHAGGREGATE: init tags %d",(int)mptag);
         }
-        localAggTable.setown(new RowAggregator(*helper, *helper));
+        localAggTable.setown(createRowAggregator(*this, *helper, *helper));
+        localAggTable->init(queryRowAllocator());
     }
-    virtual void start()
+    virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
@@ -3924,20 +4148,22 @@ public:
             ActPrintLog("Table before distribution contains %d entries", localAggTable->elementCount());
         if (!container.queryLocalOrGrouped() && container.queryJob().querySlaves()>1)
         {
-            bool ordered = 0 != (TAForderedmerge & helper->getAggregateFlags());
-            localAggTable.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggTable, mptag, ordered));
-            ActPrintLog("Table after distribution contains %d entries", localAggTable->elementCount());
+            Owned<IRowStream> localAggStream = localAggTable->getRowStream(true);
+            aggregateStream.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggStream, mptag));
         }
+        else
+            aggregateStream.setown(localAggTable->getRowStream(false));
         eos = false;
     }
-    virtual void stop()
+    virtual void stop() override
     {
         ActPrintLog("HASHAGGREGATE: stopping");
         if (localAggTable)
             localAggTable->reset();
+        aggregateStream.clear();
         PARENT::stop();
     }
-    virtual void abort()
+    virtual void abort() override
     {
         CSlaveActivity::abort();
         if (distributor)
@@ -3946,35 +4172,35 @@ public:
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities);
-        if (eos) return NULL;
-        Owned<AggregateRowBuilder> next = localAggTable->nextResult();
+        if (eos) return nullptr;
+        const void *next = aggregateStream->nextRow();
         if (next)
         {
             dataLinkIncrement();
-            return next->finalizeRowClear();
+            return next;
         }
         if (container.queryGrouped())
         {
-            localAggTable->reset();
+            localAggTable->reset(); // NB: aggregateStream is stream of rows from localAggTable
             if (!doNextGroup())
                 eos = true;
         }
         else
             eos = true;
-        return NULL;
+        return nullptr;
     }
     virtual bool isGrouped() const override { return false; }
-    virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
     {
         initMetaInfo(info);
         info.canStall = true;
         // maybe more?
     }
 // IHThorRowAggregator impl. - JCSMORE more until aggregator allows selectInterface to return.
-    virtual size32_t clearAggregate(ARowBuilder & rowBuilder) { return helper->clearAggregate(rowBuilder); }
-    virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) { return helper->processFirst(rowBuilder, src); }
-    virtual size32_t processNext(ARowBuilder & rowBuilder, const void * src) { return helper->processNext(rowBuilder, src); }
-    virtual size32_t mergeAggregate(ARowBuilder & rowBuilder, const void * src) { return helper->mergeAggregate(rowBuilder, src); }
+    virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
+    virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
+    virtual size32_t processNext(ARowBuilder & rowBuilder, const void * src) override { return helper->processNext(rowBuilder, src); }
+    virtual size32_t mergeAggregate(ARowBuilder & rowBuilder, const void * src) override { return helper->mergeAggregate(rowBuilder, src); }
 };
 #ifdef _MSC_VER
 #pragma warning(pop)

+ 11 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -41,7 +41,17 @@ IHashDistributor *createHashDistributor(
     bool dedup,
     IStopInput *istop, const char *id=NULL); // id optional, used for tracing to identify which distributor if >1 in activity
 
-RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, RowAggregator *localAggTable, mptag_t mptag, bool ordered);
+// IAggregateTable allows rows to be added and aggregated, then retrieved via an IRowStream
+interface IAggregateTable : extends IInterface
+{
+    virtual void init(IEngineRowAllocator *_rowAllocator) = 0; // supplies the allocator used to build and finalize aggregate rows
+    virtual void reset() = 0; // releases any rows still held, leaving the table empty and reusable
+    virtual void addRow(const void *row) = 0; // starts a new aggregate for the row's key, or merges the row into the existing one
+    virtual unsigned elementCount() const = 0; // number of distinct aggregates currently held
+    virtual IRowStream *getRowStream(bool sorted) = 0; // stream of finalized aggregate rows, optionally sorted by the aggregate key
+};
+IAggregateTable *createRowAggregator(CActivityBase &activity, IHThorHashAggregateExtra &extra, IHThorRowAggregator &helper);
+IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggTable, mptag_t mptag);
 
 activityslaves_decl CActivityBase *createHashDistributeSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createHashDistributeMergeSlave(CGraphElementBase *container);

+ 10 - 7
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -645,7 +645,8 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
 
     IHThorIndexGroupAggregateArg *helper;
     bool gathered, merging;
-    Owned<RowAggregator> localAggTable;
+    Owned<IAggregateTable> localAggTable;
+    Owned<IRowStream> aggregateStream;
     memsize_t maxMem;
     Owned<IHashDistributor> distributor;
     bool done = false;
@@ -676,8 +677,8 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
-        localAggTable.setown(new RowAggregator(*helper, *helper));
-        localAggTable->start(queryRowAllocator());
+        localAggTable.setown(createRowAggregator(*this, *helper, *helper));
+        localAggTable->init(queryRowAllocator());
         gathered = false;
         done = false;
     }
@@ -708,10 +709,12 @@ public:
                 ActPrintLog("INDEXGROUPAGGREGATE: Local aggregate table contains %d entries", localAggTable->elementCount());
                 if (!container.queryLocal() && container.queryJob().querySlaves()>1)
                 {
+                    Owned<IRowStream> localAggStream = localAggTable->getRowStream(true);
                     BooleanOnOff tf(merging);
-                    bool ordered = 0 != (TDRorderedmerge & helper->getFlags());
-                    localAggTable.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggTable, mpTag, ordered));
+                    aggregateStream.setown(mergeLocalAggs(distributor, *this, *helper, *helper, localAggStream, mpTag));
                 }
+                else
+                    aggregateStream.setown(localAggTable->getRowStream(false));
             }
             catch (IException *e)
             {
@@ -720,11 +723,11 @@ public:
                 throw checkAndCreateOOMContextException(this, e, "aggregating using hash table", localAggTable->elementCount(), helper->queryDiskRecordSize(), NULL);
             }
         }
-        Owned<AggregateRowBuilder> next = localAggTable->nextResult();
+        const void *next = aggregateStream->nextRow();
         if (next)
         {
             dataLinkIncrement();
-            return next->finalizeRowClear();
+            return next;
         }
         done = true;
         return NULL;