|
@@ -566,16 +566,16 @@ static StringBuffer &memmap(StringBuffer &stats)
|
|
|
return stats.appendf("\nHeap size %u pages, %u free, largest block %u", heapTotalPages, freePages, maxBlock);
|
|
|
}
|
|
|
|
|
|
-static void throwHeapExhausted(unsigned allocatorId, unsigned pages)
|
|
|
+static void throwGlobalHeapExhausted(unsigned allocatorId, unsigned pages)
|
|
|
{
|
|
|
- VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, pages, activeRowManagers.load(), heapAllocated, heapTotalPages);
|
|
|
+ VStringBuffer msg("Global memory exhausted: pool id %u (%u pages) exhausted, requested %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, pages, activeRowManagers.load(), heapAllocated, heapTotalPages);
|
|
|
DBGLOG("%s", msg.str());
|
|
|
throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
|
|
|
}
|
|
|
|
|
|
-static void throwHeapExhausted(unsigned allocatorId, unsigned newPages, unsigned oldPages)
|
|
|
+static void throwGlobalHeapExhausted(unsigned allocatorId, unsigned newPages, unsigned oldPages)
|
|
|
{
|
|
|
- VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u, had %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, newPages, oldPages, activeRowManagers.load(), heapAllocated, heapTotalPages);
|
|
|
+ VStringBuffer msg("Global memory exhausted: pool id %u (%u pages) exhausted, requested %u, had %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, newPages, oldPages, activeRowManagers.load(), heapAllocated, heapTotalPages);
|
|
|
DBGLOG("%s", msg.str());
|
|
|
throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
|
|
|
}
|
|
@@ -605,7 +605,7 @@ static void *suballoc_aligned(size32_t pages, bool returnNullWhenExhausted)
|
|
|
if (heapAllocated + pages > heapTotalPages) {
|
|
|
if (returnNullWhenExhausted)
|
|
|
return NULL;
|
|
|
- throwHeapExhausted(0, pages);
|
|
|
+ throwGlobalHeapExhausted(0, pages);
|
|
|
}
|
|
|
if (heapLargeBlockGranularity)
|
|
|
{
|
|
@@ -733,7 +733,7 @@ static void *suballoc_aligned(size32_t pages, bool returnNullWhenExhausted)
|
|
|
}
|
|
|
if (returnNullWhenExhausted)
|
|
|
return NULL;
|
|
|
- throwHeapExhausted(0, pages);
|
|
|
+ throwGlobalHeapExhausted(0, pages);
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
@@ -2890,6 +2890,8 @@ protected:
|
|
|
inline void internalLock() { heapletLock.enter(); }
|
|
|
inline void internalUnlock() { heapletLock.leave(); }
|
|
|
|
|
|
+ void throwHeapExhausted(unsigned allocatorId, unsigned pages) const;
|
|
|
+
|
|
|
protected:
|
|
|
unsigned flags; // before the pointer so it packs better in 64bit.
|
|
|
Heaplet * activeHeaplet = nullptr; // which block is the current candidate for adding rows.
|
|
@@ -4410,6 +4412,12 @@ public:
|
|
|
|
|
|
inline unsigned getActiveHeapPages() const { return totalHeapPages.load(std::memory_order_relaxed); }
|
|
|
|
|
|
+ virtual void throwHeapExhausted(unsigned allocatorId, unsigned pages)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Pool memory exhausted: pool id %u exhausted, requested %u heap(%u/%u) global(%u/%u)", allocatorId, pages, getActiveHeapPages(), getPageLimit(), heapAllocated, heapTotalPages);
|
|
|
+ DBGLOG("%s", msg.str());
|
|
|
+ throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
|
|
|
+ }
|
|
|
|
|
|
protected:
|
|
|
void checkTraceAllocationSize(memsize_t _size, unsigned activityId);
|
|
@@ -4663,6 +4671,7 @@ public:
|
|
|
virtual void setReleaseWhenModifyCallback(bool value, bool critical) { throwUnexpected(); }
|
|
|
virtual unsigned querySlaveId() const { return slaveId; }
|
|
|
virtual void reportMemoryUsage(bool peak) const;
|
|
|
+ virtual void throwHeapExhausted(unsigned allocatorId, unsigned pages);
|
|
|
|
|
|
protected:
|
|
|
virtual unsigned getPageLimit() const;
|
|
@@ -4765,12 +4774,32 @@ protected:
|
|
|
|
|
|
//================================================================================
|
|
|
|
|
|
+/*
|
|
|
+ * The Global Memory Manager allows a single block of global memory to be shared by multiple slave channels.
|
|
|
+ *
|
|
|
+ * memLimit is the total memory that is available globally and to all the slave channels
|
|
|
+ * globalLimit is the maximum memory that can be allocated globally
|
|
|
+ *
|
|
|
+ * It has the following requirements:
|
|
|
+ * - Global row manager running out of memory needs to be able to spill any slave callbacks.
|
|
|
+ * - Slave row manager will need to be able to call global callbacks, but not other slaves.
|
|
|
+ * - Once an activity spills, other slaves should also preferentially spill that activity. (Ideally on other nodes as well).
|
|
|
+ *
|
|
|
+ * The memory available to the global row manager should be:
|
|
|
+ * totalMemory - sum(slave-allocated) limited to globalLimit.
|
|
|
+ *
|
|
|
+ * The memory available to each slave should be:
|
|
|
+ * (totalMemory - globallyAllocatedMemory) / numSlaves
|
|
|
+ *
|
|
|
+ * Note: This currently does not allow slave channels to use more than their fair share of memory.
|
|
|
+ */
|
|
|
class CGlobalRowManager : public CCallbackRowManager
|
|
|
{
|
|
|
public:
|
|
|
CGlobalRowManager(memsize_t _memLimit, memsize_t _globalLimit, unsigned _numSlaves, ITimeLimiter *_tl, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, const IRowAllocatorCache **slaveAllocatorCaches, bool _ignoreLeaks, bool _outputOOMReports)
|
|
|
: CCallbackRowManager(_memLimit, _tl, _logctx, _allocatorCache, _ignoreLeaks, _outputOOMReports), numSlaves(_numSlaves)
|
|
|
{
|
|
|
+ DBGLOG("Create Global/Slave Row Manager %uMB total global can use max %uMB", (unsigned)(_memLimit / 0x100000), (unsigned)(_globalLimit / 0x100000));
|
|
|
assertex(_globalLimit <= _memLimit);
|
|
|
globalPageLimit = (unsigned) PAGES(_globalLimit, HEAP_ALIGNMENT_SIZE);
|
|
|
slaveRowManagers = new CChunkingRowManager * [numSlaves];
|
|
@@ -4822,12 +4851,16 @@ public:
|
|
|
virtual unsigned getPageLimit() const
|
|
|
{
|
|
|
//Sum the total pages allocated by the slaves. Not called very frequently.
|
|
|
- //Do this in preference to maintaining a slaveTotal sine that would involve an atomic add for
|
|
|
+ //Do this in preference to maintaining a slaveTotal since that would involve an atomic add for
|
|
|
//each slave page allocation.
|
|
|
unsigned slavePages = 0;
|
|
|
for (unsigned i=0; i < numSlaves; i++)
|
|
|
slavePages += slaveRowManagers[i]->getActiveHeapPages();
|
|
|
- return globalPageLimit - slavePages;
|
|
|
+ unsigned available = maxPageLimit - slavePages;
|
|
|
+ if (available > globalPageLimit)
|
|
|
+ return globalPageLimit;
|
|
|
+ else
|
|
|
+ return available;
|
|
|
}
|
|
|
|
|
|
virtual IRowManager * querySlaveRowManager(unsigned slave)
|
|
@@ -4836,6 +4869,13 @@ public:
|
|
|
return slaveRowManagers[slave];
|
|
|
}
|
|
|
|
|
|
+ void throwHeapExhausted(unsigned allocatorId, unsigned pages)
|
|
|
+ {
|
|
|
+ VStringBuffer msg("Shared global memory exhausted: pool id %u exhausted, requested %u heap(%u/%u/%u/%u)) global(%u/%u)", allocatorId, pages, getActiveHeapPages(), getPageLimit(), globalPageLimit, maxPageLimit, heapAllocated, heapTotalPages);
|
|
|
+ DBGLOG("%s", msg.str());
|
|
|
+ throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
|
|
|
+ }
|
|
|
+
|
|
|
public:
|
|
|
unsigned getSlavePageLimit(unsigned slaveId) const
|
|
|
{
|
|
@@ -4887,6 +4927,14 @@ void CSlaveRowManager::reportMemoryUsage(bool peak) const
|
|
|
globalManager->reportMemoryUsage(peak);
|
|
|
}
|
|
|
|
|
|
+void CSlaveRowManager::throwHeapExhausted(unsigned allocatorId, unsigned pages)
|
|
|
+{
|
|
|
+ VStringBuffer msg("Channel memory exhausted: pool id %u exhausted, requested %u heap(%u/%u/%u)) global(%u/%u)", allocatorId, pages, getActiveHeapPages(), getPageLimit(), maxPageLimit, heapAllocated, heapTotalPages);
|
|
|
+ DBGLOG("%s", msg.str());
|
|
|
+ throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
//================================================================================
|
|
|
|
|
|
void CRoxieFixedRowHeapBase::beforeDispose()
|
|
@@ -4931,6 +4979,11 @@ void * CRoxieVariableRowHeap::finalizeRow(void *final, memsize_t originalSize, m
|
|
|
|
|
|
//================================================================================
|
|
|
|
|
|
+void CHeap::throwHeapExhausted(unsigned allocatorId, unsigned pages) const
|
|
|
+{
|
|
|
+ rowManager->throwHeapExhausted(allocatorId, pages);
|
|
|
+}
|
|
|
+
|
|
|
//MORE: Make this a nested class??
|
|
|
HugeHeaplet * CHugeHeap::allocateHeaplet(memsize_t _size, unsigned allocatorId, unsigned maxSpillCost)
|
|
|
{
|
|
@@ -5061,7 +5114,7 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
|
|
|
{
|
|
|
if (maxSpillCost == SpillAllCost)
|
|
|
rowManager->reportMemoryUsage(false);
|
|
|
- throwHeapExhausted(activityId, newPages, oldPages);
|
|
|
+ throwGlobalHeapExhausted(activityId, newPages, oldPages);
|
|
|
}
|
|
|
}
|
|
|
}
|