Browse Source

HPCC-15995 Allow heapflags to be configured in thor

Mainly here to allow testing of different allocators, but also in
preparation for thor to use different flags depending on the context.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 years ago
parent
commit
98dd41f618
39 changed files with 102 additions and 56 deletions
  1. 2 1
      common/thorhelper/roxiehelper.cpp
  2. 6 5
      common/thorhelper/thorcommon.cpp
  3. 5 1
      common/thorhelper/thorcommon.hpp
  4. 4 0
      ecl/eclagent/eclagent.ipp
  5. 2 2
      ecl/hthor/hthor.cpp
  6. 4 0
      roxie/ccd/ccdactivities.cpp
  7. 5 0
      roxie/ccd/ccdcontext.cpp
  8. 6 6
      roxie/ccd/ccdserver.cpp
  9. 1 1
      roxie/ccd/ccdserver.hpp
  10. 1 0
      rtl/include/eclhelper.hpp
  11. 1 1
      thorlcr/activities/aggregate/thaggregate.cpp
  12. 1 1
      thorlcr/activities/aggregate/thaggregateslave.cpp
  13. 1 1
      thorlcr/activities/diskread/thdiskread.cpp
  14. 4 4
      thorlcr/activities/fetch/thfetchslave.cpp
  15. 2 2
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  16. 1 1
      thorlcr/activities/indexread/thindexread.cpp
  17. 1 1
      thorlcr/activities/iterate/thgroupiterateslave.cpp
  18. 3 3
      thorlcr/activities/join/thjoin.cpp
  19. 3 3
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  20. 1 1
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  21. 1 1
      thorlcr/activities/loop/thloop.cpp
  22. 1 1
      thorlcr/activities/merge/thmerge.cpp
  23. 2 2
      thorlcr/activities/msort/thmsort.cpp
  24. 1 1
      thorlcr/activities/msort/thmsortslave.cpp
  25. 1 1
      thorlcr/activities/piperead/thprslave.cpp
  26. 1 1
      thorlcr/activities/result/thresult.cpp
  27. 1 1
      thorlcr/activities/thdiskbaseslave.cpp
  28. 1 1
      thorlcr/activities/wuidwrite/thwuidwrite.cpp
  29. 13 5
      thorlcr/graph/thgraph.cpp
  30. 5 0
      thorlcr/graph/thgraph.hpp
  31. 4 0
      thorlcr/graph/thgraphslave.cpp
  32. 2 1
      thorlcr/msort/tsorta.cpp
  33. 2 1
      thorlcr/msort/tsortm.cpp
  34. 1 1
      thorlcr/msort/tsorts.cpp
  35. 5 0
      thorlcr/thorcodectx/thcodectx.cpp
  36. 1 0
      thorlcr/thorcodectx/thcodectx.hpp
  37. 4 4
      thorlcr/thorutil/thmem.cpp
  38. 1 1
      thorlcr/thorutil/thmem.hpp
  39. 1 0
      tools/wutool/wutool.cpp

+ 2 - 1
common/thorhelper/roxiehelper.cpp

@@ -1162,7 +1162,8 @@ protected:
             StringBuffer spillBasename;
             spillBasename.append(tempDirectory).append(PATHSEPCHAR).appendf("spill_sort_%" I64F "u", seq);
             Owned<IRowLinkCounter> linker = new RoxieRowLinkCounter();
-            Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(rowMeta, activityId, ctx);
+            unsigned heapFlags = 0;
+            Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(rowMeta, activityId, heapFlags, ctx);
             diskMerger.setown(createDiskMerger(rowInterfaces, linker, spillBasename));
         }
         return diskMerger;

+ 6 - 5
common/thorhelper/thorcommon.cpp

@@ -1057,7 +1057,7 @@ void CThorContiguousRowBuffer::skipVUni()
 
 // ===========================================
 
-IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context)
+IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context)
 {
     class cRowInterfaces: implements IRowInterfaces, public CSimpleInterface
     {
@@ -1070,11 +1070,12 @@ IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICode
         CSingletonLock allocatorlock;
         CSingletonLock serializerlock;
         CSingletonLock deserializerlock;
+        unsigned heapFlags;
 
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-        cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, ICodeContext *_context)
-            : meta(_meta)
+        cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, unsigned _heapFlags, ICodeContext *_context)
+            : meta(_meta), heapFlags(_heapFlags)
         {
             context = _context;
             actid = _actid;
@@ -1083,7 +1084,7 @@ IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICode
         {
             if (allocatorlock.lock()) {
                 if (!allocator&&meta) 
-                    allocator.setown(context->getRowAllocator(meta, actid));
+                    allocator.setown(context->getRowAllocatorEx(meta, actid, heapFlags));
                 allocatorlock.unlock();
             }
             return allocator;
@@ -1119,7 +1120,7 @@ IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICode
             return context;
         }
     };
