Quellcode durchsuchen

HPCC-13603 Fix corruption due to large handle + associated issues

A socket handle that exceeded the internal jsocket limit (32k)
caused an exception to be thrown, but only after the handle had
already been used, causing corruption (typically of the stack).

There were also issues in dafilesrv when handling this exception.

Also added some comments to CRemoteFileServer describing how the
thread pool, select handler and throttling interact.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith vor 10 Jahren
Ursprung
Commit
01bc967e25
3 geänderte Dateien mit 87 neuen und 32 gelöschten Zeilen
  1. 47 8
      common/remote/sockfile.cpp
  2. 19 12
      system/jlib/jsocket.cpp
  3. 21 12
      system/jlib/jthread.cpp

+ 47 - 8
common/remote/sockfile.cpp

@@ -3253,18 +3253,49 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
         {
             if (selecthandled) 
                 processCommand(); // buffer already filled
-            else {
-                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) { // if too many threads add to select handler
-                    int w = socket->wait_read(1000);
+            else
+            {
+                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) // if too many threads add to select handler
+                {
+                    int w;
+                    try
+                    {
+                        w = socket->wait_read(1000);
+                    }
+                    catch (IException *e)
+                    {
+                        EXCLOG(e, "CRemoteClientHandler::main wait_read error");
+                        e->Release();
+                        parent->onCloseSocket(this,1);
+                        return;
+                    }
                     if (w==0)
                         break;
-                    if ((w<0)||!immediateCommand()) {
+                    if ((w<0)||!immediateCommand())
+                    {
                         if (w<0) 
                             WARNLOG("CRemoteClientHandler::main wait_read error");
                         parent->onCloseSocket(this,1);
                         return;
                     }
                 }
+
+                /* This is a bit confusing.
+                 * The addClient below adds this request to a selecthandler handled by another thread
+                 * and passes ownership of 'this' (CRemoteClientHandler).
+                 *
+                 * When notified, the selecthandler will launch a new pool thread to handle the request.
+                 * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ].
+                 *
+                 * Either way, a thread pool slot is occupied when processing a request.
+                 * Blocked threads will be blocked for up to 1 minute (as defined by the createThreadPool call).
+                 * IOW, if there are lots of incoming clients that can't be serviced within the CThrottler limit,
+                 * a large number of pool threads will build up after a while.
+                 *
+                 * The CThrottler mechanism imposes a further hard limit on how many concurrent request threads can be active.
+                 * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
+                 * in this additional layer of throttling.
+                 */
                 selecthandled = true;
                 parent->addClient(this);    // add to select handler
             }
@@ -3358,6 +3389,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
             catch (IException *e) {
                 // suppress some more errors clearing client
                 EXCLOG(e,"cCommandProcessor::main(2)");
+                e->Release();
             }
         }
         bool stop()
@@ -3418,6 +3450,7 @@ public:
 #endif
 
         INFINITE,TARGET_MIN_THREADS));
+        threads->setStartDelayTracing(60); // trace amount delayed every minute.
         stopping = false;
         clientcounttick = msTick();
         closedclients = 0;
@@ -4559,7 +4592,6 @@ public:
     {
         if (listenep.isNull())
             acceptsock.setown(ISocket::create(listenep.port));
-
         else {
             StringBuffer ips;
             listenep.getIpText(ips);
@@ -4662,13 +4694,15 @@ public:
                     }
                     if (!sock||stopping)
                         break;
+                    runClient(sock.getClear());
                 }
                 catch (IException *e) {
                     EXCLOG(e,"CRemoteFileServer");
                     e->Release();
-                    break;
+                    sock.clear();
+                    if (!QUERYINTERFACE(e, IJSOCK_Exception))
+                        break;
                 }
-                runClient(sock.getClear());
             }
             else
                 checkTimeout();
@@ -4782,6 +4816,7 @@ public:
             params.client->Link();
             clients.append(*params.client);
         }
+        // NB: This could be blocked, by thread pool limit
         threads->start(&params);
     }
 
@@ -4807,11 +4842,15 @@ public:
         if (client->buf.length()) {
             cCommandProcessor::cCommandProcessorParams params;
             params.client = client.getClear();
+
+            /* This can block because the thread pool is full and therefore block the selecthandler
+             * This is akin to the main server blocking post accept() for the same reason.
+             */
             threads->start(&params);
         }
         else 
             onCloseSocket(client,3);    // removes owned handles
-        
+
         return false;
     }
 

+ 19 - 12
system/jlib/jsocket.cpp

@@ -1200,6 +1200,7 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
         if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) {
             T_FD_SET fds;
             struct timeval tv;
+            CHECKSOCKRANGE(sock);
             XFD_ZERO(&fds);
             FD_SET((unsigned)sock, &fds);
             T_FD_SET except;
@@ -1207,7 +1208,6 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
             FD_SET((unsigned)sock, &except);
             tv.tv_sec = remaining / 1000;
             tv.tv_usec = (remaining % 1000)*1000;
-            CHECKSOCKRANGE(sock);
             int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
             if (rc>0) {
                 // select succeeded - return error from socket (0 if connected)
@@ -1306,6 +1306,7 @@ void CSocket::connect_wait(unsigned timems)
             while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) {
                 T_FD_SET fds;
                 struct timeval tv;
+                CHECKSOCKRANGE(sock);
                 XFD_ZERO(&fds);
                 FD_SET((unsigned)sock, &fds);
                 T_FD_SET except;
@@ -1318,7 +1319,6 @@ void CSocket::connect_wait(unsigned timems)
                 tv.tv_sec = 0;
                 tv.tv_usec = 0;
     #endif
-                CHECKSOCKRANGE(sock);
                 int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc>0) {
                     // select succeeded - return error from socket (0 if connected)
@@ -1441,9 +1441,9 @@ int CSocket::wait_read(unsigned timeout)
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
         }
@@ -1471,9 +1471,9 @@ int CSocket::wait_write(unsigned timeout)
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
         }
