Pārlūkot izejas kodu

HPCC-23712 Thor-on-demand with queueing

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 gadi atpakaļ
vecāks
revīzija
8f46c21be4

+ 1 - 1
common/fileview2/fvdisksource.cpp

@@ -781,7 +781,7 @@ bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
 
     //Resubmit the query...
     submitWorkUnit(browseWuid, username, password);
-    WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
+    WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, { WUStateWait });
     if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
         return false;
 

+ 1 - 1
common/fileview2/fvquerysource.cpp

@@ -146,7 +146,7 @@ bool QueryDataSource::loadBlock(__int64 startRow, offset_t startOffset)
 
     //Resubmit the query...
     submitWorkUnit(browseWuid, username, password);
-    WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
+    WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, { WUStateWait } );
     if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
         return false;
 

+ 22 - 22
common/workunit/workunit.cpp

@@ -55,6 +55,10 @@
 #include "workunit.ipp"
 #include "digisign.hpp"
 
+#include <list>
+#include <string>
+#include <algorithm>
+
 using namespace cryptohelper;
 
 static int workUnitTraceLevel = 1;
@@ -5736,7 +5740,7 @@ public:
         return new CConstWUArrayIterator(results);
     }
 
-    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
+    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates)
     {
         WUState ret = WUStateUnknown;
         StringBuffer wuRoot;
@@ -5745,6 +5749,9 @@ public:
         if (timeout == 0) //no need to subscribe
         {
             ret = (WUState) getEnum(conn->queryRoot(), "@state", states);
+            auto it = std::find(expectedStates.begin(), expectedStates.end(), ret);
+            if (it != expectedStates.end())
+                return ret;
             switch (ret)
             {
             case WUStateCompiled:
@@ -5756,9 +5763,6 @@ public:
             case WUStateFailed:
             case WUStateAborted:
                 return ret;
-            case WUStateWait:
-                if(returnOnWaitState)
-                    return ret;
             default:
                 break;
             }
@@ -5775,6 +5779,9 @@ public:
             for (;;)
             {
                 ret = (WUState) getEnum(conn->queryRoot(), "@state", states);
+                auto it = std::find(expectedStates.begin(), expectedStates.end(), ret);
+                if (it != expectedStates.end())
+                    return ret;
                 switch (ret)
                 {
                 case WUStateCompiled:
@@ -5787,10 +5794,6 @@ public:
                 case WUStateAborted:
                     return ret;
                 case WUStateWait:
-                    if(returnOnWaitState)
-                    {
-                        return ret;
-                    }
                     break;
                 case WUStateCompiling:
                 case WUStateRunning:
@@ -6126,9 +6129,9 @@ public:
     {
         baseFactory->clearAborting(wuid);
     }
-    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
+    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates)
     {
-        return baseFactory->waitForWorkUnit(wuid, timeout, compiled, returnOnWaitState);
+        return baseFactory->waitForWorkUnit(wuid, timeout, compiled, expectedStates);
     }
     virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
     {
@@ -11928,21 +11931,21 @@ void testWorkflow()
 
 //------------------------------------------------------------------------------------------
 
-extern WUState waitForWorkUnitToComplete(const char * wuid, int timeout, bool returnOnWaitState)
+extern WUState waitForWorkUnitToComplete(const char * wuid, int timeout, std::list<WUState> expectedStates)
 {
-    return factory->waitForWorkUnit(wuid, (unsigned) timeout, false, returnOnWaitState);
+    return factory->waitForWorkUnit(wuid, (unsigned) timeout, false, expectedStates);
 }
 
-extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout, bool returnOnWaitState)
+extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout, std::list<WUState> expectedStates)
 {
     if (checkWuSecAccess(wuid, &secmgr, &secuser, SecAccess_Read, "Wait for Complete", false, true))
-        return waitForWorkUnitToComplete(wuid, timeout, returnOnWaitState);
+        return waitForWorkUnitToComplete(wuid, timeout, expectedStates);
     return WUStateUnknown;
 }
 
 extern bool waitForWorkUnitToCompile(const char * wuid, int timeout)
 {
-    switch(factory->waitForWorkUnit(wuid, (unsigned) timeout, true, true))
+    switch(factory->waitForWorkUnit(wuid, (unsigned) timeout, true, { WUStateWait }))
     {
     case WUStateCompiled:
     case WUStateCompleted:
@@ -13279,15 +13282,12 @@ void launchK8sJob(const char *componentName, const char *wuid, const char *job,
     }
 }
 
-void runK8sJob(const char *componentName, const char *wuid, const char *job, bool wait, const std::list<std::pair<std::string, std::string>> &extraParams)
+void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del, const std::list<std::pair<std::string, std::string>> &extraParams)
 {
     launchK8sJob(componentName, wuid, job, extraParams);
-    if (wait)
-    {
-        waitK8sJob(componentName, job);
-#ifndef _DEBUG
+    waitK8sJob(componentName, job);
+    if (del)
         deleteK8sJob(componentName, job);
-#endif
-    }
 }
+
 #endif

+ 4 - 4
common/workunit/workunit.hpp

@@ -1463,7 +1463,7 @@ interface IWorkUnitFactory : extends IPluggableFactory
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
     virtual bool isAborting(const char *wuid) const = 0;
     virtual void clearAborting(const char *wuid) = 0;
-    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
+    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates) = 0;
     virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original) = 0;
 
     virtual unsigned validateRepository(bool fixErrors) = 0;
