@@ -13667,7 +13667,9 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
else
{
VStringBuffer job("%s-%s", wuid.str(), graphName);
- runK8sJob("thormanager", wuid, job, queryComponentConfig().getPropBool("@deleteJobs", true), { { "graphName", graphName} });
+ bool deleteJobs = queryComponentConfig().getPropBool("@deleteJobs", true);
+ unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
+ runK8sJob("thormanager", wuid, job, deleteJobs, pendingTimeoutSecs, { { "graphName", graphName} });
}
 
/* In k8s, Thor feeds back the terminating exception via the workunit.
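
For clarity, the call site above reads as follows once the patch is applied; the comments are added here for explanation only and are not part of the change. defaultPendingTimeSecs is the fallback constant introduced elsewhere in this file (its value is not shown in this excerpt):

    else
    {
        VStringBuffer job("%s-%s", wuid.str(), graphName);
        // Both options come from the component configuration: @deleteJobs controls whether
        // the Kubernetes job is deleted once it completes, while @pendingTimeoutSecs bounds
        // how long the launched pod may remain unscheduled before the run is failed.
        bool deleteJobs = queryComponentConfig().getPropBool("@deleteJobs", true);
        unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
        runK8sJob("thormanager", wuid, job, deleteJobs, pendingTimeoutSecs, { { "graphName", graphName} });
    }
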
@@ -14075,7 +14077,7 @@ void deleteK8sResource(const char *componentName, const char *job, const char *r
throw makeStringException(0, "Failed to run kubectl delete");
}
 
-void waitK8sJob(const char *componentName, const char *job)
+void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs)
{
VStringBuffer jobname("%s-%s", componentName, job);
jobname.toLowerCase();
@@ -14085,7 +14087,6 @@ void waitK8sJob(const char *componentName, const char *job)
 
unsigned delay = 100;
unsigned start = msTick();
- unsigned pendingTimeout = 10000;
for (;;)
{
StringBuffer output, error;
@@ -14112,11 +14113,11 @@ void waitK8sJob(const char *componentName, const char *job)
// Check whether pod has been scheduled yet - if resources are not available pods may block indefinitely waiting to be scheduled, and
// we would prefer them to fail instead.
bool pending = streq(output, "False");
- if (pending && msTick()-start > pendingTimeout)
+ if (pendingTimeoutSecs && pending && msTick()-start > pendingTimeoutSecs*1000)
{
VStringBuffer getReason("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*].status.conditions[?(@.type=='PodScheduled')]}{.reason}{': '}{.message}{end}\"", jobname.str());
runExternalCommand(componentName, output.clear(), error.clear(), getReason.str(), nullptr);
- throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %d ms: %s ", jobname.str(), pendingTimeout, output.str());
+ throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobname.str(), pendingTimeoutSecs, output.str());
}
MilliSleep(delay);
if (delay < 10000)
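
Because the timeout check is now guarded by "pendingTimeoutSecs &&", a configured value of 0 disables the pending-pod timeout altogether, while any positive value replaces the previously hard-coded 10000 ms (10 second) limit. A minimal, illustrative sketch of the two calling patterns; the component name and the job variable stand in for whatever the real caller uses:

    // Fail the launch if the pod is still unschedulable after 10 seconds
    // (equivalent to the old fixed behaviour).
    waitK8sJob("thormanager", job, 10);
    // 0 means no pending timeout: wait indefinitely for the pod to be scheduled.
    waitK8sJob("thormanager", job, 0);
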
@@ -14184,11 +14185,11 @@ bool applyK8sYaml(const char *componentName, const char *wuid, const char *job,
return true;
}
 
-void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del, const std::list<std::pair<std::string, std::string>> &extraParams)
+void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del, unsigned pendingTimeoutSecs, const std::list<std::pair<std::string, std::string>> &extraParams)
{
bool removeNetwork = applyK8sYaml(componentName, wuid, job, "networkspec", extraParams, true);
applyK8sYaml(componentName, wuid, job, "jobspec", extraParams, false);
- waitK8sJob(componentName, job);
+ waitK8sJob(componentName, job, pendingTimeoutSecs);
if (del)
{
deleteK8sResource(componentName, job, "job");
|