Browse Source

Merge branch 'candidate-6.4.2'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
adee39bf32

+ 1 - 1
roxie/ccd/ccdqueue.cpp

@@ -1816,7 +1816,7 @@ public:
         unsigned clientFlowPort = topology->getPropInt("@clientFlowPort", CCD_CLIENT_FLOW_PORT);
         unsigned snifferPort = topology->getPropInt("@snifferPort", CCD_SNIFFER_PORT);
         receiveManager.setown(createReceiveManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpQueueSize, udpMaxSlotsPerClient, myNodeIndex));
-        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, udpResendEnabled ? udpMaxSlotsPerClient : 0, bucket, myNodeIndex));
+        sendManager.setown(createSendManager(serverFlowPort, dataPort, clientFlowPort, snifferPort, snifferIp, udpSendQueueSize, fastLaneQueue ? 3 : 2, bucket, myNodeIndex));
         running = false;
     }
 

+ 0 - 2
roxie/ccd/ccdsnmp.cpp

@@ -469,8 +469,6 @@ CRoxieMetricsManager::CRoxieMetricsManager()
     addMetric(preloadCacheHits, 0);
     addMetric(preloadCacheAdds, 0);
     addMetric(unwantedDiscarded, 1000);
-    addMetric(packetsRetried, 1000);
-    addMetric(packetsAbandoned, 1000);
 
     addMetric(getHeapAllocated, 0);
     addMetric(getHeapPercentAllocated, 0);

+ 1 - 3
roxie/udplib/udplib.hpp

@@ -103,7 +103,7 @@ interface ISendManager : extends IInterface
 };
 
 extern UDPLIB_API IReceiveManager *createReceiveManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size, unsigned maxSlotsPerSender, unsigned myNodeIndex);
-extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, unsigned maxRetryData, TokenBucket *rateLimiter, unsigned myNodeIndex);
+extern UDPLIB_API ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex);
 extern UDPLIB_API IMessagePacker *createMessagePacker(ruid_t ruid, unsigned msgId, const void *messageHeader, unsigned headerSize, ISendManager &_parent, unsigned _destNode, unsigned _sourceNode, unsigned _msgSeq, unsigned _queue);
 
 extern UDPLIB_API const IpAddress &getNodeAddress(unsigned index);
@@ -111,8 +111,6 @@ extern UDPLIB_API unsigned addRoxieNode(const char *ipString);
 extern UDPLIB_API unsigned getNumNodes();
 
 extern UDPLIB_API atomic_t unwantedDiscarded;
-extern UDPLIB_API atomic_t packetsRetried;
-extern UDPLIB_API atomic_t packetsAbandoned;
 
 extern UDPLIB_API unsigned udpTraceLevel;
 extern UDPLIB_API unsigned udpTraceCategories;

+ 11 - 23
roxie/udplib/udpmsgpk.cpp

@@ -41,11 +41,7 @@ using roxiemem::IRowManager;
  #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
 #endif
 
-bool streamingSupported = false;
-
 atomic_t unwantedDiscarded;
-atomic_t packetsRetried;
-atomic_t packetsAbandoned;
 
 // PackageSequencer ====================================================================================
 //
@@ -73,7 +69,6 @@ class PackageSequencer : public CInterface, implements IInterface
 
     MemoryBuffer metadata;
     InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
