Przeglądaj źródła

HPCC-17777 Fix issue with DICTIONARY's spilling.

Refactor stream spiller and Thor's spilling row array to allow
them to cope with sparse array spilling/readback.
Introduce a sparse graph result type.

This fix avoids a potential crash if a job is under memory pressure
and a DICTIONARY result spill is attempted.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 lat temu
rodzic
commit
888f1c2e10

+ 84 - 40
common/thorhelper/thorcommon.cpp

@@ -1134,7 +1134,7 @@ protected:
     CThorStreamDeserializerSource source;
     Owned<ISourceRowPrefetcher> prefetcher;
     CThorContiguousRowBuffer prefetchBuffer; // used if prefetcher set
-    bool grouped;
+    EmptyRowSemantics emptyRowSemantics;
     bool eoi;
     bool eos;
     bool eog;
@@ -1156,13 +1156,12 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, bool _tallycrc, bool _grouped)
-        : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL) 
+    CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, bool _tallycrc, EmptyRowSemantics _emptyRowSemantics)
+        : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL), emptyRowSemantics(_emptyRowSemantics)
     {
 #ifdef TRACE_CREATE
         PROGLOG("CRowStreamReader %d = %p",++rdnum,this);
 #endif
-        grouped = _grouped;
         eoi = false;
         eos = false;
         eog = false;
@@ -1175,7 +1174,7 @@ public:
         if (prefetcher)
             prefetchBuffer.setStream(strm);
         source.setStream(strm);
-        deserializer.set(rowif->queryRowDeserializer());            
+        deserializer.set(rowif->queryRowDeserializer());
     }
 
     ~CRowStreamReader()
