Browse Source

HPCC-25574 Roxie UDP layer resend logic

Added logic to resend packets that have not been received
Removed hard limit on received packet queue
Added ability to request from more than one sender
Added code to track resends, duplicates etc

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 years ago
parent
commit
e6c74f568e

+ 23 - 4
roxie/ccd/ccdcontext.cpp

@@ -1184,7 +1184,9 @@ protected:
     Owned<IPropertyTree> probeQuery;
     unsigned lastWuAbortCheck;
     unsigned startTime;
-    unsigned totAgentsReplyLen;
+    std::atomic<unsigned> totAgentsReplyLen = {0};
+    std::atomic<unsigned> totAgentsDuplicates = {0};
+    std::atomic<unsigned> totAgentsResends = {0};
     CCycleTimer elapsedTimer;
 
     QueryOptions options;
@@ -1264,6 +1266,8 @@ public:
         aborted = false;
         exceptionLogged = false;
         totAgentsReplyLen = 0;
+        totAgentsDuplicates = 0;
+        totAgentsResends = 0;
 
         allocatorMetaCache.setown(createRowAllocatorCache(this));
         rowManager.setown(roxiemem::createRowManager(options.memoryLimit, this, logctx, allocatorMetaCache, false));
@@ -1676,10 +1680,13 @@ public:
         return *rowManager;
     }
 
-    virtual void addAgentsReplyLen(unsigned len)
+    virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends)
     {
-        CriticalBlock b(statsCrit); // MORE: change to atomic_add, or may not need it at all?
         totAgentsReplyLen += len;
+        if (duplicates)
+            totAgentsDuplicates += duplicates;
+        if (resends)
+            totAgentsResends += resends;
     }
 
     virtual const char *loadResource(unsigned id)
@@ -2582,6 +2589,8 @@ protected:
     void init()
     {
         totAgentsReplyLen = 0;
+        totAgentsDuplicates = 0;
+        totAgentsResends = 0;
         isRaw = false;
         isBlocked = false;
         isNative = true;
@@ -2792,11 +2801,21 @@ public:
         return rowManager->getMemoryUsage();
     }
 
-    virtual unsigned getAgentsReplyLen()
+    virtual unsigned getAgentsReplyLen() const
     {
         return totAgentsReplyLen;
     }
 
+    virtual unsigned getAgentsDuplicates() const
+    {
+        return totAgentsDuplicates;
+    }
+
+    virtual unsigned getAgentsResends() const
+    {
+        return totAgentsResends;
+    }
+
     virtual void process()
     {
         MTIME_SECTION(myTimer, "Process");

+ 4 - 2
roxie/ccd/ccdcontext.hpp

@@ -52,7 +52,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger
     virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0;
     virtual roxiemem::IRowManager &queryRowManager() = 0;
     virtual const QueryOptions &queryOptions() const = 0;
-    virtual void addAgentsReplyLen(unsigned len) = 0;
+    virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0;
     virtual const char *queryAuthToken() = 0;
     virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0;
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters, bool isPrivilegedUser) = 0;
@@ -84,7 +84,9 @@ interface IRoxieServerContext : extends IInterface
     virtual void done(bool failed) = 0;
     virtual void finalize(unsigned seqNo) = 0;
     virtual memsize_t getMemoryUsage() = 0;
-    virtual unsigned getAgentsReplyLen() = 0;
+    virtual unsigned getAgentsReplyLen() const = 0;
+    virtual unsigned getAgentsDuplicates() const = 0;
+    virtual unsigned getAgentsResends() const = 0;
 
     virtual unsigned getXmlFlags() const = 0;
     virtual IConstWorkUnit *queryWorkUnit() const = 0;

+ 1 - 1
roxie/ccd/ccdfile.cpp

@@ -837,7 +837,7 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
                 cidtSleep.wait(cacheReportPeriodSeconds * 1000);
                 if (closing)
                     break;
-                if (traceLevel)
+                if (traceLevel>8)
                     DBGLOG("Cache info dump");
                 // Note - cache info is stored in the DLLSERVER persistent area - which we should perhaps consider renaming
                 const char* dllserver_root = getenv("HPCC_DLLSERVER_PATH");

+ 21 - 8
roxie/ccd/ccdlistener.cpp

@@ -965,11 +965,11 @@ public:
         worker->threadmain();
     }
 
-    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, bool continuationNeeded)
+    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned agentsDuplicates, unsigned agentsResends, bool continuationNeeded)
     {
     }
 
-    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions readFlags, const char *target, unsigned idx, unsigned &memused, unsigned &agentReplyLen)
+    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions readFlags, const char *target, unsigned idx, unsigned &memused, unsigned &agentReplyLen, unsigned &agentsDuplicates, unsigned &agentsResends)
     {
         UNIMPLEMENTED;
     }
@@ -1228,6 +1228,8 @@ public:
         bool failed = true; // many paths to failure, only one to success...
         unsigned memused = 0;
         unsigned agentsReplyLen = 0;
+        unsigned agentsDuplicates = 0;
+        unsigned agentsResends = 0;
         unsigned priority = (unsigned) -2;
         try
         {
@@ -1264,6 +1266,8 @@ public:
                 ctx->process();
                 memused = (unsigned)(ctx->getMemoryUsage() / 0x100000);
                 agentsReplyLen = ctx->getAgentsReplyLen();
+                agentsDuplicates = ctx->getAgentsDuplicates();
+                agentsResends = ctx->getAgentsResends();
                 ctx->done(false);
                 failed = false;
             }
@@ -1271,6 +1275,8 @@ public:
             {
                 memused = (unsigned)(ctx->getMemoryUsage() / 0x100000);
                 agentsReplyLen = ctx->getAgentsReplyLen();
+                agentsDuplicates = ctx->getAgentsDuplicates();
+                agentsResends = ctx->getAgentsResends();
                 ctx->done(true);
                 throw;
             }
@@ -1314,7 +1320,7 @@ public:
                 txidInfo.append(']');
             }
 
-            logctx.CTXLOG("COMPLETE: %s%s complete in %d msecs memory=%d Mb priority=%d agentsreply=%d%s", wuid.get(), txidInfo.str(), elapsed, memused, priority, agentsReplyLen, s.str());
+            logctx.CTXLOG("COMPLETE: %s%s complete in %u msecs memory=%u Mb priority=%d agentsreply=%u duplicatePackets=%u resentPackets=%u%s", wuid.get(), txidInfo.str(), elapsed, memused, priority, agentsReplyLen, agentsDuplicates, agentsResends, s.str());
         }
     }
 
@@ -1574,7 +1580,7 @@ public:
         }
         combinedQueryStats.noteQuery(failed, elapsedTime);
     }
-    void noteQuery(const char *peer, bool failed, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned bytesOut, bool continuationNeeded)
+    void noteQuery(const char *peer, bool failed, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned agentsDuplicates, unsigned agentsResends, unsigned bytesOut, bool continuationNeeded)
     {
         noteQueryStats(failed, elapsed);
         if (queryFactory)
@@ -1601,7 +1607,7 @@ public:
                 }
                 if (txIds.length())
                     txIds.insert(0, '[').append(']');
-                logctx->CTXLOG("COMPLETE: %s %s%s from %s complete in %d msecs memory=%d Mb priority=%d agentsreply=%d resultsize=%d continue=%d%s", queryName.get(), uid.get(), txIds.str(), peer, elapsed, memused, getQueryPriority(), agentsReplyLen, bytesOut, continuationNeeded, s.str());
+                logctx->CTXLOG("COMPLETE: %s %s%s from %s complete in %u msecs memory=%u Mb priority=%d agentsreply=%u duplicatePackets=%u resentPackets=%u resultsize=%u continue=%d%s", queryName.get(), uid.get(), txIds.str(), peer, elapsed, memused, getQueryPriority(), agentsReplyLen, agentsDuplicates, agentsResends, bytesOut, continuationNeeded, s.str());
             }
         }
     }
@@ -1736,7 +1742,8 @@ public:
         return checkGetRoxieMsgContext(msgctx);
     }
 
-    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions xmlReadFlags, const char *target, unsigned idx, unsigned &memused, unsigned &agentsReplyLen)
+    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions xmlReadFlags,
+                            const char *target, unsigned idx, unsigned &memused, unsigned &agentsReplyLen, unsigned &agentsDuplicates, unsigned &agentsResends)
     {
         RoxieProtocolMsgContext *roxieMsgCtx = checkGetRoxieMsgContext(msgctx, msg);
         IQueryFactory *f = roxieMsgCtx->queryQueryFactory();
@@ -1753,6 +1760,8 @@ public:
             protocol->finalize(idx);
             memused += (unsigned)(ctx->getMemoryUsage() / 0x100000);
             agentsReplyLen += ctx->getAgentsReplyLen();
+            agentsDuplicates += ctx->getAgentsDuplicates();
+            agentsResends += ctx->getAgentsResends();
         }
         else
         {
@@ -1761,12 +1770,16 @@ public:
                 ctx->process();
                 memused = (unsigned)(ctx->getMemoryUsage() / 0x100000);
                 agentsReplyLen = ctx->getAgentsReplyLen();
+                agentsDuplicates = ctx->getAgentsDuplicates();
+                agentsResends = ctx->getAgentsResends();
                 ctx->done(false);
             }
             catch(...)
             {
                 memused = (unsigned)(ctx->getMemoryUsage() / 0x100000);
                 agentsReplyLen = ctx->getAgentsReplyLen();
+                agentsDuplicates = ctx->getAgentsDuplicates();
+                agentsResends = ctx->getAgentsResends();
                 ctx->done(true);
                 throw;
             }
@@ -1865,10 +1878,10 @@ public:
         roxieMsgCtx->ensureDebugCommandHandler().doDebugCommand(msg, &roxieMsgCtx->ensureDebuggerContext(uid), out);
     }
 
-    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, bool continuationNeeded)
+    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned agentsDuplicates, unsigned agentsResends, bool continuationNeeded)
     {
         RoxieProtocolMsgContext *roxieMsgCtx = checkGetRoxieMsgContext(msgctx);
-        roxieMsgCtx->noteQuery(peer, failed, elapsed, memused, agentsReplyLen, bytesOut, continuationNeeded);
+        roxieMsgCtx->noteQuery(peer, failed, elapsed, memused, agentsReplyLen, agentsDuplicates, agentsResends, bytesOut, continuationNeeded);
     }
 
 };

+ 9 - 0
roxie/ccd/ccdmain.cpp

@@ -769,6 +769,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             traceLevel = MAXTRACELEVEL;
         if (traceLevel && topology->hasProp("logging/@disabled"))
             topology->setPropBool("logging/@disabled", false);
+        udpStatsReportInterval = topology->getPropInt("@udpStatsReportInterval", traceLevel ? 60000 : 0);
+        udpTraceFlow = topology->getPropBool("@udpTraceFlow", false);
+        udpTraceTimeouts = topology->getPropBool("@udpTraceTimeouts", false);
         udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1);
         roxiemem::setMemTraceLevel(topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1));
         soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1);
@@ -971,6 +974,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
             udpSnifferEnabled = false;
         }
 #endif
+
+        udpResendEnabled = topology->getPropBool("@udpResendEnabled", true);
+        udpResendTimeout = topology->getPropInt("@udpResendTimeout", 10);  // milliseconds
+        udpAssumeSequential = topology->getPropBool("@udpAssumeSequential", false);
+        udpMaxPendingPermits = topology->getPropInt("@udpMaxPendingPermits", 1);
+
         int ttlTmp = topology->getPropInt("@multicastTTL", 1);
         if (ttlTmp < 0)
         {

+ 12 - 6
roxie/ccd/ccdprotocol.cpp

@@ -1368,14 +1368,17 @@ private:
     PTreeReaderOptions xmlReadFlags;
     unsigned &memused;
     unsigned &agentReplyLen;
+    unsigned &agentDuplicates;
+    unsigned &agentResends;
     CriticalSection crit;
     unsigned flags;
 
 public:
     CHttpRequestAsyncFor(const char *_queryName, IHpccProtocolMsgSink *_sink, IHpccProtocolMsgContext *_msgctx, IArrayOf<IPropertyTree> &_requestArray,
-            SafeSocket &_client, HttpHelper &_httpHelper, unsigned _flags, unsigned &_memused, unsigned &_agentReplyLen, const char *_queryText, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
+            SafeSocket &_client, HttpHelper &_httpHelper, unsigned _flags, unsigned &_memused, unsigned &_agentReplyLen, unsigned &_agentDuplicates, unsigned &_agentResends,
+            const char *_queryText, const IContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
     : querySetName(_querySetName), logctx(_logctx), requestArray(_requestArray), sink(_sink), msgctx(_msgctx), client(_client), httpHelper(_httpHelper), xmlReadFlags(_xmlReadFlags)
-      , memused(_memused), agentReplyLen(_agentReplyLen), flags(_flags)
+      , memused(_memused), agentReplyLen(_agentReplyLen), agentDuplicates(_agentDuplicates), agentResends(_agentResends), flags(_flags)
     {
         queryName = _queryName;
         queryText = _queryText;
@@ -1398,7 +1401,8 @@ public:
         {
             IPropertyTree &request = requestArray.item(idx);
             Owned<IHpccProtocolResponse> protocol = createProtocolResponse(request.queryName(), &client, httpHelper, logctx, flags, xmlReadFlags);
-            sink->onQueryMsg(msgctx, &request, protocol, flags, xmlReadFlags, querySetName, idx, memused, agentReplyLen);
+            // MORE - agentReply etc should really be atomic
+            sink->onQueryMsg(msgctx, &request, protocol, flags, xmlReadFlags, querySetName, idx, memused, agentReplyLen, agentDuplicates, agentResends);
         }
         catch (IException * E)
         {
@@ -1737,6 +1741,8 @@ private:
 
 readAnother:
         unsigned agentsReplyLen = 0;
+        unsigned agentsDuplicates = 0;
+        unsigned agentsResends = 0;
         StringArray allTargets;
         sink->getTargetNames(allTargets);
         HttpHelper httpHelper(&allTargets);
@@ -2027,13 +2033,13 @@ readAnother:
 
                         if (isHTTP)
                         {
-                            CHttpRequestAsyncFor af(queryName, sink, msgctx, requestArray, *client, httpHelper, protocolFlags, memused, agentsReplyLen, sanitizedText, logctx, (PTreeReaderOptions)readFlags, querySetName);
+                            CHttpRequestAsyncFor af(queryName, sink, msgctx, requestArray, *client, httpHelper, protocolFlags, memused, agentsReplyLen, agentsDuplicates, agentsResends, sanitizedText, logctx, (PTreeReaderOptions)readFlags, querySetName);
                             af.For(requestArray.length(), global->numRequestArrayThreads);
                         }
                         else
                         {
                             Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags, (PTreeReaderOptions)readFlags);
-                            sink->onQueryMsg(msgctx, queryPT, protocol, protocolFlags, (PTreeReaderOptions)readFlags, querySetName, 0, memused, agentsReplyLen);
+                            sink->onQueryMsg(msgctx, queryPT, protocol, protocolFlags, (PTreeReaderOptions)readFlags, querySetName, 0, memused, agentsReplyLen, agentsDuplicates, agentsResends);
                         }
                     }
                 }
@@ -2102,7 +2108,7 @@ readAnother:
         }
         unsigned bytesOut = client? client->bytesOut() : 0;
         unsigned elapsed = msTick() - qstart;
