Преглед на файлове

HPCC-23342 Do not listen/accept for those dali clients that do not require it

Signed-off-by: Mark Kelly <mark.kelly@lexisnexisrisk.com>
Mark Kelly преди 5 години
родител
ревизия
a3f18ed902

+ 2 - 2
dali/base/daclient.cpp

@@ -88,11 +88,11 @@ IDaliClient_Exception *createClientException(DaliClientError err, const char *ms
 
 
 
-bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, const char *clientVersion, const char *minServerVersion, unsigned timeout)
+bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, const char *clientVersion, const char *minServerVersion, unsigned timeout, bool listen)
 {
     assertex(servergrp);
     daliClientIsActive = true;
-    startMPServer(role, mpport);
+    startMPServer(role, mpport, false, listen);
     Owned<ICommunicator> comm(createCommunicator(servergrp,true));
     IGroup * covengrp;
     if (!registerClientProcess(comm.get(),covengrp,timeout,role))

+ 1 - 1
dali/base/daclient.hpp

@@ -26,7 +26,7 @@
 #include "mpcomm.hpp"
 #include "dasds.hpp"
 
-extern da_decl bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport=0, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER);
+extern da_decl bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport=0, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER, bool listen=false);
 extern da_decl bool reinitClientProcess(IGroup *servergrp, DaliClientRole role, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER); 
 extern da_decl void closedownClientProcess();
 extern da_decl bool daliClientActive();

+ 1 - 1
dali/sasha/saserver.cpp

@@ -335,7 +335,7 @@ int main(int argc, const char* argv[])
 
         unsigned short port = (stop||coalescer)?0:DEFAULT_SASHA_PORT;
         Owned<IGroup> serverGroup = createIGroupRetry(daliServer.str(),DALI_SERVER_PORT);
-        initClientProcess(serverGroup, DCR_SashaServer, port, NULL, NULL, MP_WAIT_FOREVER);
+        initClientProcess(serverGroup, DCR_SashaServer, port, nullptr, nullptr, MP_WAIT_FOREVER, true);
         if (!stop&!coalescer) {
             startLogMsgParentReceiver();    // for auditing
             connectLogMsgManagerToDali();

+ 2 - 2
dali/server/daserver.cpp

@@ -612,7 +612,7 @@ int main(int argc, const char* argv[])
         }
 
         unsigned short myport = epa.item(myrank).port;
-        startMPServer(DCR_DaliServer, myport, true);
+        startMPServer(DCR_DaliServer, myport, true, true);
         Owned<IMPServer> mpServer = getMPServer();
         Owned<IWhiteListHandler> whiteListHandler = createWhiteListHandler(populateWhiteListFromEnvironment, formatDaliRole);
         mpServer->installWhiteListCallback(whiteListHandler);
@@ -687,7 +687,7 @@ int main(int argc, const char* argv[])
             throw;
         }
         PROGLOG("DASERVER[%d] starting - listening to port %d",myrank,queryMyNode()->endpoint().port);
