Browse Source

Merge pull request #3812 from ghalliday/issue8297c

HPCC-8297 Provide a callback mechanism when resizing a row

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
280cc22def
3 changed files with 135 additions and 24 deletions
  1. 112 22
      roxie/roxiemem/roxiemem.cpp
  2. 20 1
      roxie/roxiemem/roxiemem.hpp
  3. 3 1
      roxie/roxiemem/roxierow.cpp

+ 112 - 22
roxie/roxiemem/roxiemem.cpp

@@ -1658,7 +1658,7 @@ public:
     }
 
     void * doAllocate(memsize_t _size, unsigned allocatorId);
-    void *expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, memsize_t &capacity);
+    void expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback);
 
 protected:
     HugeHeaplet * allocateHeaplet(memsize_t _size, unsigned allocatorId);
@@ -1970,6 +1970,23 @@ const unsigned maxStepSize = numStepBlocks * roundupStepSize;
 const bool hasAnyStepBlocks = roundupDoubleLimit <= roundupStepSize;
 const unsigned firstFractionalHeap = (FixedSizeHeaplet::dataAreaSize()/(maxStepSize+ALLOC_ALIGNMENT))+1;
 
+class CVariableRowResizeCallback : public IRowResizeCallback
+{
+public:
+    inline CVariableRowResizeCallback(memsize_t & _capacity, void * & _row) : capacity(_capacity), row(_row) {}
+
+    virtual void lock() { }
+    virtual void unlock() { }
+    virtual void update(memsize_t size, void * ptr) { capacity = size; row = ptr; }
+    virtual void atomicUpdate(memsize_t size, void * ptr) { capacity = size; row = ptr; }
+
+public:
+    memsize_t & capacity;
+    void * & row;
+};
+
+
+
 class CChunkingRowManager : public CInterface, implements IRowManager
 {
     friend class CRoxieFixedRowHeap;
@@ -2304,8 +2321,9 @@ public:
             logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::setMemoryLimit new memlimit=%"I64F"u pageLimit=%u spillLimit=%u rowMgr=%p", (unsigned __int64) bytes, pageLimit, spillPageLimit, this);
     }
 
-    virtual void *resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, memsize_t &capacity)
+    virtual void resizeRow(memsize_t &capacity, void * & ptr, memsize_t copysize, memsize_t newsize, unsigned activityId)
     {
+        void * const original = ptr;
         assertex(newsize);
         assertex(!HeapletBase::isShared(original));
         memsize_t curCapacity = HeapletBase::capacity(original);
@@ -2313,16 +2331,46 @@ public:
         {
             //resizeRow never shrinks memory
             capacity = curCapacity;
-            return original;
+            return;
         }
         if (curCapacity > FixedSizeHeaplet::maxHeapSize())
-            return hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, capacity);
+        {
+            CVariableRowResizeCallback callback(capacity, ptr);
+            hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, callback);
+            return;
+        }
 
         void *ret = allocate(newsize, activityId);
         memcpy(ret, original, copysize);
+        memsize_t newCapacity = HeapletBase::capacity(ret);
         HeapletBase::release(original);
-        capacity = HeapletBase::capacity(ret);
-        return ret;
+        capacity = newCapacity;
+        ptr = ret;
+        return;
+    }
+
+    virtual void resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback)
+    {
+        assertex(newsize);
+        assertex(!HeapletBase::isShared(original));
+        memsize_t curCapacity = HeapletBase::capacity(original);
+        if (newsize <= curCapacity)
+        {
+            //resizeRow never shrinks memory
+            return;
+        }
+        if (curCapacity > FixedSizeHeaplet::maxHeapSize())
+        {
+            hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, callback);
+            return;
+        }
+
+        void *ret = allocate(newsize, activityId);
+        memcpy(ret, original, copysize);
+        memsize_t newCapacity = HeapletBase::capacity(ret);
+        callback.atomicUpdate(newCapacity, ret);
+        HeapletBase::release(original);
+        return;
     }
 
     virtual void *finalizeRow(void * original, memsize_t initialSize, memsize_t finalSize, unsigned activityId)