-        sink->noteQuery(msgctx.get(), peerStr, failed, bytesOut, elapsed,  memused, agentsReplyLen, continuationNeeded);
+        sink->noteQuery(msgctx.get(), peerStr, failed, bytesOut, elapsed,  memused, agentsReplyLen, agentsDuplicates, agentsResends, continuationNeeded);
         if (continuationNeeded)
         {
             rawText.clear();

+ 14 - 3
roxie/ccd/ccdqueue.cpp

@@ -26,6 +26,7 @@
 
 #include "udplib.hpp"
 #include "udptopo.hpp"
+#include "udpsha.hpp"
 #include "ccd.hpp"
 #include "ccddebug.hpp"
 #include "ccdquery.hpp"
@@ -2832,9 +2833,9 @@ class RoxieUdpSocketQueueManager : public RoxieSocketQueueManager
 public:
     RoxieUdpSocketQueueManager(unsigned snifferChannel, unsigned _numWorkers, bool encryptionInTransit) : RoxieSocketQueueManager(_numWorkers)
     {
-        int udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
-        int udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
-        int udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
+        unsigned udpQueueSize = topology->getPropInt("@udpQueueSize", UDP_QUEUE_SIZE);
+        unsigned udpSendQueueSize = topology->getPropInt("@udpSendQueueSize", UDP_SEND_QUEUE_SIZE);
+        unsigned udpMaxSlotsPerClient = topology->getPropInt("@udpMaxSlotsPerClient", 0x7fffffff);
         if (topology->getPropInt("@sendMaxRate", 0))
         {
             unsigned sendMaxRate = topology->getPropInt("@sendMaxRate");
@@ -2847,6 +2848,8 @@ public:
         getChannelIp(snifferIp, snifferChannel);
         if (udpMaxSlotsPerClient > udpQueueSize)
             udpMaxSlotsPerClient = udpQueueSize;
+        if (udpResendEnabled && udpMaxSlotsPerClient > TRACKER_BITS)
+            udpMaxSlotsPerClient = TRACKER_BITS;
         unsigned serverFlowPort = topology->getPropInt("@serverFlowPort", CCD_SERVER_FLOW_PORT);
         unsigned dataPort = topology->getPropInt("@dataPort", CCD_DATA_PORT);
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
@@ -3061,6 +3064,14 @@ public:
     {
         return totalBytesReceived;
     }
+    virtual unsigned queryDuplicates() const
+    {
+        return 0;
+    }
+    virtual unsigned queryResends() const
+    {
+        return 0;
+    }
 };
 
 class RoxieLocalReceiveManager : implements ILocalReceiveManager, public CInterface

+ 3 - 3
roxie/ccd/ccdserver.cpp

@@ -295,9 +295,9 @@ public:
     {
         return ctx->queryOptions();
     }
-    virtual void addAgentsReplyLen(unsigned len) 
+    virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends)
     {
-        ctx->addAgentsReplyLen(len);
+        ctx->addAgentsReplyLen(len, duplicates, resends);
     }
     virtual const char *queryAuthToken() 
     {
@@ -4376,7 +4376,7 @@ public:
         merger.reset();
         pending.kill();
         if (mc && ctx)
-            ctx->addAgentsReplyLen(mc->queryBytesReceived());
+            ctx->addAgentsReplyLen(mc->queryBytesReceived(), mc->queryDuplicates(), mc->queryResends());
         mc.clear(); // Or we won't free memory for graphs that get recreated
         mu.clear(); //ditto
         deferredStart = false;

+ 2 - 0
roxie/ccd/ccdsnmp.cpp

@@ -403,6 +403,8 @@ CRoxieMetricsManager::CRoxieMetricsManager()
     addMetric(lastQueryDate, 0);
     addMetric(lastQueryTime, 0);
 
+    addMetric(packetsResent, 0);
+
 #ifdef TIME_PACKETS
     addMetric(packetWaitMax, 0);
     addMetric(packetRunMax, 0);

+ 2 - 2
roxie/ccd/hpccprotocol.hpp

@@ -98,8 +98,8 @@ interface IHpccProtocolMsgSink : extends IInterface
     virtual IHpccProtocolMsgContext *createMsgContext(time_t startTime) = 0;
     virtual StringArray &getTargetNames(StringArray &targets) = 0;
 
-    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, bool continuationNeeded) = 0;
-    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions readFlags, const char *target, unsigned idx, unsigned &memused, unsigned &agentReplyLen) = 0;
+    virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned agentsReplyLen, unsigned agentDuplicates, unsigned agentResends, bool continuationNeeded) = 0;
+    virtual void onQueryMsg(IHpccProtocolMsgContext *msgctx, IPropertyTree *msg, IHpccProtocolResponse *protocol, unsigned flags, PTreeReaderOptions readFlags, const char *target, unsigned idx, unsigned &memused, unsigned &agentReplyLen, unsigned &agentDuplicates, unsigned &agentResends) = 0;
 };
 
 interface IHpccProtocolListener : extends IInterface

+ 1 - 1
roxie/udplib/udpaeron.cpp

@@ -393,7 +393,7 @@ public:
     CRoxieAeronSendManager(unsigned _dataPort, unsigned _numQueues, const IpAddress &_myIP)
     : dataPort(_dataPort),
       numQueues(_numQueues),
-      receiversTable([this](const ServerIdentifier &ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}),
+      receiversTable([this](const ServerIdentifier ip) { return new UdpAeronReceiverEntry(ip.getIpAddress(), dataPort, aeron, numQueues);}),
       myIP(_myIP)
     {
         if (useEmbeddedAeronDriver && !is_running())

+ 20 - 3
roxie/udplib/udpipmap.cpp

@@ -32,6 +32,7 @@ class IpMapTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testIpMap);
         // CPPUNIT_TEST(testIpV6);
         CPPUNIT_TEST(testThread);
+        CPPUNIT_TEST(testMulti);
     CPPUNIT_TEST_SUITE_END();
 
     static unsigned *createMapEntry(const ServerIdentifier &)
@@ -42,7 +43,7 @@ class IpMapTest : public CppUnit::TestFixture
     void testIpMap()
     {
         unsigned five = 5;
-        auto createMapEntry = [five](const ServerIdentifier &ip)
+        auto createMapEntry = [five](const ServerIdentifier ip)
         {
             StringBuffer s;
             printf("adding ip %s\n", ip.getTraceText(s).str());
@@ -81,16 +82,18 @@ class IpMapTest : public CppUnit::TestFixture
     class IpEntry
     {
     public:
-        IpEntry()
+        IpEntry(const ServerIdentifier &_s) : s(_s)
         {
             numCreated++;
         }
+        ServerIdentifier s;
         static RelaxedAtomic<unsigned> numCreated;
     };
 
     void testThread()
     {
-        IpMapOf<IpEntry> map([](const ServerIdentifier &){return new IpEntry; });
+        IpEntry::numCreated = 0;
+        IpMapOf<IpEntry> map([](const ServerIdentifier s){return new IpEntry(s); });
         std::thread threads[100];
         Semaphore ready;
         for (int i = 0; i < 100; i++)
@@ -112,6 +115,20 @@ class IpMapTest : public CppUnit::TestFixture
             threads[i].join();
         }
         ASSERT(IpEntry::numCreated == 1000)
+        unsigned numInTable = 0;
+        for (auto &&i: map)
+        {
+            auto &entry = map[i.s];
+            ASSERT(&entry==&i);
+            numInTable++;
+        }
+        ASSERT(numInTable == 1000);
+    }
+
+    void testMulti()
+    {
+        for (unsigned i = 0; i < 1000; i++)
+            testThread();
     }
 };
 

+ 3 - 3
roxie/udplib/udpipmap.hpp

@@ -31,7 +31,7 @@ private:
     class list
     {
     public:
-        list(const ServerIdentifier &_ip, const list *_next, std::function<T *(const ServerIdentifier &)> tfunc) : ip(_ip), next(_next)
+        list(const ServerIdentifier &_ip, const list *_next, std::function<T *(const ServerIdentifier)> tfunc) : ip(_ip), next(_next)
         {
             entry = tfunc(ip);
         }
@@ -86,7 +86,7 @@ private:
     };
 
 public:
-    IpMapOf<T>(std::function<T *(const ServerIdentifier &)> _tfunc) : tfunc(_tfunc)
+    IpMapOf<T>(std::function<T *(const ServerIdentifier)> _tfunc) : tfunc(_tfunc)
     {
     }
     T &lookup(const ServerIdentifier &) const;
@@ -103,7 +103,7 @@ public:
     myIterator end()   { return myIterator(nullptr, 256, nullptr); }
 
 private:
-    const std::function<T *(const ServerIdentifier &)> tfunc;
+    const std::function<T *(const ServerIdentifier)> tfunc;
     mutable std::atomic<const list *> table[256] = {};
     mutable CriticalSection lock;
     mutable std::atomic<unsigned> firstHash = { 256 };

+ 12 - 2
roxie/udplib/udplib.hpp

@@ -125,6 +125,8 @@ interface IMessageCollator : extends IInterface
     virtual void interrupt(IException *E = NULL) = 0;
     virtual ruid_t queryRUID() const = 0;
     virtual unsigned queryBytesReceived() const = 0;
+    virtual unsigned queryDuplicates() const = 0;
+    virtual unsigned queryResends() const = 0;
 };
 
 interface IReceiveManager : extends IInterface 
@@ -147,7 +149,7 @@ interface ISendManager : extends IInterface
     virtual bool allDone() = 0;
 };
 
-extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender, bool encryptionInTransit);
+extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender, bool encrypted);
 extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, bool encryptionInTransit);
 
 extern UDPLIB_API void setAeronProperties(const IPropertyTree *config);
@@ -156,8 +158,9 @@ extern UDPLIB_API ISendManager *createAeronSendManager(unsigned dataPort, unsign
 
 extern UDPLIB_API RelaxedAtomic<unsigned> unwantedDiscarded;
 
+extern UDPLIB_API bool udpTraceFlow;
+extern UDPLIB_API bool udpTraceTimeouts;
 extern UDPLIB_API unsigned udpTraceLevel;
-extern UDPLIB_API unsigned udpTraceCategories;
 extern UDPLIB_API unsigned udpOutQsPriority;
 extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats);
 
@@ -180,4 +183,11 @@ extern UDPLIB_API unsigned udpSnifferSendThreadPriority;
 
 extern UDPLIB_API void stopAeronDriver();
 
+extern UDPLIB_API bool udpResendEnabled;
+extern UDPLIB_API unsigned udpResendTimeout;  // in milliseconds
+extern UDPLIB_API unsigned udpMaxPendingPermits;
+extern UDPLIB_API bool udpAssumeSequential;
+extern UDPLIB_API unsigned udpStatsReportInterval;
+extern UDPLIB_API RelaxedAtomic<unsigned> packetsResent;
+extern UDPLIB_API RelaxedAtomic<unsigned> packetsOOO;
 #endif

+ 44 - 16
roxie/udplib/udpmsgpk.cpp

@@ -64,9 +64,9 @@ class PackageSequencer : public CInterface, implements IInterface
     unsigned metaSize;
     unsigned headerSize;
     const void *header;
-#ifdef _DEBUG
-    unsigned numPackets = 0;
     unsigned maxSeqSeen = 0;
+    unsigned numPackets = 0;
+#ifdef _DEBUG
     unsigned scans = 0;
     unsigned overscans = 0;
 #endif
@@ -125,21 +125,26 @@ public:
         return ret;
     }
 
-    bool insert(DataBuffer *dataBuff)  // returns true if message is complete.
+    bool insert(DataBuffer *dataBuff, std::atomic<unsigned> &duplicates, std::atomic<unsigned> &resends)  // returns true if message is complete.
     {
         bool res = false;
         assert(dataBuff->msgNext == NULL);
         UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
         unsigned pktseq = pktHdr->pktSeq;
-#ifdef _DEBUG
         if ((pktseq & UDP_PACKET_SEQUENCE_MASK) > maxSeqSeen)
             maxSeqSeen = pktseq & UDP_PACKET_SEQUENCE_MASK;
-#endif
+        if (pktseq & UDP_PACKET_RESENT)
+        {
+            pktseq &= ~UDP_PACKET_RESENT;
+            pktHdr->pktSeq = pktseq;
+            resends++;
+        }
+
         if (checkTraceLevel(TRACE_MSGPACK, 5))
         {
             StringBuffer s;
-            DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuffer=%p this=%p",
-                    pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
+            DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X sendSeq=%" SEQF "u node=%s dataBuffer=%p this=%p",
+                    pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->sendSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
         }
 
         // Optimize the (very) common case where I need to add to the end
@@ -162,8 +167,9 @@ public:
                 {
                     // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
                     if (checkTraceLevel(TRACE_MSGPACK, 5))
-                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet %u (we have all up to %u)", pktHdr->pktSeq, oldHdr->pktSeq);
                     dataBuff->Release();
+                    duplicates++;
                     return false;
                 }
                 finger = lastContiguousPacket->msgNext;
@@ -176,7 +182,7 @@ public:
             }
             while (finger)
             {
-    #ifdef _DEBUG
+#ifdef _DEBUG
                 scans++;
                 if (scans==1000000)
                 {
@@ -191,13 +197,13 @@ public:
                         DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
                     scans = 0;
                 }
-    #endif
+#endif
                 UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
                 if (pktHdr->pktSeq == oldHdr->pktSeq)
                 {
                     // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
                     if (checkTraceLevel(TRACE_MSGPACK, 5))
-                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet %u", pktHdr->pktSeq);
                     dataBuff->Release();
                     return false;
                 }
@@ -297,12 +303,16 @@ public:
         return headerSize;
     }
 
-#ifdef _DEBUG
     void dump()
     {
         DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
+        if (lastContiguousPacket)
+        {
+            UdpPacketHeader *hdr = (UdpPacketHeader*) lastContiguousPacket->data;
+            DBGLOG("lastContiguousPacket is %u %" SEQF "u", hdr->pktSeq, hdr->sendSeq);
+        }
     }
-#endif
+
 };
 
 // MessageResult ====================================================================================
@@ -513,9 +523,27 @@ CMessageCollator::~CMessageCollator()
     }
 }
 
+void CMessageCollator::noteDuplicate(bool isResend)
+{
+    totalDuplicates++;
+    if (isResend)
+        totalResends++;
+}
+
+
 unsigned CMessageCollator::queryBytesReceived() const
 {
-    return totalBytesReceived; // Arguably should lock, but can't be bothered. Never going to cause an issue in practice.
+    return totalBytesReceived;
+}
+
+unsigned CMessageCollator::queryDuplicates() const
+{
+    return totalDuplicates;
+}
+
+unsigned CMessageCollator::queryResends() const
+{
+    return totalResends;
 }
 
 bool CMessageCollator::attach_databuffer(DataBuffer *dataBuff)
