Browse Source

Merge pull request #15466 from shamser/issue26670

HPCC-26670 Summary CostFileAccess for Thor workunits

Reviewed-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 3 years ago
parent
commit
442e276ad2

+ 55 - 2
common/workunit/workunit.cpp

@@ -2607,7 +2607,7 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
         cost_type totalCost = 0;
         for (it->first(); it->isValid(); )
         {
-            stat_type value = 0.0;
+            stat_type value = 0;
             if (it->getStat(StCostExecute, value))
             {
                 totalCost += value;
@@ -2620,6 +2620,43 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
     }
 }
 
+//Aggregate StCostFileAccess for a workunit: sums the costs of the top-level subgraphs of 'scope' when a scope is given, or of the workflows when it is not
+cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
+{
+    WuScopeFilter filter;
+    if (!isEmptyString(scope))
+        filter.addScope(scope);
+    else
+        filter.addScope("");      // No scope given: an explicit empty scope is still needed for the filter to match
+    // When scope is a workflow, sum the graph costs (or the subgraph costs when there is no graph cost) to get the workflow cost.
+    // (Costs from child graphs and activities should already have been summed up to graph/subgraph level.)
+    // When isEmptyString(scope), sum the workflow costs (or the graph costs when there is no workflow cost) to get the global cost.
+    // (Costs from all levels below graph should already have been summed up to at least graph level.)
+    // i.e. 2 levels of nesting are needed in both cases
+    filter.setIncludeNesting(2);
+    // includeNesting(2) only needs the source "global". However, WuScopeFilter incorrectly infers the source as "global,stats",
+    // which pulls in too many of the stats and is inefficient, so the source is explicitly set to "global" here.
+    filter.addSource("global");
+    filter.addOutputStatistic(StCostFileAccess);
+    filter.addRequiredStat(StCostFileAccess);
+    filter.finishedFilter();
+    Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
+    cost_type totalCost = 0;
+    for (it->first(); it->isValid(); )  // no increment expression: the body chooses between nextSibling() and next()
+    {
+        cost_type value = 0;
+        if (it->getStat(StCostFileAccess, value))
+        {
+            totalCost += value;
+            it->nextSibling();  // cost recorded at this scope: move on to its sibling (nested costs are expected to be included already, see above)
+        }
+        else
+        {
+            it->next();  // no cost recorded here: carry on with next(), which presumably visits the nested scopes - confirm against IConstWUScopeIterator
+        }
+    }
+    return totalCost;
+}
 //---------------------------------------------------------------------------------------------------------------------
 
 
@@ -4307,6 +4344,8 @@ public:
             { return c->getAbortTimeStamp(); }
     virtual unsigned __int64 getExecuteCost() const
             { return c->getExecuteCost(); }
+    virtual unsigned __int64 getFileAccessCost() const  // forwards to c->getFileAccessCost(), like the neighbouring accessors
+            { return c->getFileAccessCost(); }
     virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
             { return c->import(wuTree, graphProgressTree); }
 
@@ -12262,11 +12301,25 @@ unsigned __int64 CLocalWorkUnit::getAbortTimeStamp() const
     CriticalBlock block(crit);
     return p->getPropInt64("Tracing/AbortTimeStamp", 0);
 }
