瀏覽代碼

Merge pull request #7689 from jakesmith/hpcc-14067

HPCC-14067 Avoid possible deadlock if spilling + callback called

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父節點
當前提交
9a54bfb275
共有 1 個文件被更改,包括 59 次插入18 次删除
  1. 59 18
      thorlcr/thorutil/thmem.cpp

+ 59 - 18
thorlcr/thorutil/thmem.cpp

@@ -183,6 +183,7 @@ protected:
     unsigned spillPriority;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
+    bool mmRegistered;
 
     bool spillRows()
     {
@@ -201,9 +202,21 @@ protected:
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
     }
-    void clearSpillingCallback()
+    inline void addSpillingCallback()
+    {
+        if (!mmRegistered)
+        {
+            mmRegistered = true;
+            activity.queryJob().queryRowManager()->addRowBuffer(this);
+        }
+    }
+    inline void clearSpillingCallback()
     {
-        activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        if (mmRegistered)
+        {
+            mmRegistered = false;
+            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        }
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -211,9 +224,10 @@ public:
     CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
     {
-    	assertex(inRows.isFlushed());
+        assertex(inRows.isFlushed());
         rows.swap(inRows);
         useCompression = false;
+        mmRegistered = false;
     }
     ~CSpillableStreamBase()
     {
@@ -229,9 +243,30 @@ public:
     }
     virtual bool freeBufferedRows(bool critical)
     {
+        if (spillFile) // i.e. if spilt already. NB: this is thread-safe, as 'spillFile' only set by spillRows() call below and can't be called on multiple threads concurrently.
+            return false;
         CThorArrayLockBlock block(rows);
         return spillRows();
     }
+friend class CRowsLockBlock;
+};
+
+class CRowsLockBlock
+{
+    CSpillableStreamBase &owner;
+public:
+    inline CRowsLockBlock(CSpillableStreamBase &_owner) : owner(_owner)
+    {
+        owner.rows.lock();
+        clearCB = false;
+    }
+    inline ~CRowsLockBlock()
+    {
+        owner.rows.unlock();
+        if (clearCB)
+            owner.clearSpillingCallback();
+    }
+    bool clearCB;
 };
 
 // NB: Shared/spillable, holds all rows in mem until needs to spill.
@@ -264,10 +299,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
         {
             if (spillStream)
                 return spillStream->nextRow();
-            CThorArrayLockBlock block(owner->rows);
+            CRowsLockBlock block(*owner);
             if (owner->spillFile) // i.e. has spilt
             {
-                owner->clearSpillingCallback();
+                block.clearCB = true;
                 assertex(((offset_t)-1) != outputOffset);
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (owner->preserveNulls)
@@ -278,7 +313,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
             }
             else if (pos == owner->rows.numCommitted())
             {
-                owner->clearSpillingCallback();
+                block.clearCB = true;
                 return NULL;
             }
             return owner->rows.get(pos++);
@@ -302,16 +337,16 @@ public:
     CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     IRowStream *createRowStream()
     {
         {
             // already spilled?
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
@@ -342,7 +377,7 @@ public:
         // a small amount of rows to read from swappable rows
         roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     ~CSpillableStream()
     {
@@ -361,10 +396,10 @@ public:
             return spillStream->nextRow();
         if (pos == numReadRows)
         {
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
@@ -375,13 +410,16 @@ public:
             }
             rowidx_t available = rows.numCommitted();
             if (0 == available)
+            {
+                block.clearCB = true;
                 return NULL;
+            }
             rowidx_t fetch = (available >= granularity) ? granularity : available;
             // consume 'fetch' rows
             rows.readBlock(readRows, fetch);
             if (available == fetch)
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 rows.kill();
             }
             numReadRows = fetch;
@@ -392,7 +430,10 @@ public:
         ++pos;
         return row;
     }
-    virtual void stop() { }
+    virtual void stop()
+    {
+        clearSpillingCallback();
+    }
 };
 
 //====
@@ -796,12 +837,12 @@ bool CThorExpandingRowArray::binaryInsert(const void *row, ICompare &compare, bo
     binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
     if (dropLast)
     {
-    	// last row falls out, i.e. release last row and don't increment numRows
-    	dbgassertex(numRows); // numRows must be >=1 for dropLast
-    	ReleaseThorRow(rows[numRows]);
+        // last row falls out, i.e. release last row and don't increment numRows
+        dbgassertex(numRows); // numRows must be >=1 for dropLast
+        ReleaseThorRow(rows[numRows]);
     }
     else
-    	++numRows;
+        ++numRows;
     return true;
 }