|
@@ -4316,6 +4316,7 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CSlaveActivity
|
|
|
IHThorRowAggregator &helper;
|
|
|
IHashDistributor &distributor;
|
|
|
CSlaveActivity &activity;
|
|
|
+ bool stopped = false;
|
|
|
public:
|
|
|
CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor, CSlaveActivity &_activity)
|
|
|
: helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator), activity(_activity)
|
|
@@ -4323,6 +4324,7 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CSlaveActivity
|
|
|
}
|
|
|
void start(IRowStream *_input)
|
|
|
{
|
|
|
+ stopped = false;
|
|
|
input.setown(_input);
|
|
|
}
|
|
|
// IRowStream
|
|
@@ -4363,6 +4365,9 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CSlaveActivity
|
|
|
}
|
|
|
virtual void stop() override
|
|
|
{
|
|
|
+ if (stopped)
|
|
|
+ return;
|
|
|
+ stopped = true;
|
|
|
sz = 0;
|
|
|
rowBuilder.clear();
|
|
|
input->stop();
|