@@ -2559,6 +2559,7 @@ class CHashTableRowTable : private CThorExpandingRowArray
             ++pos;
             if (row) return row;
         }
+        // JCSMORE - could clear parent table at this point, i.e. free up ptr table
         stopped = true;
         return nullptr;
     }
@@ -2741,8 +2742,8 @@ class CBucket : public CSimpleInterface
     bool keepBest;
     ICompare *keepBestCompare;
     void doSpillHashTable();
-    bool rowsInBucketDedupedAlready;
-    bool streamed = true;
+    bool completed = false;
+    bool streamed = false;

 public:
     CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
@@ -2760,7 +2761,7 @@ public:
     bool spillHashTable(bool critical); // returns true if freed mem
     bool flush(bool critical);
     bool rehash();
-    void close()
+    void closeSpillStreams()
     {
         rowSpill.close();
         keySpill.close();
@@ -2785,14 +2786,14 @@ public:
             return htRows->queryRow(htPos);
         return nullptr;
     }
-    inline void setRowsInBucketDeduped()
+    inline void setCompleted()
     {
         dbgassertex(!isSpilt());
-        rowsInBucketDedupedAlready=true;
+        completed = true;
     }
-    inline bool areRowsInBucketDeduped() const
+    inline bool isCompleted() const
     {
-        return rowsInBucketDedupedAlready;
+        return completed;
     }
 };

@@ -2811,7 +2812,6 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
     mutable rowidx_t peakKeyCount;
     bool callbacksInstalled = false;
     unsigned nextBestBucket = 0;
-    bool bestReady = false;
     CriticalSection spillCrit;

     rowidx_t getTotalBucketCount() const
@@ -2906,7 +2906,12 @@ public:
                 // The one left, will be last bucket standing and grown to fill mem
                 // it is still useful to use as much as poss. of remaining bucket HT as filter
                 if (bucket->spillHashTable(critical))
+                {
+                    // If marked as completed, the spill streams can be closed now (NB: they must be closed before getNextBestRowStream() can read them)
+                    if (bucket->isCompleted())
+                        bucket->closeSpillStreams(); // close streams now, to flush rows out of the write streams, so they are ready to be read
                     return true;
+                }
             }
         }
         while (nextToSpill != start);
@@ -2931,35 +2936,57 @@ public:
     {
         return spillBucket(critical);
     }
-    IRowStream * getNextBestRowStream()
+
+    void checkCompletedBuckets()
     {
         // NB: Called only once input has been read
         CriticalBlock b(spillCrit); // buckets can still be spilt
-        if (!bestReady)
-        {
-            // All non-spilled buckets in memory at this point in time are deduped
-            // -> set flag in all these buckets just in case they are spilled so that
-            // when they need to be streamed back it's not necessary to dedup again
-            bestReady = true;
-            for (unsigned cur=0; cur<numBuckets; cur++)
-            {
-                if (!buckets[cur]->isSpilt())
-                    buckets[cur]->setRowsInBucketDeduped();
-            }
+        // All non-spilled buckets in memory at this point in time are fully deduped
+        // -> set flag in all these buckets just in case they are spilled so that
+        // if they need to be streamed back it's not necessary to dedup again
+        for (unsigned cur=0; cur<numBuckets; cur++)
+        {
+            CBucket &bucket = *buckets[cur];
+            if (bucket.isSpilt())
+                bucket.closeSpillStreams(); // close streams now, to flush rows out of the write streams, so they are ready to be read
+            else
+                bucket.setCompleted();
         }
+    }
+/*
+ * NB: getNextBestRowStream() is only used when BEST is involved.
+ * It is called once all input rows have been consumed.
+ *
+ * Returns: a stream for the next available unspilt bucket, or for a spilt bucket
+ * that was marked complete (setCompleted())
+ */
+    IRowStream * getNextBestRowStream()
+    {
+        // NB: Called only once input has been read
+        CriticalBlock b(spillCrit); // buckets can still be spilt
         while (nextBestBucket < numBuckets)
         {
             CBucket *bucket = buckets[nextBestBucket++];
+
+            /* JCSMORE - It would be better to prioritize non-spilt buckets first,
+             * so that those memory-consuming buckets are consumed first.
+             */
             if (bucket->isSpilt())
             {
-                if (bucket->areRowsInBucketDeduped())
+                if (bucket->isCompleted())
                 {
+                    /* NB: getSpillRowStream() will be empty, because this bucket completed,
+                     * and the key stream contains whole rows because BEST does not extract the key.
+                     */
                     rowcount_t count; // unused
-                    return bucket->getSpillRowStream(&count);
+                    return bucket->getSpillKeyStream(&count);
                 }
             }
             else
            {
+                /* JCSMORE - this should really create a stream that is spillable.
+                 * As it is, getRowStream() marks the bucket as unspillable until the stream is consumed.
+                 */
                 if (bucket->getKeyCount())
                     return bucket->getRowStream();
             }
@@ -3167,6 +3194,16 @@ public:
             }
             else
             {
+                // end of input from this phase
+
+                // For testing only: spill one bucket. NB: in effect this is as if the spill happened before input was fully read, i.e. the buckets are not yet marked as completed
+                if (testSpillTimes)
+                {
+                    bucketHandler->spillBucket(false);
+                    testSpillTimes--;
+                }
+
+                bucketHandler->checkCompletedBuckets();
                 // Keepbest has populated the hashtable with the best rows
                 // -> stream back best rows from hash table
                 if (keepBest)
@@ -3178,8 +3215,10 @@ public:
                     testSpillTimes--;
                 }

-                /* Get next available best IRowStream, i.e. buckets that did not spill before end of input.
-                 * The bucket whose stream is returned is no longer be spillable.
+                /* Get next available best IRowStream, i.e. buckets that did not spill before end of input, or
+                 * buckets which were marked complete but have since spilt.
+                 * The bucket whose stream is returned is no longer spillable
+                 * (JCSMORE - but could use a spillable stream impl. so they were spillable)
                  * Other buckets continue to be, but are marked to be ignored by future handler stages.
                  */
                 bestRowStream.setown(bucketHandler->getNextBestRowStream());
@@ -3313,7 +3352,7 @@ void CHashTableRowTable::rehash(const void **newRows)

 CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
     : owner(_owner), rowIf(_rowIf), keyIf(_keyIf), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
-      rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN), rowsInBucketDedupedAlready(false)
+      rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)

 {
     spilt = false;
@@ -3561,13 +3600,7 @@ void CBucketHandler::flushBuckets()
 {
     clearCallbacks();
     for (unsigned i=0; i<numBuckets; i++)
-    {
-        CBucket &bucket = *buckets[i];
-        bucket.clear();
-        // close stream now, to flush rows out in write streams
-        if (bucket.isSpilt())
-            bucket.close();
-    }
+        buckets[i]->clear();
 }

 unsigned CBucketHandler::getBucketEstimateWithPrev(rowcount_t totalRows, rowidx_t prevPeakKeys, rowidx_t keyCount) const
@@ -3681,7 +3714,7 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
     while (currentBucket<numBuckets)
     {
         CBucket *bucket = buckets[currentBucket];
-        if (bucket->isSpilt() && !bucket->areRowsInBucketDeduped())
+        if (bucket->isSpilt() && !bucket->isCompleted())
        {
             rowcount_t keyCount, count;
             /* If each key and row stream were to use a unique allocator per target bucket
|