@@ -3690,12 +3690,12 @@ public:
         }
         T_FD_SET fds;
         struct timeval tv;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         //FD_SET((unsigned)sock, &except);
         tv.tv_sec = 0;
         tv.tv_usec = 0;
-        CHECKSOCKRANGE(sock);
         int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
         if (rc<0) {
             StringBuffer sockstr;
@@ -4742,13 +4742,18 @@ public:
 
     void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     {
-        CriticalBlock block(sect);
-        epollthread->add(sock,mode,nfy);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
+
+        /* JCS->MK, the CSocketSelectHandler variety, checks result of thread->add and spins up another handler
+         * Shouldn't epoll version do the same?
+         */
+        if (!epollthread->add(sock,mode,nfy))
+            throw MakeStringException(-1, "CSocketEpollHandler: failed to add socket to epollthread handler: sock # = %d", sock->OShandle());
     }
 
     void remove(ISocket *sock)
     {
-        CriticalBlock block(sect);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
         epollthread->remove(sock);
     }
 
@@ -6003,7 +6008,7 @@ public:
                 if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
                     err = 0; // continue
                 else {
-                    if (err==0) 
+                    if (err==0)
                         connectdone = true; // done immediately
                     else if(!oneshot) //  probably ECONNREFUSED but treat all errors same
                         refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
@@ -6011,6 +6016,7 @@ public:
             }
             if (!connectdone&&(err==0)) {
                 SOCKET s = sock->sock;
+                CHECKSOCKRANGE(s);
                 T_FD_SET fds;
                 struct timeval tv;
                 XFD_ZERO(&fds);
@@ -6020,7 +6026,6 @@ public:
                 FD_SET((unsigned)s, &except);
                 tv.tv_sec = remaining / 1000;
                 tv.tv_usec = (remaining % 1000)*1000;
-                CHECKSOCKRANGE(s);
                 int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc==0) 
                     break; // timeout
@@ -6091,8 +6096,10 @@ int wait_multiple(bool isRead,               //IN   true if wait read, false it
 #ifdef _DEBUG
         dbgSB.appendf(" %d",socks.item(idx));
 #endif
-        maxSocket = socks.item(idx) > maxSocket ? socks.item(idx) : maxSocket;
-        FD_SET((unsigned)socks.item(idx), &fds);
+        SOCKET s = socks.item(idx);
+        CHECKSOCKRANGE(s);
+        maxSocket = s > maxSocket ? s : maxSocket;
+        FD_SET((unsigned)s, &fds);
     }
 #ifdef _DEBUG
     DBGLOG("%s",dbgSB.str());

+ 21 - 12
system/jlib/jthread.cpp

@@ -928,23 +928,32 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
         PooledThreadHandle ret;
         {
             CriticalBlock block(crit);
-            if (timedout&&!availsem.wait(0)) {  // make sure take allocated sem if has become available
-                if (noBlock || timeout > 0)
-                    throw MakeStringException(0, "No threads available in pool %s", poolname.get());
-                WARNLOG("Pool limit exceeded for %s", poolname.get());
+            if (timedout)
+            {
+                if (!availsem.wait(0)) {  // make sure take allocated sem if has become available
+                    if (noBlock || timeout > 0)
+                        throw MakeStringException(0, "No threads available in pool %s", poolname.get());
+                    WARNLOG("Pool limit exceeded for %s", poolname.get());
+                }
+                else
+                    timedout = false;
             }
             if (traceStartDelayPeriod)
             {
                 ++startsInPeriod;
-                startDelayInPeriod += startTimer.elapsedCycles();
-                if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                if (timedout)
                 {
-                    cycle_t avg = startDelayInPeriod/startsInPeriod;
-                    unsigned avgMs = static_cast<unsigned>(cycle_to_nanosec(avg)/1000000);
-                    PROGLOG("%s: %d threads started in last %d seconds, average delay = %d milliseconds", poolname.get(), startsInPeriod, traceStartDelayPeriod, avgMs);
-                    startsInPeriod = 0;
-                    startDelayInPeriod = 0;
-                    overAllTimer.reset();
+                    startDelayInPeriod += startTimer.elapsedCycles();
+                    if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                    {
+                        double totalDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod)))/1000000;
+                        double avgDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod/startsInPeriod)))/1000000;
+                        unsigned totalElapsedSecs = overAllTimer.elapsedMs()/1000;
+                        PROGLOG("%s: %u threads started in last %u seconds, total delay = %0.2f milliseconds, average delay = %0.2f milliseconds, currently running = %u", poolname.get(), startsInPeriod, totalElapsedSecs, totalDelayMs, avgDelayMs, runningCount());
+                        startsInPeriod = 0;
+                        startDelayInPeriod = 0;
+                        overAllTimer.reset();
+                    }
                 }
             }
             CPooledThreadWrapper &t = allocThread();