فهرست منبع

HPCC-14409 Introduce a row manager per slave channel.

Avoids heavy (CPU bound) contention inside single RM,
and voids slave channels starving each other of memory

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 سال پیش
والد
کامیت
020d96a665

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -346,7 +346,7 @@ public:
         {
             IOutputMetaData *keyRowMeta = QUERYINTERFACE(fetchBaseHelper->queryExtractedSize(), IOutputMetaData);
             assertex(keyRowMeta);
-            keyRowAllocator.setown(queryJob().getRowAllocator(keyRowMeta, queryActivityId()));
+            keyRowAllocator.setown(queryJobChannel().getRowAllocator(keyRowMeta, queryActivityId()));
         }
         appendOutputLinked(this);
     }

+ 12 - 12
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -1004,7 +1004,7 @@ public:
         pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
         selfstopped = false;
         pull = false;
-        rowManager = activity->queryJob().queryRowManager();
+        rowManager = activity->queryJobChannel().queryRowManager();
 
         StringBuffer compType;
         activity->getOpt(THOROPT_HDIST_COMP, compType);
@@ -2855,7 +2855,7 @@ public:
         iCompare = helper->queryCompare();
 
         // JCSMORE - really should ask / lookup what flags the allocator created for extractKey has...
-        allocFlags = queryJob().queryThorAllocator()->queryFlags();
+        allocFlags = queryJobChannel().queryThorAllocator()->queryFlags();
 
         // JCSMORE - it may not be worth extracting the key,
         // if there's an upstream activity that holds onto rows for periods of time (e.g. sort)
@@ -2867,7 +2867,7 @@ public:
             isVariable = km->isVariableSize();
             if (!isVariable && helper->queryOutputMeta()->isFixedSize())
             {
-                roxiemem::IRowManager *rM = queryJob().queryRowManager();
+                roxiemem::IRowManager *rM = queryJobChannel().queryRowManager();
                 memsize_t keySize = rM->getExpectedCapacity(km->getMinRecordSize(), allocFlags);
                 memsize_t rowSize = rM->getExpectedCapacity(helper->queryOutputMeta()->getMinRecordSize(), allocFlags);
                 if (keySize >= rowSize)
@@ -3023,7 +3023,7 @@ CHashTableRowTable::CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IR
 void CHashTableRowTable::init(rowidx_t sz)
 {
     // reinitialize if need bigger or if requested size is much smaller than existing
-    rowidx_t newMaxRows = activity.queryJob().queryRowManager()->getExpectedCapacity(sz * sizeof(rowidx_t *), activity.allocFlags) / sizeof(rowidx_t *);
+    rowidx_t newMaxRows = activity.queryJobChannel().queryRowManager()->getExpectedCapacity(sz * sizeof(rowidx_t *), activity.allocFlags) / sizeof(rowidx_t *);
     if (newMaxRows <= maxRows && ((maxRows-newMaxRows) <= HASHDEDUP_HT_INC_SIZE))
         return;
     ReleaseThorRow(rows);
@@ -3087,7 +3087,7 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRo
      */
     if (extractKey)
     {
-        _keyAllocator.setown(owner.queryJob().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), owner.allocFlags));
+        _keyAllocator.setown(owner.queryJobChannel().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), owner.allocFlags));
         keyAllocator = _keyAllocator;
     }
     else
