Ver código fonte

HPCC-23715 Guillotine thor job exceeding cost limit

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Shamser Ahmed 5 anos atrás
pai
commit
9dbbf9dbd4

+ 85 - 6
common/workunit/workunit.cpp

@@ -2487,8 +2487,75 @@ protected:
     IArrayOf<StatisticAggregator> aggregators;
 };
 
+// Aggregate costs excluding all hThor costs
+// Note: the only reason that this class is required is that it is not possible to determine the creator
+//       of a scope when iterating with IConstWUScopeIterator.  (When this functionlity becomes available, 
+//       consider filtering within aggregateCost and eliminating this class.)
+class CostAggregatorExcludeHThor : public CInterfaceOf<IWuScopeVisitor>
+{
+public:
+    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override
+    {
+        if (extra.getCreatorType() != SCThthor)
+            totalCost += value;
+    }
+    virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); }
+    virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); }
+    virtual void noteException(IConstWUException & exception) { throwUnexpected(); }
+    virtual cost_type getTotalCost() const { return totalCost; }
+protected:
+    cost_type totalCost = 0;
+};
 
-//To calculate aggregates, create a scope iterator, and an instance of a StatisticAggregator, and play the attributes through the interface
+// Aggregrate Thor or hThor costs
+cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool excludeHThor)
+{
+    // depth 1: workflow, depth 2: graph, depth 3: subgraph
+    WuScopeFilter filter;
+    if (scope && *scope) // All costs under specified scope (used to calculate all costs for a workflow)
+    {
+        filter.addScope(scope);
+        filter.setIncludeNesting(3);
+        filter.addOutputStatistic(StCostExecute);
+        filter.addRequiredStat(StCostExecute);
+    }
+    else
+        filter.addFilter("stat[CostExecute],depth[1..3],nested[0],where[CostExecute]");
+    filter.addSource("global");
+    filter.finishedFilter();
+    Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
+    // Note: use of CostAggregatorExcludeHThor will be a slower, so use only when necessary
+    if (excludeHThor)
+    {
+        CostAggregatorExcludeHThor aggregator;
+        for (it->first(); it->isValid(); )
+        {
+            it->playProperties(aggregator);
+            stat_type value;
+            if (it->getStat(StCostExecute, value))
+                it->nextSibling();
+            else
+                it->next();
+        }
+        return aggregator.getTotalCost();
+    }
+    else
+    {
+        cost_type totalCost = 0;
+        for (it->first(); it->isValid(); )
+        {
+            stat_type value = 0.0;
+            if (it->getStat(StCostExecute, value))
+            {
+                totalCost += value;
+                it->nextSibling();
+            }
+            else
+                it->next();
+        }
+        return totalCost;
+    }
+}
 
 //---------------------------------------------------------------------------------------------------------------------
 
@@ -3883,6 +3950,8 @@ public:
             { return c->getDebugValueInt(propname, defVal); }
     virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const
             { return c->getDebugValueInt64(propname, defVal); }
+    virtual double getDebugValueReal(const char * propname, double defVal) const
+            { return c->getDebugValueReal(propname, defVal); }
     virtual bool getDebugValueBool(const char * propname, bool defVal) const
             { return c->getDebugValueBool(propname, defVal); }
     virtual IStringIterator & getDebugValues() const 
@@ -7742,7 +7811,17 @@ __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal)
     CriticalBlock block(crit);
     StringBuffer prop("Debug/");
     prop.append(lower);
-    return p->getPropInt64(prop.str(), defVal); 
+    return p->getPropInt64(prop.str(), defVal);
+}
+
+double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) const
+{
+    StringBuffer lower;
+    lower.append(propname).toLowerCase();
+    CriticalBlock block(crit);
+    StringBuffer prop("Debug/");
+    prop.append(lower);
+    return p->getPropReal(prop.str(), defVal);
 }
 
 bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const
@@ -12987,15 +13066,15 @@ extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, StatisticScopeType scopeTy
     wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scopestr, kind, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend);
 }
 
