Ver código fonte

HPCC-25639 Roxie will resend permitToSend indefinitely

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 anos atrás
pai
commit
e49d39daed
2 arquivos alterados com 36 adições e 9 exclusões
  1. 19 1
      roxie/ccd/ccdmain.cpp
  2. 17 8
      roxie/udplib/udptrr.cpp

+ 19 - 1
roxie/ccd/ccdmain.cpp

@@ -942,7 +942,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
 
         // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!
-        udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
         udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 0);
         if (udpRequestToSendTimeout<=10)
             udpRequestToSendTimeout *= 1000;
@@ -957,7 +956,21 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             else
                 udpRequestToSendTimeout = 5000;
         }
+
         udpRequestToSendAckTimeout = topology->getPropInt("@udpRequestToSendAckTimeout", 100);
+        if (!udpRequestToSendAckTimeout)
+        {
+            udpRequestToSendAckTimeout = 100;
+            DBGLOG("Bad or missing value for udpRequestToSendAckTimeout - using %u", udpRequestToSendAckTimeout);
+        }
+        udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
+#ifdef _CONTAINERIZED
+        if (!udpMaxRetryTimedoutReqs)   // 0 traditionally means retry forever - which is a really bad idea in cloud world where replacement node may have different IP
+        {
+            udpMaxRetryTimedoutReqs = 60000/udpRequestToSendAckTimeout;  // Give up after 1 minute
+            DBGLOG("Bad or missing value for udpMaxRetryTimedoutReqs - using %u", udpMaxRetryTimedoutReqs);
+        }
+#endif
         // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
         udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
         udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
@@ -1184,6 +1197,11 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
 #ifdef _CONTAINERIZED
         IpAddress myIP(".");
         myNode.setIp(myIP);
+        if (traceLevel)
+        {
+            StringBuffer s;
+            DBGLOG("My node ip=%s", myIP.getIpText(s).str());
+        }
         if (topology->getPropBool("@server", true))
         {
             Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./services");

+ 17 - 8
roxie/udplib/udptrr.cpp

@@ -121,7 +121,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         flowType::flowCmd state = flowType::send_completed;    // Meaning I'm not on any queue
         sequence_t flowSeq = 0;                // the sender's most recent flow sequence number
         sequence_t sendSeq = 0;                // the sender's most recent sequence number from request-to-send, representing sequence number of next packet it will send
-        unsigned timeouts = 0;
+        unsigned timeouts = 0;                 // How many consecutive timeouts have happened on the current request
         unsigned requestTime = 0;              // When we received the active requestToSend
         unsigned timeStamp = 0;                // When we last sent okToSend
 
@@ -157,11 +157,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 return false;
         }
 
-        inline void noteDone()
-        {
-            timeouts = 0;
-        }
-
         bool canSendAny() const
         {
             // We can send some if (a) the first available new packet is less than TRACKER_BITS above the first unreceived packet or
@@ -196,6 +191,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             flowSeq = _flowSeq;
             sendSeq = _sendSeq;
             requestTime = msTick();
+            timeouts = 0;
             try
             {
                 UdpPermitToSendMsg msg;
@@ -496,8 +492,21 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                                 }
                                 UdpSenderEntry *next = finger->nextSender;
                                 pendingPermits.remove(finger);
-                                pendingRequests.append(finger);
-                                finger->state = flowType::request_to_send;  // Go to the back of the queue  - MORE - lets have some code to eventually give up! Or just give up here?
+                                if (++finger->timeouts > udpMaxRetryTimedoutReqs && udpMaxRetryTimedoutReqs != 0)
+                                {
+                                    if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
+                                    {
+                                        StringBuffer s;
+                                        DBGLOG("permit to send %" SEQF "u to node %s timed out %u times - abandoning", finger->flowSeq, finger->dest.getIpText(s).str(), finger->timeouts);
+                                    }
+                                }
+                                else
+                                {
+                                    // Put it back on the queue (at the back)
+                                    finger->timeStamp = now;
+                                    pendingRequests.append(finger);
+                                    finger->state = flowType::request_to_send;
+                                }
                                 finger = next;
                             }
                             else