-    SpinLock streamLock; // Needed if streaming is supported since data can be read as blocks are being written. MORE - not 100% sure it is needed as semaphore probably protects adequately.
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -104,15 +99,12 @@ public:
     
     DataBuffer *next(DataBuffer *after)
     {
-        dataAvailable.wait(); // MORE - when do I interrupt? Should I time out? Will potentially block indefinately if sender restarts (leading to an abandoned packet) or stalls.
+        dataAvailable.wait(); // MORE - when do I interrupt? Should I time out? Will potentially block indefinitely if sender restarts (leading to an abandoned packet) or stalls.
         DataBuffer *ret;
-        {
-            SpinBlock b(streamLock);
-            if (after)
-                ret = after->msgNext; 
-            else
-                ret = firstPacket;
-        }
+        if (after)
+            ret = after->msgNext;
+        else
+            ret = firstPacket;
         if (checkTraceLevel(TRACE_MSGPACK, 5))
         {
             if (ret)
@@ -141,7 +133,6 @@ public:
 
         DataBuffer *finger;
         DataBuffer *prev;
-        SpinBlock b(streamLock);
         if (lastContiguousPacket)
         {
             UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
@@ -195,7 +186,7 @@ public:
             unsigned prevseq;
             if (lastContiguousPacket)
             {
-                prevseq = ((UdpPacketHeader*) lastContiguousPacket->data)->pktSeq & 0x3fffffff;
+                prevseq = ((UdpPacketHeader*) lastContiguousPacket->data)->pktSeq & UDP_PACKET_SEQUENCE_MASK;
                 finger = lastContiguousPacket->msgNext;
             }
             else
@@ -206,12 +197,12 @@ public:
             while (finger)
             {
                 UdpPacketHeader *fingerHdr  = (UdpPacketHeader*) finger->data;
-                unsigned pktseq = fingerHdr->pktSeq & 0x3fffffff;
+                unsigned pktseq = fingerHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK;
                 if (pktseq == prevseq+1)
                 {
                     unsigned packetDataSize = fingerHdr->length - fingerHdr->metalength - sizeof(UdpPacketHeader);
                     assert(packetDataSize < roxiemem::DATA_ALIGNMENT_SIZE);
-                    if ((fingerHdr->pktSeq & ~0x80000000) == 0)
+                    if (pktseq == 0)
                     {
                         // MORE - Is this safe - header lifetime is somewhat unpredictable without a copy of it...
                         // Client header is at the start of packet 0
@@ -228,7 +219,7 @@ public:
 
                     lastContiguousPacket = finger;
                     dataAvailable.signal();
-                    if (fingerHdr->pktSeq >= 0x80000000)
+                    if (fingerHdr->pktSeq & UDP_PACKET_COMPLETE)
                     {
                         res = true;
                         dataAvailable.signal(); // allowing us to read the NULL that signifies end of message. May prefer to use the flag to stop?
@@ -529,26 +520,23 @@ public:
         // MORE - I think we leak a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
         CriticalBlock b(mapCrit);
         PackageSequencer *pkSqncr = mapping.getValue(puid);
-        bool isNew = false;
         bool isComplete = false;
         if (!pkSqncr) 
         {
             pkSqncr = new PackageSequencer;
             mapping.setValue(puid, pkSqncr);
             pkSqncr->Release();
-            isNew = true;
         }
         isComplete = pkSqncr->insert(dataBuff);
-        if (streamingSupported ? isNew : isComplete)
+        if (isComplete)
         {
             queueCrit.enter();
             pkSqncr->Link();
             queue.push(pkSqncr);
             sem.signal();
             queueCrit.leave();
-        }
-        if (isComplete)
             mapping.remove(puid);
+        }
         return(true);
     }
 

+ 21 - 37
roxie/udplib/udpsha.hpp

@@ -28,8 +28,11 @@ extern roxiemem::IDataBufferManager *bufferManager;
 typedef bool (*PKT_CMP_FUN) (void *pkData, void *key);
 
 #define UDP_SEQUENCE_COMPLETE 0x80000000
-#define UDP_SEQUENCE_MORE 0x40000000
-#define UDP_SEQUENCE_BITS (UDP_SEQUENCE_COMPLETE|UDP_SEQUENCE_MORE)
+
+// Flag bits in pktSeq field
+#define UDP_PACKET_COMPLETE           0x80000000  // Packet completes a single slave request
+#define UDP_PACKET_RESERVED           0x40000000  // Not used - could move UDP_SEQUENCE_COMPLETE here?
+#define UDP_PACKET_SEQUENCE_MASK      0x3fffffff
 
 struct UdpPacketHeader 
 {
@@ -38,7 +41,7 @@ struct UdpPacketHeader
     unsigned       nodeIndex;   // Node this message came from
     unsigned       msgSeq;      // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
     unsigned       pktSeq;      // sequence number of this packet within the message (top bit signifies final packet)
-    unsigned       udpSequence; // packet sequence from this slave to this server - used to detect lost packets and resend (independent of how formed into messages). Top bits used for flow control info
+    unsigned       udpSequence; // Top bits used for flow control info
     // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
     ruid_t         ruid;        // The uid allocated by the server to this slave transaction
     unsigned       msgId;       // sub-id allocated by the server to this request within the transaction
@@ -173,64 +176,45 @@ public:
 #define HANDLE_PRAGMA_PACK_PUSH_POP
 #endif
 
-#define MAX_RESEND_TABLE_SIZE 256 
-
 #pragma pack(push,1)
 struct UdpPermitToSendMsg
 {
     // New static fields must be included inside this block, so that
     // size calculations work correctly
-    struct MsgHeader {
-        unsigned short  length;
-        unsigned short  cmd;
-        unsigned short  destNodeIndex;
-        unsigned short  max_data;
-        unsigned        lastSequenceSeen;
-        unsigned        missingCount;
+    unsigned short  length;
+    unsigned short  cmd;
+    unsigned short  destNodeIndex;
+    unsigned short  max_data;
 #ifdef CRC_MESSAGES
-        unsigned        crc;
+    unsigned        crc;
 #endif
-    } hdr;
-    // WARNING: Do not add any field below this line, if it must be included in the message
-    unsigned        missingSequences[MAX_RESEND_TABLE_SIZE]; // only [missingCount] actually sent
-
-    StringBuffer &toString(StringBuffer &str) const
-    {
-        str.appendf("lastSeen=%u missingCount=%u", hdr.lastSequenceSeen, hdr.missingCount);
-        if (hdr.missingCount)
-        {
-            str.append(" missing=");
-            for (unsigned j = 0; j < hdr.missingCount; j++)
-                str.appendf(j?",%u":"%u", missingSequences[j]);
-        }
-        return str;
-    }
 
 #ifdef CRC_MESSAGES
     unsigned calcCRC()
     {
-        size_t len = sizeof(MsgHeader) - sizeof(hdr.crc);
+        size_t len = sizeof(UdpPermitToSendMsg) - sizeof(crc);
         unsigned expectedCRC = crc32((const char *) this, len, 0);
-        if (missingCount)
-            expectedCRC = crc32((const char *) &missingSequences, missingCount * sizeof(missingSequences[0]), expectedCRC); 
         return expectedCRC;
     }
 #endif
 
     UdpPermitToSendMsg()
     {
-        hdr.length = hdr.cmd = hdr.destNodeIndex = hdr.max_data = 0;
-        hdr.lastSequenceSeen = 0;
-        hdr.missingCount = 0;
+        length = cmd = destNodeIndex = max_data = 0;
 #ifdef CRC_MESSAGES
-        hdr.crc = calcCRC();
+        crc = calcCRC();
 #endif
     }
 
     UdpPermitToSendMsg(const UdpPermitToSendMsg &from)
     {
-        hdr = from.hdr;
-        memcpy(missingSequences, from.missingSequences, from.hdr.missingCount * sizeof(missingSequences[0]));
+        length = from.length;
+        cmd = from.cmd;
+        destNodeIndex = from.destNodeIndex;
+        max_data = from.max_data;
+#ifdef CRC_MESSAGES
+        crc = from.crc;
+#endif
     }
 };
 

+ 10 - 193
roxie/udplib/udptrr.cpp

@@ -60,12 +60,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
 
         class UdpSenderEntry  // one per node in the system
         {
-            SpinLock resendInfoLock;
-            unsigned lastSeen;      // Highest sequence ever seen
-            unsigned missingCount;  // Number of values in missing table
-            unsigned missingIndex;  // Pointer to start of values in missing circular buffer
-            unsigned missingTableSize;
-            unsigned *missing;
             unsigned destNodeIndex;
             unsigned myNodeIndex;
             ISocket *flowSocket;
@@ -77,11 +71,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             {
                 nextIndex = (unsigned) -1;
                 flowSocket = NULL;
-                lastSeen = 0;
-                missingCount = 0;
-                missingIndex = 0;
-                missingTableSize = 0;
-                missing = NULL;
                 destNodeIndex = (unsigned) -1;
                 myNodeIndex = (unsigned) -1;
             }
@@ -93,134 +82,15 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     flowSocket->close();
                     flowSocket->Release();
                 }
-                delete [] missing;
             }
 
-            void init(unsigned _destNodeIndex, unsigned _myNodeIndex, unsigned port, unsigned _missingTableSize)
+            void init(unsigned _destNodeIndex, unsigned _myNodeIndex, unsigned port)
             {
                 assert(!flowSocket);
                 destNodeIndex = _destNodeIndex;
                 myNodeIndex = _myNodeIndex;
                 SocketEndpoint ep(port, getNodeAddress(destNodeIndex));
                 flowSocket = ISocket::udp_connect(ep);
-                missingTableSize = _missingTableSize;
-                missing = new unsigned[_missingTableSize];
-            }
-
-            void noteRequest(const UdpRequestToSendMsg &msg)
-            {
-                // MORE - if we never go idle, we never get to see these in udpSendCompletedInData mode. Is that a big deal?
-                SpinBlock b(resendInfoLock);
-                if (msg.lastSequenceAvailable < lastSeen)
-                {
-                    // must have wrapped or restarted. Reinitialize my view of sender missing packets.
-                    // MORE - if a restart (rather than a wrap) should also clear out any collators still hanging on for a reply
-                    // Should I on a wrap too? 
-                    if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                        DBGLOG("Resetting missing list as sender %u seems to have restarted or wrapped", msg.sourceNodeIndex); 
-                    lastSeen = 0;
-                    missingCount = 0;
-                    missingIndex = 0;
-                }
-                else
-                {
-                    // abandon hope of receiving any that sender says they no longer have
-                    while (missingCount && missing[missingIndex] < msg.firstSequenceAvailable)
-                    {
-                        if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                            DBGLOG("Abandoning missing message %u from sender %u - sender no longer has it", missing[missingIndex], msg.sourceNodeIndex); 
-                        atomic_inc(&packetsAbandoned);
-                        missingCount--;
-                        missingIndex++;
-                        if (missingIndex == missingTableSize)
-                            missingIndex = 0;
-                    }
-                }
-            }
-
-            void noteReceived(const UdpPacketHeader &msg)
-            {
-                // MORE - what happens when we wrap?
-                // MORE - what happens when a sender restarts? Probably similar to wrap...
-                // Receiver restart is ok
-                SpinBlock b(resendInfoLock);
-                unsigned thisSequence = msg.udpSequence;
-                if (thisSequence > lastSeen)
-                {
-                    lastSeen++;
-                    if (lastSeen != thisSequence)
-                    {
-                        // Everything between lastSeen and just received needs adding to missing table.
-                        if (thisSequence - lastSeen > missingTableSize)
-                        {
-                            if (lastSeen==1)
-                            {
-                                // assume it's receiver restart and just ignore the missing values
-                                DBGLOG("Assuming receiver restart (lastseen==%u, thisSequence==%u, index==%u)", lastSeen, thisSequence, msg.nodeIndex); 
-                                lastSeen = thisSequence;
-                                return;
-                            }
-                            if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                                DBGLOG("Big gap in UDP packet sequence (%u-%u) from node %u - some ignored!", lastSeen, thisSequence-1, msg.nodeIndex); 
-                            lastSeen = thisSequence - missingTableSize; // avoids wasting CPU cycles writing more values than will fit after long interruption, receiver restart, or on corrupt packet
-                        }
-                        while (lastSeen < thisSequence)
-                        {
-                            if (checkTraceLevel(TRACE_RETRY_DATA, 3))
-                                DBGLOG("Adding packet sequence %u to missing list for node %u", lastSeen, msg.nodeIndex); 
-                            missing[(missingIndex + missingCount) % missingTableSize] = lastSeen;
-                            if (missingCount < missingTableSize)
-                                missingCount++;
-                            else
-                            {
-                                if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                                    DBGLOG("missing table overflow - packets will be lost"); 
-                            }
-                            lastSeen++;
-                        }
-                    }
-                }
-                else 
-                {
-                    if (checkTraceLevel(TRACE_RETRY_DATA, 2))
-                        DBGLOG("Out-of-sequence packet received %u", thisSequence); 
-                    // Hopefully filling in a missing value, and USUALLY in sequence so no point binchopping
-                    for (unsigned i = 0; i < missingCount; i++)
-                    {
-                        unsigned idx = (missingIndex + i) % missingTableSize;
-                        if (missing[idx] == thisSequence)
-                        {
-                            if (checkTraceLevel(TRACE_RETRY_DATA, 2))
-                                DBGLOG("Formerly missing packet sequence received %u", thisSequence); 
-                            while (i)
-                            {
-                                // copy up ones that are still missing
-                                unsigned idx2 = idx ? idx-1 : missingTableSize-1;
-                                missing[idx] = missing[idx2];
-                                idx = idx2;
-                                i--;
-                            }
-                            missingIndex++;
-                            missingCount--;
-                            break;
-                        }
-                        else if (missing[idx] < thisSequence)
-                        {
-                            if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                                DBGLOG("Unexpected packet sequence received %u - ignored", thisSequence); 
-                            break;
-                        }
-                        else // (missing[idx] > thisSequence)
-                        {
-                            if (!i)
-                            {
-                                if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                                    DBGLOG("Unrequested missing packet received %u - ignored", thisSequence); 
-                                break;
-                            }
-                        }
-                    }
-                }
             }
 
             void requestToSend(unsigned maxTransfer)
@@ -228,32 +98,15 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 try
                 {
                     UdpPermitToSendMsg msg;
-                    {
-                        SpinBlock block(resendInfoLock);
-                        msg.hdr.length = sizeof(UdpPermitToSendMsg::MsgHeader) + missingCount*sizeof(msg.missingSequences[0]);
-                        msg.hdr.cmd = flow_t::ok_to_send;
-
-                        msg.hdr.destNodeIndex = myNodeIndex;
-                        msg.hdr.max_data = maxTransfer;
-                        msg.hdr.lastSequenceSeen = lastSeen;
-                        msg.hdr.missingCount = missingCount;
-                        for (unsigned i = 0; i < missingCount; i++)
-                        {
-                            unsigned idx = (missingIndex + i) % missingTableSize;
-                            msg.missingSequences[i] = missing[idx];
-                            if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                                DBGLOG("Requesting resend of packet %d", missing[idx]);
-                        }
+                    msg.length = sizeof(UdpPermitToSendMsg);
+                    msg.cmd = flow_t::ok_to_send;
+
+                    msg.destNodeIndex = myNodeIndex;
+                    msg.max_data = maxTransfer;
 #ifdef CRC_MESSAGES
-                        msg.hdr.crc = msg.calcCRC();
+                    msg.crc = msg.calcCRC();
 #endif
-                    }
-                    if (checkTraceLevel(TRACE_RETRY_DATA, 5))
-                    {
-                        StringBuffer s;
-                        DBGLOG("requestToSend %s", msg.toString(s).str());
-                    }
-                    flowSocket->write(&msg, msg.hdr.length);
+                    flowSocket->write(&msg, msg.length);
                 }
                 catch(IException *e) 
                 {
@@ -288,12 +141,9 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             maxSenders = _maxSenders;
             maxSlotsPerSender = _maxSlotsPerSender;
             sendersTable = new UdpSenderEntry[maxSenders];
-            unsigned missingTableSize = maxSlotsPerSender;
-            if (missingTableSize > MAX_RESEND_TABLE_SIZE)
-                missingTableSize = MAX_RESEND_TABLE_SIZE;
             for (unsigned i = 0; i < maxSenders; i++)
             {
-                sendersTable[i].init(i, parent.myNodeIndex, parent.send_flow_port, missingTableSize);
+                sendersTable[i].init(i, parent.myNodeIndex, parent.send_flow_port);
             }
         }
 
@@ -397,28 +247,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     firstRequest = index;
                 lastRequest = index;
             }   
-            sender.noteRequest(msg);
-            requestPending.signal();
-        }
-
-        void requestMore(unsigned index) // simpler version of request since we don't get the extra info
-        {
-            assertex(index < maxSenders);
-            UdpSenderEntry &sender = sendersTable[index];
-            {
-                SpinBlock b(receiveFlowLock);
-                if ((lastRequest == index) || (sender.nextIndex != (unsigned) -1))
-                {
-                    DBGLOG("UdpReceiver: received duplicate request_to_send msg from node=%d", index);
-                    return;
-                }
-                // Chain it onto list
-                if (firstRequest != (unsigned) -1) 
-                    sendersTable[lastRequest].nextIndex = index;
-                else 
-                    firstRequest = index;
-                lastRequest = index;
-            }   
             requestPending.signal();
         }
 
@@ -436,11 +264,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 DBGLOG("UdpReceiver: completed msg from node %u is not for current transfer (%u) ", index, currentTransfer);
         }
 
-        inline void noteReceived(const UdpPacketHeader &msg)
-        {
-            sendersTable[msg.nodeIndex].noteReceived(msg);
-        }
-
         virtual void start()
         {
             running = true;
@@ -736,16 +559,11 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     b = bufferManager->allocate();
                     receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
-                    unsigned flowBits = hdr.udpSequence & UDP_SEQUENCE_BITS;
-                    hdr.udpSequence &= ~UDP_SEQUENCE_BITS;
+                    unsigned flowBits = hdr.udpSequence;
                     if (flowBits & UDP_SEQUENCE_COMPLETE)
                     {
                         parent.manager->completed(hdr.nodeIndex);
                     }
-                    if (flowBits & UDP_SEQUENCE_MORE)
-                    {
-                        parent.manager->requestMore(hdr.nodeIndex); // MORE - what about the rest of request info?
-                    }
                     if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
                         DBGLOG("UdpReceiver: %u bytes received, node=%u", res, hdr.nodeIndex);
 
@@ -896,7 +714,6 @@ public:
     void collatePacket(DataBuffer *dataBuff)
     {
         const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
-        manager->noteReceived(*pktHdr);
 
         if (udpTraceLevel >= 4) 
             DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%u",

+ 22 - 175
roxie/udplib/udptrs.cpp

@@ -51,12 +51,6 @@ class UdpReceiverEntry
     queue_t *output_queue;
     bool    initialized;
 
-    // Circular buffer of packets available for resending
-    unsigned retryDataCount;
-    unsigned retryDataIdx;
-    unsigned maxRetryData;
-    DataBuffer **retryData;
-
 public:
     ISocket *send_flow_socket;
     ISocket *data_socket;
@@ -65,32 +59,11 @@ public:
     int     currentQNumPkts;   // Current Queue Number of Consecutive Processed Packets.
     int     *maxPktsPerQ;      // to minimise power function re-calc for evey packet
 
-    SpinLock lock;
-    unsigned maxUdpSequence;
-
-    unsigned nextUdpSequence()
-    {
-        SpinBlock b(lock); // MORE - is this needed?
-        unsigned ret = ++maxUdpSequence;
-        if (ret & UDP_SEQUENCE_BITS) // overflowed
-            ret = 1;
-        return ret;
-    }
-
     // MORE - consider where we need critsecs in here!
 
     void sendRequest(unsigned myNodeIndex, flow_t::flowmsg_t cmd)
     {
-        unsigned minUdpSequence;
-        SpinBlock b(lock);
-        {
-            if (retryDataCount)
-                minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
-            else
-                minUdpSequence = maxUdpSequence;
-        }
-
-        UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0, minUdpSequence, maxUdpSequence};
+        UdpRequestToSendMsg msg = {sizeof(UdpRequestToSendMsg), static_cast<unsigned short>(cmd), static_cast<unsigned short>(myNodeIndex), 0};
         try 
         {
             send_flow_socket->write(&msg, msg.length);
@@ -107,89 +80,12 @@ public:
         }
     }
 
-    unsigned cleanRetryData(const UdpPermitToSendMsg &permit, PointerArray &retries)
-    {
-        // Any saved packets < lastReceived that are not listed as missing can be deleted
-        SpinBlock b(lock);
-        unsigned totalData = 0;
-        if (checkTraceLevel(TRACE_RETRY_DATA, 3))
-        {
-            unsigned minUdpSequence;
-            if (retryDataCount)
-                minUdpSequence = ((UdpPacketHeader *) retryData[retryDataIdx]->data)->udpSequence;
-            else
-                minUdpSequence = maxUdpSequence;
-            StringBuffer permitStr;
-            permit.toString(permitStr);
-            DBGLOG("UdpSender: cleanRetryData (%s), total %u available between %u and %u", permitStr.str(), retryDataCount, minUdpSequence, maxUdpSequence); 
-        }
-        unsigned lastReceived = permit.hdr.lastSequenceSeen;
-        unsigned missingIndex = 0;
-        unsigned missingCount = permit.hdr.missingCount;
-        unsigned i = 0;
-        if (maxRetryData)
-        {
-            while (i < retryDataCount && retries.length() < permit.hdr.max_data)
-            {
-                unsigned idx = (retryDataIdx + i) % maxRetryData;
-                DataBuffer *buffer = retryData[idx];
-                if (buffer)
-                {
-                    UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
-                    unsigned thisSequence = header->udpSequence;
-                    if (thisSequence > lastReceived)
-                        break;
-
-                    if (!missingCount || thisSequence < permit.missingSequences[missingIndex])
-                    {
-                        ::Release(buffer);
-                        retryData[idx] = NULL;
-                        if (i)
-                            i++; // MORE - leaves holes - is this smart? Alternatively could close up... Should be rare anyway
-                        else
-                        {
-                            retryDataIdx = (retryDataIdx + 1) % maxRetryData;
-                            retryDataCount--; 
-                        }               
-                    }
-                    else if (thisSequence == permit.missingSequences[missingIndex])
-                    {
-                        totalData += header->length;
-                        retries.append(buffer);
-                        i++;
-                        missingIndex++;
-                        missingCount--;
-                    }
-                    else
-                    {
-                        missingIndex++;
-                        missingCount--;
-                    }
-                }
-                else
-                {
-                    if (i)
-                        i++;
-                    else
-                    {
-                        // Removing leading nulls
-                        retryDataCount--;
-                        retryDataIdx = (retryDataIdx + 1) % maxRetryData;
-                    }
-                }
-            }
-        }
-        if (checkTraceLevel(TRACE_RETRY_DATA, 3))
-            DBGLOG("UdpSender: cleanRetryData found %u to resend total size %u, total %u still available", retries.length(), totalData, retryDataCount);
-        return totalData;
-    }
-
     unsigned sendData(const UdpPermitToSendMsg &permit, bool isLocal, TokenBucket *bucket, bool &moreRequested, unsigned &maxPackets)
     {
         moreRequested = false;
-        maxPackets = permit.hdr.max_data;
+        maxPackets = permit.max_data;
         PointerArray toSend;
-        unsigned totalSent = cleanRetryData(permit, toSend);
+        unsigned totalSent = 0;
         while (toSend.length() < maxPackets && dataQueued())
         {
             DataBuffer *buffer = popQueuedData();
@@ -209,15 +105,6 @@ public:
         {
             DataBuffer *buffer = (DataBuffer *) toSend.item(idx);
             UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
-            bool isRetry = (header->udpSequence != 0);
-            if (isRetry)
-            {
-                if (checkTraceLevel(TRACE_RETRY_DATA, 1))
-                    DBGLOG("UdpSender: Resending packet to destination node %u sequence %u", permit.hdr.destNodeIndex, header->udpSequence);
-                atomic_inc(&packetsRetried);
-            }
-            else
-                header->udpSequence = nextUdpSequence();
             unsigned length = header->length;
             if (bucket)
             {
@@ -231,20 +118,10 @@ public:
                     if (idx == maxPackets-1)
                     {
                          // MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
-                        if (false && dataQueued()) // Causes some problems because no flow control info gets through at all
-                        {
-                            moreRequested = true;
-                            header->udpSequence |= (UDP_SEQUENCE_COMPLETE|UDP_SEQUENCE_MORE);
-                        }
-                        else
-                            header->udpSequence |= UDP_SEQUENCE_COMPLETE;
+                        header->udpSequence = UDP_SEQUENCE_COMPLETE;
                     }
                 }
-#ifdef _SIMULATE_LOST_PACKETS
-                if (isRetry || (header->udpSequence % 100) != 0)
-#endif
                 data_socket->write(buffer->data, length);
-                header->udpSequence &= ~UDP_SEQUENCE_BITS;
             }
             catch(IException *e)
             {
@@ -256,23 +133,7 @@ public:
             {
                 DBGLOG("UdpSender: write exception - unknown exception");
             }
-            if (!isRetry && maxRetryData)
-            {
-                unsigned slot = (retryDataIdx + retryDataCount) % maxRetryData;
-                if (retryDataCount < maxRetryData)
-                    retryDataCount++;
-                else
-                {
-                    if (udpTraceLevel > 0)
-                        DBGLOG("Overflow in resend packet buffer for destination node %u - discarding packet sequence %u", permit.hdr.destNodeIndex, header->udpSequence);
-                    ::Release(retryData[slot]);
-                }
-                retryData[slot] = buffer;
-            }
-            else
-            {
-                ::Release(buffer);
-            }
+            ::Release(buffer);
         }
         return totalSent;
     }
@@ -366,17 +227,11 @@ public:
         output_queue = 0;
         currentQNumPkts = 0;
         maxPktsPerQ = 0;
-        maxUdpSequence = 0;
-        retryDataCount = 0;
-        retryDataIdx = 0;
-        retryData = NULL;
-        maxRetryData = 0;
     }
 
-    void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned _maxRetryData, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
+    void init(unsigned destNodeIndex, unsigned _numQueues, unsigned queueSize, unsigned sendFlowPort, unsigned dataPort, bool isLocal)
     {
         assert(!initialized);
-        maxRetryData = _maxRetryData;
         numQueues = _numQueues;
         const IpAddress &ip = getNodeAddress(destNodeIndex);
         if (!ip.isNull())
@@ -413,11 +268,6 @@ public:
                 output_queue[j].set_queue_size(queueSize);
                 maxPktsPerQ[j] = (int) pow((double)udpOutQsPriority, (double)numQueues - j - 1);
             }
-            if (maxRetryData)
-            {
-                retryData = new DataBuffer *[maxRetryData];
-                memset(retryData, 0, maxRetryData * sizeof(DataBuffer *));
-            }
             initialized = true;
             if (udpTraceLevel > 0)
             {
@@ -429,7 +279,6 @@ public:
 
     ~UdpReceiverEntry()
     {
-        if (retryData) delete[] retryData; // MORE - should I release the values pointed to as well - not a big deal as quitting anyway...
         if (send_flow_socket) send_flow_socket->Release();
         if (data_socket) data_socket->Release();
         if (output_queue) delete [] output_queue;
@@ -728,20 +577,20 @@ class CSendManager : implements ISendManager, public CInterface
                     {
                         unsigned int res ;
                         flow_socket->read(&f, 1, sizeof(f), res, 5);
-                        assertex(res == f.hdr.length);
+                        assertex(res == f.length);
 #ifdef CRC_MESSAGES
                         assertex(f.hdr.crc == f.calcCRC());
 #endif
-                        switch (f.hdr.cmd)
+                        switch (f.cmd)
                         {
                         case flow_t::ok_to_send:
                             if (udpTraceLevel > 1) 
-                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.hdr.max_data, f.hdr.destNodeIndex, res);
+                                DBGLOG("UdpSender: received ok_to_send msg max %d packets from node=%u (length %u)", f.max_data, f.destNodeIndex, res);
                             parent.data->ok_to_send(f);
                             break;
 
                         default: 
-                            DBGLOG("UdpSender: received unknown flow message type=%d", f.hdr.cmd);
+                            DBGLOG("UdpSender: received unknown flow message type=%d", f.cmd);
                         }
                     }
                     catch (IException *e) 
@@ -837,7 +686,7 @@ class CSendManager : implements ISendManager, public CInterface
                 return true;
             else 
             {
-                DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.hdr.destNodeIndex, msg.hdr.max_data);
+                DBGLOG("UdpSender: push() failed - ignored ok_to_send msg - index=%u, maxData=%u", msg.destNodeIndex, msg.max_data);
                 return false;
             }
         }
@@ -858,19 +707,19 @@ class CSendManager : implements ISendManager, public CInterface
 
                 if (udpSnifferEnabled)
                     send_sniff(true);
-                parent.send_flow->clear_to_send_received(permit.hdr.destNodeIndex);
-                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.hdr.destNodeIndex];
+                parent.send_flow->clear_to_send_received(permit.destNodeIndex);
+                UdpReceiverEntry &receiverInfo = parent.receiversTable[permit.destNodeIndex];
                 bool moreRequested;
                 unsigned maxPackets;
-                unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.hdr.destNodeIndex), bucket, moreRequested, maxPackets);
+                unsigned payload = receiverInfo.sendData(permit, (parent.myNodeIndex == permit.destNodeIndex), bucket, moreRequested, maxPackets);
                 if (udpSendCompletedInData && !maxPackets)
