Przeglądaj źródła

HPCC-11968 Don't use unique allocators per bucket

Unique allocators per bucket were there as an optimization, to
help pages become free when a bucket spill event occurred.
However, the upshot of having a unique allocator per bucket was
a reserved roxiemem page per bucket, which meant a very high
initial overhead for the bucket set and premature spilling.

Revert to shared allocator.

Also, avoid rehash() causing other buckets to spill, use
maxSpillCost on table resize to ensure only lower priority
spilling occurs.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 11 lat temu
rodzic
commit
5aa7bbd2e5

+ 14 - 11
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2206,7 +2206,6 @@ public:
 #define HASHDEDUP_HT_INC_SIZE 0x10000 // 64k (rows)
 #define HASHDEDUP_BUCKETS_MIN 11 // (NB: prime #)
 #define HASHDEDUP_BUCKETS_MAX 9973 // (NB: prime #)
-#define HASHDEDUP_BUCKET_POSTSPILL_PRIORITY 5 // very high, by this stage it's cheap to dispose of
 
 class HashDedupSlaveActivityBase;
 class CBucket;
@@ -2243,10 +2242,10 @@ public:
         return true;
     }
     void init(rowidx_t sz);
-    const void *allocateNewTable()
+    const void *allocateNewTable(unsigned maxSpillCost)
     {
         rowidx_t newMaxRows = maxRows+HASHDEDUP_HT_INC_SIZE;
-        return allocateRowTable(newMaxRows);
+        return allocateRowTable(newMaxRows, maxSpillCost);
     }
     void rehash(const void **newRows);
     bool lookupRow(unsigned htPos, const void *row) const // return true == match
@@ -2452,7 +2451,7 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
     // IBufferedRowCallback
         virtual unsigned getSpillCost() const
         {
-            return HASHDEDUP_BUCKET_POSTSPILL_PRIORITY;
+            return SPILL_PRIORITY_HASHDEDUP_BUCKET_POSTSPILL;
         }
         virtual bool freeBufferedRows(bool critical)
         {
@@ -2843,11 +2842,13 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRo
 
 {
     spilt = false;
-    // ideally want rows in bucket to be contiguous, so when it spills, pages will be released
+    /* Although using a unique allocator per bucket would mean that on a spill event the pages could be freed,
+     * it is too costly overall, because in effect a roxiemem page is reserved for each bucket.
+     * Sharing an allocator will likely mean that pages are not freed on spill events, but freed row space will be shared.
+     */
     if (extractKey)
-    {   // use own allocator
-        unsigned flags = owner.allocFlags | roxiemem::RHFunique;
-        _keyAllocator.setown(owner.queryJob().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), (roxiemem::RoxieHeapFlags)flags));
+    {
+        _keyAllocator.setown(owner.queryJob().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), owner.allocFlags));
         keyAllocator = _keyAllocator;
     }
     else
@@ -2957,7 +2958,7 @@ bool CBucket::rehash()
     OwnedConstThorRow newHtRows;
     {
         CriticalUnblock b(lock); // allocate may cause spill
-        newHtRows.setown(htRows->allocateNewTable());
+        newHtRows.setown(htRows->allocateNewTable(SPILL_PRIORITY_HASHDEDUP_REHASH)); // don't force other hash tables to spill for rehash
     }
     if (!newHtRows)
         return false;
@@ -3169,8 +3170,10 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
         if (bucket->isSpilt())
         {
             rowcount_t keyCount, count;
-            // JCSMORE ideally, each key and row stream, would use a unique allocator per destination bucket
-            // thereby keeping rows/keys together in pages, making it easier to free pages on spill requests
+            /* If each key and row stream were to use a unique allocator per destination bucket,
+             * thereby keeping rows/keys together in pages, it would make it easier to free pages on spill requests.
+             * However, it would also mean a lot of allocators with at least one page per allocator, which ties up a lot of memory.
+             */
             Owned<IRowStream> keyStream = bucket->getKeyStream(&keyCount);
             dbgassertex(keyStream);
             Owned<CBucketHandler> newBucketHandler = new CBucketHandler(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, depth+1, div*numBuckets);

+ 4 - 0
thorlcr/thorutil/thmem.hpp

@@ -201,6 +201,7 @@ enum {
 graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *serializer, const char *prefix, StringBuffer &out);
 
 //NB: low priorities are spilt 1st
+#define SPILL_PRIORITY_VERYLOW 50
 #define SPILL_PRIORITY_LOW  100
 #define SPILL_PRIORITY_HIGH 1000000
 #define SPILL_PRIORITY_DEFAULT SPILL_PRIORITY_LOW
@@ -211,7 +212,10 @@ graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *
 #define SPILL_PRIORITY_RESULT SPILL_PRIORITY_LOW
 
 #define SPILL_PRIORITY_GROUPSORT SPILL_PRIORITY_LOW+1000
+
+#define SPILL_PRIORITY_HASHDEDUP_REHASH SPILL_PRIORITY_LOW+1900
 #define SPILL_PRIORITY_HASHDEDUP SPILL_PRIORITY_LOW+2000
+#define SPILL_PRIORITY_HASHDEDUP_BUCKET_POSTSPILL SPILL_PRIORITY_VERYLOW // very low, by this stage it's cheap to dispose of
 
 #define SPILL_PRIORITY_JOIN SPILL_PRIORITY_HIGH
 #define SPILL_PRIORITY_SELFJOIN SPILL_PRIORITY_HIGH