Ver código fonte

HPCC-10414 Clean up various issues with new stats code

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 11 anos atrás
pai
commit
e18001e2fe

+ 1 - 1
common/thorhelper/roxierow.cpp

@@ -469,7 +469,7 @@ public:
         else
         {
             //assert(false);
-            return 12345678; // Used for tracing, better than a crash...
+            return UNKNOWN_ACTIVITY; // Used for tracing, better than a crash...
         }
     }
     virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const

+ 1 - 1
common/workunit/workunit.cpp

@@ -5504,7 +5504,7 @@ StringBuffer &formatGraphTimerLabel(StringBuffer &str, const char *graphName, un
 StringBuffer &formatGraphTimerScope(StringBuffer &str, const char *graphName, unsigned subGraphNum, unsigned __int64 subId)
 {
     str.append(graphName);
-    if (subId) str.append(": ").append(subId);
+    if (subId) str.append(":").append(subId);
     return str;
 }
 

+ 3 - 3
ecl/eclccserver/eclccserver.cpp

@@ -160,9 +160,9 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
                 {
                     section.insert(0, "eclcc: ");
 
-                    unsigned __int64 mval = atoi(total); // in milliseconds
-                    unsigned __int64 umax = atoi(max); // in microseconds
-                    unsigned __int64 cnt = atoi(count);
+                    unsigned __int64 mval = atoi64(total); // in milliseconds
+                    unsigned __int64 umax = atoi64(max); // in microseconds
+                    unsigned __int64 cnt = atoi64(count);
                     const char * wuScope = section.str(); // should be different
                     const char * description = section.str();
                     updateWorkunitTiming(workunit, "eclcc", wuScope, description, milliToNano(mval), cnt, umax*1000);

+ 2 - 2
ecl/hthor/hthor.ipp

@@ -1085,7 +1085,7 @@ class CSimpleSorterBase : public CInterface, public ISorter
 {
 public:
     CSimpleSorterBase(ICompare * _compare, roxiemem::IRowManager * _rowManager, size32_t _initialSize, size32_t _commitDelta) : compare(_compare), finger(0), rowManager(_rowManager),
-        rowsToSort(_rowManager, _initialSize, _commitDelta) {}
+        rowsToSort(_rowManager, _initialSize, _commitDelta, UNKNOWN_ROWSET_ID) {}
     virtual ~CSimpleSorterBase()                            { killSorted(); }
     IMPLEMENT_IINTERFACE;
     virtual bool addRow(const void * next)                  { return rowsToSort.append(next); }
@@ -1106,7 +1106,7 @@ public:
     virtual const DynamicRoxieOutputRowArray & getRowArray()     { return rowsToSort; }
     virtual void flushRows()                                { rowsToSort.flush(); }
     virtual size32_t numCommitted() const                   { return rowsToSort.numCommitted(); }
-    virtual void setActivityId(unsigned _activityId)        { activityId = _activityId; }
+    virtual void setActivityId(unsigned _activityId)        { activityId = _activityId; rowsToSort.setAllocatorId(_activityId); }
 
 protected:
     roxiemem::IRowManager * rowManager;

+ 1 - 1
ecllibrary/std/system/Workunit.ecl

@@ -143,7 +143,7 @@ EXPORT dataset(TimingRecord) WorkunitTimings(varstring wuid) :=
  * @param wuid          the name of the workunit
 */
 
-EXPORT dataset(StatisticRecord) WorkunitStatistics(varstring wuid, bool includeActivities = false) :=
+EXPORT dataset(StatisticRecord) WorkunitStatistics(varstring wuid, boolean includeActivities = false) :=
   lib_workunitservices.WorkUnitServices.WorkunitStatistics(wuid, includeActivities);
 
 

+ 1 - 1
plugins/workunitservices/workunitservices.cpp

@@ -141,7 +141,7 @@ static const char * EclDefinition =
 "  dataset(WsFileRead) WorkunitFilesRead(const varstring wuid) : c,context,entrypoint='wsWorkunitFilesRead'; \n"
 "  dataset(WsFileWritten) WorkunitFilesWritten(const varstring wuid) : c,context,entrypoint='wsWorkunitFilesWritten'; \n"
 "  dataset(WsTiming) WorkunitTimings(const varstring wuid) : c,context,entrypoint='wsWorkunitTimings'; \n"
-"  streamed dataset(WsStatistic) WorkunitStatistics(const varstring wuid, bool includeActivities = false) : c,context,entrypoint='wsWorkunitStatistics'; \n"
+"  streamed dataset(WsStatistic) WorkunitStatistics(const varstring wuid, boolean includeActivities = false) : c,context,entrypoint='wsWorkunitStatistics'; \n"
     
 "END;";
 

+ 4 - 0
roxie/ccd/ccdcontext.cpp

@@ -1977,6 +1977,10 @@ public:
         rowManager->setMemoryLimit(serverQueryFactory->getMemoryLimit());
         workflow.setown(_factory->createWorkflowMachine(false, logctx));
         context.setown(createPTree(ipt_caseInsensitive));
+
+        //MORE: Use various debug settings to override settings:
+        rowManager->setActivityTracking(workUnit->getDebugValueBool("traceRoxiePeakMemory", false));
+
         startWorkUnit();
     }
 

+ 1 - 1
roxie/ccd/ccdserver.cpp

@@ -7321,7 +7321,7 @@ class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, imp
 
 public:
     CSpillingQuickSortAlgorithm(ICompare *_compare, IRoxieSlaveContext * _ctx, IOutputMetaData * _rowMeta, unsigned _activityId)
-        : rowsToSort(&_ctx->queryRowManager(), InitialSortElements, CommitStep), ctx(_ctx), compare(_compare), rowMeta(_rowMeta), activityId(_activityId)
+        : rowsToSort(&_ctx->queryRowManager(), InitialSortElements, CommitStep, _activityId), ctx(_ctx), compare(_compare), rowMeta(_rowMeta), activityId(_activityId)
     {
         ctx->queryRowManager().addRowBuffer(this);
     }

+ 27 - 20
roxie/roxiemem/roxiemem.cpp

@@ -1213,17 +1213,17 @@ public:
         {
             const char *block = data() + base;
             ChunkHeader * header = (ChunkHeader *)block;
-            unsigned activityId = getActivityId(header->allocatorId);
+            unsigned allocatorId = header->allocatorId;
             //Potential race condition - a block could become allocated between these two lines.
             //That may introduce invalid activityIds (from freed memory) in the memory tracing.
             unsigned rowCount = atomic_read(&header->count);
             if (ROWCOUNT(rowCount) != 0)
             {
-                if (activityId != lastId)
+                if (allocatorId != lastId)
                 {
                     if (lastId)
                         map->noteMemUsage(lastId, running, runningCount);
-                    lastId = activityId;
+                    lastId = allocatorId;
                     running = chunkSize;
                     runningCount = 1;
                 }
@@ -1410,8 +1410,7 @@ public:
         unsigned numAllocs = queryCount()-1;
         if (numAllocs)
         {
-            unsigned activityId = getActivityId(sharedAllocatorId);
-            map->noteMemUsage(activityId, numAllocs * chunkSize, numAllocs);
+            map->noteMemUsage(sharedAllocatorId, numAllocs * chunkSize, numAllocs);
         }
     }
 };
@@ -1539,8 +1538,7 @@ public:
 
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const 
     {
-        unsigned activityId = getActivityId(allocatorId);
-        map->noteMemUsage(activityId, chunkCapacity, 1);
+        map->noteMemUsage(allocatorId, chunkCapacity, 1);
         map->noteHeapUsage(chunkCapacity, RHFpacked, _sizeInPages(), chunkCapacity);
     }
 
@@ -1564,7 +1562,7 @@ public:
 //
 struct ActivityEntry 
 {
-    unsigned activityId;
+    unsigned allocatorId;
     unsigned allocations;
     memsize_t usage;
 };
@@ -1591,7 +1589,7 @@ class CActivityMemoryUsageMap : public CInterface, implements IActivityMemoryUsa
     CIArrayOf<HeapEntry> heaps;
     memsize_t maxUsed;
     memsize_t totalUsed;
-    unsigned maxActivity;
+    unsigned allocatorIdMax;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -1599,13 +1597,13 @@ public:
     {
         maxUsed = 0;
         totalUsed = 0;
-        maxActivity = 0;
+        allocatorIdMax = 0;
     }
 
-    virtual void noteMemUsage(unsigned activityId, memsize_t memUsed, unsigned numAllocs)
+    virtual void noteMemUsage(unsigned allocatorId, memsize_t memUsed, unsigned numAllocs)
     {
         totalUsed += memUsed;
-        ActivityEntry *ret = map.getValue(activityId);
+        ActivityEntry *ret = map.getValue(allocatorId);
         if (ret)
         {
             memUsed += ret->usage;
@@ -1614,13 +1612,13 @@ public:
         }
         else
         {
-            ActivityEntry e = {activityId, numAllocs, memUsed};
-            map.setValue(activityId, e);
+            ActivityEntry e = {allocatorId, numAllocs, memUsed};
+            map.setValue(allocatorId, e);
         }
         if (memUsed > maxUsed)
         {
             maxUsed = memUsed;
-            maxActivity = activityId;
+            allocatorIdMax = allocatorId;
         }
     }
 
@@ -1654,7 +1652,8 @@ public:
             while (j)
             {
                 j--;
-                logctx.CTXLOG("%"I64F"u bytes allocated by activity %u (%u allocations)", (unsigned __int64) results[j]->usage, getRealActivityId(results[j]->activityId, allocatorCache), results[j]->allocations);
+                unsigned activityId = getRealActivityId(results[j]->allocatorId, allocatorCache);
+                logctx.CTXLOG("%"I64F"u bytes allocated by activity %u (%u allocations)", (unsigned __int64) results[j]->usage, activityId, results[j]->allocations);
             }
 
             memsize_t totalHeapPages = 0;
@@ -1703,12 +1702,20 @@ public:
             j++;
         }
         qsort(results, j, sizeof(results[0]), sortUsage);
-        StringBuffer activityId;
+        StringBuffer activityText;
         while (j)
         {
             j--;
-            activityId.clear().append(getRealActivityId(results[j]->activityId, allocatorCache));
-            target.addStatistic(NULL, activityId.str(), "roxiepeakmem", NULL, SMEASURE_MEM_KB, results[j]->usage / 1024, results[j]->allocations, 0, false);
+            unsigned allocatorId = results[j]->allocatorId;
+            unsigned activityId = getRealActivityId(allocatorId, allocatorCache);
+            activityText.clear();
+            if (allocatorId & ACTIVITY_FLAG_ISREGISTERED)
+                activityText.append("ac").append(activityId);
+            else if ((allocatorId & MAX_ACTIVITY_ID) == UNKNOWN_ROWSET_ID)
+                activityText.append("rowset");
+            else
+                activityText.append("ac").append(allocatorId & MAX_ACTIVITY_ID);
+            target.addStatistic(NULL, activityText.str(), "roxiepeakmem", NULL, SMEASURE_MEM_KB, results[j]->usage / 1024, results[j]->allocations, 0, false);
         }
         delete [] results;
 
@@ -4171,7 +4178,7 @@ namespace roxiemem {
 class SimpleRowBuffer : implements IBufferedRowCallback
 {
 public:
-    SimpleRowBuffer(IRowManager * rowManager, unsigned _priority) : priority(_priority), rows(rowManager, 0, 1)
+    SimpleRowBuffer(IRowManager * rowManager, unsigned _priority) : priority(_priority), rows(rowManager, 0, 1, UNKNOWN_ROWSET_ID)
     {
     }
 

+ 2 - 0
roxie/roxiemem/roxiemem.hpp

@@ -56,6 +56,8 @@
 // MAX_ACTIVITY_ID is further subdivided:
 #define ALLOCATORID_CHECK_MASK          0x00300000
 #define ALLOCATORID_MASK                0x000fffff
+#define UNKNOWN_ROWSET_ID               0x000F8421              // Use as the allocatorId for a rowset from an unknown activity
+#define UNKNOWN_ACTIVITY                123456789
 
 #define ALLOC_ALIGNMENT                 sizeof(void *)          // Minimum alignment of data allocated from the heap manager
 #define PACKED_ALIGNMENT                4                       // Minimum alignment of packed blocks

+ 4 - 6
roxie/roxiemem/roxierowbuff.cpp

@@ -24,16 +24,14 @@
 #include <sys/mman.h>
 #endif
 
-const unsigned RowArrayActivityId = 0xc3f7;
-
 namespace roxiemem {
 
-RoxieOutputRowArray::RoxieOutputRowArray(IRowManager * _rowManager, rowidx_t initialSize, size32_t _commitDelta) :
-    rowManager(_rowManager), commitDelta(_commitDelta)
+RoxieOutputRowArray::RoxieOutputRowArray(IRowManager * _rowManager, rowidx_t initialSize, size32_t _commitDelta, unsigned _allocatorId) :
+    rowManager(_rowManager), commitDelta(_commitDelta), allocatorId(_allocatorId)
 {
     if (initialSize)
     {
-        rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), RowArrayActivityId));
+        rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), allocatorId));
         maxRows = RoxieRowCapacity(rows) / sizeof(void *);
     }
     else
