Browse Source

Merge pull request #15546 from richardkchapman/udp-simulator

HPCC-26780 Roxie UDP issues are very hard to track down

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 3 năm trước cách đây
mục cha
commit
7179a4d37d

+ 1 - 1
roxie/ccd/ccdqueue.cpp

@@ -2866,7 +2866,7 @@ public:
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
         receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, udpQueueSize, udpMaxSlotsPerClient, encryptionInTransit));
-        sendManager.setown(createSendManager(sendFlowOnDataPort ? dataPort : serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, encryptionInTransit));
+        sendManager.setown(createSendManager(sendFlowOnDataPort ? dataPort : serverFlowPort, dataPort, clientFlowPort, udpSendQueueSize, fastLaneQueue ? 3 : 2, myNode.getIpAddress(), bucket, encryptionInTransit));
     }
 
     virtual void abortPendingData(const SocketEndpoint &ep) override

+ 1 - 1
roxie/udplib/udplib.hpp

@@ -151,7 +151,7 @@ interface ISendManager : extends IInterface
 };
 
 extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int queue_size, unsigned maxSlotsPerSender, bool encrypted);
-extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit);
+extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, const IpAddress &myIP, TokenBucket *rateLimiter, bool encryptionInTransit);
 
 extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
 extern UDPLIB_API IReceiveManager *createAeronReceiveManager(const SocketEndpoint &ep, bool encrypted);

+ 183 - 0
roxie/udplib/udpsha.cpp

@@ -21,6 +21,7 @@
 #include "jlog.hpp"
 #include "roxie.hpp"
 #include "roxiemem.hpp"
+#include "portlist.h"
 
 #ifdef _WIN32
 #include <winsock2.h>
@@ -655,3 +656,185 @@ If you do need global:
 
 
 */