@@ -1198,35 +1197,60 @@ public:
 
     const void *nextRow()
     {
-        if (eog) {
+        if (eog)
+        {
             eog = false;
-            return NULL;
+            return nullptr;
         }
         if (eos)
-            return NULL;
-        if (source.eos()) {
+            return nullptr;
+        if (source.eos())
+        {
             eos = true;
-            return NULL;
+            return nullptr;
         }
-        RtlDynamicRowBuilder rowBuilder(*allocator);
-        size_t size = deserializer->deserialize(rowBuilder,source);
-        if (grouped) {
+        if (ers_allow == emptyRowSemantics)
+        {
             byte b;
-            source.read(sizeof(b),&b);
-            eog = (b==1);
+            source.read(sizeof(b), &b);
+            if (1==b)
+                return nullptr;
+            RtlDynamicRowBuilder rowBuilder(*allocator);
+            size_t size = deserializer->deserialize(rowBuilder, source);
+            return rowBuilder.finalizeRowClear(size);
+        }
+        else
+        {
+            RtlDynamicRowBuilder rowBuilder(*allocator);
+            size_t size = deserializer->deserialize(rowBuilder, source);
+            if (ers_eogonly == emptyRowSemantics)
+            {
+                byte b;
+                source.read(sizeof(b), &b);
+                eog = (b==1);
+            }
+            return rowBuilder.finalizeRowClear(size);
         }
-        return rowBuilder.finalizeRowClear(size);
     }
 
     const void *prefetchRow(size32_t *sz)
     {
         if (eog) 
             eog = false;
-        else if (!eos) {
+        else if (!eos)
+        {
             if (source.eos()) 
                 eos = true;
-            else {
+            else
+            {
                 assertex(prefetcher);
+                if (ers_allow == emptyRowSemantics)
+                {
+                    byte b;
+                    strm->get(sizeof(b),&b);
+                    if (1==b)
+                        return nullptr;
+                }
                 prefetcher->readAhead(prefetchBuffer);
                 const byte * ret = prefetchBuffer.queryRow();
                 if (sz)
@@ -1242,7 +1266,8 @@ public:
     void prefetchDone()
     {
         prefetchBuffer.finishedRow();
-        if (grouped) {
+        if (ers_eogonly == emptyRowSemantics)
+        {
             byte b;
             strm->get(sizeof(b),&b);
             eog = (b==1);
@@ -1293,8 +1318,8 @@ class CLimitedRowStreamReader : public CRowStreamReader
     unsigned __int64 rownum;
 
 public:
-    CLimitedRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, bool _grouped)
-        : CRowStreamReader(_fileio, _mmfile, rowif, _ofs, _len, _tallycrc, _grouped)
+    CLimitedRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, EmptyRowSemantics _emptyRowSemantics)
+        : CRowStreamReader(_fileio, _mmfile, rowif, _ofs, _len, _tallycrc, _emptyRowSemantics)
     {
         maxrows = _maxrows;
         rownum = 0;
@@ -1329,6 +1354,7 @@ bool UseMemoryMappedRead = false;
 IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp)
 {
     bool compressed = TestRwFlag(rwFlags, rw_compress);
+    EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(rwFlags);
     if (UseMemoryMappedRead && !compressed)
     {
         PROGLOG("Memory Mapped read of %s",file->queryFilename());
@@ -1336,9 +1362,9 @@ IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t of
         if (!mmfile)
             return NULL;
         if (maxrows == (unsigned __int64)-1)
-            return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
+            return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics);
         else
-            return new CLimitedRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
+            return new CLimitedRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics);
     }
     else
     {
@@ -1354,9 +1380,9 @@ IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t of
         if (!fileio)
             return NULL;
         if (maxrows == (unsigned __int64)-1)
-            return new CRowStreamReader(fileio, NULL, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
+            return new CRowStreamReader(fileio, NULL, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics);
         else
-            return new CLimitedRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
+            return new CLimitedRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics);
     }
 }
 
@@ -1380,7 +1406,7 @@ class CRowStreamWriter : private IRowSerializerTarget, implements IExtRowWriter,
     Linked<IOutputRowSerializer> serializer;
     Linked<IEngineRowAllocator> allocator;
     CRC32 crc;
-    bool grouped;
+    EmptyRowSemantics emptyRowSemantics;
     bool tallycrc;
     unsigned nested;
     MemoryAttr ma;
@@ -1441,13 +1467,12 @@ class CRowStreamWriter : private IRowSerializerTarget, implements IExtRowWriter,
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CRowStreamWriter(IFileIOStream *_stream,IOutputRowSerializer *_serializer,IEngineRowAllocator *_allocator,bool _grouped, bool _tallycrc, bool _autoflush)
-        : stream(_stream), serializer(_serializer), allocator(_allocator)
+    CRowStreamWriter(IFileIOStream *_stream, IOutputRowSerializer *_serializer, IEngineRowAllocator *_allocator, EmptyRowSemantics _emptyRowSemantics, bool _tallycrc, bool _autoflush)
+        : stream(_stream), serializer(_serializer), allocator(_allocator), emptyRowSemantics(_emptyRowSemantics)
     {
 #ifdef TRACE_CREATE
         PROGLOG("createRowWriter %d = %p",++wrnum,this);
 #endif
-        grouped = _grouped;
         tallycrc = _tallycrc;
         nested = 0;
         buf = (byte *)ma.allocate(ROW_WRITER_BUFFERSIZE);
@@ -1472,26 +1497,44 @@ public:
 
     void putRow(const void *row)
     {
-        if (row) {
-            serializer->serialize(*this,(const byte *)row);
-            if (grouped) {
+        if (row)
+        {
+            if (ers_allow == emptyRowSemantics)
+            {
                 byte b = 0;
-                if (bufpos<ROW_WRITER_BUFFERSIZE) 
-                    buf[bufpos++] = b;
-                else 
-                    extbuf.append(b);
+                put(1, &b);
+                serializer->serialize(*this, (const byte *)row);
+            }
+            else
+            {
+                serializer->serialize(*this, (const byte *)row);
+                if (ers_eogonly == emptyRowSemantics)
+                {
+                    byte b = 0;
+                    if (bufpos<ROW_WRITER_BUFFERSIZE)
+                        buf[bufpos++] = b;
+                    else
+                        extbuf.append(b);
+                }
             }
             allocator->releaseRow(row);
         }
-        else if (grouped) { // backpatch
+        else if (ers_eogonly == emptyRowSemantics) // backpatch
+        {
             byte b = 1;
             if (extbuf.length())
-                extbuf.writeDirect(extbuf.length()-1,sizeof(b),&b);
-            else {
+                extbuf.writeDirect(extbuf.length()-1, sizeof(b), &b);
+            else
+            {
                 assertex(bufpos);
                 buf[bufpos-1] = b;
             }
         }
+        else if (ers_allow == emptyRowSemantics)
+        {
+            byte b = 1;
+            put(1, &b);
+        }
     }
 
     void flush()
@@ -1617,7 +1660,8 @@ IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsig
 {
     if (0 != (flags & (rw_extend|rw_buffered|COMP_MASK)))
         throw MakeStringException(0, "Unsupported createRowWriter flags");
-    Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), TestRwFlag(flags, rw_grouped), TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
+    EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(flags);
+    Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), emptyRowSemantics, TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
     return writer.getClear();
 }
 

+ 26 - 1
common/thorhelper/thorcommon.hpp

@@ -93,7 +93,8 @@ enum RowReaderWriterFlags
     rw_autoflush      = 0x40,
     rw_buffered       = 0x80,
     rw_lzw            = 0x100, // if rw_compress
-    rw_lz4            = 0x200  // if rw_compress
+    rw_lz4            = 0x200, // if rw_compress
+    rw_sparse         = 0x400  // NB: mutually exclusive with rw_grouped
 };
 #define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)
 inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag) { return 0 != (flags & flag); }
@@ -159,6 +160,30 @@ interface IExtRowWriter: extends IRowWriter
     virtual void flush(CRC32 *crcout=NULL) = 0;
 };
 
+enum EmptyRowSemantics { ers_forbidden, ers_allow, ers_eogonly };
+inline unsigned mapESRToRWFlags(EmptyRowSemantics emptyRowSemantics)
+{
+    switch (emptyRowSemantics)
+    {
+        case ers_allow:
+            return rw_sparse;
+        case ers_eogonly:
+            return rw_grouped;
+        default:
+            return 0;
+    }
+}
+
+inline EmptyRowSemantics extractESRFromRWFlags(unsigned rwFlags)
+{
+    if (TestRwFlag(rwFlags, rw_sparse))
+        return ers_allow;
+    else if (TestRwFlag(rwFlags, rw_grouped))
+        return ers_eogonly;
+    else
+        return ers_forbidden;
+}
+
 interface IExpander;
 extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
 extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);

+ 1 - 1
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -49,7 +49,7 @@ protected:
         if (1 == numPartialResults)
             return firstRow;
 
-        CThorExpandingRowArray partialResults(*this, this, true, stableSort_none, true, numPartialResults);
+        CThorExpandingRowArray partialResults(*this, this, ers_allow, stableSort_none, true, numPartialResults);
         if (hadElement)
             partialResults.setRow(0, firstRow);
         --numPartialResults;

+ 1 - 1
thorlcr/activities/catch/thcatchslave.cpp

@@ -155,7 +155,7 @@ class CSkipCatchSlaveActivity : public CCatchSlaveActivityBase
         try
         {
             gathered = true;
-            Owned<IRowWriterMultiReader> overflowBuf = createOverflowableBuffer(*this, queryRowInterfaces(input), true);
+            Owned<IRowWriterMultiReader> overflowBuf = createOverflowableBuffer(*this, queryRowInterfaces(input), ers_eogonly);
             running = true;
             while (running)
             {

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2976,7 +2976,7 @@ friend class CBucket;
 };
 
 CHashTableRowTable::CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IThorRowInterfaces *rowIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare)
-    : CThorExpandingRowArray(_activity, rowIf, true),
+    : CThorExpandingRowArray(_activity, rowIf, ers_allow),
       activity(_activity), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare)
 {
     htMax = htElements = 0;

+ 1 - 1
thorlcr/activities/join/thjoinslave.cpp

@@ -551,7 +551,7 @@ public:
             ActPrintLog("JOIN barrier.1 raised");
 
             // primaryWriter will keep as much in memory as possible.
-            Owned<IRowWriterMultiReader> primaryWriter = createOverflowableBuffer(*this, primaryRowIf, false);
+            Owned<IRowWriterMultiReader> primaryWriter = createOverflowableBuffer(*this, primaryRowIf, ers_forbidden);
             primaryStream.setown(sorter->startMerge(totalrows));
             copyRowStream(primaryStream, primaryWriter);
             primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point

+ 1 - 1
thorlcr/activities/limit/thlimitslave.cpp

@@ -238,7 +238,7 @@ public:
     virtual void start() override
     {
         CLimitSlaveActivityBase::start();
-        buf.setown(createOverflowableBuffer(*this, this, true));
+        buf.setown(createOverflowableBuffer(*this, this, ers_eogonly));
     }
     CATCH_NEXTROW()
     {

+ 7 - 9
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -774,14 +774,12 @@ class CThorRowArrayWithFlushMarker : public CThorSpillableRowArray
 public:
     CThorRowArrayWithFlushMarker(CActivityBase &activity) : CThorSpillableRowArray(activity)
     {
-        flushMarker = 0;
     }
-    CThorRowArrayWithFlushMarker(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep)
-        : CThorSpillableRowArray(activity, rowIf, allowNulls, stableSort, initialSize, commitDelta)
+    CThorRowArrayWithFlushMarker(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics=ers_forbidden, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep)
+        : CThorSpillableRowArray(activity, rowIf, emptyRowSemantics, stableSort, initialSize, commitDelta)
     {
-        flushMarker = 0;
     }
-    rowidx_t flushMarker;
+    rowidx_t flushMarker = 0;
 };
 
 
@@ -1436,7 +1434,7 @@ public:
             rhs.setup(sharedRightRowInterfaces);
             // NB: use sharedRightRowInterfaces, so that expanding ptr array is using shared allocator
             for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
-                rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, true);
+                rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, ers_forbidden, stableSort_none, true);
         }
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
@@ -2025,7 +2023,7 @@ protected:
             if (!hasFailedOverToLocal())
             {
                 if (stable && !globallySorted)
-                    rhs.setup(sharedRightRowInterfaces, false, stableSort_earlyAlloc);
+                    rhs.setup(sharedRightRowInterfaces, ers_forbidden, stableSort_earlyAlloc);
                 bool success=false;
                 try
                 {
@@ -2079,7 +2077,7 @@ protected:
                     if (stable && !globallySorted)
                     {
                         ActPrintLog("Clearing rhs stable ptr table");
-                        rhs.setup(sharedRightRowInterfaces, false, stableSort_none); // don't need stable ptr table anymore
+                        rhs.setup(sharedRightRowInterfaces, ers_forbidden, stableSort_none); // don't need stable ptr table anymore
                     }
                 }
             }
@@ -2681,7 +2679,7 @@ public:
             if (isGlobal())
             {
                 for (unsigned s=0; s<container.queryJob().querySlaves(); s++)
-                    rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, false, stableSort_none, false);
+                    rhsSlaveRows.item(s)->setup(sharedRightRowInterfaces, ers_forbidden, stableSort_none, false);
             }
         }
     }

+ 5 - 5
thorlcr/activities/loop/thloop.cpp

@@ -296,7 +296,7 @@ public:
         unsigned maxIterations = helper->numIterations();
         if ((int)maxIterations < 0) maxIterations = 0;
         Owned<IThorGraphResults> loopResults = queryGraph().createThorGraphResults(maxIterations);
-        IThorResult *result = loopResults->createResult(*this, 0, this, true);
+        IThorResult *result = loopResults->createResult(*this, 0, this, mergeResultTypes(thorgraphresult_distributed, thorgraphresult_grouped));
 
         helper->createParentExtract(extractBuilder);
 
@@ -347,11 +347,11 @@ public:
     CLocalResultActivityMaster(CMasterGraphElement *info) : CLocalResultActivityMasterBase(info)
     {
     }
-    virtual void createResult()
+    virtual void createResult() override
     {
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
         CGraphBase *graph = container.queryResultsGraph();
-        graph->createResult(*this, helper->querySequence(), this, true); // NB graph owns result
+        graph->createResult(*this, helper->querySequence(), this, mergeResultTypes(thorgraphresult_distributed, thorgraphresult_grouped)); // NB graph owns result
     }
 };
 
