Ver código fonte

Merge pull request #15620 from richardkchapman/udpsim-ooo

HPCC-26906 Add variable delay and jitter to udp simulator

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 3 anos atrás
pai
commit
1870623e38
3 arquivos alterados com 174 adições e 7 exclusões
  1. 139 6
      roxie/udplib/udpsha.cpp
  2. 14 1
      roxie/udplib/udpsha.hpp
  3. 21 0
      roxie/udplib/udpsim.cpp

+ 139 - 6
roxie/udplib/udpsha.cpp

@@ -682,22 +682,137 @@ fake read socket that
 #ifdef SOCKET_SIMULATION
 bool isUdpTestMode = false;
 bool udpTestUseUdpSockets = true;
+bool udpTestSocketJitter = false;
+unsigned udpTestSocketDelay = 0;
+bool udpTestVariableDelay = false;
+
+
+static CriticalSection allWriteSocketsCrit;
+static ICopyArrayOf<CSimulatedQueueWriteSocket> allWriteSockets;
+
+class DelayedSocketWriter : public Thread
+{
+public:
+    virtual int run() override
+    {
+        while (running)
+        {
+            unsigned shortestDelay = udpTestSocketDelay;
+            unsigned now = msTick();
+            {
+                CriticalBlock b(allWriteSocketsCrit);
+                ForEachItemIn(idx, allWriteSockets)
+                {
+                    CSimulatedQueueWriteSocket &ws = allWriteSockets.item(idx);
+                    shortestDelay = std::min(shortestDelay, ws.writeDelayed(now));
+                }
+            }
+            MilliSleep(shortestDelay);
+        }
+        return 0;
+    }
+    virtual void start()
+    {
+        running = true;
+        Thread::start();
+    }
+    void stop()
+    {
+        running = false;
+        join();
+    }
+private:
+    std::atomic<bool> running = { false };
+} delayedWriter;
+
+CSimulatedQueueWriteSocket::CSimulatedQueueWriteSocket(const SocketEndpoint &ep) : destEp(ep), delay(udpTestSocketDelay), jitter(udpTestSocketJitter)
+{
+    if (delay)
+    {
+        CriticalBlock b(allWriteSocketsCrit);
+        if (!allWriteSockets.length())
+            delayedWriter.start();
+        allWriteSockets.append(*this);
+    }
+}
+
+CSimulatedQueueWriteSocket::~CSimulatedQueueWriteSocket()
+{
+    if (delay)
+    {
+        CriticalBlock b(allWriteSocketsCrit);
+        allWriteSockets.zap(*this);
+        if (!allWriteSockets.length())
+            delayedWriter.stop();
+    }
+}
 
 CSimulatedQueueWriteSocket* CSimulatedQueueWriteSocket::udp_connect(const SocketEndpoint &ep)
 {
     return new CSimulatedQueueWriteSocket(ep);
 }
 
+unsigned CSimulatedQueueWriteSocket::writeDelayed(unsigned now)
+{
+    CriticalBlock b(crit);
+    while (dueTimes.size())
+    {
+        int delay = dueTimes.front() - now;
+        if (delay > 0)
+            return delay;
+        unsigned jitteredSize = 0;
+        const void *jitteredBuff = nullptr;
+        if (jitter && dueTimes.size()>1 && rand() % 100 == 0)
+        {
+            jitteredSize = packetSizes.front();
+            jitteredBuff = packets.front();
+            dueTimes.pop();
+            packets.pop();
+            packetSizes.pop();
+        }
+        CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
+        CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
+        if (dest)
+        {
+            dest->writeOwnSimulatedPacket(packets.front(), packetSizes.front());
+            if (jitteredBuff)
+                dest->writeOwnSimulatedPacket(jitteredBuff, jitteredSize);
+        }
+        else
+        {
+            StringBuffer s;
+            free((void *) packets.front());
+            if (jitteredBuff)
+                free((void *) jitteredBuff);
+            DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
+        }
+        dueTimes.pop();
+        packets.pop();
+        packetSizes.pop();
+    }
+    return (unsigned) -1;
+}
+
 size32_t CSimulatedQueueWriteSocket::write(void const* buf, size32_t size)
 {
-    CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
-    CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
-    if (dest)
-        dest->writeSimulatedPacket(buf, size);
+    if (delay)
+    {
+        CriticalBlock b(crit);
+        packetSizes.push(size);
+        packets.push(memcpy(malloc(size), buf, size));
+        dueTimes.push(msTick() + delay * (udpTestVariableDelay && size>200 ? 1 : 3));
+    }
     else
     {
-        StringBuffer s;
-        DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
+        CriticalBlock b(CSimulatedQueueReadSocket::allReadersCrit);
+        CSimulatedQueueReadSocket *dest = CSimulatedQueueReadSocket::connectSimulatedSocket(destEp);
+        if (dest)
+            dest->writeSimulatedPacket(buf, size);
+        else
+        {
+            StringBuffer s;
+            DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
+        }
     }
     return size;
 }