-    return new cRowInterfaces(meta,actid,context);
+    return new cRowInterfaces(meta,actid,heapFlags,context);
 };
 
 class CRowStreamReader : implements IExtRowStream, public CSimpleInterface

+ 5 - 1
common/thorhelper/thorcommon.hpp

@@ -79,7 +79,7 @@ interface IRowInterfaces: extends IInterface
 
 extern THORHELPER_API void useMemoryMappedRead(bool on);
 
-extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context);
+extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context);
 
 
 enum RowReaderWriterFlags
@@ -525,6 +525,10 @@ public:
     {
         return ctx->getRowAllocator(meta, activityId);
     }
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
+    {
+        return ctx->getRowAllocatorEx(meta, activityId, heapFlags);
+    }
     virtual const char *cloneVString(const char *str) const
     {
         return ctx->cloneVString(str);

+ 4 - 0
ecl/eclagent/eclagent.ipp

@@ -608,6 +608,10 @@ public:
     {
         return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
     }
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
+    {
+        return allocatorMetaCache->ensure(meta, activityId, (roxiemem::RoxieHeapFlags)heapFlags);
+    }
     virtual const char *cloneVString(const char *str) const
     {
         return rowManager->cloneVString(str);

+ 2 - 2
ecl/hthor/hthor.cpp

@@ -376,7 +376,7 @@ void CHThorDiskWriteActivity::ready()
     uncompressedBytesWritten = 0;
     numRecords = 0;
     sizeLimit = agent.queryWorkUnit()->getDebugValueInt64("hthorDiskWriteSizeLimit", defaultHThorDiskWriteSizeLimit);
-    rowIf.setown(createRowInterfaces(input->queryOutputMeta(), activityId, agent.queryCodeContext()));
+    rowIf.setown(createRowInterfaces(input->queryOutputMeta(), activityId, 0, agent.queryCodeContext()));
     open();
 }
 
@@ -3943,7 +3943,7 @@ bool CHThorGroupSortActivity::sortAndSpillRows()
             }
         };
         Owned<IRowLinkCounter> linker = new CHThorRowLinkCounter();
-        Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(input->queryOutputMeta(), activityId, agent.queryCodeContext());
+        Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(input->queryOutputMeta(), activityId, 0, agent.queryCodeContext());
         diskMerger.setown(createDiskMerger(rowInterfaces, linker, fbase.str()));
     }
     sorter->performSort();

+ 4 - 0
roxie/ccd/ccdactivities.cpp

@@ -595,6 +595,10 @@ public:
     {
         return queryContext->queryCodeContext()->getRowAllocator(meta, activityId);
     }
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
+    {
+        return queryContext->queryCodeContext()->getRowAllocatorEx(meta, activityId, heapFlags);
+    }
     virtual const char *cloneVString(const char *str) const
     {
         return queryContext->queryCodeContext()->cloneVString(str);

+ 5 - 0
roxie/ccd/ccdcontext.cpp

@@ -1674,6 +1674,11 @@ public:
         return allocatorMetaCache->ensure(meta, activityId, roxiemem::RHFnone);
     }
 
+    virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned flags) const
+    {
+        return allocatorMetaCache->ensure(meta, activityId, (roxiemem::RoxieHeapFlags)flags);
+    }
+
     virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
     {
         return allocatorMetaCache->ensure(meta, activityId, flags);

+ 6 - 6
roxie/ccd/ccdserver.cpp

@@ -406,7 +406,7 @@ public:
         dependentCount = 0;
         optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
         optUnordered = !_graphNode.getPropBool("att[@name='ordered']/@value", true);
-        heapFlags = _graphNode.getPropInt("hint[@name='heap']/@value", 0);
+        heapFlags = _graphNode.getPropInt("hint[@name='heapflags']/@value", 0);
     }
     
     ~CRoxieServerActivityFactoryBase()
@@ -616,9 +616,9 @@ public:
     {
         return actStatistics; // Overridden by anyone that needs more
     }
-    virtual unsigned getHeapFlags() const
+    virtual roxiemem::RoxieHeapFlags getHeapFlags() const
     {
-        return heapFlags;
+        return (roxiemem::RoxieHeapFlags)heapFlags;
     }
 };
 
@@ -1071,7 +1071,7 @@ public:
     inline void createRowAllocator()
     {
         if (!rowAllocator) 
-            rowAllocator = ctx->queryCodeContext()->getRowAllocator(meta.queryOriginal(), activityId);
+            rowAllocator = ctx->getRowAllocatorEx(meta.queryOriginal(), activityId, factory->getHeapFlags());
     }
 
     inline ICodeContext *queryCodeContext()
