Просмотр исходного кода

Merge branch 'candidate-8.0.x' into candidate-8.2.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 лет назад
Родитель
Сommit
8a47c3459c
3 измененных файлов с 23 добавлено и 4 удалено
  1. 5 1
      .devcontainer/devcontainer.json
  2. 2 1
      roxie/ccd/ccdqueue.cpp
  3. 16 2
      roxie/udplib/udptrr.cpp

+ 5 - 1
.devcontainer/devcontainer.json

@@ -15,7 +15,11 @@
 	],
 	// "postCreateCommand": "git config oh-my-zsh.hide-info 1",
 	// Set *default* container specific settings.json values on container create.
-	"settings": {},
+	"settings": {
+		"cmake.configureArgs": [
+			"-DUSE_SHLIBDEPS=ON"
+		]
+	},
 	// Add the IDs of extensions you want installed when the container is created.
 	"extensions": [
 		"ms-vscode.cpptools",

+ 2 - 1
roxie/ccd/ccdqueue.cpp

@@ -2862,10 +2862,11 @@ public:
         if (udpResendLostPackets && udpMaxSlotsPerClient > TRACKER_BITS)
             udpMaxSlotsPerClient = TRACKER_BITS;
         unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
+        bool sendFlowOnDataPort = topology->getPropBool("@sendFlowOnDataPort", true);
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
         receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, udpQueueSize, udpMaxSlotsPerClient, encryptionInTransit));
-        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
+        sendManager.setown(createSendManager(sendFlowOnDataPort ? dataPort : serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
     }
 
     virtual void abortPendingData(const SocketEndpoint &ep) override

+ 16 - 2
roxie/udplib/udptrr.cpp

@@ -54,6 +54,9 @@ static unsigned lastFlowPermitsSent = 0;
 static unsigned lastFlowRequestsReceived = 0;
 static unsigned lastDataPacketsReceived = 0;
 
+// The code that redirects flow messages from data socket to flow socket relies on the assumption tested here
+static_assert(sizeof(UdpRequestToSendMsg) < sizeof(UdpPacketHeader), "Expected UDP rts size to be less than packet header");
+
 class CReceiveManager : implements IReceiveManager, public CInterface
 {
     /*
@@ -573,7 +576,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     class receive_data : public Thread 
     {
         CReceiveManager &parent;
-        ISocket *receive_socket;
+        ISocket *receive_socket = nullptr;
+        ISocket *selfFlowSocket = nullptr;
         std::atomic<bool> running = { false };
         Semaphore started;
         
@@ -585,6 +589,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             if (check_max_socket_read_buffer(ip_buffer) < 0) 
                 throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
             receive_socket = ISocket::udp_create(parent.data_port);
+            selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
             receive_socket->set_receive_buffer_size(ip_buffer);
             size32_t actualSize = receive_socket->get_receive_buffer_size();
             DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
@@ -603,8 +608,11 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             running = false;
             if (receive_socket)
                 receive_socket->close();
+            if (selfFlowSocket)
+                selfFlowSocket->close();
             join();
             ::Release(receive_socket);
+            ::Release(selfFlowSocket);
         }
 
         virtual int run() 
@@ -625,7 +633,13 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 {
                     unsigned int res;
                     b = bufferManager->allocate();
-                    receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                    while (true)
+                    {
+                        receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
+                        if (res!=sizeof(UdpRequestToSendMsg))
+                            break;
+                        selfFlowSocket->write(b->data, res);
+                    }
                     dataPacketsReceived++;
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
                     assert(hdr.length == res && hdr.length > sizeof(hdr));