Browse Source

Merge pull request #4512 from ghalliday/issue9505

HPCC-9505 - Add roxiemem activity mem tracing on OOM …

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
20fdba7154

+ 113 - 39
roxie/roxiemem/roxiemem.cpp

@@ -1173,6 +1173,7 @@ public:
         unsigned base = 0;
         unsigned limit = atomic_read(&freeBase);
         memsize_t running = 0;
+        unsigned runningCount = 0;
         unsigned lastId = 0;
         while (base < limit)
         {
@@ -1187,17 +1188,21 @@ public:
                 if (activityId != lastId)
                 {
                     if (lastId)
-                        map->noteMemUsage(lastId, running);
+                        map->noteMemUsage(lastId, running, runningCount);
                     lastId = activityId;
                     running = chunkSize;
+                    runningCount = 1;
                 }
                 else
+                {
                     running += chunkSize;
+                    runningCount++;
+                }
             }
             base += chunkSize;
         }
         if (lastId)
-            map->noteMemUsage(lastId, running);
+            map->noteMemUsage(lastId, running, runningCount);
     }
 
 private:
@@ -1321,22 +1326,11 @@ public:
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const
     {
         //This function may not give 100% accurate results if called if there are concurrent allocations/releases
-        unsigned base = 0;
-        unsigned limit = atomic_read(&freeBase);
-        memsize_t running = 0;
-        while (base < limit)
-        {
-            const char *block = data() + base;
-            ChunkHeader * header = (ChunkHeader *)block;
-            unsigned rowCount = atomic_read(&header->count);
-            if (ROWCOUNT(rowCount) != 0)
-                running += chunkSize;
-            base += chunkSize;
-        }
-        if (running)
+        unsigned numAllocs = queryCount()-1;
+        if (numAllocs)
         {
             unsigned activityId = getActivityId(sharedAllocatorId);
-            map->noteMemUsage(activityId, running);
+            map->noteMemUsage(activityId, numAllocs * chunkSize, numAllocs);
         }
     }
 };
@@ -1349,7 +1343,7 @@ class HugeHeaplet : public BigHeapletBase
 protected:
     unsigned allocatorId;
 
-    inline unsigned _sizeInPages() 
+    inline unsigned _sizeInPages() const
     {
         return PAGES(chunkCapacity + dataOffset(), HEAP_ALIGNMENT_SIZE);
     }
@@ -1461,7 +1455,8 @@ public:
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const 
     {
         unsigned activityId = getActivityId(allocatorId);
-        map->noteMemUsage(activityId, chunkCapacity);
+        map->noteMemUsage(activityId, chunkCapacity, 1);    // a huge heaplet always holds exactly one allocation
+        map->noteHeapUsage(chunkCapacity, RHFpacked, _sizeInPages(), chunkCapacity);    // reported per-heaplet; CHugeChunkingHeap::reportHeapUsage is deliberately a no-op
     }
 
     virtual void checkHeap() const
@@ -1479,16 +1474,30 @@ struct ActivityEntry
     memsize_t usage;
 };
 
+// Snapshot of one heap's memory footprint, recorded via noteHeapUsage() and
+// printed in the "Heaps:" section of CActivityMemoryUsageMap::report().
+struct HeapEntry : public CInterface
+{
+public:
+    HeapEntry(memsize_t _allocatorSize, RoxieHeapFlags _heapFlags, memsize_t _numPages, memsize_t _memUsed) :
+        allocatorSize(_allocatorSize), heapFlags(_heapFlags), numPages(_numPages), memUsed(_memUsed)
+    {
+    }
+
+    memsize_t allocatorSize;    // chunk size of the heap (huge heaplets pass their chunkCapacity)
+    RoxieHeapFlags heapFlags;   // rendered as P/U/V flag letters in report()
+    memsize_t numPages;         // pages reserved (each HEAP_ALIGNMENT_SIZE bytes)
+    memsize_t memUsed;          // bytes allocated within those pages, used for the %used figure
+};
+
 typedef MapBetween<unsigned, unsigned, ActivityEntry, ActivityEntry> MapActivityToActivityEntry;
 
 class CActivityMemoryUsageMap : public CInterface, implements IActivityMemoryUsageMap
 {
     MapActivityToActivityEntry map;
+    CIArrayOf<HeapEntry> heaps;
     memsize_t maxUsed;
     memsize_t totalUsed;
     unsigned maxActivity;
 
-
 public:
     IMPLEMENT_IINTERFACE;
     CActivityMemoryUsageMap()
@@ -1498,7 +1507,7 @@ public:
         maxActivity = 0;
     }
 
