Browse Source

HPCC-25756 Auto EclccServer queue refresh

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 4 years ago
parent
commit
595dacaf97
3 changed files with 130 additions and 54 deletions
  1. 97 39
      ecl/eclccserver/eclccserver.cpp
  2. 32 14
      esp/smc/SMCLib/TpWrapper.cpp
  3. 1 1
      helm/hpcc/templates/eclccserver.yaml

+ 97 - 39
ecl/eclccserver/eclccserver.cpp

@@ -870,6 +870,43 @@ static void removePrecompiledHeader()
 // Class EclccServer manages a pool of compile threads
 //------------------------------------------------------------------------------------------------------------------
 
+static StringBuffer &getQueues(StringBuffer &queueNames)
+{
+    Owned<IPropertyTree> config = getComponentConfig();
+#ifdef _CONTAINERIZED
+    bool filtered = false;
+    std::unordered_map<std::string, bool> listenQueues;
+    Owned<IPTreeIterator> listening = config->getElements("listen");
+    ForEach (*listening)
+    {
+        const char *lq = listening->query().queryProp(".");
+        if (lq)
+        {
+            listenQueues[lq] = true;
+            filtered = true;
+        }
+    }
+    Owned<IPTreeIterator> queues = config->getElements("queues");
+    ForEach(*queues)
+    {
+        IPTree &queue = queues->query();
+        const char *qname = queue.queryProp("@name");
+        if (!filtered || listenQueues.count(qname))
+        {
+            if (queueNames.length())
+                queueNames.append(",");
+            getClusterEclCCServerQueueName(queueNames, qname);
+        }
+    }
+#else
+    const char * processName = config->queryProp("@name");
+    SCMStringBuffer scmQueueNames;
+    getEclCCServerQueueNames(scmQueueNames, processName);
+    queueNames.append(scmQueueNames.str());
+#endif
+    return queueNames;
+}
+
 class EclccServer : public CInterface, implements IThreadFactory, implements IAbortHandler
 {
     StringAttr queueNames;
@@ -881,36 +918,79 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
     bool running;
     CSDSServerStatus serverstatus;
     Owned<IJobQueue> queue;
+    CriticalSection queueUpdateCS;
+    StringAttr updatedQueueNames;
+    unsigned reloadConfigCBId = 0;
+
 
+    void configUpdate()
+    {
+        StringBuffer newQueueNames;
+        getQueues(newQueueNames);
+        if (!newQueueNames.length())
+            ERRLOG("No queues found to listen on");
+        Linked<IJobQueue> currentQueue;
+        {
+            CriticalBlock b(queueUpdateCS);
+            if (strsame(queueNames, newQueueNames))
+                return;
+            updatedQueueNames.set(newQueueNames);
+            currentQueue.set(queue);
+            PROGLOG("Updating queue due to queue names change from '%s' to '%s'", queueNames.str(), newQueueNames.str());
+        }
+        if (currentQueue)
+            currentQueue->cancelAcceptConversation();
+    }
 public:
     IMPLEMENT_IINTERFACE;
     EclccServer(const char *_queueName, unsigned _poolSize)
-        : queueNames(_queueName), poolSize(_poolSize), serverstatus("ECLCCserver")
+        : updatedQueueNames(_queueName), poolSize(_poolSize), serverstatus("ECLCCserver")
     {
         threadsActive = 0;
         running = false;
         pool.setown(createThreadPool("eclccServerPool", this, NULL, poolSize, INFINITE));
         serverstatus.queryProperties()->setProp("@cluster", getComponentConfigSP()->queryProp("@name"));
-        serverstatus.queryProperties()->setProp("@queue", queueNames.get());
         serverstatus.commitProperties();
+        reloadConfigCBId = installConfigUpdateHook(std::bind(&EclccServer::configUpdate, this));
     }
 
     ~EclccServer()
     {
+        if (reloadConfigCBId)
+            removeConfigUpdateHook(reloadConfigCBId);
         pool->joinAll(false, INFINITE);
     }
 
     void run()
     {
-        DBGLOG("eclccServer (%d threads) waiting for requests on queue(s) %s", poolSize, queueNames.get());
-        queue.setown(createJobQueue(queueNames.get()));
-        queue->connect(false);
         running = true;
         LocalIAbortHandler abortHandler(*this);
         while (running)
         {
             try
             {
+                bool newQueues = false;
+                {
+                    CriticalBlock b(queueUpdateCS);
+                    if (updatedQueueNames)
+                    {
+                        queueNames.set(updatedQueueNames);
+                        updatedQueueNames.clear();
+                        queue.clear();
+                        queue.setown(createJobQueue(queueNames.get()));
+                        newQueues = true;
+                    }
+                    // onAbort could have triggered before or during the above switch, if so, we do no want to connect/block on new queue
+                    if (!running)
+                        break;
+                }
+                if (newQueues)
+                {
+                    queue->connect(false);
+                    serverstatus.queryProperties()->setProp("@queue", queueNames.get());
+                    serverstatus.commitProperties();
+                    DBGLOG("eclccServer (%d threads) waiting for requests on queue(s) %s", poolSize, queueNames.get());
+                }
                 if (!pool->waitAvailable(10000))
                 {
                     if (getComponentConfigSP()->getPropInt("@traceLevel", 0) > 2)
@@ -963,8 +1043,14 @@ public:
     virtual bool onAbort() 
     {
         running = false;
-        if (queue)
-            queue->cancelAcceptConversation();
+        Linked<IJobQueue> currentQueue;
+        {
+            CriticalBlock b(queueUpdateCS);
+            if (queue)
+                currentQueue.set(queue);
+        }
+        if (currentQueue)
+            currentQueue->cancelAcceptConversation();
         return false;
     }
 };
@@ -1092,42 +1178,14 @@ int main(int argc, const char *argv[])
                 startPerformanceMonitor(optMonitorInterval*1000, PerfMonStandard, nullptr);
 #endif
 
-#ifdef _CONTAINERIZED
-            queryCodeSigner().initForContainer();
-
-            bool filtered = false;
-            std::unordered_map<std::string, bool> listenQueues;
-            Owned<IPTreeIterator> listening = globals->getElements("listen");
-            ForEach (*listening)
-            {
-                const char *lq = listening->query().queryProp(".");
-                if (lq)
-                {
-                    listenQueues[lq] = true;
-                    filtered = true;
-                }
-            }
-
             StringBuffer queueNames;
-            Owned<IPTreeIterator> queues = globals->getElements("queues");
-            ForEach(*queues)
-            {
-                IPTree &queue = queues->query();
-                const char *qname = queue.queryProp("@name");
-                if (!filtered || listenQueues.count(qname))
-                {
-                    if (queueNames.length())
-                        queueNames.append(",");
-                    getClusterEclCCServerQueueName(queueNames, qname);
-                }
-            }
-#else
-            SCMStringBuffer queueNames;
-            getEclCCServerQueueNames(queueNames, processName);
-#endif
+            getQueues(queueNames);
             if (!queueNames.length())
                 throw MakeStringException(0, "No queues found to listen on");
+
 #ifdef _CONTAINERIZED
+            queryCodeSigner().initForContainer();
+
             bool useChildProcesses = globals->getPropInt("@useChildProcesses", false);
             unsigned maxThreads = globals->getPropInt("@maxActive", 4);
 #else

+ 32 - 14
esp/smc/SMCLib/TpWrapper.cpp

@@ -29,6 +29,17 @@
 #include "dautils.hpp"
 #include "dameta.hpp"
 
+static unsigned reloadConfigCBId = (unsigned)-1;
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    return true;
+}
+MODULE_EXIT()
+{
+    if ((unsigned)-1 != reloadConfigCBId)
+        removeConfigUpdateHook(reloadConfigCBId);
+}
+
 const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information.";
 
 //////////////////////////////////////////////////////////////////////
@@ -2387,8 +2398,8 @@ extern TPWRAPPER_API unsigned getThorClusterNames(StringArray& targetNames, Stri
 
 static std::set<std::string> validTargets;
 static CriticalSection validTargetSect;
-static bool targetsDirty = true;
 
+// called within validTargetSect lock
 static void refreshValidTargets()
 {
     validTargets.clear();
@@ -2410,29 +2421,36 @@ static void refreshValidTargets()
     }
 }
 
+static void configUpdate(const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
+{
+    CriticalBlock block(validTargetSect);
+    // as much as effort [small] to check if different as to refresh
+    refreshValidTargets();
+    PROGLOG("Valid targets updated");
+}
+
 extern TPWRAPPER_API void validateTargetName(const char* target)
 {
     if (isEmptyString(target))
         throw makeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Empty target name.");
 
     CriticalBlock block(validTargetSect);
-    if (targetsDirty)
+#ifdef _CONTAINERIZED
+    if ((unsigned) -1 == reloadConfigCBId) // once only
     {
         refreshValidTargets();
-        targetsDirty = false;
+        reloadConfigCBId = installConfigUpdateHook(configUpdate);
     }
-
-    if (validTargets.find(target) != validTargets.end())
-        return;
-
-#ifdef _CONTAINERIZED
-    //Currently, if there's any change to the target queues, esp will be auto restarted by K8s.
-    throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
-#else
-    if (!validateTargetClusterName(target))
+    if (validTargets.find(target) == validTargets.end())
         throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
-
-    targetsDirty = true;
+#else
+    if (validTargets.find(target) == validTargets.end())
+    {
+        // bare metal rechecks in case env. changed since target list built
+        if (!validateTargetClusterName(target))
+            throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
+        refreshValidTargets();
+    }
 #endif
 }
 

+ 1 - 1
helm/hpcc/templates/eclccserver.yaml

@@ -80,7 +80,7 @@ data:
 {{- if not .disabled -}}
 {{- $secretsCategories := list "system" "codeVerify" }}
 {{- $commonCtx := dict "root" $ "me" . "secretsCategories" $secretsCategories }}
-{{- $configSHA := include "hpcc.getConfigSHA" ($commonCtx | merge (dict "configMapHelper" "hpcc.eclccServerConfigMap" "component" "eclccserver" "excludeKeys" "global")) }}
+{{- $configSHA := include "hpcc.getConfigSHA" ($commonCtx | merge (dict "configMapHelper" "hpcc.eclccServerConfigMap" "component" "eclccserver" "excludeKeys" "global,eclccserver.queues")) }}
 apiVersion: apps/v1
 kind: Deployment
 metadata: