浏览代码

HPCC-14410 Introduce row manager shared amongst slave channels

Commit2: add shared allocator/slave allocator access methods/
implementation and configuration

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 年之前
父节点
当前提交
ead375bcd1

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2853,7 +2853,7 @@ public:
         iCompare = helper->queryCompare();
 
         // JCSMORE - really should ask / lookup what flags the allocator created for extractKey has...
-        allocFlags = queryJobChannel().queryThorAllocator().queryFlags();
+        allocFlags = queryJobChannel().queryThorAllocator()->queryFlags();
 
         // JCSMORE - it may not be worth extracting the key,
         // if there's an upstream activity that holds onto rows for periods of time (e.g. sort)

+ 16 - 8
thorlcr/graph/thgraph.cpp

@@ -2473,7 +2473,8 @@ CJobBase::CJobBase(ILoadedDllEntry *_querySo, const char *_graphName) : querySo(
     dirty = true;
     aborted = false;
     mpJobTag = TAG_NULL;
-    globalMemorySize = globals->getPropInt("@globalMemorySize"); // in MB
+    globalMemoryMB = globals->getPropInt("@globalMemorySize"); // in MB
+    numChannels = globals->getPropInt("@channelsPerSlave", 1);
     oldNodeCacheMem = 0;
     pluginMap = new SafePluginMap(&pluginCtx, true);
 
@@ -2523,9 +2524,11 @@ void CJobBase::init()
 
     crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
     usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
-    memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
+    memorySpillAtPercentage = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
+    sharedMemoryLimitPercentage = (unsigned)getWorkUnitValueInt("globalMemoryLimitPC", globals->getPropInt("@sharedMemoryLimit", 90));
+    sharedMemoryMB = globalMemoryMB*sharedMemoryLimitPercentage/100;
 
-    PROGLOG("Global memory size = %d MB, memory spill at = %d%%", globalMemorySize, memorySpillAt);
+    PROGLOG("Global memory size = %d MB, shared memory = %d%%, memory spill at = %d%%", globalMemoryMB, sharedMemoryLimitPercentage, memorySpillAtPercentage);
     StringBuffer tracing("maxActivityCores = ");
     if (maxActivityCores)
         tracing.append(maxActivityCores);
@@ -2543,6 +2546,7 @@ void CJobBase::beforeDispose()
 
 CJobBase::~CJobBase()
 {
+    jobChannels.kill(); // avoiding circular references. Kill before other CJobBase components are destroyed that channels reference.
     ::Release(userDesc);
     ::Release(pluginMap);
 
@@ -2713,9 +2717,9 @@ __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
     return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft));
 }
 
-IThorAllocator *CJobBase::createThorAllocator()
+IThorAllocator *CJobBase::getThorAllocator(unsigned channel) // replaces createThorAllocator(); 'channel' is not used by this base implementation
 {
-    return ::createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator);
+    return sharedAllocator.getLink(); // hands out the job's sharedAllocator instead of creating a new allocator; CJobChannel's constructor takes the result with setown() - NOTE(review): presumably getLink() adds a reference, confirm
 }
 
 /// CJobChannel
@@ -2724,8 +2728,7 @@ CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel
     : job(_job), mpServer(_mpServer), channel(_channel)
 {
     aborted = false;
-    codeCtx = NULL;
-    thorAllocator.setown(job.createThorAllocator());
+    thorAllocator.setown(job.getThorAllocator(channel));
     timeReporter = createStdTimeReporter();
     jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
     myrank = job.queryJobGroup().rank(queryMyNode());
@@ -2739,7 +2742,7 @@ CJobChannel::~CJobChannel()
     thorAllocator.clear();
     wait();
     clean();
-    ::Release(codeCtx);
+    codeCtx.clear();
     timeReporter->Release();
 }
 
@@ -2759,6 +2762,11 @@ ICodeContext &CJobChannel::queryCodeContext() const
     return *codeCtx;
 }
 