-    virtual void noteMemUsage(unsigned activityId, unsigned memUsed)
+    virtual void noteMemUsage(unsigned activityId, memsize_t memUsed, unsigned numAllocs)
     {
         totalUsed += memUsed;
         ActivityEntry *ret = map.getValue(activityId);
@@ -1506,11 +1515,11 @@ public:
         {
             memUsed += ret->usage;
             ret->usage = memUsed;
-            ret->allocations++;
+            ret->allocations += numAllocs;
         }
         else
         {
-            ActivityEntry e = {activityId, 1, memUsed};
+            ActivityEntry e = {activityId, numAllocs, memUsed};
             map.setValue(activityId, e);
         }
         if (memUsed > maxUsed)
@@ -1520,6 +1529,11 @@ public:
         }
     }
 
+    // Record one heap's footprint for the "Heaps:" section of report().
+    // NOTE(review): implements IActivityMemoryUsageMap::noteHeapUsage; the interface
+    // declaration names the third parameter memReserved but it is a page count here
+    // (report() multiplies it by HEAP_ALIGNMENT_SIZE) - the declaration's name should
+    // probably be aligned with this.
+    void noteHeapUsage(memsize_t allocatorSize, RoxieHeapFlags heapFlags, memsize_t numPages, memsize_t memUsed)
+    {
+        heaps.append(*new HeapEntry(allocatorSize, heapFlags, numPages, memUsed));
+    }
+
     static int sortUsage(const void *_l, const void *_r)
     {
         const ActivityEntry *l = *(const ActivityEntry **) _l;
@@ -1547,6 +1561,25 @@ public:
                 j--;
                 logctx.CTXLOG("%"I64F"u bytes allocated by activity %u (%u allocations)", (unsigned __int64) results[j]->usage, getRealActivityId(results[j]->activityId, allocatorCache), results[j]->allocations);
             }
+            logctx.CTXLOG("Heaps:");
+            ForEachItemIn(iHeap, heaps)
+            {
+                HeapEntry & cur = heaps.item(iHeap);
+                StringBuffer flags;
+                if (cur.heapFlags & RHFpacked)
+                    flags.append("P");
+                if (cur.heapFlags & RHFunique)
+                    flags.append("U");
+                if (cur.heapFlags & RHFvariable)
+                    flags.append("V");
+
+                //Should never be called with numPages == 0, but protect against divide by zero in case of race condition etc.
+                unsigned percentUsed = cur.numPages ? (unsigned)((cur.memUsed * 100) / (cur.numPages * HEAP_ALIGNMENT_SIZE)) : 100;
+                unsigned __int64 memReserved = cur.numPages * HEAP_ALIGNMENT_SIZE;
+                logctx.CTXLOG("size: %"I64F"u [%s] reserved: %"I64F"u %u%% (%"I64F"u/%"I64F"u) used",
+                        (unsigned __int64) cur.allocatorSize, flags.str(), (unsigned __int64) cur.numPages, percentUsed, (unsigned __int64) cur.memUsed, (unsigned __int64) memReserved);
+            }
+
             logctx.CTXLOG("------------------ End of snapshot");
             delete [] results;
         }
@@ -1758,16 +1791,23 @@ public:
         return total;
     }
 
-    void getPeakActivityUsage(IActivityMemoryUsageMap * usageMap)
+    void getPeakActivityUsage(IActivityMemoryUsageMap * usageMap) const
     {
         SpinBlock c1(crit);
         BigHeapletBase *finger = active;
+        unsigned numPages = 0;
+        memsize_t numAllocs = 0;
         while (finger)
         {
-            if (finger->queryCount()!=1)
+            unsigned thisCount = finger->queryCount()-1;
+            if (thisCount != 0)
                 finger->getPeakActivityUsage(usageMap);
+            numAllocs += thisCount;
+            numPages++;
             finger = getNext(finger);
         }
+        if (numPages)
+            reportHeapUsage(usageMap, numPages, numAllocs);
     }
 
     inline bool isEmpty() const { return !active; }
@@ -1778,6 +1818,8 @@ public:
     }
 
 protected:
+    virtual void reportHeapUsage(IActivityMemoryUsageMap * usageMap, unsigned numPages, memsize_t numAllocs) const = 0;
+
     inline BigHeapletBase * getNext(const BigHeapletBase * ptr) const { return ptr->next; }
     inline void setNext(BigHeapletBase * ptr, BigHeapletBase * next) const { ptr->next = next; }
 
@@ -1803,6 +1845,11 @@ public:
 
 protected:
     HugeHeaplet * allocateHeaplet(memsize_t _size, unsigned allocatorId);
+
+    virtual void reportHeapUsage(IActivityMemoryUsageMap * usageMap, unsigned numPages, memsize_t numAllocs) const
+    {
+        //Already processed in HugeHeaplet::getPeakActivityUsage(IActivityMemoryUsageMap *map) const
+    }
 };
 
 class CNormalChunkingHeap : public CChunkingHeap
@@ -1815,6 +1862,11 @@ public:
 
     void * doAllocate(unsigned allocatorId);
 
+    virtual void reportHeapUsage(IActivityMemoryUsageMap * usageMap, unsigned numPages, memsize_t numAllocs) const
+    {
+        usageMap->noteHeapUsage(chunkSize, (RoxieHeapFlags)flags, numPages, chunkSize * numAllocs);
+    }
+
 protected:
     inline void * inlineDoAllocate(unsigned allocatorId);
     virtual BigHeapletBase * allocateHeaplet() = 0;
@@ -2137,7 +2189,7 @@ class CChunkingRowManager : public CInterface, implements IRowManager
     friend class CFixedChunkingHeap;
 
     SpinLock crit;
-    SpinLock fixedCrit;  // Should possibly be a ReadWriteLock - better with high contention, worse with low
+    mutable SpinLock fixedCrit;  // Should possibly be a ReadWriteLock - better with high contention, worse with low
     CIArrayOf<CFixedChunkingHeap> normalHeaps;
     CHugeChunkingHeap hugeHeap;
     ITimeLimiter *timeLimit;
@@ -2180,7 +2232,7 @@ public:
             size32_t rounded = roundup(prevSize+1);
             dbgassertex(ROUNDEDHEAP(rounded) == normalHeaps.ordinality());
             size32_t thisSize = ROUNDEDSIZE(rounded);
-            normalHeaps.append(*new CFixedChunkingHeap(this, _logctx, _allocatorCache, thisSize, 0));
+            normalHeaps.append(*new CFixedChunkingHeap(this, _logctx, _allocatorCache, thisSize, RHFvariable));
             prevSize = thisSize;
         }
         pageLimit = (unsigned) PAGES(_memLimit, HEAP_ALIGNMENT_SIZE);
@@ -2338,20 +2390,9 @@ public:
         return (total != 0);
     }
 
-    virtual void getPeakActivityUsage()
+    void getPeakActivityUsage()
     {
-        Owned<IActivityMemoryUsageMap> map = new CActivityMemoryUsageMap;
-        ForEachItemIn(iNormal, normalHeaps)
-            normalHeaps.item(iNormal).getPeakActivityUsage(map);
-        hugeHeap.getPeakActivityUsage(map);
-
-        SpinBlock block(fixedCrit); //Spinblock needed if we can add/remove fixed heaps while allocations are occuring
-        ForEachItemIn(i, fixedHeaps)
-        {
-            CChunkingHeap & fixedHeap = fixedHeaps.item(i);
-            fixedHeap.getPeakActivityUsage(map);
-        }
-
+        Owned<IActivityMemoryUsageMap> map = getActivityUsage();
         SpinBlock c1(crit);
         usageMap.setown(map.getClear());
     }
@@ -2682,6 +2723,7 @@ public:
                     if (numHeapPages == atomic_read(&totalHeapPages))
                     {
                         logctx.CTXLOG("RoxieMemMgr: Memory limit exceeded - current %u, requested %u, limit %u", pageCount, numRequested, pageLimit);
+                        reportMemoryUsage(false);
                         PrintStackReport();
                         throw MakeStringException(ROXIEMM_MEMORY_LIMIT_EXCEEDED, "memory limit exceeded");
                     }
@@ -2746,6 +2788,22 @@ protected:
         return NULL;
     }
 