+
+/* Simulating traffic flow
+
+N threads that simulate behaviour of agents
+fake write socket that
+- accepts data pushed to it
+- moves it to a read socket
+fake read socket that
+- accepts packets from write sockets
+- discards when full (but tells you)
+- delivers packets to consumer
+
+*/
+
+#ifdef SOCKET_SIMULATION
+bool isUdpTestMode = false;
+
+
+CSimulatedWriteSocket* CSimulatedWriteSocket::udp_connect(const SocketEndpoint &ep)
+{
+    return new CSimulatedWriteSocket(ep);
+}
+
+size32_t CSimulatedWriteSocket::write(void const* buf, size32_t size)
+{
+    CriticalBlock b(CSimulatedReadSocket::allReadersCrit);
+    CSimulatedReadSocket *dest = CSimulatedReadSocket::connectSimulatedSocket(destEp);
+    if (dest)
+        dest->writeSimulatedPacket(buf, size);
+    else
+    {
+        StringBuffer s;
+        DBGLOG("Write to disconnected socket %s", destEp.getUrlStr(s).str());
+    }
+    return size;
+}
+
+std::map<SocketEndpoint, CSimulatedReadSocket *> CSimulatedReadSocket::allReaders;
+CriticalSection CSimulatedReadSocket::allReadersCrit;
+
+CSimulatedReadSocket::CSimulatedReadSocket(const SocketEndpoint &_me) : me(_me)
+{
+    StringBuffer s;
+    DBGLOG("Creating fake socket %s", me.getUrlStr(s).str());
+    CriticalBlock b(allReadersCrit);
+    allReaders[me] = this;
+}
+
+CSimulatedReadSocket::~CSimulatedReadSocket()
+{
+    StringBuffer s;
+    DBGLOG("Closing fake socket %s", me.getUrlStr(s).str());
+    CriticalBlock b(allReadersCrit);
+    allReaders.erase(me);
+}
+
+CSimulatedReadSocket* CSimulatedReadSocket::udp_create(const SocketEndpoint &_me)
+{
+    return new CSimulatedReadSocket(_me);
+}
+
+CSimulatedReadSocket* CSimulatedReadSocket::connectSimulatedSocket(const SocketEndpoint &ep)
+{
+    CriticalBlock b(allReadersCrit);
+    return allReaders[ep];
+}
+
+void CSimulatedReadSocket::writeSimulatedPacket(void const* buf, size32_t size)
+{
+    {
+        CriticalBlock b(crit);
+        if (size+used > max)
+        {
+            DBGLOG("Lost packet");
+            return;
+        }
+        packetSizes.push(size);
+        packets.push(memcpy(malloc(size), buf, size));
+        used += size;
+    }
+//    StringBuffer s; DBGLOG("Signalling available data on %s", me.getUrlStr(s).str());
+    avail.signal();
+}
+
+void CSimulatedReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
+{
+    unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
+    readtms(buf, min_size, max_size, size_read, tms);
+}
+
+void CSimulatedReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+                     unsigned timeout)
+{
+    size_read = 0;
+    if (!timeout || wait_read(timeout))
+    {
+        CriticalBlock b(crit);
+        const void *thisData = packets.front();
+        unsigned thisSize = packetSizes.front();
+        if (thisSize > max_size)
+        {
+            assert(false);
+            UNIMPLEMENTED;  // Partial packet read not supported yet - add if needed
+        }
+        else
+        {
+            packets.pop();
+            packetSizes.pop();
+            used -= thisSize;
+        }
+        size_read = thisSize;
+        memcpy(buf, thisData, thisSize);
+        free((void *) thisData);
+    }
+    else
+        throw makeStringException(JSOCKERR_timeout_expired, "");
+}
+
+int CSimulatedReadSocket::wait_read(unsigned timeout)
+{
+    bool ret = avail.wait(timeout);
+    return ret;
+}
+
+#ifdef _USE_CPPUNIT
+
+class SimulatedUdpStressTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE(SimulatedUdpStressTest);
+    CPPUNIT_TEST(simulateTraffic);
+    CPPUNIT_TEST_SUITE_END();
+
+    Owned<IDataBufferManager> dbm;
+    bool initialized = false;
+
+    void testInit()
+    {
+        if (!initialized)
+        {
+            udpTraceLevel = 1;
+            udpTraceTimeouts = true;
+            udpResendLostPackets = true;
+            udpRequestToSendTimeout = 10000;
+            udpRequestToSendAckTimeout = 10000;
+            isUdpTestMode = true;
+            roxiemem::setTotalMemoryLimit(false, false, false, 20*1024*1024, 0, NULL, NULL);
+            dbm.setown(roxiemem::createDataBufferManager(roxiemem::DATA_ALIGNMENT_SIZE));
+            initialized = true;
+        }
+    }
+
+    void simulateTraffic()
+    {
+        testInit();
+        myNode.setIp(IpAddress("1.2.3.4"));
+        Owned<IReceiveManager> rm = createReceiveManager(CCD_SERVER_FLOW_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 2, 2, false);
+        printf("Start test\n");
+        asyncFor(20, 20, [](unsigned i)
+        {
+            unsigned header = 0;
+            IpAddress pretendIP(VStringBuffer("8.8.8.%d", i));
+            // Note - this is assuming we send flow on the data port (that option defaults true in roxie too)
+            Owned<ISendManager> sm = createSendManager(CCD_DATA_PORT, CCD_DATA_PORT, CCD_CLIENT_FLOW_PORT, 100, 3, pretendIP, nullptr, false);
+            Owned<IMessagePacker> mp = sm->createMessagePacker(0, 0, &header, sizeof(header), myNode, 0);
+            for (unsigned i = 0; i < 10000; i++)
+            {
+                void *buf = mp->getBuffer(500, false);
+                memset(buf, i, 500);
+                mp->putBuffer(buf, 500, false);
+            }
+            mp->flush();
+            Sleep(1000);
+        });
+        printf("End test\n");
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( SimulatedUdpStressTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( SimulatedUdpStressTest, "SimulatedUdpStressTest" );
+
+#endif
+#endif

+ 131 - 0
roxie/udplib/udpsha.hpp

@@ -22,6 +22,8 @@
 #include "roxiemem.hpp"
 #include "jcrc.hpp"
 #include <limits>
+#include <queue>
+#include <map>
 
 typedef unsigned sequence_t;
 #define SEQF
@@ -261,6 +263,135 @@ inline bool checkTraceLevel(unsigned category, unsigned level)
 {
     return (udpTraceLevel >= level);
 }
+#define SOCKET_SIMULATION
 
+#ifdef SOCKET_SIMULATION
+extern bool isUdpTestMode;
+
+class CSocketSimulator : public CInterfaceOf<ISocket>
+{
+private:
+    virtual void   read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+                        unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
+    virtual void   readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+                           unsigned timeout) override { UNIMPLEMENTED; }
+    virtual void   read(void* buf, size32_t size) override { UNIMPLEMENTED; }
+    virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
+    virtual size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
+
+    virtual size32_t get_max_send_size() override { UNIMPLEMENTED; }
+    virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) override { UNIMPLEMENTED; }
+    virtual int logPollError(unsigned revents, const char *rwstr) override { UNIMPLEMENTED; }
+    virtual int wait_read(unsigned timeout) override { UNIMPLEMENTED; }
+    virtual int wait_write(unsigned timeout) override { UNIMPLEMENTED; }
+    virtual bool set_nonblock(bool on) override { UNIMPLEMENTED; }
+    virtual bool set_nagle(bool on) override { UNIMPLEMENTED; }
+    virtual void set_linger(int lingersecs) override { UNIMPLEMENTED; }
+    virtual void  cancel_accept() override { UNIMPLEMENTED; }
+    virtual void  shutdown(unsigned mode=SHUTDOWN_READWRITE) override { UNIMPLEMENTED; }
+    virtual int name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
+    virtual int peer_name(char *name,size32_t namemax) override { UNIMPLEMENTED; }
+    virtual SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep) override { UNIMPLEMENTED; }
+    virtual IpAddress &getPeerAddress(IpAddress &addr) override { UNIMPLEMENTED; }
+    virtual SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override { UNIMPLEMENTED; }
+    virtual bool connectionless() override { UNIMPLEMENTED; }
+
+    virtual void set_return_addr(int port,const char *name) override { UNIMPLEMENTED; }
+    virtual void  set_block_mode (             // must be called before block operations
+                            unsigned flags,    // BF_* flags (must match receive_block)
+                          size32_t recsize=0,  // record size (required for rec compression)
+                            unsigned timeoutms=0 // timeout in milisecs (0 for no timeout)
+                  ) override { UNIMPLEMENTED; }
+
+
+
+    virtual bool  send_block(
+                            const void *blk,   // data to send
+                            size32_t sz          // size to send (0 for eof)
+                  ) override { UNIMPLEMENTED; }
+
+    virtual size32_t receive_block_size () override { UNIMPLEMENTED; }
+
+    virtual size32_t receive_block(
+                            void *blk,         // receive pointer
+                            size32_t sz          // max size to read (0 for sync eof)
+                                               // if less than block size truncates block
+                  ) override { UNIMPLEMENTED; }
+
+    virtual void  close() override { UNIMPLEMENTED; }
+
+    virtual unsigned OShandle() const override { UNIMPLEMENTED; }
+    virtual size32_t avail_read() override { UNIMPLEMENTED; }
+
+    virtual size32_t write_multiple(unsigned num,void const**buf, size32_t *size) override { UNIMPLEMENTED; }
+
+    virtual size32_t get_send_buffer_size() override { UNIMPLEMENTED; }
+    virtual void set_send_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
+
+    virtual bool join_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
+    virtual bool leave_multicast_group(SocketEndpoint &ep) override { UNIMPLEMENTED; }
+
+    virtual void set_ttl(unsigned _ttl) override { UNIMPLEMENTED; }
+
+    virtual size32_t get_receive_buffer_size() override { UNIMPLEMENTED; }
+    virtual void set_receive_buffer_size(size32_t sz) override { UNIMPLEMENTED; }
+
+    virtual void set_keep_alive(bool set) override { UNIMPLEMENTED; }
+
+    virtual size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size) override { UNIMPLEMENTED; }
+    virtual bool check_connection() override { UNIMPLEMENTED; }
+
+    virtual bool isSecure() const override { UNIMPLEMENTED; }
+    virtual bool isValid() const override { UNIMPLEMENTED; }
+
+};
+
+class CSimulatedReadSocket : public CSocketSimulator
+{
+    friend class CSimulatedWriteSocket;
+
+    CSimulatedReadSocket(const SocketEndpoint &_me);
+    ~CSimulatedReadSocket();
+
+    std::queue<unsigned> packetSizes;
+    std::queue<const void *> packets;
+    unsigned max = 131072;
+    unsigned used = 0;
+    SocketEndpoint me;
+    CriticalSection crit;
+    Semaphore avail;
+
+    void writeSimulatedPacket(void const* buf, size32_t size);
+    static std::map<SocketEndpoint, CSimulatedReadSocket *> allReaders;
+    static CriticalSection allReadersCrit;
+
+public:
+    static CSimulatedReadSocket* udp_create(const SocketEndpoint &_me);
+    static CSimulatedReadSocket* connectSimulatedSocket(const SocketEndpoint &ep);
+
+    virtual size32_t get_receive_buffer_size() override { return max; }
+    virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
+    virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+                      unsigned timeoutsecs = WAIT_FOREVER) override;
+    virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
+                         unsigned timeout) override;
+    virtual int wait_read(unsigned timeout) override;
+    virtual void close() override {}
+
+};
+
+class CSimulatedWriteSocket : public CSocketSimulator
+{
+    CSimulatedWriteSocket( const SocketEndpoint &ep) : destEp(ep) {}
+    const SocketEndpoint destEp;
+public:
+    static CSimulatedWriteSocket*  udp_connect( const SocketEndpoint &ep);
+    virtual size32_t write(void const* buf, size32_t size) override;
+    virtual void set_send_buffer_size(size32_t sz) override {};
+    virtual void close() override {};
+};
+
+
+#endif
 
 #endif

