Quellcode durchsuchen

HPCC-23944 eclcc takes a long time to quit if k8s cluster stopped

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman vor 5 Jahren
Ursprung
Commit
805d7c8fd0

+ 35 - 14
common/workunit/workunit.cpp

@@ -13904,7 +13904,7 @@ void deleteK8sJob(const char *componentName, const char *job)
     jobname.toLowerCase();
     VStringBuffer deleteJob("kubectl delete job/%s", jobname.str());
     StringBuffer output, error;
-    bool ret = runExternalCommand(output, error, deleteJob.str(), nullptr);
+    bool ret = runExternalCommand(componentName, output, error, deleteJob.str(), nullptr);
     DBGLOG("kubectl delete output: %s", output.str());
     if (error.length())
         DBGLOG("kubectl delete error: %s", error.str());
@@ -13912,23 +13912,44 @@ void deleteK8sJob(const char *componentName, const char *job)
         throw makeStringException(0, "Failed to run kubectl delete");
 }
 
-void waitK8sJob(const char *componentName, const char *job, const char *condition)
+void waitK8sJob(const char *componentName, const char *job)
 {
     VStringBuffer jobname("%s-%s", componentName, job);
     jobname.toLowerCase();
+    VStringBuffer waitJob("kubectl get jobs %s -o jsonpath={.status.active}", jobname.str());
+    VStringBuffer getScheduleStatus("kubectl get pods --selector=job-name=%s --output=jsonpath={.items[*].status.conditions[?(@.type=='PodScheduled')].status}", jobname.str());
 
-    if (isEmptyString(condition))
-        condition = "condition=complete";
-
-    // MORE - blocks indefinitely here if you request too many resources
-    VStringBuffer waitJob("kubectl wait --for=%s --timeout=10h job/%s", condition, jobname.str());  // MORE - make timeout configurable
-    StringBuffer output, error;
-    bool ret = runExternalCommand(output, error, waitJob.str(), nullptr);
-    DBGLOG("kubectl wait output: %s", output.str());
-    if (error.length())
-        DBGLOG("kubectl wait error: %s", error.str());
-    if (ret)
-        throw makeStringException(0, "Failed to run kubectl wait");
+    unsigned delay = 100;
+    unsigned start = msTick();
+    unsigned pendingTimeout = 10000;
+    for (;;)
+    {
+        StringBuffer output, error;
+        unsigned ret = runExternalCommand(componentName, output, error, waitJob.str(), nullptr);
+        if (ret || error.length())
+            throw makeStringExceptionV(0, "Failed to run %s: error %u: %s", waitJob.str(), ret, error.str());
+        if (!streq(output, "1"))  // status.active value
+        {
+            DBGLOG("kubectl jobs output: %s", output.str());
+            break;
+        }
+        ret = runExternalCommand(componentName, output.clear(), error.clear(), getScheduleStatus.str(), nullptr);
+        if (error.length())
+        {
+            DBGLOG("kubectl get pods error: %s", error.str());
+            break;
+        }
+        bool pending = streq(output, "False");
+        if (pending && msTick()-start > pendingTimeout)
+        {
+            VStringBuffer getReason("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*].status.conditions[?(@.type=='PodScheduled')]}{.reason}{': '}{.message}{end}\"", jobname.str());
+            runExternalCommand(componentName, output.clear(), error.clear(), getReason.str(), nullptr);
+            throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %d ms: %s ", jobname.str(), pendingTimeout, output.str());
+        }
+        MilliSleep(delay);
+        if (delay < 10000)
+            delay = delay * 2;
+    }
 }
 
 void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams)

+ 1 - 1
common/workunit/workunit.hpp

@@ -1699,7 +1699,7 @@ extern WORKUNIT_API void executeThorGraph(const char * graphName, IConstWorkUnit
 #ifdef _CONTAINERIZED
 extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName, const char *multiJobLingerQueueName);
 extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);
-extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, const char *condition=nullptr);
+extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job);
 extern WORKUNIT_API void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});
 extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del=true, const std::list<std::pair<std::string, std::string>> &extraParams={});
 #endif

+ 1 - 1
dockerfiles/hpcc/templates/eclccserver.yaml

@@ -48,7 +48,7 @@ data:
 {{ include "hpcc.generateConfigMapQueues" $ | indent 6 }}
     global:
 {{ include "hpcc.generateGlobalConfigMap" $ | indent 6 }}
-  eclccserver-jobspec.yaml: |
+  compile-jobspec.yaml: |
     apiVersion: batch/v1
     kind: Job
     metadata:

+ 28 - 1
ecl/eclccserver/eclccserver.cpp

@@ -464,7 +464,34 @@ public:
 #ifdef _CONTAINERIZED
         if (!globals->getPropBool("@useChildProcesses", false) && !globals->hasProp("@workunit"))
         {
-            runK8sJob("eclccserver", wuid, wuid, globals->getPropBool("@deleteJobs", true));
+            Owned<IException> error;
+            try
+            {
+                runK8sJob("compile", wuid, wuid, globals->getPropBool("@deleteJobs", true));
+            }
+            catch (IException *E)
+            {
+                error.setown(E);
+            }
+            if (error)
+            {
+                Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+                workunit.setown(factory->updateWorkUnit(wuid.get()));
+                if (workunit)
+                {
+                    if (workunit->aborting() || workunit->getState()==WUStateAborted)
+                        workunit->setState(WUStateAborted);
+                    else
+                    {
+                        StringBuffer msg;
+                        error->errorMessage(msg);
+                        addExceptionToWorkunit(workunit, SeverityError, "eclccserver", error->errorCode(), msg.str(), NULL, 0, 0, 0);
+                        workunit->setState(WUStateFailed);
+                    }
+                }
+                workunit->commit();
+                workunit.clear();
+            }
             return;
         }
 #endif

+ 6 - 1
system/jlib/jutil.cpp

@@ -1838,11 +1838,16 @@ static const char *findExtension(const char *fn)
 
 unsigned runExternalCommand(StringBuffer &output, StringBuffer &error, const char *cmd, const char *input)
 {
+    return runExternalCommand(cmd, output, error, cmd, input);
+}
+
+unsigned runExternalCommand(const char *title, StringBuffer &output, StringBuffer &error, const char *cmd, const char *input)
+{
     try
     {
         Owned<IPipeProcess> pipe = createPipeProcess();
         int ret = START_FAILURE;
-        if (pipe->run(cmd, cmd, ".", input != NULL, true, true, 1024*1024))
+        if (pipe->run(title, cmd, ".", input != NULL, true, true, 1024*1024))
         {
             if (input)
             {

+ 1 - 0
system/jlib/jutil.hpp

@@ -295,6 +295,7 @@ extern jlib_decl bool isContainerized();
 #endif
 
 extern jlib_decl unsigned runExternalCommand(StringBuffer &output, StringBuffer &error, const char *cmd, const char *input);
+extern jlib_decl unsigned runExternalCommand(const char *title, StringBuffer &output, StringBuffer &error, const char *cmd, const char *input);
 
 extern jlib_decl unsigned __int64 greatestCommonDivisor(unsigned __int64 left, unsigned __int64 right);