@@ -2691,7 +2739,8 @@ void * CRoxieVariableRowHeap::allocate(memsize_t size, memsize_t & capacity)
 
 void * CRoxieVariableRowHeap::resizeRow(void * original, memsize_t copysize, memsize_t newsize, memsize_t &capacity)
 {
-    return rowManager->resizeRow(original, copysize, newsize, allocatorId, capacity);
+    rowManager->resizeRow(capacity, original, copysize, newsize, allocatorId);
+    return original;
 }
 
 void * CRoxieVariableRowHeap::finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize)
@@ -2741,7 +2790,7 @@ void * CHugeChunkingHeap::doAllocate(memsize_t _size, unsigned allocatorId)
     return head->allocateHuge(_size);
 }
 
-void *CHugeChunkingHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, memsize_t &capacity)
+void CHugeChunkingHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback)
 {
     unsigned newPages = PAGES(newsize + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     unsigned oldPages = PAGES(oldcapacity + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
@@ -2791,6 +2840,11 @@ void *CHugeChunkingHeap::expandHeap(void * original, memsize_t copysize, memsize
                         assert(finger != NULL); // Should always have found it
                     }
                 }
+
+                //Copying data within the block => must lock for the duration
+                if (!release)
+                    callback.lock();
+
                 // MORE - If we were really clever, we could manipulate the page table to avoid moving ANY data here...
                 memmove(realloced, oldbase, copysize + HugeHeaplet::dataOffset());  // NOTE - assumes no trailing data (e.g. end markers)
                 SpinBlock b(crit);
@@ -2799,10 +2853,30 @@ void *CHugeChunkingHeap::expandHeap(void * original, memsize_t copysize, memsize
                 active = head;
             }
             void * ret = (char *) realloced + HugeHeaplet::dataOffset();
-            capacity = head->setCapacity(newsize);
+            memsize_t newCapacity = head->setCapacity(newsize);
             if (release)
+            {
+                //Update the pointer before the old one becomes invalid
+                callback.atomicUpdate(newCapacity, ret);
+
                 subfree_aligned(oldbase, oldPages);
-            return ret;
+            }
+            else
+            {
+                if (realloced != oldbase)
+                {
+                    //previously locked => update the pointer and then unlock
+                    callback.update(newCapacity, ret);
+                    callback.unlock();
+                }
+                else
+                {
+                    //Extended at the end - update the max capacity
+                    callback.atomicUpdate(newCapacity, ret);
+                }
+            }
+
+            return;
         }
 
         //If the allocation fails, then try and free some memory by calling the callbacks
@@ -4505,6 +4579,18 @@ protected:
     }
 };
 
+class CSimpleRowResizeCallback : public CVariableRowResizeCallback
+{
+public:
+    CSimpleRowResizeCallback(memsize_t & _capacity, void * & _row) : CVariableRowResizeCallback(_capacity, _row), locks(0) {}
+
+    virtual void lock() { ++locks; }
+    virtual void unlock() { --locks; }
+
+public:
+    unsigned locks;
+};
+
 
 const memsize_t memorySize = 0x60000000;
 class RoxieMemStressTests : public CppUnit::TestFixture
@@ -4642,10 +4728,11 @@ protected:
             loop
             {
                 memsize_t nextSize = (memsize_t)(requestSize*1.25);
-                memsize_t capacity;
-                void *next = rowManager->resizeRow(prev, requestSize, nextSize, 1, capacity);
+                memsize_t curSize = RoxieRowCapacity(prev);
+                CSimpleRowResizeCallback callback(curSize, prev);
+                rowManager->resizeRow(prev, requestSize, nextSize, 1, callback);
+                ASSERT(curSize >= nextSize);
                 requestSize = nextSize;
-                prev = next;
             }
         }
         catch (IException *E)
@@ -4673,11 +4760,14 @@ protected:
             loop
             {
                 memsize_t nextSize = (memsize_t)(requestSize*1.25);
-                memsize_t capacity;
-                void *next1 = rowManager->resizeRow(prev1, requestSize, nextSize, 1, capacity);
-                prev1 = next1;
-                void *next2 = rowManager->resizeRow(prev2, requestSize, nextSize, 1, capacity);
-                prev2 = next2;
+                memsize_t newSize1 = RoxieRowCapacity(prev1);
+                memsize_t newSize2 = RoxieRowCapacity(prev2);
+                CSimpleRowResizeCallback callback1(newSize1, prev1);
+                CSimpleRowResizeCallback callback2(newSize2, prev2);
+                rowManager->resizeRow(prev1, requestSize, nextSize, 1, callback1);
+                ASSERT(newSize1 >= nextSize);
+                rowManager->resizeRow(prev2, requestSize, nextSize, 1, callback2);
+                ASSERT(newSize2 >= nextSize);
                 requestSize = nextSize;
             }
         }