@@ -3277,16 +3277,16 @@ CBucketHandler::CBucketHandler(HashDedupSlaveActivityBase &_owner, IRowInterface
 
 CBucketHandler::~CBucketHandler()
 {
-    owner.queryJob().queryRowManager()->removeRowBuffer(this);
-    owner.queryJob().queryRowManager()->removeRowBuffer(&postSpillFlush);
+    owner.queryJobChannel().queryRowManager()->removeRowBuffer(this);
+    owner.queryJobChannel().queryRowManager()->removeRowBuffer(&postSpillFlush);
     for (unsigned i=0; i<numBuckets; i++)
         ::Release(buckets[i]);
 }
 
 void CBucketHandler::flushBuckets()
 {
-    owner.queryJob().queryRowManager()->removeRowBuffer(this);
-    owner.queryJob().queryRowManager()->removeRowBuffer(&postSpillFlush);
+    owner.queryJobChannel().queryRowManager()->removeRowBuffer(this);
+    owner.queryJobChannel().queryRowManager()->removeRowBuffer(&postSpillFlush);
     for (unsigned i=0; i<numBuckets; i++)
     {
         CBucket &bucket = *buckets[i];
@@ -3335,7 +3335,7 @@ unsigned CBucketHandler::getBucketEstimate(rowcount_t totalRows) const
         // likely to be way off for variable
 
         // JCSMORE - will need to change based on whether upstream keeps packed or not.
-        roxiemem::IRowManager *rM = owner.queryJob().queryRowManager();
+        roxiemem::IRowManager *rM = owner.queryJobChannel().queryRowManager();
 
         memsize_t availMem = roxiemem::getTotalMemoryLimit()-0x500000;
         memsize_t initKeySize = rM->getExpectedCapacity(keyIf->queryRowMetaData()->getMinRecordSize(), owner.allocFlags);
@@ -3382,9 +3382,9 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
         htRows.setOwner(buckets[i]);
     }
     ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);
-    owner.queryJob().queryRowManager()->addRowBuffer(this);
+    owner.queryJobChannel().queryRowManager()->addRowBuffer(this);
     // postSpillFlush not needed until after 1 spill event, but not safe to add within callback
-    owner.queryJob().queryRowManager()->addRowBuffer(&postSpillFlush);
+    owner.queryJobChannel().queryRowManager()->addRowBuffer(&postSpillFlush);
     if (keyStream)
     {
         loop

+ 1 - 1
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -306,7 +306,7 @@ public:
 
         ThorDataLinkMetaInfo info;
         inputs.item(0)->getMetaInfo(info);
-        outRowAllocator.setown(queryJob().getRowAllocator(helper->queryDiskRecordSize(), container.queryId()));
+        outRowAllocator.setown(queryJobChannel().getRowAllocator(helper->queryDiskRecordSize(), container.queryId()));
         if (refactor)
         {
             assertex(isLocal);

+ 1 - 1
thorlcr/activities/iterate/thiterateslave.cpp

@@ -208,7 +208,7 @@ public:
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         helper = static_cast <IHThorProcessArg *> (queryHelper());
-        rightRowAllocator.setown(queryJob().getRowAllocator(helper->queryRightRecordSize(),queryActivityId()));
+        rightRowAllocator.setown(queryJobChannel().getRowAllocator(helper->queryRightRecordSize(),queryActivityId()));
         IterateSlaveActivityBase::init(data,slaveData);
     }
 

+ 2 - 2
thorlcr/activities/join/thjoinslave.cpp

@@ -604,8 +604,8 @@ public:
     CMergeJoinSlaveBaseActivity(CGraphElementBase *container, CMergeJoinProcessor &_processor) : CThorNarySlaveActivity(container), CThorDataLink(this), CThorSteppable(this), processor(_processor)
     {
         helper = (IHThorNWayMergeJoinArg *)queryHelper();
-        inputAllocator.setown(queryJob().getRowAllocator(helper->queryInputMeta(), queryActivityId()));
-        outputAllocator.setown(queryJob().getRowAllocator(helper->queryOutputMeta(), queryActivityId()));
+        inputAllocator.setown(queryJobChannel().getRowAllocator(helper->queryInputMeta(), queryActivityId()));
+        outputAllocator.setown(queryJobChannel().getRowAllocator(helper->queryOutputMeta(), queryActivityId()));
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 5 - 5
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1832,7 +1832,7 @@ public:
         node = queryJobChannel().queryMyRank()-1;
         onFailTransform = (0 != (joinFlags & JFonfail)) && (0 == (joinFlags & JFmatchAbortLimitSkips));
 
-        joinFieldsAllocator.setown(queryJob().getRowAllocator(helper->queryJoinFieldsRecordSize(), queryActivityId()));
+        joinFieldsAllocator.setown(queryJobChannel().getRowAllocator(helper->queryJoinFieldsRecordSize(), queryActivityId()));
         if (onFailTransform || (joinFlags & JFleftouter))
         {
             RtlDynamicRowBuilder rr(joinFieldsAllocator);
@@ -1970,7 +1970,7 @@ public:
                 Owned<IOutputMetaData> fetchInputMeta;
                 if (0 != helper->queryFetchInputRecordSize()->getMinRecordSize())
                 {
-                    fetchInputAllocator.setown(queryJob().getRowAllocator(helper->queryFetchInputRecordSize(), queryActivityId()));
+                    fetchInputAllocator.setown(queryJobChannel().getRowAllocator(helper->queryFetchInputRecordSize(), queryActivityId()));
                     fetchInputMeta.setown(createOutputMetaDataWithChildRow(fetchInputAllocator, FETCHKEY_HEADER_SIZE));
                 }
                 else
@@ -2004,15 +2004,15 @@ public:
         if (needsDiskRead)
         {
             Owned<IOutputMetaData> meta = createFixedSizeMetaData(KEYLOOKUP_HEADER_SIZE);
-            keyLookupAllocator.setown(queryJob().getRowAllocator(meta, queryActivityId()));
+            keyLookupAllocator.setown(queryJobChannel().getRowAllocator(meta, queryActivityId()));
         }
         else
         {
             Owned<IOutputMetaData> meta = createOutputMetaDataWithChildRow(joinFieldsAllocator, KEYLOOKUP_HEADER_SIZE);
-            keyLookupAllocator.setown(queryJob().getRowAllocator(meta, queryActivityId()));
+            keyLookupAllocator.setown(queryJobChannel().getRowAllocator(meta, queryActivityId()));
         }
 
-        indexInputAllocator.setown(queryJob().getRowAllocator(helper->queryIndexReadInputRecordSize(), queryActivityId()));
+        indexInputAllocator.setown(queryJobChannel().getRowAllocator(helper->queryIndexReadInputRecordSize(), queryActivityId()));
 
         ////////////////////
 

+ 4 - 4
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -582,7 +582,7 @@ public:
         else
         {
             size32_t bitSetMemSz = getBitSetMemoryRequirement(rowCount);
-            void *pBitSetMem = activity.queryJob().queryRowManager()->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
+            void *pBitSetMem = activity.queryJobChannel().queryRowManager()->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
             if (!pBitSetMem)
                 return false;
 
@@ -1645,7 +1645,7 @@ protected:
                  * Need to remove spill callback and broadcast one last message to know.
                  */
 
-                queryJob().queryRowManager()->removeRowBuffer(this);
+                queryJobChannel().queryRowManager()->removeRowBuffer(this);
 
                 ActPrintLog("Broadcasting final split status");
                 broadcaster.reset();
@@ -2053,7 +2053,7 @@ public:
                 overflowWriteCount = 0;
                 overflowWriteFile.clear();
                 overflowWriteStream.clear();
-                queryJob().queryRowManager()->addRowBuffer(this);
+                queryJobChannel().queryRowManager()->addRowBuffer(this);
             }
         }
         else
@@ -2221,7 +2221,7 @@ public:
         memsize_t sz = (memsize_t)_sz;
         if (sz != _sz) // treat as OOM exception for handling purposes.
             throw MakeStringException(ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Unsigned overflow, trying to allocate hash table of size: %" I64F "d ", _sz);
-        void *ht = activity->queryJob().queryRowManager()->allocate(sz, activity->queryContainer().queryId(), SPILL_PRIORITY_LOW);
+        void *ht = activity->queryJobChannel().queryRowManager()->allocate(sz, activity->queryContainer().queryId(), SPILL_PRIORITY_LOW);
         memset(ht, 0, sz);
         htMemory.setown(ht);
         htSize = size;

+ 2 - 4
thorlcr/activities/msort/thsortu.cpp

@@ -1682,9 +1682,8 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase
         {
             PROGLOG("CMultiCoreJoinHelper::cWorker started");
 
-            CJobBase &job = parent->activity.queryJob();
             Owned<IRowInterfaces> rowIf = parent->activity.getRowInterfaces();
-            Owned<IEngineRowAllocator> allocator = job.getRowAllocator(rowIf->queryRowMetaData(), parent->activity.queryActivityId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique));
+            Owned<IEngineRowAllocator> allocator = parent->activity.queryJobChannel().getRowAllocator(rowIf->queryRowMetaData(), parent->activity.queryActivityId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique));
 
             IRowWriter *rowWriter = rowStream->queryWriter();
             loop
@@ -1882,9 +1881,8 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
         }
         int run()
         {
-            CJobBase &job = parent->activity.queryJob();
             Owned<IRowInterfaces> rowIf = parent->activity.getRowInterfaces();
-            Owned<IEngineRowAllocator> allocator = job.getRowAllocator(rowIf->queryRowMetaData(), parent->activity.queryActivityId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique));
+            Owned<IEngineRowAllocator> allocator = parent->activity.queryJobChannel().getRowAllocator(rowIf->queryRowMetaData(), parent->activity.queryActivityId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique));
 
             Owned<IRowWriter> rowWriter = parent->multiWriter->getWriter();
             PROGLOG("CMulticoreUnorderedJoinHelper::cWorker started");

+ 21 - 17
thorlcr/graph/thgraph.cpp

@@ -2474,12 +2474,10 @@ void CJobBase::init()
     pausing = false;
     resumed = false;
 
-    bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
-    bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
-    unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
-    thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator));
+    crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
+    usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
+    memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
 
