Forráskód Böngészése

HPCC-23606 Limit number of active eclagent tasks per agentexec

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 éve
szülő
commit
e5e5c07480

+ 31 - 1
common/workunit/workunit.cpp

@@ -13809,6 +13809,27 @@ bool isValidMemoryValue(const char *memoryUnit)
 }
 
 #ifdef _CONTAINERIZED
+
+static void setResources(StringBuffer &jobYaml, const IConstWorkUnit *workunit, const char *process)
+{
+    // Swap the "#request-*"/"#limit-*" markers in jobYaml for real entries taken from the workunit's "<process>-*" debug values (0 is passed as the fallback; a 0 value leaves its marker alone)
+    const unsigned wantedMem = workunit->getDebugValueInt(VStringBuffer("%s-memRequest", process).str(), 0);
+    unsigned cappedMem = workunit->getDebugValueInt(VStringBuffer("%s-memLimit", process).str(), 0);
+    if (cappedMem != 0 && wantedMem > cappedMem)  // a non-zero limit below the request is raised to the request
+        cappedMem = wantedMem;
+    if (wantedMem != 0)
+        jobYaml.replaceString("#request-memory", VStringBuffer("memory: \"%uMi\"", wantedMem).str());
+    if (cappedMem != 0)
+        jobYaml.replaceString("#limit-memory", VStringBuffer("memory: \"%uMi\"", cappedMem).str());
+    const unsigned wantedCpu = workunit->getDebugValueInt(VStringBuffer("%s-cpuRequest", process).str(), 0);
+    unsigned cappedCpu = workunit->getDebugValueInt(VStringBuffer("%s-cpuLimit", process).str(), 0);
+    if (cappedCpu != 0 && wantedCpu > cappedCpu)  // same rule as for memory
+        cappedCpu = wantedCpu;
+    if (wantedCpu != 0)
+        jobYaml.replaceString("#request-cpu", VStringBuffer("cpu: \"%um\"", wantedCpu).str());
+    if (cappedCpu != 0)
+        jobYaml.replaceString("#limit-cpu", VStringBuffer("cpu: \"%um\"", cappedCpu).str());
+}
 void runK8sJob(const char *name, const char *wuid)
 {
     VStringBuffer jobname("%s-%s", name, wuid);
@@ -13818,6 +13839,15 @@ void runK8sJob(const char *name, const char *wuid)
     jobYaml.loadFile("/etc/config/jobspec.yaml", false);
     jobYaml.replaceString("%jobname", jobname.str());
     jobYaml.replaceString("%args", args.str());
+
+    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+    if (factory)
+    {
+        Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
+        if (workunit)
+            setResources(jobYaml, workunit, name);
+    }
+
     StringBuffer output, error;
     unsigned ret = runExternalCommand(output, error, "kubectl apply -f -", jobYaml.str());
     DBGLOG("kubectl output: %s", output.str());
@@ -13828,7 +13858,7 @@ void runK8sJob(const char *name, const char *wuid)
         DBGLOG("Using job yaml %s", jobYaml.str());
         throw makeStringException(0, "Failed to start kubectl job");
     }
-
+    // MORE - blocks indefinitely here if you request too many resources
     VStringBuffer waitJob("kubectl wait --for=condition=complete --timeout=10h job/%s", jobname.str());  // MORE - make timeout configurable
     ret = runExternalCommand(output.clear(), error.clear(), waitJob.str(), nullptr);
     DBGLOG("kubectl wait output: %s", output.str());

+ 7 - 0
dockerfiles/hpcc/templates/eclagent.yaml

@@ -37,6 +37,13 @@ spec:
           containers:
           - name: %jobname
             image: {{ include "hpcc.utils.imageName" (dict "root" $ "imagename" "eclagent") }}
