Przeglądaj źródła

HPCC-9572 - Fix hash dedup row leak after spill event

When hash dedup spills, it continues to use the bucket hash tables
for dedupping purposes. The memory manager callback to flush
the HT was not protected by a mutex, which meant the reset could
overlap with another thread adding new rows, causing the flush to
in effect lose those newly added rows.

Also, close all bucket spill streams at end of phase (instead of
waiting for each new level to start). Minor significance, but the
output streams will have some buffered rows, and it is better to
close and flush those out upfront, rather than leave them in memory
whilst the other phases are being handled.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 12 lat temu
rodzic
commit
c418d04b58

+ 29 - 15
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2324,6 +2324,7 @@ public:
             return htRows->clear();
     }
     bool spillHashTable(); // returns true if freed mem
+    bool flush(bool critical);
     bool rehash();
     void close()
     {
@@ -2390,19 +2391,8 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
                 ++owner.nextSpilledBucketFlush;
                 if (owner.nextSpilledBucketFlush == owner.numBuckets)
                     owner.nextSpilledBucketFlush = 0;
-                if (bucket->isSpilt())
-                {
-                    rowidx_t count = bucket->getKeyCount();
-                    // want to avoid flushing tiny buckets (unless critical), to make room for a few rows repeatedly
-                    if (critical || (count >= HASHDEDUP_MINSPILL_THRESHOLD))
-                    {
-                        if (bucket->clearHashTable(critical))
-                        {
-                            PROGLOG("Flushed bucket %d - %d elements", bucket->queryBucketNumber(), count);
-                            return true;
-                        }
-                    }
-                }
+                if (bucket->flush(critical))
+                    return true;
                 if (startNum == owner.nextSpilledBucketFlush)
                     break;
             }
@@ -2825,6 +2815,25 @@ bool CBucket::spillHashTable()
     return true;
 }
 
+bool CBucket::flush(bool critical)
+{
+    CriticalBlock b(lock);
+    if (isSpilt())
+    {
+        rowidx_t count = getKeyCount();
+        // want to avoid flushing tiny buckets (unless critical), to make room for a few rows repeatedly
+        if (critical || (count >= HASHDEDUP_MINSPILL_THRESHOLD))
+        {
+            if (clearHashTable(critical))
+            {
+                PROGLOG("Flushed(%s) bucket %d - %d elements", critical?"(critical)":"", queryBucketNumber(), count);
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 bool CBucket::addKey(const void *key, unsigned hashValue)
 {
     {
@@ -2949,7 +2958,13 @@ void CBucketHandler::flushBuckets()
     owner.queryJob().queryRowManager()->removeRowBuffer(this);
     owner.queryJob().queryRowManager()->removeRowBuffer(&postSpillFlush);
     for (unsigned i=0; i<numBuckets; i++)
-        buckets[i]->clear();
+    {
+        CBucket &bucket = *buckets[i];
+        bucket.clear();
+        // close stream now, to flush rows out in write streams
+        if (bucket.isSpilt())
+            bucket.close();
+    }
 }
 
 unsigned CBucketHandler::getBucketEstimateWithPrev(rowcount_t totalRows, rowidx_t prevPeakKeys, rowidx_t keyCount) const
@@ -3063,7 +3078,6 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
         CBucket *bucket = buckets[currentBucket];
         if (bucket->isSpilt())
         {
-            bucket->close();
             rowcount_t keyCount, count;
             // JCSMORE ideally, each key and row stream, would use a unique allocator per destination bucket
             // thereby keeping rows/keys together in pages, making it easier to free pages on spill requests

+ 2 - 0
thorlcr/graph/thgraph.cpp

@@ -2351,6 +2351,7 @@ public:
 
     CThorContextLogger(CJobBase &_job) : job(_job)
     {
+        traceLevel = 1;
     }
     virtual void CTXLOG(const char *format, ...) const
     {
@@ -2466,6 +2467,7 @@ void CJobBase::init()
 CJobBase::~CJobBase()
 {
     clean();
+    thorAllocator->queryRowManager()->reportMemoryUsage(false);
     PROGLOG("CJobBase resetting memory manager");
     thorAllocator.clear();