@@ -1650,7 +1650,7 @@ public:
     {
         if (needsAllocator)
         {
-            unsigned extraFlags = _parent.factory->getHeapFlags();
+            roxiemem::RoxieHeapFlags extraFlags = _parent.factory->getHeapFlags();
             rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFunique|extraFlags));
         }
         else
@@ -11470,7 +11470,7 @@ public:
         if (extend)
             diskout->seek(0, IFSend);
         tallycrc = !factory->queryQueryFactory().queryOptions().skipFileFormatCrcCheck && !(helper.getFlags() & TDRnocrccheck) && !blockcompressed;
-        Owned<IRowInterfaces> rowIf = createRowInterfaces(input->queryOutputMeta(), activityId, ctx->queryCodeContext());
+        Owned<IRowInterfaces> rowIf = createRowInterfaces(input->queryOutputMeta(), activityId, factory->getHeapFlags(), ctx->queryCodeContext());
         rowSerializer.set(rowIf->queryRowSerializer());
         unsigned rwFlags = rw_autoflush;
         if(grouped)

+ 1 - 1
roxie/ccd/ccdserver.hpp

@@ -219,7 +219,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual unsigned numInputs() const = 0;
     virtual const StatisticsMapping &queryStatsMapping() const = 0;
     virtual bool isInputOrdered(bool consumerOrdered, unsigned idx) const = 0;
-    virtual unsigned getHeapFlags() const = 0;
+    virtual roxiemem::RoxieHeapFlags getHeapFlags() const = 0;
 };
 interface IGraphResult : public IInterface
 {

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -654,6 +654,7 @@ interface ICodeContext : public IResourceContext
     virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) = 0;
     virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) = 0;
     virtual ISectionTimer * registerTimer(unsigned activityId, const char * name) = 0;
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned flags) const = 0;
 };
 
 

+ 1 - 1
thorlcr/activities/aggregate/thaggregate.cpp

@@ -74,7 +74,7 @@ public:
             if (sz)
             {
                 IHThorThroughAggregateArg *helper = (IHThorThroughAggregateArg *)queryHelper();
-                Owned<IThorRowInterfaces> aggRowIf = createThorRowInterfaces(queryRowManager(), helper->queryAggregateRecordSize(), queryId(), queryCodeContext());
+                Owned<IThorRowInterfaces> aggRowIf = createThorRowInterfaces(queryRowManager(), helper->queryAggregateRecordSize(), queryId(), 0, queryCodeContext());
                 CThorStreamDeserializerSource mds(sz, msg.readDirect(sz));
                 RtlDynamicRowBuilder rowBuilder(aggRowIf->queryRowAllocator());
                 size32_t sz = aggRowIf->queryRowDeserializer()->deserialize(rowBuilder, mds);

+ 1 - 1
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -262,7 +262,7 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
-        aggrowif.setown(createThorRowInterfaces(queryRowManager(), helper->queryAggregateRecordSize(),queryId(),queryCodeContext()));
+        aggrowif.setown(createRowInterfaces(helper->queryAggregateRecordSize()));
         partResult.setAllocator(aggrowif->queryRowAllocator()).ensureRow();
         helper->clearAggregate(partResult);
     }

+ 1 - 1
thorlcr/activities/diskread/thdiskread.cpp

@@ -117,7 +117,7 @@ public:
     {
         IRecordSize *recordSize = helper->queryOutputMeta();
 
-        Owned<IThorRowInterfaces> rowIf = createThorRowInterfaces(queryRowManager(), helper->queryOutputMeta(), queryId(), queryCodeContext());
+        Owned<IThorRowInterfaces> rowIf = createRowInterfaces(helper->queryOutputMeta());
         OwnedConstThorRow result = getAggregate(*this, container.queryJob().querySlaves(), *rowIf, *helper, mpTag);
         if (!result)
             return;

+ 4 - 4
thorlcr/activities/fetch/thfetchslave.cpp

@@ -340,7 +340,7 @@ public:
             memset(encryptedKey, 0, encryptedKeyLen);
             free(encryptedKey);
         }
-        fetchDiskRowIf.setown(createThorRowInterfaces(queryRowManager(), fetchContext->queryDiskRecordSize(), queryId(), queryCodeContext()));
+        fetchDiskRowIf.setown(createRowInterfaces(fetchContext->queryDiskRecordSize()));
     }
 
     virtual void initializeFileParts()
@@ -422,7 +422,7 @@ public:
                 keyIn = new CKeyFieldExtract(this, *inputStream, *fetchBaseHelper, *fetchContext);
                 keyInMeta.set(QUERYINTERFACE(fetchBaseHelper->queryExtractedSize(), IOutputMetaData));
             }
-            keyInIf.setown(createThorRowInterfaces(queryRowManager(), keyInMeta,queryId(),queryCodeContext()));
+            keyInIf.setown(createRowInterfaces(keyInMeta));
         }
         else
         {
@@ -451,11 +451,11 @@ public:
                 }
             };
             Owned<IOutputMetaData> fmeta = createFixedSizeMetaData(sizeof(offset_t)); // should be provided by Gavin?
-            keyInIf.setown(createThorRowInterfaces(queryRowManager(), fmeta,queryId(),queryCodeContext()));
+            keyInIf.setown(createRowInterfaces(fmeta));
             keyIn = new CKeyFPosExtract(keyInIf, this, *inputStream, *fetchBaseHelper, *fetchContext);
         }
 
-        Owned<IThorRowInterfaces> rowIf = createThorRowInterfaces(queryRowManager(), queryRowMetaData(), queryId(), queryCodeContext());
+        Owned<IThorRowInterfaces> rowIf = createRowInterfaces(queryRowMetaData());
         fetchStream = createFetchStream(*this, keyInIf, rowIf, abortSoon, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp);
         fetchStreamOut = fetchStream->queryOutput();
         fetchStream->start(keyIn);

+ 2 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2852,7 +2852,7 @@ public:
         }
         if (extractKey)
         {
-            _keyRowInterfaces.setown(createThorRowInterfaces(queryRowManager(), km, queryId(), queryCodeContext()));
+            _keyRowInterfaces.setown(createRowInterfaces(km));
             keyRowInterfaces = _keyRowInterfaces;
             rowKeyCompare = helper->queryRowKeyCompare();
             iKeyHash = helper->queryKeyHash();
@@ -3764,7 +3764,7 @@ RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBas
             virtual void stop() { }
         };
         Owned<IOutputMetaData> nodeRowMeta = createOutputMetaDataWithChildRow(activity.queryRowAllocator(), sizeof(size32_t));
-        Owned<IThorRowInterfaces> nodeRowMetaRowIf = createThorRowInterfaces(activity.queryRowManager(), nodeRowMeta, activity.queryId(), activity.queryCodeContext());
+        Owned<IThorRowInterfaces> nodeRowMetaRowIf = activity.createRowInterfaces(nodeRowMeta);
         Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(activity, nodeRowMetaRowIf, localAggTable);
         class CNodeCompare : implements ICompare, implements IHash
         {

+ 1 - 1
thorlcr/activities/indexread/thindexread.cpp

@@ -449,7 +449,7 @@ public:
     {
         if (container.queryLocalOrGrouped())
             return;
-        Owned<IThorRowInterfaces> rowIf = createThorRowInterfaces(queryRowManager(), helper->queryOutputMeta(), queryId(), queryCodeContext());
+        Owned<IThorRowInterfaces> rowIf = createRowInterfaces(helper->queryOutputMeta());
         OwnedConstThorRow result = getAggregate(*this, container.queryJob().querySlaves(), *rowIf, *helper, mpTag);
         if (!result)
             return;

+ 1 - 1
thorlcr/activities/iterate/thgroupiterateslave.cpp

@@ -118,7 +118,7 @@ public:
     GroupProcessSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = static_cast <IHThorProcessArg *> (queryHelper());
-        rightrowif.setown(createThorRowInterfaces(queryRowManager(), helper->queryRightRecordSize(),queryId(),queryCodeContext()));
+        rightrowif.setown(createRowInterfaces(helper->queryRightRecordSize()));
         rightAllocator.set(rightrowif->queryRowAllocator());
         appendOutputLinked(this);   // adding 'me' to outputs array
     }

+ 3 - 3
thorlcr/activities/join/thjoin.cpp

@@ -189,10 +189,10 @@ public:
                 }
                 if (helper->getJoinFlags()&JFslidingmatch) // JCSMORE shouldn't be necessary
                     primaryKeySerializer = NULL;
-                Owned<IThorRowInterfaces> primaryRowIf = createThorRowInterfaces(queryRowManager(), primaryInput->queryHelper()->queryOutputMeta(), queryId(), queryCodeContext());
+                Owned<IThorRowInterfaces> primaryRowIf = createRowInterfaces(primaryInput->queryHelper()->queryOutputMeta());
                 Owned<IThorRowInterfaces> secondaryRowIf;
                 if (secondaryInput)
-                    secondaryRowIf.setown(createThorRowInterfaces(queryRowManager(), secondaryInput->queryHelper()->queryOutputMeta(), queryId(), queryCodeContext()));
+                    secondaryRowIf.setown(createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta()));
 
                 bool betweenjoin = (helper->getJoinFlags()&JFslidingmatch)!=0;
                 if (container.getKind() == TAKselfjoin)