@@ -158,7 +156,7 @@ bool DynamicRoxieOutputRowArray::ensure(rowidx_t requiredRows)
     const void * * newRows;
     try
     {
-        newRows = static_cast<const void * *>(rowManager->allocate(newSize * sizeof(void*), RowArrayActivityId));
+        newRows = static_cast<const void * *>(rowManager->allocate(newSize * sizeof(void*), allocatorId));
         if (!newRows)
             return false;
     }

+ 6 - 3
roxie/roxiemem/roxierowbuff.hpp

@@ -46,7 +46,7 @@ typedef size32_t rowidx_t;
 class roxiemem_decl RoxieOutputRowArray
 {
 public:
-    RoxieOutputRowArray(IRowManager * _rowManager, rowidx_t _initialSize, size32_t _commitDelta);
+    RoxieOutputRowArray(IRowManager * _rowManager, rowidx_t _initialSize, size32_t _commitDelta, unsigned _allocatorId);
     inline ~RoxieOutputRowArray() { kill(); }
 
     //The following can be called from the writer, without any need to lock first.
@@ -93,6 +93,8 @@ public:
     inline void lock() const { cs.enter(); }
     inline void unlock() const { cs.leave(); }
 
+    virtual void setAllocatorId(unsigned _allocatorId)        { allocatorId = _allocatorId; }
+
 protected:
     virtual bool ensure(rowidx_t requiredRows) { return false; }
 
@@ -103,6 +105,7 @@ protected:
     rowidx_t firstRow; // Only rows firstRow..numRows are considered initialized.  Only read/write within cs.
     rowidx_t numRows;  // rows that have been added can only be updated by writing thread.
     rowidx_t commitRows;  // can only be updated by writing thread within a critical section
+    unsigned allocatorId;
     const size32_t commitDelta;  // How many rows need to be written before they are added to the committed region?
     mutable CriticalSection cs;
 };
@@ -110,8 +113,8 @@ protected:
 class roxiemem_decl DynamicRoxieOutputRowArray : public RoxieOutputRowArray
 {
 public:
-    DynamicRoxieOutputRowArray(IRowManager * _rowManager, rowidx_t _initialSize, size32_t _commitDelta)
-        : RoxieOutputRowArray(_rowManager, _initialSize, _commitDelta) {}
+    DynamicRoxieOutputRowArray(IRowManager * _rowManager, rowidx_t _initialSize, size32_t _commitDelta, unsigned _allocatorId)
+        : RoxieOutputRowArray(_rowManager, _initialSize, _commitDelta, _allocatorId) {}
 
 protected:
     virtual bool ensure(rowidx_t requiredRows);