Browse Source

HPCC-14083 Avoid early removal of spilling callback

When an output of a shared spilling stream is exhausted the
spill callback was removed, this commit leaves the callback in
place, but removes the write position callback, which is no
longer needed for the completed output.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 years ago
parent
commit
f9a555b5f3
2 changed files with 20 additions and 5 deletions
  1. 18 5
      thorlcr/thorutil/thmem.cpp
  2. 2 0
      thorlcr/thorutil/thmem.hpp

+ 18 - 5
thorlcr/thorutil/thmem.cpp

@@ -235,12 +235,12 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
         {
             pos = 0;
             outputOffset = (offset_t)-1;
-            owner->rows.registerWriteCallback(*this);
+            owner->rows.safeRegisterWriteCallback(*this);
         }
         ~CStream()
         {
             spillStream.clear(); // NB: clear stream 1st
-            owner->rows.unregisterWriteCallback(*this);
+            owner->rows.safeUnregisterWriteCallback(*this);
             owner.clear();
         }
     // IRowStream
@@ -262,12 +262,15 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
             }
             else if (pos == owner->rows.numCommitted())
             {
-                owner->clearSpillingCallback();
+                owner->rows.unregisterWriteCallback(*this); // no longer needed
                 return NULL;
             }
             return owner->rows.get(pos++);
         }
-        virtual void stop() { }
+        virtual void stop()
+        {
+            owner->rows.safeUnregisterWriteCallback(*this); // no longer needed
+        }
     // IWritePosCallback
         virtual rowidx_t queryRecordNumber()
         {
@@ -1163,12 +1166,22 @@ void CThorExpandingRowArray::deserializeExpand(size32_t sz, const void *data)
 
 void CThorSpillableRowArray::registerWriteCallback(IWritePosCallback &cb)
 {
-    CThorArrayLockBlock block(*this);
     writeCallbacks.append(cb); // NB not linked to avoid circular dependency
 }
 
 void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
 {
+    writeCallbacks.zap(cb);
+}
+
+void CThorSpillableRowArray::safeRegisterWriteCallback(IWritePosCallback &cb)
+{
+    CThorArrayLockBlock block(*this);
+    writeCallbacks.append(cb); // NB not linked to avoid circular dependency
+}
+
+void CThorSpillableRowArray::safeUnregisterWriteCallback(IWritePosCallback &cb)
+{
     CThorArrayLockBlock block(*this);
     writeCallbacks.zap(cb);
 }

+ 2 - 0
thorlcr/thorutil/thmem.hpp

@@ -412,6 +412,8 @@ public:
     }
     void registerWriteCallback(IWritePosCallback &cb);
     void unregisterWriteCallback(IWritePosCallback &cb);
+    void safeRegisterWriteCallback(IWritePosCallback &cb);
+    void safeUnregisterWriteCallback(IWritePosCallback &cb);
     inline void setAllowNulls(bool b) { CThorExpandingRowArray::setAllowNulls(b); }
     inline void setDefaultMaxSpillCost(unsigned defaultMaxSpillCost) { CThorExpandingRowArray::setDefaultMaxSpillCost(defaultMaxSpillCost); }
     inline unsigned queryDefaultMaxSpillCost() const { return CThorExpandingRowArray::queryDefaultMaxSpillCost(); }