@@ -566,7 +594,7 @@ bool CMessageCollator::attach_data(const void *data, unsigned len)
 void CMessageCollator::collate(DataBuffer *dataBuff)
 {
     PUID puid = GETPUID(dataBuff);
-    // MORE - I think we leak a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
+    // MORE - we leak (at least until query terminates) a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
     PackageSequencer *pkSqncr = mapping.getValue(puid);
     if (!pkSqncr)
     {
@@ -574,7 +602,7 @@ void CMessageCollator::collate(DataBuffer *dataBuff)
         mapping.setValue(puid, pkSqncr);
         pkSqncr->Release();
     }
-    bool isComplete = pkSqncr->insert(dataBuff);
+    bool isComplete = pkSqncr->insert(dataBuff, totalDuplicates, totalResends);
     if (isComplete)
     {
         pkSqncr->Link();

+ 6 - 1
roxie/udplib/udpmsgpk.hpp

@@ -34,7 +34,9 @@ private:
     InterruptableSemaphore sem;
     Linked<roxiemem::IRowManager> rowMgr;
     ruid_t ruid;
-    unsigned totalBytesReceived; // technically should be atomic
+    std::atomic<unsigned> totalBytesReceived = {0};
+    std::atomic<unsigned> totalDuplicates = {0};
+    std::atomic<unsigned> totalResends = {0};
 
     void collate(roxiemem::DataBuffer *dataBuff);
 public:
@@ -47,9 +49,12 @@ public:
     }
 
     virtual unsigned queryBytesReceived() const override;
+    virtual unsigned queryDuplicates() const override;
+    virtual unsigned queryResends() const override;
     virtual IMessageResult *getNextResult(unsigned time_out, bool &anyActivity) override;
     virtual void interrupt(IException *E) override;
 
     bool attach_databuffer(roxiemem::DataBuffer *dataBuff);
     bool attach_data(const void *data, unsigned len);
+    void noteDuplicate(bool isResend);
 };

+ 404 - 89
roxie/udplib/udpsha.cpp

@@ -33,12 +33,14 @@ using roxiemem::IDataBufferManager;
 
 IDataBufferManager *bufferManager;
 
+bool udpTraceFlow = false;
+bool udpTraceTimeouts = false;
 unsigned udpTraceLevel = 0;
-unsigned udpTraceCategories = (unsigned) -1;
 unsigned udpFlowSocketsSize = 131072;
 unsigned udpLocalWriteSocketSize = 1024000;
 unsigned udpSnifferReadThreadPriority = 3;
 unsigned udpSnifferSendThreadPriority = 3;
+unsigned udpStatsReportInterval = 60000;
 
 unsigned multicastTTL = 1;
 
@@ -70,50 +72,46 @@ ServerIdentifier myNode;
 
 //---------------------------------------------------------------------------------------------
 
-void queue_t::set_queue_size(unsigned int queue_s) 
+void queue_t::set_queue_size(unsigned _limit)
 {
-    queue_size = queue_s;
-    element_count = queue_size;
-    elements = new queue_element[queue_size];
-    free_space.signal(queue_size);
-    active_buffers = 0;
-    first = 0;
-    last = 0;
+    limit = _limit;
 }
 
-queue_t::queue_t(unsigned int queue_s) 
+queue_t::queue_t(unsigned _limit)
 {
-    set_queue_size(queue_s);
-    signal_free_sl = 0;
+    set_queue_size(_limit);
 }
 
-queue_t::queue_t() 
+
+queue_t::~queue_t() 
 {
-    signal_free_sl = 0;
-    queue_size = 0;
-    element_count = 0;
-    elements = nullptr;
-    active_buffers = 0;
-    first = 0;
-    last = 0;
+    while (head)
+    {
+        auto p = head;
+        head = head->msgNext;
+        ::Release(p);
+    }
 }
 
