瀏覽代碼

HPCC-26815 Improve the simulated UDP tests

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Richard Chapman 3 年之前
父節點
當前提交
d77848fbad
共有 4 個文件被更改,包括 172 次插入46 次删除
  1. 119 35
      roxie/udplib/udpsha.cpp
  2. 50 10
      roxie/udplib/udpsha.hpp
  3. 2 0
      roxie/udplib/udptrr.cpp
  4. 1 1
      roxie/udplib/udptrs.cpp

+ 119 - 35
roxie/udplib/udpsha.cpp

@@ -674,15 +674,15 @@ fake read socket that
 bool isUdpTestMode = false;
 
 
-CSimulatedWriteSocket* CSimulatedWriteSocket::udp_connect(const SocketEndpoint &ep)
+CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
 {
-    return new CSimulatedWriteSocket(ep);
+    return new CSimulatedQueueWriteSocket(ep);
 }
 
-size32_t CSimulatedWriteSocket::write(void const* buf, size32_t size)
+size32_t CSimulatedQueueWriteSocket::write(void const* buf, size32_t size)
 {
-    CriticalBlock b(CSimulatedReadSocket::allReadersCrit);
-    CSimulatedReadSocket *dest = CSimulatedReadSocket::connectSimulatedSocket(destEp);
+    CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
+    CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
     if (dest)
         dest->writeSimulatedPacket(buf, size);
     else
@@ -693,10 +693,10 @@ size32_t CSimulatedWriteSocket::write(void const* buf, size32_t size)
     return size;
 }
 
-std::map<SocketEndpoint, CSimulatedReadSocket *> CSimulatedReadSocket::allReaders;
-CriticalSection CSimulatedReadSocket::allReadersCrit;
+std::map<SocketEndpoint, CSimulatedQueueReadSocket *> CSimulatedQueueReadSocket::allReaders;
+CriticalSection CSimulatedQueueReadSocket::allReadersCrit;
 
-CSimulatedReadSocket::CSimulatedReadSocket(const SocketEndpoint &_me) : me(_me)
+CSimulatedQueueReadSocket::CSimulatedQueueReadSocket(const SocketEndpoint &_me) : me(_me)
 {
     StringBuffer s;
     DBGLOG("Creating fake socket %s", me.getUrlStr(s).str());
@@ -704,7 +704,7 @@ CSimulatedReadSocket::CSimulatedReadSocket(const SocketEndpoint &_me) : me(_me)
     allReaders[me] = this;
 }
 
-CSimulatedReadSocket::~CSimulatedReadSocket()
+CSimulatedQueueReadSocket::~CSimulatedQueueReadSocket()
 {
     StringBuffer s;
     DBGLOG("Closing fake socket %s", me.getUrlStr(s).str());
@@ -712,18 +712,18 @@ CSimulatedReadSocket::~CSimulatedReadSocket()
     allReaders.erase(me);
 }
 
-CSimulatedReadSocket* CSimulatedReadSocket::udp_create(const SocketEndpoint &_me)
+CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::udp_create(const SocketEndpoint &_me)
 {
-    return new CSimulatedReadSocket(_me);
+    return new CSimulatedQueueReadSocket(_me);
 }
 
-CSimulatedReadSocket* CSimulatedReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
+CSimulatedQueueReadSocket* CSimulatedQueueReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
 {
     CriticalBlock b(allReadersCrit);
     return allReaders[ep];
 }
 
