Преглед изворни кода

HPCC-13984 Increase listen backlog for large clusters with many thor slaves

Added a check of the kernel somaxconn setting against the environment mpSoMaxConn setting at MP startup.
Changed the conf and xml files so that mpStart, mpEnd and mpSoMaxConn are
no longer nested, so that configmgr will use them with new configs.
Fixed typo in xml.in.
Updated testing/regress/environment.xml.in to match same changes.

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly пре 10 година
родитељ
комит
70e096b224

+ 3 - 1
initfiles/etc/DIR_NAME/environment.conf.in

@@ -21,7 +21,9 @@ interface=*
 use_epoll=true
 # allow kernel pagecache flushing where enabled (true/false)
 allow_pgcache_flush=true
-
+mpStart=7101
+mpEnd=7500
+mpSoMaxConn=128
 #enable SSL for dafilesrv remote file access
 #dfsUseSSL=false
 #dfsSSLCertFile=/certfilepath/certfile

+ 3 - 4
initfiles/etc/DIR_NAME/environment.xml.in

@@ -15,10 +15,9 @@
   <runtime>${RUNTIME_PATH}</runtime>
   <sourcedir>${CONFIG_SOURCE_PATH}</sourcedir>
   <user>${RUNTIME_USER}</user>
-  <ports>
-   <mpStart>7101</mpStart>
-   <mpEnd>7500</mpEnd>
-  </ports>
+  <mpStart>7101</mpStart>
+  <mpEnd>7500</mpEnd>
+  <mpSoMaxConn>128</mpSoMaxConn>
  </EnvSettings>
  <Hardware>
   <Computer computerType="linuxmachine"

+ 47 - 5
system/mp/mpcomm.cpp

@@ -393,7 +393,28 @@ void releaseDynamicTag(unsigned short tag)
     freetags.append(tag);
 }
 
+bool check_kernparam(const char *path, int *value)  // Reads the first line of the file at 'path' and stores its atoi() result in *value; returns true only when a line was read.
+{
+#ifdef __linux__
+    FILE *f = fopen(path,"r");
+    char res[32];  // fgets below reads at most sizeof(res)-1 characters of the first line
+    char *r = 0;
+    if (f) {
+        r = fgets(res, sizeof(res), f);
+        fclose(f);  // closed before parsing, so the handle is released whether or not a line was read
+        if (r) {
+            *value = atoi(r);  // NOTE(review): atoi yields 0 for non-numeric text, indistinguishable from a genuine 0 - confirm callers tolerate this
+            return true;
+        }
+    }
+#endif
+    return false;  // reached when the file could not be opened, nothing was read, or __linux__ is not defined; *value is left unmodified
+}
 
+bool check_somaxconn(int *val)  // Fetches the value in /proc/sys/net/core/somaxconn into *val via check_kernparam; returns false if it could not be read.
+{
+    return check_kernparam("/proc/sys/net/core/somaxconn", val);  // check_kernparam only reads the file when built with __linux__ defined
+}
 
 class CMPServer;
 class CMPChannel;
@@ -404,6 +425,7 @@ class CMPConnectThread: public Thread
     bool running;
     ISocket *listensock;
     CMPServer *parent;
+    int mpSoMaxConn;
     void checkSelfDestruct(void *p,size32_t sz);
 public:
     CMPConnectThread(CMPServer *_parent, unsigned port);
