Browse Source

Merge pull request #4554 from jakesmith/hpcc-9555

HPCC-9555 - Fix possible crash when hashdedup spills

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 12 years ago
parent
commit
9f4b97c84b
1 changed file with 32 additions and 27 deletions
  1. 32 27
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp

+ 32 - 27
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2294,7 +2294,7 @@ class CBucket : public CSimpleInterface, implements IInterface
     ICompare *iCompare;
     Owned<IEngineRowAllocator> _keyAllocator;
     IEngineRowAllocator *keyAllocator;
-    CHashTableRowTable &htRows;
+    CHashTableRowTable *htRows;
     bool extractKey, spilt;
     CriticalSection lock;
     unsigned bucketN;
@@ -2303,7 +2303,7 @@ class CBucket : public CSimpleInterface, implements IInterface
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable &_htRows);
+    CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
     void setSpilt()
     {
         if (spilt)
@@ -2317,10 +2317,11 @@ public:
     void clear();
     bool clearHashTable(bool ptrTable) // returns true if freed mem
     {
+        dbgassertex(htRows);
         if (ptrTable)
-            return htRows.kill();
+            return htRows->kill();
         else
-            return htRows.clear();
+            return htRows->clear();
     }
     bool spillHashTable(); // returns true if freed mem
     bool rehash();
@@ -2331,7 +2332,7 @@ public:
     }
     inline IRowStream *getRowStream(rowcount_t *count) { return rowSpill.getReader(count); }
     inline IRowStream *getKeyStream(rowcount_t *count) { return keySpill.getReader(count); }
-    inline rowidx_t getKeyCount() const { return htRows.queryHtElements(); }
+    inline rowidx_t getKeyCount() const { return htRows->queryHtElements(); }
     inline rowcount_t getSpiltRowCount() const { return rowSpill.getCount(); }
     inline rowcount_t getSpiltKeyCount() const { return keySpill.getCount(); }
     inline bool isSpilt() const { return spilt; }
@@ -2653,12 +2654,12 @@ public:
             }
             else
             {
+                // If spill event occurred, disk buckets + key buckets will have been created by this stage.
+                bucketHandler->flushBuckets();
+
                 Owned<CBucketHandler> nextBucketHandler;
                 loop
                 {
-                    // If spill event occurred, disk buckets + key buckets will have been created by this stage.
-                    bucketHandler->flushBuckets();
-
                     // pop off parents until one has a bucket left to read
                     Owned<IRowStream> nextInput;
                     nextBucketHandler.setown(bucketHandler->getNextBucketHandler(nextInput));
@@ -2774,7 +2775,7 @@ void CHashTableRowTable::rehash(const void **newRows)
 
 //
 
-CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable &_htRows)
+CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
     : owner(_owner), rowIf(_rowIf), keyIf(_keyIf), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
       rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
 
@@ -2794,7 +2795,11 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRo
 void CBucket::clear()
 {
     // bucket read-only after this (getKeyStream/getRowStream etc.)
-    clearHashTable(false);
+    if (htRows)
+    {
+        clearHashTable(false);
+        htRows = NULL;
+    }
 }
 
 bool CBucket::spillHashTable()
@@ -2802,16 +2807,16 @@ bool CBucket::spillHashTable()
     rowidx_t removeN;
     {
         CriticalBlock b(lock);
-        removeN = htRows.queryHtElements();
+        removeN = htRows->queryHtElements();
         if (0 == removeN || spilt) // NB: if split, will be handled by CBucket on different priority
             return false;
         setSpilt();
         // JCSMORE - could detach row table here and let 'lock' go whilst spilling to disk
         // would have to careful to ensure that when buck is closed, it waits for pending write
-        rowidx_t maxRows = htRows.queryMaxRows();
+        rowidx_t maxRows = htRows->queryMaxRows();
         for (rowidx_t i=0; i<maxRows; i++)
         {
-            OwnedConstThorRow key = htRows.getRowClear(i);
+            OwnedConstThorRow key = htRows->getRowClear(i);
             if (key)
                 keySpill.putRow(key.getClear());
         }
@@ -2827,7 +2832,7 @@ bool CBucket::addKey(const void *key, unsigned hashValue)
         if (!isSpilt())
         {
             bool doAdd = true;
-            if (!htRows.hasRoom())
+            if (!htRows->hasRoom())
             {
                 // attempt rehash
                 if (!rehash())
@@ -2835,9 +2840,9 @@ bool CBucket::addKey(const void *key, unsigned hashValue)
             }
             if (doAdd)
             {
-                unsigned htPos = hashValue % htRows.queryMaxRows();
+                unsigned htPos = hashValue % htRows->queryMaxRows();
                 LinkThorRow(key);
-                htRows.addRow(htPos, key);
+                htRows->addRow(htPos, key);
                 return true;
             }
         }
@@ -2858,11 +2863,11 @@ bool CBucket::rehash()
     OwnedConstThorRow newHtRows;
     {
         CriticalUnblock b(lock); // allocate may cause spill
-        newHtRows.setown(htRows.allocateNewTable());
+        newHtRows.setown(htRows->allocateNewTable());
     }
     if (!newHtRows)
         return false;
-    htRows.rehash((const void **)newHtRows.getClear());
+    htRows->rehash((const void **)newHtRows.getClear());
     return true;
 }
 
@@ -2871,17 +2876,17 @@ bool CBucket::addRow(const void *row, unsigned hashValue)
     {
         CriticalBlock b(lock);
         bool doAdd = true;
-        bool needRehash = !htRows.hasRoom();
+        bool needRehash = !htRows->hasRoom();
         if (needRehash)
         {
             if (isSpilt())
                 doAdd = false; // don't rehash if full and already spilt
         }
-        if (htRows.queryMaxRows()) // might be 0, if HT cleared
+        if (htRows->queryMaxRows()) // might be 0, if HT cleared
         {
             // Even if not adding, check HT for dedupping purposes upfront
-            unsigned htPos = hashValue % htRows.queryMaxRows();
-            if (htRows.lookupRow(htPos, row))
+            unsigned htPos = hashValue % htRows->queryMaxRows();
+            if (htRows->lookupRow(htPos, row))
                 return false; // dedupped
 
             if (doAdd)
@@ -2889,9 +2894,9 @@ bool CBucket::addRow(const void *row, unsigned hashValue)
                 if (needRehash)
                 {
                     if (rehash()) // even if rehash fails, there may be room to continue (following a flush)
-                        htPos = hashValue % htRows.queryMaxRows();
+                        htPos = hashValue % htRows->queryMaxRows();
                 }
-                if (htRows.hasRoom())
+                if (htRows->hasRoom())
                 {
                     OwnedConstThorRow key;
                     if (extractKey)
@@ -2904,8 +2909,8 @@ bool CBucket::addRow(const void *row, unsigned hashValue)
                     }
                     else
                         key.set(row);
-                    if (htRows.queryMaxRows())
-                        htRows.addRow(htPos, key.getClear());
+                    if (htRows->queryMaxRows())
+                        htRows->addRow(htPos, key.getClear());
                     if (!isSpilt()) // could have spilt whilst extracting key
                         return true;
 
@@ -3028,7 +3033,7 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
     for (unsigned i=0; i<numBuckets; i++)
     {
         CHashTableRowTable &htRows = owner.queryHashTable(i);
-        buckets[i] = new CBucket(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, i, htRows);
+        buckets[i] = new CBucket(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, i, &htRows);
         htRows.setOwner(buckets[i]);
     }
     ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);