
Merge pull request #13468 from jakesmith/hpcc-23367-thor-on-demand

HPCC-23367 Thor-on-demand - 1st cut.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 years ago
parent
commit
73059f9dd8

+ 53 - 20
common/workunit/workunit.cpp

@@ -13182,13 +13182,51 @@ static void setResources(StringBuffer &jobYaml, const IConstWorkUnit *workunit,
     if (cpuLimit)
         jobYaml.replaceString("#limit-cpu", s.clear().appendf("cpu: \"%um\"", cpuLimit));
 }
-void runK8sJob(const char *name, const char *wuid)
+
+
+void deleteK8sJob(const char *componentName, const char *job)
+{
+    VStringBuffer jobname("%s-%s", componentName, job);
+    jobname.toLowerCase();
+    VStringBuffer deleteJob("kubectl delete job/%s", jobname.str());
+    StringBuffer output, error;
+    bool ret = runExternalCommand(output, error, deleteJob.str(), nullptr);
+    DBGLOG("kubectl delete output: %s", output.str());
+    if (error.length())
+        DBGLOG("kubectl delete error: %s", error.str());
+    if (ret)
+        throw makeStringException(0, "Failed to run kubectl delete");
+}
+
+void waitK8sJob(const char *componentName, const char *job, const char *condition)
 {
-    VStringBuffer jobname("%s-%s", name, wuid);
+    VStringBuffer jobname("%s-%s", componentName, job);
     jobname.toLowerCase();
-    VStringBuffer args("--workunit=%s", wuid);
+
+    if (isEmptyString(condition))
+        condition = "condition=complete";
+
+    // MORE - blocks indefinitely here if you request too many resources
+    VStringBuffer waitJob("kubectl wait --for=%s --timeout=10h job/%s", condition, jobname.str());  // MORE - make timeout configurable
+    StringBuffer output, error;
+    bool ret = runExternalCommand(output, error, waitJob.str(), nullptr);
+    DBGLOG("kubectl wait output: %s", output.str());
+    if (error.length())
+        DBGLOG("kubectl wait error: %s", error.str());
+    if (ret)
+        throw makeStringException(0, "Failed to run kubectl wait");
+}
+
+void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams)
+{
+    VStringBuffer jobname("%s-%s", componentName, job);
+    jobname.toLowerCase();
+    VStringBuffer args("\"--workunit=%s\"", wuid);
+    for (const auto &p: extraParams)
+        args.append(',').newline().append("\"--").append(p.first.c_str()).append('=').append(p.second.c_str()).append("\"");
+    VStringBuffer jobSpecFilename("/etc/config/%s-jobspec.yaml", componentName);
     StringBuffer jobYaml;
-    jobYaml.loadFile("/etc/config/jobspec.yaml", false);
+    jobYaml.loadFile(jobSpecFilename, false);
     jobYaml.replaceString("%jobname", jobname.str());
     jobYaml.replaceString("%args", args.str());
 
@@ -13197,7 +13235,7 @@ void runK8sJob(const char *name, const char *wuid)
     {
         Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
         if (workunit)
-            setResources(jobYaml, workunit, name);
+            setResources(jobYaml, workunit, componentName);
     }
 
     StringBuffer output, error;
@@ -13210,22 +13248,17 @@ void runK8sJob(const char *name, const char *wuid)
         DBGLOG("Using job yaml %s", jobYaml.str());
         throw makeStringException(0, "Failed to start kubectl job");
     }
-    // MORE - blocks indefinitely here if you request too many resources
-    VStringBuffer waitJob("kubectl wait --for=condition=complete --timeout=10h job/%s", jobname.str());  // MORE - make timeout configurable
-    ret = runExternalCommand(output.clear(), error.clear(), waitJob.str(), nullptr);
-    DBGLOG("kubectl wait output: %s", output.str());
-    if (error.length())
-        DBGLOG("kubectl wait error: %s", error.str());
-    if (ret)
-        throw makeStringException(0, "Failed to run kubectl wait");
+}
+
+void runK8sJob(const char *componentName, const char *wuid, const char *job, bool wait, const std::list<std::pair<std::string, std::string>> &extraParams)
+{
+    launchK8sJob(componentName, wuid, job, extraParams);
+    if (wait)
+    {
+        waitK8sJob(componentName, job);
 #ifndef _DEBUG
-    VStringBuffer deleteJob("kubectl delete job/%s", jobname.str());
-    ret = runExternalCommand(output.clear(), error.clear(), deleteJob.str(), nullptr);
-    DBGLOG("kubectl delete output: %s", output.str());
-    if (error.length())
-        DBGLOG("kubectl delete error: %s", error.str());
-    if (ret)
-        throw makeStringException(0, "Failed to run kubectl delete");
+        deleteK8sJob(componentName, job);
 #endif
+    }
 }
 #endif

+ 7 - 1
common/workunit/workunit.hpp

@@ -37,6 +37,9 @@
 #include "jprop.hpp"
 #include "wuattr.hpp"
 #include <vector>
+#include <list>
+#include <utility>
+#include <string>
 
 #define LEGACY_GLOBAL_SCOPE "workunit"
 #define GLOBAL_SCOPE ""
@@ -1685,7 +1688,10 @@ extern WORKUNIT_API bool isValidPriorityValue(const char * priority);
 extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 
 #ifdef _CONTAINERIZED
-extern WORKUNIT_API void runK8sJob(const char *name, const char *wuid);
+extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);
+extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, const char *condition=nullptr);
+extern WORKUNIT_API void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});
+extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, bool wait=true, const std::list<std::pair<std::string, std::string>> &extraParams={});
 #endif
 
 #endif
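
The old single-call runK8sJob(name, wuid) is split so that launching, waiting and cleanup can be driven independently (the Thor master, for example, launches its slave job without waiting on it). A minimal usage sketch of the new API follows; the workunit id, graph name and master IP are invented for illustration, and only the function names declared above are assumed:

    // Illustrative sketch only: wuid, graph name and master IP are invented values.
    // (Assumes jstring.hpp / workunit.hpp from this PR are in scope.)
    const char *wuid = "W20200101-123456";
    const char *graphName = "graph1";
    VStringBuffer job("%s-%s", wuid, graphName);

    // Launch a one-shot Thor master, block until the job completes (wait defaults to true);
    // runK8sJob also deletes the finished job itself in non-debug builds.
    runK8sJob("thormaster", wuid, job.str(), true, { { "graphName", graphName } });

    // Or fire-and-forget, synchronising and cleaning up separately:
    launchK8sJob("thorslave", wuid, job.str(), { { "graphName", graphName }, { "master", "10.0.0.1" } });
    // ... other work ...
    waitK8sJob("thorslave", job.str());     // default condition=complete, 10h timeout
    deleteK8sJob("thorslave", job.str());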

+ 4 - 4
dockerfiles/hpcc/templates/_helpers.tpl

@@ -48,12 +48,12 @@ Generate local config info into config section
 {{- .me.name -}}.yaml: |
   version: 1.0
   {{ .component }}:
-{{ toYaml .me | indent 4 -}}
+{{ toYaml .me | indent 4 }}
 {{- end -}}
 
 {{- /* Generate a ConfigMap for a component */ -}}
 {{- /* Pass in a dictionary with root, component and me defined */ -}}
-{{- define "hpcc.generateConfigMap" }}
+{{- define "hpcc.generateConfigMap" -}}
 kind: ConfigMap 
 apiVersion: v1 
 metadata:
@@ -64,8 +64,8 @@ data:
     Global:
       imageVersion: {{ .root.Values.global.image.version | quote }}
       singleNode: {{ .root.Values.global.singleNode }}
-{{ include "hpcc.generateComponentConfigMap" . | indent 2 }}
-{{ end -}}
+{{ include "hpcc.generateComponentConfigMap" . | indent 2 -}}
+{{- end -}}
 
 {{- /* Add a ConfigMap volume for a component */ -}}
 {{- define "hpcc.addConfigVolume" -}}

+ 2 - 2
dockerfiles/hpcc/templates/dali.yaml

@@ -25,7 +25,7 @@ spec:
       volumes:
 {{ include "hpcc.addConfigVolume" . | indent 6 }}
 ---
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "dali" "me" .) -}}
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "dali" "me" .) }}
 ---
 apiVersion: v1
 kind: Service
@@ -40,4 +40,4 @@ spec:
     run: {{ .name | quote }}
   type: ClusterIP
 ---
-{{- end }}
+{{- end }}

+ 1 - 1
dockerfiles/hpcc/templates/eclagent.yaml

@@ -29,7 +29,7 @@ spec:
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
 {{- include "hpcc.generateConfigMap" (dict "root" $ "component" "eclagent" "me" .) }}
-  jobspec.yaml: |
+  eclagent-jobspec.yaml: |
     apiVersion: batch/v1
     kind: Job
     metadata:

+ 1 - 1
dockerfiles/hpcc/templates/eclccserver.yaml

@@ -31,7 +31,7 @@ spec:
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
 {{- include "hpcc.generateConfigMap" (dict "root" $ "component" "eclccserver" "me" .) }}
-  jobspec.yaml: |
+  eclccserver-jobspec.yaml: |
     apiVersion: batch/v1
     kind: Job
     metadata:

+ 2 - 2
dockerfiles/hpcc/templates/esp.yaml

@@ -26,7 +26,7 @@ spec:
 {{ include "hpcc.addConfigVolume" . | indent 6 }}
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "esp" "me" .) -}}
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "esp" "me" .) }}
 ---
 apiVersion: v1
 kind: Service
@@ -40,4 +40,4 @@ spec:
   selector:
     run: {{ .name | quote }}
   type: LoadBalancer
-{{- end }}
+{{- end }}

+ 1 - 1
dockerfiles/hpcc/templates/localroxie.yaml

@@ -33,7 +33,7 @@ spec:
 {{ include "hpcc.addConfigVolume" . | indent 6 }}
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "roxie" "me" $roxie ) -}}
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "roxie" "me" $roxie ) }}
 ---
 {{- end }}
 {{- end }}

+ 1 - 3
dockerfiles/hpcc/templates/roxie.yaml

@@ -43,9 +43,7 @@ spec:
   clusterIP: None # Headless service
 
 ---
-
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "roxie" "me" $roxie) -}}
-
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "roxie" "me" $roxie) }}
 ---
 
 {{ if $roxie.serverReplicas -}}

+ 102 - 53
dockerfiles/hpcc/templates/thor.yaml

@@ -1,84 +1,133 @@
 {{ range $thor := $.Values.thor -}}
 {{- $masterPort := $thor.masterport | default 20000 -}}
 {{- $slavePort := $thor.slaveport | default 20100 -}}
+{{- $eclagentName := printf "%s-agent" .name }}
 {{- $slaveName := printf "%s-slave" .name }}
 {{- $serviceName := printf "%s-svc" .name }}
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: {{ .name | quote }}
+  name: {{ $eclagentName | quote }}
 spec:
-  replicas: 1
+  replicas: {{ $thor.eclagent.replicas | default 1 }}
   selector:
     matchLabels:
-      run: {{ .name | quote }}
+      run: {{ $eclagentName | quote }}
   template:
     metadata:
       labels:
-        run: {{ .name | quote }}
+        run: {{ $eclagentName | quote }}
     spec:
-      {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 6 }}
+      serviceAccountName: hpcc
       containers:
-      - name: {{ .name | quote }}
+      - name: {{ $eclagentName | quote }}
         ports:
           - containerPort: {{ $masterPort }}
         args: [
                 {{ include "hpcc.configArg" . }},
-                {{ include "hpcc.daliArg" $ }},
-                --masterport={{ $masterPort }},
-                --numSlaves={{ $thor.numSlaves }}
+                {{ include "hpcc.daliArg" $ }}
               ]
 {{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
-{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thormaster") | indent 8 }}
+{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "eclagent") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:
 {{ include "hpcc.addConfigVolume" . | indent 6 }}
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: {{ $slaveName | quote }}
-spec:
-  replicas: {{ $thor.numSlaves }}
-  selector:
-    matchLabels:
-      run: {{ $slaveName | quote }}
-  template:
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "thor" "me" .) }}
+{{- if $thor.eclagent }}
+    eclagent:
+{{ toYaml $thor.eclagent | indent 6 }}
+{{- end }}
+  eclagent-jobspec.yaml: |
+    apiVersion: batch/v1
+    kind: Job
     metadata:
-      labels:
-        run: {{ $slaveName | quote }}
+      name: %jobname
     spec:
-      {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 6 }}
-      containers:
-        - name: {{ $slaveName | quote }}
-          args: [
-                  {{ include "hpcc.configArg" . }},
-                  {{ include "hpcc.daliArg" $ }},
-                  --slaveport={{ $slavePort }},
-                  --master={{ printf "%s:%v" $serviceName $masterPort }}
-                ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 10 }}
-{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thorslave") | indent 10 }}
-          ports:
-            - containerPort: {{ $slavePort }}
-{{ include "hpcc.addVolumeMounts" . | indent 10 }}
-      volumes:
-{{ include "hpcc.addConfigVolume" . | indent 6 }}
-{{ include "hpcc.addVolumes" $ | indent 6 }}
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: {{ $serviceName | quote }}
-spec:
-  ports:
-  - port: {{ $masterPort }}
-    protocol: TCP
-    targetPort: {{ $masterPort }}
-  selector:
-    run: {{ .name | quote }}
-  type: ClusterIP
+      ttlSecondsAfterFinished: 100
+      template:
+        spec:
+          serviceAccountName: hpcc
+          {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 10 }}
+          containers:
+          - name: %jobname
+            image: {{ include "hpcc.imageName" (dict "root" $ "me" . "imagename" "eclagent") }}
+            resources:
+              requests:
+                #request-memory
+                #request-cpu
+              limits:
+                #limit-memory
+                #limit-cpu
+            command: [
+                        "eclagent",
+                        {{ include "hpcc.configArg" . }},
+                        {{ include "hpcc.daliArg" $ }},
+                        %args
+                     ]
+{{ include "hpcc.addVolumeMounts" . | indent 12 }}
+          volumes:
+{{ include "hpcc.addConfigVolume" . | indent 10 }}
+{{ include "hpcc.addVolumes" $ | indent 10 }}
+          restartPolicy: Never
+      backoffLimit: 0
+  thormaster-jobspec.yaml: |
+    apiVersion: batch/v1
+    kind: Job
+    metadata:
+      name: %jobname
+    spec:
+      ttlSecondsAfterFinished: 100
+      template:
+        spec:
+          serviceAccountName: hpcc
+          {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 10 }}
+          containers:
+          - name: %jobname
+{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
+{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thormaster") | indent 12 }}
+            args: [
+                    {{ include "hpcc.configArg" . }},
+                    {{ include "hpcc.daliArg" $ }},
+                    %args
+                  ]
+{{ include "hpcc.addVolumeMounts" . | indent 12 }}
+          volumes:
+{{ include "hpcc.addConfigVolume" . | indent 10 }}
+{{ include "hpcc.addVolumes" $ | indent 10 }}
+          restartPolicy: Never
+      backoffLimit: 0
+  thorslave-jobspec.yaml: |
+    apiVersion: batch/v1
+    kind: Job
+    metadata:
+      name: %jobname
+    spec:
+      parallelism: {{ $thor.numSlaves }}
+      ttlSecondsAfterFinished: 100
+      template:
+        spec:
+          serviceAccountName: hpcc
+          containers:
+          - name: %jobname
+{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
+{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thorslave") | indent 12 }}
+            args: [
+                    {{ include "hpcc.configArg" . }},
+                    {{ include "hpcc.daliArg" $ }},
+                    %args
+                  ]
+{{ include "hpcc.addVolumeMounts" . | indent 12 }}
+            - name: wsl-pv
+              mountPath: "/var/lib/HPCCSystems/thor/logs"
+          volumes:
+{{ include "hpcc.addConfigVolume" . | indent 10 }}
+{{ include "hpcc.addVolumes" $ | indent 10 }}
+          - name: wsl-pv
+            persistentVolumeClaim:
+              claimName: "wsl-pv-claim"
+          restartPolicy: Never
+      backoffLimit: 0
 ---
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "thor" "me" .) -}}
 {{- end }}
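
The key names in the ConfigMap above are load-bearing: launchK8sJob (workunit.cpp diff at the top) builds the jobspec path from its componentName argument, so a job launched as "thormaster" reads /etc/config/thormaster-jobspec.yaml and substitutes %jobname and %args before the Job is submitted via kubectl. A small sketch repeating that lookup to show the coupling; componentName here would be "eclagent", "thormaster" or "thorslave" to match the keys above, and it assumes the config volume is mounted at /etc/config as the C++ code expects:

    // Sketch of the lookup done in launchK8sJob (see the workunit.cpp diff above).
    // A "<componentName>-jobspec.yaml" key must exist in this ConfigMap.
    VStringBuffer jobSpecFilename("/etc/config/%s-jobspec.yaml", componentName);
    StringBuffer jobYaml;
    jobYaml.loadFile(jobSpecFilename, false);
    jobYaml.replaceString("%jobname", jobname.str());
    jobYaml.replaceString("%args", args.str());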

+ 12 - 2
dockerfiles/hpcc/values.yaml

@@ -36,11 +36,17 @@ global:
   # This will cause initContainers to be created to ensure that host mount points have correct uid/gid
   hostStorage: true
 
+  # Set to true if using hostPath storageClass
+  # This will cause initContainers to be created to ensure that host mount points have correct uid/gid
+  hostStorage: true
+
+
 dali:
 - name: mydali
 
 eclagent:
 - name: myeclagent
+  queueNames: "hthor.agent" # kludge to match environment - until esp services can pick up dynamically
   replicas: 1
   containerPerAgent: true
   maxActive: 2
@@ -64,6 +70,10 @@ roxie:
 
 thor:
 - name: thor1
-  masterport: 20000
   numSlaves: 2
-  slaveport: 20100
+  globalMemorySize: 4096
+  maxActive: 2
+  eclagent:
+    queueNames: "thor.agent" # kludge to match environment - until esp services can pick up dynamically
+    replicas: 1
+    containerPerAgent: false

+ 1 - 2
dockerfiles/thormaster/Dockerfile

@@ -23,7 +23,6 @@ FROM hpccsystems/platform-core:${BUILD_LABEL}
 USER hpcc
 RUN mkdir -p /var/lib/HPCCSystems/thor
 WORKDIR /var/lib/HPCCSystems/thor
-COPY --chown=hpcc:hpcc thor.xml /var/lib/HPCCSystems/thor
-#RUN thormaster_lcr --init
+RUN thormaster_lcr --init
 ENTRYPOINT ["thormaster_lcr"]
 

+ 0 - 6
dockerfiles/thormaster/thor.xml

@@ -1,6 +0,0 @@
-<Thor name="mythor" daliServers="dali" numSlaves="1" pluginsPath="/opt/HPCCSystems/plugins/" replicateAsync="false"
-      watchdogEnabled="true" watchdogProgressEnabled="true" globalMemorySize="4096">
-  <cost>
-    <thor master="0.000002" slave="0.00001"/>
-  </cost>
-</Thor>

+ 10 - 2
ecl/agentexec/agentexec.cpp

@@ -88,7 +88,15 @@ int CEclAgentExecutionServer::run()
     {
         Owned<IGroup> serverGroup = createIGroup(daliServers, DALI_SERVER_PORT);
         initClientProcess(serverGroup, DCR_AgentExec);
+#ifdef _CONTAINERIZED
+        config->getProp("@queueNames", queueNames.s);
+        
+        // temporary
+        if (0 == queueNames.length())
+            getAgentQueueNames(queueNames, agentName);
+#else
         getAgentQueueNames(queueNames, agentName);
+#endif
         queue.setown(createJobQueue(queueNames.str()));
         queue->connect(false);
     }
@@ -196,7 +204,7 @@ public:
         {
             if (queryComponentConfig().getPropBool("@containerPerAgent", false))  // MORE - make this a per-workunit setting?
             {
-                runK8sJob("eclagent", wuid);
+                runK8sJob("eclagent", wuid, wuid);
             }
             else
             {
@@ -304,7 +312,7 @@ int main(int argc, const char *argv[])
     Owned<IPropertyTree> config;
     try
     {
-        config.setown(loadConfiguration(eclagentDefaultYaml, argv, "EclAgent", "ECLAGENT", "agentexec.xml", nullptr));
+        config.setown(loadConfiguration(eclagentDefaultYaml, argv, "eclagent", "ECLAGENT", "agentexec.xml", nullptr));
     }
     catch (IException *e) 
     {

+ 1 - 1
ecl/eclagent/eclagent.cpp

@@ -3316,7 +3316,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
     {
         try
         {
-            agentTopology.setown(loadConfiguration(eclagentDefaultYaml, argv, "EclAgent", "ECLAGENT", "agentexec.xml", nullptr));
+            agentTopology.setown(loadConfiguration(eclagentDefaultYaml, argv, "eclagent", "ECLAGENT", "agentexec.xml", nullptr));
         }
         catch (IException *E)
         {

+ 1 - 1
ecl/eclagent/eclagent.hpp

@@ -19,7 +19,7 @@
 
 static constexpr const char * eclagentDefaultYaml = R"!!(
 version: "1.0"
-EclAgent:
+eclagent:
     analyzeWorkunit: true
     defaultMemoryLimitMB: 300
     name: myeclagent

+ 30 - 0
ecl/eclagent/eclgraph.cpp

@@ -1506,6 +1506,35 @@ extern IProbeManager *createDebugManager(IDebuggableContext *debugContext, const
 
 void EclAgent::executeThorGraph(const char * graphName)
 {
+#ifdef _CONTAINERIZED
+    // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
+    Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
+    {
+        Owned<IWorkUnit> w = updateWorkUnit();
+        w->setState(WUStateBlocked);
+    }
+    unlockWorkUnit();
+        
+    VStringBuffer job("%s-%s", wuid.str(), graphName);
+    runK8sJob("thormaster", wuid, job, true, { { "graphName", graphName} });
+
+    if (wuRead->getExceptionCount())
+    {
+        Owned<IConstWUExceptionIterator> iter = &wuRead->getExceptions();
+        ForEach(*iter)
+        {
+            IConstWUException &e = iter->query();
+            SCMStringBuffer str;
+            e.getExceptionSource(str);
+            if (streq("thormasterexception", str.s))
+            {
+                str.clear();
+                e.getExceptionMessage(str);
+                throw makeStringException(e.getExceptionCode(), str.str());
+            }
+        }
+    }
+#else
     StringAttr wuid(wuRead->queryWuid());
     StringAttr owner(wuRead->queryUser());
     StringAttr cluster(wuRead->queryClusterName());
@@ -1683,6 +1712,7 @@ void EclAgent::executeThorGraph(const char * graphName)
         reloadWorkUnit();
     }
     while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
+#endif
 }
 
 //In case of logfile rollover, update logfile name(s) stored in workunit
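
The containerized branch above relies on a simple convention for surfacing Thor failures: the one-shot Thor master deliberately exits 0 (a non-zero exit would prevent the Kubernetes Job from completing) and instead records any exit exception in the workunit with the source string "thormasterexception"; after runK8sJob returns, the agent scans the workunit exceptions and rethrows the first one carrying that marker, as in the loop above. For reference, a condensed sketch of the Thor-side half (the matching code is in the thgraphmanager.cpp diff further down):

    // Sketch of the producer side, condensed from the thgraphmanager.cpp diff below.
    IException *e = jobManager->queryExitException();
    if (e)
    {
        Owned<IWorkUnit> w = &workunit->lock();
        Owned<IWUException> we = w->createException();
        we->setSeverity(SeverityInformation);
        StringBuffer errStr;
        e->errorMessage(errStr);
        we->setExceptionMessage(errStr);
        we->setExceptionSource("thormasterexception");  // the marker the agent checks for
        we->setExceptionCode(e->errorCode());
    }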

+ 1 - 1
ecl/eclccserver/eclccserver.cpp

@@ -454,7 +454,7 @@ public:
 #ifdef _CONTAINERIZED
         if (globals->getPropBool("@containerPerCompile", false) && !globals->hasProp("@workunit"))
         {
-            runK8sJob("eclccserver", wuid.get());
+            runK8sJob("eclccserver", wuid, wuid);
             return;
         }
 #endif

+ 27 - 1
roxie/ccd/ccdcontext.cpp

@@ -19,6 +19,7 @@
 #include "jlib.hpp"
 
 #include "environment.hpp"
+#include "workunit.hpp"
 #include "wujobq.hpp"
 #include "nbcd.hpp"
 #include "rtlread_imp.hpp"
@@ -2268,6 +2269,31 @@ protected:
     {
         assertex(workUnit);
         StringAttr wuid(workUnit->queryWuid());
+
+#ifdef _CONTAINERIZED
+        // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
+        setWUState(WUStateBlocked);
+            
+        VStringBuffer job("%s-%s", wuid.get(), graphName);
+        runK8sJob("thormaster", wuid, job, true, { { "graphName", graphName} });
+
+        if (workUnit->getExceptionCount())
+        {
+            Owned<IConstWUExceptionIterator> iter = &workUnit->getExceptions();
+            ForEach(*iter)
+            {
+                IConstWUException &e = iter->query();
+                SCMStringBuffer str;
+                e.getExceptionSource(str);
+                if (streq("thormasterexception", str.s))
+                {
+                    str.clear();
+                    e.getExceptionMessage(str);
+                    throw makeStringException(e.getExceptionCode(), str.str());
+                }
+            }
+        }
+#else    
         StringAttr owner(workUnit->queryUser());
         StringAttr cluster(workUnit->queryClusterName());
 
@@ -2440,7 +2466,7 @@ protected:
             workUnit->forceReload();
         }
         while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
-
+#endif
     }
 };
 

+ 78 - 20
thorlcr/master/thgraphmanager.cpp

@@ -57,6 +57,7 @@ class CJobManager : public CSimpleInterface, implements IJobManager, implements
     CFifoFileCache querySoCache;
     Owned<IJobQueue> jobq;
     ICopyArrayOf<CJobMaster> jobs;
+    Owned<IException> exitException;
 
     Owned<IDeMonServer> demonServer;
     atomic_t            activeTasks;
@@ -233,6 +234,8 @@ public:
     void reply(IConstWorkUnit *workunit, const char *wuid, IException *e, const SocketEndpoint &agentep, bool allDone);
 
     void run();
+    bool execute(IConstWorkUnit *workunit, const char *wuid, const char *graphName, const SocketEndpoint &agentep);
+    IException *queryExitException() { return exitException; }
 
 // IExceptionHandler
     bool fireException(IException *e);
@@ -410,6 +413,42 @@ bool CJobManager::fireException(IException *e)
     return true;
 }
 
+bool CJobManager::execute(IConstWorkUnit *workunit, const char *wuid, const char *graphName, const SocketEndpoint &agentep)
+{
+    try
+    {
+        if (!workunit) // check workunit is available and ready to run.
+            throw MakeStringException(0, "Could not locate workunit %s", wuid);
+        if (workunit->getCodeVersion() == 0)
+            throw makeStringException(0, "Attempting to execute a workunit that hasn't been compiled");
+        if ((workunit->getCodeVersion() > ACTIVITY_INTERFACE_VERSION) || (workunit->getCodeVersion() < MIN_ACTIVITY_INTERFACE_VERSION))
+            throw MakeStringException(0, "Workunit was compiled for eclagent interface version %d, this thor requires version %d..%d", workunit->getCodeVersion(), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
+
+        if (debugListener)
+        {
+            WorkunitUpdate wu(&workunit->lock());
+            StringBuffer sb;
+            queryHostIP().getIpText(sb);
+            wu->setDebugAgentListenerIP(sb); //tells debugger what IP to write commands to
+            wu->setDebugAgentListenerPort(debugListener->getPort());
+        }
+
+        return doit(workunit, graphName, agentep);
+    }
+    catch (IException *e) 
+    { 
+        IThorException *te = QUERYINTERFACE(e, IThorException);
+        if (te && tea_shutdown==te->queryAction()) 
+            stopped = true;
+        reply(workunit, wuid, e, agentep, false); 
+    }
+    catch (CATCHALL) 
+    { 
+        reply(workunit, wuid, MakeStringException(0, "Unknown exception"), agentep, false); 
+    }
+    return false;
+}
+
 void CJobManager::run()
 {
     setWuid(NULL);
@@ -664,23 +703,7 @@ void CJobManager::run()
         {
             factory.setown(getWorkUnitFactory());
             workunit.setown(factory->openWorkUnit(wuid));
-            if (!workunit) // check workunit is available and ready to run.
-                throw MakeStringException(0, "Could not locate workunit %s", wuid);
-            if (workunit->getCodeVersion() == 0)
-                throw makeStringException(0, "Attempting to execute a workunit that hasn't been compiled");
-            if ((workunit->getCodeVersion() > ACTIVITY_INTERFACE_VERSION) || (workunit->getCodeVersion() < MIN_ACTIVITY_INTERFACE_VERSION))
-                throw MakeStringException(0, "Workunit was compiled for eclagent interface version %d, this thor requires version %d..%d", workunit->getCodeVersion(), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
-
-            if (debugListener)
-            {
-                WorkunitUpdate wu(&workunit->lock());
-                StringBuffer sb;
-                queryHostIP().getIpText(sb);
-                wu->setDebugAgentListenerIP(sb); //tells debugger what IP to write commands to
-                wu->setDebugAgentListenerPort(debugListener->getPort());
-            }
-
-            allDone = doit(workunit, graphName, agentep);
+            allDone = execute(workunit, wuid, graphName, agentep);
             daliLock.clear();
             reply(workunit, wuid, NULL, agentep, allDone);
         }
@@ -772,6 +795,16 @@ void CJobManager::replyException(CJobMaster &job, IException *e)
 void CJobManager::reply(IConstWorkUnit *workunit, const char *wuid, IException *e, const SocketEndpoint &agentep, bool allDone)
 {
     CriticalBlock b(replyCrit);
+#ifdef _CONTAINERIZED
+    // JCSMORE ignore pause/resume cases for now.
+    if (e)
+    {
+        if (!exitException)
+            exitException.setown(e);
+        return;
+    }
+#else
+    workunit->forceReload();
     if (!conversation) 
         return;
     StringBuffer s;
@@ -822,6 +855,7 @@ void CJobManager::reply(IConstWorkUnit *workunit, const char *wuid, IException *
     Owned<IRemoteConnection> conn = querySDS().connect("/Environment", myProcessSession(), RTM_LOCK_READ, MEDIUMTIMEOUT);
     if (checkThorNodeSwap(globals->queryProp("@name"),e?wuid:NULL,(unsigned)-1))
         abortThor(e, TEC_Swap, false);
+#endif
 }
 
 bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, const SocketEndpoint &agentEp)
@@ -1044,7 +1078,7 @@ void closeThorServerStatus()
     }
 }
 
-void thorMain(ILogMsgHandler *logHandler)
+void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphName)
 {
     aborting = 0;
     unsigned multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
@@ -1071,9 +1105,33 @@ void thorMain(ILogMsgHandler *logHandler)
         enableForceRemoteReads(); // forces file reads to be remote reads if they match environment setting 'forceRemotePattern' pattern.
 
         Owned<CJobManager> jobManager = new CJobManager(logHandler);
-        try {
+        try
+        {
             LOG(MCdebugProgress, thorJob, "Listening for graph");
-            jobManager->run();
+
+            if (wuid) // one-shot, quits after running
+            {
+                Owned<IWorkUnitFactory> factory;
+                Owned<IConstWorkUnit> workunit;
+                factory.setown(getWorkUnitFactory());
+                workunit.setown(factory->openWorkUnit(wuid));
+                SocketEndpoint dummyAgentEp;
+                jobManager->execute(workunit, wuid, graphName, dummyAgentEp);
+                IException *e = jobManager->queryExitException();
+                if (e)
+                {
+                    Owned<IWorkUnit> w = &workunit->lock();
+                    Owned<IWUException> we = w->createException();
+                    we->setSeverity(SeverityInformation);
+                    StringBuffer errStr;
+                    e->errorMessage(errStr);
+                    we->setExceptionMessage(errStr);
+                    we->setExceptionSource("thormasterexception");
+                    we->setExceptionCode(e->errorCode());
+                }
+            }  
+            else
+                jobManager->run();
         }
         catch (IException *e)
         {

+ 1 - 1
thorlcr/master/thgraphmanager.hpp

@@ -23,7 +23,7 @@ interface IException;
 CSDSServerStatus &queryServerStatus();
 CSDSServerStatus &openThorServerStatus();
 void closeThorServerStatus();
-void thorMain(ILogMsgHandler *logHandler);
+void thorMain(ILogMsgHandler *logHandler, const char *workunit, const char *graphName);
 
 enum ThorExitCodes { TEC_Clean, TEC_CtrlC, TEC_Idle, TEC_Watchdog, TEC_SlaveInit, TEC_Swap, TEC_DaliDown };
 

+ 51 - 22
thorlcr/master/thmastermain.cpp

@@ -50,7 +50,7 @@
 #include "dalienv.hpp"
 #include "dasds.hpp"
 #include "dllserver.hpp"
-
+#include "workunit.hpp"
 #include "rmtfile.hpp"
 
 #include "portlist.h"
@@ -557,12 +557,17 @@ thor:
   daliServers: dali
   watchdogEnabled: true
   watchdogProgressEnabled: true
+  cost:
+    thor:
+      master: "0.000002"
+      slave: "0.00001"
 )!!";
 
 
 #include "thactivitymaster.hpp"
 int main( int argc, const char *argv[]  )
 {
+#ifndef _CONTAINERIZED
     for (unsigned i=0;i<(unsigned)argc;i++) {
         if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
             if (daemon(1,0) || write_pidfile(argv[++i])) {
@@ -572,6 +577,7 @@ int main( int argc, const char *argv[]  )
             break;
         }
     }
+#endif
 #if defined(WIN32) && defined(_DEBUG)
     int tmpFlag = _CrtSetDbgFlag( _CRTDBG_REPORT_FLAG );
     tmpFlag |= _CRTDBG_LEAK_CHECK_DF;
@@ -771,9 +777,7 @@ int main( int argc, const char *argv[]  )
         if (overrideReplicateDirectory&&*overrideBaseDirectory)
             setBaseDirectory(overrideReplicateDirectory, true);
         StringBuffer tempDirStr;
-        if (getConfigurationDirectory(globals->queryPropTree("Directories"),"temp","thor",globals->queryProp("@name"), tempDirStr))
-            globals->setProp("@thorTempDirectory", tempDirStr.str());
-        else
+        if (!getConfigurationDirectory(globals->queryPropTree("Directories"),"temp","thor",globals->queryProp("@name"), tempDirStr))
         {
             tempDirStr.append(globals->queryProp("@thorTempDirectory"));
             if (0 == tempDirStr.length())
@@ -784,10 +788,12 @@ int main( int argc, const char *argv[]  )
                 tempDirStr.append("temp");
             }
         }
+        globals->setProp("@thorTempDirectory", tempDirStr);
         logDiskSpace(); // Log before temp space is cleared
         StringBuffer tempPrefix("thtmp");
         tempPrefix.append(getMasterPortBase()).append("_");
-        SetTempDir(tempDirStr.str(), tempPrefix.str(), true);
+        SetTempDir(0, tempDirStr.str(), tempPrefix.str(), true);
+        DBGLOG("Temp directory: %s", queryTempDir());
 
         char thorPath[1024];
         if (!GetCurrentDirectory(1024, thorPath))
@@ -857,26 +863,39 @@ int main( int argc, const char *argv[]  )
         kjServiceMpTag = allocateClusterMPTag();
 
         unsigned numSlaves = 0;
-        if (isContainerized())
-        {
-            if (!globals->hasProp("@numSlaves"))
-                throw makeStringException(0, "Number of slaves not defined (numSlaves)");
-            else
-            {
-                numSlaves = globals->getPropInt("@numSlaves", 0);
-                if (0 == numSlaves)
-                    throw makeStringException(0, "Number of slaves must be > 0 (numSlaves)");
-            }
-        }
+        StringBuffer cloudJobName;
+        const char *workunit = nullptr;
+        const char *graphName = nullptr;
+#ifdef _CONTAINERIZED
+        if (!globals->hasProp("@numSlaves"))
+            throw makeStringException(0, "Number of slaves not defined (numSlaves)");
         else
         {
-            unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
-            unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
-            Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
-            setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
-            numSlaves = queryNodeClusterWidth();
+            numSlaves = globals->getPropInt("@numSlaves", 0);
+            if (0 == numSlaves)
+                throw makeStringException(0, "Number of slaves must be > 0 (numSlaves)");
         }
 
+        workunit = globals->queryProp("@workunit");
+        graphName = globals->queryProp("@graphName");
+        if (isEmptyString(workunit))
+            throw makeStringException(0, "missing --workunit");
+        if (isEmptyString(graphName))
+            throw makeStringException(0, "missing --graphName");
+        cloudJobName.appendf("%s-%s", workunit, graphName);
+
+        StringBuffer myIp;
+        queryHostIP().getIpText(myIp);
+
+        launchK8sJob("thorslave", workunit, cloudJobName, { { "graphName", graphName}, { "master", myIp.str() } });
+#else
+        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
+        unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
+        Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
+        setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
+        numSlaves = queryNodeClusterWidth();
+#endif
+
         if (registry->connect(numSlaves))
         {
             if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
@@ -918,17 +937,27 @@ int main( int argc, const char *argv[]  )
             if (pinterval)
                 startPerformanceMonitor(pinterval, PerfMonStandard, nullptr);
 
-            thorMain(logHandler);
+            // NB: workunit/graphName only set in one-shot mode (if isCloud())
+            thorMain(logHandler, workunit, graphName);
             LOG(MCauditInfo, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
         }
         else
             PROGLOG("Registration aborted");
+#ifdef _CONTAINERIZED
+        registry.clear();
+        deleteK8sJob("thorslave", cloudJobName);
+        setExitCode(0);
+#endif
         LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
     }
     catch (IException *e) 
     {
         FLLOG(MCexception(e), thorJob, e,"ThorMaster");
         e->Release();
+#ifdef _CONTAINERIZED
+        setExitCode(0); // do we ever want to exit with non-zero in K8s world - which prevents job completing
+#endif
+        
     }
 
     // cleanup handler to be sure we end
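
In containerized mode the master is now a one-shot process: it requires --workunit and --graphName, launches the matching thorslave Job itself (passing its own IP as --master), runs just that graph via thorMain, and deletes the slave Job on the way out. Job names follow one convention throughout: the caller passes "<wuid>-<graphName>" and launchK8sJob prefixes the component name and lower-cases the result (presumably because Kubernetes object names must be lower case). A small sketch with invented values:

    // Illustrative only: the workunit id and graph name are invented.
    const char *workunit = "W20200101-123456";
    const char *graphName = "graph1";
    StringBuffer cloudJobName;
    cloudJobName.appendf("%s-%s", workunit, graphName);   // "W20200101-123456-graph1"
    // launchK8sJob("thorslave", workunit, cloudJobName.str(), { { "graphName", graphName } })
    // then creates the Kubernetes Job "thorslave-w20200101-123456-graph1".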

+ 2 - 2
thorlcr/slave/thslavemain.cpp

@@ -448,10 +448,10 @@ int main( int argc, const char *argv[]  )
                 globals->setProp("@thorTempDirectory", tempDirStr.str());
             else
                 tempDirStr.append(globals->queryProp("@thorTempDirectory"));
-            addPathSepChar(tempDirStr).append(getMachinePortBase());
+            addPathSepChar(tempDirStr).append(mySlaveNum);
 
             logDiskSpace(); // Log before temp space is cleared
-            SetTempDir(tempDirStr.str(), "thtmp", true);
+            SetTempDir(mySlaveNum, tempDirStr.str(), "thtmp", true);
 
             useMemoryMappedRead(globals->getPropBool("@useMemoryMappedRead"));
 

+ 6 - 4
thorlcr/thorutil/thormisc.cpp

@@ -600,6 +600,7 @@ public:
     CriticalSection crit;
     bool altallowed;
     bool cleardir;
+    unsigned slaveNum = 0;
 
     CTempNameHandler()
     {
@@ -618,10 +619,11 @@ public:
             return alttempdir;
         return tempdir; 
     }
-    void setTempDir(const char *name, const char *_tempPrefix, bool clear)
+    void setTempDir(unsigned _slaveNum, const char *name, const char *_tempPrefix, bool clear)
     {
         assertex(name && *name);
         CriticalBlock block(crit);
+        slaveNum = _slaveNum;
         assertex(tempdir.isEmpty()); // should only be called once
         tempPrefix.set(_tempPrefix);
         StringBuffer base(name);
@@ -685,7 +687,7 @@ public:
             name.append(alttempdir);
         else
             name.append(tempdir);
-        name.append(tempPrefix).append((unsigned)GetCurrentProcessId()).append('_').append(++num);
+        name.append(tempPrefix).append((unsigned)GetCurrentProcessId()).append('_').append(slaveNum).append('_').append(++num);
         if (suffix)
             name.append("__").append(suffix);
         name.append(".tmp");
@@ -699,9 +701,9 @@ void GetTempName(StringBuffer &name, const char *prefix,bool altdisk)
     TempNameHandler.getTempName(name, prefix, altdisk);
 }
 
-void SetTempDir(const char *name, const char *tempPrefix, bool clear)
+void SetTempDir(unsigned slaveNum, const char *name, const char *tempPrefix, bool clear)
 {
-    TempNameHandler.setTempDir(name, tempPrefix, clear);
+    TempNameHandler.setTempDir(slaveNum, name, tempPrefix, clear);
 }
 
 void ClearDir(const char *dir)

+ 1 - 1
thorlcr/thorutil/thormisc.hpp

@@ -458,7 +458,7 @@ extern graph_decl IThorException *ThorWrapException(IException *e, const char *m
 extern graph_decl void setExceptionActivityInfo(CGraphElementBase &container, IThorException *e);
 
 extern graph_decl void GetTempName(StringBuffer &name, const char *prefix=NULL,bool altdisk=false);
-extern graph_decl void SetTempDir(const char *name, const char *tempPrefix, bool clear);
+extern graph_decl void SetTempDir(unsigned slaveNum, const char *name, const char *tempPrefix, bool clear);
 extern graph_decl void ClearDir(const char *dir);
 extern graph_decl void ClearTempDirs();
 extern graph_decl const char *queryTempDir(bool altdisk=false);
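
SetTempDir now takes the slave number so that temp file names from several slave processes sharing a directory cannot collide; the number is folded into every generated name next to the process id, and thslavemain.cpp also appends it to the temp directory path in place of the machine port base. A minimal sketch of the effect, assuming an invented pid of 1234 and slave number 3:

    // Sketch only: directory, pid and slave number are invented for illustration.
    SetTempDir(3, "/var/lib/HPCCSystems/thor/temp", "thtmp", true);
    StringBuffer name;
    GetTempName(name);
    // name is now of the form .../thtmp1234_3_1.tmp - prefix, pid, slave number, counter.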