
HPCC-19059 Allow all rows for a heap to be released in a single call

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday, 7 years ago
commit e016c3bcc5

+ 11 - 0
common/thorhelper/roxierow.cpp

@@ -310,6 +310,11 @@ public:
         heap->gatherStats(stats);
     }
 
+    virtual void releaseAllRows() override
+    {
+        heap->releaseAllRows();
+    }
+
 protected:
     Owned<roxiemem::IFixedRowHeap> heap;
 };
@@ -371,6 +376,12 @@ public:
         heap->gatherStats(stats);
     }
 
+    virtual void releaseAllRows() override
+    {
+        //It is not legal to call releaseAllRows() on a variable size allocator - its rows are not allocated within a single heap
+        throwUnexpected();
+    }
+
 protected:
     Owned<roxiemem::IVariableRowHeap> heap;
 };
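Note the asymmetry between the two wrappers above: a fixed-size allocator is backed by exactly one IFixedRowHeap, so the call is simply forwarded, while a variable-size allocator draws its rows from several size-banded heaps that may also serve other allocators, so no single heap release can cover them. A self-contained toy sketch of why the variable case must fail (hypothetical types, not HPCC code):

    #include <cstdio>
    #include <map>
    #include <stdexcept>

    struct ToyFixedHeap { void releaseAllRows() {} };       // one owner, releasable

    // A toy variable-size allocator: rows land in per-size heaps that may be
    // shared, so there is no single heap whose release frees exactly this
    // allocator's rows.
    struct ToyVariableAllocator
    {
        std::map<size_t, ToyFixedHeap *> heapsBySizeBand;   // borrowed, shared heaps

        void releaseAllRows()
        {
            throw std::runtime_error("rows span multiple shared heaps");
        }
    };

    int main()
    {
        ToyVariableAllocator alloc;
        try { alloc.releaseAllRows(); }
        catch (const std::exception & e) { printf("illegal: %s\n", e.what()); }
        return 0;
    }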

+ 1 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -3031,6 +3031,7 @@ public:
     virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx) override { throwUnexpected(); }
     virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type) override { throwUnexpected(); }
     virtual void gatherStats(CRuntimeStatisticCollection & stats) override {}
+    virtual void releaseAllRows() override { throwUnexpected(); }
 };
 
 //Use a (constant) transform to map selectors of the form queryActiveTableSelector().field

+ 6 - 1
roxie/ccd/ccdserver.cpp

@@ -7611,7 +7611,12 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
         {
             bestCompare=helper.queryCompareBest();
         }
-        virtual ~HashDedupTable() { _releaseAll(); }
+        virtual ~HashDedupTable()
+        {
+            //elementRowAllocator is a unique allocator, so all rows can be freed in a single call
+            elementRowAllocator->releaseAllRows();
+            tablecount = 0;
+        }
 
         virtual unsigned getHashFromElement(const void *et) const       
         {
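The destructor change above is the motivating use case: previously the destructor called _releaseAll(), which walked the hash table and released each surviving row individually; because elementRowAllocator is unique, the whole heap can now be dropped in one call. A self-contained toy sketch of the difference (hypothetical ToyHeap, not HPCC code):

    #include <cstdio>
    #include <vector>

    struct ToyHeap
    {
        std::vector<char *> blocks;           // every block this heap handed out

        void * allocate()
        {
            blocks.push_back(new char[100]);
            return blocks.back();
        }

        // Bulk release: one pass over the heap's own block list, instead of a
        // per-row release (and link-count update) for each surviving entry.
        void releaseAllRows()
        {
            for (char * block : blocks)
                delete [] block;
            blocks.clear();
        }
    };

    int main()
    {
        ToyHeap heap;
        for (unsigned i = 0; i < 200; i++)
            heap.allocate();
        heap.releaseAllRows();                // replaces 200 individual releases
        printf("all rows released in one call\n");
        return 0;
    }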

+ 176 - 1
roxie/roxiemem/roxiemem.cpp

@@ -1352,6 +1352,8 @@ public:
     virtual void checkHeap() const = 0;
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const = 0;
     virtual bool isFull() const = 0;
+    virtual void releaseAllRows() = 0;
+
 #ifdef _WIN32
 #ifdef new
 #define __new_was_defined
@@ -1857,7 +1859,7 @@ public:
             }
             else
             {
-                const char *ptr = block + (chunkSize-chunkCapacity);  // assumes the overhead is all at the start
+                const char *ptr = block + chunkHeaderSize;  // assumes the overhead is all at the start
                 std::atomic_uint * ptrCount = (std::atomic_uint *)(ptr - sizeof(std::atomic_uint));
                 unsigned rowCount = ptrCount->load(std::memory_order_relaxed);
                 if (ROWCOUNT(rowCount) != 0)
@@ -1870,6 +1872,33 @@ public:
         }
     }
 
+    virtual void releaseAllRows() override
+    {
+        unsigned base = 0;
+        unsigned limit = freeBase.load(std::memory_order_acquire); // acquire ensures that any link counts will be initialised
+        while (base < limit)
+        {
+            char *block = data() + base;
+            if (heapFlags & RHFscanning)
+            {
+                if (((std::atomic_uint *)block)->load(std::memory_order_relaxed) < FREE_ROW_COUNT_MIN)
+                {
+                    callDestructor(block);
+                }
+            }
+            else
+            {
+                const char *ptr = block + chunkHeaderSize;  // assumes the overhead is all at the start
+                std::atomic_uint * ptrCount = (std::atomic_uint *)(ptr - sizeof(std::atomic_uint));
+                unsigned rowCount = ptrCount->load(std::memory_order_relaxed);
+                if (ROWCOUNT(rowCount) != 0)
+                    callDestructor(block);
+            }
+            base += chunkSize;
+        }
+    }
+
     virtual void checkHeap() const 
     {
        //This function may not give 100% accurate results if called while there are concurrent allocations/releases
@@ -1963,6 +1992,17 @@ private:
         const char * ptr = (const char *)block + chunkHeaderSize;
         logctx.CTXLOG("Block size %u at %p %swas allocated by activity %u and not freed (%d)", chunkSize, ptr, hasChildren ? "(with children) " : "", getActivityId(allocatorId), ROWCOUNT(rowCount));
     }
+
+    void callDestructor(char * block)
+    {
+        ChunkHeader * header = (ChunkHeader *)block;
+        unsigned rowCount = header->count.load();
+        if (rowCount & ROWCOUNT_DESTRUCTOR_FLAG)
+        {
+            unsigned allocatorId = header->allocatorId;
+            allocatorCache->onDestroy(allocatorId & MAX_ACTIVITY_ID, (block + chunkHeaderSize));
+        }
+    }
 };
 
 //================================================================================
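The pointer arithmetic in releaseAllRows() and callDestructor() implies the following chunk layout: a fixed header at the front of each chunk whose last word is the atomic link count, with the row data starting at block + chunkHeaderSize. A sketch of that assumed layout (the real ChunkHeader, ROWCOUNT() and ROWCOUNT_DESTRUCTOR_FLAG are defined elsewhere in roxiemem.cpp; the field order is inferred from the code above):

    #include <atomic>

    //   block                        block + chunkHeaderSize
    //   |                            |
    //   v                            v
    //   +-------------+--------------+---------------------------+
    //   | allocatorId | count        | row data ...              |
    //   +-------------+--------------+---------------------------+
    //
    // The low bits of count hold the link count (extracted by ROWCOUNT());
    // a flag bit (ROWCOUNT_DESTRUCTOR_FLAG) records whether the owning
    // allocator registered a destructor that must run when the row dies.
    struct ChunkHeaderSketch
    {
        unsigned allocatorId;      // identifies the owning activity/allocator
        std::atomic_uint count;    // link count plus destructor flag bit
    };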
@@ -2110,6 +2150,30 @@ public:
         leaked -= numLeaked;
     }
 
+    virtual void releaseAllRows() override
+    {
+        unsigned base = 0;
+        unsigned limit = freeBase.load(std::memory_order_acquire); // acquire ensures that any link counts will be initialised
+        while (base < limit)
+        {
+            char *block = data() + base;
+            if (heapFlags & RHFscanning)
+            {
+                if (((std::atomic_uint *)block)->load(std::memory_order_relaxed) < FREE_ROW_COUNT_MIN)
+                    callDestructor(block);
+            }
+            else
+            {
+                const char *ptr = block + chunkHeaderSize;  // assumes the overhead is all at the start
+                std::atomic_uint * ptrCount = (std::atomic_uint *)(ptr - sizeof(std::atomic_uint));
+                unsigned rowCount = ptrCount->load(std::memory_order_relaxed);
+                if (ROWCOUNT(rowCount) != 0)
+                    callDestructor(block);
+            }
+            base += chunkSize;
+        }
+    }
+
     virtual unsigned _rawAllocatorId(const void *ptr) const
     {
         return sharedAllocatorId;
@@ -2145,6 +2209,14 @@ public:
             map->noteMemUsage(sharedAllocatorId, numAllocs * chunkSize, numAllocs);
         }
     }
+
+    void callDestructor(char * block)
+    {
+        ChunkHeader * header = (ChunkHeader *)block;
+        unsigned rowCount = header->count.load();
+        if (rowCount & ROWCOUNT_DESTRUCTOR_FLAG)
+            allocatorCache->onDestroy(sharedAllocatorId & MAX_ACTIVITY_ID, (block + chunkHeaderSize));
+    }
 };
 
 //================================================================================
@@ -2329,6 +2401,13 @@ public:
     {
         return (count.load(std::memory_order_relaxed) > 1);
     }
+
+    virtual void releaseAllRows() override
+    {
+        //Unusual for this to be called - a huge heaplet contains only a single row
+        if (allocatorId & ACTIVITY_FLAG_NEEDSDESTRUCTOR)
+            allocatorCache->onDestroy(allocatorId & MAX_ACTIVITY_ID, data());
+    }
 };
 
 //================================================================================
@@ -2563,6 +2642,11 @@ public:
 
     virtual void *allocate();
 
+    virtual void releaseAllRows()
+    {
+        throw MakeStringExceptionDirect(ROXIEMM_RELEASE_ALL_SHARED_HEAP, "Illegal to release all rows for a shared heap");
+    }
+
 protected:
     size32_t chunkCapacity;
 };
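The base class refuses the call outright: a heap that was not created RHFunique may hold rows belonging to several allocators, so a bulk release from any one of them would destroy rows it does not own. The resulting caller-side rule, sketched against the interfaces in this commit (rowManager and activityId are assumed to exist; the arguments mirror the new unit test below):

    Owned<IFixedRowHeap> unique = rowManager->createFixedRowHeap(100, activityId,
                                                RHFhasdestructor|RHFunique);
    unique->releaseAllRows();    // fine: this heap owns all of its rows

    Owned<IFixedRowHeap> shared = rowManager->createFixedRowHeap(100, activityId,
                                                RHFhasdestructor);
    shared->releaseAllRows();    // expected to throw ROXIEMM_RELEASE_ALL_SHARED_HEAP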
@@ -2604,6 +2688,11 @@ public:
         CRoxieFixedRowHeapBase::clearRowManager();
     }
 
+    virtual void releaseAllRows() override
+    {
+        heap->releaseAllRows();
+    }
+
 protected:
     Owned<T> heap;
 };
@@ -2639,6 +2728,12 @@ public:
             ::ReleaseRoxieRow(rows[curRow++]);
     }
 
+    virtual void releaseAllRows() override
+    {
+        curRow = numRows;   // abandon any rows still cached here - the heap-wide release below frees them
+        CRoxieDirectFixedRowHeap<T>::releaseAllRows();
+    }
+
 protected:
     static const unsigned maxRows = 16; // Maximum number of rows to allocate at once.
     char * rows[maxRows];
@@ -3277,6 +3372,8 @@ public:
         curCompactTarget = target;
     }
 
+    void releaseAllRows();
+
 protected:
     void * doAllocateRow(unsigned allocatorId, unsigned maxSpillCost);
     unsigned doAllocateRowBlock(unsigned allocatorId, unsigned maxSpillCost, unsigned max, char * * rows);
@@ -5999,6 +6096,32 @@ void CChunkedHeap::checkScans(unsigned allocatorId)
         reportScanProblem(allocatorId, numScans, merged);
 }
 
+void CChunkedHeap::releaseAllRows()
+{
+    CriticalBlock b(heapletLock);
+
+    if (heaplets)
+    {
+        Heaplet *finger = heaplets;
+
+        //Note: This loop doesn't unlink the list because the list and all blocks are going to be disposed.
+        do
+        {
+            Heaplet *next = getNext(finger);
+            finger->releaseAllRows();
+            delete finger;
+            finger = next;
+        } while (finger != heaplets);
+
+        heaplets = nullptr;
+        activeHeaplet = nullptr;
+    }
+
+    //Reset the cached free-space tracking now that no heaplets remain
+    possibleEmptyPages.store(false, std::memory_order_release);
+    headMaybeSpace.store(BLOCKLIST_NULL, std::memory_order_release);
+}
+
 //================================================================================
 
 ChunkedHeaplet * CFixedChunkedHeap::allocateHeaplet()
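The disposal loop depends on the heaplet list being circular: the successor is saved before each node is deleted, and iteration stops when the walk returns to the remembered head, which from then on is used only as a sentinel value. A self-contained illustration with toy types (not HPCC code):

    #include <cstdio>

    struct Node { Node * next; int id; };     // toy stand-in for Heaplet

    int main()
    {
        // Build a circular list: n0 -> n1 -> n2 -> n0
        Node * n0 = new Node{nullptr, 0};
        Node * n1 = new Node{nullptr, 1};
        Node * n2 = new Node{n0, 2};
        n0->next = n1;
        n1->next = n2;

        Node * head = n0;                     // plays the role of 'heaplets'
        Node * finger = head;
        do
        {
            Node * next = finger->next;       // save before deleting
            printf("disposing node %d\n", finger->id);
            delete finger;
            finger = next;
        } while (finger != head);             // no unlinking: everything is disposed
        return 0;
    }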
@@ -6687,6 +6810,7 @@ class RoxieMemTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testCompressSize);
         CPPUNIT_TEST(testBitmap);
         CPPUNIT_TEST(testAllocSize);
+        CPPUNIT_TEST(testReleaseAll);
         CPPUNIT_TEST(testHuge);
         CPPUNIT_TEST(testAll);
         CPPUNIT_TEST(testRecursiveCallbacks);
@@ -7418,6 +7542,7 @@ protected:
         virtual void checkValid(unsigned cacheId, const void *row) const { }
 
         void clear() { counter = 0; }
+        unsigned getCounter() const { return counter.load(); }
 
         mutable std::atomic_uint counter;
     };
@@ -7804,6 +7929,56 @@ protected:
         testGeneralCas();
     }
 
+    void testReleaseAll(IFixedRowHeap * heap, CountingRowAllocatorCache & rowCache)
+    {
+        rowCache.clear();
+
+        const unsigned numRows = 200;
+        const void * rows[numRows];
+        for (unsigned i=0; i < numRows; i++)
+            rows[i] = heap->finalizeRow(heap->allocate());
+
+        //test that calling releaseAllRows() calls all the destructors
+        CPPUNIT_ASSERT_EQUAL(0U, rowCache.getCounter());
+        heap->releaseAllRows();
+        CPPUNIT_ASSERT_EQUAL(numRows, rowCache.getCounter());
+
+        //Check the heap is still in a valid state
+        //Test again - releasing half the rows before calling releaseAllRows() to free any that remain
+        rowCache.clear();
+        for (unsigned i2=0; i2 < numRows; i2++)
+            rows[i2] = heap->finalizeRow(heap->allocate());
+
+        for (unsigned i3=0; i3 < numRows; i3 += 2)
+            ::ReleaseRoxieRow(rows[i3]);
+
+        CPPUNIT_ASSERT_EQUAL(numRows / 2, rowCache.getCounter());
+        heap->releaseAllRows();
+        CPPUNIT_ASSERT_EQUAL(numRows, rowCache.getCounter());
+    }
+
+    void testReleaseAll(CountingRowAllocatorCache & rowCache, IRowManager * rowManager, unsigned flags)
+    {
+        Owned<IFixedRowHeap> heap = rowManager->createFixedRowHeap(100, ACTIVITY_FLAG_ISREGISTERED|0, flags);
+        testReleaseAll(heap, rowCache);
+    }
+
+    void testReleaseAll()
+    {
+        CountingRowAllocatorCache rowCache;
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, &rowCache);
+
+        //Test releaseAll on various different heap variants
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFpacked);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFscanning);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFpacked|RHFscanning);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFblocked);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFpacked|RHFblocked);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFscanning|RHFdelayrelease);
+        testReleaseAll(rowCache, rowManager, RHFhasdestructor|RHFunique|RHFpacked|RHFscanning|RHFdelayrelease);
+    }
+
     void testCallback(unsigned numPerPage, unsigned pages, unsigned spillPages, double scale, unsigned flags)
     {
         CountingRowAllocatorCache rowCache;

+ 2 - 0
roxie/roxiemem/roxiemem.hpp

@@ -36,6 +36,7 @@
 #define ROXIEMM_INVALID_MEMORY_ALIGNMENT  ROXIEMM_ERROR_START+2
 #define ROXIEMM_HEAP_ERROR                ROXIEMM_ERROR_START+3
 #define ROXIEMM_TOO_MUCH_MEMORY           ROXIEMM_ERROR_START+4
+#define ROXIEMM_RELEASE_ALL_SHARED_HEAP   ROXIEMM_ERROR_START+5
 // NB: max ROXIEMM_* error is ROXIEMM_ERROR_END (see errorlist.h)
 
 #ifdef __64BIT__
@@ -398,6 +399,7 @@ interface IFixedRowHeap : extends IRowHeap
     virtual void *allocate() = 0;
     virtual void *finalizeRow(void *final) = 0;
     virtual void emptyCache() = 0;
+    virtual void releaseAllRows() = 0; // Release any active heaplets and rows within those heaplets.  Use with extreme care.
 };
 
 interface IVariableRowHeap : extends IRowHeap
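Condensed from the new unit test in roxiemem.cpp, the intended call pattern is (sketch; assumes the roxiemem headers plus an existing logctx and IRowAllocatorCache):

    Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, &rowCache);
    Owned<IFixedRowHeap> heap = rowManager->createFixedRowHeap(100,
                                    ACTIVITY_FLAG_ISREGISTERED,
                                    RHFhasdestructor|RHFunique);

    const void * rows[200];
    for (unsigned i = 0; i < 200; i++)
        rows[i] = heap->finalizeRow(heap->allocate());

    heap->releaseAllRows();    // frees (and destructs) every row in one call;
                               // none of the saved pointers may be used afterwards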

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -270,6 +270,7 @@ interface IEngineRowAllocator : extends IInterface
     virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *childtype) = 0;
 
     virtual void gatherStats(CRuntimeStatisticCollection & stats) = 0;
+    virtual void releaseAllRows() = 0; // Only valid on unique allocators, use with extreme care
 };
 
 interface IRowSerializerTarget
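At the engine level the same caveat applies: only an allocator whose heap is unique to it may be bulk-released, which is exactly what lets the HashDedupTable destructor above free its whole heap at once. A sketch of the pattern with a hypothetical owning class:

    // Hypothetical owner; childAllocator must have been created unique.
    ~SomeRowOwningTable()
    {
        childAllocator->releaseAllRows();    // every row freed in one call
        // any cached pointers/counters into those rows are now invalid
    }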