浏览代码

HPCC-23526 Roxie should resolve topology name more often

In k8s settings, the topology server name may refer to a different IP if the
topology server restarts, and the DNS entry may not have been set up at the
time roxie is starting. Delay resolution of the name until we are actually
making the call.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父节点
当前提交
015d1d7904
共有 3 个文件被更改,包括 28 次插入和 28 次删除
  1. 8 12
      roxie/ccd/ccdmain.cpp
  2. 19 15
      roxie/udplib/udptopo.cpp
  3. 1 1
      roxie/udplib/udptopo.hpp

+ 8 - 12
roxie/ccd/ccdmain.cpp

@@ -409,7 +409,6 @@ public:
     }
     }
 };
 };
 
 
-static SocketEndpointArray topologyServers;
 static std::vector<RoxieEndpointInfo> myRoles;
 static std::vector<RoxieEndpointInfo> myRoles;
 static std::vector<unsigned> farmerPorts;
 static std::vector<unsigned> farmerPorts;
 static std::vector<std::pair<unsigned, unsigned>> slaveChannels;
 static std::vector<std::pair<unsigned, unsigned>> slaveChannels;
@@ -536,6 +535,8 @@ static constexpr const char * defaultJson = R"!!({
     "queueNames": "roxie.roxie",
     "queueNames": "roxie.roxie",
     "resolveLocally": true,
     "resolveLocally": true,
     "serverPorts": "9876,0",
     "serverPorts": "9876,0",
+    "roxieMulticastEnabled": false,
+    "useAeron": false,
     "RoxieFarmProcess":  {
     "RoxieFarmProcess":  {
       "name": "default",
       "name": "default",
       "port": 9876,
       "port": 9876,
@@ -675,15 +676,9 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             }
             }
         }
         }
         const char *topos = topology->queryProp("@topologyServers");
         const char *topos = topology->queryProp("@topologyServers");
+        StringArray topoValues;
         if (topos)
         if (topos)
-        {
-            StringArray topoValues;
             topoValues.appendList(topos, ",", true);
             topoValues.appendList(topos, ",", true);
-            ForEachItemIn(idx, topoValues)
-            {
-                topologyServers.append(SocketEndpoint(topoValues.item(idx)));  // MORE - in cloud case we may need to explicitly find all pods for a single service?
-            }
-        }
         const char *serverPorts = topology->queryProp("@serverPorts");
         const char *serverPorts = topology->queryProp("@serverPorts");
         if (serverPorts)
         if (serverPorts)
         {
         {
@@ -858,7 +853,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             envInstallNASHooks(nas);
             envInstallNASHooks(nas);
         }
         }
         useDynamicServers = topology->getPropBool("@useDynamicServers", !useOldTopology);
         useDynamicServers = topology->getPropBool("@useDynamicServers", !useOldTopology);
-        useAeron = topology->getPropBool("@useAeron", useDynamicServers);
+        useAeron = topology->getPropBool("@useAeron", false);
         localSlave = topology->getPropBool("@localSlave", false);
         localSlave = topology->getPropBool("@localSlave", false);
         numChannels = topology->getPropInt("@numChannels", 0);
         numChannels = topology->getPropInt("@numChannels", 0);
         doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
         doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
@@ -930,7 +925,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
         udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
         udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
         udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);
         
         
