Browse Source

Merge pull request #4141 from richardkchapman/mp-ports

HPCC-8539 Add configuration option to specify dali client MP port range

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 12 years ago
parent
commit
960d80b81a

+ 3 - 2
dali/base/daclient.cpp

@@ -88,14 +88,15 @@ IDaliClient_Exception *createClientException(DaliClientError err, const char *ms
 
 
 
-bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned short mpport, const char *clientVersion, const char *minServerVersion, unsigned timeout)
+bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, const char *clientVersion, const char *minServerVersion, unsigned timeout)
 {
     assertex(servergrp);
     daliClientIsActive = true;
     startMPServer(mpport);
     Owned<ICommunicator> comm(createCommunicator(servergrp,true));
     IGroup * covengrp;
-    if (!registerClientProcess(comm.get(),covengrp,timeout,role)) {
+    if (!registerClientProcess(comm.get(),covengrp,timeout,role))
+    {
         daliClientIsActive = false;
         return false;
     }

+ 1 - 1
dali/base/daclient.hpp

@@ -26,7 +26,7 @@
 #include "mpcomm.hpp"
 #include "dasds.hpp"
 
-extern da_decl bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned short mpport=0, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER);
+extern da_decl bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport=0, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER);
 extern da_decl bool reinitClientProcess(IGroup *servergrp, DaliClientRole role, const char *clientVersion=NULL, const char *minServerVersion=NULL, unsigned timeout=MP_WAIT_FOREVER); 
 extern da_decl void closedownClientProcess();
 extern da_decl bool daliClientActive();

+ 4 - 0
initfiles/etc/DIR_NAME/environment.xml.in

@@ -15,6 +15,10 @@
   <runtime>${RUNTIME_PATH}</runtime>
   <sourcedir>${CONFIG_SOURCE_PATH}</sourcedir>
   <user>${RUNTIME_USER}</user>
+  <ports>
+   <mpStart>7101</mpStart>
+   <mpEnd>7500</mpEnd>
+  </ports>
  </EnvSettings>
  <Hardware>
   <Computer computerType="linuxmachine"

+ 1 - 1
plugins/fileservices/fileservices.cpp

@@ -198,7 +198,7 @@ static IPropertyTree *getEnvironment()
             env.setown(createPTreeFromIPT(conn->queryRoot())); // we don't really need to copy here
     }
     if (!env.get())
-        env.setown(getHPCCenvironment());
+        env.setown(getHPCCEnvironment());
     return env.getClear();
 }
 

+ 3 - 3
system/include/portlist.h

@@ -53,9 +53,9 @@
 
 #define DALI_SERVER_PORT                7070 
 #define DATA_TRANSFER_PORT              7080
-#define DAFILESRV_PORT                  7100 /// aka daliservix
-#define MP_BASE_PORT                    7101 //..7999 (for Dali too)
-#define MP_PORT_RANGE                   400
+#define DAFILESRV_PORT                  7100 // aka daliservix
+#define MP_START_PORT                   7101 // Default range for MP ports
+#define MP_END_PORT                     7500
 
 //ESP SERVICES
 //INSECURE

+ 11 - 7
system/jlib/jutil.cpp

@@ -2261,9 +2261,9 @@ StringBuffer & fillConfigurationDirectoryEntry(const char *dir,const char *name,
     return dirout;
 }
 