@@ -370,7 +370,7 @@ public:
     {
         IHThorGraphLoopResultWriteArg *helper = (IHThorGraphLoopResultWriteArg *)queryHelper();
         CGraphBase *graph = container.queryResultsGraph();
-        graph->createGraphLoopResult(*this, inputRowIf, true); // NB graph owns result
+        graph->createGraphLoopResult(*this, inputRowIf, mergeResultTypes(thorgraphresult_distributed, thorgraphresult_grouped)); // NB graph owns result
     }
 };
 
@@ -390,7 +390,7 @@ public:
     {
         IHThorDictionaryResultWriteArg *helper = (IHThorDictionaryResultWriteArg *)queryHelper();
         CGraphBase *graph = container.queryResultsGraph();
-        graph->createResult(*this, helper->querySequence(), this, true); // NB graph owns result
+        graph->createResult(*this, helper->querySequence(), this, mergeResultTypes(thorgraphresult_distributed, thorgraphresult_sparse)); // NB graph owns result
     }
 };
 

+ 31 - 7
thorlcr/activities/loop/thloopslave.cpp

@@ -275,7 +275,7 @@ public:
         helper->createParentExtract(extractBuilder);
         maxIterations = helper->numIterations();
         if ((int)maxIterations < 0) maxIterations = 0;
-        loopPending.setown(createOverflowableBuffer(*this, this, false, true));
+        loopPending.setown(createOverflowableBuffer(*this, this, ers_forbidden, true));
         loopPendingCount = 0;
         finishedLooping = ((container.getKind() == TAKloopcount) && (maxIterations == 0));
         if ((flags & IHThorLoopArg::LFnewloopagain) && !helper->loopFirstTime())
@@ -412,7 +412,7 @@ public:
                     if (!((const bool *)row.get())[0])
                         finishedLooping = true; // NB: will finish when loopPending has been consumed
                 }
-                loopPending.setown(createOverflowableBuffer(*this, this, false, true));
+                loopPending.setown(createOverflowableBuffer(*this, this, ers_forbidden, true));
                 loopPendingCount = 0;
                 ++loopCounter;
                 if ((container.getKind() == TAKloopcount) && (loopCounter > maxIterations))
@@ -470,7 +470,12 @@ public:
         if (!executed)
         {
             executed = true;
-            IThorResult *result = loopResults->createResult(*this, 0, this, !queryGraph().isLocalChild());
+            ThorGraphResultType resultType = thorgraphresult_nul;
+            if (!queryGraph().isLocalChild())
+                resultType = mergeResultTypes(resultType, thorgraphresult_distributed);
+            if (input->isGrouped())
+                resultType = mergeResultTypes(resultType, thorgraphresult_grouped);
+            IThorResult *result = loopResults->createResult(*this, 0, this, resultType);
             Owned<IRowWriter> resultWriter = result->getWriter();
             for (;;)
             {
@@ -629,7 +634,12 @@ public:
         abortSoon = false;
         assertex(container.queryResultsGraph());
         CGraphBase *graph = container.queryResultsGraph();
-        IThorResult *result = graph->createResult(*this, helper->querySequence(), this, !queryGraph().isLocalChild());  // NB graph owns result
+        ThorGraphResultType resultType = thorgraphresult_nul;
+        if (!queryGraph().isLocalChild())
+            resultType = mergeResultTypes(resultType, thorgraphresult_distributed);
+        if (input->isGrouped())
+            resultType = mergeResultTypes(resultType, thorgraphresult_grouped);
+        IThorResult *result = graph->createResult(*this, helper->querySequence(), this, resultType);  // NB graph owns result
         resultWriter.setown(result->getWriter());
     }
     CATCH_NEXTROW()
@@ -718,7 +728,12 @@ public:
     {
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
         CGraphBase *graph = container.queryResultsGraph();
-        return graph->createResult(*this, helper->querySequence(), this, !queryGraph().isLocalChild());
+        ThorGraphResultType resultType = thorgraphresult_nul;
+        if (!queryGraph().isLocalChild())
+            resultType = mergeResultTypes(resultType, thorgraphresult_distributed);
+        if (input->isGrouped())
+            resultType = mergeResultTypes(resultType, thorgraphresult_grouped);
+        return graph->createResult(*this, helper->querySequence(), this, resultType);
     }
 };
 
@@ -760,7 +775,11 @@ public:
             builder.appendOwn(row);
         }
         CGraphBase *graph = container.queryResultsGraph();
-        IThorResult *result = graph->createResult(*this, helper->querySequence(), this, !queryGraph().isLocalChild());
+        ThorGraphResultType resultType = thorgraphresult_nul;
+        if (!queryGraph().isLocalChild())
+            resultType = mergeResultTypes(resultType, thorgraphresult_distributed);
+        resultType = mergeResultTypes(resultType, thorgraphresult_sparse);
+        IThorResult *result = graph->createResult(*this, helper->querySequence(), this, resultType);
         Owned<IRowWriter> resultWriter = result->getWriter();
         size32_t dictSize = builder.getcount();
         byte ** dictRows = builder.queryrows();
@@ -1384,7 +1403,12 @@ public:
     {
         IHThorGraphLoopResultWriteArg *helper = (IHThorGraphLoopResultWriteArg *)queryHelper();
         CGraphBase *graph = container.queryResultsGraph();
-        return graph->createGraphLoopResult(*this, input->queryFromActivity(), !queryGraph().isLocalChild());
+        ThorGraphResultType resultType = thorgraphresult_nul;
+        if (!queryGraph().isLocalChild())
+            resultType = mergeResultTypes(resultType, thorgraphresult_distributed);
+        if (input->isGrouped())
+            resultType = mergeResultTypes(resultType, thorgraphresult_grouped);
+        return graph->createGraphLoopResult(*this, input->queryFromActivity(), resultType);
     }
 };
 

+ 1 - 1
thorlcr/activities/thdiskbase.cpp