-void CSimulatedReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
+void CSimulatedQueueReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
 {
     {
         CriticalBlock b(crit);
@@ -740,13 +740,13 @@ void CSimulatedReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
     avail.signal();
 }
 
-void CSimulatedReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
+void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
 {
     unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
     readtms(buf, min_size, max_size, size_read, tms);
 }
 
-void CSimulatedReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
                      unsigned timeout)
 {
     size_read = 0;
@@ -774,12 +774,80 @@ void CSimulatedReadSocket::readtms(void* buf, size32_t min_size, size32_t max_si
         throw makeStringException(JSOCKERR_timeout_expired, "");
 }
 
-int CSimulatedReadSocket::wait_read(unsigned timeout)
+int CSimulatedQueueReadSocket::wait_read(unsigned timeout)
 {
     bool ret = avail.wait(timeout);
     return ret;
 }
 
+
+//------------------------------------------------------------------------------------------------------------------------------
+
+//Hash the ip and port and map that to a local port - complain if more than one combination is hashed to the same port.
+constexpr unsigned basePort = 9010;
+constexpr unsigned maxPorts = 980;
+static std::atomic_bool connected[maxPorts];
+unsigned getMappedSocketPort(const SocketEndpoint & ep)
+{
+    unsigned hash = ep.hash(0x31415926);
+    return basePort + hash % maxPorts;
+}
+
+CSimulatedUdpReadSocket::CSimulatedUdpReadSocket(const SocketEndpoint &_me)
+{
+    unsigned port = getMappedSocketPort(_me);
+    if (connected[port-basePort].exchange(true))
+        throw makeStringException(0, "Two ip/ports mapped to the same port - improve the hash (or change maxPorts)!");
+    realSocket.setown(ISocket::udp_create(port));
+}
+
+size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
+void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
+void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
+{
+    realSocket->read(buf, min_size, max_size, size_read, timeoutsecs);
+}
+void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
+{
+    realSocket->readtms(buf, min_size, max_size, size_read, timeout);
+}
+int CSimulatedUdpReadSocket::wait_read(unsigned timeout)
+{
+    return realSocket->wait_read(timeout);
+}
+void CSimulatedUdpReadSocket::close()
+{
+    realSocket->close();
+}
+
+CSimulatedUdpReadSocket* CSimulatedUdpReadSocket::udp_create(const SocketEndpoint &_me) { return new CSimulatedUdpReadSocket(_me); }
+
+CSimulatedUdpWriteSocket::CSimulatedUdpWriteSocket( const SocketEndpoint &ep)
+{
+    unsigned port = getMappedSocketPort(ep);
+    SocketEndpoint localEp(port, queryLocalIP());
+    realSocket.setown(ISocket::udp_connect(localEp));
+}
+
+CSimulatedUdpWriteSocket*  CSimulatedUdpWriteSocket::udp_connect( const SocketEndpoint &ep) { return new CSimulatedUdpWriteSocket(ep); }
+
+size32_t CSimulatedUdpWriteSocket::write(void const* buf, size32_t size)
+{
+    return realSocket->write(buf, size);
+}
+void CSimulatedUdpWriteSocket::set_send_buffer_size(size32_t sz)
+{
+    realSocket->set_send_buffer_size(sz);
+}
+void CSimulatedUdpWriteSocket::close()
+{
+    realSocket->close();
+}
+
+
+
+//-----------------------------------------------------------------------------------------------------
+
 #ifdef _USE_CPPUNIT
 
 class SimulatedUdpStressTest : public CppUnit::TestFixture
@@ -800,6 +868,8 @@ class SimulatedUdpStressTest : public CppUnit::TestFixture
             udpResendLostPackets = true;
             udpRequestToSendTimeout = 10000;
             udpRequestToSendAckTimeout = 10000;
+            udpMaxPendingPermits = 1;
+            udpTraceFlow = 0;
             isUdpTestMode = true;
             roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
             dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
@@ -809,27 +879,41 @@ class SimulatedUdpStressTest : public CppUnit::TestFixture
 
     void simulateTraffic()
     {
-        testInit();
-        myNode.setIp(IpAddress("1.2.3.4"));
-        Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 2, 2, false);
-        printf("Start test\n");
-        asyncFor(20, 20, [](unsigned i)
+        constexpr unsigned numReceiveSlots = 100;
+        constexpr unsigned maxSlotsPerClient = 100;
+        constexpr unsigned maxSendQueueSize = 100;
+        try
         {
-            unsigned header = 0;
-            IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
-            // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
-            Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 100, 3, pretendIP, nullptr, false);
-            Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
-            for (unsigned i = 0; i < 10000; i++)
+            testInit();
+            myNode.setIp(IpAddress("1.2.3.4"));
+            Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, numReceiveSlots, maxSlotsPerClient, false);
+            unsigned begin = msTick();
+            printf("Start test\n");
+            asyncFor(20, 20, [](unsigned i)
             {
-                void *buf = mp->getBuffer(500, false);
-                memset(buf, i, 500);
-                mp->putBuffer(buf, 500, false);
-            }
-            mp->flush();
-            Sleep(1000);
-        });
-        printf("End test\n");
+                unsigned header = 0;
+                IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
+                // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
+                Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, maxSendQueueSize, 3, pretendIP, nullptr, false);
+                Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
+                for (unsigned i = 0; i < 10000; i++)
+                {
+                    void *buf = mp->getBuffer(500, false);
+                    memset(buf, i, 500);
+                    mp->putBuffer(buf, 500, false);
+                }
+                mp->flush();
+                Sleep(100);
+            });
+            //Ideally we would wait until all the packets have been received, but there isn't a simple way to do that
+            printf("End test %u\n", msTick() - begin);
+        }
+        catch (IException * e)
+        {
+            StringBuffer msg;
+            printf("Exception: %s\n", e->errorMessage(msg).str());
+            throw;
+        }
     }
 };
 

+ 50 - 10
roxie/udplib/udpsha.hpp

