瀏覽代碼

HPCC-24793 Prevent (by default) failed K8s scheduled jobs persisting

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 4 年之前
父節點
當前提交
0daf222dca

+ 82 - 43
common/workunit/workunit.cpp

@@ -12371,7 +12371,7 @@ extern WORKUNIT_API bool secDebugWorkunit(const char * wuid, ISecManager &secmgr
     {
         Owned<IConstWorkUnit> wu = globalFactory->openWorkUnit(wuid, &secmgr, &secuser);
         SCMStringBuffer ip;
-        unsigned port;
+        unsigned port = 0;
         try
         {
             port = wu->getDebugAgentListenerPort();
@@ -13667,9 +13667,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
     else
     {
         VStringBuffer job("%s-%s", wuid.str(), graphName);
-        bool deleteJobs = queryComponentConfig().getPropBool("@deleteJobs", true);
-        unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
-        runK8sJob("thormanager", wuid, job, deleteJobs, pendingTimeoutSecs, { { "graphName", graphName} });
+        runK8sJob("thormanager", wuid, job, { { "graphName", graphName} });
     }
 
     /* In k8s, Thor feeds back the terminating exception via the workunit.
@@ -14062,6 +14060,17 @@ static void setResources(IPropertyTree *workerConfig, const IConstWorkUnit *work
     setResourcesItem("limits", "cpu", cpuRequest, "m");
 }
 
+KeepK8sJobs translateKeepJobs(const char *keepJob) // Translates a keep-jobs setting string into the matching KeepK8sJobs value.
+{
+    if (!isEmptyString(keepJob)) // only a non-empty setting is compared (isEmptyString presumably also covers nullptr - confirm)
+    {
+        if (streq("podfailures", keepJob))
+            return KeepK8sJobs::podfailures;
+        else if (streq("all", keepJob))
+            return KeepK8sJobs::all;
+    }
+    return KeepK8sJobs::none; // default: reached for an empty setting and for any value other than "podfailures"/"all"
+}
 
 void deleteK8sResource(const char *componentName, const char *job, const char *resource)
 {
@@ -14077,7 +14086,7 @@ void deleteK8sResource(const char *componentName, const char *job, const char *r
         throw makeStringException(0, "Failed to run kubectl delete");
 }
 
-void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs)
+void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob)
 {
     VStringBuffer jobname("%s-%s", componentName, job);
     jobname.toLowerCase();
@@ -14087,42 +14096,62 @@ void waitK8sJob(const char *componentName, const char *job, unsigned pendingTime
 
     unsigned delay = 100;
     unsigned start = msTick();
-    for (;;)
+
+    bool schedulingTimeout = false;
+    Owned<IException> exception;
+    try
     {
-        StringBuffer output, error;
-        unsigned ret = runExternalCommand(nullptr, output, error, waitJob.str(), nullptr);
-        if (ret || error.length())
-            throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", waitJob.str(), ret, error.str());
-        if (!streq(output, "1"))  // status.active value
+        for (;;)
         {
-            // Job is no longer active - we can terminate
-            DBGLOG("kubectl jobs output: %s", output.str());
-            unsigned ret = runExternalCommand(nullptr, output.clear(), error.clear(), checkJobExitCode.str(), nullptr);
+            StringBuffer output, error;
+            unsigned ret = runExternalCommand(nullptr, output, error, waitJob.str(), nullptr);
             if (ret || error.length())
-                throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", checkJobExitCode.str(), ret, error.str());
-            if (output.length() && !streq(output, "0"))  // state.terminated.exitCode
-                throw makeStringExceptionV(0, "Failed to run %s: pod exited with error: %s", jobname.str(), output.str());
-            break;
-        }
-        ret = runExternalCommand(nullptr, output.clear(), error.clear(), getScheduleStatus.str(), nullptr);
-        if (error.length())
-        {
-            DBGLOG("kubectl get schedule status error: %s", error.str());
-            break;
-        }
-        // Check whether pod has been scheduled yet - if resources are not available pods may block indefinitely waiting to be scheduled, and
-        // we would prefer them to fail instead.
-        bool pending = streq(output, "False");
-        if (pendingTimeoutSecs && pending && msTick()-start > pendingTimeoutSecs*1000)
-        {
-            VStringBuffer getReason("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*].status.conditions[?(@.type=='PodScheduled')]}{.reason}{': '}{.message}{end}\"", jobname.str());
-            runExternalCommand(componentName, output.clear(), error.clear(), getReason.str(), nullptr);
-            throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobname.str(), pendingTimeoutSecs, output.str());
+                throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", waitJob.str(), ret, error.str());
+            if (!streq(output, "1"))  // status.active value
+            {
+                // Job is no longer active - we can terminate
+                DBGLOG("kubectl jobs output: %s", output.str());
+                unsigned ret = runExternalCommand(nullptr, output.clear(), error.clear(), checkJobExitCode.str(), nullptr);
+                if (ret || error.length())
+                    throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", checkJobExitCode.str(), ret, error.str());
+                if (output.length() && !streq(output, "0"))  // state.terminated.exitCode
+                    throw makeStringExceptionV(0, "Failed to run %s: pod exited with error: %s", jobname.str(), output.str());
+                break;
+            }
+            ret = runExternalCommand(nullptr, output.clear(), error.clear(), getScheduleStatus.str(), nullptr);
+            if (error.length())
+            {
+                DBGLOG("kubectl get schedule status error: %s", error.str());
+                break;
+            }
+            // Check whether pod has been scheduled yet - if resources are not available pods may block indefinitely waiting to be scheduled, and
+            // we would prefer them to fail instead.
+            bool pending = streq(output, "False");
+            if (pendingTimeoutSecs && pending && msTick()-start > pendingTimeoutSecs*1000)
+            {
+                schedulingTimeout = true;
+                VStringBuffer getReason("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*].status.conditions[?(@.type=='PodScheduled')]}{.reason}{': '}{.message}{end}\"", jobname.str());
+                runExternalCommand(componentName, output.clear(), error.clear(), getReason.str(), nullptr);
+                throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobname.str(), pendingTimeoutSecs, output.str());
+            }
+            MilliSleep(delay);
+            if (delay < 10000)
+                delay = delay * 2;
         }
-        MilliSleep(delay);
-        if (delay < 10000)
-            delay = delay * 2;
     }
+    catch (IException *e)
+    {
+        EXCLOG(e, nullptr);
+        exception.setown(e);
+    }
+    if (keepJob != KeepK8sJobs::all)
+    {
+        // Delete jobs unless the pod failed and keepJob==podfailures
+        if ((nullptr == exception) || (KeepK8sJobs::podfailures != keepJob) || schedulingTimeout)
+            deleteK8sResource(componentName, job, "job");
+    }
+    if (exception)
+        throw exception.getClear();
 }
 
 bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *suffix, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional)
@@ -14185,18 +14214,28 @@ bool applyK8sYaml(const char *componentName, const char *wuid, const char *job,
     return true;
 }
 
-void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del, unsigned pendingTimeoutSecs, const std::list<std::pair<std::string, std::string>> &extraParams)
+static constexpr unsigned defaultPendingTimeSecs = 600; // fallback used below when the component config has no @pendingTimeoutSecs
+void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams) // Applies the optional networkspec and the jobspec, waits for the job, then removes the network policy if one was applied; rethrows any exception from the wait.
 {
+    KeepK8sJobs keepJob = translateKeepJobs(queryComponentConfig().queryProp("@keepJobs")); // retention policy comes from the component's @keepJobs setting
+    unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
+
     bool removeNetwork = applyK8sYaml(componentName, wuid, job, "networkspec", extraParams, true);
     applyK8sYaml(componentName, wuid, job, "jobspec", extraParams, false); // NOTE(review): outside the try below, so an exception thrown here skips the networkpolicy cleanup - confirm intended
-    waitK8sJob(componentName, job, pendingTimeoutSecs);
-    if (del)
+    Owned<IException> exception; // holds a failure from the wait so cleanup can run before it is rethrown
+    try
+    {
+        waitK8sJob(componentName, job, pendingTimeoutSecs, keepJob); // deleting (or keeping) the job itself is decided inside waitK8sJob from keepJob
+    }
+    catch (IException *e)
     {
-        deleteK8sResource(componentName, job, "job");
-        if (removeNetwork)
-            deleteK8sResource(componentName, job, "networkpolicy");
+        EXCLOG(e, nullptr);
+        exception.setown(e);        // take ownership; rethrown after the network policy has been removed
     }
-    // MORE - this will not remove the network if the job fails.
+    if (removeNetwork)
+        deleteK8sResource(componentName, job, "networkpolicy"); // runs whether or not the wait failed
+    if (exception)
+        throw exception.getClear(); // hand the saved exception back to the caller
 }
 
 #endif

+ 5 - 4
common/workunit/workunit.hpp

@@ -1748,13 +1748,14 @@ inline cost_type calcCost(cost_type ratePerHour, unsigned __int64 ms) { return r
 extern WORKUNIT_API void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IPropertyTree &config);
 
 #ifdef _CONTAINERIZED
+enum class KeepK8sJobs { none, podfailures, all }; // K8s job retention policy: none = delete jobs, podfailures = keep jobs whose pod failed, all = never delete
+extern WORKUNIT_API KeepK8sJobs translateKeepJobs(const char *keepJobs); // "podfailures" / "all" map to the matching value; any other or empty string gives none
+
 extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName, const char *multiJobLingerQueueName);
 extern WORKUNIT_API void deleteK8sResource(const char *componentName, const char *job, const char *resource);
-extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs);
+extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob);
 extern WORKUNIT_API bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *suffix, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional);
-
-constexpr unsigned defaultPendingTimeSecs = 600;
-extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del=true, unsigned pendingTimeoutSecs=defaultPendingTimeSecs, const std::list<std::pair<std::string, std::string>> &extraParams={});
+extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});
 #endif
 
 #endif

+ 1 - 3
ecl/agentexec/agentexec.cpp

@@ -248,9 +248,7 @@ public:
                     params.push_back({ "graphName", graphName.get() });
                     jobName.append('-').append(graphName);
                 }
-                bool deleteJobs = queryComponentConfig().getPropBool("@deleteJobs", true);
-                unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
-                runK8sJob(jobSpecName, wuid, jobName, deleteJobs, pendingTimeoutSecs, params);
+                runK8sJob(jobSpecName, wuid, jobName, params);
             }
             else
             {

+ 1 - 3
ecl/eclccserver/eclccserver.cpp

@@ -487,12 +487,10 @@ public:
 #ifdef _CONTAINERIZED
         if (!globals->getPropBool("@useChildProcesses", false) && !globals->hasProp("@workunit"))
         {
-            bool deleteJobs = queryComponentConfig().getPropBool("@deleteJobs", true);
-            unsigned pendingTimeoutSecs = queryComponentConfig().getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);
             Owned<IException> error;
             try
             {
-                runK8sJob("compile", wuid, wuid, deleteJobs, pendingTimeoutSecs);
+                runK8sJob("compile", wuid, wuid);
             }
             catch (IException *E)
             {

+ 15 - 10
thorlcr/master/thmastermain.cpp

@@ -879,6 +879,8 @@ int main( int argc, const char *argv[]  )
     getThorQueueNames(_queueNames, thorName);
     queueName.set(_queueNames.str());
 
+    Owned<IException> exception;
+    StringBuffer cloudJobName;
     try
     {
         CSDSServerStatus &serverStatus = openThorServerStatus();
@@ -900,7 +902,6 @@ int main( int argc, const char *argv[]  )
         kjServiceMpTag = allocateClusterMPTag();
 
         unsigned numWorkers = 0;
-        StringBuffer cloudJobName;
         const char *workunit = nullptr;
         const char *graphName = nullptr;
 #ifdef _CONTAINERIZED
@@ -1017,23 +1018,27 @@ int main( int argc, const char *argv[]  )
         }
         else
             PROGLOG("Registration aborted");
-#ifdef _CONTAINERIZED
         registry.clear();
-        if (globals->getPropBool("@deleteJobs", true))
-            deleteK8sResource("thorworker", cloudJobName, "job");
-        setExitCode(0);
-#endif
         LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
     }
     catch (IException *e) 
     {
         FLLOG(MCexception(e), thorJob, e,"ThorMaster");
-        e->Release();
+        exception.setown(e);
+    }
 #ifdef _CONTAINERIZED
-        setExitCode(0); // do we ever want to exit with non-zero in K8s world - which prevents job completing
-#endif
-        
+    if (!cloudJobName.isEmpty())
+    {
+        KeepK8sJobs keepJob = translateKeepJobs(globals->queryProp("@keepJobs"));
+        if (keepJob != KeepK8sJobs::all)
+        {
+            // Delete jobs unless the pod failed and keepJob==podfailures
+            if ((nullptr == exception) || (KeepK8sJobs::podfailures != keepJob))
+                deleteK8sResource("thorworker", cloudJobName, "job");
+        }
     }
+    setExitCode(0);
+#endif
 
     // cleanup handler to be sure we end
     thorEndHandler->start(30);