-queue_t::~queue_t() 
+unsigned queue_t::available()
 {
-    delete [] elements; 
+    CriticalBlock b(c_region);
+    if (count < limit)
+        return limit - count;
+    return 0;
 }
 
 int queue_t::free_slots() 
 {
     int res=0;
-    while (!res) 
+    while (res <= 0)
     {
         c_region.enter();
-        res = queue_size - active_buffers;
-        if (!res) 
+        res = limit - count;
+        if (res <= 0)
             signal_free_sl++;
         c_region.leave();
-        if (!res) 
+        if (res <= 0)
         {
             while (!free_sl.wait(3000))
             {
@@ -132,17 +130,29 @@ void queue_t::interrupt()
 
 void queue_t::pushOwn(DataBuffer *buf)
 {
-    while (!free_space.wait(3000))
+    // Could probably be done lock-free, which given one thread using this is high priority might avoid some
+    // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
+    assert(!buf->msgNext);
     {
-        if (udpTraceLevel >= 1)
-            DBGLOG("queue_t::pushOwn blocked for 3 seconds waiting for free_space semaphore, activeBuffers == %d", active_buffers);
+        CriticalBlock b(c_region);
+        if (tail)
+        {
+            assert(head);
+            assert(!tail->msgNext);
+            tail->msgNext = buf;
+        }
+        else
+        {
+            assert(!head);
+            head = buf;
+        }
+        tail = buf;
+        count++;
+#ifdef _DEBUG
+        if (count > limit)
+            DBGLOG("queue_t::pushOwn set count to %u", count);
+#endif
     }
-    c_region.enter();
-    int next = (last + 1) % element_count;
-    elements[last].data = buf;
-    last = next;
-    active_buffers++;
-    c_region.leave();
     data_avail.signal();
 }
 
@@ -150,22 +160,29 @@ DataBuffer *queue_t::pop(bool block)
 {
     if (!data_avail.wait(block ? INFINITE : 0))
         return nullptr;
-    DataBuffer *ret = NULL; 
-    bool must_signal;
+    DataBuffer *ret = nullptr;
+    unsigned signalFreeSlots = 0;
     {
         CriticalBlock b(c_region);
-        if (!active_buffers) 
-            return NULL;
-        ret = elements[first].data;
-        first = (first + 1) % element_count;
-        active_buffers--;
-        must_signal = signal_free_sl>0;
-        if (must_signal) 
+        if (!count)
+            return nullptr;
+        count--;
+        ret = head;
+        head = head->msgNext;
+        if (!head)
+        {
+            assert(!count);
+            tail = nullptr;
+        }
+        ret->msgNext = nullptr;
+        if (count < limit && signal_free_sl)
+        {
             signal_free_sl--;
+            signalFreeSlots++;
+        }
     }
-    free_space.signal();
-    if (must_signal) 
-        free_sl.signal();
+    if (signalFreeSlots)
+        free_sl.signal(signalFreeSlots);
     return ret;
 }
 
@@ -173,43 +190,45 @@ DataBuffer *queue_t::pop(bool block)
 unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
 {
     unsigned removed = 0;
-    unsigned signalFree = 0;
     unsigned signalFreeSlots = 0;
     {
         CriticalBlock b(c_region);
-        if (active_buffers)
+        if (count)
         {
-            unsigned destix = first;
-            unsigned ix = first;
-            for (;;)
+            DataBuffer *prev = nullptr;
+            DataBuffer *finger = head;
+            while (finger)
             {
-                if (elements[ix].data && (!key || !pkCmpFn || pkCmpFn((const void*) elements[ix].data, key)))
+                if (!key || !pkCmpFn || pkCmpFn((const void*) finger, key))
                 {
-                    ::Release(elements[ix].data);
-                    signalFree++;
-                    active_buffers--;
+                    auto temp = finger;
+                    finger = finger->msgNext;
+                    if (prev==nullptr)
+                    {
+                        assert(head==temp);
+                        head = finger;
+                    }
+                    else
+                        prev->msgNext = finger;
+                    if (temp==tail)
+                        tail = prev;
+                    ::Release(temp);
+                    count--;
+                    if (count < limit && signal_free_sl)
+                    {
+                        signal_free_sl--;
+                        signalFreeSlots++;
+                    }
                     removed++;
                 }
                 else
-                    elements[destix++] = elements[ix];
-                ix++;
-                if (ix==element_count)
-                    ix = 0;
-                if (destix==element_count)
-                    destix = 0;
-                if (ix == last)
-                    break;
-            }
-            if (signalFree && signal_free_sl)
-            {
-                signal_free_sl--;
-                signalFreeSlots++;
+                {
+                    prev = finger;
+                    finger = finger->msgNext;
+                }
             }
-            last = destix;
         }
     }
-    if (signalFree)
-        free_space.signal(signalFree);
     if (signalFreeSlots)
         free_sl.signal(signalFreeSlots);
     return removed;
@@ -218,26 +237,15 @@ unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
 
 bool queue_t::dataQueued(const void *key, PKT_CMP_FUN pkCmpFn)
 {
-    bool ret = false;
     CriticalBlock b(c_region);
-    if (active_buffers) 
+    DataBuffer *finger = head;
+    while (finger)
     {
-        unsigned ix = first;
-        for (;;)
-        {
-            if (elements[ix].data && pkCmpFn((const void*) elements[ix].data, key))
-            {
-                ret = true;
-                break;
-            }
-            ix++;
-            if (ix==element_count)
-                ix = 0;
-            if (ix==last)
-                break;
-        }           
+        if (pkCmpFn((const void*) finger, key))
+            return true;
+        finger = finger->msgNext;
     }
-    return ret;
+    return false;
 }
 
 
@@ -312,6 +320,313 @@ extern UDPLIB_API void queryMemoryPoolStats(StringBuffer &memStats)
         bufferManager->poolStats(memStats);
 }
 
+RelaxedAtomic<unsigned> packetsOOO;
+
+bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
+{
+    bool resent = false;
+    sequence_t seq = hdr.sendSeq;
+    if (hdr.pktSeq & UDP_PACKET_RESENT)
+        resent = true;
+    // Four cases: less than lastUnseen, equal to, within TRACKER_BITS of, or higher
+    // Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
+    bool duplicate = false;
+    unsigned delta = seq - base;
+    if (udpTraceLevel > 5)
+    {
+        DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
+        dump();
+    }
+    if (delta < TRACKER_BITS)
+    {
+        unsigned idx = (seq / 64) % TRACKER_DWORDS;
+        unsigned bit = seq % 64;
+        __uint64 bitm = U64C(1)<<bit;
+        duplicate = (seen[idx] & bitm) != 0;
+        seen[idx] |= bitm;
+        if (seq==base)
+        {
+            while (seen[idx] & bitm)
+            {
+                // Important to update in this order, so that during the window where they are inconsistent we have
+                // false negatives rather than false positives
+                seen[idx] &= ~bitm;
+                base++;
+                idx = (base / 64) % TRACKER_DWORDS;
+                bit = base % 64;
+                bitm = U64C(1)<<bit;
+            }
+        }
+        // calculate new hwm, with some care for wrapping
+        if ((int) (seq - hwm) > 0)
+            hwm = seq;
+        else if (!resent)
+            packetsOOO++;
+    }
+    else if (resent)
+        // Don't treat a resend that goes out of range as indicative of a restart - it probably just means
+        // that the resend was not needed and the original moved things on when it arrived
+        duplicate = true;
+    else
+    {
+        // We've gone forwards too far to track, or backwards because server restarted
+        // We have taken steps to try to avoid the former...
+        // In theory could try to preserve SOME information in the former case, but as it shouldn't happen, can we be bothered?
+#ifdef _DEBUG
+        if (udpResendEnabled)
+        {
+            DBGLOG("Received packet %" SEQF "u will cause loss of information in PacketTracker", seq);
+            dump();
+        }
+        //assert(false);
+#endif
+        memset(seen, 0, sizeof(seen));
+        base = seq+1;
+        hwm = seq;
+    }
+    return duplicate;
+}
+
+const PacketTracker PacketTracker::copy() const
+{
+    // This is called within a critical section. Would be better if we could avoid having to do so,
+    // but we want to be able to read a consistent set of values
+    PacketTracker ret;
+    ret.base = base;
+    ret.hwm = hwm;
+    memcpy(ret.seen, seen, sizeof(seen));
+    return ret;
+}
+
+bool PacketTracker::hasSeen(sequence_t seq) const
+{
+    // Accessed only on sender side where these are not modified, so no need for locking
+    // Careful about wrapping!
+    unsigned delta = seq - base;
+    if (udpTraceLevel > 5)
+    {
+       DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
+       dump();
+    }
+    if (delta < TRACKER_BITS)
+    {
+        unsigned idx = (seq / 64) % TRACKER_DWORDS;
+        unsigned bit = seq % 64;
+        return (seen[idx] & (U64C(1)<<bit)) != 0;
+    }
+    else if (delta > INT_MAX)  // Or we could just make delta a signed int? But code above would have to check >0
+        return true;
+    else
+        return false;
+}
+
+// Report whether seq falls inside the window that the seen[] bitmap can currently represent,
+// i.e. fewer than TRACKER_BITS ahead of base.
+bool PacketTracker::canRecord(sequence_t seq) const
+{
+    // Careful about wrapping! The subtraction is unsigned, so a seq that is behind base
+    // produces a very large delta and is reported as not recordable.
+    unsigned delta = seq - base;
+    if (udpTraceLevel > 5)
+    {
+       // Trace label names this function (it previously said hasSeen, which made traces ambiguous)
+       DBGLOG("PacketTracker::canRecord - can I record %" SEQF "u, %d", seq, delta);
+       dump();
+    }
+    return (delta < TRACKER_BITS);
+}
+
+bool PacketTracker::hasGaps() const
+{
+    return base!=hwm+1;
+}
+
+void PacketTracker::dump() const
+{
+    DBGLOG("PacketTracker base=%" SEQF "u, hwm=%" SEQF "u, seen[0]=%" I64F "x", base, hwm, seen[0]);
+}
+
+#ifdef _USE_CPPUNIT
+#include "unittests.hpp"
+
+class PacketTrackerTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE(PacketTrackerTest);
+    CPPUNIT_TEST(testNoteSeen);
+    CPPUNIT_TEST(testReplay);
+    CPPUNIT_TEST_SUITE_END();
+
+    void testNoteSeen()
+    {
+        PacketTracker p;
+        UdpPacketHeader hdr;
+        hdr.pktSeq = 0;
+        // Some simple tests
+        CPPUNIT_ASSERT(!p.hasSeen(0));
+        CPPUNIT_ASSERT(!p.hasSeen(1));
+        hdr.sendSeq = 0;
+        CPPUNIT_ASSERT(!p.noteSeen(hdr));
+        CPPUNIT_ASSERT(p.hasSeen(0));
+        CPPUNIT_ASSERT(!p.hasSeen(1));
+        CPPUNIT_ASSERT(!p.hasSeen(2000));
+        CPPUNIT_ASSERT(!p.hasSeen(2001));
+        hdr.pktSeq = UDP_PACKET_RESENT;
+        CPPUNIT_ASSERT(p.noteSeen(hdr));
+        hdr.pktSeq = 0;
+        hdr.sendSeq = 2000;
+        CPPUNIT_ASSERT(!p.noteSeen(hdr));
+        CPPUNIT_ASSERT(p.hasSeen(0));
+        CPPUNIT_ASSERT(p.hasSeen(1));
+        CPPUNIT_ASSERT(p.hasSeen(2000));
+        CPPUNIT_ASSERT(!p.hasSeen(2001));
+        hdr.sendSeq = 0;
+        CPPUNIT_ASSERT(!p.noteSeen(hdr));
+        CPPUNIT_ASSERT(p.hasSeen(0));
+        CPPUNIT_ASSERT(!p.hasSeen(1));
+        CPPUNIT_ASSERT(!p.hasSeen(2000));
+        CPPUNIT_ASSERT(!p.hasSeen(2001));
+
+        PacketTracker p2;
+        hdr.sendSeq = 1;
+        CPPUNIT_ASSERT(!p2.noteSeen(hdr));
+        CPPUNIT_ASSERT(!p2.hasSeen(0));
+        CPPUNIT_ASSERT(p2.hasSeen(1));
+        hdr.sendSeq = TRACKER_BITS-1;  // This is the highest value we can record without losing information
+        CPPUNIT_ASSERT(!p2.noteSeen(hdr));
+        CPPUNIT_ASSERT(!p2.hasSeen(0));
+        CPPUNIT_ASSERT(p2.hasSeen(1));
+        CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
+        CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS));
+        CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
+        hdr.sendSeq = TRACKER_BITS;
+        p2.noteSeen(hdr);
+        CPPUNIT_ASSERT(p2.hasSeen(0));
+        CPPUNIT_ASSERT(p2.hasSeen(1));
+        CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS-1));
+        CPPUNIT_ASSERT(p2.hasSeen(TRACKER_BITS));
+        CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+1));
+        CPPUNIT_ASSERT(!p2.hasSeen(TRACKER_BITS+2));
+    }
+
+    void t(PacketTracker &p, sequence_t seq, unsigned pseq)
+    {
+        UdpPacketHeader hdr;
+        hdr.sendSeq = seq;
+        hdr.pktSeq = pseq;
+        if (seq==29)
+            CPPUNIT_ASSERT(p.noteSeen(hdr) == false);
+        else
+            p.noteSeen(hdr);
+    }
+
+    void testReplay()
+    {
+        PacketTracker p;
+        t(p,1,0x1);
+        t(p,2,0x2);
+        t(p,3,0x3);
+        t(p,4,0x4);
+        t(p,5,0x5);
+        t(p,6,0x6);
+        t(p,7,0x7);
+        t(p,8,0x8);
+        t(p,9,0x9);
+        t(p,11,0xb);
+        t(p,12,0xc);
+        t(p,13,0xd);
+        t(p,14,0xe);
+        t(p,15,0xf);
+        t(p,16,0x10);
+        t(p,17,0x11);
+        t(p,18,0x12);
+        t(p,19,0x13);
+        t(p,20,0x14);
+        t(p,21,0x15);
+        t(p,22,0x16);
+        t(p,23,0x17);
+        t(p,24,0x18);
+        t(p,25,0x19);
+        t(p,26,0x1a);
+        t(p,27,0x1b);
+        t(p,28,0x1c);
+        t(p,50,0x40000032);
+        t(p,51,0x40000033);
+        t(p,52,0x40000034);
+        t(p,53,0x40000035);
+        t(p,54,0x40000036);
+        t(p,55,0x40000037);
+        t(p,56,0x40000038);
+        t(p,57,0x40000039);
+        t(p,58,0x4000003a);
+        t(p,59,0x4000003b);
+        t(p,60,0x4000003c);
+        t(p,61,0x4000003d);
+        t(p,62,0xc0000000);
+        t(p,63,0x4000003e);
+        t(p,64,0x4000003f);
+        t(p,65,0x40000040);
+        t(p,66,0x40000041);
+        t(p,67,0x40000042);
+        t(p,68,0x40000043);
+        t(p,69,0x40000044);
+        t(p,70,0x40000045);
+        t(p,71,0x40000046);
+        t(p,72,0x40000047);
+        t(p,73,0x40000048);
+        t(p,74,0x40000049);
+        t(p,75,0x4000004a);
+        t(p,76,0x4000004b);
+        t(p,77,0x4000004c);
+        t(p,78,0x4000004d);
+        t(p,79,0x4000004e);
+        t(p,80,0x4000004f);
+        t(p,81,0x40000050);
+        t(p,82,0x40000051);
+        t(p,83,0x40000052);
+        t(p,84,0x40000053);
+        t(p,85,0x40000054);
+        t(p,86,0x40000055);
+        t(p,87,0x40000056);
+        t(p,88,0x40000057);
+        t(p,89,0x40000058);
+        t(p,90,0x40000059);
+        t(p,91,0x4000005a);
+        t(p,92,0x4000005b);
+        t(p,93,0x4000005c);
+        t(p,0,0x40000000);
+        t(p,1,0x40000001);
+        t(p,2,0x40000002);
+        t(p,3,0x40000003);
+        t(p,4,0x40000004);
+        t(p,5,0x40000005);
+        t(p,6,0x40000006);
+        t(p,7,0x40000007);
+        t(p,8,0x40000008);
+        t(p,9,0x40000009);
+        t(p,10,0x4000000a);
+        t(p,11,0x4000000b);
+        t(p,12,0x4000000c);
+        t(p,13,0x4000000d);
+        t(p,14,0x4000000e);
+        t(p,15,0x4000000f);
+        t(p,16,0x40000010);
+        t(p,17,0x40000011);
+        t(p,18,0x40000012);
+        t(p,19,0x40000013);
+        t(p,20,0x40000014);
+        t(p,21,0x40000015);
+        t(p,22,0x40000016);
+        t(p,23,0x40000017);
+        t(p,24,0x40000018);
+        t(p,25,0x40000019);
+        t(p,26,0x4000001a);
+        t(p,27,0x4000001b);
+        t(p,28,0x4000001c);
+        t(p,29,0x4000001d);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( PacketTrackerTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PacketTrackerTest, "PacketTrackerTest" );
+
+#endif
+
 /*
 Crazy thoughts on network-wide flow control
 

+ 60 - 29
roxie/udplib/udpsha.hpp

@@ -21,6 +21,10 @@
 #include "jmutex.hpp"
 #include "roxiemem.hpp"
 #include "jcrc.hpp"
+#include <limits>
+
+typedef unsigned sequence_t;
+#define SEQF
 
 extern roxiemem::IDataBufferManager *bufferManager;
 
@@ -29,61 +33,82 @@ typedef bool (*PKT_CMP_FUN) (const void *pkData, const void *key);
 
 // Flag bits in pktSeq field
 #define UDP_PACKET_COMPLETE           0x80000000  // Packet completes a single agent request
-#define UDP_PACKET_RESERVED           0x40000000  // Not used - could move UDP_SEQUENCE_COMPLETE here?
+#define UDP_PACKET_RESENT             0x40000000  // Packet is a repeat of one that the server may have missed
 #define UDP_PACKET_SEQUENCE_MASK      0x3fffffff
 
-struct UdpPacketHeader 
+struct UdpPacketHeader
 {
     unsigned short length;      // total length of packet including the header, data, and meta
     unsigned short metalength;  // length of metadata (comes after header and data)
     ServerIdentifier  node;        // Node this message came from
     unsigned       msgSeq;      // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
     unsigned       pktSeq;      // sequence number of this packet within the message (top bit signifies final packet)
+    sequence_t     sendSeq;     // sequence number of this packet among all those sent from this node to this target
     // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
     ruid_t         ruid;        // The uid allocated by the server to this agent transaction
     unsigned       msgId;       // sub-id allocated by the server to this request within the transaction
 };
 
-class queue_t 
+constexpr unsigned TRACKER_BITS=1024;      // Power of two recommended
+constexpr unsigned TRACKER_DWORDS=(TRACKER_BITS+63)/64;
+
+// Some more things we can consider:
+// 1. sendSeq gives us some insight into lost packets that might help us get the inflight calculation right (if it is still needed)
+// 2. If we can definitively declare that a packet is lost, we can fail that messageCollator earlier (and thus get the resend going earlier)
+// 3. Worth seeing why resend doesn't use same collator. We could skip sending (though would still need to calculate) the bit we already had...
+
+class PacketTracker
 {
+    // This uses a circular buffer indexed by seq to store information about which packets we have seen
+private:
+    sequence_t base = 0;                           // Sequence number of first packet represented in the array
+    sequence_t hwm = (sequence_t) -1;              // Sequence number of highest sequence number ever seen
+    unsigned __int64 seen[TRACKER_DWORDS] = {0};  // bitmask representing whether we have seen (base+n)
+    void dump() const;
 
-    class queue_element 
-    {
-    public:
-        roxiemem::DataBuffer  *data;
-        queue_element() 
-        {
-            data = NULL;
-        }
-    };
+public:
+    // Note that we have seen this packet, and return indicating whether we had already seen it
+    bool noteSeen(UdpPacketHeader &hdr);
+    const PacketTracker copy() const;
+    inline sequence_t lastSeen() const { return hwm; }
+    bool hasSeen(sequence_t seq) const;
+    bool canRecord(sequence_t seq) const;
+    bool hasGaps() const;
+};
 
-    queue_element   *elements;
-    unsigned int    element_count;
+using roxiemem::DataBuffer;
+
+// queue_t is used to hold a fifo queue of DataBuffer elements to be sent or collated.
+// Originally implemented as a circular buffer, but we don't want to block adding even if full (we do however want to avoid requesting more if full)
+// so now reimplemented as a single-linked list. There is a field in the DataBuffers that can be used for chaining them together that is used
+// in a few other places, e.g. collator
+
+class queue_t 
+{
+    DataBuffer *head = nullptr;      // We add at tail and remove from head
+    DataBuffer *tail = nullptr;
+
+    unsigned count = 0;
+    unsigned limit = 0;
     
-    unsigned        first;
-    unsigned        last;
     CriticalSection c_region;
-    int             active_buffers;
-    int             queue_size;
     InterruptableSemaphore data_avail;
-    Semaphore       free_space;
-    Semaphore       free_sl;
-    unsigned        signal_free_sl;
-
-    void removeElement(int ix);
+    Semaphore       free_sl;              // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
+    unsigned        signal_free_sl = 0;   // Number of people waiting in free_sl. Only updated within critical section
     
 public: 
     void interrupt();
-    void pushOwn(roxiemem::DataBuffer *buffer);
-    roxiemem::DataBuffer *pop(bool block);
+    void pushOwn(DataBuffer *buffer);
+    DataBuffer *pop(bool block);
     bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
     unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
-    int  free_slots(); //block if no free slots
-    void set_queue_size(unsigned int queue_size); //must be called immediately after constructor if default constructor is used
+    unsigned available();                // non-blocking
+    int  free_slots();                   // block if no free slots
+    void set_queue_size(unsigned limit); //must be called immediately after constructor if default constructor is used
     queue_t(unsigned int queue_size);
-    queue_t();
+    queue_t() {};
     ~queue_t();
-    inline int capacity() const { return queue_size; }
+    inline int capacity() const { return limit; }
 };
 
 
@@ -203,13 +228,17 @@ struct UdpPermitToSendMsg
 {
     flowType::flowCmd cmd;
     unsigned short max_data;
+    sequence_t flowSeq;
     ServerIdentifier destNode;
+    PacketTracker seen;
 };
 
 struct UdpRequestToSendMsg
 {
     flowType::flowCmd cmd;
     unsigned short packets;
+    sequence_t sendSeq;
+    sequence_t flowSeq;
     ServerIdentifier sourceNode;
 };
 
@@ -232,4 +261,6 @@ inline bool checkTraceLevel(unsigned category, unsigned level)
 {
     return (udpTraceLevel >= level);
 }
+
+
 #endif

+ 1 - 1
roxie/udplib/udptopo.cpp

@@ -85,7 +85,7 @@ bool ChannelInfo::otherAgentHasPriority(unsigned priorityHash, unsigned otherAge
     return false;
 }
 
-static unsigned *createNewNodeHealthScore(const ServerIdentifier &)
+static unsigned *createNewNodeHealthScore(const ServerIdentifier)
 {
     return new unsigned(initIbytiDelay);
 }

+ 361 - 327
roxie/udplib/udptrr.cpp

@@ -47,6 +47,14 @@ using roxiemem::DataBuffer;
 using roxiemem::IRowManager;
 
 unsigned udpRetryBusySenders = 0; // seems faster with 0 than 1 in my testing on small clusters and sustained throughput
+unsigned udpMaxPendingPermits;
+
+RelaxedAtomic<unsigned> flowPermitsSent = {0};
+RelaxedAtomic<unsigned> flowRequestsReceived = {0};
+RelaxedAtomic<unsigned> dataPacketsReceived = {0};
+static unsigned lastFlowPermitsSent = 0;
+static unsigned lastFlowRequestsReceived = 0;
+static unsigned lastDataPacketsReceived = 0;
 
 static byte key[32] = {
     0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
@@ -60,7 +68,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
      * 1. receive_receive_flow (priority 3)
      *     - waits for packets on flow port
      *     - maintains list of nodes that have pending requests
-     *     - sends ok_to_send to one sender at a time
+     *     - sends ok_to_send to one sender (or more) at a time
      * 2. receive_sniffer (default priority 3, configurable)
      *     - waits for packets on sniffer port
      *     - updates information about what other node are currently up to
@@ -73,7 +81,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
      *     - used to have an option to perform collation on this thread but a bad idea:
      *        - can block (ends up in memory manager via attachDataBuffer).
      *        - Does not apply back pressure
-     *     - Just enqueues them. We don't give permission to send more than the queue can hold.
+     *     - Just enqueues them. We don't give permission to send more than the queue can hold, but it's a soft limit
      * 4. PacketCollator (standard priority)
      *     - dequeues packets
      *     - collates packets
@@ -87,15 +95,13 @@ class CReceiveManager : implements IReceiveManager, public CInterface
      * there's a good chance that socket buffer will have room). But we can't legislate for network issues.
      *
      * What packets can be lost?
-     * 1. Data packets - handled via retrying the whole query (not ideal). But will also leave the inflight count wrong. We correct it any time
-     *    the data socket times out but that may not be good enough.
+     * 1. Data packets - handled via sliding window of resendable packets (or by retrying whole query after a timeout, if resend logic is disabled)
      * 2. RequestToSend - the sender's resend thread checks periodically. There's a short initial timeout for getting a reply (either "request_received"
      *    or "okToSend"), then a longer timeout for actually sending.
      * 3. OkToSend - there is a timeout after which the permission is considered invalid (based on how long it SHOULD take to send them).
      *    The requestToSend retry mechanism would then make sure retried.
      *    MORE - if I don't get a response from OkToSend I should assume lost and requeue it.
      * 4. complete - covered by same timeout as okToSend. A lost complete will mean incoming data to that node stalls for the duration of this timeout,
-     *    and will also leave inflight count out-of-whack.
      * 4. Sniffers - expire anyway
      *
      */
@@ -116,11 +122,21 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         // Used only by receive_flow thread
         IpAddress dest;
         ISocket *flowSocket = nullptr;
+        UdpSenderEntry *prevSender = nullptr;  // Used to form list of all senders that have outstanding requests
         UdpSenderEntry *nextSender = nullptr;  // Used to form list of all senders that have outstanding requests
+        flowType::flowCmd state = flowType::send_completed;    // Meaning I'm not on any queue
+        sequence_t flowSeq = 0;                // the sender's most recent flow sequence number
+        sequence_t sendSeq = 0;                // the sender's most recent sequence number from request-to-send, representing sequence number of next packet it will send
         unsigned timeouts = 0;
+        unsigned requestTime = 0;              // When we received the active requestToSend
+        unsigned timeStamp = 0;                // When we last sent okToSend
+
+    private:
+        // Updated by receive_data thread, read atomically by receive_flow
+        mutable CriticalSection psCrit;
+        PacketTracker packetsSeen;
 
-        // Set by sniffer, used by receive_flow. But races are unimportant
-        unsigned timeStamp = 0;               // When it was marked busy (0 means not busy)
+    public:
 
         UdpSenderEntry(const IpAddress &_dest, unsigned port) : dest(_dest)
         {
@@ -136,50 +152,107 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 flowSocket->Release();
             }
         }