@@ -1570,9 +1570,9 @@ inline bool isWorkunitDAToken(const char * distributedAccessToken)
 }
 
 //returns a state code.  WUStateUnknown == timeout
-extern WORKUNIT_API WUState waitForWorkUnitToComplete(const char * wuid, int timeout = -1, bool returnOnWaitState = false);
+extern WORKUNIT_API WUState waitForWorkUnitToComplete(const char * wuid, int timeout = -1, std::list<WUState> expectedStates = {});
 extern WORKUNIT_API bool waitForWorkUnitToCompile(const char * wuid, int timeout = -1);
-extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout = -1, bool returnOnWaitState = false);
+extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout = -1, std::list<WUState> expectedStates = {});
 extern WORKUNIT_API bool secWaitForWorkUnitToCompile(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout = -1);
 extern WORKUNIT_API bool secDebugWorkunit(const char * wuid, ISecManager &secmgr, ISecUser &secuser, const char *command, StringBuffer &response);
 extern WORKUNIT_API WUState getWorkUnitState(const char* state);
@@ -1691,7 +1691,7 @@ extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);
 extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, const char *condition=nullptr);
 extern WORKUNIT_API void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});
-extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, bool wait=true, const std::list<std::pair<std::string, std::string>> &extraParams={});
+extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, bool del=true, const std::list<std::pair<std::string, std::string>> &extraParams={});
 #endif
 
 #endif

+ 1 - 1
common/workunit/workunit.ipp

@@ -647,7 +647,7 @@ public:
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
     virtual bool isAborting(const char *wuid) const;
     virtual void clearAborting(const char *wuid);
-    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
+    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled,  std::list<WUState> expectedStates) = 0;
 
     virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
     {

+ 1 - 1
common/workunit/wujobq.hpp

@@ -154,7 +154,7 @@ extern WORKUNIT_API IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb);
 
 extern WORKUNIT_API IJobQueue *createJobQueue(const char *name);
 
-extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *cluster);
+extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName);
 extern bool WORKUNIT_API runWorkUnit(const char *wuid);
 extern WORKUNIT_API StringBuffer & getQueuesContainingWorkUnit(const char *wuid, StringBuffer &queueList);
 extern WORKUNIT_API void removeWorkUnitFromAllQueues(const char *wuid);

+ 24 - 3
dockerfiles/hpcc/templates/_helpers.tpl

@@ -64,7 +64,7 @@ data:
     global:
       imageVersion: {{ .root.Values.global.image.version | quote }}
       singleNode: {{ .root.Values.global.singleNode }}
-{{ include "hpcc.generateComponentConfigMap" . | indent 2 -}}
+{{ include "hpcc.generateComponentConfigMap" . | indent 2 }}
 {{- end -}}
 
 {{- /* Add a ConfigMap volume for a component */ -}}
@@ -180,12 +180,33 @@ initContainers:
 {{- /* Add security context */ -}}
 {{- /* Pass in a dictionary with root and me defined */ -}}
 {{- define "hpcc.addSecurityContext" -}}
-{{- if .root.Values.global.privileged -}}
+{{- if .root.Values.global.privileged }}
 securityContext:
   privileged: true
   capabilities:
     add:
     - SYS_PTRACE
-{{- end -}}
+{{- end }}
 {{- end -}}
 
+
+{{- /* Generate instance queue names */ -}}
+{{- define "hpcc.generateConfigMapQueues" -}}
+queues:
+{{- range $.Values.eclagent }}
+- name: {{ .name }}
+  type: {{ .type | default "hthor" }}
+  prefix: {{ .prefix | default "null" }}
+{{- end -}}
+{{- range $.Values.roxie }}
+- name: {{ .name }}
+  type: roxie 
+  prefix: {{ .prefix | default "null" }}
+{{- end -}}
+{{- range $.Values.thor }}
+- name: {{ .name }}
+  type: thor
+  prefix: {{ .prefix | default "null" }}
+  width: {{ mul (.numSlaves | default 1) ( .channelsPerSlave | default 1) }}
+{{- end }}
+{{- end }}

+ 1 - 1
dockerfiles/hpcc/templates/dali.yaml

@@ -20,7 +20,7 @@ spec:
         args: [
                 # {{ include "hpcc.configArg" . }} - dali does not support this yet
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "dali") | indent 8 }}
         volumeMounts:
 {{ include "hpcc.addConfigVolumeMount" . | indent 8 }}

+ 1 - 1
dockerfiles/hpcc/templates/eclagent.yaml

@@ -23,7 +23,7 @@ spec:
                 {{ include "hpcc.configArg" . }},
                 {{ include "hpcc.daliArg" $ }}
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "eclagent") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:

+ 2 - 18
dockerfiles/hpcc/templates/eclccserver.yaml

@@ -23,7 +23,7 @@ spec:
                 {{ include "hpcc.configArg" . }},
                 {{ include "hpcc.daliArg" $ }}
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "eclccserver") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:
@@ -31,23 +31,7 @@ spec:
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
 {{- include "hpcc.generateConfigMap" (dict "root" $ "component" "eclccserver" "me" .) }}
-      queues:
-{{- range $.Values.eclagent }}
-      - name: {{ .name }}
-        type: {{ .type | default "hthor" }}
-        prefix: {{ .prefix | default "null" }}
-{{- end -}}
-{{- range $.Values.roxie }}
-      - name: {{ .name }}
-        type: roxie 
-        prefix: {{ .prefix | default "null" }}
-{{- end -}}
-{{- range $.Values.thor }}
-      - name: {{ .name }}
-        type: thor
-        prefix: {{ .prefix | default "null" }}
-        width: {{ mul (.numSlaves | default 1) ( .channelsPerSlave | default 1) }}
-{{- end }}
+{{ include "hpcc.generateConfigMapQueues" $ | indent 6 }}
   eclccserver-jobspec.yaml: |
     apiVersion: batch/v1
     kind: Job

+ 4 - 3
dockerfiles/hpcc/templates/esp.yaml

@@ -17,10 +17,10 @@ spec:
       containers:
       - name: {{ .name | quote }}
         args: [
-                # {{ include "hpcc.configArg" . }},
-                # {{ include "hpcc.daliArg" $ }}
+                {{ include "hpcc.configArg" . }},
+                {{ include "hpcc.daliArg" $ }}
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "esp") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:
@@ -28,6 +28,7 @@ spec:
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
 {{- include "hpcc.generateConfigMap" (dict "root" $ "component" "esp" "me" .) }}
+{{ include "hpcc.generateConfigMapQueues" $ | indent 6 }}
 ---
 apiVersion: v1
 kind: Service

+ 1 - 1
dockerfiles/hpcc/templates/localroxie.yaml

@@ -27,7 +27,7 @@ spec:
                 "--localSlave=true",
                 "--resolveLocally=false"
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "roxie") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:

+ 1 - 1
dockerfiles/hpcc/templates/roxie.yaml

@@ -21,7 +21,7 @@ spec:
     spec:
       containers:
       - name: {{ $toponame | quote }}
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" $roxie "imagename" "toposerver") | indent 8 }}
         volumeMounts:
 {{ include "hpcc.addConfigVolumeMount" . | indent 8 }}

+ 63 - 24
dockerfiles/hpcc/templates/thor.yaml

@@ -2,46 +2,84 @@
 {{- if not $thor.disabled -}}
 {{- $masterPort := $thor.masterport | default 20000 -}}
 {{- $slavePort := $thor.slaveport | default 20100 -}}
-{{- $eclagentName := printf "%s-agent" .name }}
+{{- $hthorName := printf "%s-hthor" .name }}
+{{- $eclAgentName := printf "%s-agent" .name }}
+{{- $thorAgentName := printf "%s-thoragent" .name }}
 {{- $slaveName := printf "%s-slave" .name }}
 {{- $serviceName := printf "%s-svc" .name }}
+{{- $thorScope := omit . "eclagent" "thoragent" "hthor" }}
+{{- $eclAgentDefaults := dict "name" $eclAgentName "replicas" 1 }}
+{{- $eclAgentScope := .eclagent | mergeOverwrite $eclAgentDefaults | default $eclAgentDefaults }}
+{{- $thorAgentDefaults := dict "name" $thorAgentName "containerPerAgent" true "replicas" 1 }}
+{{- $thorAgentScope := .thoragent | mergeOverwrite $thorAgentDefaults | default $thorAgentDefaults }}
+{{- $hthorDefaults := dict "name" $hthorName }}
+{{- $hthorScope := .hthor | mergeOverwrite $hthorDefaults | default $hthorDefaults }}
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: {{ $eclagentName | quote }}
+  name: {{ $eclAgentName | quote }}
 spec:
-  replicas: {{ $thor.eclagent.replicas | default 1 }}
+  replicas: {{ $eclAgentScope.replicas }}
   selector:
     matchLabels:
-      run: {{ $eclagentName | quote }}
+      run: {{ $eclAgentName | quote }}
   template:
     metadata:
       labels:
-        run: {{ $eclagentName | quote }}
+        run: {{ $eclAgentName | quote }}
     spec:
       serviceAccountName: hpcc
       containers:
-      - name: {{ $eclagentName | quote }}
-        ports:
-          - containerPort: {{ $masterPort }}
+      - name: {{ $eclAgentName | quote }}
         args: [
-                {{ include "hpcc.configArg" . }},
-                {{ include "hpcc.daliArg" $ }}
+                {{ include "hpcc.configArg" $eclAgentScope }},
+                {{ include "hpcc.daliArg" $ }},
+                {{ printf "\"--name=%s\"" .name }},
+                {{ printf "\"--processConfig=/etc/config/%s.yaml\"" $hthorName }}
               ]
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "eclagent") | indent 8 }}
 {{ include "hpcc.addVolumeMounts" . | indent 8 }}
       volumes:
 {{ include "hpcc.addConfigVolume" . | indent 6 }}
 {{ include "hpcc.addVolumes" $ | indent 6 }}
 ---
