Browse Source

HPCC-7996 Introduce memory manager spill threshold option

Add a thor option to allow the memory manager to be configured, so that it
can spill when it hits a certain threshold(%), default 80%.
The reasoning being, that if the memory manager waits until it is out of
memory before requesting memory to be freed, those requests can take some
time (e.g. sort+flush to disk), and cause all other small memory consumers
to block in the meantime.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 12 năm trước cách đây
mục cha
commit
672f3e3b0f

+ 22 - 0
system/jlib/jdebug.hpp

@@ -88,6 +88,28 @@ struct ITimeReporter : public IInterface
   virtual void serialize(MemoryBuffer &mb) = 0;
 };
 
+class CCycleTimer
+{
+    cycle_t start_time;
+public:
+    CCycleTimer()
+    {
+        reset();
+    }
+    inline void reset()
+    {
+        start_time = get_cycles_now();
+    }
+    inline cycle_t elapsedCycles()
+    {
+        return get_cycles_now() - start_time;
+    }
+    inline unsigned elapsedMs()
+    {
+        return cycle_to_nanosec(elapsedCycles())/1000000;
+    }
+};
+
 class jlib_decl TimeSection
 {
 public:

+ 14 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -534,6 +534,7 @@ public:
 
     virtual void recvloop()
     {
+        CCycleTimer timer;
         MemoryBuffer tempMb;
         try {
             ActPrintLog(activity, "Read loop start");
@@ -564,8 +565,19 @@ public:
                     }
                     {
                         CriticalBlock block(putsect);
-                        while (!rowSource.eos()) 
-                            pipewr->putRow(ptrallocator.deserializeRow(allocator,rowSource));      
+                        while (!rowSource.eos())
+                        {
+                            timer.reset();
+                            const void *row = ptrallocator.deserializeRow(allocator,rowSource);
+                            unsigned took=timer.elapsedMs();
+                            if (took>=1000)
+                                DBGLOG("RECVLOOP deserializeRow blocked for : %d second(s)", took/1000);
+                            timer.reset();
+                            pipewr->putRow(row);
+                            took=timer.elapsedMs();
+                            if (took>=1000)
+                                DBGLOG("RECVLOOP pipewr->putRow blocked for : %d second(s)", took/1000);
+                        }
                     }
                 }
                 else {

+ 3 - 2
thorlcr/graph/thgraph.cpp

@@ -2354,13 +2354,14 @@ void CJobBase::init()
 #endif
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
-    thorAllocator.setown(createThorAllocator(((memsize_t)gmemSize)*0x100000, crcChecking, usePackedAllocator));
+    unsigned memorySpillAt = getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
+    thorAllocator.setown(createThorAllocator(((memsize_t)gmemSize)*0x100000, memorySpillAt, crcChecking, usePackedAllocator));
 
     unsigned defaultMemMB = gmemSize*3/4;
     unsigned largeMemSize = getOptInt("@largeMemSize", defaultMemMB);
     if (gmemSize && largeMemSize >= gmemSize)
         throw MakeStringException(0, "largeMemSize(%d) can not exceed globalMemorySize(%d)", largeMemSize, gmemSize);
-    PROGLOG("Global memory size = %d MB, large mem size = %d MB", gmemSize, largeMemSize);
+    PROGLOG("Global memory size = %d MB, memory spill at = %d%%, large mem size = %d MB", gmemSize, memorySpillAt, largeMemSize);
     StringBuffer tracing("maxActivityCores = ");
     if (maxActivityCores)
         tracing.append(maxActivityCores);

+ 11 - 5
thorlcr/thorutil/thmem.cpp

@@ -1138,7 +1138,12 @@ protected:
 
         totalRows += numRows;
         if (iCompare)
+        {
+            ActPrintLog(&activity, "Sorting %"RIPF"d rows", spillableRows.numCommitted());
+            CCycleTimer timer;
             spillableRows.sort(*iCompare, maxCores); // sorts committed rows
+            ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
+        }
 
         StringBuffer tempname;
         GetTempName(tempname,"srtspill",true);
@@ -1712,9 +1717,10 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CThorAllocator(memsize_t memSize, bool _usePacked) : usePacked(_usePacked)
+    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool _usePacked) : usePacked(_usePacked)
     {
         rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), this, false));
+        rowManager->setMemoryLimit(memSize, memSize/100*memorySpillAt);
         rtlSetReleaseRowHook(this);
     }
     ~CThorAllocator()
@@ -1827,7 +1833,7 @@ public:
 class CThorCrcCheckingAllocator : public CThorAllocator
 {
 public:
-    CThorCrcCheckingAllocator(memsize_t memSize, bool usePacked) : CThorAllocator(memSize, usePacked)
+    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, bool usePacked) : CThorAllocator(memSize, memorySpillAt, usePacked)
     {
     }
 // IThorAllocator
@@ -1843,14 +1849,14 @@ public:
 };
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked)
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked)
 {
     PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
     PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
     if (crcChecking)
-        return new CThorCrcCheckingAllocator(memSize, usePacked);
+        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, usePacked);
     else
-        return new CThorAllocator(memSize, usePacked);
+        return new CThorAllocator(memSize, memorySpillAt, usePacked);
 }
 
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -156,7 +156,7 @@ interface IThorAllocator : extends IInterface
     virtual roxiemem::IRowManager *queryRowManager() const = 0;
 };
 
-IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked);
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked);
 
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);