@@ -2206,7 +2206,6 @@ public:
 #define HASHDEDUP_HT_INC_SIZE 0x10000 // 64k (rows)
 #define HASHDEDUP_BUCKETS_MIN 11 // (NB: prime #)
 #define HASHDEDUP_BUCKETS_MAX 9973 // (NB: prime #)
-#define HASHDEDUP_BUCKET_POSTSPILL_PRIORITY 5 // very high, by this stage it's cheap to dispose of

 class HashDedupSlaveActivityBase;
 class CBucket;
@@ -2243,10 +2242,10 @@ public:
         return true;
     }
     void init(rowidx_t sz);
-    const void *allocateNewTable()
+    const void *allocateNewTable(unsigned maxSpillCost)
     {
         rowidx_t newMaxRows = maxRows+HASHDEDUP_HT_INC_SIZE;
-        return allocateRowTable(newMaxRows);
+        return allocateRowTable(newMaxRows, maxSpillCost);
     }
     void rehash(const void **newRows);
     bool lookupRow(unsigned htPos, const void *row) const // return true == match
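
Context for the new parameter: roxiemem allocations that hit the memory limit can force registered buffered-row callbacks to release rows, and each callback advertises a cost via getSpillCost(). Threading a maxSpillCost ceiling through allocateNewTable() lets the caller bound which callbacks an allocation is allowed to disturb. Below is a minimal sketch of the contract the parameter suggests; SpillCallback, MemoryPool and the accounting are inventions for this note, not the roxiemem API, whose real semantics may differ:

    #include <algorithm>
    #include <cstddef>
    #include <functional>
    #include <utility>
    #include <vector>

    // Toy stand-in for cost-bounded allocation: each registered callback
    // reports a spill cost, and spilling it releases bytes to the pool.
    struct SpillCallback
    {
        unsigned cost;                    // lower == cheaper to spill
        std::function<size_t()> spill;    // returns bytes released
    };

    class MemoryPool                      // hypothetical, not roxiemem
    {
        size_t available;
        std::vector<SpillCallback> callbacks;
    public:
        explicit MemoryPool(size_t total) : available(total) {}
        void registerCallback(SpillCallback cb) { callbacks.push_back(std::move(cb)); }

        // Only callbacks with cost <= maxSpillCost may be forced to spill;
        // if that is not enough, fail the allocation instead of disturbing
        // costlier consumers (mirroring allocateNewTable returning null).
        void *allocate(size_t size, unsigned maxSpillCost)
        {
            std::sort(callbacks.begin(), callbacks.end(),
                      [](const SpillCallback &a, const SpillCallback &b) { return a.cost < b.cost; });
            for (const SpillCallback &cb : callbacks)
            {
                if (available >= size)
                    break;
                if (cb.cost > maxSpillCost) // sorted ascending, so the rest are costlier too
                    return nullptr;
                available += cb.spill();
            }
            if (available < size)
                return nullptr;
            available -= size;
            return ::operator new(size);  // toy: caller releases via operator delete
        }
    };
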
@@ -2452,7 +2451,7 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
     // IBufferedRowCallback
     virtual unsigned getSpillCost() const
     {
-        return HASHDEDUP_BUCKET_POSTSPILL_PRIORITY;
+        return SPILL_PRIORITY_HASHDEDUP_BUCKET_POSTSPILL;
     }
     virtual bool freeBufferedRows(bool critical)
     {
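
The hard-coded 5 removed in the first hunk is replaced here by a constant presumably defined alongside the platform's other SPILL_PRIORITY_* values (not shown in this diff), so the relative ordering of spill costs across activities lives in one place. A low cost appears to mean the callback is among the first the memory manager asks to free rows, which fits: once a bucket has spilt, its remaining buffered rows are cheap to dispose of. A hedged sketch of such a shared table; only the post-spill value comes from the removed define, the others are invented for illustration:

    // Hypothetical central table of spill costs: lower values are spilled
    // first, and only the relative ordering matters.
    const unsigned SPILL_PRIORITY_HASHDEDUP_BUCKET_POSTSPILL = 5; // already spilt: cheap to flush
    const unsigned SPILL_PRIORITY_HASHDEDUP_REHASH = 20;          // ceiling passed during rehash (invented value)
    const unsigned SPILL_PRIORITY_HASHDEDUP = 30;                 // live buckets (invented value)
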
@@ -2843,11 +2842,13 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRo

 {
     spilt = false;
-    // ideally want rows in bucket to be contiguous, so when it spills, pages will be released
+    /* Although a unique allocator per bucket would mean its pages could be freed on a spill event,
+     * it is too costly overall, because in effect it reserves a roxiemem page per bucket.
+     * Sharing an allocator likely means pages will not be freed on spill events, but freed row space will be shared.
+     */
     if (extractKey)
-    { // use own allocator
-        unsigned flags = owner.allocFlags | roxiemem::RHFunique;
-        _keyAllocator.setown(owner.queryJob().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), (roxiemem::RoxieHeapFlags)flags));
+    {
+        _keyAllocator.setown(owner.queryJob().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), owner.allocFlags));
         keyAllocator = _keyAllocator;
     }
     else
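
The tradeoff being made is page residency, not speed. If roxiemem rounds each distinct heap up to whole fixed-size pages, a unique (RHFunique) allocator per bucket pins at least one page per bucket even when a bucket holds only a handful of keys: at the HASHDEDUP_BUCKETS_MAX of 9973 buckets, and assuming purely for illustration a 256KB page, that is roughly 9973 x 256KB, about 2.4GB, reserved before any rows arrive. Sharing the owner's allocator means a spilt bucket's pages usually stay resident because other buckets' rows occupy them too, but the row space it frees is immediately reusable by those peers.
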
@@ -2957,7 +2958,7 @@ bool CBucket::rehash()
     OwnedConstThorRow newHtRows;
     {
         CriticalUnblock b(lock); // allocate may cause spill
-        newHtRows.setown(htRows->allocateNewTable());
+        newHtRows.setown(htRows->allocateNewTable(SPILL_PRIORITY_HASHDEDUP_REHASH)); // don't force other hash tables to spill for rehash
     }
     if (!newHtRows)
         return false;
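
Note the shape of the failure path: the allocation happens outside the bucket lock (CriticalUnblock), per the existing comment, because it may trigger spilling, presumably so a spill callback into this bucket can take the lock without deadlocking. With the cost ceiling in place, if nothing cheap enough can be freed the allocator returns null and rehash() simply reports false, leaving the table at its current size; the intent, per the added comment, is that growing one bucket's hash table should never force a peer's to spill.
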
@@ -3169,8 +3170,10 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
     if (bucket->isSpilt())
    {
         rowcount_t keyCount, count;
-        // JCSMORE ideally, each key and row stream, would use a unique allocator per destination bucket
-        // thereby keeping rows/keys together in pages, making it easier to free pages on spill requests
+        /* If each key and row stream were to use a unique allocator per destination bucket,
+         * thereby keeping rows/keys together in pages, it would be easier to free pages on spill requests.
+         * However, it would also mean many allocators, each pinning at least one page, which ties up a lot of memory.
+         */
         Owned<IRowStream> keyStream = bucket->getKeyStream(&keyCount);
         dbgassertex(keyStream);
         Owned<CBucketHandler> newBucketHandler = new CBucketHandler(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, depth+1, div*numBuckets);
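
The same page-residency argument from the CBucket constructor applies here, doubled: a unique allocator per destination bucket for both the key stream and the row stream would mean 2 x numBuckets allocators at each depth, each pinning at least one page. A quick back-of-envelope bound, as a sketch; the page size is an assumption and the program is illustrative only:

    #include <cstdio>

    // Minimum memory pinned by per-destination allocators, under an
    // assumed fixed page size (roxiemem's real page size may differ).
    int main()
    {
        const unsigned long long PAGE_SIZE = 256 * 1024; // assumed page size
        const unsigned numBuckets = 9973;                // HASHDEDUP_BUCKETS_MAX
        const unsigned allocatorsPerBucket = 2;          // one key stream + one row stream
        unsigned long long pinned = (unsigned long long)numBuckets * allocatorsPerBucket * PAGE_SIZE;
        printf("minimum pinned: %.2f GB\n", pinned / (1024.0 * 1024.0 * 1024.0));
        return 0; // prints ~4.87 GB: why shared allocators win despite messier spills
    }
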