浏览代码

HPCC-25698 Roxie detect client socket close

Signed-off-by: Mark Kelly <mark.kelly@lexisnexisrisk.com>
Mark Kelly 4 年之前
父节点
当前提交
e80e31b904
共有 2 个文件被更改,包括 43 次插入18 次删除
  1. 4 1
      roxie/ccd/ccdcontext.cpp
  2. 39 17
      system/jlib/jsocket.cpp

+ 4 - 1
roxie/ccd/ccdcontext.cpp

@@ -2773,11 +2773,14 @@ public:
         {
             if (socketCheckInterval)
             {
-                if (ticksNow - lastSocketCheckTime > socketCheckInterval)
+                if (ticksNow - lastSocketCheckTime >= socketCheckInterval)
                 {
                     CriticalBlock b(abortLock);
                     if (!protocol->checkConnection())
+                    {
+                        DBGLOG("Client socket close detected");
                         throw MakeStringException(ROXIE_CLIENT_CLOSED, "Client socket closed");
+                    }
                     lastSocketCheckTime = ticksNow;
                 }
             }

+ 39 - 17
system/jlib/jsocket.cpp

@@ -1628,6 +1628,7 @@ int CSocket::logPollError(unsigned revents, const char *rwstr)
 int CSocket::wait_read(unsigned timeout)
 {
     int ret = 0;
+    int retrycount = 100;
     while (sock!=INVALID_SOCKET)
     {
 #ifdef _USE_SELECT
@@ -1661,9 +1662,14 @@ int CSocket::wait_read(unsigned timeout)
                 LOGERR2(err,1,"wait_read");
                 break;
             }
+            else
+            {
+                if (retrycount-- <= 0)
+                    break;
+            }
         }
         else if (ret == 0)
-        {   // timeout
+        {   // timeout, no error (timeout can be 0 for a fast check)
             break;
         }
         else
@@ -2116,26 +2122,42 @@ bool CSocket::check_connection()
     if (state != ss_open) 
         return false;
 
-    unsigned retrycount=100;
-EintrRetry:
-    int rc;
-    if (sockmode==sm_udp_server) { // udp server
-        DEFINE_SOCKADDR(u);
-        socklen_t  ul = setSockAddr(u,returnep,returnep.port);
-        rc = sendto(sock, NULL, 0, 0, &u.sa, ul);
-    }
-    else {
-        rc = send(sock, NULL, 0, SEND_FLAGS);
-    }
-    if (rc < 0) {
+    // This routine is for TCP stream sockets.
+    // It is the callers responsibility to ensure this is called
+    // in a thread-safe manner, while no other threads are reading.
+
+    // wait_read() returns = 0 - socket ok atm
+    //             returns < 0 - error, assume closed
+    //             returns > 0 - recv() to check for eof
+    int rc = wait_read(0);
+    if (rc == 0)
+        return true;
+    else if (rc < 0)
+        return false;
+
+    int retrycount = 100;
+    char buffer;
+
+EintrRecv:
+    rc = recv(sock, &buffer, sizeof(buffer), MSG_PEEK);
+
+    // recv() returns = 0 - socket eof (closed)
+    //        returns < 0 - error, assume closed
+    //        returns > 0 - socket ok atm
+    if (rc == 0)
+        return false;
+    else if (rc < 0)
+    {
         int err=ERRNO();
-        if ((err==JSE_INTR)&&(retrycount--!=0)) {
-            LOGERR2(err,7,"EINTR retrying");
-            goto EintrRetry;
+        if ((err==JSE_INTR)&&(retrycount--!=0))
+        {
+            LOGERR2(err,7,"recv EINTR retrying");
+            goto EintrRecv;
         }
         else
             return false;
-    }   
+    }
+
     return true;
 }