+        bool noteSeen(UdpPacketHeader &hdr)
+        {
+            if (udpResendEnabled)
+            {
+                CriticalBlock b(psCrit);
+                return packetsSeen.noteSeen(hdr);
+            }
+            else
+                return false;
+        }
 
         inline void noteDone()
         {
             timeouts = 0;
         }
 
-        inline bool retryOnTimeout()
+        bool canSendAny() const
         {
-            ++timeouts;
-            if (udpTraceLevel)
+            // We can send some if (a) the first available new packet is less than TRACKER_BITS above the first unreceived packet or
+            // (b) we are assuming arrival in order, and there are some marked seen that are > first unseen OR
+            // (c) the oldest in-flight packet has expired
+            if (!udpResendEnabled)
+                return true;
             {
-                StringBuffer s;
-                DBGLOG("Timed out %d times waiting for send_done from %s", timeouts, dest.getIpText(s).str());
+                CriticalBlock b(psCrit);
+                if (packetsSeen.canRecord(sendSeq))
+                    return true;
+                if (udpAssumeSequential && packetsSeen.hasGaps())
+                    return true;
             }
-            if (udpMaxRetryTimedoutReqs && (timeouts >= udpMaxRetryTimedoutReqs))
+            if (msTick()-requestTime > udpResendTimeout)
+                return true;
+            return false;
+        }
+
+        void acknowledgeRequest(const IpAddress &returnAddress, sequence_t _flowSeq, sequence_t _sendSeq)
+        {
+            if (flowSeq==_flowSeq)
             {
-                if (udpTraceLevel)
-                    DBGLOG("Abandoning");
-                timeouts = 0;
-                return false;
+                // It's a duplicate request-to-send - ignore it? Or assume it means they lost our ok-to-send? MORE - probably depends on state
+                if (udpTraceLevel || udpTraceFlow)
+                {
+                    StringBuffer s;
+                    DBGLOG("UdpFlow: ignoring duplicate requestToSend %" SEQF "u from node %s", _flowSeq, dest.getIpText(s).str());
+                }
+                return;
             }
-            else
+            flowSeq = _flowSeq;
+            sendSeq = _sendSeq;
+            requestTime = msTick();
+            try
             {
-                if (udpTraceLevel)
-                    DBGLOG("Retrying");
-                return true;
+                UdpPermitToSendMsg msg;
+                msg.cmd = flowType::request_received;
+                msg.flowSeq = _flowSeq;
+                msg.destNode = returnAddress;
+                msg.max_data = 0;
+                if (udpResendEnabled)
+                {
+                    CriticalBlock b(psCrit);
+                    msg.seen = packetsSeen.copy();
+                }
+
+                if (udpTraceLevel > 3 || udpTraceFlow)
+                {
+                    StringBuffer ipStr;
+                    DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getIpText(ipStr).str());
+                }
+                flowSocket->write(&msg, udpResendEnabled ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
+                flowPermitsSent++;
+
+            }
+            catch(IException *e)
+            {
+                StringBuffer d, s;
+                DBGLOG("UdpReceiver: acknowledgeRequest failed node=%s %s", dest.getIpText(d).str(), e->errorMessage(s).str());
+                e->Release();
             }
         }
 
-
         void requestToSend(unsigned maxTransfer, const IpAddress &returnAddress)
         {
             try
             {
                 UdpPermitToSendMsg msg;
                 msg.cmd = maxTransfer ? flowType::ok_to_send : flowType::request_received;
+                msg.flowSeq = flowSeq;
                 msg.destNode = returnAddress;
                 msg.max_data = maxTransfer;
-                if (udpTraceLevel > 1)
+                if (udpResendEnabled)
+                {
+                    CriticalBlock b(psCrit);
+                    msg.seen = packetsSeen.copy();
+                }
+                if (udpTraceLevel > 3 || udpTraceFlow)
                 {
                     StringBuffer ipStr;
-                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, dest.getIpText(ipStr).str());
+                    DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getIpText(ipStr).str());
                 }
-                flowSocket->write(&msg, sizeof(UdpPermitToSendMsg));
+                flowSocket->write(&msg, udpResendEnabled ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen));
+                flowPermitsSent++;
             }
             catch(IException *e)
             {
@@ -189,127 +262,73 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             }
         }
 
-        bool is_busy()
-        {
-            if (timeStamp)
-            {
-                unsigned now = msTick();
-                if ((now - timeStamp) < 10)
-                    return true;
-                // MORE - might be interesting to note how often this happens. Why 10 milliseconds?
-                timeStamp = 0;      // No longer considered busy
-            }
-            return false;
-        }
-
-        void update(bool busy)
-        {
-            if (busy)
-                timeStamp = msTick();
-            else
-                timeStamp = 0;
-        }
-
     };
 
-    IpMapOf<UdpSenderEntry> sendersTable;
-
-    class receive_sniffer : public Thread
+    class SenderList
     {
-        ISocket     *sniffer_socket;
-        unsigned snifferPort;
-        IpAddress snifferIP;
-        CReceiveManager &parent;
-        std::atomic<bool> running = { false };
-        
-        inline void update(const IpAddress &ip, bool busy)
-        {
-            if (udpTraceLevel > 5)
-            {
-                StringBuffer s;
-                DBGLOG("UdpReceive: sniffer sets is_busy[%s] to %d", ip.getIpText(s).str(), busy);
-            }
-            parent.sendersTable[ip].update(busy);
-        }
+        UdpSenderEntry *head = nullptr;
+        UdpSenderEntry *tail = nullptr;
+        unsigned numEntries = 0;
 
-    public:
-        receive_sniffer(CReceiveManager &_parent, unsigned _snifferPort, const IpAddress &_snifferIP)
-          : Thread("udplib::receive_sniffer"), parent(_parent), snifferPort(_snifferPort), snifferIP(_snifferIP), running(false)
+        void checkListIsValid(UdpSenderEntry *lookfor)
         {
-            sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
-            if (check_max_socket_read_buffer(udpFlowSocketsSize) < 0)
-                throw MakeStringException(ROXIE_UDP_ERROR, "System Socket max read buffer is less than %i", udpFlowSocketsSize);
-            sniffer_socket->set_receive_buffer_size(udpFlowSocketsSize);
-            if (udpTraceLevel)
+#ifdef _DEBUG
+            UdpSenderEntry *prev = nullptr;
+            UdpSenderEntry *finger = head;
+            unsigned length = 0;
+            while (finger)
             {
-                StringBuffer ipStr;
-                snifferIP.getIpText(ipStr);
-                size32_t actualSize = sniffer_socket->get_receive_buffer_size();
-                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i sockbuffsize=%d actual %d", ipStr.str(), snifferPort, udpFlowSocketsSize, actualSize);
+                if (finger==lookfor)
+                    lookfor = nullptr;
+                prev = finger;
+                finger = finger->nextSender;
+                length++;
             }
+            assert(prev == tail);
+            assert(lookfor==nullptr);
+            assert(numEntries==length);
+#endif
         }
-
-        ~receive_sniffer() 
+    public:
+        unsigned length() const { return numEntries; }
+        operator UdpSenderEntry *() const
         {
-            running = false;
-            if (sniffer_socket) sniffer_socket->close();
-            join();
-            if (sniffer_socket) sniffer_socket->Release();
+            return head;
         }
-
-        virtual int run() 
+        void append(UdpSenderEntry *sender)
         {
-            DBGLOG("UdpReceiver: sniffer started");
-            if (udpSnifferReadThreadPriority)
+            if (tail)
             {
-#ifdef __linux__
-                setLinuxThreadPriority(udpSnifferReadThreadPriority);
-#else
-                adjustPriority(1);
-#endif
+                tail->nextSender = sender;
+                sender->prevSender = tail;
+                tail = sender;
             }
-            while (running) 
+            else
             {
-                try 
-                {
-                    unsigned int res;
-                    sniff_msg msg;
-                    sniffer_socket->read(&msg, 1, sizeof(msg), res, 5);
-                    update(msg.nodeIp.getIpAddress(), msg.cmd == sniffType::busy);
-                }
-                catch (IException *e) 
-                {
-                    if (running && e->errorCode() != JSOCKERR_timeout_expired)
-                    {
-                        StringBuffer s;
-                        DBGLOG("UdpReceiver: receive_sniffer::run read failed %s", e->errorMessage(s).str());
-                        MilliSleep(1000);
-                    }
-                    e->Release();
-                }
-                catch (...) 
-                {
-                    DBGLOG("UdpReceiver: receive_sniffer::run unknown exception port %u", parent.data_port);
-                    if (sniffer_socket) {
-                        sniffer_socket->Release();
-                        sniffer_socket = ISocket::multicast_create(snifferPort, snifferIP, multicastTTL);
-                    }
-                    MilliSleep(1000);
-                }
+                head = tail = sender;
             }
-            return 0;
+            numEntries++;
+            checkListIsValid(sender);
         }
-
-        virtual void start()
+        void remove(UdpSenderEntry *sender)
         {
-            if (udpSnifferEnabled)
-            {
-                running = true;
-                Thread::start();
-            }
+            if (sender->prevSender)
+                sender->prevSender->nextSender = sender->nextSender;
+            else
+                head = sender->nextSender;
+            if (sender->nextSender)
+                sender->nextSender->prevSender = sender->prevSender;
+            else
+                tail = sender->prevSender;
+            sender->prevSender = nullptr;
+            sender->nextSender = nullptr;
+            numEntries--;
+            checkListIsValid(nullptr);
         }
     };
 
+    IpMapOf<UdpSenderEntry> sendersTable;
+
     class receive_receive_flow : public Thread 
     {
         CReceiveManager &parent;
@@ -318,128 +337,79 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         const unsigned maxSlotsPerSender;
         std::atomic<bool> running = { false };
         
-        UdpSenderEntry *pendingRequests = nullptr;   // Head of list of people wanting permission to send
-        UdpSenderEntry *lastPending = nullptr;       // Tail of list
-        UdpSenderEntry *currentRequester = nullptr;  // Who currently has permission to send
+        SenderList pendingRequests;     // List of people wanting permission to send
+        SenderList pendingPermits;      // List of people given permission to send
 
-        void enqueueRequest(UdpSenderEntry *requester)
+        void enqueueRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq)
         {
-            if ((lastPending == requester) || (requester->nextSender != nullptr)) // Already on queue
-            {
-                if (udpTraceLevel > 1)
-                {
-                    StringBuffer s;
-                    DBGLOG("UdpReceive: received duplicate request_to_send from node %s", requester->dest.getIpText(s).str());
-                }
-                // We can safely ignore these
-            }
-            else
+            switch (requester->state)
             {
-                // Chain it onto list
-                if (pendingRequests != nullptr)
-                    lastPending->nextSender = requester;
-                else
-                    pendingRequests = requester;
-                lastPending = requester;
+            case flowType::ok_to_send:
+                pendingPermits.remove(requester);
+                // Fall through
+            case flowType::send_completed:
+                pendingRequests.append(requester);
+                requester->state = flowType::request_to_send;
+                break;
+            case flowType::request_to_send:
+                // Perhaps the sender never saw our permission? Already on queue...
+                break;
+            default:
+                // Unexpected state, should never happen!
+                DBGLOG("ERROR: Unexpected state %s in enqueueRequest", flowType::name(requester->state));
+                throwUnexpected();
+                break;
             }
-            requester->requestToSend(0, myNode.getIpAddress());  // Acknowledge receipt of the request
-        }
+            requester->acknowledgeRequest(myNode.getIpAddress(), flowSeq, sendSeq);  // Acknowledge receipt of the request
 
-        unsigned okToSend(UdpSenderEntry *requester)
-        {
-            assert (!currentRequester);
-            unsigned max_transfer = parent.free_slots();
-            if (max_transfer > maxSlotsPerSender)
-                max_transfer = maxSlotsPerSender;
-            unsigned timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
-            currentRequester = requester;
-            requester->requestToSend(max_transfer, myNode.getIpAddress());
-            assert(timeout >= 10 && timeout <= 20000);
-            return timeout;
         }
 
-        bool noteDone(UdpSenderEntry *requester)
+        void okToSend(UdpSenderEntry *requester, unsigned slots)
         {
-            if (requester != currentRequester)
+            switch (requester->state)
             {
-                // This should not happen - I suppose it COULD if we receive a delayed message for a transfer we had earlier given up on.
-                // Best response is to ignore it if so
-                DBGLOG("Received completed message is not from current sender!");
-                // MORE - should we set currentRequester NULL here? debatable.
-                return false;
+            case flowType::request_to_send:
+                pendingRequests.remove(requester);
+                // Fall through
+            case flowType::send_completed:
+                pendingPermits.append(requester);
+                requester->state = flowType::ok_to_send;
+                break;
+            case flowType::ok_to_send:
+                // Perhaps the sender never saw our permission? Already on queue...
+                break;
+            default:
+                // Unexpected state, should never happen!
+                DBGLOG("ERROR: Unexpected state %s in okToSend", flowType::name(requester->state));
+                throwUnexpected();
+                break;
             }
-            currentRequester->noteDone();
-            currentRequester = nullptr;
-            return true;
-        }
-
-        unsigned timedOut(UdpSenderEntry *requester)
-        {
-            // MORE - this will retry indefinitely if agent in question is dead
-            // As coded, this rescinds the permission to send that just timed out and tells the next person to have a go
-            // Thus leading to "Received completed message is not from current sender" messages the the send was in flight
-            currentRequester = nullptr;
-            if (requester->retryOnTimeout())
-                enqueueRequest(requester);
-            if (pendingRequests)
-                return sendNextOk();
-            else
-                return 5000;
+            requester->timeStamp = msTick();
+            requester->requestToSend(slots, myNode.getIpAddress());
         }
 