@@ -234,7 +234,7 @@ public:
                 }
                 else if (!nosortPrimary()||betweenjoin)
                 {
-                    Owned<IThorRowInterfaces> secondaryRowIf = createThorRowInterfaces(queryRowManager(), secondaryInput->queryHelper()->queryOutputMeta(), queryId(), queryCodeContext());
+                    Owned<IThorRowInterfaces> secondaryRowIf = createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta());
 
                     imaster->SortSetup(primaryRowIf, primaryCompare, primaryKeySerializer, false, true, NULL, NULL);
                     ActPrintLog("JOIN waiting for barrier.1");

+ 3 - 3
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -738,7 +738,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
                     Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
 
-                    Owned<IThorRowInterfaces> fetchDiskRowIf = createThorRowInterfaces(owner.queryRowManager(), owner.helper->queryDiskRecordSize(),owner.queryId(),owner.queryCodeContext());
+                    Owned<IThorRowInterfaces> fetchDiskRowIf = owner.createRowInterfaces(owner.helper->queryDiskRecordSize());
                     while (!aborted)
                     {
                         CMessageBuffer replyMb;
@@ -1964,11 +1964,11 @@ public:
                 }
                 else
                     fetchInputMeta.setown(createFixedSizeMetaData(FETCHKEY_HEADER_SIZE));
-                fetchInputMetaRowIf.setown(createThorRowInterfaces(queryRowManager(), fetchInputMeta,queryId(),queryCodeContext()));
+                fetchInputMetaRowIf.setown(createRowInterfaces(fetchInputMeta));
                 fetchInputMetaAllocator.set(fetchInputMetaRowIf->queryRowAllocator());
 
                 Owned<IOutputMetaData> fetchOutputMeta = createOutputMetaDataWithChildRow(joinFieldsAllocator, FETCHKEY_HEADER_SIZE);
-                fetchOutputRowIf.setown(createThorRowInterfaces(queryRowManager(), fetchOutputMeta,queryId(),queryCodeContext()));
+                fetchOutputRowIf.setown(createRowInterfaces(fetchOutputMeta));
 
                 fetchHandler = new CKeyedFetchHandler(*this);
 

+ 1 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1435,7 +1435,7 @@ public:
 
         if (isGlobal())
         {
-            sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), &queryJobChannel().querySharedMemCodeContext()));
+            sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), queryHeapFlags(), &queryJobChannel().querySharedMemCodeContext()));
             rhs.setup(sharedRightRowInterfaces);
 
             // It is not until here, that it is guaranteed all channel slave activities have been initialized.

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -338,7 +338,7 @@ public:
         assertex(container.queryResultsGraph());
         Owned<CGraphBase> graph = queryJobChannel().getGraph(container.queryResultsGraph()->queryGraphId());
         IHThorLocalResultWriteArg *helper = (IHThorLocalResultWriteArg *)queryHelper();
-        inputRowIf.setown(createThorRowInterfaces(queryRowManager(), container.queryInput(0)->queryHelper()->queryOutputMeta(),queryId(),queryCodeContext()));
+        inputRowIf.setown(createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta()));
         createResult();
     }
 };

+ 1 - 1
thorlcr/activities/merge/thmerge.cpp

@@ -58,7 +58,7 @@ public:
         ActPrintLog("GlobalMergeActivityMaster::process");
         CMasterActivity::process();     
         IHThorMergeArg *helper = (IHThorMergeArg *)queryHelper();   
-        Owned<IThorRowInterfaces> rowif = createThorRowInterfaces(queryRowManager(), helper->queryOutputMeta(),queryId(),queryCodeContext());
+        Owned<IThorRowInterfaces> rowif = createRowInterfaces(helper->queryOutputMeta());
         CThorKeyArray sample(*this, rowif,helper->querySerialize(),helper->queryCompare(),helper->queryCompareKey(),helper->queryCompareRowKey());
 
         unsigned n = container.queryJob().querySlaves();

+ 2 - 2
thorlcr/activities/msort/thmsort.cpp

@@ -174,8 +174,8 @@ protected:
                 skewThreshold = container.queryJob().getWorkUnitValueInt("defaultSkewThreshold", 0);
         }
 
