Sfoglia il codice sorgente

HPCC-26380 Roxie topology server heartbeat times are too short

Use different timeouts for agents vs servers, and handle degraded agents
differently from degraded servers.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 anni fa
parent
commit
402f7dfd80

+ 3 - 2
helm/hpcc/templates/roxie.yaml

@@ -36,6 +36,7 @@ data:
 {{ toYaml ( omit .me "logging" "topoServer" "encryptInTransit" "env") | indent 6 }}
       numChannels: {{ .numChannels }}
       topologyServers: "{{ .toponame }}:{{ .topoport }}"
+      heartbeatInterval: {{ .heartbeatInterval }}
       resolveLocally: false
 {{- $mtlsEnabled := (eq (include "hpcc.isMtlsEnabled" (dict "root" .root)) "true") -}}
 {{/* By default use encryption if local certificates are enabled, but allow it to be turned off via roxie .encryptInTransit value */}}
@@ -61,8 +62,7 @@ data:
   {{ .toponame }}.yaml:
     version: 1.0
     toposerver:
-      port: {{ .topoport }}
-      traceLevel: {{ .toposerver.traceLevel | default 1 }}
+{{ toYaml ( omit .toposerver "logging" "env") | indent 6 }}
 {{- include "hpcc.generateLoggingConfig" (dict "root" .root "me" .toposerver) | indent 6 }}
 {{- end -}}
 
@@ -75,6 +75,7 @@ data:
 {{- $_ := set $commonCtx "toponame" (printf "%s-toposerver" $roxie.name) -}}
 {{- $_ := set $commonCtx "numChannels" ($roxie.numChannels | int | default 1) -}}
 {{- $_ := set $commonCtx "topoport" ($toposerver.port | int | default 9004) -}}
+{{- $_ := set $commonCtx "heartbeatInterval" ($toposerver.heartbeatInterval | int | default 10000) -}}
 {{- $_ := set $toposerver "name" $commonCtx.toponame -}}
 {{- $configSHA := include "hpcc.getConfigSHA" ($commonCtx | merge (dict "configMapHelper" "hpcc.roxieConfigMap" "component" "roxie" "excludeKeys" "global")) }}
 {{- $topoconfigSHA := include "hpcc.getConfigSHA" ($commonCtx | merge (dict "configMapHelper" "hpcc.roxieTopoConfigMap" "component" "toposerver" "excludeKeys" "global")) }}

+ 27 - 15
roxie/topo/toposerver.cpp

@@ -76,11 +76,13 @@ StringBuffer cachedDigest;
 bool responseDirty = true;
 unsigned lastTimeoutCheck = 0;
 unsigned lastTopologyReport = 0;
-const unsigned timeoutCheckInterval = 1000;
-const unsigned heartbeatInterval = 5000;
-const unsigned timeoutHeartbeatInterval = 10000;
-const unsigned removeHeartbeatInterval = 120000;
-const unsigned topologyReportInterval = 60000;
+
+unsigned timeoutCheckInterval = 1000;       // How often we check to see what has expired
+unsigned heartbeatInterval = 5000;          // How often nodes send heartbeats
+unsigned timeoutHeartbeatServer = 60000;    // How long before a server is marked as down
+unsigned timeoutHeartbeatAgent = 10000;     // How long before an agent is marked as down
+unsigned removeHeartbeatInterval = 120000;  // How long before a node is removed from list
+unsigned topologyReportInterval = 60000;    // How often topology is reported to logging (if traceLevel >= 2)
 bool aborted = false;
 Semaphore stopping;
 StringBuffer topologyFile;
@@ -144,26 +146,28 @@ void timeoutTopology()
         return;
     for (auto it = topology.begin(); it != topology.end(); /* no increment */)
     {
+        bool isServer = it->first.rfind("server", 0)==0;
         unsigned lastSeen = it->second.lastSeen;
-        if (now-lastSeen > timeoutHeartbeatInterval)
+        unsigned timeout = isServer ? timeoutHeartbeatServer : timeoutHeartbeatAgent;
+        // If a server is missing a heartbeat for a while, we mark it as down. Queued packets for that server will get discarded, and
+        // it will be sorted to the end of the priority list for agent requests
+        // The timeout is different for server vs agent - for servers, we want to be sure it really is down, and there's no huge cost for waiting,
+        // while for agents we want to divert traffic away from it ASAP (so long as there are other destinations available)
+        if (now-lastSeen > timeout)
         {
             if (traceLevel)
             {
                 DBGLOG("No heartbeat for %u ms for %s", now-lastSeen, it->first.c_str());
             }
+            responseDirty = true;
             if (now-lastSeen > removeHeartbeatInterval)
-                it = topology.erase(it);
-            else
             {
-                it->second.instance = 0;  // By leaving the entry present but with instance=0, we will ensure that all clients get to see that the machine is no longer present
-                ++it;
+                it = topology.erase(it);
+                continue;
             }
-            responseDirty = true;
-        }
-        else
-        {
-            ++it;
+            it->second.instance = 0;  // By leaving the entry present but with instance=0, we will ensure that all clients get to see that the machine is no longer present
         }
+        ++it;
     }
     lastTimeoutCheck = now;
 }
