瀏覽代碼

Merge pull request #14656 from richardkchapman/hpcc-25465

HPCC-25465 Message collator insert method can be n^2

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 年之前
父節點
當前提交
f86df1040e
共有 1 個文件被更改,包括 95 次插入30 次删除
  1. 95 30
      roxie/udplib/udpmsgpk.cpp

+ 95 - 30
roxie/udplib/udpmsgpk.cpp

@@ -60,13 +60,21 @@ class PackageSequencer : public CInterface, implements IInterface
 {
     DataBuffer *firstPacket;
     DataBuffer *lastContiguousPacket;
+    DataBuffer *tail = nullptr;
     unsigned metaSize;
     unsigned headerSize;
     const void *header;
+#ifdef _DEBUG
+    unsigned numPackets = 0;
+    unsigned maxSeqSeen = 0;
+    unsigned scans = 0;
+    unsigned overscans = 0;
+#endif
 
     MemoryBuffer metadata;
     InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
 
+
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -122,7 +130,11 @@ public:
         bool res = false;
         assert(dataBuff->msgNext == NULL);
         UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
-
+        unsigned pktseq = pktHdr->pktSeq;
+#ifdef _DEBUG
+        if ((pktseq & UDP_PACKET_SEQUENCE_MASK) > maxSeqSeen)
+            maxSeqSeen = pktseq & UDP_PACKET_SEQUENCE_MASK;
+#endif
         if (checkTraceLevel(TRACE_MSGPACK, 5))
         {
             StringBuffer s;
@@ -130,46 +142,74 @@ public:
                     pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
         }
 
+        // Optimize the (very) common case where I need to add to the end
         DataBuffer *finger;
         DataBuffer *prev;
-        if (lastContiguousPacket)
+        if (tail && (pktseq > ((UdpPacketHeader*) tail->data)->pktSeq))
         {
-            UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
-            if (pktHdr->pktSeq <= oldHdr->pktSeq)
-            {
-                // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
-                if (checkTraceLevel(TRACE_MSGPACK, 5))
-                    DBGLOG("UdpCollator: Discarding duplicate incoming packet");
-                dataBuff->Release();
-                return false;
-            }
-            finger = lastContiguousPacket->msgNext;
-            prev = lastContiguousPacket;
+            assert(tail->msgNext==nullptr);
+            finger = nullptr;
+            prev = tail;
+            tail = dataBuff;
         }
         else
         {
-            finger = firstPacket; 
-            prev = NULL;
-        }
-        while (finger)
-        {
-            UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
-            if (pktHdr->pktSeq == oldHdr->pktSeq)
+            // This is an insertion sort - YUK!
+            if (lastContiguousPacket)
             {
-                // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
-                if (checkTraceLevel(TRACE_MSGPACK, 5))
-                    DBGLOG("UdpCollator: Discarding duplicate incoming packet");
-                dataBuff->Release();
-                return false;
+                UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
+                if (pktHdr->pktSeq <= oldHdr->pktSeq)
+                {
+                    // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
+                    if (checkTraceLevel(TRACE_MSGPACK, 5))
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                    dataBuff->Release();
+                    return false;
+                }
+                finger = lastContiguousPacket->msgNext;
+                prev = lastContiguousPacket;
             }
-            else if (pktHdr->pktSeq < oldHdr->pktSeq)
+            else
             {
-                break;
+                finger = firstPacket;
+                prev = NULL;
             }
-            else
+            while (finger)
             {
-                prev = finger;
-                finger = finger->msgNext;
+    #ifdef _DEBUG
+                scans++;
+                if (scans==1000000)
+                {
+                    overscans++;
+                    DBGLOG("%u million scans in UdpCollator insert(DataBuffer *dataBuff", overscans);
+                    if (lastContiguousPacket)
+                    {
+                        UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
+                        DBGLOG("lastContiguousPacket is at %u , last packet seen is %u", oldHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK, pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
+                    }
+                    else
+                        DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
+                    scans = 0;
+                }
+    #endif
+                UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
+                if (pktHdr->pktSeq == oldHdr->pktSeq)
+                {
+                    // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
+                    if (checkTraceLevel(TRACE_MSGPACK, 5))
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                    dataBuff->Release();
+                    return false;
+                }
+                else if (pktHdr->pktSeq < oldHdr->pktSeq)
+                {
+                    break;
+                }
+                else
+                {
+                    prev = finger;
+                    finger = finger->msgNext;
+                }
             }
         }
         if (prev)
@@ -178,8 +218,15 @@ public:
             prev->msgNext = dataBuff;
         }
         else
+        {
             firstPacket = dataBuff;
+            if (!tail)
+                tail = dataBuff;
+        }
         dataBuff->msgNext = finger;
+#ifdef _DEBUG
+        numPackets++;
+#endif
         if (prev == lastContiguousPacket)
         {
             unsigned prevseq;
@@ -250,6 +297,12 @@ public:
         return headerSize;
     }
 
+#ifdef _DEBUG
+    void dump()
+    {
+        DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
+    }
+#endif
 };
 
 // MessageResult ====================================================================================
@@ -564,6 +617,18 @@ IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActi
     activity = false;
     if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 1)) // suppress the tracing for pings where we expect the timeout...
     {
+#ifdef _DEBUG
+        DBGLOG("GetNextResult timeout: mapping has %d partial results", mapping.ordinality());
+        HashIterator h(mapping);
+        ForEach(h)
+        {
+            auto *r = mapping.mapToValue(&h.query());
+            PUID puid = *(PUID *) h.query().getKey();
+            DBGLOG("puid=%" I64F "x:", puid);
+            PackageSequencer *pkSqncr = mapping.getValue(puid);
+            pkSqncr->dump();
+        }
+#endif
         DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout");
     }
     return 0;