Explorar o código

HPCC-9258 Make Roxie UDP ports configurable

Roxie uses ports 8887, 9000, 9001, 9002, and 9003 for UDP traffic. These
should be configurable.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=12) %!d(string=hai) anos
pai
achega
26c49a78c6

+ 37 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1029,6 +1029,43 @@
       </xs:annotation>
     </xs:attribute>
   </xs:attributeGroup>
+  <xs:attributeGroup name="Ports">
+    <xs:attribute name="multicastPort" type="xs:nonNegativeInteger" use="optional" default="8887">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Port used for multicast (server->slave) data</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute name="serverFlowPort" type="xs:nonNegativeInteger" use="optional" default="9000">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Port used for UDP (slave->server) server flow control messages</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute name="dataPort" type="xs:nonNegativeInteger" use="optional" default="9001">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Port used for UDP (slave->server) data</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute name="clientFlowPort" type="xs:nonNegativeInteger" use="optional" default="9002">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Port used for UDP (slave->server) client flow control messages</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute name="snifferPort" type="xs:nonNegativeInteger" use="optional" default="9003">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Port used for UDP (slave->server) sniffer</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+  </xs:attributeGroup>
   <xs:attributeGroup name="Debug">
     <xs:attribute name="checkCompleted" type="xs:boolean" use="optional" default="true">
       <xs:annotation>

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -320,6 +320,7 @@ extern unsigned lowTimeout;
 extern unsigned highTimeout;
 extern unsigned slaTimeout;
 extern unsigned headRegionSize;
+extern unsigned ccdMulticastPort;
 extern CriticalSection ccdChannelsCrit;
 extern IPropertyTree* ccdChannels;
 extern IPropertyTree* topology;

+ 3 - 1
roxie/ccd/ccdmain.cpp

@@ -58,6 +58,7 @@ unsigned numServerThreads = 30;
 unsigned numSlaveThreads = 30;
 unsigned numRequestArrayThreads = 5;
 unsigned headRegionSize;
+unsigned ccdMulticastPort;
 bool enableHeartBeat = true;
 unsigned parallelLoopFlowLimit = 100;
 unsigned perChannelFlowLimit = 10;
@@ -335,7 +336,7 @@ void addChannel(unsigned channel, const char *dataDirectory, bool isMe, bool sus
     }
     if (!localSlave)
     {
-        addEndpoint(channel, slaveIp, CCD_MULTICAST_PORT);
+        addEndpoint(channel, slaveIp, ccdMulticastPort);
     }
 }
 
@@ -629,6 +630,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         bool isCCD = false;
 
         headRegionSize = topology->getPropInt("@headRegionSize", 50);
+        ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
         numChannels = topology->getPropInt("@numChannels", 0);
         numActiveChannels = topology->getPropInt("@numActiveChannels", numChannels);
         statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);

+ 10 - 6
roxie/ccd/ccdqueue.cpp

@@ -87,7 +87,7 @@ void joinMulticastChannel(unsigned channel)
     {
         IpAddress multicastIp;
         getChannelIp(multicastIp, channel);
-        SocketEndpoint ep(CCD_MULTICAST_PORT, multicastIp);
+        SocketEndpoint ep(ccdMulticastPort, multicastIp);
         StringBuffer epStr;
         ep.getUrlStr(epStr);
         if (!multicastSocket->join_multicast_group(ep))
@@ -108,11 +108,11 @@ void addEndpoint(unsigned channel, const IpAddress &slaveIp, unsigned port)
         multicastIp = slaveIp;
     if (!isSlaveEndpoint(channel, multicastIp))
     {
-        SocketEndpoint &ep = *new SocketEndpoint(CCD_MULTICAST_PORT, multicastIp);
+        SocketEndpoint &ep = *new SocketEndpoint(ccdMulticastPort, multicastIp);
         slaveEndpoints[channel].append(ep);
         if (!multicastSocket)
         {
-            multicastSocket.setown(ISocket::udp_create(CCD_MULTICAST_PORT));
+            multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
             multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
             size32_t actualSize = multicastSocket->get_receive_buffer_size();
             if (actualSize < udpMulticastBufferSize)
@@ -121,7 +121,7 @@ void addEndpoint(unsigned channel, const IpAddress &slaveIp, unsigned port)
                 throwUnexpected();
             }
             if (traceLevel)
-                DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", CCD_MULTICAST_PORT, udpMulticastBufferSize, actualSize);
+                DBGLOG("Roxie: multicast socket created port=%d sockbuffsize=%d actual %d", ccdMulticastPort, udpMulticastBufferSize, actualSize);
         }
     }
     if (channel)
@@ -1732,8 +1732,12 @@ public:
         getChannelIp(snifferIp, snifferChannel);
         if (udpMaxSlotsPerClient > udpQueueSize)
             udpMaxSlotsPerClient = udpQueueSize;
-        receiveManager.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, CCD_SNIFFER_PORT, snifferIp, udpQueueSize, udpMaxSlotsPerClient, myNodeIndex));
-        sendManager.setown(createSendManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, CCD_SNIFFER_PORT, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, udpResendEnabled ? udpMaxSlotsPerClient : 0, bucket, myNodeIndex));
+        unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
+        unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
+        unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
+        unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
+        receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient, myNodeIndex));
+        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, udpResendEnabled ? udpMaxSlotsPerClient : 0, bucket, myNodeIndex));
         running = false;
     }