-extern WORKUNIT_API double calculateThorCost(__int64 timeNs, unsigned clusterWidth)
+extern WORKUNIT_API cost_type calculateThorCost(unsigned __int64 ms, unsigned clusterWidth)
 {
     IPropertyTree *costs = queryCostsConfiguration();
     if (costs)
     {
-        double thor_master_rate = costs->getPropReal("thor/@master", 0.0);
-        double thor_slave_rate = costs->getPropReal("thor/@slave", 0.0);
+        cost_type thor_master_rate = money2cost_type(costs->getPropReal("thor/@master"));
+        cost_type thor_slave_rate = money2cost_type(costs->getPropReal("thor/@slave"));
 
-        return calcCost(thor_master_rate, timeNs) + calcCost(thor_slave_rate, timeNs) * clusterWidth;
+        return calcCost(thor_master_rate, ms) + calcCost(thor_slave_rate, ms) * clusterWidth;
     }
     return 0;
 }

+ 4 - 4
common/workunit/workunit.hpp

@@ -1174,6 +1174,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual IStringVal & getDebugValue(const char * propname, IStringVal & str) const = 0;
     virtual int getDebugValueInt(const char * propname, int defVal) const = 0;
     virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const = 0;
+    virtual double getDebugValueReal(const char * propname,  double defVal) const = 0;
     virtual bool getDebugValueBool(const char * propname, bool defVal) const = 0;
     virtual IStringIterator & getDebugValues() const = 0;
     virtual IStringIterator & getDebugValues(const char * prop) const = 0;
@@ -1646,6 +1647,7 @@ extern WORKUNIT_API void updateWorkunitStat(IWorkUnit * wu, StatisticScopeType s
 extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *timer);
 extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
 extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
+extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
 
 extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
 extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
@@ -1658,7 +1660,7 @@ extern WORKUNIT_API const char * getWorkunitActionStr(WUAction action);
 extern WORKUNIT_API WUAction getWorkunitAction(const char * actionStr);
 
 extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, unsigned wfid=0);
-extern WORKUNIT_API double calculateThorCost(__int64 timeNs, unsigned clusterWidth);
+extern WORKUNIT_API cost_type calculateThorCost(unsigned __int64 ms, unsigned clusterWidth);
 
 extern WORKUNIT_API IPropertyTree * getWUGraphProgress(const char * wuid, bool readonly);
 
@@ -1687,9 +1689,7 @@ inline bool isGlobalScope(const char * scope) { return scope && (streq(scope, GL
 extern WORKUNIT_API bool isValidPriorityValue(const char * priority);
 extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 
-#define HourToSeconds(n) ((n)/3600)
-#define NanoSecondsToSeconds(n) ((double)(n)/1000000000)
-inline __int64 calcCost(double ratePerHour, __int64 timeNS) { return HourToSeconds(ratePerHour) * NanoSecondsToSeconds(timeNS) * 1e6; }
+inline cost_type calcCost(cost_type ratePerHour, unsigned __int64 ms) { return ratePerHour * ms / 1000 / 3600; }
 
 #ifdef _CONTAINERIZED
 extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName);

+ 1 - 0
common/workunit/workunit.ipp

@@ -258,6 +258,7 @@ public:
     virtual IStringIterator & getDebugValues(const char *prop) const;
     virtual int getDebugValueInt(const char * propname, int defVal) const;
     virtual __int64 getDebugValueInt64(const char * propname, __int64 defVal) const;
+    double getDebugValueReal(const char *propname, double defVal) const;
     virtual bool getDebugValueBool(const char * propname, bool defVal) const;
     virtual unsigned getExceptionCount() const;
     virtual IConstWUExceptionIterator & getExceptions() const;

+ 1 - 0
ecl/eclagent/agentctx.hpp

@@ -122,6 +122,7 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual void updateWULogfile() = 0;
     virtual bool forceNewDiskReadActivity() const = 0;
     virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
+    virtual cost_type queryAgentMachineCost() const = 0;
 };
 
 #endif // AGENTCTX_HPP_INCL

+ 52 - 11
ecl/eclagent/eclagent.cpp

@@ -79,6 +79,7 @@ using roxiemem::OwnedRoxieString;
 #define ABORT_CHECK_INTERVAL 30     // seconds
 #define ABORT_DEADMAN_INTERVAL (60*5)  // seconds
 #define MAX_FILE_READ_FAIL_COUNT 3
+#define CHECK_COST_INTERVAL 120 // How frequently workunit cost is checked against cost limit (seconds)
 
 typedef IEclProcess* (* EclProcessFactory)();
 
@@ -565,7 +566,20 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
         updateSuppliedXmlParams(w);
     }
     IPropertyTree *costs = queryCostsConfiguration();
