Explorar o código

Merge pull request #15618 from richardkchapman/udpsim-restart

HPCC-26901 Add support for sender/receiver restarts to udp simulation

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday %!s(int64=3) %!d(string=hai) anos
pai
achega
598a1cab8c
Modificáronse 5 ficheiros con 110 adicións e 37 borrados
  1. 7 1
      roxie/udplib/udpsha.cpp
  2. 4 8
      roxie/udplib/udpsha.hpp
  3. 60 19
      roxie/udplib/udpsim.cpp
  4. 18 4
      roxie/udplib/udptrr.cpp
  5. 21 5
      roxie/udplib/udptrs.cpp

+ 7 - 1
roxie/udplib/udpsha.cpp

@@ -678,6 +678,7 @@ fake read socket that
 
 #ifdef SOCKET_SIMULATION
 bool isUdpTestMode = false;
+bool udpTestUseUdpSockets = true;
 
 CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
 {
@@ -800,12 +801,17 @@ unsigned getMappedSocketPort(const SocketEndpoint & ep)
 
 CSimulatedUdpReadSocket::CSimulatedUdpReadSocket(const SocketEndpoint &_me)
 {
-    unsigned port = getMappedSocketPort(_me);
+    port = getMappedSocketPort(_me);
     if (connected[port-basePort].exchange(true))
         throw makeStringException(0, "Two ip/ports mapped to the same port - improve the hash (or change maxPorts)!");
     realSocket.setown(ISocket::udp_create(port));
 }
 
+CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket()
+{
+    connected[port-basePort].exchange(false);
+}
+
 size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
 void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
 void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)

+ 4 - 8
roxie/udplib/udpsha.hpp

@@ -264,7 +264,6 @@ inline bool checkTraceLevel(unsigned category, unsigned level)
     return (udpTraceLevel >= level);
 }
 #define SOCKET_SIMULATION
-#define SOCKET_SIMULATION_UDP
 
 #ifdef SOCKET_SIMULATION
 #ifdef _DEBUG
@@ -278,6 +277,7 @@ extern unsigned flowPacketsSent[flowType::max_flow_cmd];
 #endif
 
 extern UDPLIB_API bool isUdpTestMode;
+extern UDPLIB_API bool udpTestUseUdpSockets;
 
 class CSocketSimulator : public CInterfaceOf<ISocket>
 {
@@ -412,11 +412,14 @@ class CSimulatedUdpSocket : public CSocketSimulator
 protected:
     Owned<ISocket> realSocket;
 };
+
 class CSimulatedUdpReadSocket : public CSimulatedUdpSocket
 {
     CSimulatedUdpReadSocket(const SocketEndpoint &_me);
+    ~CSimulatedUdpReadSocket();
 
 public:
+    unsigned port;
     static CSimulatedUdpReadSocket* udp_create(const SocketEndpoint &_me);
 
     virtual size32_t get_receive_buffer_size() override;
@@ -438,13 +441,6 @@ public:
     virtual void close() override;
 };
 
-#ifdef SOCKET_SIMULATION_UDP
-using CSimulatedWriteSocket = CSimulatedUdpWriteSocket;
-using CSimulatedReadSocket = CSimulatedUdpReadSocket;
-#else
-using CSimulatedWriteSocket = CSimulatedQueueWriteSocket;
-using CSimulatedReadSocket = CSimulatedQueueReadSocket;
-#endif
 
 #endif
 

+ 60 - 19
roxie/udplib/udpsim.cpp

@@ -30,7 +30,10 @@ using roxiemem::IDataBufferManager;
 
 Owned<IDataBufferManager> dbm;
 
-unsigned numThreads = 20;
+static unsigned numThreads = 20;
+static unsigned packetsPerThread = 0;
+static bool restartSender = false;
+static bool restartReceiver = false;
 
 static constexpr const char * defaultYaml = R"!!(
 version: "1.0"
