浏览代码

HPCC-23526 Roxie should resolve topology name more often

Support multiple toposervers in a headless service

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父节点
当前提交
cdcc4c5315
共有 3 个文件被更改,包括 124 次插入36 次删除
  1. 42 36
      roxie/udplib/udptopo.cpp
  2. 81 0
      system/jlib/jsocket.cpp
  3. 1 0
      system/jlib/jsocket.hpp

+ 42 - 36
roxie/udplib/udptopo.cpp

@@ -19,6 +19,7 @@
 #include "udplib.hpp"
 #include "udptopo.hpp"
 #include "roxie.hpp"
+#include "portlist.h"
 #include <thread>
 #include <string>
 #include <sstream>
@@ -232,53 +233,58 @@ bool TopologyManager::update()
     {
         try
         {
-            SocketEndpoint ep(topoServers.item(idx));  // MORE - there may be more than one IP
-            Owned<ISocket> topo = ISocket::connect_timeout(ep, topoConnectTimeout);
-            if (topo)
+            SocketEndpointArray eps;
+            eps.fromName(topoServers.item(idx), TOPO_SERVER_PORT);
+            ForEachItemIn(idx, eps)
             {
-                unsigned topoBufLen = md5.length()+topoBuf.length();
-                _WINREV(topoBufLen);
-                topo->write(&topoBufLen, 4);
-                topo->write(md5.str(), md5.length());
-                topo->write(topoBuf.str(), topoBuf.length());
-                unsigned responseLen;
-                topo->read(&responseLen, 4);
-                _WINREV(responseLen);
-                if (!responseLen)
+                const SocketEndpoint &ep = eps.item(idx);
+                Owned<ISocket> topo = ISocket::connect_timeout(ep, topoConnectTimeout);
+                if (topo)
                 {
-                    DBGLOG("Unexpected empty response from topology server %s", topoServers.item(idx));
-                }
-                else
-                {
-                    if (responseLen > maxReasonableResponse)
+                    unsigned topoBufLen = md5.length()+topoBuf.length();
+                    _WINREV(topoBufLen);
+                    topo->write(&topoBufLen, 4);
+                    topo->write(md5.str(), md5.length());
+                    topo->write(topoBuf.str(), topoBuf.length());
+                    unsigned responseLen;
+                    topo->read(&responseLen, 4);
+                    _WINREV(responseLen);
+                    if (!responseLen)
                     {
-                        DBGLOG("Unexpectedly large response (%u) from topology server %s", responseLen, topoServers.item(idx));
+                        DBGLOG("Unexpected empty response from topology server %s", topoServers.item(idx));
                     }
                     else
                     {
-                        MemoryBuffer mb;
-                        char *mem = (char *)mb.reserveTruncate(responseLen);
-                        topo->read(mem, responseLen);
-                        if (responseLen>=md5.length() && mem[0]=='=')
+                        if (responseLen > maxReasonableResponse)
+                        {
+                            DBGLOG("Unexpectedly large response (%u) from topology server %s", responseLen, topoServers.item(idx));
+                        }
+                        else
                         {
-                            if (md5.length()==0 || memcmp(mem, md5.str(), md5.length())!=0)
+                            MemoryBuffer mb;
+                            char *mem = (char *)mb.reserveTruncate(responseLen);
+                            topo->read(mem, responseLen);
+                            if (responseLen>=md5.length() && mem[0]=='=')
                             {
-                                const char *eol = strchr(mem, '\n');
-                                if (eol)
+                                if (md5.length()==0 || memcmp(mem, md5.str(), md5.length())!=0)
                                 {
-                                    eol++;
-                                    md5.clear().append(eol-mem, mem);  // Note: includes '\n'
-                                    Owned<const ITopologyServer> newServer = new CTopologyServer(eol);
-                                    SpinBlock b(lock);
-                                    currentTopology.swap(newServer);
-                                    updated = true;
+                                    const char *eol = strchr(mem, '\n');
+                                    if (eol)
+                                    {
+                                        eol++;
+                                        md5.clear().append(eol-mem, mem);  // Note: includes '\n'
+                                        Owned<const ITopologyServer> newServer = new CTopologyServer(eol);
+                                        SpinBlock b(lock);
+                                        currentTopology.swap(newServer);
+                                        updated = true;
+                                    }
                                 }
                             }
-                        }
-                        else
-                        {
-                            StringBuffer s;
-                            DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
+                            else
+                            {
+                                StringBuffer s;
+                                DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
+                            }
                         }
                     }
                 }

+ 81 - 0
system/jlib/jsocket.cpp

@@ -6301,6 +6301,87 @@ inline bool appendv4range(SocketEndpointArray *array,char *str,SocketEndpoint &e
     return true;
 }
 
+bool SocketEndpointArray::fromName(const char *name, unsigned defport)
+{
+    // Lookup a single name that may resolve to multiple IPs in a headless service scenario
+    StringArray portSplit;
+    portSplit.appendList(name, ":");
+    switch (portSplit.ordinality())
+    {
+    case 2:
+        defport = atoi(portSplit.item(1));
+        name = portSplit.item(0);
+        // fallthrough
+    case 1:
+        break;
+    default:
+        throw MakeStringException(-1, "Invalid name %s SocketEndpointArray::fromName", name);
+    }
+#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
+    if (IP4only)
+#endif
+    {
+        CriticalBlock c(hostnamesect);
+        hostent * entry = gethostbyname(name);
+        if (entry && entry->h_addr_list[0])
+        {
+            unsigned ptr = 0;
+            for (;;)
+            {
+                ptr++;
+                if (entry->h_addr_list[ptr]==NULL)
+                    break;
+                SocketEndpoint ep;
+                ep.setNetAddress(sizeof(unsigned),entry->h_addr_list[ptr]);
+                ep.port = defport;
+                append(ep);
+            }
+        }
+        return ordinality()>0;
+    }
+#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
+    struct addrinfo hints;
+    memset(&hints,0,sizeof(hints));
+    struct addrinfo  *addrInfo = NULL;
+    memset(&hints,0,sizeof(hints));
+    int ret = getaddrinfo(name, NULL , &hints, &addrInfo);
+    if (ret == 0)
+    {
+        struct addrinfo  *ai;
+        for (ai = addrInfo; ai; ai = ai->ai_next)
+        {
+            // DBGLOG("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL");
+            if (ai->ai_protocol == IPPROTO_IP)
+            {
+                switch (ai->ai_family)
+                {
+                    case AF_INET:
+                    {
+                        SocketEndpoint ep;
+                        ep.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr));
+                        ep.port = defport;
+                        append(ep);
+                        // StringBuffer s;
+                        // DBGLOG("Lookup %s found %s", name, ep.getUrlStr(s).str());
+                        break;
+                    }
+                case AF_INET6:
+                    {
+                        SocketEndpoint ep;
+                        ep.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr));
+                        ep.port = defport;
+                        append(ep);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+    freeaddrinfo(addrInfo);
+#endif
+    return ordinality()>0;
+}
+
 void SocketEndpointArray::fromText(const char *text,unsigned defport) 
 {
     // this is quite complicated with (mixed) IPv4 and IPv6

+ 1 - 0
system/jlib/jsocket.hpp

@@ -177,6 +177,7 @@ class jlib_decl SocketEndpointArray : public StructArrayOf<SocketEndpoint>
 { 
 public:
     StringBuffer &getText(StringBuffer &text);
+    bool fromName(const char *name, unsigned defport);
     void fromText(const char *s,unsigned defport);
 };