|
@@ -16,6 +16,7 @@
|
|
|
############################################################################## */
|
|
|
|
|
|
#include "platform.h"
|
|
|
+#include <math.h>
|
|
|
#include "jarray.hpp"
|
|
|
#include "jfile.hpp"
|
|
|
#include "jmutex.hpp"
|
|
@@ -1001,12 +1002,65 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
|
|
|
} cBlock(activeTasks);
|
|
|
|
|
|
{
|
|
|
+#ifdef _CONTAINERIZED
|
|
|
+ double stdDev = 0.0;
|
|
|
+ unsigned __int64 min, max;
|
|
|
+ unsigned minNode, maxNode;
|
|
|
+ const StatisticsMapping podStatistics({StNumPods});
|
|
|
+ CRuntimeSummaryStatisticCollection podStats(podStatistics);
|
|
|
+ std::vector<std::string> nodeNames; // ordered list of the unique node names
|
|
|
+ try
|
|
|
+ {
|
|
|
+ // collate pod distribution
|
|
|
+ VStringBuffer selector("thorworker-%s-%s", wuid.get(), graphName);
|
|
|
+ std::vector<std::vector<std::string>> pods = getPodNodes(selector.toLowerCase());
|
|
|
+ std::unordered_map<std::string, unsigned> podPerNodeCounts;
|
|
|
+ for (const auto &podNode: pods)
|
|
|
+ {
|
|
|
+ const std::string &node = podNode[1]; // pod is 1st item, node is 2nd
|
|
|
+ podPerNodeCounts[node]++; // NB: if doesn't exist is created with default value of 0 1st
|
|
|
+ }
|
|
|
+ for (const auto &node: podPerNodeCounts)
|
|
|
+ {
|
|
|
+ podStats.mergeStatistic(StNumPods, node.second, nodeNames.size());
|
|
|
+ nodeNames.push_back(node.first);
|
|
|
+ }
|
|
|
+ stdDev = podStats.queryStdDevInfo(StNumPods, min, max, minNode, maxNode);
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ EXCLOG(e);
|
|
|
+ e->Release();
|
|
|
+ }
|
|
|
+
|
|
|
+ // calculate the above, before locking the workunit below to avoid holding lock whilst issuing getPodNodes call
|
|
|
+#endif
|
|
|
+
|
|
|
Owned<IWorkUnit> wu = &workunit.lock();
|
|
|
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StWhenStarted, NULL, startTs, 1, 0, StatsMergeAppend);
|
|
|
//Could use addTimeStamp(wu, SSTgraph, graphName, StWhenStarted, wfid) if start time could be this point
|
|
|
wu->setState(WUStateRunning);
|
|
|
VStringBuffer version("%d.%d", THOR_VERSION_MAJOR, THOR_VERSION_MINOR);
|
|
|
wu->setDebugValue("ThorVersion", version.str(), true);
|
|
|
+
|
|
|
+#ifdef _CONTAINERIZED
|
|
|
+ // issue warning and publish pod distribution stats, if any stddev
|
|
|
+ if (stdDev)
|
|
|
+ {
|
|
|
+ Owned<IStatisticGatherer> collector = createGlobalStatisticGatherer(wu);
|
|
|
+ StatsScopeId wfidScopeId(SSTworkflow, wfid);
|
|
|
+ StatsScopeId graphScopeId(graphName);
|
|
|
+ collector->beginScope(wfidScopeId);
|
|
|
+ collector->beginScope(graphScopeId);
|
|
|
+ podStats.recordStatistics(*collector, false);
|
|
|
+
|
|
|
+ StringBuffer scopeStr;
|
|
|
+ wfidScopeId.getScopeText(scopeStr).append(':');
|
|
|
+ graphScopeId.getScopeText(scopeStr);
|
|
|
+ Owned<IException> e = makeStringExceptionV(-1, "%s: Degraded performance. Worker pods are unevenly distributed over nodes. StdDev=%.2f. min node(%s) has %" I64F "u pods, max node(%s) has %" I64F "u pods", scopeStr.str(), stdDev, nodeNames[minNode].c_str(), min, nodeNames[maxNode].c_str(), max);
|
|
|
+ reportExceptionToWorkunit(*wu, e);
|
|
|
+ }
|
|
|
+#endif
|
|
|
}
|
|
|
|
|
|
setWuid(workunit.queryWuid(), workunit.queryClusterName());
|