Просмотр исходного кода

Merge pull request #13462 from shamser/issue23665

HPCC-23665 Calculate and store workflow cost

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 лет назад
Родитель
Сommit
ad27ef747e

+ 1 - 2
common/workunit/workunit.cpp

@@ -12990,8 +12990,7 @@ extern WORKUNIT_API double calculateThorCost(__int64 timeNs, unsigned clusterWid
         double thor_master_rate = costs->getPropReal("thor/@master", 0.0);
         double thor_slave_rate = costs->getPropReal("thor/@slave", 0.0);
 
-        double time_seconds = ((double)timeNs/1000000000);
-        return (time_seconds * thor_master_rate + time_seconds * thor_slave_rate * clusterWidth)*1E6;
+        return calcCost(thor_master_rate, timeNs) + calcCost(thor_slave_rate, timeNs) * clusterWidth;
     }
     return 0;
 }

+ 4 - 0
common/workunit/workunit.hpp

@@ -1687,6 +1687,10 @@ inline bool isGlobalScope(const char * scope) { return scope && (streq(scope, GL
 extern WORKUNIT_API bool isValidPriorityValue(const char * priority);
 extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 
+#define HourToSeconds(n) ((n)/3600)
+#define NanoSecondsToSeconds(n) ((double)(n)/1000000000)
+inline __int64 calcCost(double ratePerHour, __int64 timeNS) { return HourToSeconds(ratePerHour) * NanoSecondsToSeconds(timeNS) * 1e6; }
+
 #ifdef _CONTAINERIZED
 extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);
 extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, const char *condition=nullptr);

+ 6 - 0
ecl/eclagent/eclagent.cpp

@@ -563,6 +563,8 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
             w->setXmlParams(_queryXML);
         updateSuppliedXmlParams(w);
     }
+    IPropertyTree *costs = queryCostsConfiguration();
+    agentMachineCost = costs ? costs->getPropReal("@agent", 0.0): 0.0;
 }
 
 EclAgent::~EclAgent()
@@ -2309,6 +2311,10 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
     scope.append(WorkflowScopePrefix).append(wfid);
     updateWorkunitStat(wu, SSTworkflow, scope, StWhenStarted, nullptr, startTime, 0);
     updateWorkunitStat(wu, SSTworkflow, scope, StTimeElapsed, nullptr, elapsedNs, 0);
+
+    const __int64 cost = calcCost(agent.queryAgentMachineCost(), elapsedNs);
+    if (cost)
+        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
 }
 
 void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)

+ 6 - 0
ecl/eclagent/eclagent.ipp

@@ -390,6 +390,7 @@ private:
     StringAttr agentTempDir;
     Owned<IOrderedOutputSerializer> outputSerializer;
     int retcode;
+    double agentMachineCost = 0.0;
 
 private:
     void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
@@ -692,6 +693,11 @@ public:
     {
         return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
     }
+    virtual double queryAgentMachineCost() const
+    {
+        return agentMachineCost;
+    }
+
 };
 
 //---------------------------------------------------------------------------