Browse Source

HPCC-26779 Roxie flow control assumes flow packets do not overtake data packets

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 years ago
parent
commit
4038fc4a8c
2 changed files with 18 additions and 3 deletions
  1. 2 1
      roxie/ccd/ccdqueue.cpp
  2. 16 2
      roxie/udplib/udptrr.cpp

+ 2 - 1
roxie/ccd/ccdqueue.cpp

@@ -2845,10 +2845,11 @@ public:
         if (udpResendLostPackets && udpMaxSlotsPerClient > TRACKER_BITS)
             udpMaxSlotsPerClient = TRACKER_BITS;
         unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
+        bool sendFlowOnDataPort = topology->getPropBool("@sendFlowOnDataPort", true);
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
         receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, udpQueueSize, udpMaxSlotsPerClient, encryptionInTransit));
-        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
+        sendManager.setown(createSendManager(sendFlowOnDataPort ? dataPort : serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
     }
 
 };

+ 16 - 2
roxie/udplib/udptrr.cpp

@@ -54,6 +54,9 @@ static unsigned lastFlowPermitsSent = 0;
 static unsigned lastFlowRequestsReceived = 0;
 static unsigned lastDataPacketsReceived = 0;
 
+// The code that redirects flow messages from data socket to flow socket relies on the assumption tested here
+static_assert(sizeof(UdpRequestToSendMsg) < sizeof(UdpPacketHeader), "Expected UDP rts size to be less than packet header");
+
 class CReceiveManager : implements IReceiveManager, public CInterface
 {
     /*
@@ -573,7 +576,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     class receive_data : public Thread 
     {
         CReceiveManager &parent;
-        ISocket *receive_socket;
+        ISocket *receive_socket = nullptr;
+        ISocket *selfFlowSocket = nullptr;
         std::atomic<bool> running = { false };
         Semaphore started;
         
@@ -585,6 +589,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             if (check_max_socket_read_buffer(ip_buffer) < 0) 
                 throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
             receive_socket = ISocket::udp_create(parent.data_port);
+            selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
             receive_socket->set_receive_buffer_size(ip_buffer);
             size32_t actualSize = receive_socket->get_receive_buffer_size();
             DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
@@ -603,8 +608,11 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             running = false;
             if (receive_socket)
                 receive_socket->close();
+            if (selfFlowSocket)
+                selfFlowSocket->close();
             join();
             ::Release(receive_socket);
+            ::Release(selfFlowSocket);
         }
 
         virtual int run() 
@@ -625,7 +633,13 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 {
                     unsigned int res;
                     b = bufferManager->allocate();
-                    receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                    while (true)
+                    {
+                        receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                        if (res!=sizeof(UdpRequestToSendMsg))
+                            break;
+                        selfFlowSocket->write(b->data, res);
+                    }
                     dataPacketsReceived++;
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
                     assert(hdr.length == res && hdr.length > sizeof(hdr));