+ICodeContext &CJobChannel::querySharedMemCodeContext() const // accessor for the code context that allocates via the shared allocator
+{
+    return *sharedMemCodeCtx; // dereferenced without a null check; sharedMemCodeCtx is assigned in the CJobMasterChannel/CJobSlaveChannel constructors
+}
+
 mptag_t CJobChannel::deserializeMPTag(MemoryBuffer &mb)
 {
     mptag_t tag;

+ 11 - 5
thorlcr/graph/thgraph.hpp

@@ -758,7 +758,8 @@ protected:
     Owned<IPropertyTree> xgmml;
     Owned<IGraphTempHandler> tmpHandler;
     bool timeActivities;
-    unsigned maxActivityCores, globalMemorySize;
+    unsigned numChannels;
+    unsigned maxActivityCores, globalMemoryMB, sharedMemoryMB;
     unsigned forceLogGraphIdMin, forceLogGraphIdMax;
     Owned<IContextLogger> logctx;
     Owned<IPerfMonHook> perfmonhook;
@@ -768,9 +769,11 @@ protected:
     OwnedMalloc<unsigned> jobSlaveChannelNum;
     bool crcChecking;
     bool usePackedAllocator;
-    unsigned memorySpillAt;
     rank_t myNodeRank;
     Owned<IPropertyTree> graphXGMML;
+    unsigned memorySpillAtPercentage, sharedMemoryLimitPercentage;
+    CriticalSection sharedAllocatorCrit;
+    Owned<IThorAllocator> sharedAllocator;
 
     class CThorPluginCtx : public SimplePluginCtx
     {
@@ -860,7 +863,8 @@ public:
     unsigned getOptUInt(const char *opt, unsigned dft=0) { return (unsigned)getOptInt(opt, dft); }
     __int64 getOptInt64(const char *opt, __int64 dft=0);
     unsigned __int64 getOptUInt64(const char *opt, unsigned __int64 dft=0) { return (unsigned __int64)getOptInt64(opt, dft); }
-    virtual IThorAllocator *createThorAllocator();
+    IThorAllocator *querySharedAllocator() const { return sharedAllocator; }
+    virtual IThorAllocator *getThorAllocator(unsigned channel);
 
     virtual void abort(IException *e);
     virtual void debugRequest(CMessageBuffer &msg, const char *request) const { }
@@ -904,7 +908,8 @@ protected:
     rank_t myrank;
     Linked<IMPServer> mpServer;
     bool aborted;
-    CThorCodeContextBase *codeCtx;
+    Owned<CThorCodeContextBase> codeCtx;
+    Owned<CThorCodeContextBase> sharedMemCodeCtx;
     unsigned channel;
 
     void removeAssociates(CGraphBase &graph)
@@ -960,8 +965,9 @@ public:
     }
 
     ICodeContext &queryCodeContext() const;
+    ICodeContext &querySharedMemCodeContext() const;
     IThorResult *getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId);
-    IThorAllocator &queryThorAllocator() const { return *thorAllocator; }
+    IThorAllocator *queryThorAllocator() const { return thorAllocator; }
     ICommunicator &queryJobComm() const { return *jobComm; }
     IMPServer &queryMPServer() const { return *mpServer; }
     const rank_t &queryMyRank() const { return myrank; }

+ 20 - 2
thorlcr/graph/thgraphmaster.cpp

@@ -1259,6 +1259,21 @@ public:
     }
 };
 
+class CThorCodeContextMasterSharedMem : public CThorCodeContextMaster // master code context whose getRowAllocator() is served by a supplied shared allocator
+{
+    IThorAllocator *sharedAllocator; // plain pointer, no link is taken here; NOTE(review): presumably the owning job outlives this context - confirm
+public:
+    CThorCodeContextMasterSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, IConstWorkUnit &_workunit, ILoadedDllEntry &querySo, IUserDescriptor &userDesc)
+        : CThorCodeContextMaster(jobChannel, _workunit, querySo, userDesc)
+    {
+        sharedAllocator = _sharedAllocator; // CJobMasterChannel passes job.querySharedAllocator() here
+    }
+    virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const // forwards the request unchanged to sharedAllocator
+    {
+        return sharedAllocator->getRowAllocator(meta, activityId);
+    }
+};
+
 
 /////////////
 
@@ -1284,7 +1299,8 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     user.set(workunit->queryUser());
     token.append(_token.str());
     scope.append(_scope.str());
-    globalMemorySize = globals->getPropInt("@masterMemorySize", globals->getPropInt("@globalMemorySize")); // in MB
+    globalMemoryMB = globals->getPropInt("@masterMemorySize", globals->getPropInt("@globalMemorySize")); // in MB
+    numChannels = 1;
     init();
 
     resumed = WUActionResume == workunit->getAction();
