Browse Source

HPCC-18242 Clean up UDP code regarding burst completion messages

Move one-bit flag occupying 4-byte field into adjacent field (whcih has
reserved space for it already).

Default the option that uses this flag (rather than an entire separate
message) to true.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
5314507b01

+ 2 - 2
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1111,10 +1111,10 @@
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
-    <xs:attribute name="udpSendCompletedInData" type="xs:boolean" use="optional" default="false">
+    <xs:attribute name="udpSendCompletedPackets" type="xs:boolean" use="optional" default="false">
       <xs:annotation>
         <xs:appinfo>
-          <tooltip>Controls whether UDP completion packets are sent in data packets if possible</tooltip>
+          <tooltip>Controls whether UDP completion packets are sent in separate data packets.</tooltip>
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>

+ 1 - 1
roxie/ccd/ccdmain.cpp

@@ -751,7 +751,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         udpSnifferEnabled = topology->getPropBool("@udpSnifferEnabled", true);
         udpInlineCollation = topology->getPropBool("@udpInlineCollation", false);
         udpInlineCollationPacketLimit = topology->getPropInt("@udpInlineCollationPacketLimit", 50);
-        udpSendCompletedInData = topology->getPropBool("@udpSendCompletedInData", false);
+        udpSendCompletedInData = !topology->getPropBool("@udpSendCompletedPackets", false);
         udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
 
         // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!

+ 2 - 2
roxie/udplib/udpmsgpk.cpp

@@ -499,8 +499,8 @@ public:
         UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
         if (checkTraceLevel(TRACE_MSGPACK, 4))
         {
-            DBGLOG("UdpCollator: CMessageCollator::add_package memLimitEx=%d ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u udpSequence=%u rowMgr=%p this=%p", 
-                memLimitExceeded, pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex, pktHdr->udpSequence, (void*)rowMgr, this);
+            DBGLOG("UdpCollator: CMessageCollator::add_package memLimitEx=%d ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%u rowMgr=%p this=%p",
+                memLimitExceeded, pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->nodeIndex, (void*)rowMgr, this);
         }
 
         if (memLimitExceeded || roxiemem::memPoolExhausted()) 

+ 2 - 5
roxie/udplib/udpsha.hpp

@@ -27,11 +27,9 @@ extern roxiemem::IDataBufferManager *bufferManager;
 
 typedef bool (*PKT_CMP_FUN) (void *pkData, void *key);
 
-#define UDP_SEQUENCE_COMPLETE 0x80000000
-
 // Flag bits in pktSeq field
 #define UDP_PACKET_COMPLETE           0x80000000  // Packet completes a single slave request
-#define UDP_PACKET_RESERVED           0x40000000  // Not used - could move UDP_SEQUENCE_COMPLETE here?
+#define UDP_PACKET_ENDBURST           0x40000000  // Packet completes a single slave data burst
 #define UDP_PACKET_SEQUENCE_MASK      0x3fffffff
 
 struct UdpPacketHeader 
@@ -40,8 +38,7 @@ struct UdpPacketHeader
     unsigned short metalength;  // length of metadata (comes after header and data)
     unsigned       nodeIndex;   // Node this message came from
     unsigned       msgSeq;      // sequence number of messages ever sent from given node, used with ruid to tell which packets are from same message
-    unsigned       pktSeq;      // sequence number of this packet within the message (top bit signifies final packet)
-    unsigned       udpSequence; // Top bits used for flow control info
+    unsigned       pktSeq;      // sequence number of this packet within the message (top bits used for flags)
     // information below is duplicated in the Roxie packet header - we could remove? However, would make aborts harder, and at least ruid is needed at receive end
     ruid_t         ruid;        // The uid allocated by the server to this slave transaction
     unsigned       msgId;       // sub-id allocated by the server to this request within the transaction

+ 1 - 2
roxie/udplib/udptrr.cpp

@@ -559,8 +559,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     b = bufferManager->allocate();
                     receive_socket->read(b->data, 1, DATA_PAYLOAD, res, 5);
                     UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
-                    unsigned flowBits = hdr.udpSequence;
-                    if (flowBits & UDP_SEQUENCE_COMPLETE)
+                    if (hdr.pktSeq & UDP_PACKET_ENDBURST)
                     {
                         parent.manager->completed(hdr.nodeIndex);
                     }

+ 2 - 9
roxie/udplib/udptrs.cpp

@@ -111,16 +111,10 @@ public:
                 MTIME_SECTION(queryActiveTimer(), "bucket_wait");
                 bucket->wait((length / 1024)+1);
             }
+            if (udpSendCompletedInData && idx == maxPackets-1)
+                header->pktSeq |= UDP_PACKET_ENDBURST;
             try
             {
-                if (udpSendCompletedInData)
-                {
-                    if (idx == maxPackets-1)
-                    {
-                         // MORE - is this safe ? Any other thread looking at the data right now? Don't _think_ so...
-                        header->udpSequence = UDP_SEQUENCE_COMPLETE;
-                    }
-                }
                 data_socket->write(buffer->data, length);
             }
             catch(IException *e)
@@ -896,7 +890,6 @@ public:
         package_header.pktSeq = 0;
         package_header.nodeIndex = _sourceNode;
         package_header.msgSeq = _msgSeq;
-        package_header.udpSequence = 0; // these are allocated when transmitted
 
         packed_request = false;
         part_buffer = bufferManager->allocate();