-
-        unsigned sendNextOk()
+        void noteDone(UdpSenderEntry *requester, UdpRequestToSendMsg &msg)
         {
-            assert(pendingRequests != nullptr);
-            if (udpSnifferEnabled)
+            switch (requester->state)
             {
-                //find first non-busy sender, and move it to front of sendersTable request chain
-                int retry = udpRetryBusySenders;
-                UdpSenderEntry *finger = pendingRequests;
-                UdpSenderEntry *prev = nullptr;
-                for (;;)
-                {
-                    if (finger->is_busy())
-                    {
-                        prev = finger;
-                        finger = finger->nextSender;
-                        if (finger==nullptr)
-                        {
-                            if (retry--)
-                            {
-                                if (udpTraceLevel > 4)
-                                    DBGLOG("UdpReceive: All senders busy");
-                                MilliSleep(1);
-                                finger = pendingRequests;
-                                prev = nullptr;
-                            }
-                            else
-                                break; // give up and use first anyway
-                        }
-                    }
-                    else
-                    {
-                        if (finger != pendingRequests)
-                        {
-                            if (finger == lastPending)
-                                lastPending = prev;
-                            assert(prev != nullptr);
-                            prev->nextSender = finger->nextSender;
-                            finger->nextSender = pendingRequests;
-                            pendingRequests = finger;
-                        }
-                        break;
-                    }
-                }
+            case flowType::request_to_send:
+                // A bit unexpected but will happen if our previous permission timed out and we pushed it to the back of the requests queue
+                pendingRequests.remove(requester);
+                break;
+            case flowType::ok_to_send:
+                pendingPermits.remove(requester);
+                break;
+            case flowType::send_completed:
+                DBGLOG("Duplicate completed message received: msg %s flowSeq %" SEQF "u sendSeq %" SEQF "u. Ignoring", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq);
+                break;
+            default:
+                // Unexpected state, should never happen! Ignore.
+                DBGLOG("ERROR: Unexpected state %s in noteDone", flowType::name(requester->state));
+                break;
             }
-            UdpSenderEntry *nextSender = pendingRequests;
-            // remove from front of queue
-            if (pendingRequests==lastPending)
-                lastPending = nullptr;
-            pendingRequests = nextSender->nextSender;
-            nextSender->nextSender = nullptr;
-            return okToSend(nextSender);
+            requester->state = flowType::send_completed;
         }
+
     public:
         receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
         : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender)
@@ -475,81 +445,120 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             adjustPriority(1);
         #endif
             UdpRequestToSendMsg msg;
-            unsigned timeoutExpires = msTick() + 5000;
+            unsigned timeout = 5000;
             while (running)
             {
                 try
                 {
-                    const unsigned l = sizeof(msg);
-                    unsigned int res ;
-                    unsigned now = msTick();
-                    if (now >= timeoutExpires)
+                    if (udpTraceLevel > 5 || udpTraceFlow)
                     {
-                        if (currentRequester)
-                            timeoutExpires = now + timedOut(currentRequester);
-                        else
-                            timeoutExpires = now + 5000;
+                        DBGLOG("UdpReceiver: wait_read(%u)", timeout);
                     }
-                    else
+                    bool dataAvail = flow_socket->wait_read(timeout);
+                    if (dataAvail)
                     {
-                        flow_socket->readtms(&msg, l, l, res, timeoutExpires-now);
-                        unsigned newTimeout = 0;
+                        const unsigned l = sizeof(msg);
+                        unsigned int res ;
+                        flow_socket->readtms(&msg, l, l, res, 0);
+                        flowRequestsReceived++;
                         assert(res==l);
-                        if (udpTraceLevel > 5)
+                        if (udpTraceLevel > 5 || udpTraceFlow)
                         {
                             StringBuffer ipStr;
-                            DBGLOG("UdpReceiver: received %s msg from node=%s", flowType::name(msg.cmd), msg.sourceNode.getTraceText(ipStr).str());
+                            DBGLOG("UdpReceiver: received %s msg flowSeq %" SEQF "u sendSeq %" SEQF "u from node=%s", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq, msg.sourceNode.getTraceText(ipStr).str());
                         }
                         UdpSenderEntry *sender = &parent.sendersTable[msg.sourceNode];
                         switch (msg.cmd)
                         {
                         case flowType::request_to_send:
-                            if (pendingRequests || currentRequester)
-                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request. We have not given a new permission
-                            else
-                                newTimeout = okToSend(sender);
+                            enqueueRequest(sender, msg.flowSeq, msg.sendSeq);
                             break;
 
                         case flowType::send_completed:
-                            parent.inflight += msg.packets;
-                            if (noteDone(sender) && pendingRequests)   // This && looks wrong - noteDone returning false should mean we haven't seen the completed we wanted - so current timeout still applies. Or the one below is wrong...
-                                newTimeout = sendNextOk();
-                            else
-                                newTimeout = 5000;
+                            noteDone(sender, msg);
                             break;
 
                         case flowType::request_to_send_more:
-                            parent.inflight += msg.packets;
-                            if (noteDone(sender))
+                            noteDone(sender, msg);
+                            enqueueRequest(sender, msg.flowSeq+1, msg.sendSeq);
+                            break;
+
+                        default:
+                            DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
+                        }
+                    }
+                    timeout = 5000;   // The default timeout is 5 seconds if nothing is waiting for response...
+                    if (pendingPermits)
+                    {
+                        unsigned now = msTick();
+                        for (UdpSenderEntry *finger = pendingPermits; finger != nullptr; )
+                        {
+                            if (now - finger->timeStamp >= udpRequestToSendAckTimeout)
                             {
-                                if (pendingRequests)
+                                if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
                                 {
-                                    enqueueRequest(sender);
-                                    newTimeout = sendNextOk();
+                                    StringBuffer s;
+                                    DBGLOG("permit to send %" SEQF "u to node %s timed out after %u ms, rescheduling", finger->flowSeq, finger->dest.getIpText(s).str(), udpRequestToSendAckTimeout);
                                 }
-                                else
-                                    newTimeout = okToSend(sender);
+                                UdpSenderEntry *next = finger->nextSender;
+                                pendingPermits.remove(finger);
+                                pendingRequests.append(finger);
+                                finger->state = flowType::request_to_send;  // Go to the back of the queue  - MORE - lets have some code to eventually give up! Or just give up here?
+                                finger = next;
+                            }
+                            else
+                            {
+                                timeout = finger->timeStamp + udpRequestToSendAckTimeout - now;
+                                break;
                             }
+                        }
+                    }
+                    unsigned slots = parent.input_queue->available();
+                    bool anyCanSend = false;
+                    for (UdpSenderEntry *finger = pendingRequests; finger != nullptr; finger = finger->nextSender)
+                    {
+                        if (pendingPermits.length()>=udpMaxPendingPermits)
                             break;
+                        if (!slots) // || slots<minSlotsPerSender)
+                        {
+                            timeout = 1;   // Slots should free up very soon!
+                            break;
+                        }
+                        // If requester would not be able to send me any (because of the ones in flight) then wait
 
-                        default:
-                            DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
+                        if (finger->canSendAny())
+                        {
+                            unsigned requestSlots = slots;
+                            if (requestSlots>maxSlotsPerSender)
+                                requestSlots = maxSlotsPerSender;
+                            okToSend(finger, requestSlots);
+                            slots -= requestSlots;
+                            if (timeout > udpRequestToSendAckTimeout)
+                                timeout = udpRequestToSendAckTimeout;
+                            anyCanSend = true;
+                        }
+                        else
+                        {
+                            if (udpTraceFlow)
+                            {
+                                StringBuffer s;
+                                DBGLOG("Sender %s can't be given permission to send yet as resend buffer full", finger->dest.getIpText(s).str());
+                            }
                         }
-                        if (newTimeout)
-                            timeoutExpires = msTick() + newTimeout;
+                    }
+                    if (slots && pendingRequests.length() && pendingPermits.length()<udpMaxPendingPermits && !anyCanSend)
+                    {
+                        if (udpTraceFlow)
+                        {
+                            StringBuffer s;
+                            DBGLOG("All senders blocked by resend buffers");
+                        }
+                        timeout = 1; // Hopefully one of the senders should unblock soon
                     }
                 }
                 catch (IException *e)
                 {
-                    // MORE - timeouts need some attention
-                    if (e->errorCode() == JSOCKERR_timeout_expired)
-                    {
-                        // A timeout implies that there is an active permission to send, but nothing has happened.
-                        // Could be a really busy (or crashed) agent, could be a lost packet
-                        if (currentRequester)
-                            timeoutExpires = msTick() + timedOut(currentRequester);
-                    }
-                    else if (running)
+                    if (running)
                     {
                         StringBuffer s;
                         DBGLOG("UdpReceiver: failed %i %s", flow_port, e->errorMessage(s).str());
@@ -563,6 +572,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             }
             return 0;
         }
+
     };
 
     class receive_data : public Thread 
@@ -620,6 +630,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 max_payload = DATA_PAYLOAD+16;  // AES function may add up to 16 bytes of padding
                 encryptedBuffer = encryptData.reserveTruncate(max_payload);
             }
+            unsigned lastOOOReport = 0;
+            unsigned lastPacketsOOO = 0;
             while (running) 
             {
                 try 
@@ -633,27 +645,35 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     }
                     else
                         receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
-                    parent.inflight--;
-                    // MORE - reset it to zero if we fail to read data, or if avail_read returns 0.
+                    dataPacketsReceived++;
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
                     assert(hdr.length == res && hdr.length > sizeof(hdr));
-                    if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
+                    UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
+                    if (sender->noteSeen(hdr))
                     {
-                        StringBuffer s;
-                        DBGLOG("UdpReceiver: %u bytes received, node=%s", res, hdr.node.getTraceText(s).str());
+                        if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
+                        {
+                            StringBuffer s;
+                            DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
+                        }
+                        parent.noteDuplicate(b);
+                        ::Release(b);
+                    }
+                    else
+                    {
+                        if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
+                        {
+                            StringBuffer s;
+                            DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
+                        }
+                        parent.input_queue->pushOwn(b);
                     }
-                    parent.input_queue->pushOwn(b);
                     b = NULL;
                 }
                 catch (IException *e) 
                 {
                     ::Release(b);
                     b = NULL;
-                    if (udpTraceLevel > 1 && parent.inflight)
-                    {
-                        DBGLOG("resetting inflight to 0 (was %d)", parent.inflight.load(std::memory_order_relaxed));
-                    }
-                    parent.inflight = 0;
                     if (running && e->errorCode() != JSOCKERR_timeout_expired)
                     {
                         StringBuffer s;
@@ -669,6 +689,34 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     DBGLOG("UdpReceiver: receive_data::run unknown exception port %u", parent.data_port);
                     MilliSleep(1000);
                 }
+                if (udpStatsReportInterval)
+                {
+                    unsigned now = msTick();
+                    if (now-lastOOOReport > udpStatsReportInterval)
+                    {
+                        lastOOOReport = now;
+                        if (packetsOOO > lastPacketsOOO)
+                        {
+                            DBGLOG("%u more packets received out-of-order by this server (%u total)", packetsOOO-lastPacketsOOO, packetsOOO-0);
+                            lastPacketsOOO = packetsOOO;
+                        }
+                        if (flowRequestsReceived > lastFlowRequestsReceived)
+                        {
+                            DBGLOG("%u more flow requests received by this server (%u total)", flowRequestsReceived-lastFlowRequestsReceived, flowRequestsReceived-0);
+                            lastFlowRequestsReceived = flowRequestsReceived;
+                        }
+                        if (flowPermitsSent > lastFlowPermitsSent)
+                        {
+                            DBGLOG("%u more flow permits sent by this server (%u total)", flowPermitsSent-lastFlowPermitsSent, flowPermitsSent-0);
+                            lastFlowPermitsSent = flowPermitsSent;
+                        }
+                        if (dataPacketsReceived > lastDataPacketsReceived)
+                        {
+                            DBGLOG("%u more data packets received by this server (%u total)", dataPacketsReceived-lastDataPacketsReceived, dataPacketsReceived-0);
+                            lastDataPacketsReceived = dataPacketsReceived;
+                        }
+                    }
+                }
             }
             ::Release(b);
             return 0;
@@ -700,7 +748,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     int                  input_queue_size;
     receive_receive_flow *receive_flow;
     receive_data         *data;
-    receive_sniffer      *sniffer;
     
     int                  receive_flow_port;
     int                  data_port;
@@ -711,62 +758,26 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     typedef std::map<ruid_t, CMessageCollator*> uid_map;
     uid_map         collators;
     SpinLock collatorsLock; // protects access to collators map
-    // inflight is my best guess at how many packets may be sitting in socket buffers somewhere.
-    // Incremented when I am notified about packets having been sent, decremented as they are read off the socket.
-    std::atomic<int> inflight = {0};
-
-    int free_slots()
-    {
-        int free = input_queue->free_slots();  // May block if collator thread is not removing from my queue fast enough
-        // Ignore inflight if negative (can happen because we read some inflight before we see the send_done)
-        int i = inflight.load(std::memory_order_relaxed);
-        if (i < 0)
-        {
-            if (i < -input_queue->capacity())
-            {
-                if (udpTraceLevel)
-                    DBGLOG("UdpReceiver: ERROR: inflight has more packets in queue but not counted (%d) than queue capacity (%d)", -i, input_queue->capacity());  // Should never happen
-                inflight = -input_queue->capacity();
-            }
-            i = 0;
-        }
-        else if (i >= free)
-        {
-            if ((i > free) && (udpTraceLevel))
-                DBGLOG("UdpReceiver: ERROR: more packets in flight (%d) than slots free (%d)", i, free);  // Should never happen
-            inflight = i = free-1;
-        }
-        if (i && udpTraceLevel > 1)
-            DBGLOG("UdpReceiver: adjusting free_slots to allow for %d in flight", i);
-        return free - i;
-    }
 
-    public:
+  public:
     IMPLEMENT_IINTERFACE;
     CReceiveManager(int server_flow_port, int d_port, int client_flow_port, int snif_port, const IpAddress &multicast_ip, int queue_size, int m_slot_pr_client, bool _encrypted)
-        : collatorThread(*this), sendersTable([client_flow_port](const ServerIdentifier &ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);})
+        : collatorThread(*this), sendersTable([client_flow_port](const ServerIdentifier ip) { return new UdpSenderEntry(ip.getIpAddress(), client_flow_port);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -15);
 #endif
-        encrypted = _encrypted;
         receive_flow_port = server_flow_port;
         data_port = d_port;
         input_queue_size = queue_size;
         input_queue = new queue_t(queue_size);
         data = new receive_data(*this);
         receive_flow = new receive_receive_flow(*this, server_flow_port, m_slot_pr_client);
-        if (udpSnifferEnabled)
-            sniffer = new receive_sniffer(*this, snif_port, multicast_ip);
-        else
-            sniffer = nullptr;
 
         running = true;
         collatorThread.start();
         data->start();
         receive_flow->start();
-        if (udpSnifferEnabled)
-            sniffer->start();
         MilliSleep(15);
     }
 
