Procházet zdrojové kódy

HPCC-23762 Thor linger

Allow thor to linger a configurable amount of time after it
has completed executing a graph.
If the job/workflow detects that the workunit is associated
with a lingering Thor, it will communicate with it directly if
it can, avoiding the latency of stopping/starting new master
and slave containers.

Also make a change to master/slave so that if containerPerAgent
is off in thor agent, then ensure the master doesn't use a fixed
port (akin to the hthor process).

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith před 5 roky
rodič
revize
cb0363e1a8

+ 54 - 0
common/workunit/workunit.cpp

@@ -13192,6 +13192,60 @@ bool isValidMemoryValue(const char *memoryUnit)
 }
 
 #ifdef _CONTAINERIZED
+bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName)
+{
+    // check if lingering thor instance is up.
+
+    /* If code was dependent on reading a workunit in parallel, the whole area of
+     * workunit locking and reading will need revisiting, because at the moment the 
+     * workunit reading code does not lock at all.
+     * 
+     */
+
+    /* NB: forcing a reload here to ensure Debug values are recent.
+     * Could be improved by refreshing only the specific area of interest (i.e. Debug/ in this case).
+     * The area of persisting a non-locked connection to workunits should be revisited.
+     */
+    workunit.forceReload();
+
+    Owned<IStringIterator> iter = &workunit.getDebugValues("thorinstance_*");
+    ForEach(*iter)
+    {
+        /* NB: Thor instances set their running endpoint into the Debug values of the workunit.
+         * Check to see if the workunit has any available lingering Thor instances.
+         */
+        SCMStringBuffer thorInstance;
+        iter->str(thorInstance);
+        SCMStringBuffer thorInstanceValue;
+        if (workunit.getDebugValueBool(thorInstance.s, false))
+        {
+            {
+                Owned<IWorkUnit> w = &workunit.lock();
+                w->setDebugValue(thorInstance.s, "0", true);
+            }
+            /* NB: there's a window where Thor could shutdown here.
+             * In that case, the sendRecv will fail and it will fall through to queueing.
+             */
+            const char *instanceName = strchr(thorInstance.str(), '_') + 1;
+
+            Owned<INode> masterNode = createINode(instanceName);
+            CMessageBuffer msg;
+            VStringBuffer jobStr("%s/%s", workunit.queryWuid(), graphName?graphName:"");
+            msg.append(jobStr);
+            if (queryWorldCommunicator().sendRecv(msg, masterNode, MPTAG_THOR, 10000))
+            {
+                bool ok;
+                msg.read(ok);
+                if (graphName) // if graphName==nullptr, then continue around to look at others
+                {
+                    if (ok)
+                        return true;
+                }
+            }
+        }
+    }
+    return false;
+}
 
 static void setResources(StringBuffer &jobYaml, const IConstWorkUnit *workunit, const char *process)
 {

+ 1 - 0
common/workunit/workunit.hpp

@@ -1692,6 +1692,7 @@ extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);
 inline __int64 calcCost(double ratePerHour, __int64 timeNS) { return HourToSeconds(ratePerHour) * NanoSecondsToSeconds(timeNS) * 1e6; }
 
 #ifdef _CONTAINERIZED
+extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, const char *graphName);
 extern WORKUNIT_API void deleteK8sJob(const char *componentName, const char *job);
 extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, const char *condition=nullptr);
 extern WORKUNIT_API void launchK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});

+ 7 - 4
dockerfiles/hpcc/templates/thor.yaml

@@ -1,13 +1,10 @@
 {{ range $thor := $.Values.thor -}}
 {{- if not $thor.disabled -}}
-{{- $masterPort := $thor.masterport | default 20000 -}}
-{{- $slavePort := $thor.slaveport | default 20100 -}}
 {{- $hthorName := printf "%s-hthor" .name }}
 {{- $eclAgentName := printf "%s-agent" .name }}
 {{- $thorAgentName := printf "%s-thoragent" .name }}
 {{- $slaveName := printf "%s-slave" .name }}
 {{- $serviceName := printf "%s-svc" .name }}
-{{- $thorScope := omit . "eclagent" "thoragent" "hthor" }}
 {{- $eclAgentDefaults := dict "name" $eclAgentName "replicas" 1 }}
 {{- $eclAgentScope := .eclagent | mergeOverwrite $eclAgentDefaults | default $eclAgentDefaults }}
 {{- $agentAppType := $eclAgentScope.type | default "hthor" }}