-    unsigned defaultMemMB = globalMemorySize*3/4;
     PROGLOG("Global memory size = %d MB, memory spill at = %d%%", globalMemorySize, memorySpillAt);
     StringBuffer tracing("maxActivityCores = ");
     if (maxActivityCores)
@@ -2498,10 +2496,6 @@ void CJobBase::beforeDispose()
 
 CJobBase::~CJobBase()
 {
-    thorAllocator->queryRowManager()->reportMemoryUsage(false);
-    PROGLOG("CJobBase resetting memory manager");
-    thorAllocator.clear();
-
     ::Release(userDesc);
     ::Release(pluginMap);
 
@@ -2672,14 +2666,9 @@ __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
     return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft));
 }
 
-IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+IThorAllocator *CJobBase::createThorAllocator()
 {
-    return thorAllocator->getRowAllocator(meta, activityId, flags);
-}
-
-roxiemem::IRowManager *CJobBase::queryRowManager() const
-{
-    return thorAllocator->queryRowManager();
+    return ::createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator);
 }
 
 /// CJobChannel
@@ -2689,6 +2678,7 @@ CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel
 {
     aborted = false;
     codeCtx = NULL;
+    thorAllocator.setown(job.createThorAllocator());
     timeReporter = createStdTimeReporter();
     jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
     myrank = job.queryJobGroup().rank(queryMyNode());
@@ -2697,6 +2687,9 @@ CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel
 
 CJobChannel::~CJobChannel()
 {
+    thorAllocator->queryRowManager()->reportMemoryUsage(false);
+    PROGLOG("CJobBase resetting memory manager");
+    thorAllocator.clear();
     wait();
     clean();
     ::Release(codeCtx);
@@ -2731,6 +2724,17 @@ mptag_t CJobChannel::deserializeMPTag(MemoryBuffer &mb)
     return tag;
 }
 
+IEngineRowAllocator *CJobChannel::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
+{
+    return thorAllocator->getRowAllocator(meta, activityId, flags);
+}
+
+roxiemem::IRowManager *CJobChannel::queryRowManager() const
+{
+    return thorAllocator->queryRowManager();
+}
+
+
 static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
 {
     targetActivity->addDependsOn(sourceGraph, controlId);
@@ -3044,7 +3048,7 @@ IEngineRowAllocator * CActivityBase::queryRowAllocator()
 {
     if (CABallocatorlock.lock()) {
         if (!rowAllocator)
-            rowAllocator.setown(queryJob().getRowAllocator(queryRowMetaData(),queryActivityId()));
+            rowAllocator.setown(queryJobChannel().getRowAllocator(queryRowMetaData(),queryActivityId()));
         CABallocatorlock.unlock();
     }
     return rowAllocator;

+ 8 - 4
thorlcr/graph/thgraph.hpp

@@ -728,13 +728,11 @@ public:
 
 interface ILoadedDllEntry;
 interface IConstWorkUnit;
-interface IThorAllocator;
 class CThorCodeContextBase;
 
 class graph_decl CJobBase : public CInterface, implements IDiskUsage, implements IExceptionHandler
 {
 protected:
-    Owned<IThorAllocator> thorAllocator;
     CriticalSection crit;
     Owned<ILoadedDllEntry> querySo;
     IUserDescriptor *userDesc;
@@ -757,6 +755,10 @@ protected:
     Owned<IPerfMonHook> perfmonhook;
     size32_t oldNodeCacheMem;
     CIArrayOf<CJobChannel> jobChannels;
+    bool crcChecking;
+    bool usePackedAllocator;
+    unsigned memorySpillAt;
+
 
     class CThorPluginCtx : public SimplePluginCtx
     {
@@ -808,9 +810,7 @@ public:
     void addDependencies(IPropertyTree *xgmml, bool failIfMissing=true);
     void addSubGraph(IPropertyTree &xgmml);
 
-    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
     roxiemem::IRowManager *queryRowManager() const;
-    IThorAllocator *queryThorAllocator() const { return thorAllocator; }
     bool queryUseCheckpoints() const;
     bool queryPausing() const { return pausing; }
     bool queryResumed() const { return resumed; }
@@ -845,6 +845,7 @@ public:
     unsigned getOptUInt(const char *opt, unsigned dft=0) { return (unsigned)getOptInt(opt, dft); }
     __int64 getOptInt64(const char *opt, __int64 dft=0);
     unsigned __int64 getOptUInt64(const char *opt, unsigned __int64 dft=0) { return (unsigned __int64)getOptInt64(opt, dft); }
+    virtual IThorAllocator *createThorAllocator();
 
     virtual void abort(IException *e);
 
@@ -872,6 +873,7 @@ interface IGraphExecutor : extends IInterface
     virtual void wait() = 0;
 };
 
+interface IThorAllocator;
 class graph_decl CJobChannel : public CInterface, implements IGraphCallback, implements IExceptionHandler
 {
 protected:
@@ -948,6 +950,8 @@ public:
     IMPServer &queryMPServer() const { return *mpServer; }
     const rank_t &queryMyRank() const { return myrank; }
     mptag_t deserializeMPTag(MemoryBuffer &mb);
+    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
+    roxiemem::IRowManager *queryRowManager() const;
 
     virtual void abort(IException *e);
     virtual IBarrier *createBarrier(mptag_t tag) { UNIMPLEMENTED; return NULL; }

+ 11 - 0
thorlcr/graph/thgraphslave.cpp

@@ -1108,6 +1108,13 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
 #endif
     querySo.setown(createDllEntry(_querySo, false, NULL));
     tmpHandler.setown(createTempHandler(true));
+    if (globals->getPropBool("@processPerSlave", true))
+        channelMemorySize = globalMemorySize;
+    else
+    {
+        unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
+        channelMemorySize = globalMemorySize/slavesPerNode;
+    }
 }
 
 void CJobSlave::addChannel(IMPServer *mpServer)
@@ -1169,6 +1176,10 @@ mptag_t CJobSlave::deserializeMPTag(MemoryBuffer &mb)
     return tag;
 }
 
+IThorAllocator *CJobSlave::createThorAllocator()
+{
+    return ::createThorAllocator(((memsize_t)channelMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator);
+}
 
 // IGraphCallback
 CJobSlaveChannel::CJobSlaveChannel(CJobBase &_job, IMPServer *mpServer, unsigned channel) : CJobChannel(_job, mpServer, channel)

+ 2 - 0
thorlcr/graph/thgraphslave.hpp

@@ -140,6 +140,7 @@ class graphslave_decl CJobSlave : public CJobBase
     ISlaveWatchdog *watchdog;
     Owned<IPropertyTree> workUnitInfo;
     size32_t oldNodeCacheMem;
+    unsigned channelMemorySize;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -157,6 +158,7 @@ public:
     virtual __int64 getWorkUnitValueInt(const char *prop, __int64 defVal) const;
     virtual StringBuffer &getWorkUnitValue(const char *prop, StringBuffer &str) const;
     virtual bool getWorkUnitValueBool(const char *prop, bool defVal) const;
+    virtual IThorAllocator *createThorAllocator();
 // IExceptionHandler
     virtual bool fireException(IException *e)
     {

+ 8 - 0
thorlcr/master/thmastermain.cpp

@@ -648,6 +648,14 @@ int main( int argc, char *argv[]  )
             }
             if (0 == mmemSize)
                 mmemSize = gmemSize;
+            if (!processPerSlave)
+            {
+                /* Preserving previous semantics, if globalMemorySize defined, it defined how much per slave.
+                 * So if N virtual slaves, the total memory needs to be globalMemorySize * slavesPerNode
+                 * The slave process will give each slave channel row manager a split of the total
+                 */
+                globals->setPropInt("@globalMemorySize", gmemSize * slavesPerNode);
+            }
         }
         bool gmemAllowHugePages = globals->getPropBool("@heapUseHugePages", false);
         gmemAllowHugePages = globals->getPropBool("@heapMasterUseHugePages", gmemAllowHugePages);

+ 5 - 2
thorlcr/slave/slavmain.cpp

@@ -178,8 +178,11 @@ public:
     }
     ~CJobListener()
     {
-        for (unsigned sc=1; sc<slavesPerNode; sc++)
-            mpServers.item(sc).stop();
+        if (!processPerSlave)
+        {
+            for (unsigned sc=1; sc<slavesPerNode; sc++)
+                mpServers.item(sc).stop();
+        }
         mpServers.kill();
         stop();
     }

+ 3 - 3
thorlcr/thorcodectx/thcodectx.cpp

@@ -110,17 +110,17 @@ char *CThorCodeContextBase::getExpandLogicalName(const char * logicalName)
 
 IEngineRowAllocator * CThorCodeContextBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
 { 
-    return jobChannel.queryJob().getRowAllocator(meta, activityId);
+    return jobChannel.getRowAllocator(meta, activityId);
 }
 
 const char * CThorCodeContextBase::cloneVString(const char * str) const
 {
-    return jobChannel.queryJob().queryRowManager()->cloneVString(str);
+    return jobChannel.queryRowManager()->cloneVString(str);
 }
 
 const char * CThorCodeContextBase::cloneVString(size32_t len, const char * str) const
 {
-    return jobChannel.queryJob().queryRowManager()->cloneVString(len, str);
+    return jobChannel.queryRowManager()->cloneVString(len, str);
 }
 
 IEclGraphResults *CThorCodeContextBase::resolveLocalQuery(__int64 gid)

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -1742,7 +1742,7 @@ public:
         if (readGranularity > limit)
             readGranularity = limit; // readGranularity must be <= limit;
         numWriters = 0;
-        roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
+        roxiemem::IRowManager *rowManager = activity.queryJobChannel().queryRowManager();
         readRows = static_cast<const void * *>(rowManager->allocate(readGranularity * sizeof(void*), activity.queryContainer().queryId()));
         eos = eow = readerBlocked = false;
         rowPos = rowsToRead = 0;

+ 9 - 10
thorlcr/thorutil/thmem.cpp

@@ -191,7 +191,7 @@ protected:
         if (!mmRegistered)
         {
             mmRegistered = true;
-            activity.queryJob().queryRowManager()->addRowBuffer(this);
+            activity.queryJobChannel().queryRowManager()->addRowBuffer(this);
         }
     }
     inline void clearSpillingCallback()
@@ -199,7 +199,7 @@ protected:
         if (mmRegistered)
         {
             mmRegistered = false;
-            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+            activity.queryJobChannel().queryRowManager()->removeRowBuffer(this);
         }
     }
 public:
@@ -359,7 +359,7 @@ public:
         granularity = 500; // JCSMORE - rows
 
         // a small amount of rows to read from swappable rows
-        roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
+        roxiemem::IRowManager *rowManager = activity.queryJobChannel().queryRowManager();
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
         addSpillingCallback();
     }
@@ -614,7 +614,7 @@ CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInt
     rows = NULL;
     maxRows = 0;
     numRows = 0;
-    rowManager = activity.queryJob().queryRowManager();
+    rowManager = activity.queryJobChannel().queryRowManager();
     throwOnOom = false;
     setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
     setDefaultMaxSpillCost(roxiemem::SpillAllCost);
@@ -1044,7 +1044,7 @@ offset_t CThorExpandingRowArray::serializedSize()
 
 memsize_t CThorExpandingRowArray::getMemUsage()
 {
-    roxiemem::IRowManager *rM = activity.queryJob().queryRowManager();
+    roxiemem::IRowManager *rM = activity.queryJobChannel().queryRowManager();
     IOutputMetaData *meta = rowIf->queryRowMetaData();
     IOutputMetaData *diskMeta = meta->querySerializedDiskMeta(); // GH->JCS - really I want a internalMeta here.
     rowidx_t c = ordinality();
@@ -1663,7 +1663,7 @@ protected:
     {
         if (mmRegistered)
         {
-            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+            activity.queryJobChannel().queryRowManager()->removeRowBuffer(this);
             mmRegistered = false;
         }
     }
@@ -1671,7 +1671,7 @@ protected:
     {
         if (!mmRegistered && spillingEnabled())
         {
-            activity.queryJob().queryRowManager()->addRowBuffer(this);
+            activity.queryJobChannel().queryRowManager()->addRowBuffer(this);
             mmRegistered = true;
         }
     }
@@ -1746,7 +1746,7 @@ public:
         if (mmRegistered && !spillingEnabled())
         {
             mmRegistered = false;
-            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+            activity.queryJobChannel().queryRowManager()->removeRowBuffer(this);
         }
         spillableRows.setup(rowIf, false, stableSort);
     }
@@ -2161,8 +2161,7 @@ public:
 
 IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
 {
-    PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
-    PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
+    PROGLOG("Thor allocator: Size=%d (MB), CRC=%s, Packed=%s", (unsigned)(memSize/0x100000), crcChecking?"ON":"OFF", usePacked?"ON":"OFF");
     roxiemem::RoxieHeapFlags flags;
     if (usePacked)
         flags = roxiemem::RHFpacked;