@@ -1304,6 +1320,7 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
         plugin.getPluginName(name);
         loadPlugin(pluginMap, pluginsDir.str(), name.str());
     }
+    sharedAllocator.setown(::createThorAllocator(globalMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
     Owned<IMPServer> mpServer = getMPServer();
     addChannel(mpServer);
     mpJobTag = allocateMPTag();
@@ -1904,7 +1921,8 @@ bool CJobMaster::fireException(IException *e)
 
 CJobMasterChannel::CJobMasterChannel(CJobBase &job, IMPServer *mpServer, unsigned channel) : CJobChannel(job, mpServer, channel)
 {
-    codeCtx = new CThorCodeContextMaster(*this, job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor());
+    codeCtx.setown(new CThorCodeContextMaster(*this, job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor())); // codeCtx is now an Owned<> member, so the new object is handed over with setown()
+    sharedMemCodeCtx.setown(new CThorCodeContextMasterSharedMem(*this, job.querySharedAllocator(), job.queryWorkUnit(), job.queryDllEntry(), *job.queryUserDescriptor())); // second context, identical inputs plus the job's shared allocator for row allocation
 }
 
 CGraphBase *CJobMasterChannel::createGraph()

+ 23 - 4
thorlcr/graph/thgraphslave.cpp

@@ -1076,6 +1076,21 @@ public:
     }
 };
 
+class CThorCodeContextSlaveSharedMem : public CThorCodeContextSlave // slave code context whose getRowAllocator() is served by a supplied shared allocator
+{
+    IThorAllocator *sharedAllocator; // plain pointer, no link is taken here; NOTE(review): presumably the owning job outlives this context - confirm
+public:
+    CThorCodeContextSlaveSharedMem(CJobChannel &jobChannel, IThorAllocator *_sharedAllocator, ILoadedDllEntry &querySo, IUserDescriptor &userDesc, mptag_t mpTag)
+        : CThorCodeContextSlave(jobChannel, querySo, userDesc, mpTag)
+    {
+        sharedAllocator = _sharedAllocator; // CJobSlaveChannel passes job.querySharedAllocator() here
+    }
+    virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const // forwards the request unchanged to sharedAllocator
+    {
+        return sharedAllocator->getRowAllocator(meta, activityId);
+    }
+};
+
 class CSlaveGraphTempHandler : public CGraphTempHandler
 {
 public:
@@ -1134,7 +1149,7 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
         pluginMap->loadFromList(pluginsList.str());
     }
     tmpHandler.setown(createTempHandler(true));
-    channelMemorySize = globalMemorySize / globals->getPropInt("@channelsPerSlave", 1);
+    sharedAllocator.setown(::createThorAllocator(globalMemoryMB, sharedMemoryMB, numChannels, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
 }
 
 void CJobSlave::addChannel(IMPServer *mpServer)
@@ -1206,15 +1221,19 @@ mptag_t CJobSlave::deserializeMPTag(MemoryBuffer &mb)
     return tag;
 }
 