@@ -455,7 +455,7 @@ rowcount_t getCount(CActivityBase &activity, unsigned partialResults, rowcount_t
 const void *getAggregate(CActivityBase &activity, unsigned partialResults, IThorRowInterfaces &rowIf, IHThorCompoundAggregateExtra &aggHelper, mptag_t mpTag)
 {
     // JCSMORE - pity this isn't common routine with similar one in aggregate, but helper is not common
-    CThorExpandingRowArray slaveResults(activity, &activity, true, stableSort_none, true, partialResults);
+    CThorExpandingRowArray slaveResults(activity, &activity, ers_allow, stableSort_none, true, partialResults);
     unsigned _partialResults = partialResults;
     while (_partialResults--)
     {

+ 31 - 20
thorlcr/graph/thgraph.cpp

@@ -47,7 +47,8 @@ class CThorGraphResult : implements IThorResult, implements IRowWriter, public C
     Owned<IRowWriterMultiReader> rowBuffer;
     IThorRowInterfaces *rowIf;
     IEngineRowAllocator *allocator;
-    bool stopped, readers, distributed;
+    bool stopped, readers;
+    ThorGraphResultType resultType;
 
     void init()
     {
@@ -63,7 +64,7 @@ class CThorGraphResult : implements IThorResult, implements IRowWriter, public C
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStreamWriter(CThorGraphResult &_owner) : owner(_owner), rows(owner.activity, owner.rowIf, true)
+        CStreamWriter(CThorGraphResult &_owner, EmptyRowSemantics emptyRowSemantics) : owner(_owner), rows(owner.activity, owner.rowIf, emptyRowSemantics)
         {
         }
 
@@ -81,13 +82,20 @@ class CThorGraphResult : implements IThorResult, implements IRowWriter, public C
 public:
     IMPLEMENT_IINTERFACE;
 
-    CThorGraphResult(CActivityBase &_activity, IThorRowInterfaces *_rowIf, bool _distributed, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), distributed(_distributed)
+    CThorGraphResult(CActivityBase &_activity, IThorRowInterfaces *_rowIf, ThorGraphResultType _resultType, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), resultType(_resultType)
     {
         init();
+        EmptyRowSemantics emptyRowSemantics;
+        if (isGrouped())
+            emptyRowSemantics = ers_eogonly;
+        else if (isSparse())
+            emptyRowSemantics = ers_allow;
+        else
+            emptyRowSemantics = ers_forbidden;
         if (SPILL_PRIORITY_DISABLE == spillPriority)
-            rowBuffer.setown(new CStreamWriter(*this));
+            rowBuffer.setown(new CStreamWriter(*this, emptyRowSemantics));
         else
-            rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
+            rowBuffer.setown(createOverflowableBuffer(activity, rowIf, emptyRowSemantics, true));
     }
 
 // IRowWriter
@@ -118,7 +126,9 @@ public:
     }
     virtual IThorRowInterfaces *queryRowInterfaces() { return rowIf; }
     virtual CActivityBase *queryActivity() { return &activity; }
-    virtual bool isDistributed() const { return distributed; }
+    virtual bool isDistributed() const { return resultType & thorgraphresult_distributed; }
+    virtual bool isSparse() const { return resultType & thorgraphresult_sparse; }
+    virtual bool isGrouped() const { return resultType & thorgraphresult_grouped; }
     virtual void serialize(MemoryBuffer &mb)
     {
         Owned<IRowStream> stream = getRowStream();
@@ -191,18 +201,18 @@ public:
 
 /////
 
-IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
-    Owned<IThorResult> result = ::createResult(activity, rowIf, distributed, spillPriority);
+    Owned<IThorResult> result = ::createResult(activity, rowIf, resultType, spillPriority);
     setResult(id, result);
     return result;
 }
 
 /////
 
-IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
-    return new CThorGraphResult(activity, rowIf, distributed, spillPriority);
+    return new CThorGraphResult(activity, rowIf, resultType, spillPriority);
 }
 
 /////
@@ -230,7 +240,7 @@ public:
         thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
         *res = loopCounter;
         OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
-        IThorResult *counterResult = results->createResult(activity, pos, countRowIf, false, SPILL_PRIORITY_DISABLE);
+        IThorResult *counterResult = results->createResult(activity, pos, countRowIf, thorgraphresult_nul, SPILL_PRIORITY_DISABLE);
         Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
         counterResultWriter->putRow(counterRowFinal.getClear());
     }
@@ -238,14 +248,15 @@ public:
     {
         if (!loopAgainRowIf)
             loopAgainRowIf.setown(activity.createRowInterfaces(loopAgainMeta));
-        activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, !activity.queryGraph().isLocalChild(), SPILL_PRIORITY_DISABLE);
+        activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, activity.queryGraph().isLocalChild() ? thorgraphresult_nul : thorgraphresult_distributed, SPILL_PRIORITY_DISABLE);
     }
     virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
     {
         if (!resultRowIf)
             resultRowIf.setown(activity.createRowInterfaces(resultMeta));
-        IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
-        IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild()); // loop input
+        ThorGraphResultType resultType = activity.queryGraph().isLocalChild() ? thorgraphresult_nul : thorgraphresult_distributed;
+        IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, resultType); // loop output
+        IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, resultType); // loop input
     }
     virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
     {
@@ -2076,19 +2087,19 @@ IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
     return graphLoopResults->getResult(id, distributed);
 }
 
-IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
-    return results->createResult(activity, id, rowIf, distributed, spillPriority);
+    return results->createResult(activity, id, rowIf, resultType, spillPriority);
 }
 
-IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
-    return localResults->createResult(activity, id, rowIf, distributed, spillPriority);
+    return localResults->createResult(activity, id, rowIf, resultType, spillPriority);
 }
 
-IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
-    return graphLoopResults->createResult(activity, rowIf, distributed, spillPriority);
+    return graphLoopResults->createResult(activity, rowIf, resultType, spillPriority);
 }
 
 // IEclGraphResults

+ 17 - 9
thorlcr/graph/thgraph.hpp

@@ -121,14 +121,22 @@ interface IThorResult : extends IInterface
     virtual const void * getLinkedRowResult() = 0;
 };
 
+enum ThorGraphResultType:unsigned
+{
+    thorgraphresult_nul = 0x00,
+    thorgraphresult_distributed = 0x01,
+    thorgraphresult_grouped = 0x02,
+    thorgraphresult_sparse = 0x04
+};
+inline ThorGraphResultType mergeResultTypes(ThorGraphResultType l, ThorGraphResultType r) { return (ThorGraphResultType) (l|r); }
 class CActivityBase;
 // JCSMORE - based on IHThorGraphResults
 interface IThorGraphResults : extends IEclGraphResults
 {
     virtual void clear() = 0;
     virtual IThorResult *getResult(unsigned id, bool distributed=false) = 0;
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
-    virtual IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
+    virtual IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
     virtual unsigned addResult(IThorResult *result) = 0;
     virtual void setResult(unsigned id, IThorResult *result) = 0;
     virtual unsigned count() = 0;
@@ -740,9 +748,9 @@ public:
 
     virtual IThorResult *getResult(unsigned id, bool distributed=false);
     virtual IThorResult *getGraphLoopResult(unsigned id, bool distributed=false);
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
-    virtual IThorResult *createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    virtual IThorResult *createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
 
 // IEclGraphResults
     virtual void getDictionaryResult(unsigned & count, byte * * & ret, unsigned id);
@@ -1195,10 +1203,10 @@ public:
         // NB: stream static after this, i.e. nothing can be added to this result
         return LINK(&results.item(id));
     }
-    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
-    virtual IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT)
+    virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    virtual IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT)
     {
-        return createResult(activity, results.ordinality(), rowIf, distributed, spillPriority);
+        return createResult(activity, results.ordinality(), rowIf, resultType, spillPriority);
     }
     virtual unsigned addResult(IThorResult *result)
     {
@@ -1237,7 +1245,7 @@ public:
     virtual activity_id queryOwnerId() const { return ownerId; }
 };
 
-extern graph_decl IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+extern graph_decl IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
 
 
 class CGraphElementBase;

+ 4 - 4
thorlcr/graph/thgraphmaster.cpp

@@ -2014,7 +2014,7 @@ class CCollatedResult : implements IThorResult, public CSimpleInterface
                 }
             }
         }
-        Owned<IThorResult> _result = ::createResult(activity, rowIf, false, spillPriority);
+        Owned<IThorResult> _result = ::createResult(activity, rowIf, thorgraphresult_nul, spillPriority);
         Owned<IRowWriter> resultWriter = _result->getWriter();
         for (unsigned s=0; s<numSlaves; s++)
         {
@@ -2748,21 +2748,21 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb)
     return true;
 }
 
-IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
     Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, results->queryOwnerId(), spillPriority);
     results->setResult(id, result);
     return result;
 }
 
-IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
     Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, localResults->queryOwnerId(), spillPriority);
     localResults->setResult(id, result);
     return result;
 }
 
-IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
+IThorResult *CMasterGraph::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
     Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, 0, localResults->queryOwnerId(), spillPriority);
     unsigned id = graphLoopResults->addResult(result);

+ 3 - 3
thorlcr/graph/thgraphmaster.ipp

@@ -76,9 +76,9 @@ public:
     virtual void done() override;
     virtual void reset() override;
     virtual void abort(IException *e) override;