@@ -44,6 +47,9 @@ udpsim:
   help: false
   numThreads: 20
   outputconfig: false
+  packetsPerThread: 10000
+  restartReceiver: false
+  restartSender: false
   udpTraceLevel: 1
   udpTraceTimeouts: true
   udpResendLostPackets: true
@@ -51,6 +57,7 @@ udpsim:
   udpRequestToSendAckTimeout: 1000
   udpMaxPendingPermits: 1
   udpTraceFlow: false
+  useQueue: false
 )!!";
 
 bool isNumeric(const char *str)
@@ -134,6 +141,9 @@ void initOptions(int argc, const char **argv)
     udpDropFlowPackets[flowType::request_to_send_more] = options->getPropInt("@dropRequestToSendMorePackets", 0);  // drop 1 in N
     udpDropFlowPackets[flowType::send_completed] = options->getPropInt("@dropSendCompletedPackets", 0);  // drop 1 in N
 #endif
+    restartSender = options->getPropBool("@restartSender");
+    restartReceiver = options->getPropBool("@restartReceiver");
+
     numThreads = options->getPropInt("@numThreads", 0);
     udpTraceLevel = options->getPropInt("@udpTraceLevel", 1);
     udpTraceTimeouts = options->getPropBool("@udpTraceTimeouts", true);
@@ -142,12 +152,22 @@ void initOptions(int argc, const char **argv)
     udpRequestToSendAckTimeout = options->getPropInt("@udpRequestToSendAckTimeout", 1000);
     udpMaxPendingPermits = options->getPropInt("@udpMaxPendingPermits", 1);
     udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
+    packetsPerThread = options->getPropInt("@packetsPerThread");
+    udpTestUseUdpSockets = !options->getPropBool("@useQueue");
 
     isUdpTestMode = true;
     roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
     dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
 }
 
+// How many times the simulated sender [i] should start
+unsigned numStarts(unsigned i)
+{
+    if (i==1 && restartSender)
+        return 2;
+    return 1;
+}
+
 void simulateTraffic()
 {
     constexpr unsigned numReceiveSlots = 100;
@@ -159,31 +179,52 @@ void simulateTraffic()
         Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
         unsigned begin = msTick();
         printf("Start test\n");
-        asyncFor(numThreads, numThreads, [maxSendQueueSize](unsigned i)
+        asyncFor(numThreads+1, numThreads+1, [maxSendQueueSize, numReceiveSlots, maxSlotsPerClient, &rm](unsigned i)
         {
-            unsigned header = 0;
-            IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
-            // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
-            Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
-            Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
-            for (unsigned j = 0; j < 10000; j++)
+            if (!i)
             {
-                void *buf = mp->getBuffer(500, false);
-                memset(buf, i, 500);
-                mp->putBuffer(buf, 500, false);
+                if (restartReceiver)
+                {
+                    Sleep(100);
+                    rm.clear();
+                    rm.setown(createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false));
+                }
+            }
+            else
+            {
+                unsigned header = 0;
+                unsigned myStarts = numStarts(i);
+                for (unsigned startNo = 0; startNo < myStarts; startNo++)
+                {
+                    IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
+                    // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
+                    Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
+                    unsigned numPackets = packetsPerThread / myStarts;
+                    for (unsigned j = 0; j < packetsPerThread; j++)
+                    {
+                        Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
+                        void *buf = mp->getBuffer(500, false);
+                        memset(buf, i, 500);
+                        mp->putBuffer(buf, 500, false);
+                        mp->flush();
+                    }
+
+                    // Wait until all the packets have been sent and acknowledged, for last start only
+                    // For prior starts, we are trying to simulate a sender stopping abruptly (e.g. from a restart) so we don't want to close it down cleanly.
+                    if (startNo == myStarts-1)
+                        while (!sm->allDone())
+                            Sleep(50);
+                    DBGLOG("UdpSim sender thread %d sent %d packets", i, numPackets);
+                }
+                DBGLOG("UdpSim sender thread %d completed", i);
             }
-            mp->flush();
-
-            //wait until all the packets have been sent and acknowledged
-            while(!sm->allDone())
-                Sleep(50);
         });
-        printf("End test %u\n", msTick() - begin);
+        printf("UdpSim test took %ums\n", msTick() - begin);
     }
     catch (IException * e)
     {
-        StringBuffer msg;
-        printf("Exception: %s\n", e->errorMessage(msg).str());
+        EXCLOG(e);
+        e->Release();
     }
 }
 