+ 26 - 6
roxie/udplib/udptrr.cpp

@@ -45,7 +45,7 @@
 using roxiemem::DataBuffer;
 using roxiemem::IRowManager;
 
-unsigned udpMaxPendingPermits;
+unsigned udpMaxPendingPermits = 1;
 
 RelaxedAtomic<unsigned> flowPermitsSent = {0};
 RelaxedAtomic<unsigned> flowRequestsReceived = {0};
@@ -130,7 +130,12 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         UdpSenderEntry(const IpAddress &_dest, unsigned port) : dest(_dest)
         {
             SocketEndpoint ep(port, dest);
-            flowSocket = ISocket::udp_connect(ep);
+#ifdef SOCKET_SIMULATION
+            if (isUdpTestMode)
+                flowSocket = CSimulatedWriteSocket::udp_connect(ep);
+            else
+#endif
+                flowSocket = ISocket::udp_connect(ep);
         }
 
         ~UdpSenderEntry()
@@ -401,7 +406,12 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         {
             if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0) 
                 throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
-            flow_socket.setown(ISocket::udp_create(flow_port));
+#ifdef SOCKET_SIMULATION
+            if (isUdpTestMode)
+                flow_socket.setown(CSimulatedReadSocket::udp_create(SocketEndpoint(flow_port, myNode.getIpAddress())));
+            else
+#endif
+                flow_socket.setown(ISocket::udp_create(flow_port));
             flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
             size32_t actualSize = flow_socket->get_receive_buffer_size();
             DBGLOG("UdpReceiver: receive_receive_flow created port=%d sockbuffsize=%d actual %d", flow_port, udpFlowSocketsSize, actualSize);