-    agentMachineCost = costs ? costs->getPropReal("@agent", 0.0): 0.0;
+    if (costs)
+    {
+        agentMachineCost = money2cost_type(costs->getPropReal("@agent"));
+        if (agentMachineCost)
+        {
+            double softCostLimit = costs->getPropReal("@limit");
+            double guillotineCost = wu->getDebugValueReal("maxCost", softCostLimit);
+            double hardCostLimit = costs->getPropReal("@hardlimit");
+            if (hardCostLimit && ((guillotineCost == 0) || (guillotineCost > hardCostLimit)))
+                guillotineCost = hardCostLimit;
+            abortmonitor->setGuillotineCost(money2cost_type(guillotineCost));
+        }
+    }
+
 }
 
 EclAgent::~EclAgent()
@@ -1920,7 +1934,7 @@ void EclAgent::doProcess()
         const __int64 elapsedNs = elapsedTimer.elapsedNs();
         updateWorkunitStat(w, SSTglobal, NULL, StTimeElapsed, nullptr, elapsedNs);
 
-        const __int64 cost = calcCost(agentMachineCost, elapsedNs);
+        const cost_type cost = aggregateCost(w, nullptr, false);
         if (cost)
             w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
 
@@ -2324,7 +2338,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
     updateWorkunitStat(wu, SSTworkflow, scope, StWhenStarted, nullptr, startTime, 0);
     updateWorkunitStat(wu, SSTworkflow, scope, StTimeElapsed, nullptr, elapsedNs, 0);
 
-    const __int64 cost = calcCost(agent.queryAgentMachineCost(), elapsedNs);
+    const cost_type cost = calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs)) + aggregateCost(wu, scope, true);
     if (cost)
         wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
 }
@@ -3159,21 +3173,30 @@ void EclAgent::abortMonitor()
 {
     StringBuffer errorText;
     unsigned guillotineleft = 0;
-    for (;;) {
-        unsigned waittime = ABORT_CHECK_INTERVAL;   
-        if (abortmonitor->guillotinetimeout) {
-            if (guillotineleft==0) {
+    unsigned checkCostInterval = CHECK_COST_INTERVAL;
+    for (;;)
+    {
+        unsigned waittime = ABORT_CHECK_INTERVAL;
+        if (abortmonitor->guillotinetimeout)
+        {
+            if (guillotineleft==0)
+            {
                 guillotineleft = abortmonitor->guillotinetimeout;
                 DBGLOG("Guillotine set to %ds",guillotineleft);
             }
             if (guillotineleft<waittime)
                 waittime = guillotineleft;
         }
-        if (abortmonitor->sem.wait(waittime*1000) && abortmonitor->stopping) {
+        if ((abortmonitor->guillotineCost > 0) && (checkCostInterval < waittime))
+            waittime = checkCostInterval;
+        if (abortmonitor->sem.wait(waittime*1000) && abortmonitor->stopping)
+        {
             return;
         }
-        if (guillotineleft) {
-            if (abortmonitor->guillotinetimeout) {
+        if (guillotineleft)
+        {
+            if (abortmonitor->guillotinetimeout)
+            {
                 guillotineleft-=waittime;
                 if (guillotineleft==0) {
                     DBGLOG("Guillotine triggered");
@@ -3189,7 +3212,8 @@ void EclAgent::abortMonitor()
             if (queryWorkUnit()) // could be exiting
                 isAborting = queryWorkUnit()->aborting();
         }
-        if (isAborting) {
+        if (isAborting)
+        {
             DBGLOG("Abort detected");
             while (abortmonitor->sem.wait(ABORT_DEADMAN_INTERVAL*1000))
                 if (abortmonitor->stopping)
@@ -3197,6 +3221,23 @@ void EclAgent::abortMonitor()
             IERRLOG("EclAgent failed to abort within %ds - killing process",ABORT_DEADMAN_INTERVAL);
             break;
         }
+        if (abortmonitor->guillotineCost)
+        {
+            if (checkCostInterval<=waittime)
+            {
+                cost_type totalCost = aggregateCost(queryWorkUnit());
+                if (totalCost > abortmonitor->guillotineCost)
+                {
+                    errorText.appendf("Workunit cost limit exceeded");
+                    break;
+                }
+                checkCostInterval=CHECK_COST_INTERVAL;
+            }
+            else
+            {
+                checkCostInterval-=waittime;
+            }
+        }
     }
     fatalAbort(isAborting,errorText.str());
 }

+ 10 - 4
ecl/eclagent/eclagent.ipp

@@ -248,6 +248,10 @@ public:
     {
         ctx->addWuExceptionEx(text, code, severity, audience, source);
     }
+    virtual cost_type queryAgentMachineCost() const override
+    {
+        return ctx->queryAgentMachineCost();
+    };
 
 protected:
     IAgentContext * ctx;
@@ -390,7 +394,7 @@ private:
     StringAttr agentTempDir;
     Owned<IOrderedOutputSerializer> outputSerializer;
     int retcode;
-    double agentMachineCost = 0.0;
+    cost_type agentMachineCost = 0;
 
 private:
     void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
@@ -430,11 +434,13 @@ private:
     public:
         Semaphore sem;
         bool stopping;
-        unsigned guillotinetimeout;
-        cAbortMonitor(EclAgent &_parent) : Thread("EclAgent Abort Monitor"), parent(_parent) { guillotinetimeout=0; stopping=false; }
+        unsigned guillotinetimeout = 0;
+        cost_type guillotineCost = 0;
+        cAbortMonitor(EclAgent &_parent) : Thread("EclAgent Abort Monitor"), parent(_parent) { guillotinetimeout=0; guillotineCost=0; stopping=false; }
         int  run()  { parent.abortMonitor(); return 0; }
         void stop() { stopping = true; sem.signal(); join(1000*10); }
         void setGuillotineTimeout(unsigned secs) { guillotinetimeout = secs; sem.signal(); }
+        void setGuillotineCost(cost_type cost) { guillotineCost = cost; }
         bool fireException(IException *e)
         {
             StringBuffer text;
@@ -693,7 +699,7 @@ public:
     {
         return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
     }
-    virtual double queryAgentMachineCost() const
+    virtual cost_type queryAgentMachineCost() const
     {
         return agentMachineCost;
     }

+ 25 - 2
ecl/eclagent/eclgraph.cpp

@@ -886,7 +886,17 @@ void EclSubGraph::updateProgress()
             if (startGraphTime)
                 parent.updateWUStatistic(lockedwu, SSTsubgraph, subgraphid, StWhenStarted, nullptr, startGraphTime);
             if (elapsedGraphCycles)
-                parent.updateWUStatistic(lockedwu, SSTsubgraph, subgraphid, StTimeElapsed, nullptr, cycle_to_nanosec(elapsedGraphCycles));
+            {
+                unsigned __int64 elapsedTime = cycle_to_nanosec(elapsedGraphCycles);
+                parent.updateWUStatistic(lockedwu, SSTsubgraph, subgraphid, StTimeElapsed, nullptr, elapsedTime);
+                const cost_type cost = calcCost(agent->queryAgentMachineCost(), nanoToMilli(elapsedTime));
+                if (cost)
+                {
+                    StringBuffer scope;
+                    scope.append(WorkflowScopePrefix).append(agent->getWorkflowId()).append(":").append(subgraphid);
+                    lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
+                }
+            }
         }
     }
 }
@@ -1236,6 +1246,14 @@ void EclGraph::execute(const byte * parentExtract)
 
             unsigned __int64 elapsedNs = milliToNano(elapsed);
             updateWorkunitStat(wu, SSTgraph, queryGraphName(), StTimeElapsed, description.str(), elapsedNs, agent->getWorkflowId());
+
+            const cost_type cost = calcCost(agent->queryAgentMachineCost(), nanoToMilli(elapsedNs));
+            if (cost)
+            {
+                StringBuffer scope;
+                scope.append(WorkflowScopePrefix).append(agent->getWorkflowId()).append(":").append(queryGraphName());
+                wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
+            }
         }
 
         if (agent->queryRemoteWorkunit())
@@ -1756,7 +1774,12 @@ void EclAgent::executeThorGraph(const char * graphName)
                 throw deserializeException(reply);
             }
             case DAMP_THOR_REPLY_ABORT:
-                throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
+            {
+                Owned<IException> e = deserializeException(reply);
+                StringBuffer msg;
+                e->errorMessage(msg);
+                throw new WorkflowException(e->errorCode(), msg.str(), 0, WorkflowException::ABORT, MSGAUD_user);
+            }
             default:
                 assertex(false);
         }

+ 2 - 2
ecl/eclcc/eclcc.cpp

@@ -1478,8 +1478,8 @@ void EclCC::processSingleQuery(EclCompileInstance & instance,
     updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StTimeElapsed, NULL, totalTimeNs);
 
     IPropertyTree *costs = queryCostsConfiguration();
-    const double machineCost = costs ? costs->getPropReal("@eclcc", 0.0): 0.0;
-    const __int64 cost = calcCost(machineCost, totalTimeNs);
+    const cost_type machineCost = costs ? money2cost_type(costs->getPropReal("@eclcc")) : 0;
+    const cost_type cost = calcCost(machineCost, nanoToMilli(totalTimeNs));
     if (cost)
         instance.wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTcompilestage, "compile", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
 

+ 6 - 1
roxie/ccd/ccdcontext.cpp

@@ -2518,7 +2518,12 @@ protected:
                     throw deserializeException(reply);
                 }
                 case DAMP_THOR_REPLY_ABORT:
-                    throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
+                {
+                    Owned<IException> e = deserializeException(reply);
+                    StringBuffer msg;
+                    e->errorMessage(msg);
+                    throw new WorkflowException(e->errorCode(), msg.str(), 0, WorkflowException::ABORT, MSGAUD_user);
+                }
                 default:
                     throwUnexpected();
             }

+ 1 - 1
system/jlib/jptree.hpp

@@ -94,7 +94,7 @@ interface jlib_decl IPropertyTree : extends serializable
     virtual void setPropInt64(const char *xpath, __int64 val) = 0;
     virtual void addPropInt64(const char *xpath, __int64 val) = 0;
 
-    virtual double getPropReal(const char *xpath, double dft) const = 0;
+    virtual double getPropReal(const char *xpath, double dft=0.0) const = 0;
 
     virtual bool getPropBin(const char *xpath, MemoryBuffer &ret) const = 0;
     virtual void setPropBin(const char *xpath, size32_t size, const void *data) = 0;

+ 1 - 1
system/jlib/jptree.ipp

@@ -626,7 +626,7 @@ public:
     virtual int getPropInt(const char *xpath, int dft=0) const override;
     virtual void setPropInt(const char *xpath, int val) override;
     virtual void addPropInt(const char *xpath, int val) override;
-    virtual double getPropReal(const char *xpath, double dft) const override;
+    virtual double getPropReal(const char *xpath, double dft=0.0) const override;
     virtual bool getPropBin(const char * xpath, MemoryBuffer &ret) const override;
     virtual void setPropBin(const char * xpath, size32_t size, const void *data) override;
     virtual void appendPropBin(const char *xpath, size32_t size, const void *data) override;

+ 15 - 1
system/jlib/jstats.cpp

@@ -433,7 +433,7 @@ StringBuffer & formatStatistic(StringBuffer & out, unsigned __int64 value, Stati
     case SMeasureEnum:
         return out.append("Enum{").append(value).append("}"); // JCS->GH for now, should map to known enum text somehow
     case SMeasureCost:
-        return out.appendf("$ %.06f", (double) value / 1E6);
+        return out.appendf("$%.06f", cost_type2money(value) );
     default:
         return out.append(value).append('?');
     }
@@ -595,6 +595,20 @@ void describeScope(StringBuffer & description, const char * scope)
     }
 }
 