-{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "thor" "me" .) }}
-{{- if $thor.eclagent }}
-    eclagent:
-      name: {{ .name }}
-{{ toYaml $thor.eclagent | indent 6 }}
-{{- end }}
-  eclagent-jobspec.yaml: |
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ $thorAgentName | quote }}
+spec:
+  replicas: {{ $thorAgentScope.replicas }}
+  selector:
+    matchLabels:
+      run: {{ $thorAgentName | quote }}
+  template:
+    metadata:
+      labels:
+        run: {{ $thorAgentName | quote }}
+    spec:
+      serviceAccountName: hpcc
+      containers:
+      - name: {{ $thorAgentName | quote }}
+        args: [
+                {{ include "hpcc.configArg" $thorAgentScope }},
+                {{ include "hpcc.daliArg" $ }},
+                {{ printf "\"--name=%s\"" .name }},
+                {{ printf "\"--processConfig=/etc/config/%s.yaml\"" .name }}
+              ]
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 8 }}
+{{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "eclagent") | indent 8 }}
+{{ include "hpcc.addVolumeMounts" . | indent 8 }}
+      volumes:
+{{ include "hpcc.addConfigVolume" . | indent 6 }}
+{{ include "hpcc.addVolumes" $ | indent 6 }}
+---
+{{- include "hpcc.generateConfigMap" (dict "root" $ "component" "thor" "me" $thorScope) }}
+{{ include "hpcc.generateComponentConfigMap" (dict "root" $ "component" "eclagent" "me" $eclAgentScope) | indent 2}}
+{{ include "hpcc.generateComponentConfigMap" (dict "root" $ "component" "eclagent" "me" $thorAgentScope) | indent 2}}
+      type: thor
+{{ include "hpcc.generateComponentConfigMap" (dict "root" $ "component" "hthor" "me" $hthorScope) | indent 2}}
+  hthor-jobspec.yaml: |
     apiVersion: batch/v1
     kind: Job
     metadata:
@@ -54,6 +92,7 @@ spec:
           {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 10 }}
           containers:
           - name: %jobname
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
             image: {{ include "hpcc.imageName" (dict "root" $ "me" . "imagename" "eclagent") }}
             resources:
               requests:
@@ -63,8 +102,8 @@ spec:
                 #limit-memory
                 #limit-cpu
             command: [
-                        "eclagent",
-                        {{ include "hpcc.configArg" . }},
+                        "hthor",
+                        {{ include "hpcc.configArg" $hthorScope }},
                         {{ include "hpcc.daliArg" $ }},
                         %args
                      ]
@@ -87,10 +126,10 @@ spec:
           {{- include "hpcc.checkDataStorageHostMount" (dict "root" $) | indent 10 }}
           containers:
           - name: %jobname
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thormaster") | indent 12 }}
             args: [
-                    {{ include "hpcc.configArg" . }},
+                    {{ include "hpcc.configArg" $thorScope }},
                     {{ include "hpcc.daliArg" $ }},
                     %args
                   ]
@@ -113,10 +152,10 @@ spec:
           serviceAccountName: hpcc
           containers:
           - name: %jobname
