Explorar el Código

Merge pull request #9024 from richardkchapman/thread-priority

HPCC-16117 UDP messaging threads should run at elevated priority

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday hace 8 años
padre
commit
c2d91b2a5e

+ 14 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1071,6 +1071,20 @@
                 </xs:appinfo>
             </xs:annotation>
         </xs:attribute>
+        <xs:attribute name="udpSnifferReadThreadPriority" type="nonNegativeInteger" use="optional" default="3">
+            <xs:annotation>
+                <xs:appinfo>
+                    <tooltip>If non-zero, run the sniffer read thread at elevated priority level</tooltip>
+                </xs:appinfo>
+            </xs:annotation>
+        </xs:attribute>
+        <xs:attribute name="udpSnifferSendThreadPriority" type="nonNegativeInteger" use="optional" default="3">
+            <xs:annotation>
+                <xs:appinfo>
+                    <tooltip>If non-zero, run the sniffer send thread at elevated priority level</tooltip>
+                </xs:appinfo>
+            </xs:annotation>
+        </xs:attribute>
     </xs:attributeGroup>
   <xs:attributeGroup name="Cache">
     <xs:attribute name="blobCacheMem" type="xs:nonNegativeInteger" use="optional" default="0">

+ 3 - 0
roxie/ccd/ccdmain.cpp

@@ -729,6 +729,9 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         if (udpRequestToSendTimeout == 0)
             udpRequestToSendTimeout = 5; 
         // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
+        udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
+        udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
+
         udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
         udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
         udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);

+ 3 - 0
roxie/udplib/udplib.hpp

@@ -138,4 +138,7 @@ extern UDPLIB_API unsigned udpInlineCollationPacketLimit;
 extern UDPLIB_API bool udpInlineCollation;
 extern UDPLIB_API bool udpSnifferEnabled;
 extern UDPLIB_API bool udpSendCompletedInData;
+extern UDPLIB_API unsigned udpSnifferReadThreadPriority;
+extern UDPLIB_API unsigned udpSnifferSendThreadPriority;
+
 #endif

+ 2 - 0
roxie/udplib/udpsha.cpp

@@ -41,6 +41,8 @@ unsigned udpTraceLevel = 0;
 unsigned udpTraceCategories = (unsigned) -1;
 unsigned udpFlowSocketsSize = 131072;
 unsigned udpLocalWriteSocketSize = 1024000;
+unsigned udpSnifferReadThreadPriority = 3;
+unsigned udpSnifferSendThreadPriority = 3;
 
 unsigned multicastTTL = 1;
 

+ 18 - 1
roxie/udplib/udptrr.cpp

@@ -450,6 +450,14 @@ class CReceiveManager : public CInterface, implements IReceiveManager
         virtual int run() 
         {
             DBGLOG("UdpReceiver: ReceiveFlowManager started");
+            if (udpSnifferSendThreadPriority)
+            {
+#ifdef __linux__
+                setLinuxThreadPriority(udpSnifferSendThreadPriority);
+#else
+                adjustPriority(1);
+#endif
+            }
             while (running)
             {
                 requestPending.wait();
@@ -501,7 +509,8 @@ class CReceiveManager : public CInterface, implements IReceiveManager
             {
                 StringBuffer ipStr;
                 snifferIP.getIpText(ipStr);
-                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i", ipStr.str(), snifferPort);
+                size32_t actualSize = sniffer_socket->get_receive_buffer_size();
+                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i sockbuffsize=%d actual %d", ipStr.str(), snifferPort, udpFlowSocketsSize, actualSize);
             }
         }
 
@@ -529,6 +538,14 @@ class CReceiveManager : public CInterface, implements IReceiveManager
         virtual int run() 
         {
             DBGLOG("UdpReceiver: sniffer started");
+            if (udpSnifferReadThreadPriority)
+            {
+#ifdef __linux__
+                setLinuxThreadPriority(udpSnifferReadThreadPriority);
+#else
+                adjustPriority(1);
+#endif
+            }
             while (running) 
             {
                 try