-IThorAllocator *CJobSlave::createThorAllocator()
+IThorAllocator *CJobSlave::getThorAllocator(unsigned channel) // replaces createThorAllocator(); picks the allocator a given channel should use
 {
-    return ::createThorAllocator(((memsize_t)channelMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator);
+    if (1 == numChannels)
+        return CJobBase::getThorAllocator(channel); // single channel: the base implementation returns the shared allocator itself
+    else
+        return sharedAllocator->getSlaveAllocator(channel); // several channels: a new allocator built on the shared row manager's slave row manager for 'channel'
 }
 
 // IGraphCallback
 CJobSlaveChannel::CJobSlaveChannel(CJobBase &_job, IMPServer *mpServer, unsigned channel) : CJobChannel(_job, mpServer, channel)
 {
-    codeCtx = new CThorCodeContextSlave(*this, job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag());
+    codeCtx.setown(new CThorCodeContextSlave(*this, job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag())); // codeCtx is now an Owned<> member, so the new object is handed over with setown()
+    sharedMemCodeCtx.setown(new CThorCodeContextSlaveSharedMem(*this, job.querySharedAllocator(), job.queryDllEntry(), *job.queryUserDescriptor(), job.querySlaveMpTag())); // second context, identical inputs plus the job's shared allocator for row allocation
 }
 
 IBarrier *CJobSlaveChannel::createBarrier(mptag_t tag)

+ 2 - 2
thorlcr/graph/thgraphslave.hpp

@@ -142,7 +142,7 @@ class graphslave_decl CJobSlave : public CJobBase
     ISlaveWatchdog *watchdog;
     Owned<IPropertyTree> workUnitInfo;
     size32_t oldNodeCacheMem;
-    unsigned channelMemorySize;
+    unsigned channelMemoryMB;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -160,7 +160,7 @@ public:
     virtual __int64 getWorkUnitValueInt(const char *prop, __int64 defVal) const;
     virtual StringBuffer &getWorkUnitValue(const char *prop, StringBuffer &str) const;
     virtual bool getWorkUnitValueBool(const char *prop, bool defVal) const;
-    virtual IThorAllocator *createThorAllocator();
+    virtual IThorAllocator *getThorAllocator(unsigned channel);
     virtual void debugRequest(CMessageBuffer &msg, const char *request) const;
 
 // IExceptionHandler

+ 43 - 9
thorlcr/thorutil/thmem.cpp

@@ -2192,15 +2192,24 @@ protected:
     mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
     Owned<roxiemem::IRowManager> rowManager;
     roxiemem::RoxieHeapFlags defaultFlags;
-    IContextLogger &logctx;
+    IContextLogger *logctx;
+    unsigned numChannels;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _defaultFlags) : logctx(_logctx), defaultFlags(_defaultFlags)
+    CThorAllocator(unsigned memLimitMB, unsigned sharedMemLimitMB, unsigned _numChannels, unsigned memorySpillAtPercentage, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _defaultFlags)
+        : numChannels(_numChannels), logctx(&_logctx), defaultFlags(_defaultFlags)
     {
+        memsize_t memLimit = ((memsize_t)memLimitMB)*0x100000;
+        memsize_t sharedMemLimit = ((memsize_t)sharedMemLimitMB)*0x100000;
         allocatorMetaCache.setown(createRowAllocatorCache(this));
-        rowManager.setown(roxiemem::createRowManager(memSize, NULL, logctx, allocatorMetaCache, false, true));
-        rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
+        if (numChannels>1)
+            rowManager.setown(roxiemem::createGlobalRowManager(memLimit, sharedMemLimit, numChannels, NULL, *logctx, allocatorMetaCache, false, true));
+        else
+            rowManager.setown(roxiemem::createRowManager(memLimit, NULL, *logctx, allocatorMetaCache, false, true));
+
+        rowManager->setMemoryLimit(memLimit, 0==memorySpillAtPercentage ? 0 : memLimit/100*memorySpillAtPercentage);
         const bool paranoid = false;
         if (paranoid)
         {
@@ -2211,6 +2220,14 @@ public:
             rowManager->setReleaseWhenModifyCallback(true, true);
         }
     }
+    CThorAllocator(IThorAllocator &sharedAllocator, unsigned channel) // builds a per-channel allocator on top of an existing shared allocator
+    {
+        allocatorMetaCache.setown(createRowAllocatorCache(this)); // this instance gets its own allocator cache
+        rowManager.set(sharedAllocator.queryRowManager()->querySlaveRowManager(channel)); // no new row manager is created; uses the shared row manager's slave row manager for 'channel'
+        defaultFlags = sharedAllocator.queryFlags(); // heap flags are copied from the shared allocator
+        logctx = sharedAllocator.queryLoggingContext(); // logging context pointer is copied from the shared allocator
+        numChannels = 0; // with 0 here, getSlaveAllocator() on this instance fails its assertex(numChannels>1)
+    }
     ~CThorAllocator()
     {
         rowManager.clear();
@@ -2235,18 +2252,33 @@ public:
         return rowManager;
     }
     virtual roxiemem::RoxieHeapFlags queryFlags() const { return defaultFlags; }
+    virtual IContextLogger *queryLoggingContext() const { return logctx; }
     virtual bool queryCrc() const { return false; }