-                    parent.sendRequest(permit.hdr.destNodeIndex, flow_t::send_completed);
-                parent.send_flow->send_done(permit.hdr.destNodeIndex, moreRequested);
+                    parent.sendRequest(permit.destNodeIndex, flow_t::send_completed);
+                parent.send_flow->send_done(permit.destNodeIndex, moreRequested);
                 if (udpSnifferEnabled)
                     send_sniff(false);
                 
                 if (udpTraceLevel > 1) 
-                    DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.hdr.destNodeIndex);
+                    DBGLOG("UdpSender: sent %u bytes to node=%d", payload, permit.destNodeIndex);
                 
             }
             if (udpTraceLevel > 0)
@@ -918,7 +767,7 @@ class CSendManager : implements ISendManager, public CInterface
 public:
     IMPLEMENT_IINTERFACE;
 
-    CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned maxRetryData, unsigned _myNodeIndex, TokenBucket *_bucket)
+    CSendManager(int server_flow_port, int d_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int q_size, int _numQueues, unsigned _myNodeIndex, TokenBucket *_bucket)
         : bucket(_bucket)
     {
 #ifndef _WIN32
@@ -931,10 +780,8 @@ public:
         myNodeIndex = _myNodeIndex;
         numQueues = _numQueues;
         receiversTable = new UdpReceiverEntry[numNodes];
-        if (maxRetryData > MAX_RESEND_TABLE_SIZE)
-            maxRetryData = MAX_RESEND_TABLE_SIZE;
         for (unsigned i = 0; i < numNodes; i++)
-            receiversTable[i].init(i, numQueues, q_size, maxRetryData, send_flow_port, data_port, i==myNodeIndex);
+            receiversTable[i].init(i, numQueues, q_size, send_flow_port, data_port, i==myNodeIndex);
 
         data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
         send_flow = new send_send_flow(*this, numNodes);
@@ -1012,9 +859,9 @@ public:
 
 };
 
-ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, unsigned maxRetryData, TokenBucket *rateLimiter, unsigned myNodeIndex)
+ISendManager *createSendManager(int server_flow_port, int data_port, int client_flow_port, int sniffer_port, const IpAddress &sniffer_multicast_ip, int queue_size_pr_server, int queues_pr_server, TokenBucket *rateLimiter, unsigned myNodeIndex)
 {
-    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, maxRetryData, myNodeIndex, rateLimiter);
+    return new CSendManager(server_flow_port, data_port, client_flow_port, sniffer_port, sniffer_multicast_ip, queue_size_pr_server, queues_pr_server, myNodeIndex, rateLimiter);
 }
 
 class CMessagePacker : implements IMessagePacker, public CInterface
@@ -1179,7 +1026,7 @@ public:
                 part_buffer = bufferManager->allocate();
             }
             memcpy(&part_buffer->data[sizeof(UdpPacketHeader)+data_used], metaData, metaLength);
-            package_header.pktSeq |= 0x80000000;
+            package_header.pktSeq |= UDP_PACKET_COMPLETE;
             put_package(part_buffer, data_used, metaLength);
         }
         else if (part_buffer)

+ 11 - 6
roxie/udplib/uttest.cpp

@@ -49,6 +49,8 @@ void usage()
         "--udpLocalWriteSocketSize nn\n"
         "--udpRetryBusySenders nn\n"
         "--maxPacketsPerSender nn\n"
