@@ -94,7 +94,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
IStopInput *istop;
size32_t fixedEstSize;
Owned<IRowWriter> pipewr;
- roxiemem::IRowManager *rowManager;
Owned<ISmartRowBuffer> piperd;

protected:
@@ -1004,7 +1003,6 @@ public:
pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
selfstopped = false;
pull = false;
- rowManager = activity->queryJobChannel().queryRowManager();

StringBuffer compType;
activity->getOpt(THOROPT_HDIST_COMP, compType);
@@ -2855,7 +2853,7 @@ public:
iCompare = helper->queryCompare();

// JCSMORE - really should ask / lookup what flags the allocator created for extractKey has...
- allocFlags = queryJobChannel().queryThorAllocator()->queryFlags();
+ allocFlags = queryJobChannel().queryThorAllocator().queryFlags();

// JCSMORE - it may not be worth extracting the key,
// if there's an upstream activity that holds onto rows for periods of time (e.g. sort)
@@ -2867,16 +2865,16 @@ public:
isVariable = km->isVariableSize();
if (!isVariable && helper->queryOutputMeta()->isFixedSize())
{
- roxiemem::IRowManager *rM = queryJobChannel().queryRowManager();
- memsize_t keySize = rM->getExpectedCapacity(km->getMinRecordSize(), allocFlags);
- memsize_t rowSize = rM->getExpectedCapacity(helper->queryOutputMeta()->getMinRecordSize(), allocFlags);
+ roxiemem::IRowManager &rM = queryRowManager();
+ memsize_t keySize = rM.getExpectedCapacity(km->getMinRecordSize(), allocFlags);
+ memsize_t rowSize = rM.getExpectedCapacity(helper->queryOutputMeta()->getMinRecordSize(), allocFlags);
if (keySize >= rowSize)
extractKey = false;
}
}
if (extractKey)
{
- _keyRowInterfaces.setown(createRowInterfaces(km, queryActivityId(), queryCodeContext()));
+ _keyRowInterfaces.setown(createRowInterfaces(km, queryId(), queryCodeContext()));
keyRowInterfaces = _keyRowInterfaces;
rowKeyCompare = helper->queryRowKeyCompare();
iKeyHash = helper->queryKeyHash();
@@ -3023,7 +3021,7 @@ CHashTableRowTable::CHashTableRowTable(HashDedupSlaveActivityBase &_activity, IR
void CHashTableRowTable::init(rowidx_t sz)
{
// reinitialize if need bigger or if requested size is much smaller than existing
- rowidx_t newMaxRows = activity.queryJobChannel().queryRowManager()->getExpectedCapacity(sz * sizeof(rowidx_t *), activity.allocFlags) / sizeof(rowidx_t *);
+ rowidx_t newMaxRows = activity.queryRowManager().getExpectedCapacity(sz * sizeof(rowidx_t *), activity.allocFlags) / sizeof(rowidx_t *);
if (newMaxRows <= maxRows && ((maxRows-newMaxRows) <= HASHDEDUP_HT_INC_SIZE))
return;
ReleaseThorRow(rows);
@@ -3087,7 +3085,7 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRo
*/
if (extractKey)
{
- _keyAllocator.setown(owner.queryJobChannel().getRowAllocator(keyIf->queryRowMetaData(), owner.queryActivityId(), owner.allocFlags));
+ _keyAllocator.setown(owner.getRowAllocator(keyIf->queryRowMetaData(), owner.allocFlags));
keyAllocator = _keyAllocator;
}
else
@@ -3277,16 +3275,16 @@ CBucketHandler::CBucketHandler(HashDedupSlaveActivityBase &_owner, IRowInterface

CBucketHandler::~CBucketHandler()
{
- owner.queryJobChannel().queryRowManager()->removeRowBuffer(this);
- owner.queryJobChannel().queryRowManager()->removeRowBuffer(&postSpillFlush);
+ owner.queryRowManager().removeRowBuffer(this);
+ owner.queryRowManager().removeRowBuffer(&postSpillFlush);
for (unsigned i=0; i<numBuckets; i++)
::Release(buckets[i]);
}

void CBucketHandler::flushBuckets()
{
- owner.queryJobChannel().queryRowManager()->removeRowBuffer(this);
- owner.queryJobChannel().queryRowManager()->removeRowBuffer(&postSpillFlush);
+ owner.queryRowManager().removeRowBuffer(this);
+ owner.queryRowManager().removeRowBuffer(&postSpillFlush);
for (unsigned i=0; i<numBuckets; i++)
{
CBucket &bucket = *buckets[i];
@@ -3335,11 +3333,11 @@ unsigned CBucketHandler::getBucketEstimate(rowcount_t totalRows) const
// likely to be way off for variable

// JCSMORE - will need to change based on whether upstream keeps packed or not.
- roxiemem::IRowManager *rM = owner.queryJobChannel().queryRowManager();
+ roxiemem::IRowManager &rM = owner.queryRowManager();

memsize_t availMem = roxiemem::getTotalMemoryLimit()-0x500000;
- memsize_t initKeySize = rM->getExpectedCapacity(keyIf->queryRowMetaData()->getMinRecordSize(), owner.allocFlags);
- memsize_t minBucketSpace = retBuckets * rM->getExpectedCapacity(HASHDEDUP_HT_BUCKET_SIZE * sizeof(void *), owner.allocFlags);
+ memsize_t initKeySize = rM.getExpectedCapacity(keyIf->queryRowMetaData()->getMinRecordSize(), owner.allocFlags);
+ memsize_t minBucketSpace = retBuckets * rM.getExpectedCapacity(HASHDEDUP_HT_BUCKET_SIZE * sizeof(void *), owner.allocFlags);

rowcount_t _maxRowGuess = (availMem-minBucketSpace) / initKeySize; // without taking into account ht space / other overheads
rowidx_t maxRowGuess;
@@ -3347,7 +3345,7 @@ unsigned CBucketHandler::getBucketEstimate(rowcount_t totalRows) const
maxRowGuess = (rowidx_t)RCIDXMAX/sizeof(void *);
else
maxRowGuess = (rowidx_t)_maxRowGuess;
- memsize_t bucketSpace = retBuckets * rM->getExpectedCapacity(((maxRowGuess+retBuckets-1)/retBuckets) * sizeof(void *), owner.allocFlags);
+ memsize_t bucketSpace = retBuckets * rM.getExpectedCapacity(((maxRowGuess+retBuckets-1)/retBuckets) * sizeof(void *), owner.allocFlags);
// now rebase maxRowGuess
_maxRowGuess = (availMem-bucketSpace) / initKeySize;
if (_maxRowGuess >= RCIDXMAX/sizeof(void *))
@@ -3382,9 +3380,9 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
htRows.setOwner(buckets[i]);
}
ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);
- owner.queryJobChannel().queryRowManager()->addRowBuffer(this);
+ owner.queryRowManager().addRowBuffer(this);
// postSpillFlush not needed until after 1 spill event, but not safe to add within callback
- owner.queryJobChannel().queryRowManager()->addRowBuffer(&postSpillFlush);
+ owner.queryRowManager().addRowBuffer(&postSpillFlush);
if (keyStream)
{
loop
@@ -3772,7 +3770,7 @@ RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBas
virtual void stop() { }
};
Owned<IOutputMetaData> nodeRowMeta = createOutputMetaDataWithChildRow(activity.queryRowAllocator(), sizeof(size32_t));
- Owned<IRowInterfaces> nodeRowMetaRowIf = createRowInterfaces(nodeRowMeta, activity.queryActivityId(), activity.queryCodeContext());
+ Owned<IRowInterfaces> nodeRowMetaRowIf = createRowInterfaces(nodeRowMeta, activity.queryId(), activity.queryCodeContext());
Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(activity, nodeRowMetaRowIf, localAggTable);
class CNodeCompare : implements ICompare, implements IHash
{