@@ -357,6 +361,14 @@ int main(int argc, const char *argv[])
         Owned<IPropertyTree> topology = loadConfiguration(defaultYaml, argv, "toposerver", "TOPOSERVER", nullptr, nullptr);
         traceLevel = topology->getPropInt("@traceLevel", 1);
         topoPort = topology->getPropInt("@port", TOPO_SERVER_PORT);
+
+        timeoutCheckInterval = topology->getPropInt("@timeoutCheckInterval", timeoutCheckInterval);
+        heartbeatInterval = topology->getPropInt("@heartbeatInterval", heartbeatInterval);
+        timeoutHeartbeatAgent = topology->getPropInt("@timeoutHeartbeatAgent", timeoutHeartbeatAgent);
+        timeoutHeartbeatServer = topology->getPropInt("@timeoutHeartbeatServer", timeoutHeartbeatServer);
+        removeHeartbeatInterval = topology->getPropInt("@removeHeartbeatInterval", removeHeartbeatInterval);
+        topologyReportInterval = topology->getPropInt("@topologyReportInterval", topologyReportInterval);
+
 #ifndef _CONTAINERIZED
         if (topology->getPropBool("@stdlog", traceLevel != 0))
             queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_thread | MSGFIELD_prefix);

+ 46 - 16
roxie/udplib/udptopo.cpp

@@ -124,7 +124,7 @@ public:
     virtual const ChannelInfo &queryChannelInfo(unsigned channel) const override;
     virtual const std::vector<unsigned> &queryChannels() const override;
     virtual bool implementsChannel(unsigned channel) const override;
-    virtual void report(StringBuffer &ret) const override;
+    virtual StringBuffer &report(StringBuffer &ret) const override;
     virtual time_t queryServerInstance(const SocketEndpoint &ep) const override;
 private:
     std::map<unsigned, SocketEndpointArray> agents;  // indexed by channel
@@ -153,6 +153,7 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
 {
     std::istringstream ss(topologyInfo);
     std::string line;
+    std::map<unsigned, SocketEndpointArray> degradedAgents;  // indexed by channel - agents that have not sent heartbeats recently. Used only if nothing else is available on the channel
     while (std::getline(ss, line, '\n'))
     {
         StringArray fields;
@@ -193,7 +194,7 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
             }
             if (streq(role, "agent"))
             {
-                if (instance)
+                if (instance || ep.equals(myAgentEP))
                 {
                     agents[channel].append(ep);
                     if (ep.equals(myAgentEP))
@@ -204,6 +205,10 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
                     }
                     agents[0].append(ep);
                 }
+                else
+                {
+                    degradedAgents[channel].append(ep);
+                }
             }
             else if (streq(role, "server"))
             {
@@ -224,6 +229,22 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
         else
             DBGLOG("Unable to process information in topology entry %s (expected 5 fields)", line.c_str());
     }
+    // Degraded agents are used only if nothing else is available on the channel
+    for (auto it = degradedAgents.begin(); it != degradedAgents.end(); it++)
+    {
+        unsigned channel = it->first;
+        if (!agents[channel].length())
+        {
+            DBGLOG("Adding degraded agent(s) to channel %d", channel);
+            ForEachItemIn(idx, it->second)
+            {
+                agents[channel].append(it->second.item(idx));
+                agents[0].append(it->second.item(idx));
+            }
+        }
+        else
+            DBGLOG("Ignoring degraded agent(s) on channel %d", channel);
+    }
     for (unsigned i = 0; i < channels.size(); i++)
     {
         unsigned channel = channels[i];
@@ -281,13 +302,25 @@ bool CTopologyServer::implementsChannel(unsigned channel) const
         return true;   // Kinda-sorta - perhaps not true if separated servers from agents, but even then child queries may access channel 0
 }
 
