Browse Source

HPCC-23939 Roxie localslave mode not working in containerized mode

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 năm trước cách đây
mục cha
commit
58fdb234bd

+ 8 - 6
roxie/ccd/ccdmain.cpp

@@ -633,6 +633,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         useOldTopology = checkFileExists(topologyFile.str());
         topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
         saveTopology();
+        localSlave = topology->getPropBool("@localSlave", false);
         const char *channels = topology->queryProp("@channels");
         if (channels)
         {
@@ -653,6 +654,10 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
                 slaveChannels.push_back(std::pair<unsigned, unsigned>(channel, repl));
             }
         }
+#ifdef _CONTAINERIZED
+        else if (localSlave)
+            slaveChannels.push_back(std::pair<unsigned, unsigned>(1, 0));
+#endif
         const char *topos = topology->queryProp("@topologyServers");
         StringArray topoValues;
         if (topos)
@@ -837,9 +842,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             Owned<IPropertyTree> nas = envGetNASConfiguration(topology);
             envInstallNASHooks(nas);
         }
-        useDynamicServers = topology->getPropBool("@useDynamicServers", !useOldTopology);
         useAeron = topology->getPropBool("@useAeron", false);
-        localSlave = topology->getPropBool("@localSlave", false);
         numChannels = topology->getPropInt("@numChannels", 0);
         doIbytiDelay = topology->getPropBool("@doIbytiDelay", true);
         minIbytiDelay = topology->getPropInt("@minIbytiDelay", 2);
@@ -1154,10 +1157,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
         globalPackageSetManager->load();
         unsigned snifferChannel = numChannels+2; // MORE - why +2 not +1??
-        if (useDynamicServers && topoValues.length())
-        {
-            startTopoThread(topoValues, myRoles, traceLevel);
-        }
+#ifdef _CONTAINERIZED
+        initializeTopology(topoValues, myRoles, traceLevel);
+#endif
         ROQ = createOutputQueueManager(snifferChannel, numSlaveThreads);
         ROQ->setHeadRegionSize(headRegionSize);
         ROQ->start();

+ 0 - 2
roxie/udplib/udplib.hpp

@@ -38,8 +38,6 @@ typedef unsigned ruid_t;   // at 1000/sec recycle every 49 days
 typedef unsigned RecordLengthType;
 #define MAX_RECORD_LENGTH 0xffffffff
 
-extern UDPLIB_API bool useDynamicServers;
-
 class UDPLIB_API ServerIdentifier
 {
 private:

+ 0 - 1
roxie/udplib/udpsha.cpp

@@ -40,7 +40,6 @@ unsigned udpLocalWriteSocketSize = 1024000;
 unsigned udpSnifferReadThreadPriority = 3;
 unsigned udpSnifferSendThreadPriority = 3;
 
-bool useDynamicServers = true;
 unsigned multicastTTL = 1;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)

+ 18 - 15
roxie/udplib/udptopo.cpp

@@ -356,30 +356,33 @@ static std::thread topoThread;
 static Semaphore abortTopo;
 const unsigned topoUpdateInterval = 5000;
 
-extern UDPLIB_API void startTopoThread(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel)
+extern UDPLIB_API void initializeTopology(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel)
 {
     topologyManager.setServers(topoValues);
     topologyManager.setRoles(myRoles);
-    topoThread = std::thread([traceLevel]()
+    if (topoValues.length())
     {
-        topologyManager.update();
-        unsigned waitTime = 1000;  // First time around we don't wait as long, so that system comes up faster
-        while (!abortTopo.wait(waitTime))
+        topoThread = std::thread([traceLevel]()
         {
-            if (topologyManager.update() && traceLevel)
+            topologyManager.update();
+            unsigned waitTime = 1000;  // First time around we don't wait as long, so that system comes up faster
+            while (!abortTopo.wait(waitTime))
             {
-                DBGLOG("Topology information updated:");
-                Owned<const ITopologyServer> c = getTopology();
-                const SocketEndpointArray &eps = c->querySlaves(0);
-                ForEachItemIn(idx, eps)
+                if (topologyManager.update() && traceLevel)
                 {
-                    StringBuffer s;
-                    DBGLOG("Slave %d: %s", idx, eps.item(idx).getIpText(s).str());
+                    DBGLOG("Topology information updated:");
+                    Owned<const ITopologyServer> c = getTopology();
+                    const SocketEndpointArray &eps = c->querySlaves(0);
+                    ForEachItemIn(idx, eps)
+                    {
+                        StringBuffer s;
+                        DBGLOG("Slave %d: %s", idx, eps.item(idx).getIpText(s).str());
+                    }
                 }
+                waitTime = topoUpdateInterval;
             }
-            waitTime = topoUpdateInterval;
-        }
-    });
+        });
+    }
 }
 
 extern UDPLIB_API void stopTopoThread()

+ 1 - 1
roxie/udplib/udptopo.hpp

@@ -115,7 +115,7 @@ struct RoxieEndpointInfo
     unsigned replicationLevel;
 };
 
-extern UDPLIB_API void startTopoThread(const StringArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);
+extern UDPLIB_API void initializeTopology(const StringArray &topoServers, const std::vector<RoxieEndpointInfo> &myRoles, unsigned traceLevel);
 #ifndef _CONTAINERIZED
 extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo> &allRoles, unsigned traceLevel);
 #endif

+ 0 - 1
roxie/udplib/uttest.cpp

@@ -617,7 +617,6 @@ int main(int argc, char * argv[] )
     InitModuleObjects();
     if (argc < 2)
         usage();
-    useDynamicServers = false;
 
     strdup("Make sure leak checking is working");
     queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_thread | MSGFIELD_prefix);