-        roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true);   // enable use of multicast for sending requests to slaves
+        roxieMulticastEnabled = topology->getPropBool("@roxieMulticastEnabled", true) && !useAeron;   // enable use of multicast for sending requests to slaves
         if (udpSnifferEnabled && !roxieMulticastEnabled)
         if (udpSnifferEnabled && !roxieMulticastEnabled)
         {
         {
             DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
             DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
@@ -1128,6 +1123,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             if (!numChannels)
             if (!numChannels)
                 throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
                 throw makeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels not set");
             IpAddress myIP(".");
             IpAddress myIP(".");
+            myNode.setIp(myIP);
             for (unsigned port: farmerPorts)
             for (unsigned port: farmerPorts)
             {
             {
                 VStringBuffer xpath("RoxieFarmProcess[@port='%u']", port);
                 VStringBuffer xpath("RoxieFarmProcess[@port='%u']", port);
@@ -1162,9 +1158,9 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
         globalPackageSetManager->load();
         globalPackageSetManager->load();
         unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
         unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
-        if (useDynamicServers && topologyServers.length())
+        if (useDynamicServers && topoValues.length())
         {
         {
-            startTopoThread(topologyServers, myRoles, traceLevel);
+            startTopoThread(topoValues, myRoles, traceLevel);
         }
         }
         ROQ = createOutputQueueManager(snifferChannel, numSlaveThreads);
         ROQ = createOutputQueueManager(snifferChannel, numSlaveThreads);
         ROQ->setHeadRegionSize(headRegionSize);
         ROQ->setHeadRegionSize(headRegionSize);

+ 19 - 15
roxie/udplib/udptopo.cpp

@@ -208,15 +208,15 @@ class TopologyManager
 {
 {
 public:
 public:
     TopologyManager() { currentTopology.setown(new CTopologyServer); };
     TopologyManager() { currentTopology.setown(new CTopologyServer); };
-    void setServers(const SocketEndpointArray &_topoServers);
+    void setServers(const StringArray &_topoServers);
     void setRoles(const std::vector<RoxieEndpointInfo> &myRoles);
     void setRoles(const std::vector<RoxieEndpointInfo> &myRoles);
     const ITopologyServer &getCurrent();
     const ITopologyServer &getCurrent();
 
 
-    void update();
+    bool update();
 private:
 private:
     Owned<const ITopologyServer> currentTopology;
     Owned<const ITopologyServer> currentTopology;
     SpinLock lock;
     SpinLock lock;
-    SocketEndpointArray topoServers;
+    StringArray topoServers;
     const unsigned topoConnectTimeout = 1000;
     const unsigned topoConnectTimeout = 1000;
     const unsigned maxReasonableResponse = 32*32*1024;  // At ~ 32 bytes per entry, 1024 channels and 32-way redundancy that's a BIG cluster!
     const unsigned maxReasonableResponse = 32*32*1024;  // At ~ 32 bytes per entry, 1024 channels and 32-way redundancy that's a BIG cluster!
     StringBuffer md5;
     StringBuffer md5;
@@ -225,13 +225,15 @@ private:
 
 
 static TopologyManager topologyManager;
 static TopologyManager topologyManager;
 
 
-void TopologyManager::update()
+bool TopologyManager::update()
 {
 {
+    bool updated = false;
     ForEachItemIn(idx, topoServers)
     ForEachItemIn(idx, topoServers)
     {
     {
         try
         try
         {
         {
-            Owned<ISocket> topo = ISocket::connect_timeout(topoServers.item(idx), topoConnectTimeout);
+            SocketEndpoint ep(topoServers.item(idx));  // MORE - there may be more than one IP
+            Owned<ISocket> topo = ISocket::connect_timeout(ep, topoConnectTimeout);
             if (topo)
             if (topo)
             {
             {
                 unsigned topoBufLen = md5.length()+topoBuf.length();
                 unsigned topoBufLen = md5.length()+topoBuf.length();
@@ -244,15 +246,13 @@ void TopologyManager::update()
                 _WINREV(responseLen);
                 _WINREV(responseLen);
                 if (!responseLen)
                 if (!responseLen)
                 {
                 {
-                    StringBuffer s;
-                    DBGLOG("Unexpected empty response from topology server %s", topoServers.item(idx).getUrlStr(s).str());
+                    DBGLOG("Unexpected empty response from topology server %s", topoServers.item(idx));
                 }
                 }
                 else
                 else
                 {
                 {
                     if (responseLen > maxReasonableResponse)
                     if (responseLen > maxReasonableResponse)
                     {
                     {
-                        StringBuffer s;
-                        DBGLOG("Unexpectedly large response (%u) from topology server %s", responseLen, topoServers.item(idx).getUrlStr(s).str());
+                        DBGLOG("Unexpectedly large response (%u) from topology server %s", responseLen, topoServers.item(idx));
                     }
                     }
                     else
                     else
                     {
                     {
@@ -271,13 +271,14 @@ void TopologyManager::update()
                                     Owned<const ITopologyServer> newServer = new CTopologyServer(eol);
                                     Owned<const ITopologyServer> newServer = new CTopologyServer(eol);
                                     SpinBlock b(lock);
                                     SpinBlock b(lock);
                                     currentTopology.swap(newServer);
                                     currentTopology.swap(newServer);
+                                    updated = true;
                                 }
                                 }
                             }
                             }
                         }
                         }
                         else
                         else
                         {
                         {
                             StringBuffer s;
                             StringBuffer s;
-                            DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx).getUrlStr(s).str(), responseLen, mem);
+                            DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
                         }
                         }
                     }
                     }
                 }
                 }
@@ -285,10 +286,12 @@ void TopologyManager::update()
         }
         }
         catch (IException *E)
         catch (IException *E)
         {
         {
+            DBGLOG("While connecting to %s", topoServers.item(idx));
             EXCLOG(E);
             EXCLOG(E);
             E->Release();
             E->Release();
         }
         }
     }
     }
+    return updated;
 }
 }
 
 
 const ITopologyServer &TopologyManager::getCurrent()
 const ITopologyServer &TopologyManager::getCurrent()
@@ -297,7 +300,7 @@ const ITopologyServer &TopologyManager::getCurrent()
     return *currentTopology.getLink();
     return *currentTopology.getLink();
 }
 }
 
 
-void TopologyManager::setServers(const SocketEndpointArray &_topoServers)
+void TopologyManager::setServers(const StringArray &_topoServers)
 {
 {
     ForEachItemIn(idx, _topoServers)
     ForEachItemIn(idx, _topoServers)
         topoServers.append(_topoServers.item(idx));
         topoServers.append(_topoServers.item(idx));
@@ -306,6 +309,7 @@ void TopologyManager::setServers(const SocketEndpointArray &_topoServers)
 void TopologyManager::setRoles(const std::vector<RoxieEndpointInfo> &myRoles)
 void TopologyManager::setRoles(const std::vector<RoxieEndpointInfo> &myRoles)
 {
 {
     topoBuf.clear();
     topoBuf.clear();
+    DBGLOG("TopologyManager::setRoles - %d roles", (int) myRoles.size());
     for (const auto &role : myRoles)
     for (const auto &role : myRoles)
     {
     {
         switch (role.role)
         switch (role.role)
@@ -344,9 +348,9 @@ static std::thread topoThread;
 static Semaphore abortTopo;
 static Semaphore abortTopo;
 const unsigned topoUpdateInterval = 5000;
 const unsigned topoUpdateInterval = 5000;
 
 
-extern UDPLIB_API void startTopoThread(const SocketEndpointArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel)
+extern UDPLIB_API void startTopoThread(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel)
 {
 {
-    topologyManager.setServers(topoServers);
+    topologyManager.setServers(topoValues);
     topologyManager.setRoles(myRoles);
     topologyManager.setRoles(myRoles);
     topoThread = std::thread([traceLevel]()
     topoThread = std::thread([traceLevel]()
     {
     {
@@ -354,9 +358,9 @@ extern UDPLIB_API void startTopoThread(const SocketEndpointArray &topoServers, c
         unsigned waitTime = 1000;  // First time around we don't wait as long, so that system comes up faster
         unsigned waitTime = 1000;  // First time around we don't wait as long, so that system comes up faster
         while (!abortTopo.wait(waitTime))
         while (!abortTopo.wait(waitTime))
         {
         {
-            topologyManager.update();
-            if (traceLevel > 2)
+            if (topologyManager.update() && traceLevel)
             {
             {
+                DBGLOG("Topology information updated:");
                 Owned<const ITopologyServer> c = getTopology();
                 Owned<const ITopologyServer> c = getTopology();
                 const SocketEndpointArray &eps = c->querySlaves(0);
                 const SocketEndpointArray &eps = c->querySlaves(0);
                 ForEachItemIn(idx, eps)
                 ForEachItemIn(idx, eps)

+ 1 - 1
roxie/udplib/udptopo.hpp

@@ -115,7 +115,7 @@ struct RoxieEndpointInfo
     unsigned replicationLevel;
     unsigned replicationLevel;
 };
 };
 
 
-extern UDPLIB_API void startTopoThread(const SocketEndpointArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);
+extern UDPLIB_API void startTopoThread(const StringArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);
 extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo> &allRoles, unsigned traceLevel);
 extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo> &allRoles, unsigned traceLevel);
 
 
 #endif
 #endif