-        Owned<IThorRowInterfaces> rowif = createThorRowInterfaces(queryRowManager(), container.queryInput(0)->queryHelper()->queryOutputMeta(),queryId(),queryCodeContext());
-        Owned<IThorRowInterfaces> auxrowif = createThorRowInterfaces(queryRowManager(), helper->querySortedRecordSize(),queryId(),queryCodeContext());
+        Owned<IThorRowInterfaces> rowif = createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta());
+        Owned<IThorRowInterfaces> auxrowif = createRowInterfaces(helper->querySortedRecordSize());
         try
         {
             imaster->SortSetup(rowif,helper->queryCompare(),helper->querySerialize(),cosortfilenames.length()!=0,true,cosortfilenames.str(),auxrowif);

+ 1 - 1
thorlcr/activities/msort/thmsortslave.cpp

@@ -105,7 +105,7 @@ public:
             }
             
             Linked<IThorRowInterfaces> rowif = queryRowInterfaces(input);
-            Owned<IThorRowInterfaces> auxrowif = createThorRowInterfaces(queryRowManager(), helper->querySortedRecordSize(),queryId(),queryCodeContext());
+            Owned<IThorRowInterfaces> auxrowif = createRowInterfaces(helper->querySortedRecordSize());
             sorter->Gather(
                 rowif,
                 inputStream,

+ 1 - 1
thorlcr/activities/piperead/thprslave.cpp

@@ -194,7 +194,7 @@ public:
         needTransform = false;
 
         if (needTransform)
-            inrowif.setown(createThorRowInterfaces(queryRowManager(), helper->queryDiskRecordSize(), queryId(), queryCodeContext()));
+            inrowif.setown(createRowInterfaces(helper->queryDiskRecordSize()));
         setRequireInitData(false);
         appendOutputLinked(this);
     }

+ 1 - 1
thorlcr/activities/result/thresult.cpp

@@ -65,7 +65,7 @@ public:
                 if (results)
                     throw MakeThorException(TE_UnexpectedMultipleSlaveResults, "Received greater than one result from slaves");
                 IHThorRemoteResultArg *helper = (IHThorRemoteResultArg *)queryHelper();
-                Owned<IThorRowInterfaces> resultRowIf = createThorRowInterfaces(queryRowManager(), helper->queryOutputMeta(), queryId(), queryCodeContext());
+                Owned<IThorRowInterfaces> resultRowIf = createRowInterfaces(helper->queryOutputMeta());
                 CThorStreamDeserializerSource mds(sz, mb.readDirect(sz));
                 RtlDynamicRowBuilder rowBuilder(resultRowIf->queryRowAllocator());
                 size32_t sz = resultRowIf->queryRowDeserializer()->deserialize(rowBuilder, mds);

+ 1 - 1
thorlcr/activities/thdiskbaseslave.cpp

@@ -279,7 +279,7 @@ void CDiskReadSlaveActivityBase::kill()
 IThorRowInterfaces * CDiskReadSlaveActivityBase::queryDiskRowInterfaces()
 {
     if (!diskRowIf) 
-        diskRowIf.setown(createThorRowInterfaces(queryRowManager(), helper->queryDiskRecordSize(),queryId(),queryCodeContext()));
+        diskRowIf.setown(createRowInterfaces(helper->queryDiskRecordSize()));
     return diskRowIf;
 }
 

+ 1 - 1
thorlcr/activities/wuidwrite/thwuidwrite.cpp

@@ -302,7 +302,7 @@ public:
     {
         assertex(complete);
         ActPrintLog("dictionary result");
-        Owned<IThorRowInterfaces> rowIf = createThorRowInterfaces(queryRowManager(), container.queryInput(0)->queryHelper()->queryOutputMeta(),queryId(),queryCodeContext());
+        Owned<IThorRowInterfaces> rowIf = createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta());
         IOutputRowDeserializer *deserializer = rowIf->queryRowDeserializer();
         CMessageBuffer mb;
         Owned<ISerialStream> stream = createMemoryBufferSerialStream(resultData);

+ 13 - 5
thorlcr/graph/thgraph.cpp

@@ -225,7 +225,7 @@ public:
     virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos)
     {
         if (!countRowIf)
-            countRowIf.setown(createThorRowInterfaces(activity.queryRowManager(), counterMeta, activityId, activity.queryCodeContext()));
+            countRowIf.setown(activity.createRowInterfaces(counterMeta));
         RtlDynamicRowBuilder counterRow(countRowIf->queryRowAllocator());
         thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
         *res = loopCounter;
@@ -237,13 +237,13 @@ public:
     virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
     {
         if (!loopAgainRowIf)
-            loopAgainRowIf.setown(createThorRowInterfaces(activity.queryRowManager(), loopAgainMeta, activityId, activity.queryCodeContext()));
+            loopAgainRowIf.setown(activity.createRowInterfaces(loopAgainMeta));
         activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, !activity.queryGraph().isLocalChild(), SPILL_PRIORITY_DISABLE);
     }
     virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
     {
         if (!resultRowIf)
-            resultRowIf.setown(createThorRowInterfaces(activity.queryRowManager(), resultMeta, activityId, activity.queryCodeContext()));
+            resultRowIf.setown(activity.createRowInterfaces(resultMeta));
         IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
         IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild()); // loop input
     }