+    IActivityMemoryUsageMap * getActivityUsage() const
+    {
+        Owned<IActivityMemoryUsageMap> map = new CActivityMemoryUsageMap;
+        ForEachItemIn(iNormal, normalHeaps)
+            normalHeaps.item(iNormal).getPeakActivityUsage(map);
+        hugeHeap.getPeakActivityUsage(map);
+
+        SpinBlock block(fixedCrit); //Spinblock needed if we can add/remove fixed heaps while allocations are occuring
+        ForEachItemIn(i, fixedHeaps)
+        {
+            CChunkingHeap & fixedHeap = fixedHeaps.item(i);
+            fixedHeap.getPeakActivityUsage(map);
+        }
+        return map.getClear();
+    }
+
     CFixedChunkingHeap * createFixedHeap(size32_t size, unsigned activityId, unsigned flags)
     {
         dbgassertex(!(flags & RHFpacked));
@@ -2830,6 +2888,22 @@ protected:
         callbacks.removeRowBuffer(callback);
     }
 
+    virtual void reportMemoryUsage(bool peak) const
+    {
+        if (peak)
+        {
+            if (usageMap)
+                usageMap->report(logctx, allocatorCache);
+        }
+        else
+        {
+            logctx.CTXLOG("RoxieMemMgr: pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p",
+                          pageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this);
+            Owned<IActivityMemoryUsageMap> map = getActivityUsage();
+            map->report(logctx, allocatorCache);
+        }
+    }
+
     virtual memsize_t getExpectedCapacity(memsize_t size, unsigned heapFlags)
     {
         if (size > FixedSizeHeaplet::maxHeapSize())

+ 4 - 1
roxie/roxiemem/roxiemem.hpp

@@ -382,6 +382,7 @@ enum RoxieHeapFlags
     RHFhasdestructor    = 0x0002,
     RHFunique           = 0x0004,  // create a separate fixed size allocator
     RHFoldfixed         = 0x0008,  // Don't create a special fixed size heap for this
+    RHFvariable         = 0x0010,  // only used for tracing
 };
 
 //This interface is here to allow atomic updates to allocations when they are being resized.  There are a few complications:
@@ -425,6 +426,7 @@ interface IRowManager : extends IInterface
     virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, unsigned roxieHeapFlags) = 0;            // should this be passed the initial size?
     virtual void addRowBuffer(IBufferedRowCallback * callback) = 0;
     virtual void removeRowBuffer(IBufferedRowCallback * callback) = 0;
+    virtual void reportMemoryUsage(bool peak) const = 0;
     virtual memsize_t getExpectedCapacity(memsize_t size, unsigned heapFlags) = 0; // what is the expected capacity for a given size allocation
     virtual memsize_t getExpectedFootprint(memsize_t size, unsigned heapFlags) = 0; // how much memory will a given size allocation actually use.
 };
@@ -438,7 +440,8 @@ interface ITimeLimiter
 
 interface IActivityMemoryUsageMap : public IInterface
 {
-    virtual void noteMemUsage(unsigned activityId, unsigned memUsed) = 0;
+    virtual void noteMemUsage(unsigned activityId, memsize_t memUsed, unsigned numAllocs) = 0;
+    virtual void noteHeapUsage(memsize_t allocatorSize, RoxieHeapFlags heapFlags, memsize_t memReserved, memsize_t memUsed) = 0;
     virtual void report(const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache) = 0;
 };
 

+ 59 - 1
thorlcr/graph/thgraph.cpp

@@ -2341,6 +2341,62 @@ public:
 };
 
 ////