-    IThorResult *createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
-    IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
-    IThorResult *createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    IThorResult *createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    IThorResult *createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
+    IThorResult *createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority=SPILL_PRIORITY_RESULT);
 
 // IExceptionHandler
     virtual bool fireException(IException *e);

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1277,7 +1277,7 @@ IThorResult *CSlaveGraph::getGlobalResult(CActivityBase &activity, IThorRowInter
     if (!queryJobChannel().queryJobComm().send(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
         throwUnexpected();
 
-    Owned<IThorResult> result = ::createResult(activity, rowIf, false);
+    Owned<IThorResult> result = ::createResult(activity, rowIf, thorgraphresult_nul);
     Owned<IRowWriter> resultWriter = result->getWriter();
 
     MemoryBuffer mb;

+ 11 - 47
thorlcr/msort/tsortm.cpp

@@ -186,7 +186,7 @@ struct PartitionInfo
     size32_t guard;
     Linked<IThorRowInterfaces> prowif;
     PartitionInfo(CActivityBase *_activity, IThorRowInterfaces *rowif)
-        : splitkeys(*_activity, rowif, true), prowif(rowif)
+        : splitkeys(*_activity, rowif, ers_allow), prowif(rowif)
     {
         nodes = NULL;
         mpports = NULL;
@@ -222,42 +222,6 @@ struct PartitionInfo
         // should be more defensive here
         return (numnodes!=0)&&(splitkeys.ordinality()!=0);
     }
-
-    void serialize(MemoryBuffer &mb)
-    {
-        mb.append(numnodes);
-        unsigned i;
-        for (i=0;i<numnodes;i++)
-            nodes[i].serialize(mb);
-        for (i=0;i<numnodes;i++)
-            mb.append((unsigned short)mpports[i]);
-        mb.append((unsigned)mpTagRPC);
-        mb.append(guard);
-        splitkeys.serialize(mb);
-    }   
-    void deserialize(size32_t len,void *src)
-    {
-        kill();
-        MemoryBuffer mb(len,src);
-        mb.read(numnodes);
-        nodes = (SocketEndpoint *)malloc(numnodes*sizeof(SocketEndpoint));
-        unsigned i;
-        for (i=0;i<numnodes;i++)
-            nodes[i].deserialize(mb);
-        mpports = (unsigned short *)malloc(numnodes*sizeof(unsigned short));
-        for (i=0;i<numnodes;i++) 
-            mb.read(mpports[i]);
-        unsigned t;
-        mb.read(t);
-        mpTagRPC = (mptag_t)t;
-        size32_t left = mb.remaining();
-        size32_t dsguard;
-        mb.read(dsguard);
-        if (guard!=dsguard)
-            throw MakeStringException(-1,"SORT: PartitionInfo meta info mismatch(%d,%d)",guard,dsguard);
-        splitkeys.kill();
-        splitkeys.deserialize(left, mb.readDirect(left));
-    }
 };
 
 
@@ -595,7 +559,7 @@ public:
         unsigned averagesamples = OVERSAMPLE*numnodes;  
         rowcount_t averagerecspernode = (rowcount_t)(total/numnodes);
         CriticalSection asect;
-        CThorExpandingRowArray sample(*activity, keyIf, true);
+        CThorExpandingRowArray sample(*activity, keyIf, ers_allow);
         class casyncfor1: public CAsyncFor
         {
             CSortMaster &owner;
@@ -651,7 +615,7 @@ public:
         offset_t ts=sample.serializedSize();
         estrecsize = numsamples?((size32_t)(ts/numsamples)):100;
         sample.sort(*icompare, activity->queryMaxCores());
-        CThorExpandingRowArray mid(*activity, keyIf, true);
+        CThorExpandingRowArray mid(*activity, keyIf, ers_allow);
         if (numsamples) // could shuffle up empty nodes here
         {
             for (unsigned i=0;i<numsplits;i++)
@@ -757,12 +721,12 @@ public:
             return splitmap.getClear();
         }
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray emin(*activity, keyIf, true);
-        CThorExpandingRowArray emax(*activity, keyIf, true);
-        CThorExpandingRowArray totmid(*activity, keyIf, true);
+        CThorExpandingRowArray emin(*activity, keyIf, ers_allow);
+        CThorExpandingRowArray emax(*activity, keyIf, ers_allow);
+        CThorExpandingRowArray totmid(*activity, keyIf, ers_allow);
         ECFarray = &totmid;
         ECFcompare = icompare;
-        CThorExpandingRowArray mid(*activity, keyIf, true);
+        CThorExpandingRowArray mid(*activity, keyIf, ers_allow);
         unsigned i;
         unsigned j;
         for(i=0;i<numsplits;i++)
@@ -925,8 +889,8 @@ public:
                     }
                 }
 
-                CThorExpandingRowArray newmin(*activity, keyIf, true);
-                CThorExpandingRowArray newmax(*activity, keyIf, true);
+                CThorExpandingRowArray newmin(*activity, keyIf, ers_allow);
+                CThorExpandingRowArray newmax(*activity, keyIf, ers_allow);
                 unsigned __int64 maxerror=0;
                 unsigned __int64 nodewanted = (stotal/numnodes); // Note scaled total
                 unsigned __int64 variancelimit = estrecsize?maxdeviance/estrecsize:0;
@@ -1088,7 +1052,7 @@ public:
         // I think this dependent on row being same format as meta
 
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, keyIf, true);
+        CThorExpandingRowArray splits(*activity, keyIf, ers_allow);
         char *s=cosortfilenames;
         unsigned i;
         for(i=0;i<numnodes;i++)
@@ -1122,7 +1086,7 @@ public:
     {
         ActPrintLog(activity, "Previous partition");
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, keyIf, true);
+        CThorExpandingRowArray splits(*activity, keyIf, ers_allow);
         unsigned i;
         for(i=1;i<numnodes;i++)
         {

+ 7 - 7
thorlcr/msort/tsorts.cpp

@@ -167,7 +167,7 @@ public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
     CWriteIntercept(CActivityBase &_activity, IThorRowInterfaces *_rowIf, unsigned _interval)
-        : activity(_activity), rowIf(_rowIf), interval(_interval), sampleRows(activity, rowIf, true)
+        : activity(_activity), rowIf(_rowIf), interval(_interval), sampleRows(activity, rowIf, ers_forbidden)
     {
         interval = _interval;
         idx = 0;
@@ -867,9 +867,9 @@ public:
     {
         // finds the keys within the ranges specified
         // uses empty keys (0 size) if none found
-        CThorExpandingRowArray low(*activity, keyIf, true);
-        CThorExpandingRowArray high(*activity, keyIf, true);
-        CThorExpandingRowArray mid(*activity, keyIf, true);
+        CThorExpandingRowArray low(*activity, keyIf, ers_allow);
+        CThorExpandingRowArray high(*activity, keyIf, ers_allow);
+        CThorExpandingRowArray mid(*activity, keyIf, ers_allow);
         low.deserializeExpand(lbufsize, lkeybuf);
         high.deserializeExpand(hbufsize, hkeybuf);
         unsigned n=low.ordinality();
@@ -936,13 +936,13 @@ public:
     }
     virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn)
     {
-        CThorExpandingRowArray keys(*activity, keyIf, true);
+        CThorExpandingRowArray keys(*activity, keyIf, ers_allow);
         keys.deserialize(keybufsize, keybuf);
         doBinChop(keys, pos, num, cmpfn);
     }
     virtual void MultiBinChopStart(size32_t keybufsize, const byte * keybuf, byte cmpfn)
     {
-        CThorExpandingRowArray keys(*activity, keyIf, true);
+        CThorExpandingRowArray keys(*activity, keyIf, ers_allow);
         keys.deserializeExpand(keybufsize, keybuf);
         assertex(multibinchoppos==NULL); // check for reentrancy
         multibinchopnum = keys.ordinality();
@@ -971,7 +971,7 @@ public:
         for (i=0;i<mapsize;i++)
             ActPrintLog(activity, "%" RCPF "d ",overflowmap[i]);
 #endif
-        CThorExpandingRowArray keys(*activity, keyIf, true);
+        CThorExpandingRowArray keys(*activity, keyIf, ers_allow);
         keys.deserialize(keybufsize, keybuf);
         for (i=0;i<mapsize-1;i++)
             AdjustOverflow(overflowmap[i], keys.query(i), cmpfn);

+ 6 - 6
thorlcr/thorutil/thbuf.cpp

@@ -631,10 +631,10 @@ class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiR
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    COverflowableBuffer(CActivityBase &_activity, IThorRowInterfaces *_rowIf, bool grouped, bool _shared, unsigned spillPriority)
+    COverflowableBuffer(CActivityBase &_activity, IThorRowInterfaces *_rowIf, EmptyRowSemantics emptyRowSemantics, bool _shared, unsigned spillPriority)
         : activity(_activity), rowIf(_rowIf), shared(_shared)
     {
-        collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
+        collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, emptyRowSemantics));
         writer.setown(collector->getWriter());
         eoi = false;
     }
