
Merge pull request #4568 from jakesmith/hpcc-9572

HPCC-9572 - Fix hash dedup row leak after spill event

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 12 years ago
commit 038310c2b9
2 changed files with 31 additions and 15 deletions
  1. thorlcr/activities/hashdistrib/thhashdistribslave.cpp  (+29 -15)
  2. thorlcr/graph/thgraph.cpp  (+2 -0)
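
The core of the change is in the first file: the spilled-bucket flush logic moves out of CBucketHandler's flush loop into a new CBucket::flush(bool critical) method, so the isSpilt() check, the HASHDEDUP_MINSPILL_THRESHOLD test and clearHashTable() all run while holding the bucket's own critical section. The sketch below illustrates that locked check-and-clear pattern; it is a simplified stand-in (std::mutex, std::vector, made-up SketchBucket/MIN_SPILL_THRESHOLD names), not the HPCC-Platform classes.

// Sketch only: stand-in types, not the platform's CBucket/CriticalBlock.
#include <cstdio>
#include <mutex>
#include <vector>

struct SketchBucket
{
    static constexpr size_t MIN_SPILL_THRESHOLD = 1000; // stand-in for HASHDEDUP_MINSPILL_THRESHOLD
    std::mutex lock;
    bool spilt = false;
    std::vector<const void *> keys;                      // stand-in for the in-memory hash table

    // Returns true if the in-memory table was freed; mirrors the shape of CBucket::flush().
    bool flush(bool critical)
    {
        std::lock_guard<std::mutex> block(lock);         // counterpart of CriticalBlock b(lock)
        if (!spilt)
            return false;                                // only a spilt bucket has a disk copy to fall back on
        size_t count = keys.size();
        // avoid flushing tiny buckets (unless critical), to make room for a few rows repeatedly
        if (!critical && count < MIN_SPILL_THRESHOLD)
            return false;
        keys.clear();
        keys.shrink_to_fit();                            // counterpart of clearHashTable(critical)
        std::printf("Flushed%s bucket - %zu elements\n", critical ? " (critical)" : "", count);
        return true;
    }
};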

+ 29 - 15
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2317,6 +2317,7 @@ public:
             return htRows->clear();
     }
     bool spillHashTable(); // returns true if freed mem
+    bool flush(bool critical);
     bool rehash();
     void close()
     {
@@ -2383,19 +2384,8 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
                 ++owner.nextSpilledBucketFlush;
                 if (owner.nextSpilledBucketFlush == owner.numBuckets)
                     owner.nextSpilledBucketFlush = 0;
-                if (bucket->isSpilt())
-                {
-                    rowidx_t count = bucket->getKeyCount();
-                    // want to avoid flushing tiny buckets (unless critical), to make room for a few rows repeatedly
-                    if (critical || (count >= HASHDEDUP_MINSPILL_THRESHOLD))
-                    {
-                        if (bucket->clearHashTable(critical))
-                        {
-                            PROGLOG("Flushed bucket %d - %d elements", bucket->queryBucketNumber(), count);
-                            return true;
-                        }
-                    }
-                }
+                if (bucket->flush(critical))
+                    return true;
                 if (startNum == owner.nextSpilledBucketFlush)
                     break;
             }
@@ -2822,6 +2812,25 @@ bool CBucket::spillHashTable()
     return true;
 }
 
+bool CBucket::flush(bool critical)
+{
+    CriticalBlock b(lock);
+    if (isSpilt())
+    {
+        rowidx_t count = getKeyCount();
+        // want to avoid flushing tiny buckets (unless critical), to make room for a few rows repeatedly
+        if (critical || (count >= HASHDEDUP_MINSPILL_THRESHOLD))
+        {
+            if (clearHashTable(critical))
+            {
+                PROGLOG("Flushed%s bucket %d - %d elements", critical?" (critical)":"", queryBucketNumber(), count);
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 bool CBucket::addKey(const void *key, unsigned hashValue)
 {
     {
@@ -2955,7 +2964,13 @@ void CBucketHandler::flushBuckets()
     owner.queryJob().queryRowManager()->removeRowBuffer(this);
     owner.queryJob().queryRowManager()->removeRowBuffer(&postSpillFlush);
     for (unsigned i=0; i<numBuckets; i++)
-        buckets[i]->clear();
+    {
+        CBucket &bucket = *buckets[i];
+        bucket.clear();
+        // close stream now, to flush rows out in write streams
+        if (bucket.isSpilt())
+            bucket.close();
+    }
 }
 
 unsigned CBucketHandler::getBucketEstimateWithPrev(rowcount_t totalRows, rowidx_t prevPeakKeys, rowidx_t keyCount) const
@@ -3069,7 +3084,6 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
         CBucket *bucket = buckets[currentBucket];
         if (bucket->isSpilt())
         {
-            bucket->close();
             rowcount_t keyCount, count;
             // JCSMORE ideally, each key and row stream, would use a unique allocator per destination bucket
             // thereby keeping rows/keys together in pages, making it easier to free pages on spill requests
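
The remaining hunks in this file change when a spilt bucket's write stream is closed: flushBuckets() now closes each spilt bucket immediately after clearing it, so rows still sitting in the write stream are flushed out, and the matching close() call is removed from getNextBucketHandler(). A rough sketch of that clear-then-close ordering, using hypothetical stream/bucket types rather than the real ones:

// Sketch only: hypothetical types illustrating the clear-then-close ordering
// flushBuckets() adopts in this patch.
#include <fstream>
#include <vector>

struct SketchSpillBucket
{
    std::vector<const void *> rows;   // in-memory rows
    std::ofstream spillStream;        // open while the bucket is spilt
    bool spilt = false;

    void clear() { rows.clear(); }
    void close()
    {
        if (spillStream.is_open())
            spillStream.close();      // pushes out any rows still buffered in the write stream
    }
};

// Counterpart of CBucketHandler::flushBuckets() after the change: clear every bucket,
// and close spilt ones right away so their spill files are complete before anything
// later tries to read them back.
inline void flushAll(std::vector<SketchSpillBucket> &buckets)
{
    for (SketchSpillBucket &bucket : buckets)
    {
        bucket.clear();
        if (bucket.spilt)
            bucket.close();
    }
}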

+ 2 - 0
thorlcr/graph/thgraph.cpp

@@ -2351,6 +2351,7 @@ public:
 
     CThorContextLogger(CJobBase &_job) : job(_job)
     {
+        traceLevel = 1;
     }
     virtual void CTXLOG(const char *format, ...) const
     {
@@ -2466,6 +2467,7 @@ void CJobBase::init()
 CJobBase::~CJobBase()
 {
     clean();
+    thorAllocator->queryRowManager()->reportMemoryUsage(false);
     PROGLOG("CJobBase resetting memory manager");
     thorAllocator.clear();
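
The thgraph.cpp hunks are diagnostic: CThorContextLogger's constructor now initialises traceLevel, and ~CJobBase reports the row manager's memory usage before releasing it, presumably so that any rows still held at job end (such as the leak this PR fixes) are visible in the log. A minimal sketch of that report-before-release idea, with made-up names:

// Sketch only: made-up SketchRowManager/SketchJob, not the Thor classes.
#include <cstdio>
#include <memory>

struct SketchRowManager
{
    size_t liveRows = 0;
    void reportMemoryUsage() const
    {
        std::printf("row manager: %zu rows still allocated\n", liveRows); // leaks show up here
    }
};

struct SketchJob
{
    std::unique_ptr<SketchRowManager> rowManager = std::make_unique<SketchRowManager>();

    ~SketchJob()
    {
        rowManager->reportMemoryUsage();  // report while the bookkeeping still exists
        std::printf("resetting memory manager\n");
        rowManager.reset();               // counterpart of thorAllocator.clear()
    }
};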