Bläddra i källkod

HPCC-26796 Switch to using a lock free list for DataBuffers

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 3 år sedan
förälder
incheckning
c6fdc1fa5d
2 ändrade filer med 95 tillägg och 38 borttagningar
  1. 82 32
      roxie/roxiemem/roxiemem.cpp
  2. 13 6
      roxie/roxiemem/roxiemem.hpp

+ 82 - 32
roxie/roxiemem/roxiemem.cpp

@@ -4336,6 +4336,15 @@ void initAllocSizeMappings(const unsigned * sizes)
 
 //---------------------------------------------------------------------------------------------------------------------
 
+// Id is the pointer value with a sequence number stored in the lower bits (typically 1K).  These functions create ids and extract the elements
+inline DataBuffer * getPtrFromDataId(memsize_t id) { return (DataBuffer *)(id & DATA_ALIGNMENT_MASK); }
+inline unsigned getSeqFromDataId(memsize_t id) { return (unsigned)(id & (DATA_ALIGNMENT_SIZE-1)); }
+inline bool isNullDataId(memsize_t id) { return (id & DATA_ALIGNMENT_MASK) == 0; }
+inline memsize_t createDataId(DataBuffer * ptr, unsigned seq) { return memsize_t(ptr) | seq; }
+
+//Tracing version to ensure that sequence numbers are processed correctly
+//inline memsize_t createDataId(DataBuffer * ptr, unsigned seq) { DBGLOG("Create %p:%u", ptr, seq); return memsize_t(ptr) | seq; }
+
 class CChunkingRowManager : public CRowManager
 {
     friend class CRoxieFixedRowHeap;
@@ -4452,11 +4461,11 @@ public:
         DataBuffer *dfinger = activeBuffs;
         while (dfinger)
         {
-            DataBuffer *next = dfinger->next;
+            DataBuffer *next = dfinger->nextActive;
             if (memTraceLevel >= 2 && dfinger->queryCount()!=1)
                 logctx.CTXLOG("RoxieMemMgr: Memory leak: %d records remain linked in active dataBuffer list - addr=%p rowMgr=%p", 
                         dfinger->queryCount()-1, dfinger, this);
-            dfinger->next = NULL;
+            dfinger->nextActive = 0;
             dfinger->mgr = NULL; // Avoid calling back to noteDataBufferReleased, which would be unhelpful
             dfinger->count.store(0, std::memory_order_relaxed);
             dfinger->released();
@@ -4822,18 +4831,18 @@ public:
             while (finger && possibleGoers.load(std::memory_order_relaxed))
             {
                 // MORE - if we get a load of data in and none out this can start to bog down...
-                DataBuffer *next = finger->next;
+                DataBuffer *next = finger->nextActive;
                 if (finger->queryCount()==1)
                 {
                     if (memTraceLevel >= 4)
                         logctx.CTXLOG("RoxieMemMgr: attachDataBuff() detaching DataBuffer linked in active list - addr=%p rowMgr=%p",
                                 finger, this);
-                    finger->next = NULL;
+                    finger->nextActive = 0;
                     finger->Release();
                     dataBuffs--;
                     possibleGoers.fetch_sub(1, std::memory_order_relaxed); // It doesn't matter when other threads see this update
                     if (last)
-                        last->next = next;
+                        last->nextActive = next;
                     else
                         activeBuffs = next;    // MORE - this is yukky code - surely there's a cleaner way!
                 }
@@ -4841,8 +4850,8 @@ public:
                     last = finger;
                 finger = next;
             }
-            assert(dataBuff->next==NULL);
-            dataBuff->next = activeBuffs;
+            assert(dataBuff->nextActive == nullptr);
+            dataBuff->nextActive = activeBuffs;
             activeBuffs = dataBuff;
             dataBuffs++;
 
@@ -6249,7 +6258,7 @@ void DataBuffer::Release()
 
 void DataBuffer::released()
 {
-    assert(next == NULL);
+    assert(nextDataId == 0);
     DataBufferBottom *bottom = (DataBufferBottom *)findBase(this);
     assert((char *)bottom != (char *)this);
     if (memTraceLevel >= 4)
@@ -6388,19 +6397,36 @@ public:
             if (curBlock)
             {
                 DataBufferBottom *bottom = curBlock;
-                CriticalBlock c(bottom->crit);
-                if (bottom->freeChain)
+                // No need to hold the critical section on bottom->crit since there is a lock on the CDataBufferManager so nothing else can
+                // allocate anything from any of the blocks, and freeHeadId is lock free - so no need to lock for that
+
+                memsize_t curHeadId = bottom->freeHeadId;
+                if (!isNullDataId(curHeadId))
                 {
-                    dataBuffersActive.fetch_add(1);
-                    curBlock->Link();
-                    DataBuffer *x = bottom->freeChain;
-                    bottom->freeChain = x->next;
-                    x->next = NULL;
-                    if (memTraceLevel >= 4)
-                        DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", x);
-                    return ::new(x) DataBuffer();
+                    //If the pointer is non-null then it will remain non-null (other threads can only add items to the list, not remove them)
+                    for (;;)
+                    {
+                        DataBuffer * curFree = getPtrFromDataId(curHeadId);
+
+                        //Nothing else can modify curFree, so this pointer will remain valid
+                        DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);
+
+                        //Always update the sequence number when updating the freeHeadId to avoid ABA problem.  Increment before
+                        //extracting the id to ensure it wraps correctly.
+                        memsize_t nextHeadId = createDataId(nextFree, getSeqFromDataId(curHeadId+1));
+                        if (likely(bottom->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
+                        {
+                            dataBuffersActive.fetch_add(1);
+                            bottom->Link();
+                            curFree->nextDataId = 0;
+                            if (memTraceLevel >= 4)
+                                DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", curFree);
+                            return ::new(curFree) DataBuffer();
+                        }
+                    }
                 }
-                else if (nextOffset < HEAP_ALIGNMENT_SIZE) // Is there any space in the current block (it must be a whole block)
+
+                if (nextOffset < HEAP_ALIGNMENT_SIZE) // Is there any space in the current block (it must be a whole block)
                 {
                     dataBuffersActive.fetch_add(1);
                     curBlock->Link();
@@ -6433,22 +6459,38 @@ public:
             {
                 for (;;)
                 {
-                    CriticalBlock c(finger->crit);
-                    if (finger->freeChain)
+                    memsize_t curHeadId = finger->freeHeadId;
+                    if (!isNullDataId(curHeadId))
                     {
+                        //If the pointer is non-null then it will remain non-null (items can only be added, not removed
+                        //from the free list)
                         if (finger->isAliveAndLink())
                         {
                             curBlock = finger;
                             nextBase = nullptr; // should never be accessed
                             nextOffset = HEAP_ALIGNMENT_SIZE; // only use the free chain to allocate
                             dataBuffersActive.fetch_add(1);
-                            finger->Link(); // and once for the value we are about to return
-                            DataBuffer *x = finger->freeChain;
-                            finger->freeChain = x->next;
-                            x->next = NULL;
-                            if (memTraceLevel >= 4)
-                                DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", x);
-                            return ::new(x) DataBuffer();
+
+                            for (;;)
+                            {
+                                DataBuffer * curFree = getPtrFromDataId(curHeadId);
+                                assertex(curFree);
+
+                                //Nothing else can modify curFree, so this pointer will remain valid
+                                DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);
+
+                                //Always update the sequence number when updating the freeHeadId to avoid ABA problem
+                                memsize_t nextHeadId = createDataId(nextFree, getSeqFromDataId(curHeadId+1));
+                                if (likely(finger->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
+                                {
+                                    dataBuffersActive.fetch_add(1);
+                                    finger->Link();
+                                    curFree->nextDataId = 0;
+                                    if (memTraceLevel >= 4)
+                                        DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", curFree);
+                                    return ::new(curFree) DataBuffer();
+                                }
+                            }
                         }
                     }
                     finger = finger->nextBottom;
@@ -6496,14 +6538,22 @@ DataBufferBottom::DataBufferBottom(CDataBufferManager *_owner, DataBufferBottom
         prevBottom = this;
         nextBottom = this;
     }
-    freeChain = NULL;
+    freeHeadId = 0;
 }
 
 void DataBufferBottom::addToFreeChain(DataBuffer * buffer)
 {
-    CriticalBlock b(crit);
-    buffer->next = freeChain;
-    freeChain = buffer;
+    memsize_t expected = freeHeadId.load();
+    for (;;)
+    {
+        //MORE: Increment the sequence whenever an item is added to avoid the ABA problem.
+        //Incremented inside getSeqFromDataId() to ensure the seq does not overflow
+        unsigned seq = getSeqFromDataId(expected+1);
+        memsize_t newHead = createDataId(buffer, seq);
+        buffer->nextDataId = expected;
+        if (likely(freeHeadId.compare_exchange_weak(expected, newHead, std::memory_order_acq_rel)))
+            break;
+    }
 }
 
 void DataBufferBottom::Release()

+ 13 - 6
roxie/roxiemem/roxiemem.hpp

@@ -214,7 +214,7 @@ private:
     void released();
 
 protected:
-    DataBuffer() : count(1)
+    DataBuffer() : count(1), nextDataId(0)
     {
     }
 public:
@@ -230,13 +230,20 @@ public:
     }
     void noteReleased(const void *ptr);
     void noteLinked(const void *ptr);
+    bool attachToRowMgr(IRowManager *rowMgr);
+
 public:
     std::atomic_uint count;
     unsigned filler = 0; // keeps valgrind happy
     IRowManager *mgr = nullptr;
-    DataBuffer *next = nullptr;   // Used when chaining them together in rowMgr
+    union
+    {
+        std::atomic<memsize_t> nextDataId;  // Used when chaining freeblocks within a DataBufferBottom
+        DataBuffer * nextActive;        // Used when attached to an RowManager
+    };
     DataBuffer *msgNext = nullptr;    // Next databuffer in same slave message
-    bool attachToRowMgr(IRowManager *rowMgr);
+
+    /* data */
     char data[1]; // actually DATA_PAYLOAD
 };
 
@@ -248,9 +255,9 @@ class roxiemem_decl DataBufferBottom : public HeapletBase
 private:
     friend class CDataBufferManager;
     std::atomic<CDataBufferManager *> owner;
-    DataBufferBottom *nextBottom;   // Used when chaining them together in CDataBufferManager 
-    DataBufferBottom *prevBottom;   // Used when chaining them together in CDataBufferManager 
-    DataBuffer *freeChain;
+    DataBufferBottom *nextBottom = nullptr; // Used when chaining them together in CDataBufferManager
+    DataBufferBottom *prevBottom = nullptr; // Used when chaining them together in CDataBufferManager
+    std::atomic<memsize_t> freeHeadId{0};               // a chain of freed DataBuffers, implemented as a lock free list bottom bits are a sequence number
     CriticalSection crit;
 
     virtual void noteReleased(const void *ptr) override;