ソースを参照

HPCC-9505 - Add roxiemem activity mem tracing on OOM

Trace activity allocation summary on roxiemem OOM exception.
Necessitated addition of Thor context logger implementation.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 12 年 前
コミット
daffa29aa8

+ 3 - 3
roxie/roxiemem/roxiemem.cpp

@@ -1326,8 +1326,6 @@ public:
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const
     {
         //This function may not give 100% accurate results if called if there are concurrent allocations/releases
-        unsigned base = 0;
-        unsigned limit = atomic_read(&freeBase);
         unsigned numAllocs = queryCount()-1;
         if (numAllocs)
         {
@@ -1574,6 +1572,8 @@ public:
                     flags.append("U");
                 if (cur.heapFlags & RHFvariable)
                     flags.append("V");
+
+                //Should never be called with numPages == 0, but protect against divide by zero in case of race condition etc.
                 unsigned percentUsed = cur.numPages ? (unsigned)((cur.memUsed * 100) / (cur.numPages * HEAP_ALIGNMENT_SIZE)) : 100;
                 unsigned __int64 memReserved = cur.numPages * HEAP_ALIGNMENT_SIZE;
                 logctx.CTXLOG("size: %"I64F"u [%s] reserved: %"I64F"u %u%% (%"I64F"u/%"I64F"u) used",
@@ -2390,7 +2390,7 @@ public:
         return (total != 0);
     }
 
-    virtual void getPeakActivityUsage()
+    void getPeakActivityUsage()
     {
         Owned<IActivityMemoryUsageMap> map = getActivityUsage();
         SpinBlock c1(crit);

+ 59 - 1
thorlcr/graph/thgraph.cpp

@@ -2341,6 +2341,62 @@ public:
 };
 
 ////
+// IContextLogger
+class CThorContextLogger : CSimpleInterface, implements IContextLogger
+{
+    CJobBase &job;
+    unsigned traceLevel;
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+    CThorContextLogger(CJobBase &_job) : job(_job)
+    {
+    }
+    virtual void CTXLOG(const char *format, ...) const
+    {
+        va_list args;
+        va_start(args, format);
+        CTXLOGva(format, args);
+        va_end(args);
+    }
+    virtual void CTXLOGva(const char *format, va_list args) const
+    {
+        StringBuffer ss;
+        ss.valist_appendf(format, args);
+        LOG(MCdebugProgress, thorJob, "%s", ss.str());
+    }
+    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
+    {
+        va_list args;
+        va_start(args, format);
+        logOperatorExceptionVA(E, file, line, format, args);
+        va_end(args);
+    }
+    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
+    {
+        StringBuffer ss;
+        ss.append("ERROR");
+        if (E)
+            ss.append(": ").append(E->errorCode());
+        if (file)
+            ss.appendf(": %s(%d) ", file, line);
+        if (E)
+            E->errorMessage(ss.append(": "));
+        if (format)
+            ss.append(": ").valist_appendf(format, args);
+        LOG(MCoperatorProgress, thorJob, "%s", ss.str());
+    }
+    virtual void noteStatistic(unsigned statCode, unsigned __int64 value, unsigned count) const
+    {
+    }
+    virtual unsigned queryTraceLevel() const
+    {
+        return traceLevel;
+    }
+};
+
+////
+
 CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
 {
     maxDiskUsage = diskUsage = 0;
@@ -2374,6 +2430,8 @@ void CJobBase::init()
     forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
     forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
 
+    logctx.setown(new CThorContextLogger(*this));
+
     // global setting default on, can be overridden by #option
     timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
     maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
@@ -2388,7 +2446,7 @@ void CJobBase::init()
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
     unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
-    thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, crcChecking, usePackedAllocator));
+    thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator));
 
     unsigned defaultMemMB = globalMemorySize*3/4;
     unsigned largeMemSize = getOptUInt("@largeMemSize", defaultMemMB);

+ 3 - 0
thorlcr/graph/thgraph.hpp

@@ -808,6 +808,8 @@ protected:
     bool timeActivities;
     unsigned maxActivityCores, globalMemorySize;
     unsigned forceLogGraphIdMin, forceLogGraphIdMax;
+    Owned<IContextLogger> logctx;
+
     class CThorPluginCtx : public SimplePluginCtx
     {
     public:
@@ -846,6 +848,7 @@ public:
     const char *queryGraphName() const { return graphName; }
     bool queryForceLogging(graph_id graphId, bool def) const;
     ITimeReporter &queryTimeReporter() { return *timeReporter; }
+    const IContextLogger &queryContextLogger() const { return *logctx; }
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual CGraphBase *createGraph() = 0;
     void joinGraph(CGraphBase &graph);

+ 1 - 1
thorlcr/thorcodectx/thcodectx.hpp

@@ -100,7 +100,7 @@ public:
     }
     virtual const IContextLogger &queryContextLogger() const
     {
-        return queryDummyContextLogger();
+        return job.queryContextLogger();
     }
 
     virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { UNIMPLEMENTED; }

+ 7 - 7
thorlcr/thorutil/thmem.cpp

@@ -1741,14 +1741,14 @@ protected:
     mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
     Owned<roxiemem::IRowManager> rowManager;
     roxiemem::RoxieHeapFlags flags;
-
+    IContextLogger &logctx;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags _flags) : flags(_flags)
+    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _flags) : logctx(_logctx), flags(_flags)
     {
         allocatorMetaCache.setown(createRowAllocatorCache(this));
-        rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), allocatorMetaCache, false));
+        rowManager.setown(roxiemem::createRowManager(memSize, NULL, logctx, allocatorMetaCache, false));
         rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
     }
     ~CThorAllocator()
@@ -1778,7 +1778,7 @@ public:
 class CThorCrcCheckingAllocator : public CThorAllocator
 {
 public:
-    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, flags)
+    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, logctx, flags)
     {
     }
 // IThorAllocator
@@ -1791,7 +1791,7 @@ public:
 };
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked)
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
 {
     PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
     PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
@@ -1801,9 +1801,9 @@ IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, b
     else
         flags = roxiemem::RHFnone;
     if (crcChecking)
-        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, flags);
+        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, logctx, flags);
     else
-        return new CThorAllocator(memSize, memorySpillAt, flags);
+        return new CThorAllocator(memSize, memorySpillAt, logctx, flags);
 }
 
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -157,7 +157,7 @@ interface IThorAllocator : extends IInterface
     virtual bool queryCrc() const = 0;
 };
 
-IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked);
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked);
 
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);