@@ -15,6 +12,8 @@
 {{- $thorAgentScope := .thoragent | mergeOverwrite $thorAgentDefaults | default $thorAgentDefaults }}
 {{- $hthorDefaults := dict "name" $hthorName }}
 {{- $hthorScope := .hthor | mergeOverwrite $hthorDefaults | default $hthorDefaults }}
+{{- $thorScopeStd := omit . "eclagent" "thoragent" "hthor" }}
+{{- $thorScope := $thorAgentScope.containerPerAgent | ternary ($thorScopeStd) ($thorScopeStd | mergeOverwrite (dict "masterport" 0 "slaveport" 0)) }}
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -102,9 +101,13 @@ data:
     eclagent:
 {{ toYaml $thorAgentScope | indent 6 }}
       type: thor
+{{- if not $thorAgentScope.containerPerAgent }}
+    thor:
+{{ toYaml $thorScope | indent 6 }}
+{{- end }}
     global:
 {{ include "hpcc.generateGlobalConfigMap" $ | indent 6 }}
-{{- if $eclAgentScope.containerPerAgent }}
+{{- if $thorAgentScope.containerPerAgent }}
 
   {{ $agentAppType }}-jobspec.yaml: |
     apiVersion: batch/v1

+ 6 - 0
ecl/eclagent/eclagent.cpp

@@ -33,6 +33,7 @@
 #include "portlist.h"
 #include "dalienv.hpp"
 #include "daaudit.hpp"
+#include "workunit.hpp"
 
 #include "hqlplugins.hpp"
 #include "eclrtl_imp.hpp"
@@ -1901,6 +1902,11 @@ void EclAgent::doProcess()
         logException((IException *) NULL);
     }
 
+#ifdef _CONTAINERIZED
+    // signal to any lingering Thors that the job is complete and they can quit before timeout.
+    executeGraphOnLingeringThor(*wuRead, nullptr);
+#endif
+
     DBGLOG("Process complete");
     // Add some timing stats
     bool deleteJobTemps = true;

+ 18 - 9
ecl/eclagent/eclgraph.cpp

@@ -1519,13 +1519,17 @@ void EclAgent::executeThorGraph(const char * graphName)
     WUState state = WUStateUnknown;
     if (agentTopology->hasProp("@queue"))
     {
-        VStringBuffer queueName("%s.thor", agentTopology->queryProp("@queue"));
-        DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
-        Owned<IJobQueue> queue = createJobQueue(queueName.str());
-        queue->connect(false);
-        VStringBuffer jobName("%s/%s", wuid.get(), graphName);
-        IJobQueueItem *item = createJobQueueItem(jobName);
-        queue->enqueue(item);
+        if (executeGraphOnLingeringThor(*queryWorkUnit(), graphName))
+            PROGLOG("Existing lingering Thor handled graph: %s", graphName);
+        else
+        {
+            VStringBuffer queueName("%s.thor", agentTopology->queryProp("@queue"));
+            DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
+            Owned<IJobQueue> queue = createJobQueue(queueName.str());
+            VStringBuffer jobName("%s/%s", wuid.get(), graphName);
+            IJobQueueItem *item = createJobQueueItem(jobName);
+            queue->enqueue(item);
+        }
 
         // NB: overall max runtime if guillotine set handled by abortmonitor
         unsigned runningTimeLimit = queryWorkUnit()->getDebugValueInt("maxRunTime", 0);
@@ -1572,8 +1576,13 @@ void EclAgent::executeThorGraph(const char * graphName)
             }
         }
     }
-    else if (WUStateFailed == state)
-        throw makeStringException(0, "Workunit failed");
+    else
+    {
+        WorkunitUpdate w = updateWorkUnit();
+        WUState state = w->getState();
+        if (WUStateFailed == state)
+            throw makeStringException(0, "Workunit failed");
+    }
 
     setRunning();
     unlockWorkUnit();

+ 23 - 10
roxie/ccd/ccdcontext.cpp

@@ -2280,16 +2280,19 @@ protected:
         // NB: If a single Eclagent were to want to launch >1 Thor, then the threading could be in the workflow above this call.
         setWUState(WUStateBlocked);
 