@@ -588,8 +598,18 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
             if (check_max_socket_read_buffer(ip_buffer) < 0) 
                 throw MakeStringException(ROXIE_UDP_ERROR, "System socket max read buffer is less than %u", ip_buffer);
-            receive_socket = ISocket::udp_create(parent.data_port);
-            selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+#ifdef SOCKET_SIMULATION
+            if (isUdpTestMode)
+            {
+                receive_socket = CSimulatedReadSocket::udp_create(SocketEndpoint(parent.data_port, myNode.getIpAddress()));
+                selfFlowSocket = CSimulatedWriteSocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+           }
+            else
+#endif
+            {
+                receive_socket = ISocket::udp_create(parent.data_port);
+                selfFlowSocket = ISocket::udp_connect(SocketEndpoint(parent.receive_flow_port, myNode.getIpAddress()));
+            }
             receive_socket->set_receive_buffer_size(ip_buffer);
             size32_t actualSize = receive_socket->get_receive_buffer_size();
             DBGLOG("UdpReceiver: rcv_data_socket created port=%d requested sockbuffsize=%d actual sockbuffsize=%d", parent.data_port, ip_buffer, actualSize);
@@ -841,7 +861,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 E->Release();
             }
         }
-        if (udpTraceLevel && isDefault)
+        if (udpTraceLevel && isDefault && !isUdpTestMode)
         {
             StringBuffer s;
             DBGLOG("UdpReceiver: CPacketCollator NO msg collator found - using default - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s", pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str());

+ 50 - 33
roxie/udplib/udptrs.cpp

@@ -86,9 +86,9 @@ RelaxedAtomic<unsigned> flowRequestsSent;
 RelaxedAtomic<unsigned> flowPermitsReceived;
 RelaxedAtomic<unsigned> dataPacketsSent;
 
-unsigned udpResendTimeout;  // in millseconds
-bool udpResendLostPackets;
-bool udpAssumeSequential;
+unsigned udpResendTimeout = 10;  // in millseconds
+bool udpResendLostPackets = true;
+bool udpAssumeSequential = false;
 
 static unsigned lastResentReport = 0;
 static unsigned lastPacketsResent = 0;
@@ -157,7 +157,7 @@ public:
                     if (now-timeSent[idx] >= udpResendTimeout ||    // Note that this will block us from sending newer packets, if we have reached limit of tracking.
                         (udpAssumeSequential && (int)(seq - seen.lastSeen()) < 0))  // so we (optionally) assume any packet not received that is EARLIER than one that HAS been received is lost.
                     {
-                        if (udpTraceLevel > 1)
+                        if (udpTraceLevel > 1 || udpTraceTimeouts)
                             DBGLOG("Resending %" SEQF "u last sent %u ms ago", seq, now-timeSent[idx]);
                         timeSent[idx] = now;
                         packetsResent++;
@@ -213,8 +213,8 @@ private:
         {
             if (udpTraceLevel > 3 || udpTraceFlow)
             {
-                StringBuffer s;
-                DBGLOG("UdpSender: sending flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
+                StringBuffer s, s2;
+                DBGLOG("UdpSender[%s]: sending flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", msg.sourceNode.getTraceText(s2).str(), flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
             }
             send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
             flowRequestsSent++;
@@ -310,8 +310,8 @@ public:
         if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
         {
             StringBuffer s;
-            EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%s",
-                   timeouts, udpMaxRetryTimedoutReqs, ip.getIpText(s).str());
+            EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i, timeout=%u, expiryTime=%u) waiting ok_to_send msg from node=%s",
+                   timeouts, udpMaxRetryTimedoutReqs, udpRequestToSendAckTimeout, requestExpiryTime.load(), ip.getIpText(s).str());
         }
         // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
         CriticalBlock b(activeCrit);
@@ -560,8 +560,8 @@ public:
         return nullptr;
     }
 
-    UdpReceiverEntry(const IpAddress _ip, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort, bool _encrypted)
-    : ip (_ip), sourceIP(myNode.getIpAddress()), numQueues(_numQueues), isLocal(_ip.isLocal()), encrypted(_encrypted)
+    UdpReceiverEntry(const IpAddress _ip, const IpAddress _myIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort, bool _encrypted)
+    : ip (_ip), sourceIP(_myIP), numQueues(_numQueues), isLocal(_ip.isLocal()), encrypted(_encrypted)
     {
         assert(!initialized);
         assert(numQueues > 0);
@@ -571,8 +571,18 @@ public:
             {
                 SocketEndpoint sendFlowEp(_sendFlowPort, ip);
                 SocketEndpoint dataEp(_dataPort, ip);
-                send_flow_socket = ISocket::udp_connect(sendFlowEp);
-                data_socket = ISocket::udp_connect(dataEp);
+#ifdef SOCKET_SIMULATION
+                if (isUdpTestMode)
+                {
+                    send_flow_socket = CSimulatedWriteSocket::udp_connect(sendFlowEp);
+                    data_socket = CSimulatedWriteSocket::udp_connect(dataEp);
+                }
+                else
+#endif
+                {
+                    send_flow_socket = ISocket::udp_connect(sendFlowEp);
+                    data_socket = ISocket::udp_connect(dataEp);
+                }
                 if (isLocal)
                 {
                     data_socket->set_send_buffer_size(udpLocalWriteSocketSize);
@@ -602,8 +612,8 @@ public:
             initialized = true;
             if (udpTraceLevel > 0)
             {
-                StringBuffer ipStr;
-                DBGLOG("UdpSender: added entry for ip=%s to receivers table - send_flow_port=%d", ip.getIpText(ipStr).str(), _sendFlowPort);
+                StringBuffer ipStr, myIpStr;
+                DBGLOG("UdpSender[%s]: added entry for ip=%s to receivers table - send_flow_port=%d", _myIP.getIpText(myIpStr).str(), ip.getIpText(ipStr).str(), _sendFlowPort);
             }
         }
         if (udpResendLostPackets)
@@ -670,7 +680,7 @@ class CSendManager : implements ISendManager, public CInterface
         virtual int doRun() override
         {
             if (udpTraceLevel > 0)
-                DBGLOG("UdpSender: send_resend_flow started");
+                DBGLOG("UdpSender[%s]: send_resend_flow started", parent.myId);
             unsigned timeout = udpRequestToSendTimeout;
             while (running)
             {
@@ -687,7 +697,7 @@ class CSendManager : implements ISendManager, public CInterface
                     if (&receiverInfo != &dest)
                     {
                         StringBuffer s;
-                        DBGLOG("UdpSender: table entry %s does not find itself", dest.ip.getIpText(s).str());
+                        DBGLOG("UdpSender[%s]: table entry %s does not find itself", parent.myId, dest.ip.getIpText(s).str());
                         printStackReport();
 
                     }
@@ -752,17 +762,21 @@ class CSendManager : implements ISendManager, public CInterface
         CSendManager &parent;
         int      receive_port;
         Owned<ISocket> flow_socket;
-        
     public:
         send_receive_flow(CSendManager &_parent, int r_port) : StartedThread("UdpLib::send_receive_flow"), parent(_parent)
         {
             receive_port = r_port;
             if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0) 
                 throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
-            flow_socket.setown(ISocket::udp_create(receive_port));
+#ifdef SOCKET_SIMULATION
+            if (isUdpTestMode)
+                flow_socket.setown(CSimulatedReadSocket::udp_create(SocketEndpoint(receive_port, parent.myIP)));
+            else
+#endif
+                flow_socket.setown(ISocket::udp_create(receive_port));
             flow_socket->set_receive_buffer_size(udpFlowSocketsSize);
             size32_t actualSize = flow_socket->get_receive_buffer_size();
-            DBGLOG("UdpSender: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", receive_port, udpFlowSocketsSize, actualSize);
+            DBGLOG("UdpSender[%s]: rcv_flow_socket created port=%d sockbuffsize=%d actualsize=%d", parent.myId, receive_port, udpFlowSocketsSize, actualSize);
             start();
         }
         
@@ -777,7 +791,7 @@ class CSendManager : implements ISendManager, public CInterface
         virtual int doRun() 
         {
             if (udpTraceLevel > 0)
-                DBGLOG("UdpSender: send_receive_flow started");
+                DBGLOG("UdpSender[%s]: send_receive_flow started", parent.myId);
 #ifdef __linux__
             setLinuxThreadPriority(2);
 #endif
@@ -799,7 +813,7 @@ class CSendManager : implements ISendManager, public CInterface
                             if (udpTraceLevel > 2 || udpTraceFlow)
                             {
                                 StringBuffer s;
-                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%s seq %" SEQF "u", f.max_data, f.destNode.getTraceText(s).str(), f.flowSeq);
+                                DBGLOG("UdpSender[%s]: received ok_to_send msg max %d packets from node=%s seq %" SEQF "u", parent.myId, f.max_data, f.destNode.getTraceText(s).str(), f.flowSeq);
                             }
                             parent.data->ok_to_send(f);
                             break;
@@ -808,13 +822,13 @@ class CSendManager : implements ISendManager, public CInterface
                             if (udpTraceLevel > 2 || udpTraceFlow)
                             {
                                 StringBuffer s;
-                                DBGLOG("UdpSender: received request_received msg from node=%s seq %" SEQF "u", f.destNode.getTraceText(s).str(), f.flowSeq);
+                                DBGLOG("UdpSender[%s]: received request_received msg from node=%s seq %" SEQF "u", parent.myId, f.destNode.getTraceText(s).str(), f.flowSeq);
                             }
                             parent.receiversTable[f.destNode].requestAcknowledged();
                             break;
 
                         default: 
-                            DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
+                            DBGLOG("UdpSender[%s]: received unknown flow message type=%d", parent.myId, f.cmd);
                         }
                     }
                     catch (IException *e) 
@@ -822,14 +836,14 @@ class CSendManager : implements ISendManager, public CInterface
                         if (running && e->errorCode() != JSOCKERR_timeout_expired)
                         {
                             StringBuffer s;
-                            DBGLOG("UdpSender: send_receive_flow::read failed port=%i %s", receive_port, e->errorMessage(s).str());
+                            DBGLOG("UdpSender[%s]: send_receive_flow::read failed port=%i %s", parent.myId, receive_port, e->errorMessage(s).str());
                         }
                         e->Release();
                     }
                     catch (...) 
                     {
                         if (running)   
-                            DBGLOG("UdpSender: send_receive_flow::unknown exception");
+                            DBGLOG("UdpSender[%s]: send_receive_flow::unknown exception", parent.myId);
                         MilliSleep(0);
                     }
                 }
@@ -868,7 +882,7 @@ class CSendManager : implements ISendManager, public CInterface
             else 
             {
                 StringBuffer s;
-                DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - node=%s, maxData=%u", msg.destNode.getTraceText(s).str(), msg.max_data);
+                DBGLOG("UdpSender[%s]: push() failed - ignored ok_to_send msg - node=%s, maxData=%u", parent.myId, msg.destNode.getTraceText(s).str(), msg.max_data);
                 return false;
             }
         }
