Procházet zdrojové kódy

HPCC-23348 Roxie may lock up under load

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 5 roky
rodič
revize
3c2ab06e58
1 změnil soubory, kde provedl 47 přidání a 35 odebrání
  1. 47 35
      roxie/udplib/udptrr.cpp

+ 47 - 35
roxie/udplib/udptrr.cpp

@@ -466,54 +466,66 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             adjustPriority(1);
         #endif
             UdpRequestToSendMsg msg;
-            unsigned timeout = 5000;  // Is this too long?
+            unsigned timeoutExpires = msTick() + 5000;
             while (running)
             {
                 try
                 {
                     const unsigned l = sizeof(msg);
                     unsigned int res ;
-                    flow_socket->readtms(&msg, l, l, res, timeout);
-                    assert(res==l);
-                    if (udpTraceLevel > 5)
+                    unsigned now = msTick();
+                    if (now >= timeoutExpires)
                     {
-                        StringBuffer ipStr;
-                        DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
+                        if (currentRequester)
+                            timeoutExpires = now + timedOut(currentRequester);
+                        else
+                            timeoutExpires = now + 5000;
                     }
-                    UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode.getNodeAddress()];
-                    switch (msg.cmd)
+                    else
                     {
-                    case flowType::request_to_send:
-                        if (pendingRequests || currentRequester)
-                            enqueueRequest(sender);   // timeout does not change - there's still an active request
-                        else
-                            timeout = okToSend(sender);
-                        break;
+                        flow_socket->readtms(&msg, l, l, res, timeoutExpires-now);
+                        now = msTick();
+                        assert(res==l);
+                        if (udpTraceLevel > 5)
+                        {
+                            StringBuffer ipStr;
+                            DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
+                        }
+                        UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode.getNodeAddress()];
+                        switch (msg.cmd)
+                        {
+                        case flowType::request_to_send:
+                            if (pendingRequests || currentRequester)
+                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request
+                            else
+                                timeoutExpires = now + okToSend(sender);
+                            break;
 
-                    case flowType::send_completed:
-                        parent.inflight += msg.packets;
-                        if (noteDone(sender) && pendingRequests)
-                            timeout = sendNextOk();
-                        else
-                            timeout = 5000;
-                        break;
+                        case flowType::send_completed:
+                            parent.inflight += msg.packets;
+                            if (noteDone(sender) && pendingRequests)
+                                timeoutExpires = now + sendNextOk();
+                            else
+                                timeoutExpires = now + 5000;
+                            break;
 
-                    case flowType::request_to_send_more:
-                        parent.inflight += msg.packets;
-                        if (noteDone(sender))
-                        {
-                            if (pendingRequests)
+                        case flowType::request_to_send_more:
+                            parent.inflight += msg.packets;
+                            if (noteDone(sender))
                             {
-                                enqueueRequest(sender);
-                                timeout = sendNextOk();
+                                if (pendingRequests)
+                                {
+                                    enqueueRequest(sender);
+                                    timeoutExpires = now + sendNextOk();
+                                }
+                                else
+                                    timeoutExpires = now + okToSend(sender);
                             }
-                            else
-                                timeout = okToSend(sender);
-                        }
-                        break;
+                            break;
 
-                    default:
-                        DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
+                        default:
+                            DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
+                        }
                     }
                 }
                 catch (IException *e)
@@ -524,7 +536,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                         // A timeout implies that there is an active permission to send, but nothing has happened.
                         // Could be a really busy (or crashed) slave, could be a lost packet
                         if (currentRequester)
-                            timeout = timedOut(currentRequester);
+                            timeoutExpires = msTick() + timedOut(currentRequester);
                     }
                     else if (running)
                     {