+        "--udpQueueSize nn\n"
+        "--udpRTSTimeout nn\n"
         "--udpSnifferEnabled 0|1\n"     
         "--udpTraceCategories nn\n"
         "--udpTraceLevel nn\n"
@@ -257,11 +259,6 @@ public:
                     received[i] = 0;
                 }
                 receivedTotal = 0;
-                unsigned retried = atomic_read(&packetsRetried);
-                atomic_set(&packetsRetried, 0);
-                unsigned abandoned = atomic_read(&packetsAbandoned);
-                atomic_set(&packetsAbandoned, 0);
-                DBGLOG("%u packets resent %u packets abandoned", retried, abandoned);
             }
         }
         {
@@ -280,7 +277,7 @@ void testNxN()
 {
     if (maxPacketsPerSender > udpQueueSize)
         maxPacketsPerSender = udpQueueSize;
-    Owned <ISendManager> sendMgr = createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, maxPacketsPerSender, NULL, myIndex);
+    Owned <ISendManager> sendMgr = createSendManager(7000, 7001, 7002, 7003, multicastIP, 100, udpNumQs, NULL, myIndex);
     Receiver receiver;
 
     IMessagePacker **packers = new IMessagePacker *[numNodes];
@@ -621,6 +618,7 @@ int main(int argc, char * argv[] )
     DBGLOG("%s",cmdline.str());
 //  queryLogMsgManager()->enterQueueingMode();
 //  queryLogMsgManager()->setQueueDroppingLimit(512, 32);
+    udpRequestToSendTimeout = 5000;
     for (c = 1; c < argc; c++)
     {
         const char *ip = argv[c];
@@ -634,6 +632,13 @@ int main(int argc, char * argv[] )
                     usage();
                 udpQueueSize = atoi(argv[c]);
             }
+            if (strcmp(ip, "--udpRTSTimeout")==0)
+            {
+                c++;
+                if (c==argc || !isdigit(*argv[c]))
+                    usage();
+                udpRequestToSendTimeout = atoi(argv[c]);
+            }
             else if (strcmp(ip, "--jumboFrames")==0)
             {
                 roxiemem::setDataAlignmentSize(0x2000);

+ 6 - 4
system/jlib/jstats.cpp

@@ -1917,9 +1917,10 @@ void CRuntimeStatisticCollection::merge(const CRuntimeStatisticCollection & othe
                 mergeStatistic(kind, value);
         }
     }
-    if (other.nested)
+    CNestedRuntimeStatisticMap *otherNested = other.queryNested();
+    if (otherNested)
     {
-        ensureNested().merge(*other.nested);
+        ensureNested().merge(*otherNested);
     }
 }
 
@@ -1944,9 +1945,10 @@ void CRuntimeStatisticCollection::updateDelta(CRuntimeStatisticCollection & targ
                 target.mergeStatistic(kind, sourceValue);
         }
     }
-    if (source.nested)
+    CNestedRuntimeStatisticMap *sourceNested = source.queryNested();
+    if (sourceNested)
     {
-        ensureNested().updateDelta(target.ensureNested(), *source.nested);
+        ensureNested().updateDelta(target.ensureNested(), *sourceNested);
     }
 }