|
@@ -235,12 +235,12 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
|
|
|
{
|
|
|
pos = 0;
|
|
|
outputOffset = (offset_t)-1;
|
|
|
- owner->rows.registerWriteCallback(*this);
|
|
|
+ owner->rows.safeRegisterWriteCallback(*this);
|
|
|
}
|
|
|
~CStream()
|
|
|
{
|
|
|
spillStream.clear(); // NB: clear stream 1st
|
|
|
- owner->rows.unregisterWriteCallback(*this);
|
|
|
+ owner->rows.safeUnregisterWriteCallback(*this);
|
|
|
owner.clear();
|
|
|
}
|
|
|
// IRowStream
|
|
@@ -262,12 +262,15 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
|
|
|
}
|
|
|
else if (pos == owner->rows.numCommitted())
|
|
|
{
|
|
|
- owner->clearSpillingCallback();
|
|
|
+ owner->rows.unregisterWriteCallback(*this); // no longer needed
|
|
|
return NULL;
|
|
|
}
|
|
|
return owner->rows.get(pos++);
|
|
|
}
|
|
|
- virtual void stop() { }
|
|
|
+ virtual void stop()
|
|
|
+ {
|
|
|
+ owner->rows.safeUnregisterWriteCallback(*this); // no longer needed
|
|
|
+ }
|
|
|
// IWritePosCallback
|
|
|
virtual rowidx_t queryRecordNumber()
|
|
|
{
|
|
@@ -1163,12 +1166,22 @@ void CThorExpandingRowArray::deserializeExpand(size32_t sz, const void *data)
|
|
|
|
|
|
void CThorSpillableRowArray::registerWriteCallback(IWritePosCallback &cb)
|
|
|
{
|
|
|
- CThorArrayLockBlock block(*this);
|
|
|
writeCallbacks.append(cb); // NB not linked to avoid circular dependency
|
|
|
}
|
|
|
|
|
|
void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
|
|
|
{
|
|
|
+ writeCallbacks.zap(cb);
|
|
|
+}
|
|
|
+
|
|
|
+void CThorSpillableRowArray::safeRegisterWriteCallback(IWritePosCallback &cb)
|
|
|
+{
|
|
|
+ CThorArrayLockBlock block(*this);
|
|
|
+ writeCallbacks.append(cb); // NB not linked to avoid circular dependency
|
|
|
+}
|
|
|
+
|
|
|
+void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
|
|
|
+{
|
|
|
CThorArrayLockBlock block(*this);
|
|
|
writeCallbacks.zap(cb);
|
|
|
}
|