
HPCC-10073 Allow a spill priority to be specified on an allocation

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday, 11 years ago
Parent
Commit 2d21c5e404
3 changed files with 188 additions and 81 deletions
  1. roxie/roxiemem/roxiemem.cpp (+180 -76)
  2. roxie/roxiemem/roxiemem.hpp (+5 -3)
  3. thorlcr/thorutil/thmem.cpp (+3 -2)
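
In short, this commit threads a maxSpillPriority value through the allocation paths: only buffered-row callbacks whose priority is at or below that value may be asked to spill, and if that is not enough to satisfy the request, the allocation fails softly (returning NULL or false) rather than throwing. A minimal caller-side sketch, assuming only the IRowManager interface as amended in roxiemem.hpp below (allocateWithFallback and the priority value 10 are hypothetical, not part of the commit):

    #include "roxiemem.hpp"
    using namespace roxiemem;

    void * allocateWithFallback(IRowManager * rowManager, memsize_t size, unsigned activityId)
    {
        // Only callbacks with getPriority() <= 10 may be asked to free memory;
        // the allocation returns NULL if that is not sufficient.
        void * row = rowManager->allocate(size, activityId, 10);
        if (row)
            return row;
        // RequiredPriority lets every callback spill; this is equivalent to the
        // original two-argument allocate(), which throws on real exhaustion.
        return rowManager->allocate(size, activityId, RequiredPriority);
    }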

roxie/roxiemem/roxiemem.cpp (+180 -76)

@@ -2015,11 +2015,11 @@ public:
     {
     }
 
-    void * doAllocate(memsize_t _size, unsigned allocatorId);
-    void expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback);
+    void * doAllocate(memsize_t _size, unsigned allocatorId, unsigned maxSpillPriority);
+    bool expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, unsigned maxSpillPriority, IRowResizeCallback & callback);
 
 protected:
-    HugeHeaplet * allocateHeaplet(memsize_t _size, unsigned allocatorId);
+    HugeHeaplet * allocateHeaplet(memsize_t _size, unsigned allocatorId, unsigned maxSpillPriority);
 
     virtual void reportHeapUsage(IActivityMemoryUsageMap * usageMap, unsigned numPages, memsize_t numAllocs) const
     {
@@ -2036,7 +2036,7 @@ public:
         chunksPerPage  = FixedSizeHeaplet::dataAreaSize() / chunkSize;
     }
 
-    void * doAllocate(unsigned allocatorId);
+    void * doAllocate(unsigned allocatorId, unsigned maxSpillPriority);
 
     virtual void reportHeapUsage(IActivityMemoryUsageMap * usageMap, unsigned numPages, memsize_t numAllocs) const
     {
@@ -2048,7 +2048,7 @@ public:
     inline unsigned maxChunksPerPage() const { return chunksPerPage; }
 
 protected:
-    inline void * inlineDoAllocate(unsigned allocatorId);
+    inline void * inlineDoAllocate(unsigned allocatorId, unsigned maxSpillPriority);
     virtual ChunkedHeaplet * allocateHeaplet() = 0;
 
 protected:
@@ -2059,8 +2059,8 @@ protected:
 class CFixedChunkedHeap : public CChunkedHeap
 {
 public:
-    CFixedChunkedHeap(CChunkingRowManager * _rowManager, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, unsigned _flags)
-        : CChunkedHeap(_rowManager, _logctx, _allocatorCache, _chunkSize, _flags)
+    CFixedChunkedHeap(CChunkingRowManager * _rowManager, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, unsigned _flags, unsigned _defaultSpillPriority)
+        : CChunkedHeap(_rowManager, _logctx, _allocatorCache, _chunkSize, _flags), defaultSpillPriority(_defaultSpillPriority)
     {
     }
 
@@ -2075,13 +2075,16 @@ public:
 
 protected:
     virtual ChunkedHeaplet * allocateHeaplet();
+
+protected:
+    unsigned defaultSpillPriority;
 };
 
 class CPackedChunkingHeap : public CChunkedHeap
 {
 public:
-    CPackedChunkingHeap(CChunkingRowManager * _rowManager, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, unsigned _flags, unsigned _allocatorId)
-        : CChunkedHeap(_rowManager, _logctx, _allocatorCache, _chunkSize, _flags), allocatorId(_allocatorId)
+    CPackedChunkingHeap(CChunkingRowManager * _rowManager, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, unsigned _flags, unsigned _allocatorId, unsigned _defaultSpillPriority)
+        : CChunkedHeap(_rowManager, _logctx, _allocatorCache, _chunkSize, _flags), allocatorId(_allocatorId), defaultSpillPriority(_defaultSpillPriority)
     {
     }
 
@@ -2099,6 +2102,7 @@ protected:
 
 protected:
     unsigned allocatorId;
+    unsigned defaultSpillPriority;
 };
 
 //================================================================================
@@ -2206,16 +2210,18 @@ class BufferedRowCallbackManager
                 goSem.wait();
                 if (abort)
                     break;
-                args.result = owner.releaseBuffersNow(args.critical, false, 0);
+                //This class is only used for stress testing => free all we can.
+                unsigned maxSpillPriority = RequiredPriority;
+                args.result = owner.releaseBuffersNow(maxSpillPriority, args.critical, false, 0);
                 doneSem.signal();
             }
             return 0;
         }
 
-        bool releaseBuffers(const bool critical)
+        bool releaseBuffers(unsigned maxSpillPriority, const bool critical)
         {
             if (isCurrentThread())
-                return owner.releaseBuffersNow(critical, false, 0);
+                return owner.releaseBuffersNow(maxSpillPriority, critical, false, 0);
 
             bool ok;
             {
@@ -2258,6 +2264,7 @@ public:
         minCallbackThreshold = 1;
         releaseWhenModifyCallback = false;
         releaseWhenModifyCallbackCritical = false;
+        backgroundReleasePriority = RequiredPriority;
     }
     ~BufferedRowCallbackManager()
     {
@@ -2271,7 +2278,7 @@ public:
     void addRowBuffer(IBufferedRowCallback * callback)
     {
         if (releaseWhenModifyCallback)
-            releaseBuffers(releaseWhenModifyCallbackCritical, false, 0);
+            releaseBuffers(RequiredPriority, releaseWhenModifyCallbackCritical, false, 0);
 
         CriticalBlock block(callbackCrit);
         //Assuming a small number so perform an insertion sort.
@@ -2291,7 +2298,7 @@ public:
     void removeRowBuffer(IBufferedRowCallback * callback)
     {
         if (releaseWhenModifyCallback)
-            releaseBuffers(releaseWhenModifyCallbackCritical, false, 0);
+            releaseBuffers(RequiredPriority, releaseWhenModifyCallbackCritical, false, 0);
 
         CriticalBlock block(callbackCrit);
         rowBufferCallbacks.zap(callback);
@@ -2343,11 +2350,11 @@ public:
     }
 
     //Release buffers will ensure that the rows are attempted to be cleaned up before returning
-    bool releaseBuffers(const bool critical, bool checkSequence, unsigned prevReleaseSeq)
+    bool releaseBuffers(unsigned maxSpillPriority, const bool critical, bool checkSequence, unsigned prevReleaseSeq)
     {
         if (!releaseBuffersThread)
-            return releaseBuffersNow(critical, checkSequence, prevReleaseSeq);
-        return releaseBuffersThread->releaseBuffers(critical);
+            return releaseBuffersNow(maxSpillPriority, critical, checkSequence, prevReleaseSeq);
+        return releaseBuffersThread->releaseBuffers(maxSpillPriority, critical);
     }
 
     void runReleaseBufferThread()
@@ -2357,7 +2364,7 @@ public:
             releaseBuffersSem.wait();
             if (abortBufferThread)
                 break;
-            releaseBuffersNow(false, false, 0);
+            releaseBuffersNow(backgroundReleasePriority, false, false, 0);
             atomic_set(&releasingBuffers, 0);
         }
     }
@@ -2371,8 +2378,9 @@ public:
         }
     }
 
-    void startReleaseBufferThread()
+    void startReleaseBufferThread(unsigned maxSpillPriority)
     {
+        backgroundReleasePriority = maxSpillPriority;
         if (!backgroundReleaseBuffersThread)
         {
             backgroundReleaseBuffersThread.setown(new BackgroundReleaseBufferThread(this));
@@ -2426,7 +2434,7 @@ public:
     }
 
 protected:
-    bool doReleaseBuffers(const bool critical, const unsigned minSuccess)
+    bool doReleaseBuffers(unsigned maxSpillPriority, const bool critical, const unsigned minSuccess)
     {
         const unsigned numCallbacks = rowBufferCallbacks.ordinality();
         if (numCallbacks == 0)
@@ -2437,6 +2445,9 @@ protected:
         //Loop through each set of different priorities
         ForEachItemIn(level, callbackRanges)
         {
+            if (rowBufferCallbacks.item(first)->getPriority() > maxSpillPriority)
+                break;
+
             unsigned last = callbackRanges.item(level);
             unsigned start = nextCallbacks.item(level);
             unsigned cur = start;
@@ -2469,7 +2480,7 @@ protected:
     }
 
     //Release buffers will ensure that the rows are attempted to be cleaned up before returning
-    bool releaseBuffersNow(const bool critical, bool checkSequence, unsigned prevReleaseSeq)
+    bool releaseBuffersNow(unsigned maxSpillPriority, const bool critical, bool checkSequence, unsigned prevReleaseSeq)
     {
         const unsigned minSuccess = minCallbackThreshold;
         CriticalBlock block(callbackCrit);
@@ -2480,7 +2491,7 @@ protected:
 
         //Call non critical first, then critical - if applicable.
         //Should there be more levels of importance than critical/non critical?
-        if (doReleaseBuffers(false, minSuccess) || (critical && doReleaseBuffers(true, minSuccess)))
+        if (doReleaseBuffers(maxSpillPriority, false, minSuccess) || (critical && doReleaseBuffers(maxSpillPriority, true, minSuccess)))
         {
             //Increment first so that any caller knows some rows may have been freed
             atomic_inc(&releaseSeq);
@@ -2511,6 +2522,7 @@ protected:
     atomic_t releasingBuffers;  // boolean if pre-emptive releasing thread is active
     atomic_t releaseSeq;
     unsigned minCallbackThreshold;
+    unsigned backgroundReleasePriority;
     bool releaseWhenModifyCallback;
     bool releaseWhenModifyCallbackCritical;
     volatile bool abortBufferThread;
@@ -2598,7 +2610,7 @@ public:
             size32_t rounded = roundup(prevSize+1);
             dbgassertex(ROUNDEDHEAP(rounded) == normalHeaps.ordinality());
             size32_t thisSize = ROUNDEDSIZE(rounded);
-            normalHeaps.append(*new CFixedChunkedHeap(this, _logctx, _allocatorCache, thisSize, RHFvariable));
+            normalHeaps.append(*new CFixedChunkedHeap(this, _logctx, _allocatorCache, thisSize, RHFvariable, RequiredPriority));
             prevSize = thisSize;
         }
         pageLimit = (unsigned) PAGES(_memLimit, HEAP_ALIGNMENT_SIZE);
@@ -2843,13 +2855,26 @@ public:
     {
         beforeAllocate(_size, activityId);
         if (_size > FixedSizeHeaplet::maxHeapSize())
-            return hugeHeap.doAllocate(_size, activityId);
+            return hugeHeap.doAllocate(_size, activityId, RequiredPriority);
+        size32_t size32 = (size32_t) _size;
+
+        size32_t rounded = roundup(size32 + FixedSizeHeaplet::chunkHeaderSize);
+        size32_t whichHeap = ROUNDEDHEAP(rounded);
+        CFixedChunkedHeap & normalHeap = normalHeaps.item(whichHeap);
+        return normalHeap.doAllocate(activityId, RequiredPriority);
+    }
+
+    virtual void *allocate(memsize_t _size, unsigned activityId, unsigned maxSpillPriority)
+    {
+        beforeAllocate(_size, activityId);
+        if (_size > FixedSizeHeaplet::maxHeapSize())
+            return hugeHeap.doAllocate(_size, activityId, maxSpillPriority);
         size32_t size32 = (size32_t) _size;
 
         size32_t rounded = roundup(size32 + FixedSizeHeaplet::chunkHeaderSize);
         size32_t whichHeap = ROUNDEDHEAP(rounded);
         CFixedChunkedHeap & normalHeap = normalHeaps.item(whichHeap);
-        return normalHeap.doAllocate(activityId);
+        return normalHeap.doAllocate(activityId, maxSpillPriority);
     }
 
     virtual const char *cloneVString(size32_t len, const char *str)
@@ -2876,7 +2901,7 @@ public:
             return NULL;
     }
 
-    virtual void setMemoryLimit(memsize_t bytes, memsize_t spillSize)
+    virtual void setMemoryLimit(memsize_t bytes, memsize_t spillSize, unsigned backgroundReleasePriority)
     {
         memsize_t systemMemoryLimit = getTotalMemoryLimit();
         if (bytes > systemMemoryLimit)
@@ -2888,7 +2913,7 @@ public:
 
         //The test allows no limit on memory, but spill above a certain amount.  Not sure if useful...
         if (spillPageLimit && (pageLimit != spillPageLimit))
-            callbacks.startReleaseBufferThread();
+            callbacks.startReleaseBufferThread(backgroundReleasePriority);
         else
             callbacks.stopReleaseBufferThread();
 
@@ -2932,7 +2957,7 @@ public:
         if (curCapacity > FixedSizeHeaplet::maxHeapSize())
         {
             CVariableRowResizeCallback callback(capacity, ptr);
-            hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, callback);
+            hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, RequiredPriority, callback);
             return;
         }
 
@@ -2942,10 +2967,9 @@ public:
         HeapletBase::release(original);
         capacity = newCapacity;
         ptr = ret;
-        return;
     }
 
-    virtual void resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback)
+    virtual bool resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, unsigned maxSpillPriority, IRowResizeCallback & callback)
     {
         assertex(newsize);
         assertex(!HeapletBase::isShared(original));
@@ -2953,20 +2977,19 @@ public:
         if (newsize <= curCapacity)
         {
             //resizeRow never shrinks memory
-            return;
+            return true;
         }
         if (curCapacity > FixedSizeHeaplet::maxHeapSize())
-        {
-            hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, callback);
-            return;
-        }
+            return hugeHeap.expandHeap(original, copysize, curCapacity, newsize, activityId, maxSpillPriority, callback);
 
-        void *ret = allocate(newsize, activityId);
+        void *ret = allocate(newsize, activityId, maxSpillPriority);
+        if (!ret)
+            return false;
         memcpy(ret, original, copysize);
         memsize_t newCapacity = HeapletBase::capacity(ret);
         callback.atomicUpdate(newCapacity, ret);
         HeapletBase::release(original);
-        return;
+        return true;
     }
 
     virtual void *finalizeRow(void * original, memsize_t initialSize, memsize_t finalSize, unsigned activityId)
@@ -3038,7 +3061,10 @@ public:
             dataBuffPages = ndataBuffPages;
         }
         if (needCheck)
-            checkLimit(0);
+        {
+            const unsigned maxSpillPriority = 0;
+            checkLimit(0, maxSpillPriority);
+        }
         return true;
     }
     
@@ -3050,9 +3076,9 @@ public:
                     dataBuffs, dataBuffPages, atomic_read(&possibleGoers), dataBuff, this);
     }
 
-    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags)
+    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags, unsigned maxSpillPriority)
     {
-        CRoxieFixedRowHeapBase * rowHeap = doCreateFixedRowHeap(fixedSize, activityId, roxieHeapFlags);
+        CRoxieFixedRowHeapBase * rowHeap = doCreateFixedRowHeap(fixedSize, activityId, roxieHeapFlags, maxSpillPriority);
 
         SpinBlock block(fixedSpinLock);
         //The Row heaps are not linked by the row manager so it can determine when they are released.
@@ -3072,12 +3098,12 @@ public:
         return new CRoxieVariableRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags);
     }
 
-    void checkLimit(unsigned numRequested)
+    bool checkLimit(unsigned numRequested, unsigned maxSpillPriority)
     {
         unsigned totalPages;
         releaseEmptyPages(false);
         if (minimizeFootprint)
-            callbacks.releaseBuffers(minimizeFootprintCritical, false, 0);
+            callbacks.releaseBuffers(RequiredPriority, minimizeFootprintCritical, false, 0);
 
         loop
         {
@@ -3109,7 +3135,7 @@ public:
             //The following reduces the number of times the callback is called, but I'm not sure how this affects
             //performance.  I think better if a single free is likely to free up some memory, and worse if not.
             const bool skipReleaseIfAnotherThreadReleases = true;
-            if (!callbacks.releaseBuffers(true, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
+            if (!callbacks.releaseBuffers(maxSpillPriority, true, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
             {
                 //Check if a background thread has freed up some memory.  That can be checked by a comparing value of a counter
                 //which is incremented each time releaseBuffers is successful.
@@ -3120,6 +3146,9 @@ public:
                     releaseEmptyPages(true);
                     if (numHeapPages == atomic_read(&totalHeapPages))
                     {
+                        if (maxSpillPriority != RequiredPriority)
+                            return false;
+
                         logctx.CTXLOG("RoxieMemMgr: Memory limit exceeded - current %u, requested %u, limit %u", pageCount, numRequested, pageLimit);
                         reportMemoryUsage(false);
                         PrintStackReport();
@@ -3140,6 +3169,7 @@ public:
                 getPeakActivityUsage();
             peakPages = totalPages;
         }
+        return true;
     }
 
     void restoreLimit(unsigned numRequested)
@@ -3147,13 +3177,13 @@ public:
         atomic_add(&totalHeapPages, -(int)numRequested);
     }
 
-    bool releaseCallbackMemory(bool critical)
+    bool releaseCallbackMemory(unsigned maxSpillPriority, bool critical)
     {
-        return callbacks.releaseBuffers(critical, false, 0);
+        return callbacks.releaseBuffers(maxSpillPriority, critical, false, 0);
     }
 
 protected:
-    CRoxieFixedRowHeapBase * doCreateFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags)
+    CRoxieFixedRowHeapBase * doCreateFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags, unsigned maxSpillPriority)
     {
         if ((roxieHeapFlags & RHFoldfixed) || (fixedSize > FixedSizeHeaplet::maxHeapSize()))
             return new CRoxieFixedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, fixedSize);
@@ -3161,12 +3191,12 @@ protected:
         unsigned heapFlags = roxieHeapFlags & (RHFunique|RHFpacked);
         if (heapFlags & RHFpacked)
         {
-            CPackedChunkingHeap * heap = createPackedHeap(fixedSize, activityId, heapFlags);
+            CPackedChunkingHeap * heap = createPackedHeap(fixedSize, activityId, heapFlags, maxSpillPriority);
             return new CRoxieDirectPackedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
         }
         else
         {
-            CFixedChunkedHeap * heap = createFixedHeap(fixedSize, activityId, heapFlags);
+            CFixedChunkedHeap * heap = createFixedHeap(fixedSize, activityId, heapFlags, maxSpillPriority);
             return new CRoxieDirectFixedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
         }
     }
@@ -3202,7 +3232,7 @@ protected:
         return map.getClear();
     }
 
-    CFixedChunkedHeap * createFixedHeap(size32_t size, unsigned activityId, unsigned flags)
+    CFixedChunkedHeap * createFixedHeap(size32_t size, unsigned activityId, unsigned flags, unsigned maxSpillPriority)
     {
         dbgassertex(!(flags & RHFpacked));
         size32_t rounded = roundup(size + FixedSizeHeaplet::chunkHeaderSize);
@@ -3226,12 +3256,12 @@ protected:
                 return static_cast<CFixedChunkedHeap *>(match);
         }
 
-        CFixedChunkedHeap * heap = new CFixedChunkedHeap(this, logctx, allocatorCache, chunkSize, flags);
+        CFixedChunkedHeap * heap = new CFixedChunkedHeap(this, logctx, allocatorCache, chunkSize, flags, maxSpillPriority);
         fixedHeaps.append(*LINK(heap));
         return heap;
     }
 
-    CPackedChunkingHeap * createPackedHeap(size32_t size, unsigned activityId, unsigned flags)
+    CPackedChunkingHeap * createPackedHeap(size32_t size, unsigned activityId, unsigned flags, unsigned maxSpillPriority)
     {
         dbgassertex(flags & RHFpacked);
         //Must be 4 byte aligned otherwise the atomic increments on the counts may not be atomic
@@ -3247,7 +3277,7 @@ protected:
                 return static_cast<CPackedChunkingHeap *>(match);
         }
 
-        CPackedChunkingHeap * heap = new CPackedChunkingHeap(this, logctx, allocatorCache, chunkSize, flags, activityId);
+        CPackedChunkingHeap * heap = new CPackedChunkingHeap(this, logctx, allocatorCache, chunkSize, flags, activityId, maxSpillPriority);
         fixedHeaps.append(*LINK(heap));
         return heap;
     }
@@ -3413,28 +3443,42 @@ void * CRoxieVariableRowHeap::finalizeRow(void *final, memsize_t originalSize, m
 //================================================================================
 
 //MORE: Make this a nested class??
-HugeHeaplet * CHugeHeap::allocateHeaplet(memsize_t _size, unsigned allocatorId)
+HugeHeaplet * CHugeHeap::allocateHeaplet(memsize_t _size, unsigned allocatorId, unsigned maxSpillPriority)
 {
     unsigned numPages = PAGES(_size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
 
     loop
     {
-        rowManager->checkLimit(numPages);
-        //If the allocation fails, then try and free some memory by calling the callbacks
+        if (!rowManager->checkLimit(numPages, maxSpillPriority))
+            return NULL;
 
+        //If the allocation fails, then try and free some memory by calling the callbacks
         void * memory = suballoc_aligned(numPages, true);
         if (memory)
             return new (memory) HugeHeaplet(this, allocatorCache, _size, allocatorId);
 
         rowManager->restoreLimit(numPages);
-        if (!rowManager->releaseCallbackMemory(true))
-            throwHeapExhausted(numPages);
+        if (!rowManager->releaseCallbackMemory(maxSpillPriority, true))
+        {
+            if (maxSpillPriority == RequiredPriority)
+                throwHeapExhausted(numPages);
+            return NULL;
+        }
     }
 }
 
-void * CHugeHeap::doAllocate(memsize_t _size, unsigned allocatorId)
+void * CHugeHeap::doAllocate(memsize_t _size, unsigned allocatorId, unsigned maxSpillPriority)
 {
-    HugeHeaplet *head = allocateHeaplet(_size, allocatorId);
+    HugeHeaplet *head = allocateHeaplet(_size, allocatorId, maxSpillPriority);
+    if (!head)
+    {
+        if (memTraceLevel >= 2)
+        {
+            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::allocate(size %"I64F"u) failed to allocate a block - pageLimit=%u peakPages=%u rowMgr=%p",
+                (unsigned __int64) _size, rowManager->pageLimit, rowManager->peakPages, this);
+        }
+        return NULL;
+    }
 
     if (memTraceLevel >= 2 || (memTraceSizeLimit && _size >= memTraceSizeLimit))
     {
@@ -3449,7 +3493,7 @@ void * CHugeHeap::doAllocate(memsize_t _size, unsigned allocatorId)
     return head->allocateHuge(_size);
 }
 
-void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback)
+bool CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcapacity, memsize_t newsize, unsigned activityId, unsigned maxSpillPriority, IRowResizeCallback & callback)
 {
     unsigned newPages = PAGES(newsize + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     unsigned oldPages = PAGES(oldcapacity + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
@@ -3461,7 +3505,8 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
         // NOTE: we request permission only for the difference between the old
         // and new sizes, even though we may temporarily hold both. This not only
         // simplifies the code considerably, it's probably desirable
-        rowManager->checkLimit(numPages);
+        if (!rowManager->checkLimit(numPages, maxSpillPriority))
+            return false;
 
         bool release = false;
         void *realloced = subrealloc_aligned(oldbase, oldPages, newPages);
@@ -3537,20 +3582,24 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
                 }
             }
 
-            return;
+            return true;
         }
 
         //If the allocation fails, then try and free some memory by calling the callbacks
 
         rowManager->restoreLimit(numPages);
-        if (!rowManager->releaseCallbackMemory(true))
-            throwHeapExhausted(numPages);
+        if (!rowManager->releaseCallbackMemory(maxSpillPriority, true))
+        {
+            if (maxSpillPriority == RequiredPriority)
+                throwHeapExhausted(numPages);
+            return false;
+        }
     }
 }
 
 
 //An inline function used to common up the allocation code for fixed and non fixed sizes.
-void * CChunkedHeap::inlineDoAllocate(unsigned allocatorId)
+void * CChunkedHeap::inlineDoAllocate(unsigned allocatorId, unsigned maxSpillPriority)
 {
     //Only hold the spinblock while walking the list - so subsequent calls to checkLimit don't deadlock.
     //NB: The allocation is split into two - finger->allocateChunk, and finger->initializeChunk().
@@ -3582,13 +3631,19 @@ void * CChunkedHeap::inlineDoAllocate(unsigned allocatorId)
 
     loop
     {
-        rowManager->checkLimit(1);
+        if (!rowManager->checkLimit(1, maxSpillPriority))
+            return NULL;
+
         donorHeaplet = allocateHeaplet();
         if (donorHeaplet)
             break;
         rowManager->restoreLimit(1);
-        if (!rowManager->releaseCallbackMemory(true))
-            throwHeapExhausted(1);
+        if (!rowManager->releaseCallbackMemory(maxSpillPriority, true))
+        {
+            if (maxSpillPriority == RequiredPriority)
+                throwHeapExhausted(1);
+            return NULL;
+        }
     }
 
     if (memTraceLevel >= 5 || (memTraceLevel >= 3 && chunkSize > 32000))
@@ -3647,9 +3702,9 @@ const void * CChunkedHeap::compactRow(const void * ptr, HeapCompactState & state
     return ptr;
 }
 
-void * CChunkedHeap::doAllocate(unsigned activityId)
+void * CChunkedHeap::doAllocate(unsigned activityId, unsigned maxSpillPriority)
 {
-    return inlineDoAllocate(activityId);
+    return inlineDoAllocate(activityId, maxSpillPriority);
 }
 
 //================================================================================
@@ -3665,7 +3720,7 @@ ChunkedHeaplet * CFixedChunkedHeap::allocateHeaplet()
 void * CFixedChunkedHeap::allocate(unsigned activityId)
 {
     rowManager->beforeAllocate(chunkSize-FixedSizeHeaplet::chunkHeaderSize, activityId);
-    return inlineDoAllocate(activityId);
+    return inlineDoAllocate(activityId, defaultSpillPriority);
 }
 
 
@@ -3680,7 +3735,7 @@ ChunkedHeaplet * CPackedChunkingHeap::allocateHeaplet()
 void * CPackedChunkingHeap::allocate()
 {
     rowManager->beforeAllocate(chunkSize-PackedFixedSizeHeaplet::chunkHeaderSize, allocatorId);
-    return inlineDoAllocate(allocatorId);
+    return inlineDoAllocate(allocatorId, defaultSpillPriority);
 }
 
 
@@ -4150,6 +4205,13 @@ public:
         row.setown(rowManager->allocate(size, 0));
     }
 
+    void priorityAllocate(unsigned allocPriority)
+    {
+        row.setown(rowManager->allocate(size, 0, allocPriority));
+    }
+
+    inline bool hasRow() const { return row != NULL; }
+
 protected:
     OwnedRoxieRow row;
     IRowManager * rowManager;
@@ -4213,6 +4275,7 @@ class RoxieMemTests : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE( RoxieMemTests );
         CPPUNIT_TEST(testSetup);
+        CPPUNIT_TEST(testPriorityCallbacks);
         CPPUNIT_TEST(testRoundup);
         CPPUNIT_TEST(testBitmapThreading);
         CPPUNIT_TEST(testAllocSize);
@@ -5016,7 +5079,7 @@ protected:
         Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
         CountingRowAllocatorCache rowCache;
         void * memory = suballoc_aligned(1, true);
-        CFixedChunkedHeap dummyHeap((CChunkingRowManager*)rowManager.get(), logctx, &rowCache, 32, 0);
+        CFixedChunkedHeap dummyHeap((CChunkingRowManager*)rowManager.get(), logctx, &rowCache, 32, 0, RequiredPriority);
         FixedSizeHeaplet * heaplet = new (memory) FixedSizeHeaplet(&dummyHeap, &rowCache, 32);
         Semaphore sem;
         CasAllocatorThread * threads[numCasThreads];
@@ -5413,6 +5476,47 @@ protected:
         testRecursiveCallbacks1();
         testRecursiveCallbacks2();
     }
+    void testPriorityCallbacks1()
+    {
+        //Test with a limit set on the memory manager
+        const size32_t bigRowSize = HEAP_ALIGNMENT_SIZE * 2 / 3;
+        Owned<IRowManager> rowManager = createRowManager(1, NULL, logctx, NULL);
+
+        SimpleCallbackBlockAllocator alloc1(rowManager, bigRowSize, 20);
+        SimpleCallbackBlockAllocator alloc2(rowManager, bigRowSize, 10);
+
+        alloc1.allocate();
+        ASSERT(alloc1.hasRow());
+        alloc2.priorityAllocate(10);
+        ASSERT(alloc1.hasRow());
+        ASSERT(!alloc2.hasRow());
+        alloc2.priorityAllocate(20);
+        ASSERT(!alloc1.hasRow());
+        ASSERT(alloc2.hasRow());
+    }
+    void testPriorityCallbacks2()
+    {
+        //Test with no limit set on the memory manager
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
+
+        const memsize_t bigRowSize = HEAP_ALIGNMENT_SIZE * (heapTotalPages * 2 / 3);
+        SimpleCallbackBlockAllocator alloc1(rowManager, bigRowSize, 20);
+        SimpleCallbackBlockAllocator alloc2(rowManager, bigRowSize, 10);
+
+        alloc1.allocate();
+        ASSERT(alloc1.hasRow());
+        alloc2.priorityAllocate(10);
+        ASSERT(alloc1.hasRow());
+        ASSERT(!alloc2.hasRow());
+        alloc2.priorityAllocate(20);
+        ASSERT(!alloc1.hasRow());
+        ASSERT(alloc2.hasRow());
+    }
+    void testPriorityCallbacks()
+    {
+        testPriorityCallbacks1();
+        testPriorityCallbacks2();
+    }
 };
 
 class CSimpleRowResizeCallback : public CVariableRowResizeCallback
@@ -5566,7 +5670,7 @@ protected:
                 memsize_t nextSize = (memsize_t)(requestSize*1.25);
                 memsize_t curSize = RoxieRowCapacity(prev);
                 CSimpleRowResizeCallback callback(curSize, prev);
-                rowManager->resizeRow(prev, requestSize, nextSize, 1, callback);
+                rowManager->resizeRow(prev, requestSize, nextSize, 1, RequiredPriority, callback);
                 ASSERT(curSize >= nextSize);
                 requestSize = nextSize;
             }
@@ -5600,9 +5704,9 @@ protected:
                 memsize_t newSize2 = RoxieRowCapacity(prev2);
                 CSimpleRowResizeCallback callback1(newSize1, prev1);
                 CSimpleRowResizeCallback callback2(newSize2, prev2);
-                rowManager->resizeRow(prev1, requestSize, nextSize, 1, callback1);
+                rowManager->resizeRow(prev1, requestSize, nextSize, 1, RequiredPriority, callback1);
                 ASSERT(newSize1 >= nextSize);
-                rowManager->resizeRow(prev2, requestSize, nextSize, 1, callback2);
+                rowManager->resizeRow(prev2, requestSize, nextSize, 1, RequiredPriority, callback2);
                 ASSERT(newSize2 >= nextSize);
                 requestSize = nextSize;
             }
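
A related detail in this file: setMemoryLimit now records the priority at which the background release thread is allowed to spill (backgroundReleasePriority, stored by startReleaseBufferThread). A hedged configuration sketch; the sizes and the priority value are illustrative, not taken from the commit:

    #include "roxiemem.hpp"
    using namespace roxiemem;

    void configureRowManager(IRowManager * rowManager)
    {
        const memsize_t memLimit   = 512 * 1024 * 1024;  // hard limit
        const memsize_t spillLimit = 400 * 1024 * 1024;  // background spilling starts here
        // The background thread may only invoke callbacks with priority <= 10;
        // passing RequiredPriority (the default) keeps the old behaviour of
        // freeing everything it can.
        rowManager->setMemoryLimit(memLimit, spillLimit, 10);
    }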

roxie/roxiemem/roxiemem.hpp (+5 -3)

@@ -79,6 +79,7 @@ interface IRowAllocatorCache : extends IInterface
 //memory.  E.g., sorts can spill to disk, read ahead buffers can reduce the number being readahead etc.
 //Lower priority callbacks are called before higher priority.
 //The freeBufferedRows will call all callbacks with critical=false, before calling with critical=true
+const static unsigned RequiredPriority = (unsigned)-1;
 interface IBufferedRowCallback
 {
     virtual unsigned getPriority() const = 0; // lower values get freed up first.
@@ -423,9 +424,10 @@ interface IRowResizeCallback
 interface IRowManager : extends IInterface
 {
     virtual void *allocate(memsize_t size, unsigned activityId) = 0;
+    virtual void *allocate(memsize_t _size, unsigned activityId, unsigned maxSpillPriority) = 0;
     virtual const char *cloneVString(const char *str) = 0;
     virtual const char *cloneVString(size32_t len, const char *str) = 0;
-    virtual void resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, IRowResizeCallback & callback) = 0;
+    virtual bool resizeRow(void * original, memsize_t copysize, memsize_t newsize, unsigned activityId, unsigned maxSpillPriority, IRowResizeCallback & callback) = 0;
     virtual void resizeRow(memsize_t & capacity, void * & original, memsize_t copysize, memsize_t newsize, unsigned activityId) = 0;
     virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize, unsigned activityId) = 0;
     virtual unsigned allocated() = 0;
@@ -436,7 +438,7 @@ interface IRowManager : extends IInterface
     virtual void noteDataBuffReleased(DataBuffer *dataBuff) = 0 ;
     virtual void reportLeaks() = 0;
     virtual void checkHeap() = 0;
-    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags) = 0;
+    virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags, unsigned maxSpillPriority = RequiredPriority) = 0;
     virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, unsigned roxieHeapFlags) = 0;            // should this be passed the initial size?
     virtual void addRowBuffer(IBufferedRowCallback * callback) = 0;
     virtual void removeRowBuffer(IBufferedRowCallback * callback) = 0;
@@ -447,7 +449,7 @@ interface IRowManager : extends IInterface
 
 //Allow various options to be configured
     virtual void setActivityTracking(bool val) = 0;
-    virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;  // First size is max memory, second is the limit which will trigger a background thread to reduce memory
+    virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0, unsigned backgroundReleasePriority = RequiredPriority) = 0;  // First size is max memory, second is the limit which will trigger a background thread to reduce memory
 
     //set the number of callbacks that successfully free some memory before deciding it is good enough.
     //Default is 1, use -1 to free all possible memory whenever an out of memory occurs
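
For reference, a spill callback participating in this priority scheme might look like the following minimal sketch. Only IBufferedRowCallback, RequiredPriority, and addRowBuffer/removeRowBuffer come from this header; the class, the priority value, and the exact freeBufferedRows(bool critical) signature (implied by the comment above the interface) are assumptions:

    #include "roxiemem.hpp"
    using namespace roxiemem;

    // Hypothetical discardable read-ahead buffer.  At priority 10 it is spilled
    // before any callback registered with a higher value, and an allocation made
    // with maxSpillPriority < 10 will never evict it.
    class DiscardableBufferCallback : public IBufferedRowCallback
    {
    public:
        DiscardableBufferCallback() : cached(NULL) {}
        virtual unsigned getPriority() const { return 10; }
        virtual bool freeBufferedRows(bool critical)
        {
            if (!cached)
                return false;        // nothing to release this time
            ReleaseRoxieRow(cached); // hand the buffered row back to the heap
            cached = NULL;
            return true;             // some memory was freed
        }
        const void * cached;
    };

    // Registration goes through the row manager and must be matched by
    // removeRowBuffer before the callback is destroyed:
    //     rowManager->addRowBuffer(&callback);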

thorlcr/thorutil/thmem.cpp (+3 -2)

@@ -464,11 +464,12 @@ bool CThorExpandingRowArray::resizeRowTable(void **oldRows, memsize_t newCapacit
 {
     try
     {
+        unsigned spillPriority = roxiemem::RequiredPriority;
         if (oldRows)
-            rowManager->resizeRow(oldRows, copy?RoxieRowCapacity(oldRows):0, newCapacity, activity.queryContainer().queryId(), callback);
+            rowManager->resizeRow(oldRows, copy?RoxieRowCapacity(oldRows):0, newCapacity, activity.queryContainer().queryId(), spillPriority, callback);
         else
         {
-            void **newRows = (void **)rowManager->allocate(newCapacity, activity.queryContainer().queryId());
+            void **newRows = (void **)rowManager->allocate(newCapacity, activity.queryContainer().queryId(), spillPriority);
             callback.atomicUpdate(RoxieRowCapacity(newRows), newRows);
         }
     }
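
Note that thmem.cpp passes roxiemem::RequiredPriority, so its behaviour is unchanged by the new bool return. A caller that caps the spill priority must check that result instead; a hedged sketch (growRowTable and the recovery comment are illustrative, not from the commit):

    #include "roxiemem.hpp"
    using namespace roxiemem;

    bool growRowTable(IRowManager * rowManager, void * rows, memsize_t copySize,
                      memsize_t newCapacity, unsigned activityId,
                      IRowResizeCallback & callback)
    {
        // Cap spilling at priority 10: resizeRow now returns false instead of
        // throwing when memory cannot be freed within that bound.
        if (rowManager->resizeRow(rows, copySize, newCapacity, activityId, 10, callback))
            return true;
        // On failure the original row is left intact, so the caller can spill
        // its own state and retry, or carry on at the old capacity.
        return false;
    }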