+            resources:
+              requests:
+                #request-memory
+                #request-cpu
+              limits:
+                #limit-memory
+                #limit-cpu
             command: [
                         "eclagent",
                         {{ include "hpcc.utils.configArg" . }},

+ 7 - 0
dockerfiles/hpcc/templates/eclccserver.yaml

@@ -41,6 +41,13 @@ spec:
           containers:
           - name: %jobname
             image: {{ include "hpcc.utils.imageName" (dict "root" $ "imagename" "eclccserver") }}
+            resources:
+              requests:
+                #request-memory
+                #request-cpu
+              limits:
+                #limit-memory
+                #limit-cpu
             command: [
                         "eclccserver",
                         {{ include "hpcc.utils.configArg" . }},

+ 1 - 0
dockerfiles/hpcc/values.yaml

@@ -32,6 +32,7 @@ eclagent:
     version: 1.0
     EclAgent:
       containerPerAgent: true
+      maxActive: 2
     
 eclccserver:
 - name: myeclccserver

+ 7 - 1
dockerfiles/platform-build-incremental/Dockerfile

@@ -22,7 +22,13 @@
 ARG PREV_LABEL
 FROM hpccsystems/platform-build:${PREV_LABEL}
 
-ARG USER=richardkchapman
+RUN apt-get install -y \
+    dnsutils \
+    nano 
+
+RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.0/bin/linux/amd64/kubectl && chmod +x ./kubectl && mv ./kubectl /usr/local/bin
+
+ARG USER
 WORKDIR /hpcc-dev/HPCC-Platform
 RUN if ! git config remote.${USER}.url > /dev/null ; then git remote add ${USER} https://github.com/${USER}/HPCC-Platform.git ; fi
 

+ 0 - 6
dockerfiles/platform-core-debug/Dockerfile

@@ -20,12 +20,6 @@
 ARG BUILD_LABEL
 FROM hpccsystems/platform-build:${BUILD_LABEL}
 
-RUN apt-get install -y \
-    dnsutils \
-    nano 
-
-RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.0/bin/linux/amd64/kubectl && chmod +x ./kubectl && mv ./kubectl /usr/local/bin
-
 RUN groupadd -g 1000 hpcc
 RUN useradd -s /bin/bash -r -N -c "hpcc runtime User" -u 999 -g hpcc hpcc
 RUN passwd -l hpcc 

+ 0 - 1
ecl/agentexec/CMakeLists.txt

@@ -26,7 +26,6 @@ project( agentexec )
 
 set (    SRCS 
          agentexec.cpp 
-         agentexec.hpp 
     )
 
 include_directories ( 

+ 122 - 90
ecl/agentexec/agentexec.cpp

@@ -20,81 +20,62 @@
 #include "jmisc.hpp"
 #include "jlog.hpp"
 #include "jfile.hpp"
-#include "agentexec.hpp"
 #include "jutil.hpp"
 #include "eclagent.hpp"
 
-Owned<CEclAgentExecutionServer> execSvr = NULL;
-
-
-//---------------------------------------------------------------------------------
-
-CEclAgentExecutionServer::CEclAgentExecutionServer(IPropertyTree *_config) : Thread("Workunit Execution Server"), config(_config)
-{
-    started = false;
-}
-
-CEclAgentExecutionServer::~CEclAgentExecutionServer()
+class CEclAgentExecutionServer : public CInterfaceOf<IThreadFactory>  // Dequeues workunits and launches eclagent for each; also acts as the thread factory for its own agent pool
 {
-    if (started)
-        stop();
-    if (queue)
-        queue.getClear();
-}
+public:
+    CEclAgentExecutionServer(IPropertyTree *config);
+    ~CEclAgentExecutionServer();
+
+    int run();  // main dequeue loop, called directly from main()
+    virtual IPooledThread *createNew() override;  // IThreadFactory callback used by the pool
+private:
+    bool executeWorkunit(const char * wuid);
+
+    const char *agentName;  // result of config->queryProp("@name"); presumably points into the config tree held below - TODO confirm lifetime
+    const char *daliServers;  // result of config->queryProp("@daliServers"); same lifetime assumption as agentName
+    Owned<IJobQueue> queue;
+    Linked<IPropertyTree> config;
+#ifdef _CONTAINERIZED
+    Owned<IThreadPool> pool;  // created in the constructor, sized from "@maxActive"
+#endif
+};
 
+//---------------------------------------------------------------------------------
 
-void CEclAgentExecutionServer::start()
+CEclAgentExecutionServer::CEclAgentExecutionServer(IPropertyTree *_config) : config(_config)  // Starts logging, reads "@name" and "@daliServers", and (containerized builds) creates the agent pool
 {
-    if (started)
-    {
-        DBGLOG("START called when already started\n");
-        assert(false);
-    }
-
-    {
-        //Build logfile from component properties settings
-        Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(config, "eclagent");
-        lf->setCreateAliasFile(false);
-        lf->beginLogging();
-        PROGLOG("Logging to %s",lf->queryLogFileSpec());
-    }
-
-    //get name of workunit job queue
-    StringBuffer sb;
-    config->getProp("@name", sb.clear());
-    agentName.set(sb);
-    if (!agentName.length())
-    {
-        OERRLOG("'name' not specified in config file\n");
-        throwUnexpected();
-    }
+    //Build logfile from component properties settings
+    Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(config, "eclagent");
+    lf->setCreateAliasFile(false);
+    lf->beginLogging();
+    PROGLOG("Logging to %s",lf->queryLogFileSpec());
+
+    agentName = config->queryProp("@name");
+    assertex(agentName);  // "@name" is mandatory; a missing value fails the assertion
     setStatisticsComponentName(SCThthor, agentName, true);
 
-    //get dali server(s)
-    config->getProp("@daliServers", daliServers);
-    if (!daliServers.length())
-    {
-        OERRLOG("'daliServers' not specified in config file\n");
-        throwUnexpected();
-    }
-
-    started = true;
-    Thread::start();
-    Thread::join();
+    daliServers = config->queryProp("@daliServers");
+    assertex(daliServers);  // "@daliServers" is mandatory as well
+#ifdef _CONTAINERIZED
+    unsigned poolSize = config->getPropInt("@maxActive", 100);  // pool size; 100 is used when "@maxActive" is not configured
+    pool.setown(createThreadPool("agentPool", this, NULL, poolSize, INFINITE));  // this object is passed as the pool's thread factory
+#endif
 }
 
-//---------------------------------------------------------------------------------
 
-void CEclAgentExecutionServer::stop()
+CEclAgentExecutionServer::~CEclAgentExecutionServer()  // Joins the agent pool (containerized builds), then cancels any pending accept on the job queue
 {
-    if (started)
-    {
-        started = false;
-        if (queue)
-            queue->cancelAcceptConversation();
-    }
+#ifdef _CONTAINERIZED
+    pool->joinAll(false, INFINITE);  // no timeout: presumably blocks until every pooled agent thread has finished - TODO confirm
+#endif
+    if (queue)
+        queue->cancelAcceptConversation();
 }
 
+
 //---------------------------------------------------------------------------------
 
 int CEclAgentExecutionServer::run()
@@ -129,8 +110,16 @@ int CEclAgentExecutionServer::run()
 
     try 
     {
-        while (started)
+        while (true)
         {
+#ifdef _CONTAINERIZED
+            if (!pool->waitAvailable(10000))
+            {
+                if (config->getPropInt("@traceLevel", 0) > 2)
+                    DBGLOG("Blocked for 10 seconds waiting for an available agent slot");
+                continue;
+            }
+#endif
             PROGLOG("AgentExec: Waiting on queue(s) '%s'", queueNames.str());
             Owned<IJobQueueItem> item = queue->dequeue();
             if (item.get())
@@ -153,10 +142,7 @@ int CEclAgentExecutionServer::run()
             }
             else
             {
-                if (started)
-                   IERRLOG("Unexpected dequeue of bogus job queue item, exiting agentexec");
-                removeSentinelFile(sentinelFile);//no reason to restart
-                assert(!started);
+                removeSentinelFile(sentinelFile); // no reason to restart
                 break;
             }
         }
@@ -184,15 +170,75 @@ int CEclAgentExecutionServer::run()
 
 //---------------------------------------------------------------------------------
 
-int CEclAgentExecutionServer::executeWorkunit(const char * wuid)
-{
 #ifdef _CONTAINERIZED
-    if (queryComponentConfig().getPropBool("@containerPerAgent", false))  // MORE - make this a per-workunit setting?
+class WaitThread : public CInterfaceOf<IPooledThread>  // Pooled thread that runs one workunit through eclagent and marks the workunit failed if that throws an IException
+{
+public:
+    WaitThread(const char *_dali) : dali(_dali)  // _dali is the dali server list later passed to eclagent as --daliServers
     {
-        runK8sJob("eclagent", wuid);
-        return true;
     }
+    virtual void init(void *param) override  // param is treated as the workunit id (a C string) and stored in wuid
+    {
+        wuid.set((const char *) param);
+    }
+    virtual bool stop() override  // always returns false
+    {
+        return false;
+    }
+    virtual bool canReuse() const override  // always returns false
+    {
+        return false;
+    }
+    virtual void threadmain() override  // runs the workunit either as a Kubernetes job or as a child eclagent process
+    {
+        try
+        {
+            if (queryComponentConfig().getPropBool("@containerPerAgent", false))  // MORE - make this a per-workunit setting?
+            {
+                runK8sJob("eclagent", wuid);
+            }
+            else
+            {
+                VStringBuffer exec("eclagent --workunit=%s --daliServers=%s", wuid.str(), dali.str());
+                Owned<IPipeProcess> pipe = createPipeProcess();
+                if (!pipe->run("eclagent", exec.str(), ".", false, true, false, 0, false))  // NOTE(review): no wait on the child process is visible here - confirm the pool slot is meant to be freed as soon as eclagent has been launched
+                    throw makeStringExceptionV(0, "Failed to run %s", exec.str());
+            }
+        }
+        catch (IException *E)  // only IException is handled; it is logged, released, and the workunit is flagged as failed
+        {
+            EXCLOG(E);
+            E->Release();
+            Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+            Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);  // NOTE(review): factory is not null-checked here, unlike in runK8sJob - confirm getWorkUnitFactory cannot return null
+            if (workunit)
+            {
+                workunit->setState(WUStateFailed);
+                workunit->commit();
+            }
+        }
+    }
+private:
+    StringAttr wuid;
+    StringAttr dali;
+};
+#endif
+
+IPooledThread *CEclAgentExecutionServer::createNew()
+{
+#ifndef _CONTAINERIZED
+    throwUnexpected();  // no agent pool exists in non-containerized builds, so the factory must never be asked for a thread
+#else
+    return new WaitThread(daliServers);  // thread factory callback: one fresh WaitThread per request from the pool
 #endif
+}
+
+bool CEclAgentExecutionServer::executeWorkunit(const char * wuid)
+{
+#ifdef _CONTAINERIZED
+    pool->start((void *) wuid);
+    return true;
+#else
     //build eclagent command line
     StringBuffer command;
 
@@ -234,17 +280,7 @@ int CEclAgentExecutionServer::executeWorkunit(const char * wuid)
     }
 
     return success && runcode == 0;
-}
-
-//---------------------------------------------------------------------------------
-
-bool ControlHandler()
-{
-    if (execSvr)
-    {
-        execSvr->stop();
-    }
-    return false;
+#endif
 }
 
 //---------------------------------------------------------------------------------
@@ -264,8 +300,6 @@ int main(int argc, const char *argv[])
 #endif
     InitModuleObjects();
 
-    addAbortHandler(ControlHandler);
-
     Owned<IPropertyTree> config;
     try
     {
@@ -277,23 +311,21 @@ int main(int argc, const char *argv[])
         return 1;
     }
 
+    int retcode = 0;
     try
     {
-        execSvr.setown(new CEclAgentExecutionServer(config));
-        execSvr->start();
+        CEclAgentExecutionServer server(config);
+        server.run();
     } 
     catch (...)
     {
         printf("Unexpected error running agentexec server\r\n");
-    }
-    if (execSvr)
-    {
-        execSvr->stop();
+        retcode = 1;
     }
 
     closedownClientProcess();
     releaseAtoms();
     ExitModuleObjects();
 
-    return 0;
+    return retcode;
 }

+ 0 - 46
ecl/agentexec/agentexec.hpp

@@ -1,46 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-#ifndef AGENTEXEC_SERVER_HPP
-#define AGENTEXEC_SERVER_HPP
-
-//---------------------------------------------------------------------------------
-//  Reads various customizeable properties from agentexec.xml configuration file,
-//  then listens to the dali queue specifed in that file. Reads workunit IDs from 
-//  that queue, and executes eclagent.exe to process those workunits
-//---------------------------------------------------------------------------------
-class CEclAgentExecutionServer : public Thread
-{
-public:
-    CEclAgentExecutionServer(IPropertyTree *config);
-    ~CEclAgentExecutionServer();
-
-    void start();
-    void stop();
-
-private:
-    int run();
-    int executeWorkunit(const char * wuid);
-
-    //attributes
-    bool started;
-    StringAttr agentName;
-    Owned<IJobQueue> queue;
-    StringBuffer daliServers;
-    Linked<IPropertyTree> config;
-};
-
-#endif