|
@@ -161,7 +161,6 @@ class CSpillable : public CSimpleInterfaceOf<roxiemem::IBufferedRowCallback>
|
|
|
protected:
|
|
|
bool mmRegistered = false;
|
|
|
bool mmActivated = false;
|
|
|
- bool clearCB = false; // if true, deregisters the roxiemem callback on deactivation, otherwise leaves registered but inactive.
|
|
|
unsigned spillPriority = SPILL_PRIORITY_DISABLE;
|
|
|
IThorRowInterfaces *rowIf = nullptr;
|
|
|
roxiemem::IRowManager *rowManager = nullptr;
|
|
@@ -196,12 +195,7 @@ public:
|
|
|
inline void deactivateSpillingCallback()
|
|
|
{
|
|
|
if (mmActivated)
|
|
|
- {
|
|
|
- if (clearCB)
|
|
|
- ensureSpillingCallbackRemoved(); // will re-add on next activateSpillingCallback()
|
|
|
- else // leave registered
|
|
|
- mmActivated = false;
|
|
|
- }
|
|
|
+ mmActivated = false;
|
|
|
}
|
|
|
inline void ensureSpillingCallbackInstalled()
|
|
|
{
|
|
@@ -389,7 +383,6 @@ public:
|
|
|
CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IThorRowInterfaces *_rowIf, EmptyRowSemantics _emptyRowSemantics, unsigned _spillPriority)
|
|
|
: CSpillableStreamBase(_activity, inRows, _rowIf, _emptyRowSemantics, _spillPriority)
|
|
|
{
|
|
|
- activateSpillingCallback();
|
|
|
}
|
|
|
IRowStream *createRowStream()
|
|
|
{
|
|
@@ -1723,6 +1716,8 @@ protected:
|
|
|
}
|
|
|
IRowStream *getStream(CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool shared)
|
|
|
{
|
|
|
+ bool activateSharedCallback = false;
|
|
|
+
|
|
|
{
|
|
|
CThorArrayLockBlock block(spillableRows); // ensure locked until deactivated
|
|
|
if (0 == outStreams++)
|
|
@@ -1771,6 +1766,10 @@ protected:
|
|
|
}
|
|
|
if (shared)
|
|
|
{
|
|
|
+ /* delay installing spilling callback, until after release of spillableRows lock
|
|
|
+ * because roxiemem's background thread may be blocked on that lock, and calling roxiemem::addRowBuffer here would cause deadlock
|
|
|
+ */
|
|
|
+ activateSharedCallback = true;
|
|
|
spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, emptyRowSemantics, spillPriority));
|
|
|
spillableRowSet->setTracingPrefix(tracingPrefix);
|
|
|
}
|
|
@@ -1783,6 +1782,9 @@ protected:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (activateSharedCallback)
|
|
|
+ spillableRowSet->activateSpillingCallback();
|
|
|
+
|
|
|
|
|
|
// NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
|
|
|
// which may be one of these streams or CThorRowCollectorBase itself
|