Bläddra i källkod

HPCC-25471 - Roxie UDP timeout calculation can go wrong if CPU starved

Also fixes minor error in tracing spotted during investigation.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 år sedan
förälder
incheckning
f7743b8f75
1 ändrade filer med 14 tillägg och 9 borttagningar
  1. 14 9
      roxie/udplib/udptrr.cpp

+ 14 - 9
roxie/udplib/udptrr.cpp

@@ -171,7 +171,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 if (udpTraceLevel > 1)
                 {
                     StringBuffer ipStr;
-                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, returnAddress.getIpText(ipStr).str());
+                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, dest.getIpText(ipStr).str());
                 }
                 flowSocket->write(&msg, sizeof(UdpPermitToSendMsg));
             }
@@ -348,6 +348,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             unsigned timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
             currentRequester = requester;
             requester->requestToSend(max_transfer, myNode.getIpAddress());
+            assert(timeout >= 10 && timeout <= 20000);
             return timeout;
         }
 
@@ -369,6 +370,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         unsigned timedOut(UdpSenderEntry *requester)
         {
             // MORE - this will retry indefinitely if agent in question is dead
+            // As coded, this rescinds the permission to send that just timed out and tells the next person to have a go
+            // Thus leading to "Received completed message is not from current sender" messages the the send was in flight
             currentRequester = nullptr;
             if (requester->retryOnTimeout())
                 enqueueRequest(requester);
@@ -484,7 +487,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     else
                     {
                         flow_socket->readtms(&msg, l, l, res, timeoutExpires-now);
-                        now = msTick();
+                        unsigned newTimeout = 0;
                         assert(res==l);
                         if (udpTraceLevel > 5)
                         {
@@ -496,17 +499,17 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                         {
                         case flowType::request_to_send:
                             if (pendingRequests || currentRequester)
-                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request
+                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request. We have not given a new permission
                             else
-                                timeoutExpires = now + okToSend(sender);
+                                newTimeout = okToSend(sender);
                             break;
 
                         case flowType::send_completed:
                             parent.inflight += msg.packets;
-                            if (noteDone(sender) && pendingRequests)
-                                timeoutExpires = now + sendNextOk();
+                            if (noteDone(sender) && pendingRequests)   // This && looks wrong - noteDone returning false should mean we haven't seen the completed we wanted - so current timeout still applies. Or the one below is wrong...
+                                newTimeout = sendNextOk();
                             else
-                                timeoutExpires = now + 5000;
+                                newTimeout = 5000;
                             break;
 
                         case flowType::request_to_send_more:
@@ -516,16 +519,18 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                                 if (pendingRequests)
                                 {
                                     enqueueRequest(sender);
-                                    timeoutExpires = now + sendNextOk();
+                                    newTimeout = sendNextOk();
                                 }
                                 else
-                                    timeoutExpires = now + okToSend(sender);
+                                    newTimeout = okToSend(sender);
                             }
                             break;
 
                         default:
                             DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
                         }
+                        if (newTimeout)
+                            timeoutExpires = msTick() + newTimeout;
                     }
                 }
                 catch (IException *e)