@@ -876,7 +890,7 @@ class CSendManager : implements ISendManager, public CInterface
         virtual int doRun() 
         {
             if (udpTraceLevel > 0)
-                DBGLOG("UdpSender: send_data started");
+                DBGLOG("UdpSender[%s]: send_data started", parent.myId);
         #ifdef __linux__
             setLinuxThreadPriority(1); // MORE - windows? Is this even a good idea? Must not send faster than receiver can pull off the socket
         #endif
@@ -893,11 +907,11 @@ class CSendManager : implements ISendManager, public CInterface
                 if (udpTraceLevel > 2)
                 {
                     StringBuffer s;
-                    DBGLOG("UdpSender: sent %u bytes to node=%s under permit %" SEQF "u", payload, permit.destNode.getTraceText(s).str(), permit.flowSeq);
+                    DBGLOG("UdpSender[%s]: sent %u bytes to node=%s under permit %" SEQF "u", parent.myId, payload, permit.destNode.getTraceText(s).str(), permit.flowSeq);
                 }
             }
             if (udpTraceLevel > 0)
-                DBGLOG("UdpSender: send_data stopped");
+                DBGLOG("UdpSender[%s]: send_data stopped", parent.myId);
             return 0;
         }
     };
@@ -909,6 +923,8 @@ class CSendManager : implements ISendManager, public CInterface
     unsigned numQueues;
 
     IpAddress myIP;