-IPropertyTree *getHPCCenvironment(const char *confloc)
+IPropertyTree *getHPCCEnvironment(const char *configFileName)
 {
-    StringBuffer configFileSpec(confloc);
+    StringBuffer configFileSpec(configFileName);
     if (!configFileSpec.length())
 #ifdef _WIN32 
         return NULL;
@@ -2271,16 +2271,20 @@ IPropertyTree *getHPCCenvironment(const char *confloc)
         configFileSpec.set(CONFIG_DIR).append(PATHSEPSTR).append("environment.conf");
 #endif  
     Owned<IProperties> props = createProperties(configFileSpec.str());
-    if (props) {
+    if (props)
+    {
         StringBuffer envfile;
-        if (props->getProp("environment",envfile)&&envfile.length()) {
-            if (!isAbsolutePath(envfile.str())) {
+        if (props->getProp("environment",envfile)&&envfile.length())
+        {
+            if (!isAbsolutePath(envfile.str()))
+            {
                 StringBuffer tail(envfile);
                 splitDirTail(configFileSpec.str(),envfile.clear());
                 addPathSepChar(envfile).append(tail);
             }
             Owned<IFile> file = createIFile(envfile.str());
-            if (file) {
+            if (file)
+            {
                 Owned<IFileIO> fileio = file->open(IFOread);
                 if (fileio)
                     return createPTree(*fileio);
@@ -2292,7 +2296,7 @@ IPropertyTree *getHPCCenvironment(const char *confloc)
 
 static IPropertyTree *getOSSdirTree()
 {
-    Owned<IPropertyTree> envtree = getHPCCenvironment();
+    Owned<IPropertyTree> envtree = getHPCCEnvironment();
     if (envtree) {
         IPropertyTree *ret = envtree->queryPropTree("Software/Directories");
         if (ret) 

+ 1 - 1
system/jlib/jutil.hpp

@@ -256,7 +256,7 @@ public:
 
 extern jlib_decl StringBuffer passwordInput(const char* prompt, StringBuffer& passwd);
 
-extern jlib_decl IPropertyTree *getHPCCenvironment(const char *confloc=NULL);
+extern jlib_decl IPropertyTree *getHPCCEnvironment(const char *configFileName=NULL);
 extern jlib_decl bool getConfigurationDirectory(const IPropertyTree *dirtree, // NULL to use HPCC config
                                                 const char *category, 
                                                 const char *component,

+ 41 - 16
system/mp/mpcomm.cpp

@@ -397,7 +397,7 @@ class CMPConnectThread: public Thread
     CMPServer *parent;
     void checkSelfDestruct(void *p,size32_t sz);
 public:
-    CMPConnectThread(CMPServer *_parent,unsigned short port); 
+    CMPConnectThread(CMPServer *_parent, unsigned port);
     ~CMPConnectThread()
     {
         ::Release(listensock);
@@ -445,7 +445,7 @@ public:
     UserPacketHandler           *userpackethandler;         // default
 
 
-    CMPServer(unsigned short port);
+    CMPServer(unsigned _port);
     ~CMPServer();
     void start();
     void stop();
@@ -1547,30 +1547,51 @@ bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself)
 }
     
 // --------------------------------------------------------
-CMPConnectThread::CMPConnectThread(CMPServer *_parent,unsigned short port)
+CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
     : Thread("MP Connection Thread")
 {
     parent = _parent;
-    if (port==0) {  // need to connect early to resolve clash
-        loop {
-            port = MP_BASE_PORT+getRandom()%MP_PORT_RANGE;
-            try {
-                listensock = ISocket::create(port,16);  // better not to have *too* many waiting
+    if (!port)
+    {
+        // need to connect early to resolve clash
+        Owned<IPropertyTree> env = getHPCCEnvironment();
+        unsigned minPort, maxPort;
+        if (env)
+        {
+            minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
+            maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
+        }
+        else
+        {
+            minPort = MP_START_PORT;
+            maxPort = MP_END_PORT;
+        }
+        assertex(maxPort >= minPort);
+        Owned<IJSOCK_Exception> lastErr;
+        unsigned numPorts = maxPort - minPort + 1;
+        for (int retries = 0; retries < numPorts * 3; retries++)
+        {
+            port = minPort + getRandom() % numPorts;
+            try
+            {
+                listensock = ISocket::create(port, 16);  // better not to have *too* many waiting
                 break;
             }
             catch (IJSOCK_Exception *e)
             {
                 if (e->errorCode()!=JSOCKERR_port_in_use)
                     throw;
-                e->Release();
+                lastErr.setown(e);
             }
         }
+        if (!listensock)
+            throw lastErr.getClear();
     }
     else 
         listensock = NULL;  // delay create till running
     parent->setPort(port);
 #ifdef _TRACE
-    LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d",port);
+    LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
 #endif
     running = false;
 }
@@ -1771,10 +1792,11 @@ CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
 }
 
 
-CMPServer::CMPServer(unsigned short _port) 
+CMPServer::CMPServer(unsigned _port)
 {
+    port = 0;   // connectthread tells me what port it actually connected on
     checkclosed = false;
-    connectthread = new CMPConnectThread(this,_port);
+    connectthread = new CMPConnectThread(this, _port);
     selecthandler = createSocketSelectHandler();
     pingpackethandler = new PingPacketHandler;              // TAG_SYS_PING
     pingreplypackethandler = new PingReplyPacketHandler;    // TAG_SYS_PING_REPLY
@@ -2493,16 +2515,19 @@ IInterCommunicator &queryWorldCommunicator()
     return *worldcomm;
 }
 
-void startMPServer(unsigned short port,bool paused)
+void startMPServer(unsigned port, bool paused)
 {
     assertex(sizeof(PacketHeader)==32);
     CriticalBlock block(CMPServer::serversect); 
-    if (CMPServer::servernest==0) {
-        if (!CMPServer::serverpaused) {
+    if (CMPServer::servernest==0)
+    {
+        if (!CMPServer::serverpaused)
+        {
             delete MPserver;
             MPserver = new CMPServer(port);
         }
-        if (paused) {
+        if (paused)
+        {
             CMPServer::serverpaused = true;
             return;
         }

+ 1 - 1
system/mp/mpcomm.hpp

@@ -89,7 +89,7 @@ extern mp_decl mptag_t createReplyTag(); // creates (short-lived) reply-tag;
 extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false); // outer allows nodes outside group to send
 extern mp_decl IInterCommunicator &queryWorldCommunicator();
 
-extern mp_decl void startMPServer(unsigned short port,bool paused=false);
+extern mp_decl void startMPServer(unsigned port,bool paused=false);
 extern mp_decl void stopMPServer();
 
 interface IConnectionMonitor: extends IInterface