|
@@ -2100,7 +2100,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
eofin = false;
|
|
|
instrm.set(inputStream);
|
|
@@ -2136,7 +2136,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities); // careful not to call again in derivatives
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities); // careful not to call again in derivatives
|
|
|
if (abortSoon||eofin)
|
|
|
{
|
|
|
eofin = true;
|
|
@@ -2440,7 +2440,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
bool passthrough;
|
|
|
Owned<IRowStream> calcStream = partitioner->calc(this, input, inputStream, passthrough); // may return NULL
|
|
@@ -3115,7 +3115,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
eos = lastEog = false;
|
|
|
ThorDataLinkMetaInfo info;
|
|
@@ -3150,7 +3150,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (eos)
|
|
|
return NULL;
|
|
|
|
|
@@ -3789,7 +3789,7 @@ public:
|
|
|
virtual void start() override
|
|
|
{
|
|
|
HashDedupSlaveActivityBase::start();
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
Owned<IThorRowInterfaces> myRowIf = getRowInterfaces(); // avoiding circular link issues
|
|
|
instrm.setown(distributor->connect(myRowIf, distInput, iHash, iCompare, keepBestCompare));
|
|
|
distInput = instrm.get();
|
|
@@ -3883,7 +3883,7 @@ public:
|
|
|
}
|
|
|
virtual void start()
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
startAllInputs();
|
|
|
leftdone = false;
|
|
|
eof = false;
|
|
@@ -3990,7 +3990,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (!eof) {
|
|
|
OwnedConstThorRow row = joinhelper->nextRow();
|
|
|
if (row) {
|
|
@@ -4293,7 +4293,7 @@ IAggregateTable *createRowAggregator(CActivityBase &activity, IHThorHashAggregat
|
|
|
return new CAggregateHT(activity, extra, helper);
|
|
|
}
|
|
|
|
|
|
-IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggStream, mptag_t mptag)
|
|
|
+IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CSlaveActivity &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggStream, mptag_t mptag)
|
|
|
{
|
|
|
Owned<IRowStream> strm;
|
|
|
ICompare *elementComparer = helperExtra.queryCompareElements();
|
|
@@ -4313,9 +4313,10 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
ICompare &cmp;
|
|
|
IHThorRowAggregator &helper;
|
|
|
IHashDistributor &distributor;
|
|
|
+ CSlaveActivity &activity;
|
|
|
public:
|
|
|
- CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor)
|
|
|
- : helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator)
|
|
|
+ CAggregatingStream(IHThorRowAggregator &_helper, IEngineRowAllocator &_rowAllocator, ICompare &_cmp, IHashDistributor &_distributor, CSlaveActivity &_activity)
|
|
|
+ : helper(_helper), rowAllocator(_rowAllocator), cmp(_cmp), distributor(_distributor), rowBuilder(_rowAllocator), activity(_activity)
|
|
|
{
|
|
|
}
|
|
|
void start(IRowStream *_input)
|
|
@@ -4327,7 +4328,11 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
{
|
|
|
for (;;)
|
|
|
{
|
|
|
- OwnedConstThorRow row = input->nextRow();
|
|
|
+ OwnedConstThorRow row;
|
|
|
+ {
|
|
|
+ BlockedActivityTimer t(activity.slaveTimerStats, activity.queryTimeActivities());
|
|
|
+ row.setown(input->nextRow());
|
|
|
+ }
|
|
|
if (!row)
|
|
|
{
|
|
|
if (sz)
|
|
@@ -4360,11 +4365,12 @@ IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &
|
|
|
rowBuilder.clear();
|
|
|
input->stop();
|
|
|
input.clear();
|
|
|
+ BlockedActivityTimer t(activity.slaveTimerStats, activity.queryTimeActivities());
|
|
|
distributor.disconnect(true);
|
|
|
distributor.join();
|
|
|
}
|
|
|
};
|
|
|
- CAggregatingStream *mergeStrm = new CAggregatingStream(helper, *rowAllocator, *elementComparer, *distributor.get());
|
|
|
+ CAggregatingStream *mergeStrm = new CAggregatingStream(helper, *rowAllocator, *elementComparer, *distributor.get(), activity);
|
|
|
mergeStrm->start(strm.getClear());
|
|
|
return mergeStrm;
|
|
|
}
|
|
@@ -4435,7 +4441,7 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
doNextGroup(); // or local set if !grouped
|
|
|
if (!container.queryGrouped())
|
|
@@ -4469,7 +4475,7 @@ public:
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
if (eos) return nullptr;
|
|
|
const void *next = aggregateStream->nextRow();
|
|
|
if (next)
|
|
@@ -4524,12 +4530,12 @@ public:
|
|
|
}
|
|
|
virtual void start() override
|
|
|
{
|
|
|
- ActivityTimer s(totalCycles, timeActivities);
|
|
|
+ ActivityTimer s(slaveTimerStats, timeActivities);
|
|
|
PARENT::start();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
- ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ ActivityTimer t(slaveTimerStats, timeActivities);
|
|
|
OwnedConstThorRow row = inputStream->ungroupedNextRow();
|
|
|
if (!row)
|
|
|
return NULL;
|