+    virtual IThorAllocator *getSlaveAllocator(unsigned channel) // creates a new per-channel allocator backed by this allocator
+    {
+        assertex(numChannels>1); // only permitted on an allocator that was constructed with more than one channel
+        return new CThorAllocator(*this, channel); // newly allocated object is returned to the caller; CJobChannel's constructor takes it with setown()
+    }
 };
 
 // derived to avoid a 'crcChecking' check per getRowAllocator only
 class CThorCrcCheckingAllocator : public CThorAllocator
 {
 public:
-    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, logctx, flags)
+    CThorCrcCheckingAllocator(unsigned memLimitMB, unsigned sharedMemLimitMB, unsigned numChannels, unsigned memorySpillAtPercentage, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags)
+        : CThorAllocator(memLimitMB, sharedMemLimitMB, numChannels, memorySpillAtPercentage, logctx, flags)
+    {
+    }
+    CThorCrcCheckingAllocator(IThorAllocator &sharedAllocator, unsigned channel) : CThorAllocator(sharedAllocator, channel)
     {
     }
 // IThorAllocator
     virtual bool queryCrc() const { return true; }
+    virtual IThorAllocator *getSlaveAllocator(unsigned channel)
+    {
+        assertex(numChannels>1);
+        return new CThorCrcCheckingAllocator(*this, channel);
+    }
 // roxiemem::IRowAllocatorMetaActIdCacheCallback
     virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
     {
@@ -2255,18 +2287,20 @@ public:
 };
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
+IThorAllocator *createThorAllocator(unsigned memLimitMB, unsigned sharedMemLimitMB, unsigned numChannels, unsigned memorySpillAtPercentage, IContextLogger &logctx, bool crcChecking, bool usePacked) // limits are now given in MB; the CThorAllocator constructor converts them to bytes
 {
-    PROGLOG("Thor allocator: Size=%d (MB), CRC=%s, Packed=%s", (unsigned)(memSize/0x100000), crcChecking?"ON":"OFF", usePacked?"ON":"OFF");
+    PROGLOG("Thor allocator: Size=%d (MB), sharedLimit=%d (MB), CRC=%s, Packed=%s", memLimitMB, sharedMemLimitMB, crcChecking?"ON":"OFF", usePacked?"ON":"OFF");
     roxiemem::RoxieHeapFlags flags;
     if (usePacked)
         flags = roxiemem::RHFpacked;
     else
         flags = roxiemem::RHFnone;
+    dbgassertex(numChannels); // a channel count of zero is rejected
+    dbgassertex((1==numChannels) || sharedMemLimitMB); // with more than one channel a non-zero shared memory limit is required
     if (crcChecking)
-        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, logctx, flags);
+        return new CThorCrcCheckingAllocator(memLimitMB, sharedMemLimitMB, numChannels, memorySpillAtPercentage, logctx, flags); // variant whose queryCrc() returns true
     else
-        return new CThorAllocator(memSize, memorySpillAt, logctx, flags);
+        return new CThorAllocator(memLimitMB, sharedMemLimitMB, numChannels, memorySpillAtPercentage, logctx, flags);
 }
 
 

+ 5 - 2
thorlcr/thorutil/thmem.hpp

@@ -161,10 +161,12 @@ interface IThorAllocator : extends IInterface
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, activity_id activityId) const = 0;
     virtual roxiemem::IRowManager *queryRowManager() const = 0;
     virtual roxiemem::RoxieHeapFlags queryFlags() const = 0;
+    virtual IContextLogger *queryLoggingContext() const = 0;
     virtual bool queryCrc() const = 0;
+    virtual IThorAllocator *getSlaveAllocator(unsigned channel) = 0;
 };
 
-extern graph_decl IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked);
+extern graph_decl IThorAllocator *createThorAllocator(unsigned memLimitMB, unsigned sharedMemLimitMB, unsigned numChannels, unsigned memorySpillAtPercentage, IContextLogger &logctx, bool crcChecking, bool usePacked);
 
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);
@@ -195,7 +197,8 @@ extern graph_decl void setMultiThorMemoryNotify(size32_t size,ILargeMemLimitNoti
 /////////////
 
 // JCSMORE
-enum {
+enum
+{
     InitialSortElements = 0,
     //The number of rows that can be added without entering a critical section, and therefore also the number
     //of rows that might not get freed when memory gets tight.