@@ -1778,15 +1800,35 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
     : Thread("MP Connection Thread")
 {
     parent = _parent;
+    mpSoMaxConn = 0;
+    Owned<IPropertyTree> env = getHPCCEnvironment();
+    if (env)
+    {
+        mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0);
+        if (!mpSoMaxConn)
+            mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
+    }
+    if (mpSoMaxConn)
+    {
+        int kernSoMaxConn = 0;
+        bool soMaxCheck = check_somaxconn(&kernSoMaxConn);
+        if (soMaxCheck && (mpSoMaxConn > kernSoMaxConn))
+            WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
+    }
+    if (!mpSoMaxConn)
+        mpSoMaxConn = SOMAXCONN;
     if (!port)
     {
         // need to connect early to resolve clash
-        Owned<IPropertyTree> env = getHPCCEnvironment();
         unsigned minPort, maxPort;
         if (env)
         {
-            minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
-            maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
+            minPort = env->getPropInt("EnvSettings/mpStart", 0);
+            if (!minPort)
+                minPort = env->getPropInt("EnvSettings/ports/mpStart", MP_START_PORT);
+            maxPort = env->getPropInt("EnvSettings/mpEnd", 0);
+            if (!maxPort)
+                maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
         }
         else
         {
@@ -1801,7 +1843,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
             port = minPort + getRandom() % numPorts;
             try
             {
-                listensock = ISocket::create(port, 16);  // better not to have *too* many waiting
+                listensock = ISocket::create(port, mpSoMaxConn);
                 break;
             }
             catch (IJSOCK_Exception *e)
@@ -1861,7 +1903,7 @@ void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
 void CMPConnectThread::start(unsigned short port)
 {
     if (!listensock)
-        listensock = ISocket::create(port,16);  
+        listensock = ISocket::create(port, mpSoMaxConn);  // second argument now comes from mpSoMaxConn (set in the constructor) instead of the fixed 16
     running = true;
     Thread::start();
 }

+ 55 - 11
system/mp/test/mptest.cpp

@@ -32,8 +32,8 @@ using namespace std;
 
 class CSectionTimer
 {
-    HiresTimer hrt[100];
-    unsigned tids[100];
+    HiresTimer hrt[1000];
+    unsigned tids[1000];
 
     const char *name;
     static CriticalSection findsect;
@@ -46,7 +46,7 @@ class CSectionTimer
         CriticalBlock block(findsect);
         unsigned tid = (unsigned)(memsize_t)GetCurrentThreadId();
         unsigned i;
-        for (i=0;i<99;i++) {
+        for (i=0;i<999;i++) {
             if (tids[i]==tid)
                 break;
             if (tids[i]==0) {
@@ -424,7 +424,7 @@ void MultiTest(ICommunicator *_comm)
             unsigned n=(comm->queryGroup().ordinality()-1)*N;
             CMessageBuffer mb;
             CRandomBuffer *buff = new CRandomBuffer();
-            PrintLog("MPTEST: started server");
+            PrintLog("MPTEST: started server, myrank = %d", comm->queryGroup().rank());
             try {
                 while(n--) {
                     mb.clear();
@@ -444,6 +444,10 @@ void MultiTest(ICommunicator *_comm)
 #endif
 
                     mb.clear().append(buff->crc);
+
+                    int delay = getRandom() % 20;
+                    Sleep(delay);
+
                     comm->reply(mb);
                 }
             }
@@ -488,7 +492,7 @@ void MultiTest(ICommunicator *_comm)
         targets[k] = t;
     }
 
-    PrintLog("MPTEST: client started");
+    PrintLog("MPTEST: client started, myrank = %d", comm->queryGroup().rank());
 
     try {
         while (n--) {
@@ -682,7 +686,7 @@ int main(int argc, char* argv[])
 
 #ifndef MYMACHINES
     if (argc<3) {
-        printf("\nMPTEST: Usage: %s <myport> <ip:port> <ip:port> ...\n\n", argv[0]);
+        printf("\nMPTEST: Usage: %s <myport> [-f <file> | <ip:port> <ip:port> ...]\n\n", argv[0]);
         return 0;
     }
 #endif
@@ -690,12 +694,14 @@ int main(int argc, char* argv[])
     try {
         EnableSEHtoExceptionMapping();
         StringBuffer lf;
-        openLogFile(lf, "mptest.log");
         // PrintLog("MPTEST Starting");
 
 #ifndef MYMACHINES
         int num_nodes = 0;
         int my_port = atoi(argv[1]);
+        char logfile[256] = { "" };
+        sprintf(logfile,"mptest-%d.log",my_port);
+        // openLogFile(lf, logfile);
 
         PrintLog("MPTEST: Starting %d", my_port);
 
@@ -703,17 +709,54 @@ int main(int argc, char* argv[])
 
         INode *nodes[1000];
 
+        bool has_argfile = false;
+        char argfile[256] = { "" };
+        if (argc > 3)
+        {
+            if (strcmp(argv[2], "-f") == 0)
+            {
+                has_argfile = true;
+                strcpy(argfile, argv[3]);
+            }
+        }
+
         int i = 1;
-        while (i+1 < argc && i-1 < 1000) {
-            PrintLog("MPTEST: adding node %d, port = <%s>", i-1, argv[i+1]);
-            nodes[i-1] = createINode(argv[i+1], my_port);
-            i++;
+        if (has_argfile)
+        {
+            char hoststr[256] = { "" };
+            FILE *fp = fopen(argfile, "r");
+            if (fp == NULL)
+            {
+                PrintLog("MPTest: Error cannot open file <%s>", argfile);
+                return 1;
+            }
+            char line[256] = { "" };
+            while(fgets(line, 255, fp) != NULL)
+            {
+                int srtn = sscanf(line,"%s",hoststr);
+                if (srtn == 1 && line[0] != '#')
+                {
+                    PrintLog("MPTEST: adding node %d, port = <%s>", i-1, hoststr);
+                    nodes[i-1] = createINode(hoststr, my_port);
+                    i++;
+                }
+            }
+            fclose(fp);
+        }
+        else
+        {
+            while (i+1 < argc && i-1 < 1000) {
+                PrintLog("MPTEST: adding node %d, port = <%s>", i-1, argv[i+1]);
+                nodes[i-1] = createINode(argv[i+1], my_port);
+                i++;
+            }
         }
 
         PrintLog("MPTEST: num_nodes = %d", i-1);
 
         IGroup *group = createIGroup(i-1,nodes);
 #else
+        openLogFile(lf, "mptest.log");
         startMPServer(MPPORT);
         IGroup *group = createIGroup(MYMACHINES,MPPORT); 
 #endif
@@ -784,5 +827,6 @@ int main(int argc, char* argv[])
         pexception("Exception",e);
     }
 
+    PrintLog("MPTEST: bye");
     return 0;
 }

+ 3 - 4
testing/regress/environment.xml.in

@@ -15,10 +15,9 @@
   <runtime>${RUNTIME_PATH}</runtime>
   <sourcedir>${CONFIG_SOURCE_PATH}</sourcedir>
   <user>${RUNTIME_USER}</user>
-  <ports>
-   <mpStart>7101</mpStart>
-   <mpEnd>7500</mpEnd>
-  </ports>
+  <mpStart>7101</mpStart>
+  <mpEnd>7500</mpEnd>
+  <mpSoMaxConn>128</mpSoMaxConn>
  </EnvSettings>
  <Hardware>
   <Computer computerType="linuxmachine"