@@ -662,9 +662,9 @@ public:
     }
 };
 
-IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IThorRowInterfaces *rowIf, bool grouped, bool shared, unsigned spillPriority)
+IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, bool shared, unsigned spillPriority)
 {
-    return new COverflowableBuffer(activity, rowIf, grouped, shared, spillPriority);
+    return new COverflowableBuffer(activity, rowIf, emptyRowSemantics, shared, spillPriority);
 }
 
 
@@ -1169,7 +1169,7 @@ friend class CRowSet;
 };
 
 CRowSet::CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows)
-    : sharedWriteAhead(_sharedWriteAhead), rows(*_sharedWriteAhead.activity, _sharedWriteAhead.activity, true, stableSort_none, true, maxRows), chunk(_chunk)
+    : sharedWriteAhead(_sharedWriteAhead), rows(*_sharedWriteAhead.activity, _sharedWriteAhead.activity, ers_eogonly, stableSort_none, true, maxRows), chunk(_chunk)
 {
 }
 
@@ -1745,7 +1745,7 @@ public:
         eos = eow = readerBlocked = false;
         rowPos = rowsToRead = 0;
         writersComplete = writersBlocked = 0;
-        rows.setup(rowIf, false, stableSort_none, true); // turning on throwOnOom;
+        rows.setup(rowIf, ers_forbidden, stableSort_none, true); // turning on throwOnOom;
     }
     ~CRowMultiWriterReader()
     {

+ 1 - 1
thorlcr/thorutil/thbuf.hpp

@@ -81,7 +81,7 @@ interface IRowWriterMultiReader : extends IRowWriter
     virtual IRowStream *getReader() = 0;
 };
 
-extern graph_decl IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IThorRowInterfaces *rowif, bool grouped, bool shared=false, unsigned spillPriority=SPILL_PRIORITY_OVERFLOWABLE_BUFFER);
+extern graph_decl IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IThorRowInterfaces *rowif, EmptyRowSemantics emptyRowSemantics, bool shared=false, unsigned spillPriority=SPILL_PRIORITY_OVERFLOWABLE_BUFFER);
 // NB first write all then read (not interleaved!)
 
 // Multiple writers, one reader

+ 45 - 72
thorlcr/thorutil/thmem.cpp

@@ -229,7 +229,8 @@ public:
 class CSpillableStreamBase : public CSpillable
 {
 protected:
-    bool preserveNulls, ownsRows;
+    bool ownsRows;
+    EmptyRowSemantics emptyRowSemantics;
     unsigned spillCompInfo;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
@@ -252,13 +253,13 @@ protected:
         return true;
     }
 public:
-    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
-        : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), preserveNulls(_preserveNulls)
+    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
+        : CSpillable(_activity, _rowIf, _spillPriority), rows(_activity), emptyRowSemantics(_emptyRowSemantics)
     {
         assertex(inRows.isFlushed());
         ownsRows = false;
         spillCompInfo = 0x0;
-        rows.setup(rowIf, _preserveNulls);
+        rows.setup(rowIf, emptyRowSemantics);
         rows.swap(inRows);
     }
     ~CSpillableStreamBase()
@@ -337,9 +338,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
                     {
                         block.clearCB = true;
                         assertex(((offset_t)-1) != outputOffset);
-                        unsigned rwFlags = DEFAULT_RWFLAGS;
-                        if (owner->preserveNulls)
-                            rwFlags |= rw_grouped;
+                        unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(owner->emptyRowSemantics);
                         spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
                         owner->rows.unregisterWriteCallback(*this); // no longer needed
                         ret = spillStream->nextRow();
@@ -358,7 +357,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
                 }
                 if (ret)
                     return ret;
-                if (!owner->preserveNulls)
+                if (ers_forbidden == owner->emptyRowSemantics)
                     eos = true;
             }
             return nullptr;
@@ -380,8 +379,8 @@ class CSharedSpillableRowSet : public CSpillableStreamBase
     };
 
 public:
-    CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
-        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
+    CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
+        : CSpillableStreamBase(_activity, inRows, _rowIf, _emptyRowSemantics, _spillPriority)
     {
         activateSpillingCallback();
     }
@@ -391,9 +390,7 @@ public:
         if (spillFile) // already spilled?
         {
             block.clearCB = true;
-            unsigned rwFlags = DEFAULT_RWFLAGS;
-            if (preserveNulls)
-                rwFlags |= rw_grouped;
+            unsigned rwFlags = DEFAULT_RWFLAGS | mapESRToRWFlags(emptyRowSemantics);
             return ::createRowStream(spillFile, rowIf, rwFlags);
         }
         rowidx_t toRead = rows.numCommitted();
@@ -414,8 +411,8 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 public:
     IMPLEMENT_IINTERFACE_USING(CSpillableStreamBase);
 
-    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority, unsigned _spillCompInfo)
-        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
+    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority, unsigned _spillCompInfo)
+        : CSpillableStreamBase(_activity, inRows, _rowIf, _emptyRowSemantics, _spillPriority)
     {
         spillCompInfo = _spillCompInfo;
         pos = numReadRows = 0;
@@ -449,8 +446,7 @@ public:
                     rwFlags |= rw_compress;
                     rwFlags |= spillCompInfo;
                 }
-                if (preserveNulls)
-                    rwFlags |= rw_grouped;
+                rwFlags |= mapESRToRWFlags(emptyRowSemantics);
                 spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
                 return spillStream->nextRow();
             }
@@ -668,18 +664,15 @@ inline bool CThorExpandingRowArray::_resize(rowidx_t requiredRows, unsigned maxS
     return true;
 }
 
-CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity)
-    : activity(_activity)
+CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity) : activity(_activity)
 {
-    initCommon();
-    setup(NULL, false, stableSort_none, true);
+    rowManager = activity.queryRowManager();
 }
 
-CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IThorRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom, rowidx_t initialSize)
+CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, StableSortFlag _stableSort, bool _throwOnOom, rowidx_t initialSize)
     : activity(_activity)
 {
-    initCommon();
-    setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
+    setup(_rowIf, _emptyRowSemantics, _stableSort, _throwOnOom);
     if (initialSize)
     {
         rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), activity.queryContainer().queryId(), defaultMaxSpillCost));
@@ -697,18 +690,10 @@ CThorExpandingRowArray::~CThorExpandingRowArray()
     ReleaseThorRow(stableTable);
 }
 
-void CThorExpandingRowArray::initCommon()
-{
-    stableTable = NULL;
-    rows = NULL;
-    maxRows = 0;
-    numRows = 0;
-}
-
-void CThorExpandingRowArray::setup(IThorRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom)
+void CThorExpandingRowArray::setup(IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, StableSortFlag _stableSort, bool _throwOnOom)
 {
     rowIf = _rowIf;
-    allowNulls = _allowNulls;
+    emptyRowSemantics = _emptyRowSemantics;
     stableSort = _stableSort;
     throwOnOom = _throwOnOom;
     if (rowIf)
@@ -784,7 +769,7 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     IThorRowInterfaces *otherRowIf = other.rowIf;
     const void **otherRows = other.rows;
     void **otherStableTable = other.stableTable;
-    bool otherAllowNulls = other.allowNulls;
+    EmptyRowSemantics otherEmptyRowSemantics = other.emptyRowSemantics;
     StableSortFlag otherStableSort = other.stableSort;
     bool otherThrowOnOom = other.throwOnOom;
     unsigned otherDefaultMaxSpillCost = other.defaultMaxSpillCost;
@@ -796,7 +781,7 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     other.stableTable = stableTable;
     other.maxRows = maxRows;
     other.numRows = numRows;
-    other.setup(rowIf, allowNulls, stableSort, throwOnOom);
+    other.setup(rowIf, emptyRowSemantics, stableSort, throwOnOom);
     other.setDefaultMaxSpillCost(defaultMaxSpillCost);
 
     rowManager = otherRowManager;
@@ -804,7 +789,7 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     stableTable = otherStableTable;
     maxRows = otherMaxRows;
     numRows = otherNumRows;
-    setup(otherRowIf, otherAllowNulls, otherStableSort, otherThrowOnOom);
+    setup(otherRowIf, otherEmptyRowSemantics, otherStableSort, otherThrowOnOom);
     setDefaultMaxSpillCost(otherDefaultMaxSpillCost);
 }
 
