Преглед изворни кода

HPCC-13451 Added config setting for Roxie multicast TTL

Changed to now set TTL for all MC sockets.
Default TTL if not set in xml is 1
If TTL is 0 then do not explicitly set TTL
to behave like before (using OS defaults)
Added multicastTTL="1" to default environment.xml

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly пре 9 година
родитељ
комит
24f0bd982e

+ 7 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -386,6 +386,13 @@
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
+      <xs:attribute name="multicastTTL" type="xs:nonNegativeInteger" use="optional" default="1">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>The multicast TTL (Time To Live) setting for this roxie cluster.  Zero means do not explicitly set TTL, and use the default OS setting.</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
       <xs:attribute name="directory" type="absolutePath" use="optional" default="${EXEC_PREFIX}/lib/${DIR_NAME}/roxie/">
         <xs:annotation>
           <xs:appinfo>

+ 1 - 0
initfiles/etc/DIR_NAME/environment.xml.in

@@ -865,6 +865,7 @@
                 monitorDaliFileServer="false"
                 multicastBase="239.1.1.1"
                 multicastLast="239.1.254.254"
+                multicastTTL="1"
                 name="myroxie"
                 nodeCacheMem="100"
                 nodeCachePreload="false"

+ 14 - 0
roxie/ccd/ccdmain.cpp

@@ -717,6 +717,20 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         if (udpSnifferEnabled && !roxieMulticastEnabled)
             DBGLOG("WARNING: ignoring udpSnifferEnabled setting as multicast not enabled");
 
+        int ttlTmp = topology->getPropInt("@multicastTTL", 1);
+        if (ttlTmp < 0)
+        {
+            multicastTTL = 1;
+            WARNLOG("multicastTTL value (%d) invalid, must be >=0, resetting to %u", ttlTmp, multicastTTL);
+        }
+        else if (ttlTmp > 255)
+        {
+            multicastTTL = 255;
+            WARNLOG("multicastTTL value (%d) invalid, must be <=%u, resetting to maximum", ttlTmp, multicastTTL);
+        }
+        else
+            multicastTTL = ttlTmp;
+
         indexReadChunkSize = topology->getPropInt("@indexReadChunkSize", 60000);
         numSlaveThreads = topology->getPropInt("@slaveThreads", 30);
         numServerThreads = topology->getPropInt("@serverThreads", 30);

+ 7 - 0
roxie/ccd/ccdqueue.cpp

@@ -103,6 +103,13 @@ void openMulticastSocket()
     if (!multicastSocket)
     {
         multicastSocket.setown(ISocket::udp_create(ccdMulticastPort));
+        if (multicastTTL)
+        {
+            multicastSocket->set_ttl(multicastTTL);
+            DBGLOG("Roxie: multicastTTL: %u", multicastTTL);
+        }
+        else
+            DBGLOG("Roxie: multicastTTL not set");
         multicastSocket->set_receive_buffer_size(udpMulticastBufferSize);
         size32_t actualSize = multicastSocket->get_receive_buffer_size();
         if (actualSize < udpMulticastBufferSize)

+ 2 - 0
roxie/udplib/udplib.hpp

@@ -123,6 +123,8 @@ extern UDPLIB_API unsigned udpTraceCategories;
 extern UDPLIB_API unsigned udpOutQsPriority;
 extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats);
 
+extern UDPLIB_API unsigned multicastTTL;
+
 #ifdef __linux__
 extern UDPLIB_API void setLinuxThreadPriority(int level);
 #endif

+ 1 - 0
roxie/udplib/udpsha.cpp

@@ -43,6 +43,7 @@ bool     enableSocketMaxSetting = false;
 unsigned udpFlowSocketsSize = 131072;
 unsigned udpLocalWriteSocketSize = 1024000;
 
+unsigned multicastTTL = 1;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {

+ 2 - 2
roxie/udplib/udptrr.cpp

@@ -493,7 +493,7 @@ class CReceiveManager : public CInterface, implements IReceiveManager
           : Thread("udplib::receive_sniffer"), parent(_parent), snifferPort(_snifferPort), snifferIP(_snifferIP), running(false)
         {
             snifferTable = new SnifferEntry[numNodes];
-            sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP);
+            sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
             if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0) {
                 if (!enableSocketMaxSetting)
                     throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
@@ -556,7 +556,7 @@ class CReceiveManager : public CInterface, implements IReceiveManager
                     DBGLOG("UdpReceiver: receive_sniffer::run unknown exception port %u", parent.data_port);
                     if (sniffer_socket) {
                         sniffer_socket->Release();
-                        sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP);
+                        sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
                     }
                     MilliSleep(1000);
                 }

+ 1 - 1
roxie/udplib/udptrs.cpp