@@ -777,7 +788,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         collatorThread.join();
         delete data;
         delete receive_flow;
-        delete sniffer;
         delete input_queue;
     }
 
@@ -800,6 +810,29 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             collatePacket(dataBuff);
         }
     }
+    // Credit a packet that the receive thread has identified as a duplicate to the collator found in the
+    // collators map for the packet's ruid, telling it whether the packet carried the UDP_PACKET_RESENT flag.
+    // Duplicates whose ruid has no collator in the map are silently ignored.
+    // Does not take ownership of dataBuff - the caller releases it after this call returns.
+    void noteDuplicate(DataBuffer *dataBuff)
+    {
+        const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
+        Linked <CMessageCollator> msgColl;
+        {
+            SpinBlock b(collatorsLock);
+            // Use find() rather than operator[]: operator[] would add a null entry to the map for every
+            // duplicate whose ruid is not present, growing the map with entries nobody removes.
+            auto match = collators.find(pktHdr->ruid);
+            if (match != collators.end())
+                msgColl.set(match->second);
+        }
+        // msgColl holds its own link on the collator, so it can be updated without holding the spin lock
+        if (msgColl)
+            msgColl->noteDuplicate((pktHdr->pktSeq & UDP_PACKET_RESENT) != 0);
+    }
 
     void collatePacket(DataBuffer *dataBuff)
     {
@@ -852,7 +885,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
     {
         CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid);
-        if (udpTraceLevel >= 2)
+        if (udpTraceLevel > 2)
             DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
         {
             SpinBlock b(collatorsLock);
@@ -869,6 +902,7 @@ IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int c
                                       bool encrypted)
 {
     assertex (maxSlotsPerSender <= (unsigned) udpQueueSize);
+    assertex (maxSlotsPerSender <= (unsigned) TRACKER_BITS);
     return new CReceiveManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, udpQueueSize, maxSlotsPerSender, encrypted);
 }
 

+ 310 - 52
roxie/udplib/udptrs.cpp

@@ -40,10 +40,11 @@ unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calcula
 unsigned udpRequestToSendAckTimeout = 10; // value in milliseconds
 bool udpSnifferEnabled = false;
 
-using roxiemem::DataBuffer;
-// MORE - why use DataBuffers on output side?? We could use zeroCopy techniques if we had a dedicated memory area.
-// But using them on this side means we guarantee that the packets fit into databuffers on the other side... But so would matching their size
+#ifdef _DEBUG
+//#define TEST_DROPPED_PACKETS
+#endif
 
+using roxiemem::DataBuffer;
 /*
  *
  * There are 3 threads running to manage the data transfer from agent back to server:
@@ -79,15 +80,126 @@ using roxiemem::DataBuffer;
  *    - resend rts if we send data but there is some remaining
  */
 
+// UdpResendList keeps a copy of up to TRACKER_BITS previously sent packets so we can send them again
+RelaxedAtomic<unsigned> packetsResent;         // incremented by UdpResendList::noteRead for each packet queued for resending
+RelaxedAtomic<unsigned> flowRequestsSent;      // incremented by sendRequest after each flow request message is written
+RelaxedAtomic<unsigned> flowPermitsReceived;   // incremented after each successful read from the sender's flow socket
+RelaxedAtomic<unsigned> dataPacketsSent;       // incremented by sendData as each data packet is written
+
+unsigned udpResendTimeout;  // in milliseconds - an unconfirmed packet is resent once this long has passed since it was last sent
+bool udpResendEnabled;      // when set, each UdpReceiverEntry is given a UdpResendList and permit messages are read including their 'seen' tracker
+bool udpAssumeSequential;   // when set, an unconfirmed packet earlier than the last one the receiver has seen is resent without waiting for the timeout
+
+static unsigned lastResentReport = 0;          // msTick() value when the sender stats were last reported
+static unsigned lastPacketsResent = 0;         // value of packetsResent when it was last logged
+static unsigned lastFlowRequestsSent = 0;      // value of flowRequestsSent when it was last logged
+static unsigned lastFlowPermitsReceived = 0;   // value of flowPermitsReceived when it was last logged
+static unsigned lastDataPacketsSent = 0;       // value of dataPacketsSent when it was last logged
+
+// UdpResendList keeps up to TRACKER_BITS packets that have been sent but not yet confirmed as received,
+// in a circular buffer indexed by (sendSeq % TRACKER_BITS), so that they can be sent again if needed.
+// The list takes over the caller's link on every packet appended, and releases it either when the
+// receiver reports the packet as seen or when the list itself is destroyed.
+class UdpResendList
+{
+private:
+    DataBuffer *entries[TRACKER_BITS] = { nullptr };
+    unsigned timeSent[TRACKER_BITS] = { 0 };   // msTick() value when the packet in the matching slot was last sent
+    sequence_t first = 0;                      // sequence number of the oldest packet still tracked (only meaningful when count != 0)
+    unsigned count = 0;  // number of non-null entries
+public:
+    UdpResendList() = default;
+    // The list owns links on raw pointers, so copying it would release each packet twice
+    UdpResendList(const UdpResendList &) = delete;
+    UdpResendList &operator=(const UdpResendList &) = delete;
+    ~UdpResendList()
+    {
+        // Release any packets still awaiting confirmation - otherwise they leak when the list is deleted
+        for (DataBuffer *entry : entries)
+        {
+            if (entry)
+                ::Release(entry);
+        }
+    }
+
+    // Start tracking a packet that has just been sent for the first time. Takes ownership of buf.
+    // The packet is flagged UDP_PACKET_RESENT here so that any later transmission of it carries the flag.
+    // Throws (via throwUnexpected) if the packet is too far ahead of the oldest tracked packet to fit.
+    void append(DataBuffer *buf)
+    {
+        UdpPacketHeader *header = (UdpPacketHeader*) buf->data;
+        sequence_t seq = header->sendSeq;
+        header->pktSeq |= UDP_PACKET_RESENT;
+        if (!count)
+        {
+            first = seq;
+        }
+        else if (seq - first >= TRACKER_BITS)
+        {
+            // This shouldn't happen if we have steps in place to block sending new until we are sure old have been delivered.
+            throwUnexpected();
+        }
+        unsigned idx = seq % TRACKER_BITS;
+        assert(entries[idx] == nullptr);
+        entries[idx] = buf;
+        timeSent[idx] = msTick();
+        count++;
+    }
+
+    // This function does two things:
+    // 1. Updates the circular buffer to release any packets that are confirmed delivered
+    // 2. Appends any packets that need resending to the toSend list
+    //
+    // seen             - the receiver's record of which sequence numbers it has received
+    // toSend           - packets needing a resend are appended here; the list keeps its link on them
+    // space            - maximum number of packets that may be appended to toSend
+    // nextSendSequence - currently unused
+
+    void noteRead(const PacketTracker &seen, std::vector<DataBuffer *> &toSend, unsigned space, unsigned nextSendSequence)
+    {
+        if (!count)
+            return;
+        unsigned now = msTick();
+        sequence_t seq = first;
+        unsigned checked = 0;     // number of still-unconfirmed packets examined so far
+        bool released = false;
+        while (checked < count && space)
+        {
+            unsigned idx = seq % TRACKER_BITS;
+            if (entries[idx])
+            {
+                UdpPacketHeader *header = (UdpPacketHeader*) entries[idx]->data;
+                assert(seq == header->sendSeq);
+                if (seen.hasSeen(header->sendSeq))
+                {
+                    ::Release(entries[idx]);
+                    entries[idx] = nullptr;
+                    count--;
+                    released = true;
+                }
+                else
+                {
+                    // The current table entry is not marked as seen by receiver. Should we resend it?
+                    if (now-timeSent[idx] >= udpResendTimeout ||    // Note that this will block us from sending newer packets, if we have reached limit of tracking.
+                        (udpAssumeSequential && (int)(seq - seen.lastSeen()) < 0))  // so we (optionally) assume any packet not received that is EARLIER than one that HAS been received is lost.
+                    {
+                        if (udpTraceLevel > 1)
+                            DBGLOG("Resending %" SEQF "u last sent %u ms ago", seq, now-timeSent[idx]);
+                        timeSent[idx] = now;
+                        packetsResent++;
+                        toSend.push_back(entries[idx]);
+                        space--;
+                    }
+                    checked++;
+                }
+            }
+            seq++;
+        }
+        if (released && count)
+        {
+            // Advance first past any slots that have been freed, to the oldest packet still tracked
+            while (entries[first % TRACKER_BITS] == nullptr)
+                first++;
+        }
+    }
+    // Sequence number of the oldest packet still tracked
+    sequence_t firstTracked() const
+    {
+        assert(count);                    // Meaningless to call this if count is 0
+        return first;
+    }
+    // Number of packets currently tracked
+    unsigned numActive() const
+    {
+        return count;
+    }
+    // True if a packet with sequence number seq could be appended without colliding with a tracked packet
+    bool canRecord(sequence_t seq) const
+    {
+        return (count==0 || seq - first < TRACKER_BITS);
+    }
+
+};
+
 static byte key[32] = {
     0xf7, 0xe8, 0x79, 0x40, 0x44, 0x16, 0x66, 0x18, 0x52, 0xb8, 0x18, 0x6e, 0x76, 0xd1, 0x68, 0xd3,
     0x87, 0x47, 0x01, 0xe6, 0x66, 0x62, 0x2f, 0xbe, 0xc1, 0xd5, 0x9f, 0x4a, 0x53, 0x27, 0xae, 0xa1,
 };
 
