소스 검색

Merge pull request #10344 from ghalliday/issue17730

HPCC-17730 Switch memory manager from a spinlock to a critical section.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 년 전
부모
커밋
b51f2df68b
1개의 변경된 파일24개의 추가작업 그리고 20개의 파일을 삭제
  1. 24 20
      roxie/roxiemem/roxiemem.cpp

+ 24 - 20
roxie/roxiemem/roxiemem.cpp

@@ -2717,7 +2717,7 @@ public:
 
     void reportLeaks(unsigned &leaked) const
     {
-        NonReentrantSpinBlock c1(heapletLock);
+        CriticalBlock c1(heapletLock);
         Heaplet * start = heaplets;
         if (start)
         {
@@ -2735,7 +2735,7 @@ public:
 
     void checkHeap()
     {
-        NonReentrantSpinBlock c1(heapletLock);
+        CriticalBlock c1(heapletLock);
         Heaplet * start = heaplets;
         if (start)
         {
@@ -2751,7 +2751,7 @@ public:
     unsigned allocated()
     {
         unsigned total = 0;
-        NonReentrantSpinBlock c1(heapletLock);
+        CriticalBlock c1(heapletLock);
         Heaplet * start = heaplets;
         if (start)
         {
@@ -2808,7 +2808,7 @@ public:
             return 0;
 
         unsigned total = 0;
-        NonReentrantSpinBlock c1(heapletLock);
+        CriticalBlock c1(heapletLock);
         //Check again in case other thread has also called this function and no other pages have been released.
         if (!possibleEmptyPages.load(std::memory_order_acquire))
             return 0;
@@ -2904,7 +2904,7 @@ public:
         unsigned numPages = 0;
         memsize_t numAllocs = 0;
         {
-            NonReentrantSpinBlock c1(heapletLock);
+            CriticalBlock c1(heapletLock);
             Heaplet * start = heaplets;
             if (start)
             {
@@ -2999,7 +2999,7 @@ protected:
     CChunkingRowManager * rowManager;
     const IRowAllocatorCache *allocatorCache;
     const IContextLogger & logctx;
-    mutable NonReentrantSpinLock heapletLock;
+    mutable CriticalSection heapletLock;
     std::atomic_uint headMaybeSpace{BLOCKLIST_NULL};  // The head of the list of heaplets which potentially have some space.  When adding must use mo_release, when removing must use mo_acquire
     std::atomic_uint possibleEmptyPages{false};  // Are there any pages with 0 records.  Primarily here to avoid walking long page chains.
     HeapletStats stats;
@@ -5131,7 +5131,7 @@ void * CHugeHeap::doAllocate(memsize_t _size, unsigned allocatorId, unsigned max
             (unsigned __int64) _size, (unsigned __int64) (numPages*HEAP_ALIGNMENT_SIZE), head, numPages, rowManager->getPageLimit(), rowManager->peakPages, this);
     }
 
-    NonReentrantSpinBlock b(heapletLock);
+    CriticalBlock b(heapletLock);
     insertHeaplet(head);
     return head->allocateHuge(_size);
 }
@@ -5185,7 +5185,7 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
             {
                 // Remove the old block from the chain
                 {
-                    NonReentrantSpinBlock b(heapletLock);
+                    CriticalBlock b(heapletLock);
                     removeHeaplet(oldhead);
                 }
 
@@ -5203,7 +5203,7 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
                 callback.unlock();
 
                 {
-                    NonReentrantSpinBlock b(heapletLock);
+                    CriticalBlock b(heapletLock);
                     insertHeaplet(head);
                 }
 
@@ -5237,7 +5237,7 @@ void CChunkedHeap::gatherStats(CRuntimeStatisticCollection & result)
     HeapletStats merged;
     {
         //Copy the stats when a lock is held to ensure num and distance are consistent.
-        NonReentrantSpinBlock b(heapletLock);
+        CriticalBlock b(heapletLock);
         merged = stats;
     }
 
@@ -5250,15 +5250,16 @@ void CChunkedHeap::gatherStats(CRuntimeStatisticCollection & result)
 
 void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
 {
-    //Only hold the spinblock while walking the list - so subsequent calls to checkLimit don't deadlock.
+    //Only hold the lock while walking the list - so subsequent calls to checkLimit don't deadlock.
     //NB: The allocation is split into two - finger->allocateChunk, and finger->initializeChunk().
-    //The latter is done outside the spinblock, to reduce the window for contention.
+    //The latter is done outside the lock, to reduce the window for contention.
     ChunkedHeaplet * donorHeaplet;
     char * chunk;
     for (;;)
     {
         {
-            NonReentrantSpinBlock b(heapletLock);
+            CriticalBlock block(heapletLock);
+
             if (likely(activeHeaplet))
             {
                 //This cast is safe because we are within a member of CChunkedHeap
@@ -5294,6 +5295,8 @@ void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
         }
 
         //NB: At this point activeHeaplet = NULL;
+        //This code needs to be outside the critical section, otherwise it may deadlock with another thread trying
+        //to dispose of any free pages.
         try
         {
             rowManager->checkLimit(1, maxSpillCost);
@@ -5314,17 +5317,17 @@ void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
         rowManager->restoreLimit(1);
 
         //Could check if activeHeaplet was now set (and therefore allocated by another thread), and if so restart
-        //the function, but grabbing the spin lock would be inefficient.
+        //the function, but unlikley to be worthwhile
         if (!rowManager->releaseCallbackMemory(maxSpillCost, true))
             throwHeapExhausted(allocatorId, 1);
     }
 
-    if (memTraceLevel >= 5 || (memTraceLevel >= 3 && chunkSize > 32000))
+    if (memTraceLevel >= 3 && (memTraceLevel >= 5 || chunkSize > 32000))
         logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::allocate(size %u) allocated new FixedSizeHeaplet size %u - addr=%p pageLimit=%u peakPages=%u rowMgr=%p",
                 chunkSize, chunkSize, donorHeaplet, rowManager->getPageLimit(), rowManager->peakPages, this);
 
     {
-        NonReentrantSpinBlock b(heapletLock);
+        CriticalBlock block(heapletLock);
         insertHeaplet(donorHeaplet);
 
         //While this thread was allocating a block, another thread may have also done the same.
@@ -5346,7 +5349,6 @@ void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
                 addToSpaceList(active);
                 activeHeaplet = donorHeaplet;
             }
-
         }
         else
             activeHeaplet = donorHeaplet;
@@ -5354,9 +5356,11 @@ void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
         //If no protecting lock there would be a race e.g., if another thread allocates all the rows!
         if (!chunk)
             chunk = donorHeaplet->allocateChunk();
+        dbgassertex(chunk);
     }
     
 gotChunk:
+
     //since a chunk has been allocated from donorHeaplet it cannot be released at this point.
     return donorHeaplet->initChunk(chunk, allocatorId);
 }
@@ -5372,7 +5376,7 @@ unsigned CChunkedHeap::doAllocateRowBlock(unsigned allocatorId, unsigned maxSpil
     for (;;)
     {
         {
-            NonReentrantSpinBlock b(heapletLock);
+            CriticalBlock b(heapletLock);
             if (activeHeaplet)
             {
                 //This cast is safe because we are within a member of CChunkedHeap
@@ -5438,7 +5442,7 @@ unsigned CChunkedHeap::doAllocateRowBlock(unsigned allocatorId, unsigned maxSpil
                 chunkSize, chunkSize, donorHeaplet, rowManager->getPageLimit(), rowManager->peakPages, this);
 
     {
-        NonReentrantSpinBlock b(heapletLock);
+        CriticalBlock b(heapletLock);
         insertHeaplet(donorHeaplet);
 
         //While this thread was allocating a block, another thread also did the same.  Ensure that other block is
@@ -5514,7 +5518,7 @@ void CChunkedHeap::checkScans(unsigned allocatorId)
     HeapletStats merged;
     {
         //Copy the stats when a lock is held to ensure num and distance are consistent.
-        NonReentrantSpinBlock b(heapletLock);
+        CriticalBlock b(heapletLock);
         merged = stats;
     }