+
 unsigned __int64 CLocalWorkUnit::getExecuteCost() const  // returns the sum of @value over the CostExecute statistics that have an empty @scope
 {
     CriticalBlock block(crit);
     Owned<IPropertyTreeIterator> iter = p->getElements("./Statistics/Statistic[@kind='CostExecute'][@scope='']");
-    unsigned __int64 totalCost = 0;
+    cost_type totalCost = 0;
+    ForEach(*iter)  // add up @value from every Statistic element matched by the query above
+    {
+        IPropertyTree &stat = iter->query();
+        totalCost += stat.getPropInt64("@value");
+    }
+    return totalCost;
+}
+
+unsigned __int64 CLocalWorkUnit::getFileAccessCost() const
+{
+    CriticalBlock block(crit);
+    Owned<IPropertyTreeIterator> iter = p->getElements("./Statistics/Statistic[@kind='CostFileAccess'][@scope='']");
+    cost_type totalCost = 0;
     ForEach(*iter)
     {
         IPropertyTree &stat = iter->query();

+ 2 - 1
common/workunit/workunit.hpp

@@ -1301,6 +1301,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual IStringVal & getAbortBy(IStringVal & str) const = 0;
     virtual unsigned __int64 getAbortTimeStamp() const = 0;
     virtual unsigned __int64 getExecuteCost() const = 0;
+    virtual unsigned __int64 getFileAccessCost() const = 0;
 };
 
 
@@ -1707,7 +1708,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
 extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
 extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
 extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
-
+extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
 extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
 extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
 #if 0

+ 1 - 1
common/workunit/workunit.ipp

@@ -307,7 +307,7 @@ public:
     virtual IStringVal & getAbortBy(IStringVal & str) const;
     virtual unsigned __int64 getAbortTimeStamp() const;
     virtual unsigned __int64 getExecuteCost() const;
-
+    virtual unsigned __int64 getFileAccessCost() const;
     void clearExceptions(const char *source=nullptr);
     void commit();
     IWUException *createException();

+ 7 - 1
ecl/eclagent/eclagent.cpp

@@ -1975,7 +1975,9 @@ void EclAgent::doProcess()
         const cost_type cost = aggregateCost(w, nullptr, false);
         if (cost)
             w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
-
+        const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
+        if (diskAccessCost)
+            w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
         addTimings(w);
 
         switch (w->getState())
@@ -2515,9 +2517,13 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
     updateWorkunitStat(wu, SSTworkflow, scope, StWhenStarted, nullptr, startTime, 0);
     updateWorkunitStat(wu, SSTworkflow, scope, StTimeElapsed, nullptr, elapsedNs, 0);
 
+    // Note: costs are aggregated when the timings are recorded
     const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
     if (cost)
         wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
+    const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
+    if (diskAccessCost)
+        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
 }
 
 void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)

+ 1 - 1
esp/scm/ws_workunits.ecm

@@ -25,7 +25,7 @@ EspInclude(ws_workunits_queryset_req_resp);
 
 ESPservice [
     auth_feature("DEFERRED"), //This declares that the method logic handles feature level authorization
-    version("1.84"), default_client_version("1.84"), cache_group("ESPWsWUs"),
+    version("1.85"), default_client_version("1.85"), cache_group("ESPWsWUs"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [cache_seconds(60), resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);

+ 1 - 0
esp/scm/ws_workunits_struct.ecm

@@ -440,6 +440,7 @@ ESPStruct [nil_remove] ECLWorkunit
     [min_ver("1.67")] int HelpersCount;
     [min_ver("1.78")] ESParray<string> ServiceNames;
     [min_ver("1.84")] double ExecuteCost;
+    [min_ver("1.85")] double FileAccessCost;
 };
 
 ESPStruct [nil_remove] WUECLAttribute

+ 2 - 0
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -1051,6 +1051,8 @@ void WsWuInfo::getInfo(IEspECLWorkunit &info, unsigned long flags)
     info.setHasDebugValue(cw->hasDebugValue("__calculated__complexity__"));
     if(version>=1.84)
         info.setExecuteCost(cost_type2money(cw->getExecuteCost()));
+    if(version>=1.85)
+        info.setFileAccessCost(cost_type2money(cw->getFileAccessCost()));
 
     getClusterInfo(info, flags);
     getExceptions(info, flags);

+ 1 - 0
thorlcr/graph/thgraph.hpp

@@ -781,6 +781,7 @@ public:
     void doExecuteChild(size32_t parentExtractSz, const byte *parentExtract);
     void executeChild(size32_t & retSize, void * & ret, size32_t parentExtractSz, const byte *parentExtract);
     void setResults(IThorGraphResults *results);
+    virtual cost_type getDiskAccessCost() = 0;
     virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *graphLoopResults);
     virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract);
     virtual bool serializeStats(MemoryBuffer &mb) override { return false; }