@@ -2953,6 +2953,11 @@ void CActivityBase::ActPrintLog(IException *e)
     ActPrintLog(e, "%s", "");
 }
 
+IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta)
+{
+    return createThorRowInterfaces(queryRowManager(), meta, queryId(), queryHeapFlags(), queryCodeContext());
+}
+
 bool CActivityBase::fireException(IException *e)
 {
     Owned<IThorException> _te;
@@ -2992,7 +2997,10 @@ IEngineRowAllocator * CActivityBase::queryRowAllocator()
 {
     if (CABallocatorlock.lock()) {
         if (!rowAllocator)
-            rowAllocator.setown(getRowAllocator(queryRowMetaData()));
+        {
+            roxiemem::RoxieHeapFlags heapFlags = queryHeapFlags();
+            rowAllocator.setown(getRowAllocator(queryRowMetaData(), heapFlags));
+        }
         CABallocatorlock.unlock();
     }
     return rowAllocator;
@@ -3021,7 +3029,7 @@ IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
 IThorRowInterfaces *CActivityBase::getRowInterfaces()
 {
     // create an independent instance, to avoid circular link dependency problems
-    return createThorRowInterfaces(queryRowManager(), queryRowMetaData(), container.queryId(), queryCodeContext());
+    return createThorRowInterfaces(queryRowManager(), queryRowMetaData(), container.queryId(), queryHeapFlags(), queryCodeContext());
 }
 
 IEngineRowAllocator *CActivityBase::getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags) const

+ 5 - 0
thorlcr/graph/thgraph.hpp

@@ -507,6 +507,7 @@ class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, im
         virtual unsigned logString(const char * text) const { return ctx->logString(text); }
         virtual const IContextLogger &queryContextLogger() const { return ctx->queryContextLogger(); }
         virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { return ctx->getRowAllocator(meta, activityId); }
+        virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const { return ctx->getRowAllocatorEx(meta, activityId, heapFlags); }
         virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer); }
         virtual void getResultDictionary(size32_t & tcount, byte * * & tgt,IEngineRowAllocator * _rowAllocator,  const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher); }
 
@@ -1028,6 +1029,8 @@ public:
     inline bool queryInitialized() const { return initialized; }
     inline void setInitialized(bool tf) { initialized = tf; }
     inline bool queryTimeActivities() const { return timeActivities; }
+    inline roxiemem::RoxieHeapFlags queryHeapFlags() const { return (roxiemem::RoxieHeapFlags)container.getOptInt("heapflags", 0); }
+
     void onStart(size32_t _parentExtractSz, const byte *_parentExtract) { parentExtractSz = _parentExtractSz; parentExtract = _parentExtract; }
     bool receiveMsg(ICommunicator &comm, CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
     bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
@@ -1060,6 +1063,8 @@ public:
     void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
     void ActPrintLog(IException *e);
 
+    IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta);
+
 // IExceptionHandler
     bool fireException(IException *e);
     __declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn));

+ 4 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1460,6 +1460,10 @@ public:
     {
         return sharedAllocator->getRowAllocator(meta, activityId);
     }
+    virtual IEngineRowAllocator *getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
+    {
+        return sharedAllocator->getRowAllocator(meta, activityId, (roxiemem::RoxieHeapFlags)heapFlags);
+    }
 };
 
 class CSlaveGraphTempHandler : public CGraphTempHandler

+ 2 - 1
thorlcr/msort/tsorta.cpp

@@ -60,7 +60,8 @@ CThorKeyArray::CThorKeyArray(
     keyserializer = NULL;
     if (_serializer) {
         keyserializer = _serializer;
-        keyif.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), rowif->queryActivityId(), rowif->queryCodeContext()));
+        unsigned heapFlags = activity.queryHeapFlags();
+        keyif.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), rowif->queryActivityId(), heapFlags, rowif->queryCodeContext()));
     }
     icompare = _icompare;
     ikeycompare = _ikeycompare?_ikeycompare:(_serializer?NULL:_icompare);

+ 2 - 1
thorlcr/msort/tsortm.cpp

@@ -394,7 +394,8 @@ public:
         keyserializer = _keyserializer;
         if (keyserializer)
         {
-            keyIf.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            unsigned heapFlags = 0;
+            keyIf.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), activity->queryContainer().queryId(), heapFlags, activity->queryCodeContext()));
             icompare = keyserializer->queryCompareKey();
         }
         else

+ 1 - 1
thorlcr/msort/tsorts.cpp