@@ -749,6 +864,24 @@ void CSimulatedQueueReadSocket::writeSimulatedPacket(void const* buf, size32_t s
     avail.signal();
 }
 
+void CSimulatedQueueReadSocket::writeOwnSimulatedPacket(void const* buf, size32_t size)
+{
+    {
+        CriticalBlock b(crit);
+        if (size+used > max)
+        {
+            DBGLOG("Lost packet");
+            free((void *) buf);
+            return;
+        }
+        packetSizes.push(size);
+        packets.push(buf);
+        used += size;
+    }
+//    StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
+    avail.signal();
+}
+
 void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
 {
     unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;

+ 14 - 1
roxie/udplib/udpsha.hpp

@@ -285,6 +285,9 @@ extern unsigned flowPacketsSent[flowType::max_flow_cmd];
 
 extern UDPLIB_API bool isUdpTestMode;
 extern UDPLIB_API bool udpTestUseUdpSockets;
+extern UDPLIB_API bool udpTestSocketJitter;
+extern UDPLIB_API unsigned udpTestSocketDelay;
+extern UDPLIB_API bool udpTestVariableDelay;
 
 class CSocketSimulator : public CInterfaceOf<ISocket>
 {
@@ -381,6 +384,7 @@ class CSimulatedQueueReadSocket : public CSocketSimulator
     Semaphore avail;
 
     void writeSimulatedPacket(void const* buf, size32_t size);
+    void writeOwnSimulatedPacket(void const* buf, size32_t size);
     static std::map<SocketEndpoint, CSimulatedQueueReadSocket *> allReaders;
     static CriticalSection allReadersCrit;
 
@@ -402,13 +406,22 @@ public:
 
 class CSimulatedQueueWriteSocket : public CSocketSimulator
 {
-    CSimulatedQueueWriteSocket( const SocketEndpoint &ep) : destEp(ep) {}
+    CSimulatedQueueWriteSocket(const SocketEndpoint &ep);
+    ~CSimulatedQueueWriteSocket();
     const SocketEndpoint destEp;
+    CriticalSection crit;
+    std::queue<unsigned> dueTimes;
+    std::queue<unsigned> packetSizes;
+    std::queue<const void *> packets;
+    unsigned delay = 0;
+    bool jitter = false;
 public:
     static CSimulatedQueueWriteSocket*  udp_connect( const SocketEndpoint &ep);
     virtual size32_t write(void const* buf, size32_t size) override;
     virtual void set_send_buffer_size(size32_t sz) override {};
     virtual void close() override {};
+
+    unsigned writeDelayed(unsigned now);
 };
 
 

+ 21 - 0
roxie/udplib/udpsim.cpp

@@ -56,6 +56,9 @@ udpsim:
   udpRequestToSendTimeout: 1000
   udpRequestToSendAckTimeout: 1000
   udpMaxPendingPermits: 1
+  udpTestSocketDelay: 0
+  udpTestSocketJitter: false
+  udpTestVariableDelay: false
   udpTraceFlow: false
   useQueue: false
 )!!";
@@ -154,6 +157,24 @@ void initOptions(int argc, const char **argv)
     udpTraceFlow = options->getPropBool("@udpTraceFlow", false);
     packetsPerThread = options->getPropInt("@packetsPerThread");
     udpTestUseUdpSockets = !options->getPropBool("@useQueue");
+    udpTestSocketDelay = options->getPropInt("@udpTestSocketDelay", 0);
+    udpTestSocketJitter = options->getPropBool("@udpTestSocketJitter");
+    udpTestVariableDelay = options->getPropBool("@udpTestVariableDelay");
+    if (udpTestSocketJitter && !udpTestSocketDelay)
+    {
+        printf("udpTestSocketDelay requires udpTestSocketDelay to be set - setting to 1\n");
+        udpTestSocketDelay = 1;
+    }
+    if (udpTestVariableDelay && !udpTestSocketDelay)
+    {
+        printf("udpTestVariableDelay requires udpTestSocketDelay to be set - setting to 1\n");
+        udpTestSocketDelay = 1;
+    }
+    if (udpTestSocketDelay && udpTestUseUdpSockets)
+    {
+        printf("udpTestSocketDelay requires queue mode (--useQueue=1) - setting it on\n");
+        udpTestUseUdpSockets = false;
+    }
 
     isUdpTestMode = true;
     roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);