|
@@ -2101,7 +2101,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
eofin = false;
|
|
|
instrm.set(inputStream);
|
|
@@ -2137,7 +2137,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities); // careful not to call again in derivatives
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities); // careful not to call again in derivatives
|
|
|
if (abortSoon||eofin)
|
|
|
{
|
|
|
eofin = true;
|
|
@@ -2442,7 +2442,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
bool passthrough;
|
|
|
Owned<IRowStream> calcStream = partitioner->calc(this, input, inputStream, passthrough); // may return NULL
|
|
@@ -3120,7 +3120,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
eos = lastEog = false;
|
|
|
ThorDataLinkMetaInfo info;
|
|
@@ -3155,7 +3155,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (eos)
|
|
|
return NULL;
|
|
|
|
|
@@ -3794,7 +3794,7 @@ public:
|
|
|
virtual void start() override
|
|
|
{
|
|
|
HashDedupSlaveActivityBase::start();
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
Owned<IThorRowInterfaces> myRowIf = getRowInterfaces(); // avoiding circular link issues
|
|
|
instrm.setown(distributor->connect(myRowIf, distInput, iHash, iCompare, keepBestCompare));
|
|
|
distInput = instrm.get();
|
|
@@ -3890,7 +3890,7 @@ public:
|
|
|
}
|
|
|
virtual void start()
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
startAllInputs();
|
|
|
leftdone = false;
|
|
|
eof = false;
|
|
@@ -3997,7 +3997,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (!eof) {
|
|
|
OwnedConstThorRow row = joinhelper->nextRow();
|
|
|
if (row) {
|
|
@@ -4300,7 +4300,7 @@ IAggregateTable *createRowAggregator(CActivityBase &activity, IHThorHashAggregat
|
|
|
return new CAggregateHT(activity, extra, helper);
|
|
|
}
|
|
|
|
|
|
-IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggStream, mptag_t mptag)
|
|
|
+IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CSlaveActivity &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggStream, mptag_t mptag)
|
|
|
{
|
|
|
Owned<IRowStream> strm;
|
|
|
ICompare *elementComparer = helperExtra.queryCompareElements();
|
|
@@ -4320,9 +4320,10 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
ICompare &cmp;
|
|
|
IHThorRowAggregator &helper;
|
|
|
IHashDistributor &distributor;
|
|
|
+ CSlaveActivity &activity;
|
|
|
public:
|
|
|
- CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor)
|
|
|
- : helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator)
|
|
|
+ CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor, CSlaveActivity &_activity)
|
|
|
+ : helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator), activity(_activity)
|
|
|
{
|
|
|
}
|
|
|
void start(IRowStream *_input)
|
|
@@ -4334,7 +4335,11 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
{
|
|
|
for (;;)
|
|
|
{
|
|
|
- OwnedConstThorRow row = input->nextRow();
|
|
|
+ OwnedConstThorRow row;
|
|
|
+ {
|
|
|
+ BlockedActivityTimer t(activity.slaveTimerStats, activity.queryTimeActivities());
|
|
|
+ row.setown(input->nextRow());
|
|
|
+ }
|
|
|
if (!row)
|
|
|
{
|
|
|
if (sz)
|
|
@@ -4367,11 +4372,12 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
rowBuilder.clear();
|
|
|
input->stop();
|
|
|
input.clear();
|
|
|
+ BlockedActivityTimer t(activity.slaveTimerStats, activity.queryTimeActivities());
|
|
|
distributor.disconnect(true);
|
|
|
distributor.join();
|
|
|
}
|
|
|
};
|
|
|
- CAggregatingStream *mergeStrm = new CAggregatingStream(helper, *rowAllocator, *elementComparer, *distributor.get());
|
|
|
+ CAggregatingStream *mergeStrm = new CAggregatingStream(helper, *rowAllocator, *elementComparer, *distributor.get(), activity);
|
|
|
mergeStrm->start(strm.getClear());
|
|
|
return mergeStrm;
|
|
|
}
|
|
@@ -4442,7 +4448,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
doNextGroup(); // or local set if !grouped
|
|
|
if (!container.queryGrouped())
|
|
@@ -4476,7 +4482,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (eos) return nullptr;
|
|
|
const void *next = aggregateStream->nextRow();
|
|
|
if (next)
|
|
@@ -4531,12 +4537,12 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
OwnedConstThorRow row = inputStream->ungroupedNextRow();
|
|
|
if (!row)
|
|
|
return NULL;
|