Browse Source

Merge branch 'candidate-8.2.x' into candidate-8.4.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 years ago
parent
commit
6739fbbe0e
2 changed files with 20 additions and 29 deletions
  1. 6 23
      roxie/roxiemem/roxiemem.cpp
  2. 14 6
      roxie/roxiemem/roxiemem.hpp

+ 6 - 23
roxie/roxiemem/roxiemem.cpp

@@ -6335,9 +6335,8 @@ class CDataBufferManager : implements IDataBufferManager, public CInterface
                 // NOTE - do NOT put a CriticalBlock c(finger->crit) here since:
                 // 1. It's not needed - no-one modifies finger->nextBottom chain but me and I hold CDataBufferManager::crit
                 // 2. finger->crit is about to get released back to the pool and it's important that it is not locked at the time!
-                if (finger->okToFree.load(std::memory_order_acquire))
+                if (finger->count.load(std::memory_order_acquire) == 0)
                 {
-                    assert(!finger->isAlive());
                     DataBufferBottom *goer = finger;
                     finger = finger->nextBottom;
                     unlink(goer);
@@ -6431,9 +6430,7 @@ public:
                     CriticalBlock c(finger->crit);
                     if (finger->freeChain)
                     {
-                        finger->Link(); // Link once for the reference we save in curBlock
-                                        // Release (to dec ref count) when no more free blocks in the page
-                        if (finger->isAlive())
+                        if (finger->isAliveAndLink())
                         {
                             curBlock = finger;
                             nextBase = nullptr; // should never be accessed
@@ -6478,7 +6475,7 @@ public:
     }
 };
 
-DataBufferBottom::DataBufferBottom(CDataBufferManager *_owner, DataBufferBottom *ownerFreeChain) : okToFree(false)
+DataBufferBottom::DataBufferBottom(CDataBufferManager *_owner, DataBufferBottom *ownerFreeChain)
 {
     owner = _owner;
     if (ownerFreeChain)
@@ -6505,25 +6502,11 @@ void DataBufferBottom::addToFreeChain(DataBuffer * buffer)
 
 void DataBufferBottom::Release()
 {
+    //Need to save the value of owner because the object can be freed as soon as counter goes to 0
+    CDataBufferManager * savedOwner = owner;
     if (count.fetch_sub(1, std::memory_order_release) == 1)
     {
-        //No acquire fence - released() is assumed not to access the data in this buffer
-        //If it does it should contain an acquire fence
-        released();
-    }
-}
-
-void DataBufferBottom::released()
-{
-    // Not safe to free here as owner may be in the middle of allocating from it
-    // instead, give owner a hint that it's worth thinking about freeing this page next time it is safe
-    unsigned expected = 0;
-    if (count.compare_exchange_strong(expected, DEAD_PSEUDO_COUNT, std::memory_order_release))
-    {
-        //Need to save the value of owner because the object can be freed as soon as okToFree is set
-        CDataBufferManager * savedOwner = owner;
-        //No acquire fence required since the following code doesn't read anything from the object
-        okToFree.store(true, std::memory_order_release);
+        //indicate to the owner that there is now a page that is ready to be freed.
         savedOwner->freePending.store(true, std::memory_order_release);
     }
 }

+ 14 - 6
roxie/roxiemem/roxiemem.hpp

@@ -143,9 +143,20 @@ protected:
     virtual void _internalFreeNoDestructor(const void *ptr) = 0;
 
 public:
-    inline bool isAlive() const
+    inline bool isAliveAndLink() const
     {
-        return count.load(std::memory_order_relaxed) < DEAD_PSEUDO_COUNT;        //only safe if Link() is called first
+        unsigned expected = count.load(std::memory_order_acquire);
+        for (;;)
+        {
+            //If count is 0, or count >= DEAD_PSEUDO_COUNT then return false - combine both into a single test
+            if ((expected-1) >= (DEAD_PSEUDO_COUNT-1))
+                return false;
+
+            //Avoid incrementing the link count if xxcount=0, otherwise it introduces a race condition
+            //so use compare and exchange to increment it only if it hasn't changed
+            if (count.compare_exchange_weak(expected, expected+1, std::memory_order_acq_rel))
+                return true;
+        }
     }
 
     static void release(const void *ptr);
@@ -236,15 +247,12 @@ class roxiemem_decl DataBufferBottom : public HeapletBase
 {
 private:
     friend class CDataBufferManager;
-    CDataBufferManager * volatile owner;
-    std::atomic_uint okToFree;      // use uint since it is more efficient on some architectures
+    std::atomic<CDataBufferManager *> owner;
     DataBufferBottom *nextBottom;   // Used when chaining them together in CDataBufferManager 
     DataBufferBottom *prevBottom;   // Used when chaining them together in CDataBufferManager 
     DataBuffer *freeChain;
     CriticalSection crit;
 
-    void released();
-
     virtual void noteReleased(const void *ptr) override;
     virtual void noteReleased(unsigned count, const byte * * rowset) override;
     virtual bool _isShared(const void *ptr) const;