|
@@ -231,10 +231,7 @@ protected:
|
|
|
if (nesting)
|
|
|
nested.append(len, ptr);
|
|
|
else
|
|
|
- {
|
|
|
- size32_t sz = compressor.write(ptr, len);
|
|
|
- dbgassertex(sz);
|
|
|
- }
|
|
|
+ verifyex(0 != compressor.write(ptr, len));
|
|
|
}
|
|
|
virtual size32_t beginNested(size32_t count)
|
|
|
{
|
|
@@ -1240,7 +1237,9 @@ public:
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
+#ifdef _DEBUG
|
|
|
size32_t sz = recvMb.length();
|
|
|
+#endif
|
|
|
if (expander)
|
|
|
CSendBucket::deserializeCompress(recvMb, tempMb.clear(), *expander);
|
|
|
else
|
|
@@ -2207,7 +2206,6 @@ public:
|
|
|
//===========================================================================
|
|
|
class CHDRproportional: implements IHash, public CSimpleInterface
|
|
|
{
|
|
|
- CActivityBase *activity;
|
|
|
Owned<IFile> tempfile;
|
|
|
Owned<IRowStream> tempstrm;
|
|
|
IHThorHashDistributeArg *args;
|
|
@@ -2230,7 +2228,7 @@ class CHDRproportional: implements IHash, public CSimpleInterface
|
|
|
}
|
|
|
|
|
|
public:
|
|
|
- CHDRproportional(CActivityBase *_activity, IHThorHashDistributeArg *_args,mptag_t _mastertag) : activity(_activity)
|
|
|
+ CHDRproportional(IHThorHashDistributeArg *_args, mptag_t _mastertag)
|
|
|
{
|
|
|
args = _args;
|
|
|
statstag = _mastertag;
|
|
@@ -2436,7 +2434,7 @@ public:
|
|
|
PARENT::init(data, slaveData);
|
|
|
mptag_t tag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
IHThorHashDistributeArg *distribargs = (IHThorHashDistributeArg *)queryHelper();
|
|
|
- partitioner.setown(new CHDRproportional(this, distribargs,tag));
|
|
|
+ partitioner.setown(new CHDRproportional(distribargs,tag));
|
|
|
ihash = partitioner;
|
|
|
setupDist = false;
|
|
|
}
|
|
@@ -2538,7 +2536,7 @@ class CHashTableRowTable : private CThorExpandingRowArray
|
|
|
{
|
|
|
CBucket *owner;
|
|
|
HashDedupSlaveActivityBase &activity;
|
|
|
- IHash *iRowHash, *iKeyHash;
|
|
|
+ IHash *iKeyHash;
|
|
|
ICompare *iCompare;
|
|
|
rowidx_t htElements, htMax;
|
|
|
|
|
@@ -2569,7 +2567,7 @@ class CHashTableRowTable : private CThorExpandingRowArray
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IThorRowInterfaces *rowIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare);
|
|
|
+ CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IThorRowInterfaces *rowIf, IHash *_iKeyHash, ICompare *_iCompare);
|
|
|
inline const void *query(rowidx_t i) const { return CThorExpandingRowArray::query(i); }
|
|
|
inline void setOwner(CBucket *_owner) { owner = _owner; }
|
|
|
bool kill()
|
|
@@ -2729,9 +2727,7 @@ public:
|
|
|
class CBucket : public CSimpleInterface
|
|
|
{
|
|
|
HashDedupSlaveActivityBase &owner;
|
|
|
- IThorRowInterfaces *rowIf, *keyIf;
|
|
|
- IHash *iRowHash, *iKeyHash;
|
|
|
- ICompare *iCompare;
|
|
|
+ IThorRowInterfaces *keyIf;
|
|
|
Owned<IEngineRowAllocator> _keyAllocator;
|
|
|
IEngineRowAllocator *keyAllocator;
|
|
|
CHashTableRowTable *htRows;
|
|
@@ -2746,7 +2742,7 @@ class CBucket : public CSimpleInterface
|
|
|
bool streamed = false;
|
|
|
|
|
|
public:
|
|
|
- CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
|
|
|
+ CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
|
|
|
bool addKey(const void *key, unsigned hashValue);
|
|
|
bool addRow(const void *row, unsigned hashValue);
|
|
|
void clear();
|
|
@@ -3044,7 +3040,7 @@ protected:
|
|
|
hashTables = (CHashTableRowTable **)_hashTables.getArray();
|
|
|
for (unsigned i=numHashTables; i<_numHashTables; i++)
|
|
|
{
|
|
|
- hashTables[i] = new CHashTableRowTable(*this, keyRowInterfaces, iHash, iKeyHash, rowKeyCompare);
|
|
|
+ hashTables[i] = new CHashTableRowTable(*this, keyRowInterfaces, iKeyHash, rowKeyCompare);
|
|
|
hashTables[i]->init(initSize);
|
|
|
}
|
|
|
}
|
|
@@ -3065,7 +3061,6 @@ public:
|
|
|
keyRowInterfaces = NULL;
|
|
|
hashTables = NULL;
|
|
|
numHashTables = initialNumBuckets = 0;
|
|
|
- roxiemem::RoxieHeapFlags allocFlags = roxiemem::RHFnone;
|
|
|
appendOutputLinked(this);
|
|
|
keepBest = false;
|
|
|
testSpillTimes = getOptInt("testHashDedupSpillTimes");
|
|
@@ -3283,9 +3278,9 @@ friend class CHashTableRowTable;
|
|
|
friend class CBucket;
|
|
|
};
|
|
|
|
|
|
-CHashTableRowTable::CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IThorRowInterfaces *rowIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare)
|
|
|
+CHashTableRowTable::CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IThorRowInterfaces *rowIf, IHash *_iKeyHash, ICompare *_iCompare)
|
|
|
: CThorExpandingRowArray(_activity, rowIf, ers_allow),
|
|
|
- activity(_activity), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare)
|
|
|
+ activity(_activity), iKeyHash(_iKeyHash), iCompare(_iCompare)
|
|
|
{
|
|
|
htMax = htElements = 0;
|
|
|
owner = NULL;
|
|
@@ -3350,8 +3345,8 @@ void CHashTableRowTable::rehash(const void **newRows)
|
|
|
|
|
|
//
|
|
|
|
|
|
-CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
|
|
|
- : owner(_owner), rowIf(_rowIf), keyIf(_keyIf), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
|
|
|
+CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
|
|
|
+ : owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
|
|
|
rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
|
|
|
|
|
|
{
|
|
@@ -3688,7 +3683,7 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
|
|
|
for (unsigned i=0; i<numBuckets; i++)
|
|
|
{
|
|
|
CHashTableRowTable &htRows = owner.queryHashTable(i);
|
|
|
- buckets[i] = new CBucket(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, i, &htRows);
|
|
|
+ buckets[i] = new CBucket(owner, rowIf, keyIf, extractKey, i, &htRows);
|
|
|
htRows.setOwner(buckets[i]);
|
|
|
}
|
|
|
ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);
|