|
@@ -270,12 +270,12 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
|
|
|
{
|
|
|
pos = 0;
|
|
|
outputOffset = (offset_t)-1;
|
|
|
- owner->rows.registerWriteCallback(*this);
|
|
|
+ owner->rows.registerWriteCallback(*this); // NB: CStream constructor called within rows lock
|
|
|
}
|
|
|
~CStream()
|
|
|
{
|
|
|
spillStream.clear(); // NB: clear stream 1st
|
|
|
- owner->rows.unregisterWriteCallback(*this);
|
|
|
+ owner->rows.safeUnregisterWriteCallback(*this);
|
|
|
owner.clear();
|
|
|
}
|
|
|
// IRowStream
|
|
@@ -325,17 +325,14 @@ public:
|
|
|
}
|
|
|
IRowStream *createRowStream()
|
|
|
{
|
|
|
+ CRowsLockBlock block(*this);
|
|
|
+ if (spillFile) // already spilled?
|
|
|
{
|
|
|
- // already spilled?
|
|
|
- CRowsLockBlock block(*this);
|
|
|
- if (spillFile)
|
|
|
- {
|
|
|
- block.clearCB = true;
|
|
|
- unsigned rwFlags = DEFAULT_RWFLAGS;
|
|
|
- if (preserveNulls)
|
|
|
- rwFlags |= rw_grouped;
|
|
|
- return ::createRowStream(spillFile, rowIf, rwFlags);
|
|
|
- }
|
|
|
+ block.clearCB = true;
|
|
|
+ unsigned rwFlags = DEFAULT_RWFLAGS;
|
|
|
+ if (preserveNulls)
|
|
|
+ rwFlags |= rw_grouped;
|
|
|
+ return ::createRowStream(spillFile, rowIf, rwFlags);
|
|
|
}
|
|
|
return new CStream(*this);
|
|
|
}
|
|
@@ -1174,12 +1171,22 @@ void CThorExpandingRowArray::deserializeExpand(size32_t sz, const void *data)
|
|
|
|
|
|
void CThorSpillableRowArray::registerWriteCallback(IWritePosCallback &cb)
|
|
|
{
|
|
|
- CThorArrayLockBlock block(*this);
|
|
|
writeCallbacks.append(cb); // NB not linked to avoid circular dependency
|
|
|
}
|
|
|
|
|
|
void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
|
|
|
{
|
|
|
+ writeCallbacks.zap(cb);
|
|
|
+}
|
|
|
+
|
|
|
+void CThorSpillableRowArray::safeRegisterWriteCallback(IWritePosCallback &cb)
|
|
|
+{
|
|
|
+ CThorArrayLockBlock block(*this);
|
|
|
+ writeCallbacks.append(cb); // NB not linked to avoid circular dependency
|
|
|
+}
|
|
|
+
|
|
|
+void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
|
|
|
+{
|
|
|
CThorArrayLockBlock block(*this);
|
|
|
writeCallbacks.zap(cb);
|
|
|
}
|