+    StringBuffer myIdStr;
+    const char *myId;
     IpMapOf<UdpReceiverEntry> receiversTable;
     send_resend_flow  *resend_flow;
     send_receive_flow *receive_flow;
@@ -934,9 +950,10 @@ public:
     CSendManager(int server_flow_port, int data_port, int client_flow_port, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool _encrypted)
         : bucket(_bucket),
           myIP(_myIP),
-          receiversTable([_numQueues, q_size, server_flow_port, data_port, _encrypted](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, _encrypted);}),
+          receiversTable([_numQueues, q_size, server_flow_port, data_port, _encrypted, this](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), myIP, _numQueues, q_size, server_flow_port, data_port, _encrypted);}),
           encrypted(_encrypted)
     {
+        myId = myIP.getIpText(myIdStr).str();
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -3);
 #endif
@@ -1003,10 +1020,10 @@ public:
 
 };
 
-ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit)
+ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int queue_size_pr_server, int queues_pr_server, const IpAddress &_myIP, TokenBucket *rateLimiter, bool encryptionInTransit)
 {
-    assertex(!myNode.getIpAddress().isNull());
-    return new CSendManager(server_flow_port, data_port, client_flow_port, queue_size_pr_server, queues_pr_server, myNode.getIpAddress(), rateLimiter, encryptionInTransit);
+    assertex(!_myIP.isNull());
+    return new CSendManager(server_flow_port, data_port, client_flow_port, queue_size_pr_server, queues_pr_server, _myIP, rateLimiter, encryptionInTransit);
 }
 
 class CMessagePacker : implements IMessagePacker, public CInterface

+ 1 - 1
roxie/udplib/uttest.cpp

@@ -293,7 +293,7 @@ void testNxN()
     if (useAeron)
         sendMgr.setown(createAeronSendManager(7000, udpNumQs, myNode.getIpAddress(), false));
     else
-        sendMgr.setown(createSendManager(7000, 7001, 7002, 100, udpNumQs, NULL, false));
+        sendMgr.setown(createSendManager(7000, 7001, 7002, 100, udpNumQs, myNode.getIpAddress(), nullptr, false));
     Receiver receiver;
 
     IMessagePacker **packers = new IMessagePacker *[numNodes];

+ 1 - 1
testing/unittests/unittests.cpp

@@ -167,7 +167,7 @@ int main(int argc, char* argv[])
         }
     }
     if (verbose)
-        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time);
+        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time|MSGFIELD_microTime|MSGFIELD_milliTime|MSGFIELD_thread);
     else
         removeLog();