+bool isParentScope(const char *parent, const char *scope)
+{
+    const char *p = parent;
+    const char *q = scope;
+    while(*p && (*p==*q))
+    {
+        ++p;
+        ++q;
+    }
+    if ((*p==0) && (*q==':' || *q==0))
+        return true;
+    return false;
+}
+
 const char * queryMeasurePrefix(StatisticMeasure measure)
 {
     switch (measure)

+ 4 - 1
system/jlib/jstats.h

@@ -27,6 +27,7 @@
 #include "jstatcodes.h"
 
 typedef unsigned __int64 stat_type;
+typedef unsigned __int64 cost_type; // Decimal currency amount multiplied by 10^6
 const unsigned __int64 MaxStatisticValue = (unsigned __int64)0-1U;
 const unsigned __int64 AnyStatisticValue = MaxStatisticValue; // Use the maximum value to also represent unknown, since it is unlikely to ever occur.
 
@@ -40,7 +41,8 @@ inline constexpr stat_type statSkewPercent(long double value) { return (stat_typ
 inline constexpr stat_type statSkewPercent(stat_type  value) { return (stat_type)(value * 100); }
 
 inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); }
-
+inline cost_type money2cost_type(double money) { return money * 1E6; }
+inline double cost_type2money(cost_type cost) { return ((double) cost) / 1E6; }
 //---------------------------------------------------------------------------------------------------------------------
 
 //Represents a single level of a scope
@@ -758,6 +760,7 @@ extern jlib_decl int compareScopeName(const char * left, const char * right);
 extern jlib_decl unsigned queryScopeDepth(const char * text);
 extern jlib_decl const char * queryScopeTail(const char * scope);
 extern jlib_decl bool getParentScope(StringBuffer & parent, const char * scope);
+extern jlib_decl bool isParentScope(const char *parent, const char *scope);
 extern jlib_decl void describeScope(StringBuffer & description, const char * scope);
 
 //This interface is primarily here to reduce the dependency between the different components.

+ 40 - 0
thorlcr/master/thdemonserver.cpp

@@ -38,6 +38,11 @@ private:
     unsigned reportRate;
     CIArrayOf<CGraphBase> activeGraphs;
     UnsignedArray graphStarts;
+    cost_type thorMasterCostRate = 0;
+    cost_type thorSlaveCostRate = 0;
+    cost_type costLimit = 0;
+    cost_type workunitCost = 0;
+
     
     void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph)
     {
@@ -83,7 +88,20 @@ private:
         formatGraphTimerScope(graphScope, wfid, graphname, 0, graph.queryGraphId());
         unsigned duration = msTick()-startTime;
         updateWorkunitStat(wu, SSTsubgraph, graphScope, StTimeElapsed, timer, milliToNano(duration));
+        if (costLimit || finished)
+        {
+            const unsigned clusterWidth = queryNodeClusterWidth();
+            const cost_type sgCost = calcCost(thorMasterCostRate, duration) + calcCost(thorSlaveCostRate, duration) * clusterWidth;
+            if (finished)
+                wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace);
 
+            const cost_type totalCost = workunitCost + sgCost;
+            if (totalCost > costLimit)
+            {
+                LOG(MCwarning, thorJob, "ABORT job cost exceeds limit");
+                graph.fireException(MakeThorException(TE_CostExceeded, "Job cost exceeds limit"));
+            }
+        }
         if (finished)
         {
             if (memcmp(graphname,"graph",5)==0)
@@ -180,6 +198,12 @@ public:
     {
         lastReport = msTick();
         reportRate = globals->getPropInt("@watchdogProgressInterval", 30);
+        IPropertyTree *costs = queryCostsConfiguration();
+        if (costs)
+        {
+            thorMasterCostRate = money2cost_type(costs->getPropReal("thor/@master"));
+            thorSlaveCostRate = money2cost_type(costs->getPropReal("thor/@slave"));
+        }
     }
 
     virtual void takeHeartBeat(MemoryBuffer &progressMb)
@@ -227,6 +251,22 @@ public:
     void startGraph(CGraphBase *graph)
     {
         synchronized block(mutex);
+
+        IConstWorkUnit & wu =  graph->queryJob().queryWorkUnit();
+        workunitCost = aggregateCost(&wu);
+
+        const IPropertyTree *costs = queryCostsConfiguration();
+        double softLimit = 0.0, hardLimit = 0.0;
+        if (costs)
+        {
+            softLimit = costs->getPropReal("@limit");
+            hardLimit = costs->getPropReal("@hardlimit");
+        }
+        double tmpcostLimit = wu.getDebugValueReal("maxCost", softLimit);
+        if (hardLimit && ((tmpcostLimit == 0) || (tmpcostLimit > hardLimit)))
+            costLimit = money2cost_type(hardLimit);
+        else
+            costLimit = money2cost_type(tmpcostLimit);
         activeGraphs.append(*LINK(graph));
         unsigned startTime = msTick();
         graphStarts.append(startTime);

+ 14 - 3
thorlcr/master/thgraphmanager.cpp

@@ -838,8 +838,19 @@ void CJobManager::reply(IConstWorkUnit *workunit, const char *wuid, IException *
     else if (e)
     {
         IThorException *te = QUERYINTERFACE(e, IThorException);
-        if (te && TE_WorkUnitAborting == te->errorCode())
-            replyMb.append((unsigned)DAMP_THOR_REPLY_ABORT);
+        if (te)
+        {
+            switch (te->errorCode())
+            {
+            case TE_CostExceeded:
+            case TE_WorkUnitAborting:
+                replyMb.append((unsigned)DAMP_THOR_REPLY_ABORT);
+                break;
+            default:
+                replyMb.append((unsigned)DAMP_THOR_REPLY_ERROR);
+                break;
+            }
+        }
         else
             replyMb.append((unsigned)DAMP_THOR_REPLY_ERROR);
         serializeException(e, replyMb);
@@ -956,7 +967,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         updateWorkunitStat(wu, SSTgraph, graphName, StTimeElapsed, graphTimeStr, graphTimeNs, wfid);
 
         addTimeStamp(wu, SSTgraph, graphName, StWhenFinished, wfid);
-        double cost = calculateThorCost(graphTimeNs, queryNodeClusterWidth());
+        double cost = calculateThorCost(nanoToMilli(graphTimeNs), queryNodeClusterWidth());
         if (cost)
             wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
 

+ 2 - 1
thorlcr/shared/thexception.hpp

@@ -162,7 +162,8 @@
 #define TE_RemoteReadFailure                    TE_Base + 139
 #define TE_MissingOptionalFile                  TE_Base + 140
 #define TE_UnsupportedSortOrder                 TE_Base + 141
-#define TE_Final                                TE_Base + 142       // keep this last
+#define TE_CostExceeded                         TE_Base + 142
+#define TE_Final                                TE_Base + 143       // keep this last
 #define ISTHOREXCEPTION(n) (n > TE_Base && n < TE_Final)
 
 #endif