-        WUState state = WUStateUnknown;
         if (queryComponentConfig().hasProp("@queue"))
         {
-            VStringBuffer queueName("%s.thor", queryComponentConfig().queryProp("@queue"));
-            DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
-            Owned<IJobQueue> queue = createJobQueue(queueName.str());
-            queue->connect(false);
-            VStringBuffer jobName("%s/%s", wuid.get(), graphName);
-            IJobQueueItem *item = createJobQueueItem(jobName);
-            queue->enqueue(item);
+            if (executeGraphOnLingeringThor(*workUnit, graphName))
+                PROGLOG("Existing lingering Thor handled graph: %s", graphName);
+            else
+            {
+                VStringBuffer queueName("%s.thor", queryComponentConfig().queryProp("@queue"));
+                DBGLOG("Queueing wuid=%s, graph=%s, on queue=%s, timelimit=%u seconds", wuid.str(), graphName, queueName.str(), timelimit);
+                Owned<IJobQueue> queue = createJobQueue(queueName.str());
+                VStringBuffer jobName("%s/%s", wuid.get(), graphName);
+                IJobQueueItem *item = createJobQueueItem(jobName);
+                queue->enqueue(item);
+            }
 
             unsigned runningTimeLimit = workUnit->getDebugValueInt("maxRunTime", 0);
             runningTimeLimit = runningTimeLimit ? runningTimeLimit : INFINITE;
@@ -2335,8 +2338,13 @@ protected:
                 }
             }
         }
-        else if (WUStateFailed == state)
-            throw makeStringException(0, "Workunit failed");
+        else
+        {
+            WorkunitUpdate w(&workUnit->lock());
+            WUState state = w->getState();
+            if (WUStateFailed == state)
+                throw makeStringException(0, "Workunit failed");
+        }
 
         setWUState(WUStateRunning);
 
