|
@@ -210,9 +210,9 @@ public:
|
|
|
{
|
|
|
ctx->mergeStats(from);
|
|
|
}
|
|
|
- virtual const CRuntimeStatisticCollection &queryStats() const
|
|
|
+ virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
|
|
|
{
|
|
|
- return ctx->queryStats();
|
|
|
+ ctx->gatherStats(merged);
|
|
|
}
|
|
|
virtual void CTXLOGva(const char *format, va_list args) const __attribute__((format(printf,2,0)))
|
|
|
{
|
|
@@ -282,9 +282,9 @@ public:
|
|
|
{
|
|
|
ctx->noteProcessed(subgraphId, activityId, _idx, _processed, _strands);
|
|
|
}
|
|
|
- virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, cycle_t _localCycles) const
|
|
|
+ virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId) const
|
|
|
{
|
|
|
- ctx->mergeActivityStats(fromStats, subgraphId, activityId, _totalCycles, _localCycles);
|
|
|
+ ctx->mergeActivityStats(fromStats, subgraphId, activityId);
|
|
|
}
|
|
|
virtual IProbeManager *queryProbeManager() const
|
|
|
{
|
|
@@ -350,7 +350,8 @@ protected:
|
|
|
|
|
|
static const StatisticsMapping actStatistics(StWhenFirstRow, StTimeElapsed, StTimeLocalExecute, StTimeTotalExecute, StSizeMaxRowSize,
|
|
|
StNumRowsProcessed, StNumSlaves, StNumStarted, StNumStopped, StNumStrands,
|
|
|
- StNumScansPerRow, StNumAllocations, StNumAllocationScans, StKindNone);
|
|
|
+ StNumScansPerRow, StNumAllocations, StNumAllocationScans,
|
|
|
+ StTimeFirstExecute, StCycleLocalExecuteCycles, StCycleTotalExecuteCycles, StKindNone);
|
|
|
static const StatisticsMapping joinStatistics(&actStatistics, StNumAtmostTriggered, StKindNone);
|
|
|
static const StatisticsMapping keyedJoinStatistics(&joinStatistics, StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
|
|
|
StNumIndexSkips, StNumIndexNullSkips, StNumIndexMerges, StNumIndexMergeCompares,
|
|
@@ -388,11 +389,8 @@ protected:
|
|
|
bool optUnordered = false; // is the output specified as unordered?
|
|
|
unsigned heapFlags;
|
|
|
|
|
|
- mutable CriticalSection statsCrit;
|
|
|
mutable __int64 processed;
|
|
|
mutable __int64 started;
|
|
|
- mutable __int64 totalCycles;
|
|
|
- mutable __int64 localCycles;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
@@ -402,8 +400,6 @@ public:
|
|
|
{
|
|
|
processed = 0;
|
|
|
started = 0;
|
|
|
- totalCycles = 0;
|
|
|
- localCycles = 0;
|
|
|
dependentCount = 0;
|
|
|
optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
|
|
|
optUnordered = !_graphNode.getPropBool("att[@name='ordered']/@value", true);
|
|
@@ -490,18 +486,6 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, const ActivityTimeAccumulator &_totalCycles, cycle_t _localCycles) const
|
|
|
- {
|
|
|
- if (_totalCycles.totalCycles || _localCycles)
|
|
|
- {
|
|
|
- CriticalBlock b(statsCrit);
|
|
|
- dbgassertex(_totalCycles.totalCycles >= _localCycles);
|
|
|
- totalCycles += _totalCycles.totalCycles;
|
|
|
- localCycles += _localCycles;
|
|
|
- }
|
|
|
- CActivityFactory::mergeStats(fromStats);
|
|
|
- }
|
|
|
-
|
|
|
virtual void noteStarted() const
|
|
|
{
|
|
|
CriticalBlock b(statsCrit);
|
|
@@ -532,10 +516,13 @@ public:
|
|
|
CriticalBlock b(statsCrit);
|
|
|
if (started)
|
|
|
putStatsValue(&node, "_roxieStarted", "sum", started);
|
|
|
- if (totalCycles)
|
|
|
- putStatsValue(&node, "totalTime", "sum", (unsigned) (cycle_to_nanosec(totalCycles)/1000));
|
|
|
- if (localCycles)
|
|
|
- putStatsValue(&node, "localTime", "sum", (unsigned) (cycle_to_nanosec(localCycles)/1000));
|
|
|
+ //MORE: The following is information is now duplicated
|
|
|
+ unsigned __int64 totalNs = mystats.getSerialStatisticValue(StTimeTotalExecute);
|
|
|
+ unsigned __int64 localNs = mystats.getSerialStatisticValue(StTimeLocalExecute);
|
|
|
+ if (totalNs)
|
|
|
+ putStatsValue(&node, "totalTime", "sum", (unsigned) (totalNs/1000));
|
|
|
+ if (localNs)
|
|
|
+ putStatsValue(&node, "localTime", "sum", (unsigned) (localNs/1000));
|
|
|
}
|
|
|
|
|
|
virtual void resetNodeProgressInfo()
|
|
@@ -544,22 +531,19 @@ public:
|
|
|
CriticalBlock b(statsCrit);
|
|
|
processed = 0;
|
|
|
started = 0;
|
|
|
- totalCycles = 0;
|
|
|
- localCycles = 0;
|
|
|
}
|
|
|
virtual void getActivityMetrics(StringBuffer &reply) const
|
|
|
{
|
|
|
CActivityFactory::getActivityMetrics(reply);
|
|
|
CriticalBlock b(statsCrit);
|
|
|
putStatsValue(reply, "_roxieStarted", "sum", started);
|
|
|
- putStatsValue(reply, "totalTime", "sum", (unsigned) (cycle_to_nanosec(totalCycles)/1000));
|
|
|
- putStatsValue(reply, "localTime", "sum", (unsigned) (cycle_to_nanosec(localCycles)/1000));
|
|
|
+ putStatsValue(reply, "totalTime", "sum", (unsigned) (mystats.getSerialStatisticValue(StTimeTotalExecute)/1000));
|
|
|
+ putStatsValue(reply, "localTime", "sum", (unsigned) (mystats.getSerialStatisticValue(StTimeLocalExecute)/1000));
|
|
|
}
|
|
|
- virtual unsigned __int64 queryLocalCycles() const
|
|
|
+ virtual unsigned __int64 queryLocalTimeNs() const
|
|
|
{
|
|
|
- return localCycles;
|
|
|
+ return mystats.getSerialStatisticValue(StTimeLocalExecute);
|
|
|
}
|
|
|
-
|
|
|
virtual IQueryFactory &queryQueryFactory() const
|
|
|
{
|
|
|
return CActivityFactory::queryQueryFactory();
|
|
@@ -936,7 +920,6 @@ protected:
|
|
|
MapStringToMyClass<ThorSectionTimer> functionTimers;
|
|
|
unsigned processed;
|
|
|
ActivityTimeAccumulator totalCycles;
|
|
|
- cycle_t localCycles;
|
|
|
unsigned activityId;
|
|
|
activityState state;
|
|
|
bool createPending;
|
|
@@ -962,7 +945,6 @@ public:
|
|
|
inputStream = NULL;
|
|
|
meta.set(basehelper.queryOutputMeta());
|
|
|
processed = 0;
|
|
|
- localCycles = 0;
|
|
|
state=STATEreset;
|
|
|
rowAllocator = NULL;
|
|
|
debugging = _probeManager != NULL; // Don't want to collect timing stats from debug sessions
|
|
@@ -981,7 +963,6 @@ public:
|
|
|
ctx = NULL;
|
|
|
meta.set(basehelper.queryOutputMeta());
|
|
|
processed = 0;
|
|
|
- localCycles = 0;
|
|
|
state=STATEreset;
|
|
|
rowAllocator = NULL;
|
|
|
debugging = false;
|
|
@@ -993,6 +974,12 @@ public:
|
|
|
|
|
|
~CRoxieServerActivity()
|
|
|
{
|
|
|
+ basehelper.Release();
|
|
|
+ ::Release(rowAllocator);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void beforeDispose() override
|
|
|
+ {
|
|
|
if (traceStartStop)
|
|
|
{
|
|
|
// There was an old comment here stating // Note- CTXLOG may not be safe
|
|
@@ -1010,20 +997,25 @@ public:
|
|
|
DBGLOG("STATE: Activity %d destroyed but not reset", activityId);
|
|
|
state = STATEreset; // bit pointless but there you go...
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void updateFactoryStatistics() override
|
|
|
+ {
|
|
|
+ CRuntimeStatisticCollection mergedStats(stats.queryMapping());
|
|
|
+ gatherStats(mergedStats);
|
|
|
+
|
|
|
if (factory && !debugging)
|
|
|
{
|
|
|
if (processed)
|
|
|
factory->noteProcessed(0, processed);
|
|
|
- factory->mergeActivityStats(stats, totalCycles, localCycles);
|
|
|
+ factory->mergeStats(mergedStats);
|
|
|
}
|
|
|
if (ctx && factory)
|
|
|
{
|
|
|
if (processed)
|
|
|
ctx->noteProcessed(factory->querySubgraphId(), activityId, 0, processed, 0);
|
|
|
- ctx->mergeActivityStats(stats, factory->querySubgraphId(), activityId, totalCycles, localCycles);
|
|
|
+ ctx->mergeActivityStats(mergedStats, factory->querySubgraphId(), activityId);
|
|
|
}
|
|
|
- basehelper.Release();
|
|
|
- ::Release(rowAllocator);
|
|
|
}
|
|
|
|
|
|
virtual const IRoxieContextLogger &queryLogCtx()const
|
|
@@ -1063,12 +1055,11 @@ public:
|
|
|
#endif
|
|
|
return nullptr;
|
|
|
}
|
|
|
- void mergeStrandStats(unsigned strandProcessed, const ActivityTimeAccumulator & strandCycles, const CRuntimeStatisticCollection & strandStats)
|
|
|
+ void mergeStrandStats(unsigned strandProcessed, const ActivityTimeAccumulator & strandCycles)
|
|
|
{
|
|
|
CriticalBlock cb(statscrit);
|
|
|
processed += strandProcessed;
|
|
|
totalCycles.merge(strandCycles);
|
|
|
- stats.merge(strandStats);
|
|
|
}
|
|
|
inline void ensureRowAllocator()
|
|
|
{
|
|
@@ -1140,9 +1131,14 @@ public:
|
|
|
{
|
|
|
stats.merge(from);
|
|
|
}
|
|
|
- virtual const CRuntimeStatisticCollection &queryStats() const
|
|
|
+ virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
|
|
|
{
|
|
|
- return stats;
|
|
|
+ merged.merge(stats);
|
|
|
+ if (rowAllocator)
|
|
|
+ rowAllocator->gatherStats(merged);
|
|
|
+
|
|
|
+ totalCycles.addStatistics(merged);
|
|
|
+ merged.mergeStatistic(StCycleLocalExecuteCycles, queryLocalCycles());
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &getLogPrefix(StringBuffer &ret) const
|
|
@@ -1340,11 +1336,6 @@ public:
|
|
|
}
|
|
|
if (inputStream)
|
|
|
inputStream->stop();
|
|
|
- if (rowAllocator)
|
|
|
- {
|
|
|
- stats.reset(heapStatistics);
|
|
|
- rowAllocator->gatherStats(stats);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1383,7 +1374,6 @@ public:
|
|
|
resetJunction(junction);
|
|
|
ForEachItemIn(idx, dependencies)
|
|
|
dependencies.item(idx).reset();
|
|
|
- localCycles = queryLocalCycles(); // We can't call queryLocalCycles() in the destructor, so save the information here when we can.
|
|
|
if (input)
|
|
|
input->reset();
|
|
|
}
|
|
@@ -1681,7 +1671,6 @@ public:
|
|
|
processed = 0;
|
|
|
numProcessedLastGroup = 0;
|
|
|
totalCycles.reset();
|
|
|
- stats.reset();
|
|
|
}
|
|
|
virtual void stop()
|
|
|
{
|
|
@@ -1690,11 +1679,9 @@ public:
|
|
|
if (inputStream)
|
|
|
inputStream->stop();
|
|
|
|
|
|
- stats.reset(heapStatistics); // Heap stats are always gathered from scratch each time
|
|
|
- if (rowAllocator)
|
|
|
- rowAllocator->gatherStats(stats);
|
|
|
parent.stop();
|
|
|
- parent.mergeStrandStats(processed, totalCycles, stats);
|
|
|
+ //MORE: Move totalCycles (+processed?) to gatherStats()
|
|
|
+ parent.mergeStrandStats(processed, totalCycles);
|
|
|
}
|
|
|
stopped = true;
|
|
|
}
|
|
@@ -1709,6 +1696,14 @@ public:
|
|
|
}
|
|
|
inline void requestAbort() { abortRequested.store(true, std::memory_order_relaxed); }
|
|
|
inline bool isAborting() { return abortRequested.load(std::memory_order_relaxed); }
|
|
|
+
|
|
|
+ //MORE: processed and totalCycles should be included here, and not merged in stop()
|
|
|
+ void gatherStats(CRuntimeStatisticCollection & mergedStats) const
|
|
|
+ {
|
|
|
+ mergedStats.merge(stats);
|
|
|
+ if (rowAllocator)
|
|
|
+ rowAllocator->gatherStats(mergedStats);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CRoxieServerStrandedActivity : public CRoxieServerActivity
|
|
@@ -1740,6 +1735,13 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
|
|
|
+ {
|
|
|
+ CRoxieServerActivity::gatherStats(merged);
|
|
|
+ ForEachItemIn(i, strands)
|
|
|
+ strands.item(i).gatherStats(merged);
|
|
|
+ }
|
|
|
+
|
|
|
virtual void onCreate(IHThorArg *_colocalArg)
|
|
|
{
|
|
|
CRoxieServerActivity::_onCreate(_colocalArg, strands.ordinality());
|
|
@@ -4758,7 +4760,6 @@ public:
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- ActivityTimeAccumulator dummy; // We could serialize from slave? Would get confusing though
|
|
|
CRuntimeStatisticCollection childStats(allStatistics);
|
|
|
childStats.deserialize(buf);
|
|
|
if (traceLevel > 5)
|
|
@@ -4766,7 +4767,7 @@ public:
|
|
|
StringBuffer s;
|
|
|
activity.queryLogCtx().CTXLOG("Processing ChildStats for child %d subgraph %d: %s", childId, graphId, childStats.toStr(s).str());
|
|
|
}
|
|
|
- activity.queryContext()->mergeActivityStats(childStats, graphId, childId, dummy, 0);
|
|
|
+ activity.queryContext()->mergeActivityStats(childStats, graphId, childId);
|
|
|
}
|
|
|
}
|
|
|
ReleaseRoxieRow(rowlen);
|
|
@@ -26930,6 +26931,8 @@ public:
|
|
|
|
|
|
~CActivityGraph()
|
|
|
{
|
|
|
+ ForEachItemIn(i, activities)
|
|
|
+ activities.item(i).updateFactoryStatistics();
|
|
|
if (probeManager)
|
|
|
probeManager->deleteGraph((IArrayOf<IActivityBase>*)&activities, (IArrayOf<IInputBase>*)&probes);
|
|
|
}
|
|
@@ -28063,7 +28066,7 @@ protected:
|
|
|
qsort(output, 2000, sizeof(output[0]), compareFunc);
|
|
|
testActivity(activity, input, output);
|
|
|
|
|
|
- unsigned __int64 us = cycle_to_nanosec(factory->queryLocalCycles()/1000);
|
|
|
+ unsigned __int64 us = factory->queryLocalTimeNs();
|
|
|
DBGLOG("Simple %s sorts: activity time %u.%u ms", type==2?"Heap" : (type==1 ? "Insertion" : "Quick"), (int)(us/1000), (int)(us%1000));
|
|
|
factory->resetNodeProgressInfo();
|
|
|
if (type)
|