
Initial work on roxiemem callbacks etc

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday, 13 years ago
parent
commit
d89c6fe122

+ 119 - 57
common/roxiehelper/roxierow.cpp

@@ -20,6 +20,98 @@
 #include "roxierow.hpp"
 #include "thorcommon.ipp"
 
+//These classes are used to configure the allocator and to add extra checking data to the end of each row.
+//Checking is enabled by setting a bit in the allocatorid.
+class NoCheckingHelper
+{
+public:
+    enum {
+        extraSize = 0,
+        allocatorCheckFlag = 0x00000000
+    };
+    static inline void setCheck(size32_t size, void * ptr) {}
+    static inline bool isValid(const void * ptr) { return true; }
+};
+
+//NOTE: If a row requires checking then it will also have the bit set to indicate it requires a destructor,
+//so that rows are checked on destruction.
+//Therefore isValid() checks whether the destructor flag is set, to protect against uninitialised crcs.
+class Crc16CheckingHelper
+{
+public:
+    enum {
+        extraSize = sizeof(unsigned short),
+        allocatorCheckFlag = 0x00100000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
+    };
+    static inline void setCheck(size32_t size, void * _ptr)
+    {
+        byte * ptr = static_cast<byte *>(_ptr);
+        size32_t capacity = RoxieRowCapacity(ptr);
+        memset(ptr+size, 0, capacity - size - extraSize);
+        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
+        *check = crc16(ptr, capacity-extraSize, 0);
+    }
+    static inline bool isValid(const void * _ptr)
+    {
+        if (RoxieRowHasDestructor(_ptr))
+        {
+            const byte * ptr = static_cast<const byte *>(_ptr);
+            size32_t capacity = RoxieRowCapacity(ptr);
+            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
+            return *check == crc16(ptr, capacity-extraSize, 0);
+        }
+        return true;
+    }
+};
+
+class Sum16CheckingHelper
+{
+public:
+    enum {
+        extraSize = sizeof(unsigned short),
+        allocatorCheckFlag = 0x00200000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
+    };
+    static inline void setCheck(size32_t size, void * _ptr)
+    {
+        byte * ptr = static_cast<byte *>(_ptr);
+        size32_t capacity = RoxieRowCapacity(ptr);
+        memset(ptr+size, 0, capacity - size - extraSize);
+        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
+        *check = chksum16(ptr, capacity-extraSize);
+    }
+    static inline bool isValid(const void * _ptr)
+    {
+        if (RoxieRowHasDestructor(_ptr))
+        {
+            const byte * ptr = static_cast<const byte *>(_ptr);
+            size32_t capacity = RoxieRowCapacity(ptr);
+            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
+            return chksum16(ptr, capacity-extraSize) == *check;
+        }
+        return true;
+    }
+};
+
+bool isRowCheckValid(unsigned allocatorId, const void * row)
+{
+    switch (allocatorId & ALLOCATORID_CHECK_MASK)
+    {
+    case NoCheckingHelper::allocatorCheckFlag:
+        return true;
+    case Crc16CheckingHelper::allocatorCheckFlag:
+        return Crc16CheckingHelper::isValid(row);
+    case Sum16CheckingHelper::allocatorCheckFlag:
+        return Sum16CheckingHelper::isValid(row);
+    default:
+        UNIMPLEMENTED;
+    }
+}
+
+//--------------------------------------------------------------------------------------------------------------------
+
+//MORE: Function to calculate the total size of a row - requires access to a row allocator.
+
+//--------------------------------------------------------------------------------------------------------------------
 class RoxieEngineRowAllocatorBase : public CInterface, implements IEngineRowAllocator
 {
 public:
@@ -116,43 +208,7 @@ protected:
     unsigned allocatorId;
 };
 
-//General purpose row allocator here for reference - should be removed once spcialised versions are created
-class RoxieEngineRowAllocator : public RoxieEngineRowAllocatorBase
-{
-public:
-    RoxieEngineRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId)
-        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
-    {
-    }
-
-    virtual void * createRow()
-    {
-        size32_t allocSize = meta.getInitialSize();
-        return rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-    }
-
-    virtual void * createRow(size32_t & allocatedSize)
-    {
-        const size32_t allocSize = meta.getInitialSize();
-        void *ret = rowManager.allocate(allocSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED);
-        //more: allocate could return the allocated size, but that would penalise the fixed row case
-        allocatedSize = RoxieRowCapacity(ret);
-        return ret;
-    }
-
-    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
-    {
-        return rowManager.resizeRow(row, size, newSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED, size);
-    }
-
-    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
-    {
-        unsigned id = allocatorId | ACTIVITY_FLAG_ISREGISTERED;
-        if (meta.needsDestruct()) id |= ACTIVITY_FLAG_NEEDSDESTRUCTOR;
-        return rowManager.finalizeRow(row, oldSize, finalSize, id);
-    }
-};
-
+template <class CHECKER>
 class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
 {
 public:
@@ -160,9 +216,9 @@ public:
         : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
     {
         unsigned flags = packed ? roxiemem::RHFpacked : roxiemem::RHFnone;
-        if (meta.needsDestruct())
+        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
             flags |= roxiemem::RHFhasdestructor;
-        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize(), allocatorId | ACTIVITY_FLAG_ISREGISTERED, (roxiemem::RoxieHeapFlags)flags));
+        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize(), allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
     }
 
     virtual void * createRow()
@@ -184,8 +240,9 @@ public:
 
     virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
     {
-        if (!meta.needsDestruct())
+        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
             return row;
+        CHECKER::setCheck(finalSize, row);
         return heap->finalizeRow(row);
     }
 
@@ -193,58 +250,63 @@ protected:
     Owned<roxiemem::IFixedRowHeap> heap;
 };
 
+template <class CHECKER>
 class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
 {
 public:
-    RoxieEngineVariableRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool _packed)
-        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId), packed(_packed)
+    RoxieEngineVariableRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool packed)
+        : RoxieEngineRowAllocatorBase(_rowManager, _meta, _activityId, _allocatorId)
     {
         unsigned flags = packed ? roxiemem::RHFpacked : roxiemem::RHFnone;
-        if (meta.needsDestruct())
+        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
             flags |= roxiemem::RHFhasdestructor;
-        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED, (roxiemem::RoxieHeapFlags)flags));
+        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
     }
 
     virtual void * createRow()
     {
         size32_t allocSize = meta.getInitialSize();
         size32_t capacity;
-        return heap->allocate(allocSize, capacity);
+        return heap->allocate(allocSize+CHECKER::extraSize, capacity);
     }
 
     virtual void * createRow(size32_t & allocatedSize)
     {
         const size32_t allocSize = meta.getInitialSize();
-        return heap->allocate(allocSize, allocatedSize);
+        return heap->allocate(allocSize+CHECKER::extraSize, allocatedSize);
     }
 
     virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
     {
-        return heap->resizeRow(row, size, newSize, size);
+        return heap->resizeRow(row, size, newSize+CHECKER::extraSize, size);
     }
 
     virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
     {
-        if (!meta.needsDestruct() && !packed)
+        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
             return row;
-        return heap->finalizeRow(row, oldSize, finalSize);
+        void * newrow = heap->finalizeRow(row, oldSize, finalSize+CHECKER::extraSize);
+        CHECKER::setCheck(finalSize, newrow);
+        return newrow;
     }
 
 protected:
     Owned<roxiemem::IVariableRowHeap> heap;
-    bool packed;    // may not be needed - depends on implementation
 };
 
 
 IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, bool packed)
 {
-#if 0
-    //old code
-    return new RoxieEngineRowAllocator(rowManager, meta, activityId, allocatorId);
-#else
     if (meta->getFixedSize() != 0)
-        return new RoxieEngineFixedRowAllocator(rowManager, meta, activityId, allocatorId, packed);
+        return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(rowManager, meta, activityId, allocatorId, packed);
+    else
+        return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(rowManager, meta, activityId, allocatorId, packed);
+}
+
+IEngineRowAllocator * createCrcRoxieRowAllocator(roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, bool packed)
+{
+    if (meta->getFixedSize() != 0)
+        return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(rowManager, meta, activityId, allocatorId, packed);
     else
-        return new RoxieEngineVariableRowAllocator(rowManager, meta, activityId, allocatorId, packed);
-#endif
+        return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(rowManager, meta, activityId, allocatorId, packed);
 }
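
For reference, a minimal standalone sketch (not the library code) of the trailer-check idea used by Crc16CheckingHelper and Sum16CheckingHelper: the last extraSize bytes of the row's capacity hold a checksum of the preceding bytes, written by setCheck() when the row is finalized and re-verified by isValid() before destruction. A toy checksum stands in for jlib's crc16/chksum16.

    #include <cstdint>
    #include <cstring>
    #include <cassert>
    #include <vector>

    static uint16_t toyChecksum(const void *data, size_t len)   // stand-in for crc16/chksum16
    {
        uint16_t sum = 0;
        const uint8_t *p = static_cast<const uint8_t *>(data);
        for (size_t i = 0; i < len; i++)
            sum = (uint16_t)(sum * 31 + p[i]);
        return sum;
    }

    int main()
    {
        const size_t capacity = 64;                  // what RoxieRowCapacity() would report
        const size_t extraSize = sizeof(uint16_t);   // trailer reserved by the checking helper
        const size_t finalSize = 20;                 // size passed to finalizeRow()
        std::vector<uint8_t> row(capacity, 0xAA);    // pretend the first finalSize bytes are row data

        // setCheck(): zero the slack between the used data and the trailer, then store the checksum
        memset(row.data() + finalSize, 0, capacity - finalSize - extraSize);
        uint16_t check = toyChecksum(row.data(), capacity - extraSize);
        memcpy(row.data() + capacity - extraSize, &check, extraSize);

        // isValid(): recompute over the same range and compare with the stored trailer
        uint16_t stored;
        memcpy(&stored, row.data() + capacity - extraSize, extraSize);
        assert(stored == toyChecksum(row.data(), capacity - extraSize));

        // any later write past finalSize (or into the slack) would make the comparison fail
        return 0;
    }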

+ 12 - 0
common/roxiehelper/roxierow.hpp

@@ -32,6 +32,18 @@
 #include "roxiemem.hpp"
 #include "eclhelper.hpp"
 
+#define ALLOCATORID_CHECK_MASK  0x00300000
+
 extern ROXIEHELPER_API IEngineRowAllocator * createRoxieRowAllocator(roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, bool packed);
 
+extern ROXIEHELPER_API bool isRowCheckValid(unsigned allocatorId, const void * row);
+
+//Inline wrapper which avoids the function call if no row checking is enabled.
+inline bool RoxieRowCheckValid(unsigned allocatorId, const void * row)
+{
+    if (allocatorId & ALLOCATORID_CHECK_MASK)
+        return isRowCheckValid(allocatorId, row);
+    return true;
+}
+
 #endif
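
A hypothetical caller sketch (not part of this commit) showing the intended use of the guard: rows from allocators without a check bit only pay for the mask test, while checked rows are validated before their destructor runs. The destroyRow name and parameters are illustrative.

    void destroyRow(unsigned allocatorId, void * row, IOutputMetaData * meta)
    {
        if (!RoxieRowCheckValid(allocatorId, row))
        {
            //report the overwrite - the commit leaves the exact action as a MORE: comment
        }
        meta->destruct((byte *) row);
    }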

+ 4 - 1
ecl/eclagent/eclagent.ipp

@@ -659,7 +659,10 @@ public:
         }
         allocator->queryOutputMeta()->destruct((byte *) row);
     }
-    
+    virtual void checkValid(unsigned cacheId, void *row) const
+    {
+        //MORE
+    }
     virtual const char *queryAllowedPipePrograms()
     {
         return allowedPipeProgs.get();

+ 12 - 1
roxie/ccd/ccdserver.cpp

@@ -28119,8 +28119,19 @@ public:
                 return;
             }
         }
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Give an error, but don't throw an exception!
+        }
         allocator->queryOutputMeta()->destruct((byte *) row);
     }
+    virtual void checkValid(unsigned cacheId, void *row) const
+    {
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Throw an exception?
+        }
+    }
 
     virtual IWorkUnit *updateWorkUnit() const
     {
@@ -32314,7 +32325,7 @@ protected:
             ASSERT(in.state == TestInput::STATEreset);
             ASSERT(!input2 || in2.state == TestInput::STATEreset);
             ctx->queryRowManager().reportLeaks();
-            ASSERT(ctx->queryRowManager().pages() == 0);
+            ASSERT(ctx->queryRowManager().numPagesAfterCleanup() == 0);
         }
     }
 

+ 460 - 62
roxie/roxiemem/roxiemem.cpp

@@ -28,13 +28,6 @@
 #define _CLEAR_ALLOCATED_ROW
 #endif
 
-//Use for asserts that are highly unlikely to occur, and would likely to be reproduced in debug mode.
-#ifdef _DEBUG
-#define dbgassertex(x) assertex(x)
-#else
-#define dbgassertex(x)
-#endif
-
 namespace roxiemem {
 
 #define USE_MADVISE_ON_FREE     // avoid linux swapping 'freed' pages to disk
@@ -581,6 +574,13 @@ public:
         atomic_set((atomic_t *)ptr, 1|ROWCOUNT_DESTRUCTOR_FLAG);
     }
 
+    virtual bool _hasDestructor(const void *ptr) const
+    {
+        const atomic_t * curCount = reinterpret_cast<const atomic_t *>((const char *) ptr - sizeof(atomic_t));
+        unsigned rowCount = atomic_read(curCount);
+        return (rowCount & ROWCOUNT_DESTRUCTOR_FLAG) != 0;
+    }
+
     virtual void reportLeaks(unsigned &leaked, const IContextLogger &logctx) const
     {
         //This function may not give correct results if called if there are concurrent allocations/releases
@@ -766,6 +766,12 @@ public:
         return HEAP_ALIGNMENT_SIZE - (offsetof(FixedSizeHeaplet, data) + chunkHeaderSize);
     }
 
+    virtual unsigned _rawAllocatorId(const void *ptr) const
+    {
+        ChunkHeader * header = (ChunkHeader *)ptr - 1;
+        return header->allocatorId;
+    }
+
     virtual void reportLeak(const void * block, const IContextLogger &logctx) const
     {
         ChunkHeader * header = (ChunkHeader *)block;
@@ -912,6 +918,11 @@ public:
         logctx.CTXLOG("Block size %u at %p %swas allocated by activity %u and not freed (%d)", chunkSize, ptr, hasChildren ? "(with children) " : "", getActivityId(sharedAllocatorId), ROWCOUNT(rowCount));
     }
 
+    virtual unsigned _rawAllocatorId(const void *ptr) const
+    {
+        return sharedAllocatorId;
+    }
+
     virtual void checkHeap() const
     {
     }
@@ -1017,9 +1028,20 @@ public:
 
     virtual void _setDestructorFlag(const void *ptr)
     {
+        //MORE: This should probably use the top bit of the count as well.
         allocatorId |= ACTIVITY_FLAG_NEEDSDESTRUCTOR;
     }
 
+    virtual bool _hasDestructor(const void *ptr) const
+    {
+        return (allocatorId & ACTIVITY_FLAG_NEEDSDESTRUCTOR) != 0;
+    }
+
+    virtual unsigned _rawAllocatorId(const void *ptr) const
+    {
+        return allocatorId;
+    }
+
     virtual void noteLinked(const void *ptr)
     {
         atomic_inc(&count);
@@ -1312,7 +1334,7 @@ public:
         return total;
     }
 
-    unsigned pages()
+    unsigned releaseEmptyPages()
     {
         unsigned total = 0;
         BigHeapletBase *prev = NULL;
@@ -1330,11 +1352,11 @@ public:
                     setNext(prev, next);
                 else
                     active = next;
+                total += finger->sizeInPages();
                 delete finger;
             }
             else
             {
-                total += finger->sizeInPages();
                 prev = finger;
             }
             finger = next;
@@ -1354,6 +1376,8 @@ public:
         }
     }
 
+    inline bool isEmpty() const { return !active; }
+
     virtual bool matches(size32_t searchSize, unsigned searchActivity, unsigned searchFlags) const
     {
         return false;
@@ -1455,6 +1479,190 @@ protected:
 #define ROUNDEDSIZE(rounded) ((rounded) & ((size32_t)HEAP_ALIGNMENT_SIZE -1))
 #define ROUNDEDHEAP(rounded) ((rounded) / (size32_t)HEAP_ALIGNMENT_SIZE)
 
+class BufferedRowCallbackManager
+{
+    class ReleaseBufferThread : public Thread
+    {
+    public:
+        ReleaseBufferThread(BufferedRowCallbackManager * _owner) : Thread("ReleaseBufferThread"), owner(_owner) {};
+
+        virtual int run()
+        {
+            owner->runReleaseBufferThread();
+            return 0;
+        }
+
+    private:
+        BufferedRowCallbackManager * owner;
+    };
+
+public:
+    BufferedRowCallbackManager()
+    {
+        atomic_set(&releasingBuffers, 0);
+        atomic_set(&releaseSeq, 0);
+        abortBufferThread = false;
+    }
+    ~BufferedRowCallbackManager()
+    {
+        stopReleaseBufferThread();
+    }
+
+    inline unsigned getReleaseSeq() { return atomic_read(&releaseSeq); }
+
+    void addRowBuffer(IBufferedRowCallback * callback)
+    {
+        CriticalBlock block(callbackCrit);
+        //Assuming a small number so perform an insertion sort.
+        unsigned max = rowBufferCallbacks.ordinality();
+        unsigned priority = callback->getPriority();
+        unsigned insertPos = 0;
+        for (; insertPos < max; insertPos++)
+        {
+            IBufferedRowCallback * curCallback = rowBufferCallbacks.item(insertPos);
+            if (curCallback->getPriority() > priority)
+                break;
+        }
+        rowBufferCallbacks.add(callback, insertPos);
+        updateCallbackInfo();
+    }
+
+    void removeRowBuffer(IBufferedRowCallback * callback)
+    {
+        CriticalBlock block(callbackCrit);
+        rowBufferCallbacks.zap(callback);
+        updateCallbackInfo();
+    }
+
+    void updateCallbackInfo()
+    {
+        nextToCheck = 0;
+        const unsigned numCallbacks = rowBufferCallbacks.ordinality();
+        if (numCallbacks > 0)
+        {
+            lowestPriority = rowBufferCallbacks.item(0)->getPriority();
+            unsigned i = 1;
+            for (; i < numCallbacks; i++)
+            {
+                if (rowBufferCallbacks.item(i)->getPriority() != lowestPriority)
+                    break;
+            }
+            numLowestPriority = i;
+        }
+        else
+        {
+            numLowestPriority = 0;
+        }
+    }
+
+    bool releaseBuffers(const bool critical, const unsigned minSuccess)
+    {
+        unsigned numSuccess = 0;
+
+        CriticalBlock block(callbackCrit);
+        const unsigned numCallbacks = rowBufferCallbacks.ordinality();
+        if (numCallbacks == 0)
+            return false;
+
+        //First perform a round robin on the lowest priority elements
+        unsigned i = nextToCheck;
+        loop
+        {
+            IBufferedRowCallback * curCallback = rowBufferCallbacks.item(i);
+            unsigned next = i+1;
+            if (next == numLowestPriority)
+                next = 0;
+
+            if (curCallback->freeBufferedRows(critical))
+            {
+                if (++numSuccess >= minSuccess)
+                {
+                    nextToCheck = next;
+                    atomic_inc(&releaseSeq);
+                    return true;
+                }
+            }
+
+            i = next;
+            if (i == nextToCheck)
+                break;
+        }
+
+        //Should this also perform some kind of round robin?  It would require next items for each priority.
+        nextToCheck = 0;
+        for (i = numLowestPriority; i < rowBufferCallbacks.ordinality(); i++)
+        {
+            IBufferedRowCallback * curCallback = rowBufferCallbacks.item(i);
+            if (curCallback->freeBufferedRows(critical))
+            {
+                if (++numSuccess >= minSuccess)
+                    break;
+            }
+        }
+        if (numSuccess != 0)
+        {
+            atomic_inc(&releaseSeq);
+            return true;
+        }
+        return false;
+    }
+
+    void runReleaseBufferThread()
+    {
+        loop
+        {
+            releaseBuffersSem.wait();
+            if (abortBufferThread)
+                break;
+            releaseBuffers(false, 1);
+//            owner->releaseEmptyPages();
+            atomic_set(&releasingBuffers, 0);
+        }
+    }
+
+    void releaseBuffersInBackground()
+    {
+        if (atomic_cas(&releasingBuffers, 1, 0))
+        {
+            assertex(releaseBuffersThread);
+            releaseBuffersSem.signal();
+        }
+    }
+
+    void startReleaseBufferThread()
+    {
+        if (!releaseBuffersThread)
+        {
+            releaseBuffersThread.setown(new ReleaseBufferThread(this));
+            releaseBuffersThread->start();
+        }
+    }
+
+    void stopReleaseBufferThread()
+    {
+        if (releaseBuffersThread)
+        {
+            abortBufferThread = true;
+            releaseBuffersSem.signal();
+            releaseBuffersThread->join();
+            releaseBuffersThread.clear();
+            abortBufferThread = false;
+        }
+    }
+
+protected:
+    CriticalSection callbackCrit;
+    Semaphore releaseBuffersSem;
+    PointerArrayOf<IBufferedRowCallback> rowBufferCallbacks;
+    Owned<ReleaseBufferThread> releaseBuffersThread;
+    atomic_t releasingBuffers;
+    atomic_t releaseSeq;
+    unsigned nextToCheck;
+    unsigned lowestPriority;
+    unsigned numLowestPriority;
+    volatile bool abortBufferThread;
+};
+
 class CChunkingRowManager : public CInterface, implements IRowManager
 {
     friend class CRoxieFixedRowHeap;
@@ -1467,22 +1675,26 @@ class CChunkingRowManager : public CInterface, implements IRowManager
     SpinLock fixedCrit;
     CIArrayOf<CFixedChunkingHeap> normalHeaps;
     CHugeChunkingHeap hugeHeap;
-    unsigned pageLimit;
     ITimeLimiter *timeLimit;
+    DataBufferBase *activeBuffs;
+    const IContextLogger &logctx;
+    unsigned pageLimit;
+    unsigned spillPageLimit;
     unsigned peakPages;
     unsigned dataBuffs;
     unsigned dataBuffPages;
     atomic_t possibleGoers;
-    DataBufferBase *activeBuffs;
-    const IContextLogger &logctx;
-    bool ignoreLeaks;
-    bool trackMemoryByActivity;
+    atomic_t totalHeapPages;
+    BufferedRowCallbackManager callbacks;
     Owned<IActivityMemoryUsageMap> usageMap;
     CIArrayOf<CChunkingHeap> fixedHeaps;
     CopyCIArrayOf<CRoxieFixedRowHeapBase> fixedRowHeaps;  // These are observed, NOT linked
     const IRowAllocatorCache *allocatorCache;
     unsigned __int64 cyclesChecked;       // When we last checked timelimit
     unsigned __int64 cyclesCheckInterval; // How often we need to check timelimit
+    bool ignoreLeaks;
+    bool trackMemoryByActivity;
+
 
     inline unsigned getActivityId(unsigned rawId) const
     {
@@ -1508,10 +1720,12 @@ public:
             prevSize = thisSize;
         }
         pageLimit = _memLimit / HEAP_ALIGNMENT_SIZE;
+        spillPageLimit = 0;
         timeLimit = _tl;
         peakPages = 0;
         dataBuffs = 0;
         atomic_set(&possibleGoers, 0);
+        atomic_set(&totalHeapPages, 0);
         activeBuffs = NULL;
         dataBuffPages = 0;
         ignoreLeaks = _ignoreLeaks;
@@ -1536,6 +1750,8 @@ public:
 
     ~CChunkingRowManager()
     {
+        callbacks.stopReleaseBufferThread();
+
         if (memTraceLevel >= 2)
             logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager d-tor pageLimit=%u peakPages=%u dataBuffs=%u dataBuffPages=%u possibleGoers=%u rowMgr=%p", 
                     pageLimit, peakPages, dataBuffs, dataBuffPages, atomic_read(&possibleGoers), this);
@@ -1607,23 +1823,21 @@ public:
         return total;
     }
 
-    virtual unsigned pages()
+    virtual unsigned numPagesAfterCleanup()
     {
-        unsigned total = dataBuffPages;
-        ForEachItemIn(iNormal, normalHeaps)
-            total += normalHeaps.item(iNormal).pages();
-        total += hugeHeap.pages();
+        releaseEmptyPages();
+        return dataBuffPages + atomic_read(&totalHeapPages);
+    }
 
-        SpinBlock block(fixedCrit); //Spinblock needed if we can add/remove fixed heaps while allocations are occuring
+    void removeUnusedHeaps()
+    {
+        SpinBlock block(fixedCrit);
         unsigned numHeaps = fixedHeaps.ordinality();
         unsigned i = 0;
         while (i < numHeaps)
         {
             CChunkingHeap & fixedHeap = fixedHeaps.item(i);
-            unsigned thisPages = fixedHeap.pages();
-            total += thisPages;
-            //if this heap has no pages, and no external references then it can be removed
-            if ((thisPages == 0) && !fixedHeap.IsShared())
+            if (fixedHeap.isEmpty() && !fixedHeap.IsShared())
             {
                 fixedHeaps.remove(i);
                 numHeaps--;
@@ -1631,8 +1845,32 @@ public:
             else
                 i++;
         }
+    }
+    void releaseEmptyPages()
+    {
+        unsigned total = 0;
+        ForEachItemIn(iNormal, normalHeaps)
+            total += normalHeaps.item(iNormal).releaseEmptyPages();
+        total += hugeHeap.releaseEmptyPages();
 
-        return total;
+        bool hadUnusedHeap = false;
+        {
+            SpinBlock block(fixedCrit); //Spinblock needed if we can add/remove fixed heaps while allocations are occurring
+            ForEachItemIn(i, fixedHeaps)
+            {
+                CChunkingHeap & fixedHeap = fixedHeaps.item(i);
+                total += fixedHeap.releaseEmptyPages();
+                //if this heap has no pages, and no external references then it can be removed
+                if (fixedHeap.isEmpty() && !fixedHeap.IsShared())
+                    hadUnusedHeap = true;
+            }
+        }
+
+        if (hadUnusedHeap)
+            removeUnusedHeaps();
+
+        atomic_add(&totalHeapPages, -(int)total);
     }
 
     virtual void getPeakActivityUsage()
@@ -1656,7 +1894,7 @@ public:
     //MORE: inline??
     static size32_t roundup(size32_t size)
     {
-        dbgassertex((size > FixedSizeHeaplet::chunkHeaderSize) && (size <= FixedSizeHeaplet::maxHeapSize() + FixedSizeHeaplet::chunkHeaderSize));
+        dbgassertex((size >= FixedSizeHeaplet::chunkHeaderSize) && (size <= FixedSizeHeaplet::maxHeapSize() + FixedSizeHeaplet::chunkHeaderSize));
         //MORE: A binary chop on sizes is likely to be better.
         if (size<=256)
         {
@@ -1723,12 +1961,19 @@ public:
         return normalHeap.doAllocate(activityId);
     }
 
-    virtual void setMemoryLimit(memsize_t bytes)
+    virtual void setMemoryLimit(memsize_t bytes, memsize_t spillSize)
     {
         pageLimit = (unsigned) (bytes / HEAP_ALIGNMENT_SIZE);
+        spillPageLimit = (unsigned) (spillSize / HEAP_ALIGNMENT_SIZE);
+
+        //The test allows no limit on memory, but spill above a certain amount.  Not sure if useful...
+        if (spillPageLimit && (pageLimit != spillPageLimit))
+            callbacks.startReleaseBufferThread();
+        else
+            callbacks.stopReleaseBufferThread();
 
         if (memTraceLevel >= 2)
-            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::setMemoryLimit new memlimit=%"I64F"u pageLimit=%u rowMgr=%p", (unsigned __int64) bytes, pageLimit, this);
+            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::setMemoryLimit new memlimit=%"I64F"u pageLimit=%u spillLimit=%u rowMgr=%p", (unsigned __int64) bytes, pageLimit, spillPageLimit, this);
     }
 
     virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, unsigned activityId, size32_t &capacity)
@@ -1850,18 +2095,48 @@ public:
 
     void checkLimit(unsigned numRequested)
     {
-        unsigned pageCount = pages();
-        if (pageCount + numRequested > peakPages)
+        unsigned totalPages;
+        releaseEmptyPages();
+        loop
         {
-            if (trackMemoryByActivity)
-                getPeakActivityUsage();
-            peakPages = pageCount + numRequested;
+            unsigned lastReleaseSeq = callbacks.getReleaseSeq();
+            //We need to ensure that the number of allocated pages is updated atomically so multiple threads can't all
+            //succeed and have the total take them over the limit.
+            unsigned numHeapPages = atomic_read(&totalHeapPages);
+            unsigned pageCount = dataBuffPages + numHeapPages;
+            totalPages = pageCount + numRequested;
+            if (!pageLimit || totalPages <= pageLimit)
+            {
+                if (!atomic_cas(&totalHeapPages, numHeapPages + numRequested, numHeapPages))
+                    continue;
+                break;
+            }
 
-            if (pageLimit && peakPages > pageLimit)
+            //Try and directly free up some buffers.  It is worth trying again if one of the release functions thinks it
+            //freed up some memory.
+            if (!callbacks.releaseBuffers(true, 1))
             {
-                logctx.CTXLOG("RoxieMemMgr: Memory limit exceeded - current %d, requested %d, limit %d", pageCount, numRequested, pageLimit);
-                throw MakeStringException(ROXIE_MEMORY_LIMIT_EXCEEDED, "memory limit exceeded");
+                //Check if a background thread has freed up some memory.  That can be checked by comparing the value of a counter
+                //which is incremented each time releaseBuffers is successful.
+                if (lastReleaseSeq == callbacks.getReleaseSeq())
+                {
+                    logctx.CTXLOG("RoxieMemMgr: Memory limit exceeded - current %d, requested %d, limit %d", pageCount, numRequested, pageLimit);
+                    throw MakeStringException(ROXIE_MEMORY_LIMIT_EXCEEDED, "memory limit exceeded");
+                }
             }
+            releaseEmptyPages();
+        }
+
+        if (spillPageLimit && (totalPages > spillPageLimit))
+        {
+            callbacks.releaseBuffersInBackground();
+        }
+
+        if (totalPages > peakPages)
+        {
+            if (trackMemoryByActivity)
+                getPeakActivityUsage();
+            peakPages = totalPages;
         }
     }
 
@@ -1963,6 +2238,16 @@ protected:
             fixedHeap.reportLeaks(leaked);
         }
     }
+
+    virtual void addRowBuffer(IBufferedRowCallback * callback)
+    {
+        callbacks.addRowBuffer(callback);
+    }
+
+    virtual void removeRowBuffer(IBufferedRowCallback * callback)
+    {
+        callbacks.removeRowBuffer(callback);
+    }
 };
 
 
@@ -2010,8 +2295,7 @@ void * CRoxieVariableRowHeap::resizeRow(void * original, size32_t oldsize, size3
 
 void * CRoxieVariableRowHeap::finalizeRow(void *final, size32_t originalSize, size32_t finalSize)
 {
-    return rowManager->finalizeRow(final, originalSize, finalSize, allocatorId);
-    //If never shrink the following should be sufficient.
+    //If rows never shrink then the following is sufficient.
     if (flags & RHFhasdestructor)
         HeapletBase::setDestructorFlag(final);
     return final;
@@ -2413,6 +2697,71 @@ bool DataBufferBottom::_isShared(const void *ptr) const
 size32_t DataBufferBottom::_capacity() const { throwUnexpected(); }
 void DataBufferBottom::_setDestructorFlag(const void *ptr) { throwUnexpected(); }
 
+//================================================================================
+
+void RoxieRowArray::set(const void * row, unsigned i)
+{
+    while (rows.ordinality() < i)
+        rows.append(NULL);
+    rows.add(row, i);
+}
+
+//A very simple implementation of a buffered rows class - far from efficient.  Too much overhead adding rows.
+class SimpleRowBuffer : implements IBufferedRowCallback
+{
+public:
+    SimpleRowBuffer(unsigned _priority) : priority(_priority), locked(false)
+    {
+        numFree = 0;
+        numCriticalFree = 0;
+    }
+    ~SimpleRowBuffer() { if (numFree) DBGLOG("Free: %u Critical: %u", numFree, numCriticalFree); };
+
+//interface IBufferedRowCallback
+    virtual unsigned getPriority() const { return priority; }
+    virtual bool freeBufferedRows(bool critical)
+    {
+        numFree++;
+        if (critical)
+            numCriticalFree++;
+        CriticalBlock block(cs);
+        if (locked || (rows.ordinality() == 0))
+            return false;
+        rows.kill();
+        return true;
+    }
+
+    void addRow(const void * row)
+    {
+        CriticalBlock block(cs);
+        rows.append(row);
+    }
+
+    void kill()
+    {
+        CriticalBlock block(cs);
+        rows.kill();
+    }
+
+    void lockRows()
+    {
+        CriticalBlock block(cs);
+        locked = true;
+    }
+    void unlockRows()
+    {
+        CriticalBlock block(cs);
+        locked = false;
+    }
+
+protected:
+    RoxieRowArray rows;
+    CriticalSection cs;
+    unsigned priority;
+    unsigned numCriticalFree;
+    unsigned numFree;
+    bool locked;
+};
 
 //================================================================================
 //
@@ -2487,6 +2836,7 @@ public:
 class RoxieMemTests : public CppUnit::TestFixture  
 {
     CPPUNIT_TEST_SUITE( RoxieMemTests );
+        CPPUNIT_TEST(testCallbacks);/*
         CPPUNIT_TEST(testBitmapThreading);
         CPPUNIT_TEST(testAllocSize);
         CPPUNIT_TEST(testHuge);
@@ -2494,7 +2844,7 @@ class RoxieMemTests : public CppUnit::TestFixture
         CPPUNIT_TEST(testAll);
         CPPUNIT_TEST(testDatamanager);
         CPPUNIT_TEST(testBitmap);
-        CPPUNIT_TEST(testDatamanagerThreading);
+        CPPUNIT_TEST(testDatamanagerThreading);*/
     CPPUNIT_TEST_SUITE_END();
     const IContextLogger &logctx;
 
@@ -2795,7 +3145,7 @@ protected:
     {
         Owned<IRowManager> rm1 = createRowManager(0, NULL, logctx, NULL);
         ReleaseRoxieRow(rm1->allocate(1800000, 0));
-        ASSERT(rm1->pages()==0);
+        ASSERT(rm1->numPagesAfterCleanup()==0);
         ASSERT(rm1->getMemoryUsage()==2);
     }
 
@@ -2827,7 +3177,7 @@ protected:
     {
         Owned<IRowManager> rm1 = createRowManager(0, NULL, logctx, NULL);
         ReleaseRoxieRow(rm1->allocate(1000, 0));
-        ASSERT(rm1->pages()==0);
+        ASSERT(rm1->numPagesAfterCleanup()==0);
         ASSERT(rm1->getMemoryUsage()==1);
 
         void *r1 = rm1->allocate(1000, 0);
@@ -2838,13 +3188,13 @@ protected:
         r2 = rm1->allocate(1000, 0);
         ReleaseRoxieRow(r1);
         ReleaseRoxieRow(r2);
-        ASSERT(rm1->pages()==0);
+        ASSERT(rm1->numPagesAfterCleanup()==0);
         ASSERT(rm1->getMemoryUsage()==1);
 
 
         Owned<IRowManager> rm2 = createRowManager(0, NULL, logctx, NULL);
         ReleaseRoxieRow(rm2->allocate(4000000, 0));
-        ASSERT(rm2->pages()==0);
+        ASSERT(rm2->numPagesAfterCleanup()==0);
         ASSERT(rm2->getMemoryUsage()==4);
 
         r1 = rm2->allocate(4000000, 0);
@@ -2855,14 +3205,14 @@ protected:
         r2 = rm2->allocate(4000000, 0);
         ReleaseRoxieRow(r1);
         ReleaseRoxieRow(r2);
-        ASSERT(rm2->pages()==0);
+        ASSERT(rm2->numPagesAfterCleanup()==0);
         ASSERT(rm2->getMemoryUsage()==8);
 
         for (unsigned d = 0; d < 50; d++)
         {
             Owned<IRowManager> rm3 = createRowManager(0, NULL, logctx, NULL);
             ReleaseRoxieRow(rm3->allocate(HEAP_ALIGNMENT_SIZE - d + 10, 0));
-            ASSERT(rm3->pages()==0);
+            ASSERT(rm3->numPagesAfterCleanup()==0);
         }
 
         // test leak reporting does not crash....
@@ -2965,7 +3315,7 @@ protected:
             ASSERT(strcmp(E->errorMessage(s).str(), "memory limit exceeded")==0);
             E->Release();
         }
-        ASSERT(rm1->pages()==20);
+        ASSERT(rm1->numPagesAfterCleanup()==20);
     }
     void testCycling()
     {
@@ -3054,7 +3404,7 @@ protected:
         memset(alloc1, 99, capacity);
         void * alloc2 = rm->allocate(capacity, 0);
         ASSERT(RoxieRowCapacity(alloc2)==capacity);
-        ASSERT(rm->pages()==expectedPages);
+        ASSERT(rm->numPagesAfterCleanup()==expectedPages);
         memset(alloc2, 99, capacity);
         ReleaseRoxieRow(alloc1);
         ReleaseRoxieRow(alloc2);
@@ -3089,6 +3439,7 @@ protected:
         virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
         virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }
         virtual void onDestroy(unsigned cacheId, void *row) const { atomic_inc(&counter); }
+        virtual void checkValid(unsigned cacheId, void *row) const { }
 
         mutable atomic_t counter;
     };
@@ -3099,36 +3450,55 @@ protected:
         CasAllocatorThread(Semaphore & _sem) : Thread("AllocatorThread"), sem(_sem)
         {
         }
+        ~CasAllocatorThread()
+        {
+        }
 
         virtual void * allocate() = 0;
         virtual void * finalize(void * ptr) = 0;
 
+        void setPriority(IRowManager * _rm, unsigned _priority) { rm = _rm; priority = _priority; }
+
         int run()
         {
-            void * saved[numCasAlloc];
+            SimpleRowBuffer saved(priority);
+            if (rm)
+                rm->addRowBuffer(&saved);
             sem.wait();
-            //Allocate two rows and then release 1 trying to trigger potential ABA problems in the cas code.
-            for (unsigned i=0; i < numCasIter; i++)
+            try
             {
-                for (unsigned j=0; j < numCasAlloc; j++)
+                //Allocate two rows and then release 1 trying to trigger potential ABA problems in the cas code.
+                for (unsigned i=0; i < numCasIter; i++)
                 {
-                    //Allocate 2 rows, and add first back on the list again
-                    void * alloc1 = allocate();
-                    void * alloc2 = allocate();
-                    *(unsigned*)alloc1 = 0xdddddddd;
-                    *(unsigned*)alloc2 = 0xdddddddd;
-                    alloc1 = finalize(alloc1);
-                    alloc2 = finalize(alloc2);
-                    ReleaseRoxieRow(alloc1);
-                    saved[j] = alloc2;
+                    for (unsigned j=0; j < numCasAlloc; j++)
+                    {
+                        //Allocate 2 rows, and add first back on the list again
+                        void * alloc1 = allocate();
+                        void * alloc2 = allocate();
+                        *(unsigned*)alloc1 = 0xdddddddd;
+                        *(unsigned*)alloc2 = 0xdddddddd;
+                        alloc1 = finalize(alloc1);
+                        alloc2 = finalize(alloc2);
+                        ReleaseRoxieRow(alloc1);
+                        saved.addRow(alloc2);
+                    }
+                    saved.kill();
                 }
-                for (unsigned j=0; j < numCasAlloc; j++)
-                    ReleaseRoxieRow(saved[j]);
+                if (rm)
+                    rm->removeRowBuffer(&saved);
+            }
+            catch (...)
+            {
+                if (rm)
+                    rm->removeRowBuffer(&saved);
+                throw;
             }
             return 0;
         }
     protected:
         Semaphore & sem;
+        IRowManager * rm;
+        unsigned priority;
     };
     void runCasTest(const char * title, Semaphore & sem, CasAllocatorThread * threads[])
     {
@@ -3357,6 +3727,34 @@ protected:
         testPackedCas();
         testGeneralCas();
     }
+
+    void testCallback(unsigned pages, unsigned spillPages, double scale)
+    {
+        CountingRowAllocatorCache rowCache;
+        Owned<IFixedRowHeap> rowHeap;
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, &rowCache);
+        rowManager->setMemoryLimit(pages * numCasThreads * HEAP_ALIGNMENT_SIZE, spillPages * numCasThreads * HEAP_ALIGNMENT_SIZE);
+        //For this test the row heap is assigned to a variable that will be destroyed after the manager, to ensure that works.
+        rowHeap.setown(rowManager->createFixedRowHeap(0x10000-64, ACTIVITY_FLAG_ISREGISTERED|0, RHFhasdestructor));
+        Semaphore sem;
+        CasAllocatorThread * threads[numCasThreads];
+        for (unsigned i1 = 0; i1 < numCasThreads; i1++)
+        {
+            FixedCasAllocatorThread * cur = new FixedCasAllocatorThread(rowHeap, sem);
+            cur->setPriority(rowManager, (unsigned)(i1*scale)+1);
+            threads[i1] = cur;
+        }
+        runCasTest("hard callback allocator", sem, threads);
+        ASSERT(atomic_read(&rowCache.counter) == 2 * numCasThreads * numCasIter * numCasAlloc);
+    }
+    void testCallbacks()
+    {
+        testCallback(2, 0, 0);
+        testCallback(2, 1, 1);
+        testCallback(10, 5, 1); // 1 at each priority level - can cause exhaustion since rows tend to get left in highest priority.
+        testCallback(10, 5, 0.25);  // 4 at each priority level
+        testCallback(10, 5, 0); // all at the same priority level
+    }
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemTests );
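
The reworked checkLimit() above follows a lock-free reservation pattern: read the current heap page total, succeed if the request fits under the limit by committing the reservation with a compare-and-swap, otherwise ask the registered callbacks to free buffered rows and retry. A minimal sketch of that pattern using std::atomic (illustrative only - the commit uses jlib's atomic_t/atomic_cas and also consults the release sequence number, which is omitted here):

    #include <atomic>
    #include <stdexcept>

    static std::atomic<unsigned> totalHeapPages{0};

    // tryFreeBuffers is a stand-in for callbacks.releaseBuffers(true, 1)
    void reservePages(unsigned numRequested, unsigned pageLimit, bool (*tryFreeBuffers)())
    {
        for (;;)
        {
            unsigned current = totalHeapPages.load();
            if (!pageLimit || current + numRequested <= pageLimit)
            {
                // commit the reservation atomically; retry if another thread changed the total first
                if (totalHeapPages.compare_exchange_weak(current, current + numRequested))
                    return;
                continue;
            }
            // over the limit: try to free buffered rows and loop again, otherwise give up
            if (!tryFreeBuffers())
                throw std::runtime_error("memory limit exceeded");
        }
    }

    int main()
    {
        reservePages(4, 16, [] { return false; });   // fits comfortably under the limit
        return 0;
    }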

+ 76 - 7
roxie/roxiemem/roxiemem.hpp

@@ -33,6 +33,13 @@
  #define roxiemem_decl
 #endif
 
+//Use for asserts that are highly unlikely to occur, and would likely be reproduced in debug mode.
+#ifdef _DEBUG
+#define dbgassertex(x) assertex(x)
+#else
+#define dbgassertex(x)
+#endif
+
 #ifdef __64BIT__
 #define HEAP_ALIGNMENT_SIZE I64C(0x100000u)                     // 1 mb heaplets - may be too big?
 #else
@@ -57,6 +64,13 @@ interface IRowAllocatorCache
     virtual unsigned getActivityId(unsigned cacheId) const = 0;
     virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const = 0;
     virtual void onDestroy(unsigned cacheId, void *row) const = 0;
+    virtual void checkValid(unsigned cacheId, void *row) const = 0;
+};
+
+interface IBufferedRowCallback
+{
+    virtual unsigned getPriority() const = 0; // lower values get freed up first.
+    virtual bool freeBufferedRows(bool critical) = 0; // return true if managed to free something.
 };
 
 struct roxiemem_decl HeapletBase
@@ -78,6 +92,8 @@ protected:
     virtual bool _isShared(const void *ptr) const = 0;
     virtual size32_t _capacity() const = 0;
     virtual void _setDestructorFlag(const void *ptr) = 0;
+    virtual bool _hasDestructor(const void *ptr) const = 0;
+    virtual unsigned _rawAllocatorId(const void *ptr) const = 0;
     virtual void noteLinked(const void *ptr) = 0;
 
     inline static HeapletBase *findBase(const void *ptr)
@@ -125,11 +141,24 @@ public:
 
     static void setDestructorFlag(const void *ptr)
     {
-        if (ptr)
-        {
-            HeapletBase *h = findBase(ptr);
-            h->_setDestructorFlag(ptr);
-        }
+        dbgassertex(ptr);
+        HeapletBase *h = findBase(ptr);
+        h->_setDestructorFlag(ptr);
+    }
+
+    static bool hasDestructor(const void *ptr)
+    {
+        dbgassertex(ptr);
+        HeapletBase *h = findBase(ptr);
+        return h->_hasDestructor(ptr);
+    }
+
+    static unsigned getAllocatorId(const void *ptr)
+    {
+        dbgassertex(ptr);
+        HeapletBase *h = findBase(ptr);
+        unsigned id = h->_rawAllocatorId(ptr);
+        return (id & ACTIVITY_MASK);
     }
 
     static void releaseClear(const void *&ptr)
@@ -217,6 +246,8 @@ private:
     virtual void released();
     virtual size32_t _capacity() const;
     virtual void _setDestructorFlag(const void *ptr);
+    virtual bool _hasDestructor(const void *ptr) const { return false; }
+    virtual unsigned _rawAllocatorId(const void *ptr) const { return 0; }
 protected:
     DataBuffer()
     {
@@ -247,6 +278,8 @@ private:
     virtual bool _isShared(const void *ptr) const;
     virtual size32_t _capacity() const;
     virtual void _setDestructorFlag(const void *ptr);
+    virtual bool _hasDestructor(const void *ptr) const { return false; }
+    virtual unsigned _rawAllocatorId(const void *ptr) const { return 0; }
     virtual void noteLinked(const void *ptr);
 
 public:
@@ -255,10 +288,15 @@ public:
     void addToFreeChain(DataBufferBase * buffer);
 };
 
+//Actions applied to roxie rows
 #define ReleaseRoxieRow(row) roxiemem::HeapletBase::release(row)
 #define ReleaseClearRoxieRow(row) roxiemem::HeapletBase::releaseClear(row)
 #define LinkRoxieRow(row) roxiemem::HeapletBase::link(row)
+
+//Functions to determine information about roxie rows
 #define RoxieRowCapacity(row)  roxiemem::HeapletBase::capacity(row)
+#define RoxieRowHasDestructor(row)  roxiemem::HeapletBase::hasDestructor(row)
+#define RoxieRowAllocatorId(row) roxiemem::HeapletBase::getAllocatorId(row)
 
 class OwnedRoxieRow;
 class OwnedConstRoxieRow
@@ -333,6 +371,35 @@ private:
 };
 
 
+class roxiemem_decl RoxieRowArray
+{
+public:
+    inline ~RoxieRowArray() { kill(); }
+    inline void add(const void * row, unsigned i) { rows.add(row, i); }
+    inline void append(const void * row) { rows.append(row); }
+    inline const void * get(unsigned i) const { return rows.item(i); }
+    inline const void * getClear(unsigned i) { const void * row = rows.item(i); rows.replace(NULL, i); return row; }
+    inline const void * item(unsigned i) const { return rows.item(i); }
+    inline const void * link(unsigned i) const { const void * row = rows.item(i); if (row) LinkRoxieRow(row); return row; }
+           void set(const void * row, unsigned i);
+    inline void kill()
+    {
+        ForEachItemIn(idx, rows)
+            ReleaseRoxieRow(rows.item(idx));
+        rows.kill();
+    }
+    inline void killClear()
+    {
+        ForEachItemIn(idx, rows)
+            ReleaseRoxieRow(getClear(idx));
+        rows.kill();
+    }
+    inline unsigned ordinality() const { return rows.ordinality(); }
+
+private:
+    ConstPointerArray rows;
+};
+
 
 interface IFixedRowHeap : extends IInterface
 {
@@ -362,9 +429,9 @@ interface IRowManager : extends IInterface
     virtual void *allocate(size32_t size, unsigned activityId) = 0;
     virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, unsigned activityId, size32_t &capacity) = 0;
     virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize, unsigned activityId) = 0;
-    virtual void setMemoryLimit(memsize_t size) = 0;
+    virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;
     virtual unsigned allocated() = 0;
-    virtual unsigned pages() = 0;
+    virtual unsigned numPagesAfterCleanup() = 0; // ensures any empty pages are freed back to the heap
     virtual unsigned getMemoryUsage() = 0;
     virtual bool attachDataBuff(DataBuffer *dataBuff) = 0 ;
     virtual void noteDataBuffReleased(DataBuffer *dataBuff) = 0 ;
@@ -373,6 +440,8 @@ interface IRowManager : extends IInterface
     virtual void checkHeap() = 0;
     virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags) = 0;
     virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, unsigned roxieHeapFlags) = 0;            // should this be passed the initial size?
+    virtual void addRowBuffer(IBufferedRowCallback * callback) = 0;
+    virtual void removeRowBuffer(IBufferedRowCallback * callback) = 0;
 };
 
 extern roxiemem_decl void setDataAlignmentSize(unsigned size);
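
The addRowBuffer/removeRowBuffer hooks let owners of buffered rows register with the row manager so that memory can be reclaimed on demand, lowest priority first. A hypothetical callback that spills its rows when asked might look like the sketch below (SpillableRowBlock and spillRowsToDisk are illustrative names, not part of this commit):

    #include "roxiemem.hpp"
    #include "jmutex.hpp"
    using namespace roxiemem;

    void spillRowsToDisk(RoxieRowArray & rows);   // assumed helper, not defined here

    class SpillableRowBlock : public IBufferedRowCallback
    {
    public:
        SpillableRowBlock(IRowManager & _rowManager, unsigned _priority)
            : rowManager(_rowManager), priority(_priority)
        {
            rowManager.addRowBuffer(this);
        }
        ~SpillableRowBlock()
        {
            rowManager.removeRowBuffer(this);
        }
        virtual unsigned getPriority() const { return priority; }
        virtual bool freeBufferedRows(bool critical)
        {
            //critical indicates the manager is about to fail an allocation
            CriticalBlock block(crit);
            if (rows.ordinality() == 0)
                return false;              // nothing to give back
            spillRowsToDisk(rows);         // write the rows out; kill() then releases them
            rows.kill();
            return true;                   // tell the manager some memory was freed
        }
    protected:
        IRowManager & rowManager;
        RoxieRowArray rows;
        CriticalSection crit;
        unsigned priority;
    };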

+ 3 - 1
system/jlib/jatomic.hpp

@@ -38,6 +38,8 @@ extern "C"
 #define InterlockedIncrement _InterlockedIncrement
 #pragma intrinsic (_InterlockedDecrement)
 #define InterlockedDecrement _InterlockedDecrement
+#pragma intrinsic (_InterlockedExchangeAdd)
+#define InterlockedExchangeAdd _InterlockedExchangeAdd
 
 typedef volatile long atomic_t;
 #define ATOMIC_INIT(i)                  (i)
@@ -49,7 +51,7 @@ typedef volatile long atomic_t;
 #define atomic_read(v)                  (*v)
 #define atomic_set(v,i)                 ((*v) = (i))
 #define atomic_xchg(i, v)               InterlockedExchange(v, i)
-#define atomic_add(v,i)                 InterlockedAdd(v,i)
+#define atomic_add(v,i)                 InterlockedExchangeAdd(v,i)
 #define atomic_add_exchange(v, i)       InterlockedExchangeAdd(v,i)
 #define atomic_xchg_ptr(p, v)           InterlockedExchangePointer(v,p)
 #if defined (_MSC_VER) && (_MSC_VER <= 1200)

+ 5 - 0
thorlcr/thorutil/thmem.cpp

@@ -101,6 +101,8 @@ public:
     unsigned getActivityId(unsigned cacheId) const;
     StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const;
     void onDestroy(unsigned cacheId, void *row) const;
+    virtual void checkValid(unsigned cacheId, void *row) const;
+
     void reset();  // resets allocators
     void clear();
     size32_t subSize(unsigned cacheId,const void *row) const;
@@ -1203,6 +1205,9 @@ void CThorRowAllocatorCache::onDestroy(unsigned cacheId, void *row) const
 {
     item(cacheId).queryOutputMeta()->destruct((byte *) row); 
 }
+void CThorRowAllocatorCache::checkValid(unsigned cacheId, void *row) const
+{
+}
 
 
 size32_t CThorRowAllocatorCache::subSize(unsigned cacheId, const void *row) const