@@ -4741,17 +4831,17 @@ protected:
         ASSERT(rowManager->numPagesAfterCleanup(true)==0);
         memsize_t capacity;
         void *huge1 = rowManager->allocate(initialAllocSize, 1);
-        void *huge2 = rowManager->resizeRow(huge1, initialAllocSize, hugeAllocSize, 1, capacity);
+        rowManager->resizeRow(capacity, huge1, initialAllocSize, hugeAllocSize, 1);
         ASSERT(capacity > hugeAllocSize);
         ASSERT(rowManager->numPagesAfterCleanup(true)==4097);
-        ReleaseRoxieRow(huge2);
+        ReleaseRoxieRow(huge1);
         ASSERT(rowManager->numPagesAfterCleanup(true)==0);
 
         huge1 = rowManager->allocate(hugeAllocSize/2, 1);
-        huge2 = rowManager->resizeRow(huge1, hugeAllocSize/2, hugeAllocSize, 1, capacity);
+        rowManager->resizeRow(capacity, huge1, hugeAllocSize/2, hugeAllocSize, 1);
         ASSERT(capacity > hugeAllocSize);
         ASSERT(rowManager->numPagesAfterCleanup(true)==4097);
-        ReleaseRoxieRow(huge2);
+        ReleaseRoxieRow(huge1);
         ASSERT(rowManager->numPagesAfterCleanup(true)==0);
 
         ASSERT(rowManager->getExpectedCapacity(hugeAllocSize, RHFnone) > hugeAllocSize);

+ 20 - 1
roxie/roxiemem/roxiemem.hpp

@@ -411,11 +411,30 @@ enum RoxieHeapFlags
     RHFoldfixed         = 0x0008,  // Don't create a special fixed size heap for this
 };
 
+//This interface is here to allow atomic updates to allocations when they are being resized.  There are a few complications:
+//- If a new block is allocated, we just need to update the capacity/pointer atomically
+//  but they need to be updated at the same time, otherwise a spill occurring after the pointer update, but before
+//  the resizeRow returns could lead to an out of date capacity.
+//- If a block is resized by extending it backwards (its start moves and the data is copied within the block) then the pointer needs to be locked while the data is being copied.
+//- If any intermediate function adds extra bytes to the amount to be allocated it will need a local implementation
+//  to apply a delta to the values being passed through.
+//
+//NOTE: No update function is called if the allocation was already large enough => the caller must set the size to the current capacity before calling resizeRow.
+
+interface IRowResizeCallback
+{
+    virtual void lock() = 0; // prevent access to the row pointer
+    virtual void unlock() = 0; // allow access to the row pointer
+    virtual void update(memsize_t capacity, void * ptr) = 0; // update the capacity, row pointer while a lock is held.
+    virtual void atomicUpdate(memsize_t capacity, void * ptr) = 0; // update the capacity and row pointer together while no lock is held
+};
+
 // Variable size aggregated link-counted Roxie (etc) row manager
 interface IRowManager : extends IInterface
 {
     virtual void *allocate(memsize_t size, unsigned activityId) = 0;
-    virtual void *resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, memsize_t &capacity) = 0;
+    virtual void resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback) = 0;
+    virtual void resizeRow(memsize_t & capacity, void * & original, memsize_t copysize, memsize_t newsize, unsigned activityId) = 0;
     virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize, unsigned activityId) = 0;
     virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;
     virtual unsigned allocated() = 0;

+ 3 - 1
roxie/roxiemem/roxierow.cpp

@@ -216,7 +216,9 @@ protected:
             return rowset;
 
         memsize_t capacity;
-        return (byte * *)rowManager.resizeRow(rowset, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
+        void * ptr = (void *)rowset;
+        rowManager.resizeRow(capacity, ptr, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
+        return (byte * *)ptr;
     }
 
 protected: