
HPCC-3327 Row arrays limited to 500 million rows

In places (in thor) where an expanding array of rows is allocated from the
roxie/thor memory manager (i.e., most activities that need to gather large
numbers of rows), we hit issues (either incorrect behaviour or failures)
when the array size exceeds 4GB.

With 64-bit (8-byte) row pointers, that effectively caps the number of rows
at roughly 500 million (4GB / 8 bytes per pointer).

Change the roxiemem layer to support allocations of more than 4GB. This should
push the limit up to around 4 billion rows (the range of the 32-bit row index).

We will need to address that limit too, but in a separate issue.
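
A minimal standalone sketch of the arithmetic behind both limits, assuming
8-byte pointers on a 64-bit build and standing in plain C++ integer types for
the HPCC typedefs (size32_t, memsize_t):

#include <cstdint>
#include <cstdio>

int main()
{
    const uint64_t fourGB  = 1ULL << 32;      // ceiling of a 32-bit (size32_t) capacity field
    const uint64_t ptrSize = sizeof(void *);  // 8 bytes with 64-bit pointers

    // Old limit: the row-pointer array itself cannot exceed 4GB,
    // so it holds at most 4GB / 8 bytes, roughly 536 million row pointers.
    uint64_t maxRowsOld = fourGB / ptrSize;

    // With capacities widened to memsize_t, the array can grow past 4GB;
    // the next ceiling is the 32-bit row index (rowidx_t in roxierowbuff.hpp),
    // i.e. 2^32, roughly 4 billion rows.
    uint64_t maxRowsNew = fourGB;

    printf("old limit: %llu rows (~500 million)\n", (unsigned long long) maxRowsOld);
    printf("new limit: %llu rows (~4 billion)\n", (unsigned long long) maxRowsNew);
    return 0;
}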

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman committed 12 years ago
commit 87e3fbe651

+ 1 - 1
roxie/ccd/ccdmain.cpp

@@ -633,7 +633,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         numChannels = topology->getPropInt("@numChannels", 0);
         numActiveChannels = topology->getPropInt("@numActiveChannels", numChannels);
         statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
-        roxiemem::memTraceSizeLimit = topology->getPropInt("@memTraceSizeLimit", 0);
+        roxiemem::memTraceSizeLimit = (memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0);
         callbackRetries = topology->getPropInt("@callbackRetries", 3);
         callbackTimeout = topology->getPropInt("@callbackTimeout", 5000);
         lowTimeout = topology->getPropInt("@lowTimeout", 10000);

+ 2 - 2
roxie/ccd/ccdstate.cpp

@@ -1779,8 +1779,8 @@ private:
             }
             else if (stricmp(queryName, "control:memtracesizelimit")==0)
             {
-                roxiemem::memTraceSizeLimit = control->getPropInt("@val", control->getPropInt("@value", 0)); // used to accept @value so coded like this for backward compatibility
-                topology->setPropInt("@memTraceSizeLimit", roxiemem::memTraceSizeLimit);
+                roxiemem::memTraceSizeLimit = (memsize_t) control->getPropInt64("@val", control->getPropInt64("@value", 0)); // used to accept @value so coded like this for backward compatibility
+                topology->setPropInt64("@memTraceSizeLimit", roxiemem::memTraceSizeLimit);
             }
             else if (stricmp(queryName, "control:metrics")==0)
             {

+ 95 - 43
roxie/roxiemem/roxiemem.cpp

@@ -26,6 +26,7 @@
 
 #ifdef _DEBUG
 #define _CLEAR_ALLOCATED_ROW
+//#define _CLEAR_ALLOCATED_HUGE_ROW
 #endif
 
 #ifdef _WIN32
@@ -40,7 +41,7 @@ namespace roxiemem {
 #define TIMEOUT_CHECK_FREQUENCY_MILLISECONDS 10
 
 unsigned memTraceLevel = 1;
-size32_t memTraceSizeLimit = 0;
+memsize_t memTraceSizeLimit = 0;
 
 unsigned DATA_ALIGNMENT_SIZE=0x400;
 /*------------
@@ -546,7 +547,7 @@ class BigHeapletBase : public HeapletBase
 protected:
     BigHeapletBase *next;
     const IRowAllocatorCache *allocatorCache;
-    const unsigned chunkCapacity;
+    const memsize_t chunkCapacity;
     
     inline unsigned getActivityId(unsigned allocatorId) const
     {
@@ -554,7 +555,7 @@ protected:
     }
 
 public:
-    BigHeapletBase(const IRowAllocatorCache *_allocatorCache, unsigned _chunkCapacity) : chunkCapacity(_chunkCapacity)
+    BigHeapletBase(const IRowAllocatorCache *_allocatorCache, memsize_t _chunkCapacity) : chunkCapacity(_chunkCapacity)
     {
         next = NULL;
         allocatorCache = _allocatorCache;
@@ -562,7 +563,7 @@ public:
 
     virtual size32_t sizeInPages() = 0;
 
-    virtual size32_t _capacity() const { return chunkCapacity; }
+    virtual memsize_t _capacity() const { return chunkCapacity; }
     virtual void reportLeaks(unsigned &leaked, const IContextLogger &logctx) const = 0;
     virtual void checkHeap() const = 0;
     virtual void getPeakActivityUsage(IActivityMemoryUsageMap *map) const = 0;
@@ -1042,13 +1043,13 @@ protected:
         return PAGES(chunkCapacity + offsetof(HugeHeaplet, data), HEAP_ALIGNMENT_SIZE);
     }
 
-    inline size32_t calcCapacity(unsigned requestedSize) const
+    inline memsize_t calcCapacity(memsize_t requestedSize) const
     {
         return align_pow2(requestedSize + dataOffset(), HEAP_ALIGNMENT_SIZE) - dataOffset();
     }
 
 public:
-    HugeHeaplet(const IRowAllocatorCache *_allocatorCache, unsigned _hugeSize, unsigned _allocatorId) : BigHeapletBase(_allocatorCache, calcCapacity(_hugeSize))
+    HugeHeaplet(const IRowAllocatorCache *_allocatorCache, memsize_t _hugeSize, unsigned _allocatorId) : BigHeapletBase(_allocatorCache, calcCapacity(_hugeSize))
     {
         allocatorId = _allocatorId;
     }
@@ -1118,11 +1119,11 @@ public:
         throwUnexpected();
     }
 
-    void *allocateHuge(unsigned size)
+    void *allocateHuge(memsize_t size)
     {
         atomic_inc(&count);
         dbgassertex(size <= chunkCapacity);
-#ifdef _CLEAR_ALLOCATED_ROW
+#ifdef _CLEAR_ALLOCATED_HUGE_ROW
         memset(&data, 0xcc, chunkCapacity);
 #endif
         return &data;
@@ -1130,7 +1131,7 @@ public:
 
     virtual void reportLeaks(unsigned &leaked, const IContextLogger &logctx) const 
     {
-        logctx.CTXLOG("Block size %u was allocated by activity %u and not freed", chunkCapacity, getActivityId(allocatorId));
+        logctx.CTXLOG("Block size %"I64F"u was allocated by activity %u and not freed", (unsigned __int64) chunkCapacity, getActivityId(allocatorId));
         leaked--;
     }
 
@@ -1327,9 +1328,9 @@ public:
     }
     IMPLEMENT_IINTERFACE
 
-    virtual void *allocate(size32_t size, size32_t & capacity);
-    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity);
-    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize);
+    virtual void *allocate(memsize_t size, memsize_t & capacity);
+    virtual void *resizeRow(void * original, memsize_t oldsize, memsize_t newsize, memsize_t &capacity);
+    virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize);
 
 protected:
     CChunkingRowManager * rowManager;       // Lifetime of rowManager is guaranteed to be longer
@@ -1415,8 +1416,8 @@ public:
                     break;
 
                 if (memTraceLevel >= 3)
-                    logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::pages() freeing Heaplet linked in active list - addr=%p pages=%u capacity=%u rowMgr=%p",
-                            finger, finger->sizeInPages(), finger->_capacity(), this);
+                    logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::pages() freeing Heaplet linked in active list - addr=%p pages=%u capacity=%"I64F"u rowMgr=%p",
+                            finger, finger->sizeInPages(), (unsigned __int64) finger->_capacity(), this);
                 if (prev)
                     setNext(prev, next);
                 else
@@ -1474,10 +1475,10 @@ public:
     {
     }
 
-    void * doAllocate(size32_t _size, unsigned allocatorId);
+    void * doAllocate(memsize_t _size, unsigned allocatorId);
 
 protected:
-    HugeHeaplet * allocateHeaplet(size32_t _size, unsigned allocatorId);
+    HugeHeaplet * allocateHeaplet(memsize_t _size, unsigned allocatorId);
 };
 
 class CNormalChunkingHeap : public CChunkingHeap
@@ -2064,11 +2065,11 @@ public:
         return ROUNDED(baseBlock + (firstFractionalHeap-frac), alignedsize);
     }
 
-    inline void beforeAllocate(unsigned _size, unsigned activityId)
+    inline void beforeAllocate(memsize_t _size, unsigned activityId)
     {
-        if (memTraceSizeLimit && _size > memTraceSizeLimit)
+        if (memTraceSizeLimit && _size >= memTraceSizeLimit)
         {
-            logctx.CTXLOG("Activity %u requesting %u bytes!", getActivityId(activityId), _size);
+            logctx.CTXLOG("Activity %u requesting %"I64F"u bytes!", getActivityId(activityId), (unsigned __int64) _size);
             PrintStackReport();
         }
         if (timeLimit)
@@ -2082,13 +2083,14 @@ public:
         }
     }
 
-    virtual void *allocate(unsigned _size, unsigned activityId)
+    virtual void *allocate(memsize_t _size, unsigned activityId)
     {
         beforeAllocate(_size, activityId);
         if (_size > FixedSizeHeaplet::maxHeapSize())
             return hugeHeap.doAllocate(_size, activityId);
+        size32_t size32 = (size32_t) _size;
 
-        size32_t rounded = roundup(_size + FixedSizeHeaplet::chunkHeaderSize);
+        size32_t rounded = roundup(size32 + FixedSizeHeaplet::chunkHeaderSize);
         size32_t whichHeap = ROUNDEDHEAP(rounded);
         CFixedChunkingHeap & normalHeap = normalHeaps.item(whichHeap);
         return normalHeap.doAllocate(activityId);
@@ -2114,11 +2116,11 @@ public:
             logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::setMemoryLimit new memlimit=%"I64F"u pageLimit=%u spillLimit=%u rowMgr=%p", (unsigned __int64) bytes, pageLimit, spillPageLimit, this);
     }
 
-    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, unsigned activityId, size32_t &capacity)
+    virtual void *resizeRow(void * original, memsize_t oldsize, memsize_t newsize, unsigned activityId, memsize_t &capacity)
     {
         assertex(newsize);
         assertex(!HeapletBase::isShared(original));
-        size32_t curCapacity = HeapletBase::capacity(original);
+        memsize_t curCapacity = HeapletBase::capacity(original);
         if (newsize <= curCapacity)
         {
             //resizeRow never shrinks memory
@@ -2133,7 +2135,7 @@ public:
         return ret;
     }
 
-    virtual void *finalizeRow(void * original, size32_t initialSize, size32_t finalSize, unsigned activityId)
+    virtual void *finalizeRow(void * original, memsize_t initialSize, memsize_t finalSize, unsigned activityId)
     {
         dbgassertex(finalSize);
         dbgassertex(!HeapletBase::isShared(original));
@@ -2424,34 +2426,36 @@ protected:
         callbacks.removeRowBuffer(callback);
     }
 
-    virtual size_t getExpectedCapacity(size32_t size, unsigned heapFlags)
+    virtual memsize_t getExpectedCapacity(memsize_t size, unsigned heapFlags)
     {
         if (size > FixedSizeHeaplet::maxHeapSize())
         {
-            unsigned numPages = PAGES(size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
+            memsize_t numPages = PAGES(size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
             return (numPages * HEAP_ALIGNMENT_SIZE) - HugeHeaplet::dataOffset();
         }
+        size32_t size32 = (size32_t) size;
 
         if (heapFlags & RHFpacked)
-            return align_pow2(size + PackedFixedSizeHeaplet::chunkHeaderSize, PACKED_ALIGNMENT) - PackedFixedSizeHeaplet::chunkHeaderSize;
+            return align_pow2(size32 + PackedFixedSizeHeaplet::chunkHeaderSize, PACKED_ALIGNMENT) - PackedFixedSizeHeaplet::chunkHeaderSize;
 
-        size32_t rounded = roundup(size + FixedSizeHeaplet::chunkHeaderSize);
+        size32_t rounded = roundup(size32 + FixedSizeHeaplet::chunkHeaderSize);
         size32_t heapSize = ROUNDEDSIZE(rounded);
         return heapSize - FixedSizeHeaplet::chunkHeaderSize;
     }
 
-    virtual size_t getExpectedFootprint(size32_t size, unsigned heapFlags)
+    virtual memsize_t getExpectedFootprint(memsize_t size, unsigned heapFlags)
     {
         if (size > FixedSizeHeaplet::maxHeapSize())
         {
-            unsigned numPages = PAGES(size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
+            memsize_t numPages = PAGES(size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
             return (numPages * HEAP_ALIGNMENT_SIZE);
         }
+        size32_t size32 = (size32_t) size;
 
         if (heapFlags & RHFpacked)
-            return align_pow2(size + PackedFixedSizeHeaplet::chunkHeaderSize, PACKED_ALIGNMENT);
+            return align_pow2(size32 + PackedFixedSizeHeaplet::chunkHeaderSize, PACKED_ALIGNMENT);
 
-        size32_t rounded = roundup(size + FixedSizeHeaplet::chunkHeaderSize);
+        size32_t rounded = roundup(size32 + FixedSizeHeaplet::chunkHeaderSize);
         size32_t heapSize = ROUNDEDSIZE(rounded);
         return heapSize;
     }
@@ -2488,19 +2492,19 @@ void * CRoxieDirectPackedRowHeap::allocate()
 }
 
 
-void * CRoxieVariableRowHeap::allocate(size32_t size, size32_t & capacity)
+void * CRoxieVariableRowHeap::allocate(memsize_t size, memsize_t & capacity)
 {
     void * ret = rowManager->allocate(size, allocatorId);
     capacity = RoxieRowCapacity(ret);
     return ret;
 }
 
-void * CRoxieVariableRowHeap::resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity)
+void * CRoxieVariableRowHeap::resizeRow(void * original, memsize_t oldsize, memsize_t newsize, memsize_t &capacity)
 {
     return rowManager->resizeRow(original, oldsize, newsize, allocatorId, capacity);
 }
 
-void * CRoxieVariableRowHeap::finalizeRow(void *final, size32_t originalSize, size32_t finalSize)
+void * CRoxieVariableRowHeap::finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize)
 {
     //If rows never shrink then the following is sufficient.
     if (flags & RHFhasdestructor)
@@ -2511,7 +2515,7 @@ void * CRoxieVariableRowHeap::finalizeRow(void *final, size32_t originalSize, si
 //================================================================================
 
 //MORE: Make this a nested class??
-HugeHeaplet * CHugeChunkingHeap::allocateHeaplet(size32_t _size, unsigned allocatorId)
+HugeHeaplet * CHugeChunkingHeap::allocateHeaplet(memsize_t _size, unsigned allocatorId)
 {
     unsigned numPages = PAGES(_size + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
 
@@ -2530,15 +2534,15 @@ HugeHeaplet * CHugeChunkingHeap::allocateHeaplet(size32_t _size, unsigned alloca
     }
 }
 
-void * CHugeChunkingHeap::doAllocate(size32_t _size, unsigned allocatorId)
+void * CHugeChunkingHeap::doAllocate(memsize_t _size, unsigned allocatorId)
 {
     HugeHeaplet *head = allocateHeaplet(_size, allocatorId);
 
-    if (memTraceLevel >= 2 || _size>= 10000000)
+    if (memTraceLevel >= 2 || (memTraceSizeLimit && _size >= memTraceSizeLimit))
     {
         unsigned numPages = head->sizeInPages();
-        logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::allocate(size %u) allocated new HugeHeaplet size %u - addr=%p pages=%u pageLimit=%u peakPages=%u rowMgr=%p",
-            _size, (unsigned) (numPages*HEAP_ALIGNMENT_SIZE), head, numPages, rowManager->pageLimit, rowManager->peakPages, this);
+        logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager::allocate(size %"I64F"u) allocated new HugeHeaplet size %"I64F"u - addr=%p pages=%u pageLimit=%u peakPages=%u rowMgr=%p",
+            (unsigned __int64) _size, (unsigned __int64) (numPages*HEAP_ALIGNMENT_SIZE), head, numPages, rowManager->pageLimit, rowManager->peakPages, this);
     }
 
     SpinBlock b(crit);
@@ -2692,7 +2696,7 @@ bool DataBuffer::attachToRowMgr(IRowManager *rowMgr)
     }
 }
 
-size32_t DataBuffer::_capacity() const { throwUnexpected(); }
+memsize_t DataBuffer::_capacity() const { throwUnexpected(); }
 void DataBuffer::_setDestructorFlag(const void *ptr) { throwUnexpected(); }
 
 class CDataBufferManager : public CInterface, implements IDataBufferManager
@@ -2938,7 +2942,7 @@ bool DataBufferBottom::_isShared(const void *ptr) const
         return base->_isShared(ptr);
 }
 
-size32_t DataBufferBottom::_capacity() const { throwUnexpected(); }
+memsize_t DataBufferBottom::_capacity() const { throwUnexpected(); }
 void DataBufferBottom::_setDestructorFlag(const void *ptr) { throwUnexpected(); }
 
 //================================================================================
@@ -4232,10 +4236,58 @@ protected:
     }
 };
 
+const memsize_t hugeMemorySize = 0x110000000;
+const memsize_t hugeAllocSize = 0x100000001;
+// const memsize_t initialAllocSize = hugeAllocSize/2; // need to support expand block for that to fit
+const memsize_t initialAllocSize = 0x100;
+
+class RoxieMemHugeTests : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE( RoxieMemHugeTests );
+    CPPUNIT_TEST(testHuge);
+    CPPUNIT_TEST_SUITE_END();
+    const IContextLogger &logctx;
+
+public:
+    RoxieMemHugeTests() : logctx(queryDummyContextLogger())
+    {
+        setTotalMemoryLimit(hugeMemorySize, 0, NULL);
+    }
+
+    ~RoxieMemHugeTests()
+    {
+        releaseRoxieHeap();
+    }
+
+protected:
+    void testHuge()
+    {
+        Owned<IRowManager> rowManager = createRowManager(0, NULL, logctx, NULL);
+        void * huge = rowManager->allocate(hugeAllocSize, 1);
+        ASSERT(rowManager->numPagesAfterCleanup(true)==4097);
+        ReleaseRoxieRow(huge);
+        ASSERT(rowManager->numPagesAfterCleanup(true)==0);
+        memsize_t capacity;
+        void *huge1 = rowManager->allocate(initialAllocSize, 1);
+        void *huge2 = rowManager->resizeRow(huge1, initialAllocSize, hugeAllocSize, 1, capacity);
+        ASSERT(capacity > hugeAllocSize);
+        ASSERT(rowManager->numPagesAfterCleanup(true)==4097);
+        ReleaseRoxieRow(huge2);
+        ASSERT(rowManager->numPagesAfterCleanup(true)==0);
+
+        ASSERT(rowManager->getExpectedCapacity(hugeAllocSize, RHFnone) > hugeAllocSize);
+        ASSERT(rowManager->getExpectedFootprint(hugeAllocSize, RHFnone) > hugeAllocSize);
+        ASSERT(true);
+    }
+};
+
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemTests );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemTests, "RoxieMemTests" );
 CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemStressTests );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemStressTests, "RoxieMemStressTests" );
-
+#ifdef __64BIT__
+//CPPUNIT_TEST_SUITE_REGISTRATION( RoxieMemHugeTests );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieMemHugeTests, "RoxieMemHugeTests" );
+#endif
 } // namespace roxiemem
 #endif

+ 13 - 13
roxie/roxiemem/roxiemem.hpp

@@ -107,7 +107,7 @@ protected:
 
     virtual void noteReleased(const void *ptr) = 0;
     virtual bool _isShared(const void *ptr) const = 0;
-    virtual size32_t _capacity() const = 0;
+    virtual memsize_t _capacity() const = 0;
     virtual void _setDestructorFlag(const void *ptr) = 0;
     virtual bool _hasDestructor(const void *ptr) const = 0;
     virtual unsigned _rawAllocatorId(const void *ptr) const = 0;
@@ -144,7 +144,7 @@ public:
         throwUnexpected();
     }
 
-    static size32_t capacity(const void *ptr)
+    static memsize_t capacity(const void *ptr)
     {
         if (ptr)
         {
@@ -261,7 +261,7 @@ class roxiemem_decl DataBuffer : public DataBufferBase
     friend class CDataBufferManager;
 private:
     virtual void released();
-    virtual size32_t _capacity() const;
+    virtual memsize_t _capacity() const;
     virtual void _setDestructorFlag(const void *ptr);
     virtual bool _hasDestructor(const void *ptr) const { return false; }
     virtual unsigned _rawAllocatorId(const void *ptr) const { return 0; }
@@ -293,7 +293,7 @@ private:
     virtual void released();
     virtual void noteReleased(const void *ptr);
     virtual bool _isShared(const void *ptr) const;
-    virtual size32_t _capacity() const;
+    virtual memsize_t _capacity() const;
     virtual void _setDestructorFlag(const void *ptr);
     virtual bool _hasDestructor(const void *ptr) const { return false; }
     virtual unsigned _rawAllocatorId(const void *ptr) const { return 0; }
@@ -396,9 +396,9 @@ interface IFixedRowHeap : extends IInterface
 
 interface IVariableRowHeap : extends IInterface
 {
-    virtual void *allocate(size32_t size, size32_t & capacity) = 0;
-    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, size32_t &capacity) = 0;
-    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize) = 0;
+    virtual void *allocate(memsize_t size, memsize_t & capacity) = 0;
+    virtual void *resizeRow(void * original, memsize_t oldsize, memsize_t newsize, memsize_t &capacity) = 0;
+    virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize) = 0;
 };
 
 enum RoxieHeapFlags
@@ -413,9 +413,9 @@ enum RoxieHeapFlags
 // Variable size aggregated link-counted Roxie (etc) row manager
 interface IRowManager : extends IInterface
 {
-    virtual void *allocate(size32_t size, unsigned activityId) = 0;
-    virtual void *resizeRow(void * original, size32_t oldsize, size32_t newsize, unsigned activityId, size32_t &capacity) = 0;
-    virtual void *finalizeRow(void *final, size32_t originalSize, size32_t finalSize, unsigned activityId) = 0;
+    virtual void *allocate(memsize_t size, unsigned activityId) = 0;
+    virtual void *resizeRow(void * original, memsize_t oldsize, memsize_t newsize, unsigned activityId, memsize_t &capacity) = 0;
+    virtual void *finalizeRow(void *final, memsize_t originalSize, memsize_t finalSize, unsigned activityId) = 0;
     virtual void setMemoryLimit(memsize_t size, memsize_t spillSize = 0) = 0;
     virtual unsigned allocated() = 0;
     virtual unsigned numPagesAfterCleanup(bool forceFreeAll) = 0; // calls releaseEmptyPages() then returns
@@ -430,8 +430,8 @@ interface IRowManager : extends IInterface
     virtual IVariableRowHeap * createVariableRowHeap(unsigned activityId, unsigned roxieHeapFlags) = 0;            // should this be passed the initial size?
     virtual void addRowBuffer(IBufferedRowCallback * callback) = 0;
     virtual void removeRowBuffer(IBufferedRowCallback * callback) = 0;
-    virtual size_t getExpectedCapacity(size32_t size, unsigned heapFlags) = 0; // what is the expected capacity for a given size allocation
-    virtual size_t getExpectedFootprint(size32_t size, unsigned heapFlags) = 0; // how much memory will a given size allocation actually use.
+    virtual memsize_t getExpectedCapacity(memsize_t size, unsigned heapFlags) = 0; // what is the expected capacity for a given size allocation
+    virtual memsize_t getExpectedFootprint(memsize_t size, unsigned heapFlags) = 0; // how much memory will a given size allocation actually use.
 };
 
 extern roxiemem_decl void setDataAlignmentSize(unsigned size);
@@ -464,7 +464,7 @@ extern roxiemem_decl memsize_t getTotalMemoryLimit();
 extern roxiemem_decl void releaseRoxieHeap();
 extern roxiemem_decl bool memPoolExhausted();
 extern roxiemem_decl unsigned memTraceLevel;
-extern roxiemem_decl size32_t memTraceSizeLimit;
+extern roxiemem_decl memsize_t memTraceSizeLimit;
 
 extern roxiemem_decl atomic_t dataBufferPages;
 extern roxiemem_decl atomic_t dataBuffersActive;

+ 16 - 12
roxie/roxiemem/roxierow.cpp

@@ -48,7 +48,7 @@ public:
     static inline void setCheck(size32_t size, void * _ptr)
     {
         byte * ptr = static_cast<byte *>(_ptr);
-        size32_t capacity = RoxieRowCapacity(ptr);
+        memsize_t capacity = RoxieRowCapacity(ptr);
         memset(ptr+size, 0, capacity - size - extraSize);
         unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
         *check = crc16(ptr, capacity-extraSize, 0);
@@ -58,7 +58,7 @@ public:
         if (RoxieRowHasDestructor(_ptr))
         {
             const byte * ptr = static_cast<const byte *>(_ptr);
-            size32_t capacity = RoxieRowCapacity(ptr);
+            memsize_t capacity = RoxieRowCapacity(ptr);
             const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
             return *check == crc16(ptr, capacity-extraSize, 0);
         }
@@ -77,7 +77,7 @@ public:
     static inline void setCheck(size32_t size, void * _ptr)
     {
         byte * ptr = static_cast<byte *>(_ptr);
-        size32_t capacity = RoxieRowCapacity(ptr);
+        memsize_t capacity = RoxieRowCapacity(ptr);
         memset(ptr+size, 0, capacity - size - extraSize);
         unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
         *check = chksum16(ptr, capacity-extraSize);
@@ -87,7 +87,7 @@ public:
         if (RoxieRowHasDestructor(_ptr))
         {
             const byte * ptr = static_cast<const byte *>(_ptr);
-            size32_t capacity = RoxieRowCapacity(ptr);
+            memsize_t capacity = RoxieRowCapacity(ptr);
             const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
             return chksum16(ptr, capacity-extraSize) == *check;
         }
@@ -200,7 +200,7 @@ protected:
         if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
             return rowset;
 
-        size32_t capacity;
+        memsize_t capacity;
         return (byte * *)rowManager.resizeRow(rowset, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED, capacity);
     }
 
@@ -268,27 +268,31 @@ public:
 
     virtual void * createRow()
     {
-        size32_t allocSize = meta.getInitialSize();
-        size32_t capacity;
+        memsize_t allocSize = meta.getInitialSize();
+        memsize_t capacity;
         return heap->allocate(allocSize+CHECKER::extraSize, capacity);
     }
 
     virtual void * createRow(size32_t & allocatedSize)
     {
-        const size32_t allocSize = meta.getInitialSize();
-        void * row = heap->allocate(allocSize+CHECKER::extraSize, allocatedSize);
+        const memsize_t allocSize = meta.getInitialSize();
+        memsize_t newCapacity = allocatedSize;
+        void * row = heap->allocate(allocSize+CHECKER::extraSize, newCapacity);
         //This test should get constant folded to avoid the decrement when not checked.
         if (CHECKER::extraSize)
-            allocatedSize -= CHECKER::extraSize;
+            newCapacity -= CHECKER::extraSize;
+        allocatedSize = newCapacity;
         return row;
     }
 
     virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
     {
         size32_t oldsize = size;  // don't need to include the extra checking bytes
-        void * newrow = heap->resizeRow(row, oldsize, newSize+CHECKER::extraSize, size);
+        memsize_t newCapacity = size;
+        void * newrow = heap->resizeRow(row, oldsize, newSize+CHECKER::extraSize, newCapacity);
         if (CHECKER::extraSize)
-            size -= CHECKER::extraSize;
+            newCapacity -= CHECKER::extraSize;
+        size = newCapacity;
         return newrow;
     }
 

+ 1 - 1
roxie/roxiemem/roxierowbuff.hpp

@@ -40,7 +40,7 @@ A derived implementation resizes the array if necessary.
 
 */
 
-//This allows up to 32Gb of pointers to rows on a 64bit machine.  That is likely to be enough for the next few years.
+//This allows up to 32Gb of pointers to rows on a 64bit machine.  That is likely to be enough for the next few months.
 typedef size32_t rowidx_t;
 
 class roxiemem_decl RoxieOutputRowArray

+ 3 - 3
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2393,8 +2393,8 @@ public:
             if (!isVariable && helper->queryOutputMeta()->isFixedSize())
             {
                 roxiemem::IRowManager *rM = queryJob().queryRowManager();
-                size32_t keySize = rM->getExpectedCapacity(km->getMinRecordSize(), allocFlags);
-                size32_t rowSize = rM->getExpectedCapacity(helper->queryOutputMeta()->getMinRecordSize(), allocFlags);
+                memsize_t keySize = rM->getExpectedCapacity(km->getMinRecordSize(), allocFlags);
+                memsize_t rowSize = rM->getExpectedCapacity(helper->queryOutputMeta()->getMinRecordSize(), allocFlags);
                 if (keySize >= rowSize)
                     extractKey = false;
             }
@@ -2760,7 +2760,7 @@ unsigned CBucketHandler::getBucketEstimate(rowcount_t totalRows) const
         roxiemem::IRowManager *rM = owner.queryJob().queryRowManager();
 
         memsize_t availMem = roxiemem::getTotalMemoryLimit()-0x500000;
-        size32_t initKeySize = rM->getExpectedCapacity(keyIf->queryRowMetaData()->getMinRecordSize(), owner.allocFlags);
+        memsize_t initKeySize = rM->getExpectedCapacity(keyIf->queryRowMetaData()->getMinRecordSize(), owner.allocFlags);
         memsize_t minBucketSpace = retBuckets * rM->getExpectedCapacity(HASHDEDUP_HT_BUCKET_SIZE * sizeof(void *), owner.allocFlags);
 
         rowcount_t _maxRowGuess = (availMem-minBucketSpace) / initKeySize; // without taking into account ht space / other overheads