@@ -1160,7 +1145,7 @@ void CThorExpandingRowArray::serialize(MemoryBuffer &mb)
 {
     assertex(serializer);
     CMemoryRowSerializer s(mb);
-    if (!allowNulls)
+    if (emptyRowSemantics == ers_forbidden)
         serialize(s);
     else
     {
@@ -1234,7 +1219,7 @@ void CThorExpandingRowArray::deserializeRow(IRowDeserializerSource &in)
 
 void CThorExpandingRowArray::deserialize(size32_t sz, const void *buf)
 {
-    if (allowNulls)
+    if (emptyRowSemantics != ers_forbidden)
     {
         ASSERTEX((sz>=sizeof(short))&&(*(unsigned short *)buf==0x7631)); // check for mismatch
         buf = (const byte *)buf+sizeof(unsigned short);
@@ -1243,7 +1228,7 @@ void CThorExpandingRowArray::deserialize(size32_t sz, const void *buf)
     CThorStreamDeserializerSource d(sz,buf);
     while (!d.eos())
     {
-        if (allowNulls)
+        if (emptyRowSemantics != ers_forbidden)
         {
             bool nullrow;
             d.read(sizeof(bool),&nullrow);
@@ -1291,15 +1276,12 @@ void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
 CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity)
     : CThorExpandingRowArray(activity)
 {
-    initCommon();
-    commitDelta = CommitStep;
     throwOnOom = false;
 }
 
-CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
-    : CThorExpandingRowArray(activity, rowIf, false, stableSort, false, initialSize), commitDelta(_commitDelta)
+CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
+    : CThorExpandingRowArray(activity, rowIf, ers_forbidden, stableSort, false, initialSize), commitDelta(_commitDelta)
 {
-    initCommon();
 }
 
 CThorSpillableRowArray::~CThorSpillableRowArray()
@@ -1307,12 +1289,6 @@ CThorSpillableRowArray::~CThorSpillableRowArray()
     clearRows();
 }
 
-void CThorSpillableRowArray::initCommon()
-{
-    commitRows = 0;
-    firstRow = 0;
-}
-
 void CThorSpillableRowArray::clearRows()
 {
     roxiemem::ReleaseRoxieRowRange(rows, firstRow, numRows);
@@ -1361,7 +1337,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
-    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, allowNulls=%s) max rows = %"  RIPF "u", tracingPrefix, boolToStr(skipNulls), boolToStr(allowNulls), n);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, emptyRowSemantics=%u) max rows = %"  RIPF "u", tracingPrefix, boolToStr(skipNulls), emptyRowSemantics, n);
 
     if (_spillCompInfo)
         assertex(0 == writeCallbacks.ordinality()); // incompatible
@@ -1372,8 +1348,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
         rwFlags |= rw_compress;
         rwFlags |= _spillCompInfo;
     }
-    if (allowNulls)
-        rwFlags |= rw_grouped;
+    rwFlags |= mapESRToRWFlags(emptyRowSemantics);
 
     // NB: This is always called within a CThorArrayLockBlock, as such no writebacks are added or updating
     rowidx_t nextCBI = RCIDXMAX; // indicates none
@@ -1420,7 +1395,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
             }
             else if (!skipNulls)
             {
-                assertex(allowNulls);
+                assertex(emptyRowSemantics != ers_forbidden);
                 writer->putRow(NULL);
             }
             ++i;
@@ -1589,7 +1564,7 @@ void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwn
 IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, unsigned spillCompInfo)
 {
     assertex(rowIf);
-    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority, spillCompInfo);
+    return new CSpillableStream(activity, *this, rowIf, emptyRowSemantics, spillPriority, spillCompInfo); // new stream is constructed over this array and is given the array's current emptyRowSemantics
 }
 
 
@@ -1608,7 +1583,7 @@ protected:
     offset_t sizeSpill;
     ICompare *iCompare;
     StableSortFlag stableSort;
-    bool preserveGrouping;
+    EmptyRowSemantics emptyRowSemantics = ers_forbidden;
     CriticalSection readerLock;
     Owned<CSharedSpillableRowSet> spillableRowSet;
     unsigned options;
@@ -1646,10 +1621,10 @@ protected:
         spillCycles += spillTimer.elapsedCycles();
         return true;
     }
-    void setPreserveGrouping(bool _preserveGrouping)
+    void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) // stores the setting on the collector and copies it onto spillableRows
     {
-        preserveGrouping = _preserveGrouping;
-        spillableRows.setAllowNulls(preserveGrouping);
+        emptyRowSemantics = _emptyRowSemantics;
+        spillableRows.setEmptyRowSemantics(emptyRowSemantics); // keep the spillable array's setting in step with the collector's
     }
     bool flush()
     {
@@ -1734,8 +1709,7 @@ protected:
             rwFlags |= rw_compress;
             rwFlags |= spillCompInfo;
         }
-        if (preserveGrouping)
-            rwFlags |= rw_grouped;
+        rwFlags |= mapESRToRWFlags(emptyRowSemantics);
         IArrayOf<IRowStream> instrms;
         ForEachItemIn(f, spillFiles)
         {
@@ -1778,7 +1752,7 @@ protected:
                     instrms.append(*spillableRows.createRowStream(spillPriority, spillCompInfo)); // NB: stream will take ownership of rows in spillableRows
                 else
                 {
-                    spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping, spillPriority));
+                    spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillPriority));
                     instrms.append(*spillableRowSet->createRowStream());
                 }
             }
@@ -1815,7 +1789,6 @@ public:
           iCompare(_iCompare), stableSort(_stableSort), diskMemMix(_diskMemMix),
           spillableRows(_activity)
     {
-        preserveGrouping = false;
         totalRows = 0;
         overflowCount = outStreams = 0;
         sizeSpill = 0;
@@ -1825,7 +1798,7 @@ public:
             activateSpillingCallback();
         maxCores = activity.queryMaxCores();
         options = 0;
-        spillableRows.setup(rowIf, false, stableSort);
+        spillableRows.setup(rowIf, ers_forbidden, stableSort);
         if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
         {
             StringBuffer compType;
@@ -1914,7 +1887,7 @@ public:
             mmRegistered = false;
             activity.queryRowManager()->removeRowBuffer(this);
         }
-        spillableRows.setup(rowIf, false, stableSort);
+        spillableRows.setup(rowIf, ers_forbidden, stableSort);
     }
     virtual void resize(rowidx_t max)
     {
@@ -1967,7 +1940,7 @@ class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
         if (doReset)
             reset();
         activateSpillingCallback();
-        setPreserveGrouping(trl_preserveGrouping == grouping);
+        setEmptyRowSemantics(trl_preserveGrouping == grouping ? ers_eogonly : ers_forbidden);
         while (!abort)
         {
             const void *next = in->nextRow();
@@ -2050,10 +2023,10 @@ public:
     {
     }
 // IThorRowCollectorCommon
-    virtual void setPreserveGrouping(bool tf)
+    virtual void setEmptyRowSemantics(EmptyRowSemantics emptyGroupSemantics) // interface entry point: validates the request, then defers to CThorRowCollectorBase
     {
-        assertex(!iCompare || !tf); // can't sort if group preserving
-        CThorRowCollectorBase::setPreserveGrouping(tf);
+        assertex(!iCompare || (ers_forbidden == emptyGroupSemantics)); // can't sort if preserving end of groups or nulls
+        CThorRowCollectorBase::setEmptyRowSemantics(emptyGroupSemantics); // forward the validated value to the base-class setter
     }
     virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
     virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
@@ -2118,10 +2091,10 @@ public:
     virtual bool shrink(StringBuffer *traceInfo) { return CThorRowCollectorBase::shrink(traceInfo); }
 };
 
-IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
+IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, EmptyRowSemantics emptyRowSemantics) // factory: builds a CThorRowCollector and hands ownership to the caller
 {
     Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority);