@@ -264,6 +264,7 @@ inline bool checkTraceLevel(unsigned category, unsigned level)
     return (udpTraceLevel >= level);
 }
 #define SOCKET_SIMULATION
+#define SOCKET_SIMULATION_UDP
 
 #ifdef SOCKET_SIMULATION
 extern bool isUdpTestMode;
@@ -346,12 +347,12 @@ private:
 
 };
 
-class CSimulatedReadSocket : public CSocketSimulator
+class CSimulatedQueueReadSocket : public CSocketSimulator
 {
-    friend class CSimulatedWriteSocket;
+    friend class CSimulatedQueueWriteSocket;
 
-    CSimulatedReadSocket(const SocketEndpoint &_me);
-    ~CSimulatedReadSocket();
+    CSimulatedQueueReadSocket(const SocketEndpoint &_me);
+    ~CSimulatedQueueReadSocket();
 
     std::queue<unsigned> packetSizes;
     std::queue<const void *> packets;
@@ -362,12 +363,12 @@ class CSimulatedReadSocket : public CSocketSimulator
     Semaphore avail;
 
     void writeSimulatedPacket(void const* buf, size32_t size);
-    static std::map<SocketEndpoint, CSimulatedReadSocket *> allReaders;
+    static std::map<SocketEndpoint, CSimulatedQueueReadSocket *> allReaders;
     static CriticalSection allReadersCrit;
 
 public:
-    static CSimulatedReadSocket* udp_create(const SocketEndpoint &_me);
-    static CSimulatedReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
+    static CSimulatedQueueReadSocket* udp_create(const SocketEndpoint &_me);
+    static CSimulatedQueueReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
 
     virtual size32_t get_receive_buffer_size() override { return max; }
     virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
@@ -380,18 +381,57 @@ public:
 
 };
 
-class CSimulatedWriteSocket : public CSocketSimulator
+class CSimulatedQueueWriteSocket : public CSocketSimulator
 {
-    CSimulatedWriteSocket( const SocketEndpoint &ep) : destEp(ep) {}
+    CSimulatedQueueWriteSocket( const SocketEndpoint &ep) : destEp(ep) {}
     const SocketEndpoint destEp;
 public:
-    static CSimulatedWriteSocket*  udp_connect( const SocketEndpoint &ep);
+    static CSimulatedQueueWriteSocket*  udp_connect( const SocketEndpoint &ep);
     virtual size32_t write(void const* buf, size32_t size) override;
     virtual void set_send_buffer_size(size32_t sz) override {};
     virtual void close() override {};
 };
 
 
+class CSimulatedUdpSocket : public CSocketSimulator
+{
+protected:
+    Owned<ISocket> realSocket;
+};
+class CSimulatedUdpReadSocket : public CSimulatedUdpSocket
+{
+    CSimulatedUdpReadSocket(const SocketEndpoint &_me);
+
+public:
+    static CSimulatedUdpReadSocket* udp_create(const SocketEndpoint &_me);
+
+    virtual size32_t get_receive_buffer_size() override;
+    virtual void set_receive_buffer_size(size32_t sz) override;
+    virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
+    virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
+    virtual int wait_read(unsigned timeout) override;
+    virtual void close() override;
+
+};
+
+class CSimulatedUdpWriteSocket : public CSimulatedUdpSocket
+{
+    CSimulatedUdpWriteSocket( const SocketEndpoint &ep);
+public:
+    static CSimulatedUdpWriteSocket*  udp_connect( const SocketEndpoint &ep);
+    virtual size32_t write(void const* buf, size32_t size) override;
+    virtual void set_send_buffer_size(size32_t sz) override;
+    virtual void close() override;
+};
+
+#ifdef SOCKET_SIMULATION_UDP
+using CSimulatedWriteSocket = CSimulatedUdpWriteSocket;
+using CSimulatedReadSocket = CSimulatedUdpReadSocket;
+#else
+using CSimulatedWriteSocket = CSimulatedQueueWriteSocket;
+using CSimulatedReadSocket = CSimulatedQueueReadSocket;
+#endif
+
 #endif
 
 #endif

+ 2 - 0
roxie/udplib/udptrr.cpp

@@ -625,6 +625,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         
         ~receive_data()
         {
+            DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
+
             running = false;
             if (receive_socket)
                 receive_socket->close();

+ 1 - 1
roxie/udplib/udptrs.cpp

@@ -804,7 +804,7 @@ class CSendManager : implements ISendManager, public CInterface
                     try 
                     {
                         unsigned int res;
-                        flow_socket->read(&f, readsize, readsize, res, 5);
+                        flow_socket->readtms(&f, readsize, readsize, res, 5000);
                         flowPermitsReceived++;
                         assert(res==readsize);
                         switch (f.cmd)