-
-
 class UdpReceiverEntry : public IUdpReceiverEntry
 {
+    UdpReceiverEntry() = delete;
+    UdpReceiverEntry ( const UdpReceiverEntry & ) = delete;
 private:
     queue_t *output_queue = nullptr;
     bool    initialized = false;
@@ -100,17 +212,17 @@ private:
     int     currentQNumPkts = 0;         // Current Queue Number of Consecutive Processed Packets.
     int     *maxPktsPerQ = nullptr;      // to minimise power function re-calc for every packet
 
-    void sendRequest(flowType::flowCmd cmd, unsigned packets )
+    void sendRequest(UdpRequestToSendMsg &msg)
     {
-        UdpRequestToSendMsg msg = { cmd, static_cast<unsigned short>(packets), sourceIP };
         try
         {
-            if (udpTraceLevel > 3)
+            if (udpTraceLevel > 3 || udpTraceFlow)
             {
                 StringBuffer s;
-                DBGLOG("UdpSender: sending flowType::%s msg to node=%s", flowType::name(cmd), ip.getIpText(s).str());
+                DBGLOG("UdpSender: sending flowType::%s msg %" SEQF "u flowSeq %" SEQF "u to node=%s", flowType::name(msg.cmd), msg.sendSeq, msg.flowSeq, ip.getIpText(s).str());
             }
             send_flow_socket->write(&msg, sizeof(UdpRequestToSendMsg));
+            flowRequestsSent++;
         }
         catch(IException *e)
         {
@@ -125,10 +237,11 @@ private:
     }
 
     const IpAddress sourceIP;
+    UdpResendList *resendList = nullptr;
 public:
     const IpAddress ip;
     unsigned timeouts = 0;      // Number of consecutive timeouts
-    unsigned requestExpiryTime = 0;
+    std::atomic<unsigned> requestExpiryTime = { 0 };  // Updated by send_flow thread, read by send_resend thread and send_data thread
 
     static bool comparePacket(const void *pkData, const void *key)
     {
@@ -138,52 +251,150 @@ public:
     }
 
     std::atomic<unsigned> packetsQueued = { 0 };
+    std::atomic<sequence_t> nextSendSequence = {0};
+    std::atomic<sequence_t> activeFlowSequence = {0};
+    CriticalSection activeCrit;
 
     void sendDone(unsigned packets)
     {
-        bool dataRemaining = packetsQueued.load(std::memory_order_relaxed);
+        bool dataRemaining;
+        if (resendList)
+            dataRemaining = (packetsQueued.load(std::memory_order_relaxed) && resendList->canRecord(nextSendSequence)) || resendList->numActive();
+        else
+            dataRemaining = packetsQueued.load(std::memory_order_relaxed);
         // If dataRemaining says 0, but someone adds a row in this window, the request_to_send will be sent BEFORE the send_completed
         // So long as receiver handles that, are we good?
-        if (dataRemaining)
+        CriticalBlock b(activeCrit);
+        UdpRequestToSendMsg msg;
+        msg.packets = packets;                      // Note this is how many we sent
+        msg.sendSeq = nextSendSequence;
+        msg.sourceNode = sourceIP;
+        if (dataRemaining && requestExpiryTime)  // requestExpiryTime will be non-zero UNLESS someone called abort() just before I got here
         {
+            msg.flowSeq = activeFlowSequence++;
+            msg.cmd = flowType::request_to_send_more;
             requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
-            sendRequest(flowType::request_to_send_more, packets);
         }
         else
         {
+            msg.flowSeq = activeFlowSequence;
+            msg.cmd = flowType::send_completed;
             requestExpiryTime = 0;
-            sendRequest(flowType::send_completed, packets);
         }
+        sendRequest(msg);
         timeouts = 0;
     }
 
-    void requestToSend()
+    void requestToSendNew()
     {
-        requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
-        sendRequest(flowType::request_to_send, 0);
+        CriticalBlock b(activeCrit);
+        // This is called from data thread when new data added to a previously-empty list
+        if (!requestExpiryTime)
+        {
+            // If there's already an active request - no need to create a new one
+            UdpRequestToSendMsg msg;
+            msg.cmd = flowType::request_to_send;
+            msg.packets = 0;
+            msg.sendSeq = nextSendSequence;
+            msg.flowSeq = ++activeFlowSequence;
+            msg.sourceNode = sourceIP;
+            requestExpiryTime = msTick() + udpRequestToSendAckTimeout;
+            sendRequest(msg);
+        }
+    }
+
+    void resendRequestToSend()
+    {
+        // This is called from timeout thread when a previously-sent request has had no response
+        timeouts++;
+        if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
+        {
+            StringBuffer s;
+            EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%s",
+                   timeouts, udpMaxRetryTimedoutReqs, ip.getIpText(s).str());
+        }
+        // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
+        CriticalBlock b(activeCrit);
+        if (udpMaxRetryTimedoutReqs && (timeouts >= udpMaxRetryTimedoutReqs))
+        {
+            abort();
+            return;
+        }
+        if (requestExpiryTime)   // zero means no request is outstanding any more (sendDone clears it on send_completed)
+        {
+            UdpRequestToSendMsg msg;
+            msg.cmd = flowType::request_to_send;
+            msg.packets = 0;
+            msg.sendSeq = nextSendSequence;
+            msg.flowSeq = activeFlowSequence;   // repeat the current flow sequence rather than starting a new one
+            msg.sourceNode = sourceIP;
+            requestExpiryTime = msTick() + udpRequestToSendAckTimeout;   // restart the wait for a response
+            sendRequest(msg);
+        }
+    }
 
     void requestAcknowledged()
     {
+        CriticalBlock b(activeCrit);
         if (requestExpiryTime)
             requestExpiryTime = msTick() + udpRequestToSendTimeout;
     }
 
-    // MORE - consider where/if we need critsecs in here!
-
     unsigned sendData(const UdpPermitToSendMsg &permit, TokenBucket *bucket)
     {
-        requestExpiryTime = 0;
+#ifdef _DEBUG
+        // Consistency check
+        if (permit.destNode.getIpAddress().ipcompare(ip) != 0)
+        {
+            StringBuffer p, s;
+            DBGLOG("UdpFlow: permit ip %s does not match receiver table ip %s", permit.destNode.getTraceText(p).str(), ip.getIpText(s).str());
+            printStackReport();
+        }
+#endif
+        if (permit.flowSeq != activeFlowSequence)
+        {
+            if (udpTraceLevel>1 || udpTraceFlow)
+            {
+                StringBuffer s;
+                DBGLOG("UdpFlow: ignoring out-of-date permit_to_send seq %" SEQF "u (expected %" SEQF "u) to node %s", permit.flowSeq, activeFlowSequence+0, permit.destNode.getTraceText(s).str());
+            }
+            return 0;
+        }
         unsigned maxPackets = permit.max_data;
         std::vector<DataBuffer *> toSend;
         unsigned totalSent = 0;
-        while (toSend.size() < maxPackets && packetsQueued.load(std::memory_order_relaxed))
+        unsigned resending = 0;
+        if (resendList)
+        {
+            resendList->noteRead(permit.seen, toSend, maxPackets, nextSendSequence.load(std::memory_order_relaxed));
+            resending = toSend.size();
+            maxPackets -= resending;
+            // Don't send any packet that would end up overwriting an active packet in our resend list
+            if (resendList->numActive())
+            {
+                unsigned inflight = nextSendSequence - resendList->firstTracked();
+                assert(inflight <= TRACKER_BITS);
+                if (maxPackets > TRACKER_BITS-inflight)
+                {
+                    maxPackets = TRACKER_BITS-inflight;
+                    if (udpTraceLevel>2 || maxPackets == 0)
+                        DBGLOG("Can't send more than %d new packets or we will overwrite unreceived packets (%u in flight, %u active %u resending now)", maxPackets, inflight, resendList->numActive(), resending);
+                    // Note that this may mean we can't send any packets, despite having asked for permission to do so
+                    // We will keep on asking.
+                }
+            }
+        }
+        if (udpTraceLevel>2)
+            DBGLOG("Resending %u packets", (unsigned) toSend.size());
+        while (maxPackets && packetsQueued.load(std::memory_order_relaxed))
         {
             DataBuffer *buffer = popQueuedData();
             if (!buffer)
                 break;  // Suggests data was aborted before we got to pop it
             UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
+            header->sendSeq = nextSendSequence++;
             toSend.push_back(buffer);
+            maxPackets--;
             totalSent += header->length;
 #if defined(__linux__) || defined(__APPLE__)
             if (isLocal && (totalSent> 100000))  // Avoids sending too fast to local node, for reasons lost in the mists of time
@@ -202,6 +413,11 @@ public:
             }
             try
             {
+#ifdef TEST_DROPPED_PACKETS
+                if (((header->pktSeq & UDP_PACKET_RESENT)==0) && (header->pktSeq==0 || header->pktSeq==10 || ((header->pktSeq&UDP_PACKET_COMPLETE) != 0)))
+                    DBGLOG("Deliberately dropping packet %" SEQF "u", header->sendSeq);
+                else
+#endif
                 if (encrypted)
                 {
                     aesEncrypt(key, sizeof(key), buffer->data, length, encryptBuffer.clear());
@@ -209,6 +425,7 @@ public:
                 }
                 else
                     data_socket->write(buffer->data, length);
+                dataPacketsSent++;
             }
             catch(IException *e)
             {
@@ -220,7 +437,15 @@ public:
             {
                 DBGLOG("UdpSender: write exception - unknown exception");
             }
-            ::Release(buffer);
+            if (resendList)
+            {
+                if (resending)
+                    resending--;   //Don't add the ones I am resending back onto list - they are still there!
+                else
+                    resendList->append(buffer);
+            }
+            else
+                ::Release(buffer);
         }
         sendDone(toSend.size());
         return totalSent;
@@ -244,6 +469,7 @@ public:
     bool removeData(void *key, PKT_CMP_FUN pkCmpFn) 
     {
         // Used after receiving an abort, to avoid sending data that is no longer required
+        // Note that we don't attempt to remove packets that have already been sent from the resend list
         unsigned removed = 0;
         if (packetsQueued.load(std::memory_order_relaxed))
         {
@@ -272,9 +498,10 @@ public:
 
     inline void pushData(unsigned queue, DataBuffer *buffer)
     {
+        output_queue[queue].free_slots();     // block until at least one free space
         output_queue[queue].pushOwn(buffer);
         if (!packetsQueued++)
-            requestToSend();
+            requestToSendNew();
     }
 
     DataBuffer *popQueuedData() 
@@ -369,6 +596,8 @@ public:
                 DBGLOG("UdpSender: added entry for ip=%s to receivers table - send_flow_port=%d", ip.getIpText(ipStr).str(), _sendFlowPort);
             }
         }
+        if (udpResendEnabled)
+            resendList = new UdpResendList;
     }
 
     ~UdpReceiverEntry()
@@ -377,6 +606,7 @@ public:
         if (data_socket) data_socket->Release();
         if (output_queue) delete [] output_queue;
         if (maxPktsPerQ) delete [] maxPktsPerQ;
+        delete resendList;
     }
 
 };
@@ -436,25 +666,50 @@ class CSendManager : implements ISendManager, public CInterface
                 timeout = udpRequestToSendTimeout;
                 for (auto&& dest: parent.receiversTable)
                 {
+#ifdef _DEBUG
+                    // Consistency check
+                    UdpReceiverEntry &receiverInfo = parent.receiversTable[dest.ip];
+                    if (&receiverInfo != &dest)
+                    {
+                        StringBuffer s;
+                        DBGLOG("UdpSender: table entry %s does not find itself", dest.ip.getIpText(s).str());
+                        printStackReport();
+
+                    }
+#endif
                     unsigned expireTime = dest.requestExpiryTime;
                     if (expireTime)
                     {
-                        if (expireTime <= now)
-                        {
-                            dest.timeouts++;
-                            {
-                                StringBuffer s;
-                                EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%s",
-                                        dest.timeouts, udpMaxRetryTimedoutReqs, dest.ip.getIpText(s).str());
-                            }
-                            // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
-                            if (udpMaxRetryTimedoutReqs && (dest.timeouts >= udpMaxRetryTimedoutReqs))
-                                dest.abort();
-                            else
-                                dest.requestToSend();
-                        }
-                        else if (expireTime-now < timeout)
-                            timeout = expireTime-now;
+                        int timeToGo = expireTime-now;
+                        if (timeToGo <= 0)
+                            dest.resendRequestToSend();
+                        else if ((unsigned) timeToGo < timeout)
+                            timeout = timeToGo;
+                    }
+                }
+                if (udpStatsReportInterval && (now-lastResentReport > udpStatsReportInterval))
+                {
+                    // MORE - some of these should really be tracked per destination
+                    lastResentReport = now;
+                    if (packetsResent > lastPacketsResent)
+                    {
+                        DBGLOG("%u more packets resent by this agent (%u total)", packetsResent-lastPacketsResent, packetsResent-0);
+                        lastPacketsResent = packetsResent;
+                    }
+                    if (flowRequestsSent > lastFlowRequestsSent)
+                    {
+                        DBGLOG("%u more flow request packets sent by this agent (%u total)", flowRequestsSent - lastFlowRequestsSent, flowRequestsSent-0);
+                        lastFlowRequestsSent = flowRequestsSent;
+                    }
+                    if (flowPermitsReceived > lastFlowPermitsReceived)
+                    {
+                        DBGLOG("%u more flow control packets recived by this agent (%u total)", flowPermitsReceived - lastFlowPermitsReceived, flowPermitsReceived-0);
+                        lastFlowPermitsReceived = flowPermitsReceived;
+                    }
+                    if (dataPacketsSent > lastDataPacketsSent)
+                    {
+                        DBGLOG("%u more data packets sent by this agent (%u total)", dataPacketsSent - lastDataPacketsSent, dataPacketsSent-0);
+                        lastDataPacketsSent = dataPacketsSent;
                     }
                 }
             }
@@ -514,29 +769,31 @@ class CSendManager : implements ISendManager, public CInterface
             while(running) 
             {
                 UdpPermitToSendMsg f = { flowType::ok_to_send, 0, { } };
+                unsigned readsize = udpResendEnabled ? sizeof(UdpPermitToSendMsg) : offsetof(UdpPermitToSendMsg, seen);
                 while (running) 
                 {
                     try 
                     {
-                        unsigned int res ;
-                        flow_socket->read(&f, sizeof(f), sizeof(f), res, 5);
-                        assert(res==sizeof(f));
+                        unsigned int res;
+                        flow_socket->read(&f, readsize, readsize, res, 5);
+                        flowPermitsReceived++;
+                        assert(res==readsize);
                         switch (f.cmd)
                         {
                         case flowType::ok_to_send:
-                            if (udpTraceLevel > 1) 
+                            if (udpTraceLevel > 2 || udpTraceFlow)
                             {
                                 StringBuffer s;
-                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%s", f.max_data, f.destNode.getTraceText(s).str());
+                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%s seq %" SEQF "u", f.max_data, f.destNode.getTraceText(s).str(), f.flowSeq);
                             }
                             parent.data->ok_to_send(f);
                             break;
 
                         case flowType::request_received:
-                            if (udpTraceLevel > 1)
+                            if (udpTraceLevel > 2 || udpTraceFlow)
                             {
                                 StringBuffer s;
-                                DBGLOG("UdpSender: received request_received msg from node=%s", f.destNode.getTraceText(s).str());
+                                DBGLOG("UdpSender: received request_received msg from node=%s seq %" SEQF "u", f.destNode.getTraceText(s).str(), f.flowSeq);
                             }
                             parent.receiversTable[f.destNode].requestAcknowledged();
                             break;
@@ -582,14 +839,14 @@ class CSendManager : implements ISendManager, public CInterface
                 if (!sniffer_socket) 
                 {
                     sniffer_socket = ISocket::multicast_connect(ep, multicastTTL);
-                    if (udpTraceLevel > 1)
+                    if (udpTraceLevel > 2)
                     {
                         StringBuffer url;
                         DBGLOG("UdpSender: multicast_connect ok to %s", ep.getUrlStr(url).str());
                     }
                 }
                 sniffer_socket->write(&msg, sizeof(msg));
-                if (udpTraceLevel > 1)
+                if (udpTraceLevel > 2)
                     DBGLOG("UdpSender: sent busy=%d multicast msg", busy);
             }
             catch(IException *e) 
@@ -664,10 +921,10 @@ class CSendManager : implements ISendManager, public CInterface
                 if (udpSnifferEnabled)
                     send_sniff(sniffType::idle);
                 
-                if (udpTraceLevel > 1) 
+                if (udpTraceLevel > 2)
                 {
                     StringBuffer s;
-                    DBGLOG("UdpSender: sent %u bytes to node=%s", payload, permit.destNode.getTraceText(s).str());
+                    DBGLOG("UdpSender: sent %u bytes to node=%s under permit %" SEQF "u", payload, permit.destNode.getTraceText(s).str(), permit.flowSeq);
                 }
             }
             if (udpTraceLevel > 0)
@@ -682,12 +939,12 @@ class CSendManager : implements ISendManager, public CInterface
 
     unsigned numQueues;
 
+    IpAddress myIP;
     IpMapOf<UdpReceiverEntry> receiversTable;
     send_resend_flow  *resend_flow;
     send_receive_flow *receive_flow;
     send_data         *data;
     Linked<TokenBucket> bucket;
-    IpAddress myIP;
     
     std::atomic<unsigned> msgSeq{0};
 
@@ -707,7 +964,7 @@ public:
     CSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, const IpAddress &_myIP, TokenBucket *_bucket, bool encrypted)
         : bucket(_bucket),
           myIP(_myIP),
-          receiversTable([_numQueues, q_size, server_flow_port, data_port, encrypted](const ServerIdentifier &ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, encrypted);})
+          receiversTable([_numQueues, q_size, server_flow_port, data_port, encrypted](const ServerIdentifier ip) { return new UdpReceiverEntry(ip.getIpAddress(), _numQueues, q_size, server_flow_port, data_port, encrypted);})
     {
 #ifndef _WIN32
         setpriority(PRIO_PROCESS, 0, -3);
@@ -732,6 +989,7 @@ public:
     {
         // NOTE: takes ownership of the DataBuffer
         assert(queue < numQueues);
+        assert(buffer);
         static_cast<UdpReceiverEntry &>(receiver).pushData(queue, buffer);
     }
 

+ 0 - 7
roxie/udplib/uttest.cpp

@@ -709,13 +709,6 @@ int main(int argc, char * argv[] )
                     usage();
                 udpTraceLevel = atoi(argv[c]);
             }
-            else if (strcmp(ip, "--udpTraceCategories")==0)
-            {
-                c++;
-                if (c==argc)
-                    usage();
-                udpTraceCategories = atoi(argv[c]);
-            }
             else if (strcmp(ip, "--dontSendToSelf")==0)
             {
                 dontSendToSelf = true;

+ 13 - 1
tools/testsocket/testsocket.cpp

@@ -50,6 +50,7 @@ bool rawSend = false;
 bool remoteStreamQuery = false;
 bool remoteStreamForceResend = false;
 bool remoteStreamSendCursor = false;
+bool autoXML = true;
 int verboseDbgLevel = 0;
 
 StringBuffer sendFileName;
@@ -1049,6 +1050,11 @@ int main(int argc, char **argv)
             remoteStreamSendCursor = true;
             ++arg;
         }
+        else if (strieq(argv[arg], "-noxml"))
+        {
+            autoXML = false;
+            ++arg;
+        }
         else
         {
             printf("Unknown argument %s, ignored\n", argv[arg]);
@@ -1175,7 +1181,13 @@ int main(int argc, char **argv)
             }
             else
             {
-                ret = sendQuery(ip, socketPort, query);
+                if (*query == '<' || !autoXML)
+                    ret = sendQuery(ip, socketPort, query);
+                else
+                {
+                    VStringBuffer xquery("<%s/>", query);
+                    ret = sendQuery(ip, socketPort, xquery);
+                }
 
                 if (sendToSocket)
                     finishedReading.wait();