-        startMPServer(DCR_DaliServer, myport,false);
+        startMPServer(DCR_DaliServer, myport, false, true);
         bool ok = true;
         ForEachItemIn(i2,servers)
         {

+ 7 - 2
system/jlib/jsocket.cpp

@@ -939,6 +939,11 @@ int CSocket::post_connect ()
 
 void CSocket::open(int listen_queue_size,bool reuseports)
 {
+    // If listen_queue_size==0 then bind port to address but
+    // do not actually listen() for accepting connections.
+    // This is used when a unique IP:port is needed for MP client
+    // INode/IGroup internals, but client never actually accepts connections.
+
     if (IP6preferred)
         sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
     else
@@ -964,7 +969,7 @@ void CSocket::open(int listen_queue_size,bool reuseports)
 #ifndef _WIN32
     reuseports = true;  // for some reason linux requires reuse ports
 #endif
-    if (reuseports) {
+    if (reuseports && listen_queue_size) {
         int on = 1;
         setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
     }
@@ -996,7 +1001,7 @@ ErrPortInUse:
             THROWJSOCKEXCEPTION(saverr);
         }
     }
-    if (!connectionless()) {
+    if (!connectionless() && listen_queue_size) {
         if (::listen(sock, listen_queue_size) != 0) {
             saverr = ERRNO();
             if (saverr==JSE_ADDRINUSE)

+ 34 - 20
system/mp/mpcomm.cpp

@@ -436,6 +436,7 @@ class CMPChannel;
 class CMPConnectThread: public Thread
 {
     bool running;
+    bool listen;
     ISocket *listensock;
     CMPServer *parent;
     int mpSoMaxConn;
@@ -444,7 +445,7 @@ class CMPConnectThread: public Thread
     void checkSelfDestruct(void *p,size32_t sz);
 
 public:
-    CMPConnectThread(CMPServer *_parent, unsigned port);
+    CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen);
     ~CMPConnectThread()
     {
         ::Release(listensock);
@@ -504,7 +505,7 @@ public:
 
     IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
 
-    CMPServer(unsigned __int64 _role, unsigned _port);
+    CMPServer(unsigned __int64 _role, unsigned _port, bool _listen);
     ~CMPServer();
     void start();
     virtual void stop();
@@ -824,7 +825,7 @@ protected: friend class CMPPacketReader;
             {
                 StringBuffer str;
 #ifdef _TRACE
-                LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).str());
+                LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s role: %" I64F "u", remoteep.getUrlStr(str).str(), parent->getRole());
 #endif
                 if (((int)tm.timeout)<0)
                     remaining = CONNECT_TIMEOUT;
@@ -985,7 +986,7 @@ protected: friend class CMPPacketReader;
                 }
 
 #ifdef _TRACE
-                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(connectHdr)=%lu", rd, reply, sizeof(connectHdr));
+                LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, sizeof(connectHdr)=%lu", rd, sizeof(connectHdr));
 #endif
 
                 if (rd)
@@ -1580,7 +1581,7 @@ public:
                     // assumes packet header will arrive in one go
                     if (sizeavail<sizeof(hdr)) {
 #ifdef _FULLTRACE
-                        LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %d %d",sizeavail,sizeavail-sizeof(hdr));
+                        LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %u %lu",sizeavail,sizeavail-sizeof(hdr));
 #endif
                         size32_t szread;
                         sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
@@ -1935,18 +1936,22 @@ bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
 }
     
 // --------------------------------------------------------
-CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
+CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen)
     : Thread("MP Connection Thread")
 {
     parent = _parent;
+    listen = _listen;
     mpSoMaxConn = 0;
     mpTraceLevel = 0;
     Owned<IPropertyTree> env = getHPCCEnvironment();
     if (env)
     {
-        mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
-        if (!mpSoMaxConn)
-            mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
+        if (listen)
+        {
+            mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
+            if (!mpSoMaxConn)
+                mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
+        }
         mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
     }
     if (mpSoMaxConn)
@@ -1956,7 +1961,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
         if (soMaxCheck && (mpSoMaxConn > kernSoMaxConn))
             WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
     }
-    if (!mpSoMaxConn)
+    if (!mpSoMaxConn && listen)
         mpSoMaxConn = DEFAULT_LISTEN_QUEUE_SIZE;
     if (!port)
     {
@@ -1978,6 +1983,8 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
         }
         assertex(maxPort >= minPort);
         Owned<IJSOCK_Exception> lastErr;
+        // mck - if not listening then could ignore port range and
+        //       let OS select an unused port ...
         unsigned numPorts = maxPort - minPort + 1;
         for (unsigned retries = 0; retries < numPorts * 3; retries++)
         {
@@ -2045,6 +2052,8 @@ void CMPConnectThread::startPort(unsigned short port)
 {
     if (!listensock)
         listensock = ISocket::create(port, mpSoMaxConn);
+    if (!listen)
+        return;
     running = true;
     Thread::start();
 }
@@ -2312,13 +2321,18 @@ CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
 }
 
 
-CMPServer::CMPServer(unsigned __int64 _role, unsigned _port)
+CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
 {
     RTsalt=0xff;
     role = _role;
     port = 0;   // connectthread tells me what port it actually connected on
     checkclosed = false;
-    connectthread = new CMPConnectThread(this, _port);
+
+    // If !_listen, CMPConnectThread binds a port but does not actually start
+    // running, it is used as a unique IP:port required in MP INode/IGroup internals
+    // for MP clients that do not need to accept connections.
+
+    connectthread = new CMPConnectThread(this, _port, _listen);
     selecthandler = createSocketSelectHandler();
     pingpackethandler = new PingPacketHandler;              // TAG_SYS_PING
     pingreplypackethandler = new PingReplyPacketHandler;    // TAG_SYS_PING_REPLY
@@ -2521,7 +2535,7 @@ void CMPServer::start()
 
 void CMPServer::stop()
 {
-    selecthandler->stop(true); 
+    selecthandler->stop(true);
     connectthread->stop();
     CMPChannel *c = NULL;
     for (;;) {
@@ -3201,10 +3215,10 @@ ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
 
 ///////////////////////////////////
 
-IMPServer *startNewMPServer(unsigned port)
+IMPServer *startNewMPServer(unsigned port, bool listen)
 {
     assertex(sizeof(PacketHeader)==32);
-    CMPServer *mpServer = new CMPServer(0, port);
+    CMPServer *mpServer = new CMPServer(0, port, listen);
     mpServer->start();
     return mpServer;
 }
@@ -3218,7 +3232,7 @@ class CGlobalMPServer : public CMPServer
 public:
     static CriticalSection sect;
 
-    CGlobalMPServer(unsigned __int64 _role, unsigned _port) : CMPServer(_role, _port)
+    CGlobalMPServer(unsigned __int64 _role, unsigned _port, bool _listen) : CMPServer(_role, _port, _listen)
     {
         worldcomm = NULL;
         nestLevel = 0;
@@ -3252,13 +3266,13 @@ MODULE_EXIT()
     ::Release(globalMPServer);
 }
 
-void startMPServer(unsigned __int64 role, unsigned port, bool paused)
+void startMPServer(unsigned __int64 role, unsigned port, bool paused, bool listen)
 {
     assertex(sizeof(PacketHeader)==32);
     CriticalBlock block(CGlobalMPServer::sect);
     if (NULL == globalMPServer)
     {
-        globalMPServer = new CGlobalMPServer(role, port);
+        globalMPServer = new CGlobalMPServer(role, port, listen);
         initMyNode(globalMPServer->getPort());
     }
     if (0 == globalMPServer->queryNest())
@@ -3275,9 +3289,9 @@ void startMPServer(unsigned __int64 role, unsigned port, bool paused)
     globalMPServer->incNest();
 }
 
-void startMPServer(unsigned port, bool paused)
+void startMPServer(unsigned port, bool paused, bool listen)
 {
-    startMPServer(0, port, paused);
+    startMPServer(0, port, paused, listen);
 }
 
 void stopMPServer()

+ 3 - 3
system/mp/mpcomm.hpp

@@ -104,11 +104,11 @@ interface IMPServer : extends IInterface
     virtual IWhiteListHandler *queryWhiteListCallback() const = 0;
 };
 
-extern mp_decl void startMPServer(unsigned port, bool paused=false);
-extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false);
+extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false);
+extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false);
 extern mp_decl void stopMPServer();
 extern mp_decl IMPServer *getMPServer();
-extern mp_decl IMPServer *startNewMPServer(unsigned port);
+extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false);
 
 interface IConnectionMonitor: extends IInterface
 {

+ 1 - 1
thorlcr/master/thmastermain.cpp

@@ -654,7 +654,7 @@ int main( int argc, const char *argv[]  )
             try
             {
                 LOG(MCdebugProgress, thorJob, "calling initClientProcess %d", thorEp.port);
-                initClientProcess(serverGroup, DCR_ThorMaster, thorEp.port);
+                initClientProcess(serverGroup, DCR_ThorMaster, thorEp.port, nullptr, nullptr, MP_WAIT_FOREVER, true);
                 if (0 == thorEp.port)
                     thorEp.port = queryMyNode()->endpoint().port;
                 // both same

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -1665,7 +1665,7 @@ public:
         for (unsigned sc=1; sc<channelsPerSlave; sc++)
         {
             unsigned port = getMachinePortBase() + (sc * localThorPortInc);
-            IMPServer *mpServer = startNewMPServer(port);
+            IMPServer *mpServer = startNewMPServer(port, true);
             if (reconnect)
                 mpServer->setOpt(mpsopt_channelreopen, "true");
             mpServers.append(*mpServer);

+ 1 - 1
thorlcr/slave/thslavemain.cpp

@@ -394,7 +394,7 @@ int main( int argc, const char *argv[]  )
         if (0 == slfEp.port) // assume default from config if not on command line
             slfEp.port = globals->getPropInt("@slaveport", THOR_BASESLAVE_PORT);
 
-        startMPServer(DCR_ThorSlave, slfEp.port, false);
+        startMPServer(DCR_ThorSlave, slfEp.port, false, true);
         if (0 == slfEp.port)
             slfEp.port = queryMyNode()->endpoint().port;
         setMachinePortBase(slfEp.port);