+ 25 - 0
thorlcr/graph/thgraphmaster.cpp

@@ -2861,6 +2861,31 @@ void CMasterGraph::getStats(IStatisticGatherer &stats)
     }
 }
 
+cost_type CMasterGraph::getDiskAccessCost()  // returns the child graphs' getDiskAccessCost() totals plus the getDiskAccessCost() of each created activity in this graph
+{
+    cost_type totalDiskAccessCost = 0;
+    Owned<IThorGraphIterator> iterChildGraph = getChildGraphIterator();
+    ForEach(*iterChildGraph)  // first: add the cost reported by each child graph
+    {
+        CGraphBase &graph = iterChildGraph->query();
+        totalDiskAccessCost += graph.getDiskAccessCost();
+    }
+
+    Owned<IThorActivityIterator> iter;
+    if (queryOwner() && !isGlobal())
+        iter.setown(getIterator()); // Local child graphs still send progress, but aren't connected in master
+    else
+        iter.setown(getConnectedIterator());  // otherwise iterate using the connected iterator
+    ForEach (*iter)  // second: add the cost held by each activity selected above
+    {
+        CMasterGraphElement &container = (CMasterGraphElement &)iter->query();
+        CMasterActivity *activity = (CMasterActivity *)container.queryActivity();
+        if (activity) // may not be created (if within child query)
+            totalDiskAccessCost += activity->getDiskAccessCost();
+    }
+    return totalDiskAccessCost;
+}
+
 IThorResult *CMasterGraph::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
 {
     Owned<CCollatedResult> result = new CCollatedResult(*this, activity, rowIf, id, results->queryOwnerId(), spillPriority);

+ 3 - 1
thorlcr/graph/thgraphmaster.ipp

@@ -115,6 +115,7 @@ public:
     void readActivityInitData(MemoryBuffer &mb, unsigned slave);
     bool deserializeStats(unsigned node, MemoryBuffer &mb);
     void getStats(IStatisticGatherer &stats);
+    virtual cost_type getDiskAccessCost() override;
     virtual void setComplete(bool tf=true);
     virtual bool prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async) override;
     virtual void execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async) override;
@@ -247,7 +248,7 @@ protected:
     std::vector<OwnedPtr<CThorEdgeCollection>> edgeStatsVector;
     CThorStatsCollection statsCollection;
     IBitSet *notedWarnings;
-    stat_type diskAccessCost = 0;
+    cost_type diskAccessCost = 0;
 
     void addReadFile(IDistributedFile *file, bool temp=false);
     IDistributedFile *queryReadFile(unsigned f);
@@ -278,6 +279,7 @@ public:
     virtual bool wait(unsigned timeout);
     virtual void done();
     virtual void kill();
+    virtual cost_type getDiskAccessCost() const { return diskAccessCost; }  // plain accessor: returns the current value of diskAccessCost
 
 // IExceptionHandler
     virtual bool fireException(IException *e);

+ 1 - 0
thorlcr/graph/thgraphslave.hpp

@@ -450,6 +450,7 @@ public:
     virtual void abort(IException *e) override;
     virtual void reset() override;
     virtual void done() override;
+    virtual cost_type getDiskAccessCost() override { UNIMPLEMENTED; }  // no implementation is provided in this class; CMasterGraph supplies one
     virtual IThorGraphResults *createThorGraphResults(unsigned num);
 
 // IExceptionHandler

+ 8 - 2
thorlcr/master/thdemonserver.cpp

@@ -92,8 +92,14 @@ private:
         {
             const unsigned clusterWidth = queryNodeClusterWidth();
             const cost_type sgCost = money2cost_type(calcCost(thorManagerRate, duration) + calcCost(thorWorkerRate, duration) * clusterWidth);
-            if (finished && sgCost)
-                wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace);
+            if (finished)
+            {
+                if (sgCost)
+                    wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace);
+                cost_type costDiskAccess = graph.getDiskAccessCost();
+                if (costDiskAccess)
+                    wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
+            }
 
             const cost_type totalCost = workunitCost + sgCost;
             if (costLimit>0.0 && totalCost > costLimit)