@@ -1211,7 +1211,7 @@ public:
         rowCompare = _rowCompare;
         if (keyserializer)
         {
-            keyIf.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            keyIf.setown(createThorRowInterfaces(rowif->queryRowManager(), keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryHeapFlags(), activity->queryCodeContext()));
             rowToKeySerializer.setown(new CRowToKeySerializer(auxrowif, keyIf, keyserializer));
             keyRowCompare = keyserializer->queryCompareKeyRow();
         }

+ 5 - 0
thorlcr/thorcodectx/thcodectx.cpp

@@ -114,6 +114,11 @@ IEngineRowAllocator * CThorCodeContextBase::getRowAllocator(IOutputMetaData * me
     return jobChannel.getRowAllocator(meta, activityId);
 }
 
+IEngineRowAllocator * CThorCodeContextBase::getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
+{
+    return jobChannel.getRowAllocator(meta, activityId, (roxiemem::RoxieHeapFlags)heapFlags);
+}
+
 const char * CThorCodeContextBase::cloneVString(const char * str) const
 {
     return jobChannel.queryRowManager()->cloneVString(str);

+ 1 - 0
thorlcr/thorcodectx/thcodectx.hpp

@@ -106,6 +106,7 @@ public:
     virtual char *getGroupName(); // thorlib.group()
     virtual char *queryIndexMetaData(char const * lfn, char const * xpath) { UNIMPLEMENTED; }
     virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const;
+    virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const;
     virtual const char *cloneVString(const char *str) const;
     virtual const char *cloneVString(size32_t len, const char *str) const;
     virtual IEclGraphResults *resolveLocalQuery(__int64 gid);

+ 4 - 4
thorlcr/thorutil/thmem.cpp

@@ -2272,17 +2272,17 @@ ILargeMemLimitNotify *createMultiThorResourceMutex(const char *grpname,CSDSServe
 }
 
 
-IThorRowInterfaces *createThorRowInterfaces(roxiemem::IRowManager *rowManager, IOutputMetaData *meta, unsigned actId, ICodeContext *context)
+IThorRowInterfaces *createThorRowInterfaces(roxiemem::IRowManager *rowManager, IOutputMetaData *meta, unsigned actId, unsigned heapFlags, ICodeContext *context)
 {
     class CThorRowInterfaces : public CSimpleInterfaceOf<IThorRowInterfaces>
     {
         roxiemem::IRowManager *rowManager;
         Owned<IRowInterfaces> baseRowIf;
     public:
-        CThorRowInterfaces(roxiemem::IRowManager *_rowManager, IOutputMetaData *meta, unsigned actId, ICodeContext *context)
+        CThorRowInterfaces(roxiemem::IRowManager *_rowManager, IOutputMetaData *meta, unsigned actId, unsigned heapFlags, ICodeContext *context)
             : rowManager(_rowManager)
         {
-            baseRowIf.setown(createRowInterfaces(meta, actId, context));
+            baseRowIf.setown(createRowInterfaces(meta, actId, heapFlags, context));
         }
         virtual IEngineRowAllocator * queryRowAllocator() { return baseRowIf->queryRowAllocator(); }
         virtual IOutputRowSerializer * queryRowSerializer() { return baseRowIf->queryRowSerializer(); }
@@ -2292,7 +2292,7 @@ IThorRowInterfaces *createThorRowInterfaces(roxiemem::IRowManager *rowManager, I
         virtual ICodeContext *queryCodeContext() { return baseRowIf->queryCodeContext(); }
         virtual roxiemem::IRowManager *queryRowManager() const { return rowManager; }
     };
-    return new CThorRowInterfaces(rowManager, meta, actId, context);
+    return new CThorRowInterfaces(rowManager, meta, actId, heapFlags, context);
 };
 
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -85,7 +85,7 @@ interface IThorRowInterfaces : extends IRowInterfaces
     virtual roxiemem::IRowManager *queryRowManager() const = 0;
 };
 
-extern graph_decl IThorRowInterfaces *createThorRowInterfaces(roxiemem::IRowManager *rowManager, IOutputMetaData *meta, unsigned actId, ICodeContext *context);
+extern graph_decl IThorRowInterfaces *createThorRowInterfaces(roxiemem::IRowManager *rowManager, IOutputMetaData *meta, unsigned actId, unsigned heapFlags, ICodeContext *context);
 
 
 class OwnedConstThorRow 

+ 1 - 0
tools/wutool/wutool.cpp

@@ -1775,6 +1775,7 @@ protected:
             // Memory management
 
             virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { throwUnexpected(); }
+            virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const { throwUnexpected(); }
             virtual const char * cloneVString(const char *str) const { throwUnexpected(); }
             virtual const char * cloneVString(size32_t len, const char *str) const { throwUnexpected(); }