-    collector->setPreserveGrouping(preserveGrouping);
+    collector->setEmptyRowSemantics(emptyRowSemantics); // the constructor does not take the semantics, so they are applied through the setter
     return collector.getClear();
 }
 

+ 26 - 28
thorlcr/thorutil/thmem.hpp

@@ -263,7 +263,6 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface
         virtual void unlock() const {  }
     } dummyLock;
 
-    void initCommon();
     bool resizeRowTable(void **&_rows, rowidx_t requiredRows, bool copy, unsigned maxSpillCost, memsize_t &newCapacity, const char *errMsg);
     bool _resize(rowidx_t requiredRows, unsigned maxSpillCost);
     const void **_allocateRowTable(rowidx_t num, unsigned maxSpillCost);
@@ -273,18 +272,18 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface
 
 protected:
     CActivityBase &activity;
-    IThorRowInterfaces *rowIf;
-    IEngineRowAllocator *allocator;
-    IOutputRowSerializer *serializer;
-    IOutputRowDeserializer *deserializer;
-    roxiemem::IRowManager *rowManager;
-    const void **rows;
-    void **stableTable;
-    bool throwOnOom; // tested during array expansion (resize())
-    bool allowNulls;
-    StableSortFlag stableSort;
-    rowidx_t maxRows;  // Number of rows that can fit in the allocated memory.
-    rowidx_t numRows;  // High water mark of rows added
+    IThorRowInterfaces *rowIf = nullptr;
+    IEngineRowAllocator *allocator = nullptr;
+    IOutputRowSerializer *serializer = nullptr;
+    IOutputRowDeserializer *deserializer = nullptr;
+    roxiemem::IRowManager *rowManager = nullptr;
+    const void **rows = nullptr;
+    void **stableTable = nullptr;
+    bool throwOnOom = true; // tested during array expansion (resize())
+    EmptyRowSemantics emptyRowSemantics = ers_forbidden;
+    StableSortFlag stableSort = stableSort_none;
+    rowidx_t maxRows = 0;  // Number of rows that can fit in the allocated memory.
+    rowidx_t numRows = 0;  // High water mark of rows added
     unsigned defaultMaxSpillCost = roxiemem::SpillAllCost;
 
     const void **allocateRowTable(rowidx_t num);
@@ -295,12 +294,12 @@ protected:
     inline rowidx_t getRowsCapacity() const { return rows ? RoxieRowCapacity(rows) / sizeof(void *) : 0; }
 public:
     CThorExpandingRowArray(CActivityBase &activity);
-    CThorExpandingRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, bool throwOnOom=true, rowidx_t initialSize=InitialSortElements);
+    CThorExpandingRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics=ers_forbidden, StableSortFlag stableSort=stableSort_none, bool throwOnOom=true, rowidx_t initialSize=InitialSortElements);
     ~CThorExpandingRowArray();
     CActivityBase &queryActivity() { return activity; }
     // NB: throws error on OOM by default
-    void setup(IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, bool throwOnOom=true);
-    inline void setAllowNulls(bool b) { allowNulls = b; }
+    void setup(IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics=ers_forbidden, StableSortFlag stableSort=stableSort_none, bool throwOnOom=true);
+    inline void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) { emptyRowSemantics = _emptyRowSemantics; }
     inline void setDefaultMaxSpillCost(unsigned _defaultMaxSpillCost) { defaultMaxSpillCost = _defaultMaxSpillCost; }
     inline unsigned queryDefaultMaxSpillCost() const { return defaultMaxSpillCost; }
     void clearRows();
@@ -320,7 +319,7 @@ public:
     }
     inline bool append(const void *row) // NB: takes ownership on success
     {
-        assertex(row || allowNulls);
+        assertex(row || (emptyRowSemantics != ers_forbidden));
         if (numRows >= maxRows)
         {
             if (!resize(numRows+1))
@@ -404,14 +403,13 @@ interface IWritePosCallback : extends IInterface
 
 class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implements IThorArrayLock
 {
-    size32_t commitDelta;  // How many rows need to be written before they are added to the committed region?
-    rowidx_t firstRow; // Only rows firstRow..numRows are considered initialized.  Only read/write within cs.
-    rowidx_t commitRows;  // can only be updated by writing thread within a critical section
+    size32_t commitDelta = CommitStep;  // How many rows need to be written before they are added to the committed region?
+    rowidx_t firstRow = 0; // Only rows firstRow..numRows are considered initialized.  Only read/write within cs.
+    rowidx_t commitRows = 0;  // can only be updated by writing thread within a critical section
     mutable CriticalSection cs;
     ICopyArrayOf<IWritePosCallback> writeCallbacks;
     size32_t compBlkSz = 0; // means use default
 
-    void initCommon();
     bool _flush(bool force);
     void doFlush();
     inline bool needToMoveRows(bool force) { return (firstRow != 0 && (force || (firstRow >= commitRows/2))); }
@@ -419,18 +417,18 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
 public:
 
     CThorSpillableRowArray(CActivityBase &activity);
-    CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep);
+    CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics=ers_forbidden, StableSortFlag stableSort=stableSort_none, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep);
     ~CThorSpillableRowArray();
     // NB: default throwOnOom to false
-    void setup(IThorRowInterfaces *rowIf, bool allowNulls=false, StableSortFlag stableSort=stableSort_none, bool throwOnOom=false)
+    void setup(IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics=ers_forbidden, StableSortFlag stableSort=stableSort_none, bool throwOnOom=false) // re-exposes the privately inherited setup() with a different throwOnOom default
     {
-        CThorExpandingRowArray::setup(rowIf, allowNulls, stableSort, throwOnOom);
+        CThorExpandingRowArray::setup(rowIf, emptyRowSemantics, stableSort, throwOnOom); // straight pass-through; only the throwOnOom default differs (false here, true in the base declaration)
     }
     void registerWriteCallback(IWritePosCallback &cb);
     void unregisterWriteCallback(IWritePosCallback &cb);
     void safeRegisterWriteCallback(IWritePosCallback &cb);
     void safeUnregisterWriteCallback(IWritePosCallback &cb);
-    inline void setAllowNulls(bool b) { CThorExpandingRowArray::setAllowNulls(b); }
+    inline void setEmptyRowSemantics(EmptyRowSemantics _emptyRowSemantics) { CThorExpandingRowArray::setEmptyRowSemantics(_emptyRowSemantics); }
     inline void setDefaultMaxSpillCost(unsigned defaultMaxSpillCost) { CThorExpandingRowArray::setDefaultMaxSpillCost(defaultMaxSpillCost); }
     inline void setCompBlockSize(size32_t sz) { compBlkSz = sz; }
     inline unsigned queryDefaultMaxSpillCost() const { return CThorExpandingRowArray::queryDefaultMaxSpillCost(); }
@@ -444,7 +442,7 @@ public:
     inline bool append(const void *row) __attribute__((warn_unused_result))
     {
         //GH->JCS Should this really be inline?
-        assertex(row || allowNulls);
+        assertex(row || (emptyRowSemantics != ers_forbidden));
         if (numRows >= maxRows)
         {
             if (!resize(numRows+1))
@@ -553,7 +551,7 @@ interface IThorRowLoader : extends IThorRowCollectorCommon
 
 interface IThorRowCollector : extends IThorRowCollectorCommon
 {
-    virtual void setPreserveGrouping(bool tf) = 0;
+    virtual void setEmptyRowSemantics(EmptyRowSemantics emptyRowSemantics) = 0;
     virtual IRowWriter *getWriter() = 0;
     virtual void reset() = 0;
     virtual IRowStream *getStream(bool shared=false, CThorExpandingRowArray *allMemRows=NULL) = 0;
@@ -564,7 +562,7 @@ interface IThorRowCollector : extends IThorRowCollectorCommon
 
 extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
 extern graph_decl IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT);
-extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, bool preserveGrouping=false);
+extern graph_decl IThorRowCollector *createThorRowCollector(CActivityBase &activity, IThorRowInterfaces *rowIf, ICompare *iCompare=NULL, StableSortFlag stableSort=stableSort_none, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=SPILL_PRIORITY_DEFAULT, EmptyRowSemantics emptyRowSemantics=ers_forbidden);