+ 18 - 4
roxie/udplib/udptrr.cpp

@@ -132,7 +132,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             SocketEndpoint ep(port, dest);
 #ifdef SOCKET_SIMULATION
             if (isUdpTestMode)
-                flowSocket = CSimulatedWriteSocket::udp_connect(ep);
+                if (udpTestUseUdpSockets)
+                    flowSocket = CSimulatedUdpWriteSocket::udp_connect(ep);
+                else
+                    flowSocket = CSimulatedQueueWriteSocket::udp_connect(ep);
             else
 #endif
                 flowSocket = ISocket::udp_connect(ep);
@@ -426,7 +429,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
 #ifdef SOCKET_SIMULATION
             if (isUdpTestMode)
-                flow_socket.setown(CSimulatedReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
+                if (udpTestUseUdpSockets)
+                    flow_socket.setown(CSimulatedUdpReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
+                else
+                    flow_socket.setown(CSimulatedQueueReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
             else
 #endif
                 flow_socket.setown(ISocket::udp_create(flow_port));
@@ -619,8 +625,16 @@ class CReceiveManager : implements IReceiveManager, public CInterface
 #ifdef SOCKET_SIMULATION
             if (isUdpTestMode)
             {
-                receive_socket = CSimulatedReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
-                selfFlowSocket = CSimulatedWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+                if (udpTestUseUdpSockets)
+                {
+                    receive_socket = CSimulatedUdpReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
+                    selfFlowSocket = CSimulatedUdpWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+                }
+                else
+                {
+                    receive_socket = CSimulatedQueueReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
+                    selfFlowSocket = CSimulatedQueueWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+                }
            }
             else
 #endif

+ 21 - 5
roxie/udplib/udptrs.cpp

@@ -583,8 +583,16 @@ public:
 #ifdef SOCKET_SIMULATION
                 if (isUdpTestMode)
                 {
-                    send_flow_socket = CSimulatedWriteSocket::udp_connect(sendFlowEp);
-                    data_socket = CSimulatedWriteSocket::udp_connect(dataEp);
+                    if (udpTestUseUdpSockets)
+                    {
+                        send_flow_socket = CSimulatedUdpWriteSocket::udp_connect(sendFlowEp);
+                        data_socket = CSimulatedUdpWriteSocket::udp_connect(dataEp);
+                    }
+                    else
+                    {
+                        send_flow_socket = CSimulatedQueueWriteSocket::udp_connect(sendFlowEp);
+                        data_socket = CSimulatedQueueWriteSocket::udp_connect(dataEp);
+                    }
                 }
                 else
 #endif
@@ -666,8 +674,11 @@ class CSendManager : implements ISendManager, public CInterface
 
         ~StartedThread()
         {
-            running = false;
-            join();
+            if (running)
+            {
+                running = false;
+                join();
+            }
         }
 
         virtual void start()
@@ -779,7 +790,12 @@ class CSendManager : implements ISendManager, public CInterface
                 throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
 #ifdef SOCKET_SIMULATION
             if (isUdpTestMode)
-                flow_socket.setown(CSimulatedReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
+            {
+                if (udpTestUseUdpSockets)
+                    flow_socket.setown(CSimulatedUdpReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
+                else
+                    flow_socket.setown(CSimulatedQueueReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
+            }
             else
 #endif
                 flow_socket.setown(ISocket::udp_create(receive_port));