-void CTopologyServer::report(StringBuffer &ret) const
+StringBuffer &CTopologyServer::report(StringBuffer &ret) const
 {
-#ifdef _DEBUG
-    ret.append(rawData);
-#else
-    UNIMPLEMENTED;
-#endif
+    for (auto it = agents.begin(); it != agents.end(); it++)
+    {
+        if (it->second.length())
+        {
+            ret.appendf("Channel %d agents: ", it->first);
+            it->second.getText(ret).newline();
+        }
+    }
+    for (auto it = servers.begin(); it != servers.end(); it++)
+    {
+        if (it->second.length())
+        {
+            ret.appendf("Port %d servers: ", it->first);
+            it->second.getText(ret).newline();
+        }
+    }
+    return ret;
 }
 
 const SocketEndpointArray CTopologyServer::nullArray;
@@ -483,12 +516,13 @@ extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo>
 
 static std::thread topoThread;
 static Semaphore abortTopo;
-const unsigned topoUpdateInterval = 5000;
+unsigned heartbeatInterval = 5000;   // How often roxie servers update topo server
 
 extern UDPLIB_API void initializeTopology(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles)
 {
     topologyManager.setServers(topoValues);
     topologyManager.setRoles(myRoles);
+    heartbeatInterval = getComponentConfigSP()->getPropInt("@heartbeatInterval", heartbeatInterval);
 }
 
 extern UDPLIB_API void publishTopology(unsigned traceLevel, const std::vector<RoxieEndpointInfo> &myRoles)
@@ -505,14 +539,10 @@ extern UDPLIB_API void publishTopology(unsigned traceLevel, const std::vector<Ro
                 {
                     DBGLOG("Topology information updated:");
                     Owned<const ITopologyServer> c = getTopology();
-                    const SocketEndpointArray &eps = c->queryAgents(0);
-                    ForEachItemIn(idx, eps)
-                    {
-                        StringBuffer s;
-                        DBGLOG("Agent %d: %s", idx, eps.item(idx).getIpText(s).str());
-                    }
+                    StringBuffer s;
+                    MLOG("%s", c->report(s).str());
                 }
-                waitTime = topoUpdateInterval;
+                waitTime = heartbeatInterval;
             }
             topologyManager.closedown(myRoles);
         });

+ 1 - 1
roxie/udplib/udptopo.hpp

@@ -109,7 +109,7 @@ interface ITopologyServer : public IInterface
     virtual const ChannelInfo &queryChannelInfo(unsigned channel) const = 0;
     virtual const std::vector<unsigned> &queryChannels() const = 0;
     virtual bool implementsChannel(unsigned channel) const = 0;
-    virtual void report(StringBuffer &ret) const = 0;
+    virtual StringBuffer & report(StringBuffer &ret) const = 0;
     virtual time_t queryServerInstance(const SocketEndpoint &ep) const = 0;
 };
 

+ 2 - 1
system/jlib/jlog.cpp

@@ -1747,7 +1747,8 @@ void CLogMsgManager::mreport_direct(unsigned compo, const LogMsgCategory & cat,
         switch (*cursor)
         {
             case '\0':
-                pushMsg(new LogMsg(cat, getNextID(), job, NoLogMsgCode, (int)(cursor-lineStart), lineStart, compo, port, session));
+                if (cursor != lineStart || cursor==msg)
+                    pushMsg(new LogMsg(cat, getNextID(), job, NoLogMsgCode, (int)(cursor-lineStart), lineStart, compo, port, session));
                 return;
             case '\r':
                 // NB: \r or \r\n translated into newline

+ 1 - 1
system/jlib/jsocket.cpp

@@ -6148,7 +6148,7 @@ inline void flushText(StringBuffer &text,unsigned short port,unsigned &rep,unsig
 
 
 
-StringBuffer &SocketEndpointArray::getText(StringBuffer &text)
+StringBuffer &SocketEndpointArray::getText(StringBuffer &text) const
 {
     unsigned count = ordinality();
     if (!count)

+ 1 - 1
system/jlib/jsocket.hpp

@@ -184,7 +184,7 @@ public:
 class jlib_decl SocketEndpointArray : public StructArrayOf<SocketEndpoint>
 { 
 public:
-    StringBuffer &getText(StringBuffer &text);
+    StringBuffer &getText(StringBuffer &text) const;
     bool fromName(const char *name, unsigned defport);
     void fromText(const char *s,unsigned defport);
 };