@@ -785,7 +785,7 @@ class CSendManager : public CInterface, implements ISendManager
             {
                 if (!sniffer_socket) 
                 {
-                    sniffer_socket = ISocket::multicast_connect(ep, 3);
+                    sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
                     if (udpTraceLevel > 1)
                     {
                         StringBuffer url;

+ 31 - 10
system/jlib/jsocket.cpp

@@ -430,19 +430,21 @@ public:
     // Block functions
     void        set_block_mode(unsigned flags,size32_t recsize=0,unsigned timeoutms=0);
     bool        send_block(const void *blk,size32_t sz);
-    size32_t        receive_block_size();
-    size32_t        receive_block(void *blk,size32_t sz);
+    size32_t    receive_block_size();
+    size32_t    receive_block(void *blk,size32_t sz);
 
-    size32_t        get_send_buffer_size();
+    size32_t    get_send_buffer_size();
     void        set_send_buffer_size(size32_t sz);
 
     bool        join_multicast_group(SocketEndpoint &ep);   // for udp multicast
     bool        leave_multicast_group(SocketEndpoint &ep);  // for udp multicast
 
-    size32_t        get_receive_buffer_size();
+    void        set_ttl(unsigned _ttl);
+
+    size32_t    get_receive_buffer_size();
     void        set_receive_buffer_size(size32_t sz);
 
-    size32_t        avail_read();
+    size32_t    avail_read();
 
     int         pre_connect(bool block);
     int         post_connect();
@@ -1579,7 +1581,7 @@ void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &si
 {
     unsigned startt=usTick();
     size_read = 0;
-    unsigned start;
+    unsigned start = 0;
     unsigned timeleft = 0;
     if (state != ss_open) {
         THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
@@ -2391,6 +2393,21 @@ bool CSocket::leave_multicast_group(SocketEndpoint &ep)
 }
 
 
+void CSocket::set_ttl(unsigned _ttl)
+{
+    if (_ttl)
+    {
+        u_char ttl = _ttl;
+        setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&ttl, sizeof(ttl));
+    }
+#ifdef SOCKTRACE
+    int ttl0 = 0;
+    socklen_t ttl1 = sizeof(ttl0);
+    getsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl0, &ttl1);
+    DBGLOG("set_ttl: socket fd: %d requested ttl: %d actual ttl: %d", sock, _ttl, ttl0);
+#endif
+    return;
+}
 
 CSocket::~CSocket()
 {
@@ -2510,17 +2527,19 @@ ISocket* ISocket::udp_create(unsigned short p)
     return sock.getClear();
 }
 
-ISocket* ISocket::multicast_create(unsigned short p, const char *mcip)
+ISocket* ISocket::multicast_create(unsigned short p, const char *mcip, unsigned _ttl)
 {
     if (p==0)
         THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
     SocketEndpoint ep(mcip,p);
     Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,mcip);
     sock->open(0,true);
+    if (_ttl)
+        sock->set_ttl(_ttl);
     return sock.getClear();
 }
 
-ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip)
+ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip, unsigned _ttl)
 {
     if (p==0)
         THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
@@ -2528,6 +2547,8 @@ ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip)
     StringBuffer tmp;
     Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,ip.getIpText(tmp).str());
     sock->open(0,true);
+    if (_ttl)
+        sock->set_ttl(_ttl);
     return sock.getClear();
 }
 
@@ -2560,8 +2581,8 @@ ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl)
 {
     Owned<CSocket> sock = new CSocket(ep,sm_multicast,NULL);
     sock->udpconnect();
-    u_char ttl = _ttl;
-    setsockopt(sock->OShandle(), IPPROTO_IP, IP_MULTICAST_TTL, (char *) &ttl, sizeof(ttl));
+    if (_ttl)
+        sock->set_ttl(_ttl);
     return sock.getClear();
 }
 

+ 11 - 9
system/jlib/jsocket.hpp

@@ -246,14 +246,14 @@ public:
 
     // Create client socket connected to a multicast server socket
     //
-    static ISocket*  multicast_connect( unsigned short port, const char *mcgroupip, unsigned ttl);
-    static ISocket*  multicast_connect( const SocketEndpoint &ep, unsigned ttl);
+    static ISocket*  multicast_connect( unsigned short port, const char *mcgroupip, unsigned _ttl=0);
+    static ISocket*  multicast_connect( const SocketEndpoint &ep, unsigned _ttl=0);
 
     //
     // Create server multicast socket
     //
-    static ISocket*  multicast_create( unsigned short port, const char *mcgroupip);
-    static ISocket*  multicast_create( unsigned short port, const IpAddress &mcgroupip);
+    static ISocket*  multicast_create( unsigned short port, const char *mcgroupip, unsigned _ttl=0);
+    static ISocket*  multicast_create( unsigned short port, const IpAddress &mcgroupip, unsigned _ttl=0);
 
     //
     // Creates an ISocket for an already created socket
@@ -368,13 +368,15 @@ public:
     virtual bool join_multicast_group(SocketEndpoint &ep) = 0;  // for udp multicast
     virtual bool leave_multicast_group(SocketEndpoint &ep) = 0; // for udp multicast
 
-    virtual size32_t get_receive_buffer_size() =0;              // get OS receive buffer
-    virtual void set_receive_buffer_size(size32_t sz) =0;           // set OS receive buffer size
+    virtual void set_ttl(unsigned _ttl) = 0; // set TTL
 
-    virtual void set_keep_alive(bool set)=0;                    // set option SO_KEEPALIVE
+    virtual size32_t get_receive_buffer_size() = 0;             // get OS receive buffer
+    virtual void set_receive_buffer_size(size32_t sz) = 0;      // set OS receive buffer size
 
-    virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size)=0;
-        virtual bool check_connection() = 0;
+    virtual void set_keep_alive(bool set) = 0;                  // set option SO_KEEPALIVE
+
+    virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size) = 0;
+    virtual bool check_connection() = 0;
 
 
 /*

+ 5 - 0
system/security/securesocket/securesocket.cpp

@@ -346,6 +346,11 @@ public:
         return false;
     }
 
+    void set_ttl(unsigned _ttl)   // set ttl
+    {
+        throw MakeStringException(-1, "not implemented");
+    }
+
     size32_t get_receive_buffer_size()  // get OS send buffer
     {
         throw MakeStringException(-1, "not implemented");

+ 1 - 0
testing/regress/environment.xml.in

@@ -865,6 +865,7 @@
                 monitorDaliFileServer="false"
                 multicastBase="239.1.1.1"
                 multicastLast="239.1.254.254"
+                multicastTTL="1"
                 name="myroxie"
                 nodeCacheMem="100"
                 nodeCachePreload="false"