-{{ include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
+{{- include "hpcc.addSecurityContext" (dict "root" $ "me" .) | indent 12 }}
 {{ include "hpcc.addImageAttrs" (dict "root" $ "me" . "imagename" "thorslave") | indent 12 }}
             args: [
-                    {{ include "hpcc.configArg" . }},
+                    {{ include "hpcc.configArg" $thorAgentScope }},
                     {{ include "hpcc.daliArg" $ }},
                     %args
                   ]

+ 12 - 0
dockerfiles/hpcc/values.schema.json

@@ -173,6 +173,10 @@
         "replicas": {
           "type": "integer"
         },
+        "maxActive": {
+          "type": "integer",
+          "minimum": 1
+        },
         "containerPerAgent": {
           "type": "boolean",
           "description": "Should workunits execute in their own container, or as child processes"
@@ -218,9 +222,17 @@
           "type": "string",
           "description": "The (optional) file prefix to add to relative filenames"
         },
+	"numSlaves": {
+          "type": "integer",
+          "description": "The number of slave pods",
+          "minimum": 1
+        },
         "eclagent": {
           "$ref": "#/definitions/eclagent"
         },
+        "thoragent": {
+          "$ref": "#/definitions/eclagent"
+        },
         "image": {
           "$ref": "#/definitions/image"
         }

+ 3 - 3
dockerfiles/hpcc/values.yaml

@@ -86,8 +86,8 @@ thor:
 - name: thor
   numSlaves: 2
   globalMemorySize: 4096
-  maxActive: 2
   prefix: thor
   eclagent:
-    replicas: 1
-    containerPerAgent: false
+    maxActive: 4
+  thoragent:
+    maxActive: 2

+ 55 - 8
ecl/agentexec/agentexec.cpp

@@ -99,7 +99,12 @@ int CEclAgentExecutionServer::run()
         Owned<IGroup> serverGroup = createIGroup(daliServers, DALI_SERVER_PORT);
         initClientProcess(serverGroup, DCR_AgentExec);
 #ifdef _CONTAINERIZED
-        getClusterEclAgentQueueName(queueNames, agentName);
+        if (streq("agent", apptype))    
+            getClusterEclAgentQueueName(queueNames, agentName);
+        else if (streq("thor", apptype))
+            getClusterThorQueueName(queueNames, agentName);
+        else
+            throwUnexpected();
 #else
         getAgentQueueNames(queueNames, agentName);
 #endif
@@ -149,6 +154,7 @@ int CEclAgentExecutionServer::run()
                 catch(IException *e)
                 {
                     EXCLOG(e, "CEclAgentExecutionServer::run: ");
+                    e->Release();
                 }
                 catch(...)
                 {
@@ -189,7 +195,7 @@ int CEclAgentExecutionServer::run()
 class WaitThread : public CInterfaceOf<IPooledThread>
 {
 public:
-    WaitThread(const char *_dali, const char *_apptype) : dali(_dali), apptype(_apptype)
+    WaitThread(const char *_dali, const char *_apptype, const char *_queue) : dali(_dali), apptype(_apptype), queue(_queue)
     {
     }
     virtual void init(void *param) override
@@ -206,29 +212,69 @@ public:
     }
     virtual void threadmain() override
     {
+        Owned<IException> exception;
         try
         {
+            StringAttr jobSpecName(apptype);
+            StringAttr processName(apptype);
+
+            /* NB: In the case of handling apptype='thor', the queued item is of the form <wuid>/<graphName>
+             */
+            StringAttr graphName;
+            bool isThorJob = streq("thor", apptype);
+            if (isThorJob)
+            {
+                StringArray sArray;
+                sArray.appendList(wuid.get(), "/");
+                assertex(2 == sArray.ordinality());
+                wuid.set(sArray.item(0));
+                graphName.set(sArray.item(1));
+
+                // JCSMORE - ideally apptype, image and executable name would all be the same.
+                jobSpecName.set("thormaster");
+                processName.set("thormaster_lcr");
+            }
             if (queryComponentConfig().getPropBool("@containerPerAgent", false))  // MORE - make this a per-workunit setting?
             {
-                runK8sJob(apptype, wuid, wuid);
+                std::list<std::pair<std::string, std::string>> params = { };
+                if (queryComponentConfig().getPropBool("@useThorQueue", true))
+                    params.push_back({ "queue", queue.get() });
+                StringBuffer jobName(wuid);
+                if (isThorJob)
+                {
+                    params.push_back({ "graphName", graphName.get() });
+                    jobName.append('-').append(graphName);
+                }
+                runK8sJob(jobSpecName, wuid, jobName, queryComponentConfig().getPropBool("@deleteJobs", true), params);
             }
             else
             {
-                VStringBuffer exec("%s --workunit=%s --daliServers=%s", apptype.str(), wuid.str(), dali.str());
+                VStringBuffer exec("%s --workunit=%s --daliServers=%s", processName.get(), wuid.str(), dali.str());
+                exec.append(" --config=");
+                queryComponentConfig().getProp("@processConfig", exec);
+                if (queryComponentConfig().getPropBool("@useThorQueue", true))
+                    exec.append(" --queue=").append(queue);
+                if (isThorJob)
+                    exec.appendf(" --graphName=%s", graphName.get());
                 Owned<IPipeProcess> pipe = createPipeProcess();
                 if (!pipe->run(apptype.str(), exec.str(), ".", false, true, false, 0, false))
                     throw makeStringExceptionV(0, "Failed to run %s", exec.str());
             }
         }
-        catch (IException *E)
+        catch (IException *e)
+        {
+            exception.setown(e);
+        }
+        if (exception)
         {
-            EXCLOG(E);
-            E->Release();
+            EXCLOG(exception);
             Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
             Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
             if (workunit)
             {
                 workunit->setState(WUStateFailed);
+                StringBuffer eStr;
+                addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0);
                 workunit->commit();
             }
         }
@@ -237,13 +283,14 @@ private:
     StringAttr wuid;
     StringAttr dali;
     StringAttr apptype;
+    StringAttr queue;
 };
 #endif
 
 IPooledThread *CEclAgentExecutionServer::createNew()
 {
 #ifdef _CONTAINERIZED
-    return new WaitThread(daliServers, apptype);
+    return new WaitThread(daliServers, apptype, agentName);
 #else
     throwUnexpected();
 #endif

+ 50 - 8
ecl/eclagent/eclgraph.cpp

@@ -38,6 +38,10 @@
 #include "commonext.hpp"
 #include "thorcommon.hpp"
 
+#include <list>
+#include <string>
+#include <algorithm>
+
 using roxiemem::OwnedRoxieString;
 
 //---------------------------------------------------------------------------
@@ -1506,17 +1510,51 @@ extern IProbeManager *createDebugManager(IDebuggableContext *debugContext, const
 
 void EclAgent::executeThorGraph(const char * graphName)
 {
+    unsigned timelimit = queryWorkUnit()->getDebugValueInt("thorConnectTimeout", agentTopology->getPropInt("@thorConnectTimeout", 60));
 #ifdef _CONTAINERIZED
     // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
-    Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
+    setBlocked();
+    unlockWorkUnit();
+
+    WUState state = WUStateUnknown;
+    if (agentTopology->hasProp("@queue"))
     {
-        Owned<IWorkUnit> w = updateWorkUnit();
-        w->setState(WUStateBlocked);
+        VStringBuffer queueName("%s.thor", agentTopology->queryProp("@queue"));
+        DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
+        Owned<IJobQueue> queue = createJobQueue(queueName.str());
+        queue->connect(false);
+        VStringBuffer jobName("%s/%s", wuid.get(), graphName);
+        IJobQueueItem *item = createJobQueueItem(jobName);
+        queue->enqueue(item);
+
+        // NB: overall max runtime if guillotine set handled by abortmonitor
+        unsigned runningTimeLimit = queryWorkUnit()->getDebugValueInt("maxRunTime", 0);
+        runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;
+
+        std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
+        for (unsigned i=0; i<2; i++)
+        {
+            WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
+            DBGLOG("Got state: %s", getWorkunitStateStr(state));
+            if (WUStateWait == state) // already finished
+                break;
+            else if ((INFINITE != timelimit) && (WUStateUnknown == state))
+                throw makeStringExceptionV(0, "Query %s failed to start within specified timelimit (%u) seconds", wuid.str(), timelimit);
+            else
+            {
+                auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
+                if (it == expectedStates.end())
+                    throw makeStringExceptionV(0, "Query %s failed, state: %s", wuid.str(), getWorkunitStateStr(state));
+            }
+            timelimit = runningTimeLimit;
+            expectedStates = { WUStateWait };
+        }
+    }
+    else
+    {        
+        VStringBuffer job("%s-%s", wuid.str(), graphName);
+        runK8sJob("thormaster", wuid, job, queryComponentConfig().getPropBool("@deleteJobs", true), { { "graphName", graphName} });
     }
-    unlockWorkUnit();
-        
-    VStringBuffer job("%s-%s", wuid.str(), graphName);
-    runK8sJob("thormaster", wuid, job, true, { { "graphName", graphName} });
 
     if (wuRead->getExceptionCount())
     {
@@ -1534,12 +1572,16 @@ void EclAgent::executeThorGraph(const char * graphName)
             }
         }
     }
+    else if (WUStateFailed == state)
+        throw makeStringException(0, "Workunit failed");
+
+    setRunning();
+    unlockWorkUnit();
 #else
     StringAttr wuid(wuRead->queryWuid());
     StringAttr owner(wuRead->queryUser());
     StringAttr cluster(wuRead->queryClusterName());
     int priority = wuRead->getPriorityValue();
-    unsigned timelimit = queryWorkUnit()->getDebugValueInt("thorConnectTimeout", agentTopology->getPropInt("@thorConnectTimeout", 60));
     Owned<IConstWUClusterInfo> c = getTargetClusterInfo(cluster.str());
     if (!c)
         throw MakeStringException(0, "Invalid thor cluster %s", cluster.str());

+ 1 - 1
ecl/eclccserver/eclccserver.cpp

@@ -462,7 +462,7 @@ public:
 #ifdef _CONTAINERIZED
         if (globals->getPropBool("@containerPerCompile", false) && !globals->hasProp("@workunit"))
         {
-            runK8sJob("eclccserver", wuid, wuid);
+            runK8sJob("eclccserver", wuid, wuid, globals->getPropBool("@deleteJobs", true));
             return;
         }
 #endif

+ 24 - 3
esp/platform/espp.cpp

@@ -200,7 +200,7 @@ int start_init_main(int argc, char** argv, int (*init_main_func)(int, char**))
 #define SET_ESP_SIGNAL_HANDLER(sig, handler) signal(sig, handler)
 #define RESET_ESP_SIGNAL_HANDLER(sig, handler) signal(sig, handler)
 
-int start_init_main(int argc, char** argv, int (*init_main_func)(int,char**))
+int start_init_main(int argc, const char** argv, int (*init_main_func)(int, const char**))
 {
     return init_main_func(argc, argv);
 }
@@ -295,6 +295,15 @@ void openEspLogFile(IPropertyTree* envpt, IPropertyTree* procpt)
         UseSysLogForOperatorMessages();
 }   
 
+
+static constexpr const char * defaultYaml = R"!!(
+version: "1.0"
+esp:
+  name: myesp
+  daliServers: dali
+)!!";
+
+
 static void usage()
 {
     puts("ESP - Enterprise Service Platform server. (C) 2001-2011, HPCC Systems®.");
@@ -309,7 +318,7 @@ static void usage()
     exit(1);
 }
 
-int init_main(int argc, char* argv[])
+int init_main(int argc, const char* argv[])
 {
     for (unsigned i=0;i<(unsigned)argc;i++) {
         if (streq(argv[i],"--daemon") || streq(argv[i],"-d")) {
@@ -397,6 +406,18 @@ int init_main(int argc, char* argv[])
         else
             throw MakeStringException(-1, "Failed to load config file %s", cfgfile);
 
+#ifdef _CONTAINERIZED
+        Owned<IPropertyTree> espConfig;
+        /* For now, whilst esp lives with needing/reading a copy of the whole /Environment as its configuration,
+         * continue to do so, but also read the component configuration (esp.yaml) and carry it inside the envpt tree
+         * that is passed through to the services.
+         * Each service can then pick up the component config from "Config".
+         */
+        espConfig.setown(loadConfiguration(defaultYaml, argv, "esp", "ESP", nullptr, nullptr));
+        procpt->setProp("@daliServers", espConfig->queryProp("@daliServers"));
+        envpt->setPropTree("Config", espConfig.getClear());
+#endif
+
         const char* build_ver = BUILD_TAG;
         setBuildVersion(build_ver);
 
@@ -494,7 +515,7 @@ int init_main(int argc, char* argv[])
 // [2] config location - local file name or dali address
 // [3] config location type - "dali" or ""
 
-int main(int argc, char* argv[])
+int main(int argc, const char* argv[])
 {
     start_init_main(argc, argv, init_main);
     stopPerformanceMonitor();

+ 26 - 1
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -351,6 +351,7 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
         OERRLOG("No Dali Connection Active.");
         throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
     }
+    config.setown(cfg->getPropTree("Config"));   
 
     DBGLOG("Initializing %s service [process = %s]", service, process);
 
@@ -441,6 +442,26 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 void CWsWorkunitsEx::refreshValidClusters()
 {
     validClusters.kill();
+#ifdef _CONTAINERIZED
+    // discovered from generated cluster names
+ {
+     StringBuffer s;
+     toXML(config, s);
+     PROGLOG("config s = %s", s.str());
+ }
+    Owned<IPropertyTreeIterator> iter = config->getElements("queues");
+    ForEach(*iter)
+    {
+        IPropertyTree &queue = iter->query();
+        const char *qName = queue.queryProp("@name");
+        bool* found = validClusters.getValue(qName);
+        if (!found || !*found)
+        {
+            validClusters.setValue(qName, true);
+            PROGLOG("adding valid cluster: %s", qName);
+        }
+    }
+#else
     Owned<IStringIterator> it = getTargetClusters(NULL, NULL);
     ForEach(*it)
     {
@@ -450,6 +471,7 @@ void CWsWorkunitsEx::refreshValidClusters()
         if (!found || !*found)
             validClusters.setValue(val.str(), true);
     }
+#endif
 }
 
 bool CWsWorkunitsEx::isValidCluster(const char *cluster)
@@ -1191,7 +1213,10 @@ bool CWsWorkunitsEx::onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &r
         WsWuHelpers::checkAndTrimWorkunit("WUWaitComplete", wuid);
         ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Full);
         PROGLOG("WUWaitComplete: %s", wuid.str());
-        resp.setStateID(secWaitForWorkUnitToComplete(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait(), req.getReturnOnWait()));
+        std::list<WUState> expectedStates;
+        if (req.getReturnOnWait())
+            expectedStates.push_back(WUStateWait);
+        resp.setStateID(secWaitForWorkUnitToComplete(wuid.str(), *context.querySecManager(), *context.queryUser(), req.getWait(), expectedStates));
     }
     catch(IException* e)
     {

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -374,6 +374,7 @@ private:
     int maxRequestEntityLength;
     Owned<IThreadPool> clusterQueryStatePool;
     unsigned thorSlaveLogThreadPoolSize = THOR_SLAVE_LOG_THREAD_POOL_SIZE;
+    Owned<IPropertyTree> config;
 
 
 public:

+ 8 - 3
plugins/cassandra/cassandrawu.cpp

@@ -42,6 +42,10 @@
 
 #include "cassandraembed.hpp"
 
+#include <list>
+#include <string>
+#include <algorithm>
+
 #define EXPORT DECL_EXPORT
 
 namespace cassandraembed {
@@ -3703,7 +3707,7 @@ public:
     virtual bool isAborting(const char *wuid) const - done in the base class using dali
     virtual void clearAborting(const char *wuid) - done in the base class using dali
     */
-    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
+    virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates)
     {
         Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
         LocalIAbortHandler abortHandler(*waiter);
@@ -3727,6 +3731,9 @@ public:
             StringBuffer stateStr;
             getCassString(stateStr, stateVal);
             WUState state = getWorkUnitState(stateStr);
+            auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
+            if (it != expectedStates.end())
+                return state;
             switch (state)
             {
             case WUStateCompiled:
@@ -3739,8 +3746,6 @@ public:
             case WUStateAborted:
                 return state;
             case WUStateWait:
-                if (returnOnWaitState)
-                    return state;
                 break;
             case WUStateCompiling:
             case WUStateRunning:

+ 50 - 4
roxie/ccd/ccdcontext.cpp

@@ -40,6 +40,10 @@
 #include "roxiehelper.hpp"
 #include "enginecontext.hpp"
 
+#include <list>
+#include <string>
+#include <algorithm>
+
 using roxiemem::IRowManager;
 
 //=======================================================================================================================
@@ -2270,13 +2274,51 @@ protected:
         assertex(workUnit);
         StringAttr wuid(workUnit->queryWuid());
 
+        unsigned timelimit = workUnit->getDebugValueInt("thorConnectTimeout", defaultThorConnectTimeout);
+
 #ifdef _CONTAINERIZED
         // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
         setWUState(WUStateBlocked);
-            
-        VStringBuffer job("%s-%s", wuid.get(), graphName);
-        runK8sJob("thormaster", wuid, job, true, { { "graphName", graphName} });
 
+        WUState state = WUStateUnknown;
+        if (queryComponentConfig().hasProp("@queue"))
+        {
+            VStringBuffer queueName("%s.thor", queryComponentConfig().queryProp("@queue"));
+            DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
+            Owned<IJobQueue> queue = createJobQueue(queueName.str());
+            queue->connect(false);
+            VStringBuffer jobName("%s/%s", wuid.get(), graphName);
+            IJobQueueItem *item = createJobQueueItem(jobName);
+            queue->enqueue(item);
+
+            unsigned runningTimeLimit = workUnit->getDebugValueInt("maxRunTime", 0);
+            runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;
+
+            std::list<WUState> expectedStates = { WUStateRunning, WUStateWait };
+            for (unsigned i=0; i<2; i++)
+            {
+                WUState state = waitForWorkUnitToComplete(wuid, timelimit*1000, expectedStates);
+                DBGLOG("Got state: %s", getWorkunitStateStr(state));
+                if (WUStateWait == state) // already finished
+                    break;
+                else if ((INFINITE != timelimit) && (WUStateUnknown == state))
+                    throw makeStringExceptionV(0, "Query %s failed to start within specified timelimit (%u) seconds", wuid.str(), timelimit);
+                else
+                {
+                    auto it = std::find(expectedStates.begin(), expectedStates.end(), state);
+                    if (it == expectedStates.end())
+                        throw makeStringExceptionV(0, "Query %s failed, state: %s", wuid.str(), getWorkunitStateStr(state));
+                }
+                timelimit = runningTimeLimit;
+                expectedStates = { WUStateWait };
+            }
+        }
+        else
+        {        
+            VStringBuffer job("%s-%s", wuid.str(), graphName);
+            runK8sJob("thormaster", wuid, job, queryComponentConfig().getPropBool("@deleteJobs", true), { { "graphName", graphName} });
+        }        
+            
         if (workUnit->getExceptionCount())
         {
             Owned<IConstWUExceptionIterator> iter = &workUnit->getExceptions();
@@ -2293,12 +2335,16 @@ protected:
                 }
             }
         }
+        else if (WUStateFailed == state)
+            throw makeStringException(0, "Workunit failed");
+
+        setWUState(WUStateRunning);
+
 #else    
         StringAttr owner(workUnit->queryUser());
         StringAttr cluster(workUnit->queryClusterName());
 
         int priority = workUnit->getPriorityValue();
-        unsigned timelimit = workUnit->getDebugValueInt("thorConnectTimeout", defaultThorConnectTimeout);
 #ifdef _CONTAINERIZED
         StringBuffer queueName;
         queueName.append(cluster).append(".thor");

+ 5 - 3
thorlcr/master/thgraphmanager.cpp

@@ -451,6 +451,8 @@ bool CJobManager::execute(IConstWorkUnit *workunit, const char *wuid, const char
 
 void CJobManager::run()
 {
+    LOG(MCdebugProgress, thorJob, "Listening for graph");
+
     setWuid(NULL);
     StringBuffer soPath;
     globals->getProp("@query_so_dir", soPath);
@@ -1107,10 +1109,9 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
         Owned<CJobManager> jobManager = new CJobManager(logHandler);
         try
         {
-            LOG(MCdebugProgress, thorJob, "Listening for graph");
-
             if (wuid) // one-shot, quits after running
             {
+                PROGLOG("Executing: wuid=%s, graph=%s", wuid, graphName);
                 Owned<IWorkUnitFactory> factory;
                 Owned<IConstWorkUnit> workunit;
                 factory.setown(getWorkUnitFactory());
@@ -1118,9 +1119,9 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
                 SocketEndpoint dummyAgentEp;
                 jobManager->execute(workunit, wuid, graphName, dummyAgentEp);
                 IException *e = jobManager->queryExitException();
+                Owned<IWorkUnit> w = &workunit->lock();
                 if (e)
                 {
-                    Owned<IWorkUnit> w = &workunit->lock();
                     Owned<IWUException> we = w->createException();
                     we->setSeverity(SeverityInformation);
                     StringBuffer errStr;
@@ -1129,6 +1130,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
                     we->setExceptionSource("thormasterexception");
                     we->setExceptionCode(e->errorCode());
                 }
+                w->setState(WUStateWait);
             }  
             else
                 jobManager->run();

+ 2 - 1
thorlcr/master/thmastermain.cpp

@@ -945,7 +945,8 @@ int main( int argc, const char *argv[]  )
             PROGLOG("Registration aborted");
 #ifdef _CONTAINERIZED
         registry.clear();
-        deleteK8sJob("thorslave", cloudJobName);
+        if (globals->getPropBool("@deleteJobs", true))
+            deleteK8sJob("thorslave", cloudJobName);
         setExitCode(0);
 #endif
         LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");