Jelajahi Sumber

Merge pull request #6485 from jakesmith/hpcc-12242

HPCC-12242 Trace slow MP accept & connects

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 tahun lalu
induk
melakukan
e7426e2af3
1 mengubah file dengan 51 tambahan dan 4 penghapusan
  1. 51 4
      system/mp/mpcomm.cpp

+ 51 - 4
system/mp/mpcomm.cpp

@@ -60,8 +60,11 @@
 #define CANCELTIMEOUT       1000        // 1 sec
 
 #define CONNECT_TIMEOUT         (5*60*1000) // 5 mins
-#define CONNECT_READ_TIMEOUT    (3*60*1000) // 3 min    
-#define CONFIRM_TIMEOUT         (CONNECT_READ_TIMEOUT/2)    
+#define CONNECT_READ_TIMEOUT    (3*60*1000) // 3 mins
+#define CONNECT_TIMEOUT_INTERVAL 5000 // 5 secs
+#define CONFIRM_TIMEOUT         (CONNECT_READ_TIMEOUT/2) // 1.5 mins
+#define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
+#define CONFIRM_TRACESLOW_THRESHOLD 1000 // 1 sec
 
 #define VERIFY_DELAY            (1*60*1000)  // 1 Minute
 #define VERIFY_TIMEOUT          (1*60*1000)  // 1 Minute
@@ -607,6 +610,50 @@ public:
 };
 
 
+// Wrapper around sock->readtms() that traces slow reads.
+//
+// The read is issued in slices of at most 'timeoutChkIntervalMs'. Each time a
+// slice times out before the overall 'timeoutMs' budget is used up, a warning
+// is logged (prefixed with 'msg' and the peer endpoint) and the read is retried.
+// Once the read completes, a final warning is logged if the total elapsed time
+// reached CONFIRM_TRACESLOW_THRESHOLD.
+//
+//   msg                  - prefix for the warning messages
+//   sock                 - socket to read from
+//   dst/minSize/maxSize  - passed straight through to sock->readtms()
+//   sizeRead             - set by sock->readtms()
+//   timeoutMs            - overall time budget for the read
+//   timeoutChkIntervalMs - length of each slice; must be < timeoutMs
+//
+// Throws: any non-timeout IJSOCK_Exception is rethrown immediately; a timeout
+// exception is rethrown once the elapsed time has reached 'timeoutMs'.
+//
+// NOTE(review): every retry passes the same 'dst'/'minSize'. If readtms() can
+// time out after having consumed part of the data, the retry would start over
+// at 'dst' - confirm readtms() semantics on timeout.
+void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
+{
+    dbgassertex(timeoutChkIntervalMs < timeoutMs);
+    StringBuffer epStr; // peer endpoint text, resolved lazily on first warning
+    CCycleTimer readTmsTimer;
+    unsigned intervalTimeoutMs = timeoutChkIntervalMs;
+    loop
+    {
+        try
+        {
+            sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
+            break;
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            // Only slice timeouts are handled here; everything else propagates.
+            if (JSOCKERR_timeout_expired != e->errorCode())
+                throw;
+            unsigned elapsedMs = readTmsTimer.elapsedMs();
+            if (elapsedMs >= timeoutMs)
+                throw; // overall budget exhausted, let the caller see the timeout
+            // The timeout is being swallowed, so drop our reference now; without
+            // this the exception object leaks on every retry. Done before the
+            // calls below so it is not skipped should any of them throw.
+            e->Release();
+            // Shrink the last slice so the overall budget is not overshot.
+            unsigned remainingMs = timeoutMs-elapsedMs;
+            if (remainingMs < timeoutChkIntervalMs)
+                intervalTimeoutMs = remainingMs;
+            if (0 == epStr.length())
+            {
+                SocketEndpoint ep;
+                sock->getPeerEndpoint(ep);
+                ep.getUrlStr(epStr);
+            }
+            WARNLOG("%s %s, stalled for %u ms so far", msg, epStr.str(), elapsedMs);
+        }
+    }
+    // Sample the timer once so the threshold test and the logged value agree.
+    unsigned totalMs = readTmsTimer.elapsedMs();
+    if (totalMs >= CONFIRM_TRACESLOW_THRESHOLD)
+    {
+        if (0 == epStr.length())
+        {
+            SocketEndpoint ep;
+            sock->getPeerEndpoint(ep);
+            ep.getUrlStr(epStr);
+        }
+        WARNLOG("%s %s, took: %u ms", msg, epStr.str(), totalMs);
+    }
+}
+
 class CMPPacketReader;
 
 class CMPChannel: public CInterface
@@ -679,7 +726,7 @@ protected: friend class CMPPacketReader;
 #endif
                 size32_t reply;
                 size32_t rd;
-                newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_READ_TIMEOUT); 
+                traceSlowReadTms("MP: connect to", newsock, &reply, sizeof(reply), sizeof(reply), rd, CONNECT_READ_TIMEOUT, CONNECT_TIMEOUT_INTERVAL);
 #ifdef _FULLTRACE
                 LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
 #endif
@@ -1664,7 +1711,7 @@ int CMPConnectThread::run()
                 SocketEndpoint remoteep;
                 SocketEndpoint hostep;
                 SocketEndpointV4 id[2];
-                sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT); 
+                traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
                 if (rd != sizeof(id))
                 {
                     FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid number of connection bytes serialized");