+// IContextLogger
+// Routes IContextLogger output (used by roxiemem OOM tracing among others)
+// through the Thor job log instead of the dummy logger.
+class CThorContextLogger : CSimpleInterface, implements IContextLogger
+{
+    CJobBase &job;
+    unsigned traceLevel;    // returned by queryTraceLevel()
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+    // Fix: traceLevel was never initialized, so queryTraceLevel() returned an
+    // indeterminate value. Default to 1 (basic tracing) - TODO confirm whether
+    // this should come from job/workunit configuration.
+    CThorContextLogger(CJobBase &_job) : job(_job), traceLevel(1)
+    {
+    }
+    // printf-style message, forwarded to CTXLOGva.
+    virtual void CTXLOG(const char *format, ...) const
+    {
+        va_list args;
+        va_start(args, format);
+        CTXLOGva(format, args);
+        va_end(args);
+    }
+    virtual void CTXLOGva(const char *format, va_list args) const
+    {
+        StringBuffer ss;
+        ss.valist_appendf(format, args);
+        LOG(MCdebugProgress, thorJob, "%s", ss.str());
+    }
+    // Log an operator exception as "ERROR[: code][: file(line) ][: message][: extra]".
+    virtual void logOperatorException(IException *E, const char *file, unsigned line, const char *format, ...) const
+    {
+        va_list args;
+        va_start(args, format);
+        logOperatorExceptionVA(E, file, line, format, args);
+        va_end(args);
+    }
+    virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
+    {
+        StringBuffer ss;
+        ss.append("ERROR");
+        if (E)
+            ss.append(": ").append(E->errorCode());
+        if (file)
+            ss.appendf(": %s(%u) ", file, line);    // fix: line is unsigned, so %u not %d
+        if (E)
+            E->errorMessage(ss.append(": "));
+        if (format)
+            ss.append(": ").valist_appendf(format, args);
+        LOG(MCoperatorProgress, thorJob, "%s", ss.str());
+    }
+    virtual void noteStatistic(unsigned statCode, unsigned __int64 value, unsigned count) const
+    {
+        // statistics are not collected through this logger in Thor
+    }
+    virtual unsigned queryTraceLevel() const
+    {
+        return traceLevel;
+    }
+};
+
+////
+
 CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
 {
     maxDiskUsage = diskUsage = 0;
@@ -2374,6 +2430,8 @@ void CJobBase::init()
     forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
     forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
 
+    logctx.setown(new CThorContextLogger(*this));
+
     // global setting default on, can be overridden by #option
     timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
     maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
@@ -2388,7 +2446,7 @@ void CJobBase::init()
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
     unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
-    thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, crcChecking, usePackedAllocator));
+    thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator));
 
     unsigned defaultMemMB = globalMemorySize*3/4;
     unsigned largeMemSize = getOptUInt("@largeMemSize", defaultMemMB);

+ 3 - 0
thorlcr/graph/thgraph.hpp

@@ -808,6 +808,8 @@ protected:
     bool timeActivities;
     unsigned maxActivityCores, globalMemorySize;
     unsigned forceLogGraphIdMin, forceLogGraphIdMax;
+    Owned<IContextLogger> logctx;
+
     class CThorPluginCtx : public SimplePluginCtx
     {
     public:
@@ -846,6 +848,7 @@ public:
     const char *queryGraphName() const { return graphName; }
     bool queryForceLogging(graph_id graphId, bool def) const;
     ITimeReporter &queryTimeReporter() { return *timeReporter; }
+    const IContextLogger &queryContextLogger() const { return *logctx; }
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual CGraphBase *createGraph() = 0;
     void joinGraph(CGraphBase &graph);

+ 1 - 1
thorlcr/thorcodectx/thcodectx.hpp

@@ -100,7 +100,7 @@ public:
     }
     virtual const IContextLogger &queryContextLogger() const
     {
-        return queryDummyContextLogger();
+        return job.queryContextLogger();
     }
 
     virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { UNIMPLEMENTED; }

+ 7 - 7
thorlcr/thorutil/thmem.cpp

@@ -1741,14 +1741,14 @@ protected:
     mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
     Owned<roxiemem::IRowManager> rowManager;
     roxiemem::RoxieHeapFlags flags;
-
+    IContextLogger &logctx;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags _flags) : flags(_flags)
+    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _flags) : logctx(_logctx), flags(_flags)
     {
         allocatorMetaCache.setown(createRowAllocatorCache(this));
-        rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), allocatorMetaCache, false));
+        rowManager.setown(roxiemem::createRowManager(memSize, NULL, logctx, allocatorMetaCache, false));
         rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
     }
     ~CThorAllocator()
@@ -1778,7 +1778,7 @@ public:
 class CThorCrcCheckingAllocator : public CThorAllocator
 {
 public:
-    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, flags)
+    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, logctx, flags)
     {
     }
 // IThorAllocator
@@ -1791,7 +1791,7 @@ public:
 };
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked)
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
 {
     PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
     PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
@@ -1801,9 +1801,9 @@ IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, b
     else
         flags = roxiemem::RHFnone;
     if (crcChecking)
-        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, flags);
+        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, logctx, flags);
     else
-        return new CThorAllocator(memSize, memorySpillAt, flags);
+        return new CThorAllocator(memSize, memorySpillAt, logctx, flags);
 }
 
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -157,7 +157,7 @@ interface IThorAllocator : extends IInterface
     virtual bool queryCrc() const = 0;
 };
 
-IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked);
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked);
 
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);