@@ -3088,6 +3096,11 @@ public:
             debugContext->debugTerminate();
         if (workUnit)
         {
+#ifdef _CONTAINERIZED
+            // signal to any lingering Thors that the job is complete and they can quit before timeout.
+            executeGraphOnLingeringThor(*workUnit, nullptr);
+#endif
+
             if (options.failOnLeaks && !failed)
             {
                 cleanupGraphs();

+ 88 - 24
thorlcr/master/thgraphmanager.cpp

@@ -1088,11 +1088,13 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
     {
         Owned<CDaliConnectionValidator> daliConnectValidator = new CDaliConnectionValidator(globals->getPropInt("@verifyDaliConnectionInterval", DEFAULT_VERIFYDALI_POLL));
         Owned<ILargeMemLimitNotify> notify;
-        if (multiThorMemoryThreshold) {
+        if (multiThorMemoryThreshold)
+        {
             StringBuffer ngname;
             if (!globals->getProp("@multiThorResourceGroup",ngname))
                 globals->getProp("@nodeGroup",ngname);
-            if (ngname.length()) {
+            if (ngname.length())
+            {
                 notify.setown(createMultiThorResourceMutex(ngname.str(),serverStatus));
                 setMultiThorMemoryNotify(multiThorMemoryThreshold,notify);
                 PROGLOG("Multi-Thor resource limit for %s set to %" I64F "d",ngname.str(),(__int64)multiThorMemoryThreshold);
@@ -1109,31 +1111,93 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
         Owned<CJobManager> jobManager = new CJobManager(logHandler);
         try
         {
-            if (wuid) // one-shot, quits after running
+#ifndef _CONTAINERIZED
+            jobManager->run();
+#else
+            unsigned lingerPeriod = globals->getPropInt("@lingerPeriod", DEFAULT_LINGER_SECS*1000);
+            StringBuffer instance("thorinstance_");
+            queryMyNode()->endpoint().getUrlStr(instance);
+            StringBuffer currentGraphName(graphName);
+
+            while (true)
             {
-                PROGLOG("Executing: wuid=%s, graph=%s", wuid, graphName);
-                Owned<IWorkUnitFactory> factory;
-                Owned<IConstWorkUnit> workunit;
-                factory.setown(getWorkUnitFactory());
-                workunit.setown(factory->openWorkUnit(wuid));
-                SocketEndpoint dummyAgentEp;
-                jobManager->execute(workunit, wuid, graphName, dummyAgentEp);
-                IException *e = jobManager->queryExitException();
-                Owned<IWorkUnit> w = &workunit->lock();
-                if (e)
+                PROGLOG("Executing: wuid=%s, graph=%s", wuid, currentGraphName.str());
+
                 {
-                    Owned<IWUException> we = w->createException();
-                    we->setSeverity(SeverityInformation);
-                    StringBuffer errStr;
-                    e->errorMessage(errStr);
-                    we->setExceptionMessage(errStr);
-                    we->setExceptionSource("thormasterexception");
-                    we->setExceptionCode(e->errorCode());
+                    Owned<IWorkUnitFactory> factory;
+                    Owned<IConstWorkUnit> workunit;
+                    factory.setown(getWorkUnitFactory());
+                    workunit.setown(factory->openWorkUnit(wuid));
+                    SocketEndpoint dummyAgentEp;
+                    jobManager->execute(workunit, wuid, currentGraphName, dummyAgentEp);
+                    IException *e = jobManager->queryExitException();
+                    Owned<IWorkUnit> w = &workunit->lock();
+                    if (e)
+                    {
+                        Owned<IWUException> we = w->createException();
+                        we->setSeverity(SeverityInformation);
+                        StringBuffer errStr;
+                        e->errorMessage(errStr);
+                        we->setExceptionMessage(errStr);
+                        we->setExceptionSource("thormasterexception");
+                        we->setExceptionCode(e->errorCode());
+
+                        w->setState(WUStateWait);
+                        break;
+                    }
+
+                    if (lingerPeriod)
+                        w->setDebugValue(instance, "1", true);
+
+                    w->setState(WUStateWait);
                 }
-                w->setState(WUStateWait);
-            }  
-            else
-                jobManager->run();
+                currentGraphName.clear();
+
+                if (lingerPeriod)
+                {
+                    CMessageBuffer msg;
+                    CTimeMon timer(lingerPeriod);
+                    bool handled = false;
+                    unsigned remaining;
+                    while (!timer.timedout(&remaining))
+                    {
+                        PROGLOG("Lingering time left: %.2f", ((float)remaining)/1000);
+                        if (!queryWorldCommunicator().recv(msg, NULL, MPTAG_THOR, nullptr, remaining))
+                            break;
+                        StringBuffer next;
+                        msg.read(next);
+
+                        // validate
+                        StringArray sArray;
+                        sArray.appendList(next, "/");
+                        if (2 == sArray.ordinality() && streq(sArray.item(0), wuid))
+                        {
+                            currentGraphName.set(sArray.item(1));
+                            // NB: agent could send empty graphName to terminate early before timeout
+                            msg.clear().append(true);
+                            if (queryWorldCommunicator().reply(msg, 60*1000)) // should be quick!
+                                handled = true;
+                            else
+                                currentGraphName.clear();
+                            break;
+                        }
+                        // reject/ignore duff message.
+                    }
+                    if (!handled) // i.e. if timed out or comms issue, clear lingering flag before exiting
+                    {
+                        // remove lingering instance from workunit
+                        Owned<IWorkUnitFactory> factory;
+                        Owned<IConstWorkUnit> workunit;
+                        factory.setown(getWorkUnitFactory());
+                        workunit.setown(factory->openWorkUnit(wuid));
+                        Owned<IWorkUnit> w = &workunit->lock();
+                        w->setDebugValue(instance, "0", true);
+                    }
+                }
+                if (0 == currentGraphName.length())
+                    break;
+            }
+#endif
         }
         catch (IException *e)
         {

+ 18 - 14
thorlcr/master/thmastermain.cpp

@@ -588,7 +588,7 @@ int main( int argc, const char *argv[]  )
     InitModuleObjects();
     NoQuickEditSection xxx;
     {
-        globals.setown(loadConfiguration(defaultYaml, argv, "thor", "THOR", "thor.xml", nullptr));
+        globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr));
     }
     setStatisticsComponentName(SCTthor, globals->queryProp("@name"), true);
 
@@ -615,10 +615,6 @@ int main( int argc, const char *argv[]  )
     if (0 == thorEp.port)
         thorEp.port = globals->getPropInt("@masterport", THOR_BASE_PORT);
 
-     // both same
-    setMasterPortBase(thorEp.port);
-    setMachinePortBase(thorEp.port);
-
     // Remove sentinel asap
     Owned<IFile> sentinelFile = createSentinelTarget();
     removeSentinelFile(sentinelFile);
@@ -650,14 +646,22 @@ int main( int argc, const char *argv[]  )
         Owned<IGroup> serverGroup = createIGroup(daliServer.str(), DALI_SERVER_PORT);
 
         unsigned retry = 0;
-        for (;;) {
-            try {
-                unsigned port = getFixedPort(TPORT_mp);
-                LOG(MCdebugProgress, thorJob, "calling initClientProcess Port %d", port);
-                initClientProcess(serverGroup, DCR_ThorMaster, port);
+        for (;;)
+        {
+            try
+            {
+                LOG(MCdebugProgress, thorJob, "calling initClientProcess %d", thorEp.port);
+                initClientProcess(serverGroup, DCR_ThorMaster, thorEp.port);
+                if (0 == thorEp.port)
+                    thorEp.port = queryMyNode()->endpoint().port;
+                // both same
+                setMasterPortBase(thorEp.port);
+                setMachinePortBase(thorEp.port);
+
                 break;
             }
-            catch (IJSOCK_Exception *e) { 
+            catch (IJSOCK_Exception *e)
+            { 
                 if ((e->errorCode()!=JSOCKERR_port_in_use))
                     throw;
                 FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
@@ -884,10 +888,10 @@ int main( int argc, const char *argv[]  )
             throw makeStringException(0, "missing --graphName");
         cloudJobName.appendf("%s-%s", workunit, graphName);
 
-        StringBuffer myIp;
-        queryHostIP().getIpText(myIp);
+        StringBuffer myEp;
+        queryMyNode()->endpoint().getUrlStr(myEp);
 
-        launchK8sJob("thorslave", workunit, cloudJobName, { { "graphName", graphName}, { "master", myIp.str() } });
+        launchK8sJob("thorslave", workunit, cloudJobName, { { "graphName", graphName}, { "master", myEp.str() } });
 #else
         unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
         unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);

+ 12 - 0
thorlcr/shared/thor.hpp

@@ -54,5 +54,17 @@ inline rowidx_t validRIDX(T X)
     #define CATCHALL DummyCatchAll
 #endif
 
+constexpr const char * thorDefaultConfigYaml = R"!!(
+version: 1.0
+thor:
+  daliServers: dali
+  watchdogEnabled: true
+  watchdogProgressEnabled: true
+  cost:
+    thor:
+      master: "0.000002"
+      slave: "0.00001"
+)!!";
+
 
 #endif

+ 8 - 2
thorlcr/slave/thslavemain.cpp

@@ -349,7 +349,11 @@ int main( int argc, const char *argv[]  )
             return 1;
         }
         cmdArgs = argv+1;
+#ifdef _CONTAINERIZED
+        globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", nullptr, nullptr));
+#else
         loadArgsIntoConfiguration(globals, cmdArgs);
+#endif
 
         const char *master = globals->queryProp("@master");
         if (!master)
@@ -379,12 +383,14 @@ int main( int argc, const char *argv[]  )
         // TBD: use new config/init system for generic handling of init settings vs command line overrides
         if (0 == slfEp.port) // assume default from config if not on command line
             slfEp.port = globals->getPropInt("@slaveport", THOR_BASESLAVE_PORT);
+
+        startMPServer(DCR_ThorSlave, slfEp.port, false);
+        if (0 == slfEp.port)
+            slfEp.port = queryMyNode()->endpoint().port;
         setMachinePortBase(slfEp.port);
 
         setSlaveAffinity(globals->getPropInt("@slaveprocessnum"));
 
-        startMPServer(DCR_ThorSlave, getFixedPort(TPORT_mp), false);
-
         if (globals->getPropBool("@MPChannelReconnect"))
             getMPServer()->setOpt(mpsopt_channelreopen, "true");
 #ifdef USE_MP_LOG

+ 1 - 0
thorlcr/thorutil/thormisc.hpp

@@ -321,6 +321,7 @@ public:
 #define DEFAULT_THORSLAVEPORT 20100
 #define DEFAULT_SLAVEPORTINC 20
 #define DEFAULT_QUERYSO_LIMIT 10
+#define DEFAULT_LINGER_SECS 10
 
 class graph_decl CFifoFileCache : public CSimpleInterface
 {