
HPCC-15995 Implementation of scanning heaplets

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday · 9 years ago
parent · commit 37d9088c2e
4 changed files with 234 additions and 146 deletions:

  1. roxie/ccd/ccdserver.cpp (+10 -1)
  2. roxie/ccd/ccdserver.hpp (+1 -0)
  3. roxie/roxiemem/roxiemem.cpp (+221 -145)
  4. roxie/roxiemem/roxiemem.hpp (+2 -0)

roxie/ccd/ccdserver.cpp (+10 -1)

@@ -385,6 +385,7 @@ protected:
     bool optStableInput = true; // is the input forced to ordered?
     bool optUnstableInput = false;  // is the input forced to unordered?
     bool optUnordered = false; // is the output specified as unordered?
+    unsigned heapFlags;
 
     mutable CriticalSection statsCrit;
     mutable __int64 processed;
@@ -405,6 +406,7 @@ public:
         dependentCount = 0;
         optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
         optUnordered = !_graphNode.getPropBool("att[@name='ordered']/@value", true);
+        heapFlags = _graphNode.getPropInt("hint[@name='heap']/@value", 0);
     }
     
     ~CRoxieServerActivityFactoryBase()
@@ -614,6 +616,10 @@ public:
     {
         return actStatistics; // Overridden by anyone that needs more
     }
+    virtual unsigned getHeapFlags() const
+    {
+        return heapFlags;
+    }
 };
 
 class CRoxieServerMultiInputInfo
@@ -1643,7 +1649,10 @@ public:
       : parent(_parent), inputStream(_inputStream), stats(parent.queryStatsMapping()), timeActivities(_parent.timeActivities), abortRequested(false)
     {
         if (needsAllocator)
-            rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), roxiemem::RHFunique);
+        {
+            unsigned extraFlags = _parent.factory->getHeapFlags();
+            rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFunique|extraFlags));
+        }
         else
             rowAllocator = NULL;
     }
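
Taken together, the ccdserver.cpp changes let an activity opt in to the new heaplet behaviour through a graph hint: the factory captures hint[@name='heap']/@value once at graph-load time, and each activity ORs those bits onto RHFunique when it asks for its row allocator via getRowAllocatorEx(). A toy model of that plumbing (illustrative only, not HPCC code; the toy names mirror the real getHeapFlags() and the flag values declared in roxiemem.hpp below):

    #include <cstdio>

    // Toy model of the flag plumbing: the factory holds the hint bits, and the
    // activity combines them with the base flag when creating its row allocator.
    enum ToyHeapFlags : unsigned { RHFunique = 0x0004, RHFnofragment = 0x0080 };

    struct ToyActivityFactory
    {
        unsigned heapFlags;                          // from hint[@name='heap']/@value
        unsigned getHeapFlags() const { return heapFlags; }
    };

    static unsigned allocatorFlagsFor(const ToyActivityFactory &factory)
    {
        // Mirrors (roxiemem::RoxieHeapFlags)(roxiemem::RHFunique | extraFlags)
        return RHFunique | factory.getHeapFlags();
    }

    int main()
    {
        ToyActivityFactory factory{RHFnofragment};   // the hint asked for a scanning heap
        printf("allocator flags = 0x%x\n", allocatorFlagsFor(factory));
        return 0;
    }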

roxie/ccd/ccdserver.hpp (+1 -0)

@@ -219,6 +219,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual unsigned numInputs() const = 0;
     virtual const StatisticsMapping &queryStatsMapping() const = 0;
     virtual bool isInputOrdered(bool consumerOrdered, unsigned idx) const = 0;
+    virtual unsigned getHeapFlags() const = 0;
 };
 interface IGraphResult : public IInterface
 {

roxie/roxiemem/roxiemem.cpp (+221 -145)

@@ -61,6 +61,7 @@
 namespace roxiemem {
 
 #define NOTIFY_UNUSED_PAGES_ON_FREE     // avoid linux swapping 'freed' pages to disk
+//#define ALWAYS_USE_SCAN_HEAP            // option to test out using the scanning heaplets
 
 //The following constants should probably be tuned depending on the architecture - see Tuning Test at the end of the file.
 #define DEFAULT_PARALLEL_SYNC_RELEASE_GRANULARITY       2048        // default
@@ -782,7 +783,7 @@ static void subfree_aligned(void *ptr, unsigned pages = 1)
         loop
         {
             unsigned prev = heapBitmap[wordOffset];
-            if ((prev & mask) != 0)
+            if (unlikely((prev & mask) != 0))
                 HEAPERROR("RoxieMemMgr: Page freed twice");
 
             unsigned next = prev | mask;
@@ -829,7 +830,7 @@ static void clearBits(unsigned start, unsigned len)
         unsigned heapword = heapBitmap[wordOffset];
         while (len--)
         {
-            if ((heapword & mask) == 0)
+            if (unlikely((heapword & mask) == 0))
                 HEAPERROR("RoxieMemMgr: Page freed twice");
             heapword &= ~mask;
             mask <<= 1;
@@ -1314,6 +1315,8 @@ public:
 
 #define HEAPLET_DATA_AREA_OFFSET(heapletType) ((size32_t) ((sizeof(heapletType) + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE) * CACHE_LINE_SIZE)
 
+const static unsigned FREE_ROW_COUNT = (unsigned)-1;
+
 class CChunkedHeap;
 class ChunkedHeaplet : public Heaplet
 {
@@ -1322,6 +1325,8 @@ protected:
     std::atomic_uint freeBase;
     const size32_t chunkSize;
     unsigned sharedAllocatorId;
+    unsigned heapFlags;
+    unsigned nextMatchOffset = 0;
 
     inline char *data() const
     {
@@ -1332,8 +1337,8 @@ protected:
     //classes, but that means it is hard to common up some of the code efficiently
 
 public:
-    ChunkedHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, size32_t _chunkCapacity)
-        : Heaplet(_heap, _allocatorCache, _chunkCapacity), r_blocks(0), freeBase(0), chunkSize(_chunkSize)
+    ChunkedHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t _chunkSize, size32_t _chunkCapacity, unsigned _heapFlags)
+        : Heaplet(_heap, _allocatorCache, _chunkCapacity), r_blocks(0), freeBase(0), chunkSize(_chunkSize), heapFlags(_heapFlags)
     {
         sharedAllocatorId = 0;
     }
@@ -1352,11 +1357,7 @@ public:
         return (bytesFree >= chunkSize);
     }
 
-    virtual bool isFull() const
-    {
-        //Has all the space been allocated at least once, and is the free chain empty.
-        return !hasAnyUnallocatedSpace() && (r_blocks.load(std::memory_order_relaxed) & RBLOCKS_OFFSET_MASK) == 0;
-    }
+    virtual bool isFull() const;
 
     inline static unsigned dataOffset() { return HEAPLET_DATA_AREA_OFFSET(ChunkedHeaplet); }
 
@@ -1412,6 +1413,8 @@ public:
 
     void precreateFreeChain()
     {
+        if (heapFlags & RHFnofragment)
+            return;
         //The function is to aid testing - it allows the cas code to be tested without a surrounding lock
         //Allocate all possible rows and add them to the free space map.
         //This is not worth doing in general because it effectively replaces atomic_sets with atomic_cas
@@ -1472,30 +1475,39 @@ protected:
 
     inline void inlineReleasePointer(char * ptr)
     {
-        unsigned r_ptr = makeRelative(ptr);
-#ifdef HAS_EFFICIENT_CAS
-        unsigned old_blocks = r_blocks.load(std::memory_order_relaxed);  // can be relaxed because the cas will fail if not up to date.
-#endif
-        loop
+        if (heapFlags & RHFnofragment)
         {
-#ifndef HAS_EFFICIENT_CAS
+            ((std::atomic_uint *)ptr)->store(FREE_ROW_COUNT, std::memory_order_release);
+            if (nextSpace.load(std::memory_order_relaxed) == 0)
+                addToSpaceList();
+        }
+        else
+        {
+            unsigned r_ptr = makeRelative(ptr);
+#ifdef HAS_EFFICIENT_CAS
             unsigned old_blocks = r_blocks.load(std::memory_order_relaxed);  // can be relaxed because the cas will fail if not up to date.
 #endif
-            //To prevent the ABA problem the top part of r_blocks stores an incrementing tag
-            //which is incremented whenever something is added to the free list
-            * (unsigned *) ptr = (old_blocks & RBLOCKS_OFFSET_MASK);
-            unsigned new_tag = ((old_blocks & RBLOCKS_CAS_TAG_MASK) + RBLOCKS_CAS_TAG);
-            unsigned new_blocks = new_tag | r_ptr;
-
-            //memory_order_release ensures updates to next and count etc are available once the cas completes.
-            if (compare_exchange_efficient(r_blocks, old_blocks, new_blocks, std::memory_order_release, std::memory_order_relaxed))
+            loop
             {
-                //Try and add it to the potentially free page chain if it isn't already present.
-                //It is impossible to make it more restrictive -e.g., only when freeing and full because of
-                //various race conditions.
-                if (nextSpace.load(std::memory_order_relaxed) == 0)
-                    addToSpaceList();
-                break;
+#ifndef HAS_EFFICIENT_CAS
+                unsigned old_blocks = r_blocks.load(std::memory_order_relaxed);  // can be relaxed because the cas will fail if not up to date.
+#endif
+                //To prevent the ABA problem the top part of r_blocks stores an incrementing tag
+                //which is incremented whenever something is added to the free list
+                * (unsigned *) ptr = (old_blocks & RBLOCKS_OFFSET_MASK);
+                unsigned new_tag = ((old_blocks & RBLOCKS_CAS_TAG_MASK) + RBLOCKS_CAS_TAG);
+                unsigned new_blocks = new_tag | r_ptr;
+
+                //memory_order_release ensures updates to next and count etc are available once the cas completes.
+                if (compare_exchange_efficient(r_blocks, old_blocks, new_blocks, std::memory_order_release, std::memory_order_relaxed))
+                {
+                    //Try and add it to the potentially free page chain if it isn't already present.
+                    //It is impossible to make it more restrictive -e.g., only when freeing and full because of
+                    //various race conditions.
+                    if (nextSpace.load(std::memory_order_relaxed) == 0)
+                        addToSpaceList();
+                    break;
+                }
             }
         }
 
@@ -1523,8 +1535,8 @@ class FixedSizeHeaplet : public ChunkedHeaplet
 public:
     enum { chunkHeaderSize = sizeof(ChunkHeader) };
 
-    FixedSizeHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t size)
-    : ChunkedHeaplet(_heap, _allocatorCache, size, size - chunkHeaderSize)
+    FixedSizeHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t size, unsigned _heapFlags)
+    : ChunkedHeaplet(_heap, _allocatorCache, size, size - chunkHeaderSize, _heapFlags)
     {
     }
 
@@ -1666,13 +1678,24 @@ public:
         while (leaked > 0 && base < limit)
         {
             const char *block = data() + base;
-            const char *ptr = block + (chunkSize-chunkCapacity);  // assumes the overhead is all at the start
-            std::atomic_uint * ptrCount = (std::atomic_uint *)(ptr - sizeof(std::atomic_uint));
-            unsigned rowCount = ptrCount->load(std::memory_order_relaxed);
-            if (ROWCOUNT(rowCount) != 0)
+            if (heapFlags & RHFnofragment)
+            {
+                if (((std::atomic_uint *)block)->load(std::memory_order_relaxed) != FREE_ROW_COUNT)
+                {
+                    reportLeak(block, logctx);
+                    leaked--;
+                }
+            }
+            else
             {
-                reportLeak(block, logctx);
-                leaked--;
+                const char *ptr = block + (chunkSize-chunkCapacity);  // assumes the overhead is all at the start
+                std::atomic_uint * ptrCount = (std::atomic_uint *)(ptr - sizeof(std::atomic_uint));
+                unsigned rowCount = ptrCount->load(std::memory_order_relaxed);
+                if (ROWCOUNT(rowCount) != 0)
+                {
+                    reportLeak(block, logctx);
+                    leaked--;
+                }
             }
             base += chunkSize;
         }
@@ -1752,7 +1775,7 @@ private:
                 PrintStackReport();
         }
 #endif
-        if ((header->allocatorId & ~ACTIVITY_MASK) != ACTIVITY_MAGIC)
+        if (unlikely((header->allocatorId & ~ACTIVITY_MASK) != ACTIVITY_MAGIC))
         {
             DBGLOG("%s: Invalid pointer %p id(%x) cnt(%x)", reason, ptr, header->allocatorId, header->count.load());
             PrintStackReport();
@@ -1787,8 +1810,8 @@ class PackedFixedSizeHeaplet : public ChunkedHeaplet
 public:
     enum { chunkHeaderSize = sizeof(ChunkHeader) };
 
-    PackedFixedSizeHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t size, unsigned _allocatorId)
-        : ChunkedHeaplet(_heap, _allocatorCache, size, size - chunkHeaderSize)
+    PackedFixedSizeHeaplet(CHeap * _heap, const IRowAllocatorCache *_allocatorCache, size32_t size, unsigned _allocatorId, unsigned _heapFlags)
+        : ChunkedHeaplet(_heap, _allocatorCache, size, size - chunkHeaderSize, _heapFlags)
     {
         sharedAllocatorId = _allocatorId;
     }
@@ -2312,37 +2335,24 @@ protected:
 };
 
 
+template <class T>
 class CRoxieDirectFixedRowHeap : public CRoxieFixedRowHeapBase
 {
 public:
-    CRoxieDirectFixedRowHeap(CChunkingRowManager * _rowManager, unsigned _allocatorId, RoxieHeapFlags _flags, CFixedChunkedHeap * _heap)
+    CRoxieDirectFixedRowHeap(CChunkingRowManager * _rowManager, unsigned _allocatorId, RoxieHeapFlags _flags, T * _heap)
         : CRoxieFixedRowHeapBase(_rowManager, _allocatorId, _flags), heap(_heap)
     {
     }
-    ~CRoxieDirectFixedRowHeap();
-
-    virtual void *allocate();
-
-    virtual void clearRowManager()
+    ~CRoxieDirectFixedRowHeap()
     {
-        heap.clear();
-        CRoxieFixedRowHeapBase::clearRowManager();
+        if (heap && (flags & RHFunique))
+            heap->noteOrphaned();
     }
 
-protected:
-    Owned<CFixedChunkedHeap> heap;
-};
-
-class CRoxieDirectPackedRowHeap : public CRoxieFixedRowHeapBase
-{
-public:
-    CRoxieDirectPackedRowHeap(CChunkingRowManager * _rowManager, unsigned _allocatorId, RoxieHeapFlags _flags, CPackedChunkingHeap * _heap)
-        : CRoxieFixedRowHeapBase(_rowManager, _allocatorId, _flags), heap(_heap)
+    virtual void *allocate()
     {
+        return heap->allocate(allocatorId);
     }
-    ~CRoxieDirectPackedRowHeap();
-
-    virtual void *allocate();
 
     virtual void clearRowManager()
     {
@@ -2351,7 +2361,7 @@ public:
     }
 
 protected:
-    Owned<CPackedChunkingHeap> heap;
+    Owned<T> heap;
 };
 
 //================================================================================
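
The hunk above removes CRoxieDirectPackedRowHeap: the fixed and packed wrappers only differed in the concrete heap type they owned, so a single class template now covers both (CPackedChunkingHeap::allocate() gains an allocatorId parameter further down, which is what lets the shared allocate() body compile for either heap). A stripped-down sketch of the same pattern, with stub heap types standing in for the real ones (illustrative only, not HPCC code):

    #include <cstdio>
    #include <cstdlib>

    // Stubs standing in for CFixedChunkedHeap / CPackedChunkingHeap: both expose
    // allocate(unsigned allocatorId), which is what makes one template sufficient.
    struct StubFixedHeap  { void *allocate(unsigned) { return malloc(32); } };
    struct StubPackedHeap { void *allocate(unsigned) { return malloc(32); } };

    template <class HEAP>
    class DirectRowHeap
    {
    public:
        DirectRowHeap(HEAP *_heap, unsigned _allocatorId) : heap(_heap), allocatorId(_allocatorId) {}
        void *allocate() { return heap->allocate(allocatorId); }  // one body serves both heap types
    private:
        HEAP *heap;             // the real class holds an Owned<HEAP> and notes orphaned heaps
        unsigned allocatorId;
    };

    int main()
    {
        StubFixedHeap fixed;
        StubPackedHeap packed;
        DirectRowHeap<StubFixedHeap> a(&fixed, 1);
        DirectRowHeap<StubPackedHeap> b(&packed, 2);
        void *ra = a.allocate();
        void *rb = b.allocate();
        printf("%p %p\n", ra, rb);
        free(ra);
        free(rb);
        return 0;
    }
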
@@ -2901,7 +2911,8 @@ public:
 
 
 protected:
-    inline void * inlineDoAllocate(unsigned allocatorId, unsigned maxSpillCost);
+    void * doAllocateRow(unsigned allocatorId, unsigned maxSpillCost);
+
     virtual ChunkedHeaplet * allocateHeaplet() = 0;
 
 protected:
@@ -2941,7 +2952,7 @@ public:
     {
     }
 
-    void * allocate();
+    void * allocate(unsigned allocatorId);
 
     virtual bool matches(size32_t searchSize, unsigned searchActivity, unsigned searchFlags) const
     {
@@ -2995,45 +3006,96 @@ char * ChunkedHeaplet::allocateChunk()
     //The spin lock for the heap this chunk belongs to must be held when this function is called
     char *ret;
     const size32_t size = chunkSize;
-#ifdef HAS_EFFICIENT_CAS
-    unsigned old_blocks = r_blocks.load(std::memory_order_acquire); // acquire ensures that *(unsigned *)ret is up to date
-#endif
-    loop
+
+    if (heapFlags & RHFnofragment)
     {
-#ifndef HAS_EFFICIENT_CAS
-        unsigned old_blocks = r_blocks.load(std::memory_order_acquire); // acquire ensures that *(unsigned *)ret is up to date
-#endif
+        unsigned numAllocs = count.load(std::memory_order_acquire)-1;
+        CChunkedHeap * chunkHeap = static_cast<CChunkedHeap *>(heap);
+        unsigned maxAllocs = chunkHeap->maxChunksPerPage();
+        unsigned curFreeBase = freeBase.load(std::memory_order_relaxed);
 
-        unsigned r_ret = (old_blocks & RBLOCKS_OFFSET_MASK);
-        if (r_ret)
+        //If more than half full allocate an entry from the expanding free area
+        if (numAllocs * size  * 2 >= curFreeBase)
         {
-            ret = makeAbsolute(r_ret);
-            //may have been allocated by another thread, but still legal to dereference
-            //the cas will fail if the contents are invalid.  May be flagged as a benign race.
-            unsigned next = *(unsigned *)ret;
+            //There is no ABA issue on freeBase because it is never decremented (and no next chain with it)
+            size32_t bytesFree = dataAreaSize() - curFreeBase;
+            if (bytesFree >= size)
+            {
+                //This is the only place that modifies freeBase, so it can be unconditional since caller must have a lock.
+                freeBase.store(curFreeBase + size, std::memory_order_relaxed);
+                ret = data() + curFreeBase;
+                goto done;
+            }
+            if (numAllocs == maxAllocs)
+                return nullptr;
+        }
 
-            //There is a potential ABA problem if other thread(s) allocate two or more items, and free the first
-            //item in the window before the following cas.  r_block would match, but next would be invalid.
-            //To avoid that a tag is stored in the top bits of r_blocks which is modified whenever an item is added
-            //onto the free list.  The offsets in the freelist do not need tags.
-            unsigned new_blocks = (old_blocks & RBLOCKS_CAS_TAG_MASK) | next;
-            if (compare_exchange_efficient(r_blocks, old_blocks, new_blocks, std::memory_order_acquire, std::memory_order_acquire))
-                break;
+        {
+            //Scan through all the memory, checking for a block marked as free - should terminate very quickly unless highly fragmented
+            size32_t offset = nextMatchOffset;
+            size32_t delta = 0;
+            //Code as a do..while loop partly to prevent gcc complaining that ret could be uninitialised.
+            do
+            {
+                ret = data() + offset;
+                offset += size;
+                if (offset == curFreeBase)
+                    offset = 0;
+                if (((std::atomic_uint *)ret)->load(std::memory_order_relaxed) == FREE_ROW_COUNT)
+                    break;
+                delta += size;
+            } while (delta < curFreeBase);
 
-            //NOTE: Currently I think a lock is always held before allocating from a chunk, so I'm not sure there is an ABA problem!
+            nextMatchOffset = offset;
+            //save offset
+            assertex(delta != curFreeBase);
         }
-        else
+done:
+        //Mark as allocated before return - while spin lock is still guaranteed to be active
+        ((std::atomic_uint *)ret)->store(0, std::memory_order_relaxed);
+    }
+    else
+    {
+#ifdef HAS_EFFICIENT_CAS
+        unsigned old_blocks = r_blocks.load(std::memory_order_acquire); // acquire ensures that *(unsigned *)ret is up to date
+#endif
+        loop
         {
-            unsigned curFreeBase = freeBase.load(std::memory_order_relaxed);
-            //There is no ABA issue on freeBase because it is never decremented (and no next chain with it)
-            size32_t bytesFree = dataAreaSize() - curFreeBase;
-            if (bytesFree < size)
-                return NULL;
+#ifndef HAS_EFFICIENT_CAS
+            unsigned old_blocks = r_blocks.load(std::memory_order_acquire); // acquire ensures that *(unsigned *)ret is up to date
+#endif
 
-            //This is the only place that modifies freeBase, so it can be unconditional since caller must have a lock.
-            freeBase.store(curFreeBase + size, std::memory_order_relaxed);
-            ret = data() + curFreeBase;
-            break;
+            unsigned r_ret = (old_blocks & RBLOCKS_OFFSET_MASK);
+            if (r_ret)
+            {
+                ret = makeAbsolute(r_ret);
+                //may have been allocated by another thread, but still legal to dereference
+                //the cas will fail if the contents are invalid.  May be flagged as a benign race.
+                unsigned next = *(unsigned *)ret;
+
+                //There is a potential ABA problem if other thread(s) allocate two or more items, and free the first
+                //item in the window before the following cas.  r_block would match, but next would be invalid.
+                //To avoid that a tag is stored in the top bits of r_blocks which is modified whenever an item is added
+                //onto the free list.  The offsets in the freelist do not need tags.
+                unsigned new_blocks = (old_blocks & RBLOCKS_CAS_TAG_MASK) | next;
+                if (compare_exchange_efficient(r_blocks, old_blocks, new_blocks, std::memory_order_acquire, std::memory_order_acquire))
+                    break;
+
+                //NOTE: Currently I think a lock is always held before allocating from a chunk, so I'm not sure there is an ABA problem!
+            }
+            else
+            {
+                unsigned curFreeBase = freeBase.load(std::memory_order_relaxed);
+                //There is no ABA issue on freeBase because it is never decremented (and no next chain with it)
+                size32_t bytesFree = dataAreaSize() - curFreeBase;
+                if (bytesFree < size)
+                    return NULL;
+
+                //This is the only place that modifies freeBase, so it can be unconditional since caller must have a lock.
+                freeBase.store(curFreeBase + size, std::memory_order_relaxed);
+                ret = data() + curFreeBase;
+                break;
+            }
         }
     }
 
@@ -3041,6 +3103,14 @@ char * ChunkedHeaplet::allocateChunk()
     return ret;
 }
 
+bool ChunkedHeaplet::isFull() const
+{
+    CChunkedHeap * chunkHeap = static_cast<CChunkedHeap *>(heap);
+    unsigned numAllocs = count.load(std::memory_order_acquire)-1;
+    unsigned maxAllocs = chunkHeap->maxChunksPerPage();
+    return (numAllocs == maxAllocs);
+}
+
 const void * ChunkedHeaplet::_compactRow(const void * ptr, HeapCompactState & state)
 {
     //NB: If this already belongs to a full heaplet then leave it where it is..
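
The allocateChunk() and isFull() hunks above, together with the inlineReleasePointer() change earlier, are the heart of the scanning heaplet: under RHFnofragment a freed chunk is simply stamped with FREE_ROW_COUNT, and allocation either bump-allocates from freeBase while at least half of the touched area is still live, or scans round-robin from nextMatchOffset for a stamped chunk. The following self-contained toy (not HPCC code; single-threaded where the real heaplet relies on the heap's spin lock plus atomics, and with invented names such as ScanPool) shows the same idea in isolation:

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    class ScanPool
    {
        static constexpr uint32_t FREE = 0xFFFFFFFFu;   // plays the role of FREE_ROW_COUNT
        static constexpr size_t slotSize = 32;          // each slot starts with a 4-byte state word
        static constexpr size_t dataSize = 32 * slotSize;

        alignas(uint32_t) char data[dataSize];
        size_t freeBase = 0;          // high-water mark; never decreases, so no ABA issues
        size_t nextMatchOffset = 0;   // where the next scan resumes
        size_t liveSlots = 0;

        uint32_t &state(size_t offset) { return *reinterpret_cast<uint32_t *>(data + offset); }

    public:
        void *allocate()
        {
            // While at least half of the touched area is live, extend the high-water
            // mark rather than scanning (mirrors "numAllocs * size * 2 >= curFreeBase").
            if (liveSlots * slotSize * 2 >= freeBase)
            {
                if (freeBase + slotSize <= dataSize)
                {
                    size_t offset = freeBase;
                    freeBase += slotSize;
                    state(offset) = 0;                     // mark in use
                    liveSlots++;
                    return data + offset + sizeof(uint32_t);
                }
                if (liveSlots * slotSize == freeBase)
                    return nullptr;                        // every slot is live: pool is full
            }
            // Otherwise at least one freed slot exists below freeBase: scan round-robin
            // for the FREE sentinel, resuming where the previous scan stopped.
            size_t offset = nextMatchOffset;
            while (state(offset) != FREE)
            {
                offset += slotSize;
                if (offset == freeBase)
                    offset = 0;
            }
            nextMatchOffset = (offset + slotSize == freeBase) ? 0 : offset + slotSize;
            state(offset) = 0;
            liveSlots++;
            return data + offset + sizeof(uint32_t);
        }

        void release(void *ptr)
        {
            size_t offset = static_cast<char *>(ptr) - sizeof(uint32_t) - data;
            assert(offset < freeBase && state(offset) != FREE);
            state(offset) = FREE;     // the real heaplet uses a release store here
            liveSlots--;
        }
    };

    int main()
    {
        ScanPool pool;
        void *a = pool.allocate();
        void *b = pool.allocate();
        pool.release(a);
        void *c = pool.allocate();    // satisfied from the bump area or by rediscovering a's slot
        printf("%p %p %p\n", a, b, c);
        return 0;
    }
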
@@ -3615,7 +3685,11 @@ public:
             size32_t rounded = roundup(prevSize+1);
             dbgassertex(ROUNDEDHEAP(rounded) == normalHeaps.ordinality());
             size32_t thisSize = ROUNDEDSIZE(rounded);
-            normalHeaps.append(*new CFixedChunkedHeap(this, _logctx, _allocatorCache, thisSize, RHFvariable, SpillAllCost));
+            unsigned flags = RHFvariable;
+#ifdef ALWAYS_USE_SCAN_HEAP
+            flags |= RHFnofragment;
+#endif
+            normalHeaps.append(*new CFixedChunkedHeap(this, _logctx, _allocatorCache, thisSize, flags, SpillAllCost));
             prevSize = thisSize;
         }
         maxPageLimit = (unsigned) PAGES(_memLimit, HEAP_ALIGNMENT_SIZE);
@@ -3829,20 +3903,11 @@ public:
 
     inline void beforeAllocate(memsize_t _size, unsigned activityId)
     {
-        if (memTraceSizeLimit && _size >= memTraceSizeLimit)
-        {
-            logctx.CTXLOG("Activity %u requesting %" I64F "u bytes!", getActivityId(activityId), (unsigned __int64) _size);
-            PrintStackReport();
-        }
-        if (timeLimit)
-        {
-            unsigned __int64 cyclesNow = get_cycles_now();
-            if (cyclesNow - cyclesChecked >= cyclesCheckInterval)
-            {
-                timeLimit->checkAbort();
-                cyclesChecked = cyclesNow;  // No need to lock - worst that can happen is we call too often which is harmless
-            }
-        }
+        //MORE: We could avoid calling beforeAllocate() at all if neither of these flags are set at heap creation time.
+        if (unlikely(memTraceSizeLimit))
+            checkTraceAllocationSize(_size, activityId);
+        if (unlikely(timeLimit))
+            checkTimeLimit();
     }
 
     virtual void *allocate(memsize_t _size, unsigned activityId)
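
The beforeAllocate() hunk above is a hot-path cleanup in the same spirit as the new unlikely() annotations elsewhere in this commit: the common case (no trace limit, no time limit) now costs two predicted-not-taken tests, with the rarely executed bodies moved out of line into checkTraceAllocationSize() and checkTimeLimit() (defined further down). A toy model of the idiom (illustrative only; unlikely() already exists in the HPCC sources, so a portable stand-in is defined here):

    #include <cstdio>

    // Portable stand-in for the existing unlikely() branch hint.
    #if defined(__GNUC__)
    #define unlikely(x) __builtin_expect(!!(x), 0)
    #else
    #define unlikely(x) (x)
    #endif

    static unsigned memTraceSizeLimit = 0;   // both checks disabled in the common case
    static bool haveTimeLimit = false;

    // Cold paths kept out of line so the inlined caller stays tiny.
    static void checkTraceAllocationSize(unsigned size)
    {
        if (size >= memTraceSizeLimit)
            printf("large allocation: %u bytes\n", size);
    }

    static void checkTimeLimit()
    {
        printf("checking abort/time limit\n");
    }

    static inline void beforeAllocate(unsigned size)
    {
        if (unlikely(memTraceSizeLimit != 0))
            checkTraceAllocationSize(size);
        if (unlikely(haveTimeLimit))
            checkTimeLimit();
    }

    int main()
    {
        beforeAllocate(64);          // hot path: two cheap tests, no function calls
        memTraceSizeLimit = 32;
        beforeAllocate(64);          // cold path taken once a limit is configured
        return 0;
    }
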
@@ -4090,6 +4155,9 @@ public:
 
     virtual IFixedRowHeap * createFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags, unsigned maxSpillCost)
     {
+#ifdef ALWAYS_USE_SCAN_HEAP
+        roxieHeapFlags |= RHFnofragment;
+#endif
         CRoxieFixedRowHeapBase * rowHeap = doCreateFixedRowHeap(fixedSize, activityId, roxieHeapFlags, maxSpillCost);
 
         SpinBlock block(fixedSpinLock);
@@ -4213,21 +4281,24 @@ public:
 
 
 protected:
+    void checkTraceAllocationSize(memsize_t _size, unsigned activityId);
+    void checkTimeLimit();
+
     CRoxieFixedRowHeapBase * doCreateFixedRowHeap(size32_t fixedSize, unsigned activityId, unsigned roxieHeapFlags, unsigned maxSpillCost)
     {
         if ((roxieHeapFlags & RHFoldfixed) || (fixedSize > FixedSizeHeaplet::maxHeapSize()))
             return new CRoxieFixedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, fixedSize);
 
-        unsigned heapFlags = roxieHeapFlags & (RHFunique|RHFpacked);
+        unsigned heapFlags = roxieHeapFlags & (RHFunique|RHFpacked|RHFblocked|RHFnofragment);
         if (heapFlags & RHFpacked)
         {
             CPackedChunkingHeap * heap = createPackedHeap(fixedSize, activityId, heapFlags, maxSpillCost);
-            return new CRoxieDirectPackedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
+            return new CRoxieDirectFixedRowHeap<CPackedChunkingHeap>(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
         }
         else
         {
             CFixedChunkedHeap * heap = createFixedHeap(fixedSize, activityId, heapFlags, maxSpillCost);
-            return new CRoxieDirectFixedRowHeap(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
+            return new CRoxieDirectFixedRowHeap<CFixedChunkedHeap>(this, activityId, (RoxieHeapFlags)roxieHeapFlags, heap);
         }
     }
 
@@ -4270,7 +4341,11 @@ protected:
         //If not unique I think it is quite possibly better to reuse one of the existing heaps used for variable length
         //Advantage is fewer pages uses.  Disadvantage is greater likelihood of fragementation being unable to copy
         //rows to consolidate them.  Almost certainly packed or unique will be set in that situation though.
+#ifdef ALWAYS_USE_SCAN_HEAP
         if (!(flags & RHFunique))
+#else
+        if (!(flags & (RHFunique|RHFnofragment)))
+#endif
         {
             size32_t whichHeap = ROUNDEDHEAP(rounded);
             return LINK(&normalHeaps.item(whichHeap));
@@ -4409,6 +4484,29 @@ protected:
     virtual IRowManager * querySlaveRowManager(unsigned slave) { return NULL; }
 };
 
+void CChunkingRowManager::checkTraceAllocationSize(memsize_t _size, unsigned activityId)
+{
+    if (memTraceSizeLimit && _size >= memTraceSizeLimit)
+    {
+        logctx.CTXLOG("Activity %u requesting %" I64F "u bytes!", getActivityId(activityId), (unsigned __int64) _size);
+        PrintStackReport();
+    }
+}
+
+void CChunkingRowManager::checkTimeLimit()
+{
+    if (timeLimit)
+    {
+        unsigned __int64 cyclesNow = get_cycles_now();
+        if (cyclesNow - cyclesChecked >= cyclesCheckInterval)
+        {
+            timeLimit->checkAbort();
+            cyclesChecked = cyclesNow;  // No need to lock - worst that can happen is we call too often which is harmless
+        }
+    }
+}
+
+
 //================================================================================
 
 class CGlobalRowManager;
@@ -4675,29 +4773,6 @@ void * CRoxieFixedRowHeap::allocate()
     return rowManager->allocate(chunkCapacity, allocatorId);
 }
 
-void * CRoxieDirectFixedRowHeap::allocate()
-{
-    return heap->allocate(allocatorId);
-}
-
-CRoxieDirectFixedRowHeap::~CRoxieDirectFixedRowHeap()
-{
-    if (heap && (flags & RHFunique))
-        heap->noteOrphaned();
-}
-
-CRoxieDirectPackedRowHeap::~CRoxieDirectPackedRowHeap()
-{
-    if (heap && (flags & RHFunique))
-        heap->noteOrphaned();
-}
-
-void * CRoxieDirectPackedRowHeap::allocate()
-{
-    return heap->allocate();
-}
-
-
 void * CRoxieVariableRowHeap::allocate(memsize_t size, memsize_t & capacity)
 {
     void * ret = rowManager->allocate(size, allocatorId);
@@ -4858,7 +4933,7 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
 
 
 //An inline function used to common up the allocation code for fixed and non fixed sizes.
-void * CChunkedHeap::inlineDoAllocate(unsigned allocatorId, unsigned maxSpillCost)
+void * CChunkedHeap::doAllocateRow(unsigned allocatorId, unsigned maxSpillCost)
 {
     //Only hold the spinblock while walking the list - so subsequent calls to checkLimit don't deadlock.
     //NB: The allocation is split into two - finger->allocateChunk, and finger->initializeChunk().
@@ -4997,7 +5072,7 @@ const void * CChunkedHeap::compactRow(const void * ptr, HeapCompactState & state
 
 void * CChunkedHeap::doAllocate(unsigned activityId, unsigned maxSpillCost)
 {
-    return inlineDoAllocate(activityId, maxSpillCost);
+    return doAllocateRow(activityId, maxSpillCost);
 }
 
 //================================================================================
@@ -5007,13 +5082,13 @@ ChunkedHeaplet * CFixedChunkedHeap::allocateHeaplet()
     void * memory = suballoc_aligned(1, true);
     if (!memory)
         return NULL;
-    return new (memory) FixedSizeHeaplet(this, allocatorCache, chunkSize);
+    return new (memory) FixedSizeHeaplet(this, allocatorCache, chunkSize, flags);
 }
 
 void * CFixedChunkedHeap::allocate(unsigned activityId)
 {
     rowManager->beforeAllocate(chunkSize-FixedSizeHeaplet::chunkHeaderSize, activityId);
-    return inlineDoAllocate(activityId, defaultSpillCost);
+    return doAllocateRow(activityId, defaultSpillCost);
 }
 
 
@@ -5022,13 +5097,13 @@ ChunkedHeaplet * CPackedChunkingHeap::allocateHeaplet()
     void * memory = suballoc_aligned(1, true);
     if (!memory)
         return NULL;
-    return new (memory) PackedFixedSizeHeaplet(this, allocatorCache, chunkSize, allocatorId);
+    return new (memory) PackedFixedSizeHeaplet(this, allocatorCache, chunkSize, allocatorId, flags);
 }
 
-void * CPackedChunkingHeap::allocate()
+void * CPackedChunkingHeap::allocate(unsigned allocatorId)
 {
     rowManager->beforeAllocate(chunkSize-PackedFixedSizeHeaplet::chunkHeaderSize, allocatorId);
-    return inlineDoAllocate(allocatorId, defaultSpillCost);
+    return doAllocateRow(allocatorId, defaultSpillCost);
 }
 
 
@@ -5677,8 +5752,8 @@ protected:
                 CACHE_LINE_SIZE, (size32_t)sizeof(Heaplet), (size32_t)sizeof(ChunkedHeaplet), (size32_t)sizeof(FixedSizeHeaplet), (size32_t)sizeof(PackedFixedSizeHeaplet), (size32_t)sizeof(HugeHeaplet));
         DBGLOG("Heap: fixed(%u) packed(%u) huge(%u)",
                 (size32_t)sizeof(CFixedChunkedHeap), (size32_t)sizeof(CPackedChunkingHeap), (size32_t)sizeof(CHugeHeap));
-        DBGLOG("IHeap: fixed(%u) directfixed(%u) packed(%u) variable(%u)",
-                (size32_t)sizeof(CRoxieFixedRowHeap), (size32_t)sizeof(CRoxieDirectFixedRowHeap), (size32_t)sizeof(CRoxieDirectPackedRowHeap), (size32_t)sizeof(CRoxieVariableRowHeap));
+        DBGLOG("IHeap: fixed(%u) directfixed(%u) variable(%u)",
+                (size32_t)sizeof(CRoxieFixedRowHeap), (size32_t)sizeof(CRoxieDirectFixedRowHeap<CPackedChunkingHeap>), (size32_t)sizeof(CRoxieVariableRowHeap));
 
         ASSERT(FixedSizeHeaplet::dataOffset() >= sizeof(FixedSizeHeaplet));
         ASSERT(PackedFixedSizeHeaplet::dataOffset() >= sizeof(PackedFixedSizeHeaplet));
@@ -6495,8 +6570,9 @@ protected:
         Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
         CountingRowAllocatorCache rowCache;
         void * memory = suballoc_aligned(1, true);
+        unsigned heapFlags = 0;
         CFixedChunkedHeap dummyHeap((CChunkingRowManager*)rowManager.get(), logctx, &rowCache, 32, 0, SpillAllCost);
-        FixedSizeHeaplet * heaplet = new (memory) FixedSizeHeaplet(&dummyHeap, &rowCache, 32);
+        FixedSizeHeaplet * heaplet = new (memory) FixedSizeHeaplet(&dummyHeap, &rowCache, 32, heapFlags);
         heaplet->precreateFreeChain();
         Semaphore sem;
         CasAllocatorThread * threads[numCasThreads];

roxie/roxiemem/roxiemem.hpp (+2 -0)

@@ -402,6 +402,8 @@ enum RoxieHeapFlags
     RHFunique           = 0x0004,  // create a separate fixed size allocator
     RHFoldfixed         = 0x0008,  // Don't create a special fixed size heap for this
     RHFvariable         = 0x0010,  // only used for tracing
+    RHFblocked          = 0x0040,  // allocate blocks of rows
+    RHFnofragment       = 0x0080,  // the allocated records will not be fragmented
 
     //internal flags
     RHForphaned         = 0x80000000,   // heap will no longer be used, can be deleted
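
With these two flags public, a caller can request the new behaviour when creating a fixed-size heap. A hypothetical usage sketch (not standalone code; it assumes the createFixedRowHeap() signature shown in the roxiemem.cpp hunks above and a roxiemem row manager, rowSize and activityId in scope):

    // Hypothetical caller: ask for a private, non-fragmenting (scanning) fixed heap.
    unsigned flags = roxiemem::RHFunique | roxiemem::RHFnofragment;
    Owned<roxiemem::IFixedRowHeap> rows(
        rowManager->createFixedRowHeap(rowSize, activityId, flags, roxiemem::SpillAllCost));
    void *row = rows->allocate();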