@@ -1281,6 +1281,8 @@ public:
 
     inline void addToSpaceList();
     virtual void verifySpaceList();
+
+    inline bool isWithinHeap(CHeap * search) const { return heap == search; }
 };
 
 
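Aside: isWithinHeap() above is an ownership check consumed by the last hunk of this patch, where expandHeap() asserts that the heaplet it is about to resize belongs to the current heap. A minimal standalone sketch of the back-pointer idiom follows, using invented stand-in classes rather than the roxiemem Heaplet/CHeap types.

#include <cassert>

class Heap;

class Heaplet
{
public:
    explicit Heaplet(const Heap * owner) : heap(owner) {}
    bool isWithinHeap(const Heap * search) const { return heap == search; }
private:
    const Heap * heap;   // back-pointer recorded when the heaplet is created
};

class Heap
{
public:
    void expand(Heaplet & h)
    {
        assert(h.isWithinHeap(this));   // catch cross-heap mix-ups early
        // ... resize logic would follow ...
    }
};

int main()
{
    Heap a, b;
    Heaplet h(&a);
    a.expand(h);     // ok: h belongs to a
    // b.expand(h);  // would trip the assert: h does not belong to b
    (void)b;
}
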
@@ -3152,6 +3154,19 @@ class BufferedRowCallbackManager
         return numSuccess;
     }
 
+    void report(const IContextLogger &logctx) const
+    {
+        StringBuffer msg;
+        msg.appendf(" ac(%u) cost(%u):", activityId, cost);
+        ForEachItemIn(i, callbacks)
+        {
+            if (i == nextCallback)
+                msg.append(" {").append(callbacks.item(i).first).append("}");
+            else
+                msg.append(" ").append(callbacks.item(i).first);
+        }
+        logctx.CTXLOG("%s", msg.str());
+    }
 
     inline unsigned getSpillCost() const { return cost; }
     inline unsigned getActivityId() const { return activityId; }
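Aside: a standalone rendering of the report() loop added above, to show the log line it is meant to produce; the entry at nextCallback is brace-marked, apparently so the trace shows which callback will be tried next. std::string stands in for jlib's StringBuffer here and the ids are invented.

#include <cstdio>
#include <string>
#include <vector>

int main()
{
    unsigned activityId = 7, cost = 2, nextCallback = 1;
    std::vector<unsigned> callbacks { 101, 102, 103 };

    std::string msg = " ac(" + std::to_string(activityId) + ") cost("
                    + std::to_string(cost) + "):";
    for (unsigned i = 0; i < callbacks.size(); i++)
    {
        if (i == nextCallback)
            msg += " {" + std::to_string(callbacks[i]) + "}";   // cursor position
        else
            msg += " " + std::to_string(callbacks[i]);
    }
    std::printf("%s\n", msg.c_str());   // -> " ac(7) cost(2): 101 {102} 103"
}
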
@@ -3272,6 +3287,14 @@ public:
         return releaseBuffersThread->releaseBuffers(slaveId, maxSpillCost, critical);
     }
 
+    void reportActive(const IContextLogger &logctx) const
+    {
+        logctx.CTXLOG("--Active callbacks--");
+        CriticalBlock block(callbackCrit);
+        ForEachItemIn(i, rowBufferCallbacks)
+            rowBufferCallbacks.item(i).report(logctx);
+    }
+
     void runReleaseBufferThread()
     {
         loop
@@ -3401,7 +3424,7 @@ protected:
     }
 
 protected:
-    CriticalSection callbackCrit;
+    mutable CriticalSection callbackCrit;
     Semaphore releaseBuffersSem;
     CIArrayOf<CallbackItem> rowBufferCallbacks;
     PointerArrayOf<IBufferedRowCallback> activeCallbacks;
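Aside: callbackCrit becomes mutable because the new reportActive() is declared const yet must still take the lock before walking rowBufferCallbacks. A minimal sketch of the const-method-locks-mutable-mutex idiom, with std::mutex standing in for jlib's CriticalSection/CriticalBlock:

#include <cstdio>
#include <mutex>
#include <vector>

class CallbackManager
{
public:
    void add(int id)
    {
        std::lock_guard<std::mutex> block(crit);
        ids.push_back(id);
    }
    // Logically read-only, so const; locking still mutates the mutex,
    // which is why the member must be declared mutable.
    void reportActive() const
    {
        std::lock_guard<std::mutex> block(crit);
        for (int id : ids)
            std::printf(" %d", id);
        std::printf("\n");
    }
private:
    mutable std::mutex crit;   // lockable from const member functions
    std::vector<int> ids;
};

int main()
{
    CallbackManager m;
    m.add(101);
    m.add(102);
    m.reportActive();   // prints " 101 102"
}
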
@@ -3490,7 +3513,6 @@ private:
     CHugeHeap hugeHeap;
     ITimeLimiter *timeLimit;
     DataBufferBase *activeBuffs;
-    const IContextLogger &logctx;
     unsigned peakPages;
     unsigned dataBuffs;
     unsigned dataBuffPages;
@@ -3509,6 +3531,7 @@ private:
     bool minimizeFootprintCritical;
 
 protected:
+    const IContextLogger &logctx;
     unsigned maxPageLimit;
     unsigned spillPageLimit;
 
@@ -4065,7 +4088,7 @@ public:
 
         //Try and directly free up some buffers. It is worth trying again if one of the release functions thinks it
         //freed up some memory.
-        //The following reduces the nubmer of times the callback is called, but I'm not sure how this affects
+        //The following reduces the number of times the callback is called, but I'm not sure how this affects
         //performance. I think better if a single free is likely to free up some memory, and worse if not.
         const bool skipReleaseIfAnotherThreadReleases = true;
         if (!releaseCallbackMemory(maxSpillCost, true, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
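Aside: the context lines above skip a release pass when another thread has already freed memory in the meantime. The sketch below shows one plausible shape for that lastReleaseSeq handshake using std::atomic; the semantics here are an assumption for illustration, not the roxiemem implementation.

#include <atomic>
#include <cstdio>

static std::atomic<unsigned> releaseSeq { 0 };

// Snapshot taken before a thread starts waiting for memory.
unsigned queryReleaseSeq() { return releaseSeq.load(); }

// Returns true if memory may have been freed, by this thread or another.
bool tryReleaseMemory(bool skipIfAnotherThreadReleases, unsigned snapshot)
{
    if (skipIfAnotherThreadReleases && releaseSeq.load() != snapshot)
        return true;                // someone released since the snapshot: skip our pass
    bool freedSomething = false;
    // ... walk the spill callbacks here and set freedSomething ...
    if (freedSomething)
        releaseSeq.fetch_add(1);    // publish the release to waiting threads
    return freedSomething;
}

int main()
{
    unsigned snapshot = queryReleaseSeq();
    releaseSeq.fetch_add(1);        // simulate another thread releasing first
    std::printf("%d\n", (int)tryReleaseMemory(true, snapshot));   // prints 1
}
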
@@ -4347,6 +4370,7 @@ public:
     virtual void setMinimizeFootprint(bool value, bool critical) { throwUnexpected(); }
     virtual void setReleaseWhenModifyCallback(bool value, bool critical) { throwUnexpected(); }
     virtual unsigned querySlaveId() const { return slaveId; }
+    virtual void reportMemoryUsage(bool peak) const;
 
 protected:
     virtual unsigned getPageLimit() const;
@@ -4426,6 +4450,12 @@ public:
 
     virtual unsigned querySlaveId() const { return 0; }
 
+    virtual void reportMemoryUsage(bool peak) const
+    {
+        CChunkingRowManager::reportMemoryUsage(peak);
+        callbacks.reportActive(logctx);
+    }
+
 protected:
     virtual void addRowBuffer(IBufferedRowCallback * callback)
     {
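Aside: both reportMemoryUsage() overrides in this patch (this one and CSlaveRowManager's, in the next hunk) follow the same call-the-base-then-add-detail pattern. A small self-contained sketch of that chaining, with invented class names:

#include <cstdio>

class RowManager
{
public:
    virtual ~RowManager() {}
    virtual void reportMemoryUsage(bool peak) const
    {
        std::printf("pages in use: %u%s\n", pages, peak ? " (peak)" : "");
    }
protected:
    unsigned pages = 0;
};

class CallbackRowManager : public RowManager
{
public:
    virtual void reportMemoryUsage(bool peak) const override
    {
        RowManager::reportMemoryUsage(peak);    // base report first
        std::printf("--Active callbacks--\n");  // then the extra detail
    }
};

int main()
{
    CallbackRowManager manager;
    manager.reportMemoryUsage(false);
}
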
@@ -4559,6 +4589,11 @@ unsigned CSlaveRowManager::getPageLimit() const
     return globalManager->getSlavePageLimit(slaveId);
 }
 
+void CSlaveRowManager::reportMemoryUsage(bool peak) const
+{
+    CChunkingRowManager::reportMemoryUsage(peak);
+    globalManager->reportMemoryUsage(peak);
+}
 
 //================================================================================
 
@@ -4672,6 +4707,8 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
     unsigned newPages = PAGES(newsize + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     unsigned oldPages = PAGES(oldcapacity + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     void *oldbase = (void *) ((memsize_t) original & HEAP_ALIGNMENT_MASK);
+    HugeHeaplet * oldHeaplet = (HugeHeaplet *)oldbase;
+    assertex(oldHeaplet->isWithinHeap(this));
 
     //Check if we are shrinking the number of pages.
     if (newPages <= oldPages)