Преглед на файлове

HPCC-21470 Fix a very old MP bug in multi packet send

Fix mechanism that avoids multi packets sends clashing

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith преди 6 години
родител
ревизия
d10b12e602
променени са 1 файла, в които са добавени 18 реда и са изтрити 6 реда
  1. 18 6
      system/mp/mpcomm.cpp

+ 18 - 6
system/mp/mpcomm.cpp

@@ -1733,7 +1733,8 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
     assertex(tm.timeout);
     size32_t msgsize = mb.length();
     PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
-    if (closed||(reply&&!isConnected())) {  // flag error if has been disconnected
+    if (closed||(reply&&!isConnected()))  // flag error if has been disconnected
+    {
 #ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
         PrintStackReport();
@@ -1744,14 +1745,24 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
 
     bool ismulti = (msgsize>MAXDATAPERPACKET);
     // pre-condition - ensure no clashes
-    for (;;) {
+    for (;;)
+    {
         sendmutex.lock();
-        if (ismulti) {
+        if (ismulti)
+        {
             if (multitag==TAG_NULL)     // don't want to interleave with other multi send
+            {
+                multitag = tag;
                 break;
+            }
         }
         else if (multitag!=tag)         // don't want to interleave with another of same tag
             break;
+
+        /* NB: block clashing multi packet sends until current one is done,
+         * but note that the multipackethandler-send() temporarily releases the sendmutex,
+         * between packets, to allow other tags to interleave
+         */
         sendwaiting++;
         sendmutex.unlock();
         sendwaitingsig.wait();
@@ -1774,14 +1785,15 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
         ~Cpostcondition() 
         { 
             if (multitag)
-                *multitag = TAG_NULL; 
-            if (sendwaiting) {
+                *multitag = TAG_NULL;
+            if (sendwaiting)
+            {
                 sendwaitingsig.signal(sendwaiting);
                 sendwaiting = 0;
             }
             sendmutex.unlock();
         }
-    } postcond(sendmutex,sendwaiting,sendwaitingsig,ismulti?&multitag:NULL); 
+    } postcond(sendmutex, sendwaiting, sendwaitingsig, (ismulti && (multitag != TAG_NULL)) ? &multitag : nullptr);
 
     if (ismulti)
         return parent->multipackethandler->send(this,hdr,mb,tm,sendmutex);