ソースを参照

Merge pull request #9035 from richardkchapman/ok-to-send

HPCC-16130 Change default timeout on ok_to_send response

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 年 前
コミット
3af1fb8ded

+ 2 - 2
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1029,10 +1029,10 @@
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
-        <xs:attribute name="udpRequestToSendTimeout" type="xs:nonNegativeInteger" use="optional" default="5">
+        <xs:attribute name="udpRequestToSendTimeout" type="xs:nonNegativeInteger" use="optional" default="0">
             <xs:annotation>
                 <xs:appinfo>
-                    <tooltip>Controls the timeout value a agent udp will wait for a permission to send from a Roxie server</tooltip>
+                    <tooltip>Controls the timeout value agent udp will wait for permission to send from a Roxie server, in milliseconds. Specify 0 to calcuate automatically.</tooltip>
                 </xs:appinfo>
             </xs:annotation>
         </xs:attribute>

+ 11 - 3
roxie/ccd/ccdmain.cpp

@@ -721,11 +721,19 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         udpInlineCollationPacketLimit = topology->getPropInt("@udpInlineCollationPacketLimit", 50);
         udpSendCompletedInData = topology->getPropBool("@udpSendCompletedInData", false);
         udpRetryBusySenders = topology->getPropInt("@udpRetryBusySenders", 0);
+
+        // Historically, this was specified in seconds. Assume any value <= 10 is a legacy value specified in seconds!
         udpMaxRetryTimedoutReqs = topology->getPropInt("@udpMaxRetryTimedoutReqs", 0);
-        udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 5);
-        // MORE: think of a better way/value/check maybe/and/or based on Roxie server timeout
+        udpRequestToSendTimeout = topology->getPropInt("@udpRequestToSendTimeout", 0);
+        if (udpRequestToSendTimeout<=10)
+            udpRequestToSendTimeout *= 1000;
         if (udpRequestToSendTimeout == 0)
-            udpRequestToSendTimeout = 5; 
+        {
+            if (slaTimeout)
+                udpRequestToSendTimeout = (slaTimeout*3) / 4;
+            else
+                udpRequestToSendTimeout = 5000;
+        }
         // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
         udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
         udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);

+ 8 - 9
roxie/udplib/udptrs.cpp

@@ -36,7 +36,7 @@
 
 unsigned udpOutQsPriority = 0;
 unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
-unsigned udpRequestToSendTimeout = 5; // value in sec
+unsigned udpRequestToSendTimeout = 0; // value in milliseconds - 0 means calculate from query timeouts
 bool udpSnifferEnabled = true;
 
 #ifdef _DEBUG
@@ -502,7 +502,7 @@ class CSendManager : implements ISendManager, public CInterface
 
         char *state;
         unsigned char *timeouts;   // Number of consecutive timeouts
-        time_t *request_time;
+        unsigned *request_time;
 
         CriticalSection cr;
         Semaphore       sem;
@@ -527,8 +527,7 @@ class CSendManager : implements ISendManager, public CInterface
                     idle = true;
                 if (!running) return 0;
 
-                time_t now;
-                time(&now);
+                unsigned now = msTick();
 
                 // I don't really like this loop. Could keep a circular buffer of ones with non-zero state?
                 // In a typical heavy load scenario most will be pending
@@ -549,7 +548,7 @@ class CSendManager : implements ISendManager, public CInterface
                         if ( (now - request_time[i]) < udpRequestToSendTimeout) // MORE - should really protect it?
                             break;
                         timeouts[i]++;
-                        EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i sec max=%i sec",
+                        EXCLOG(MCoperatorError,"ERROR: UdpSender: timed out %i times (max=%i) waiting ok_to_send msg from node=%d timed out after=%i msec max=%i msec",
                                 timeouts[i], udpMaxRetryTimedoutReqs,   
                                 i, (int) (now - request_time[i]), udpRequestToSendTimeout);
                         // 0 (zero) value of udpMaxRetryTimedoutReqs means NO limit on retries
@@ -584,7 +583,7 @@ class CSendManager : implements ISendManager, public CInterface
                 if (dataRemaining)
                 {
                     state[index] = pending_request;
-                    time(&request_time[index]); 
+                    request_time[index] = msTick();
                 }
                 else
                 {
@@ -616,7 +615,7 @@ class CSendManager : implements ISendManager, public CInterface
             CriticalBlock b(cr);
             parent.sendRequest(index, flow_t::request_to_send);
             state[index] = pending_request;
-            time(&request_time[index]); 
+            request_time[index] = msTick();
         }
 
         void abort(unsigned index) 
@@ -639,8 +638,8 @@ class CSendManager : implements ISendManager, public CInterface
             memset(state, 0, target_count);
             timeouts = new unsigned char [target_count];
             memset(timeouts, 0, target_count);
-            request_time = new time_t [target_count];
-            memset(request_time, 0, sizeof(time_t) * target_count);
+            request_time = new unsigned [target_count];
+            memset(request_time, 0, sizeof(unsigned) * target_count);
             start();
         }
 

+ 3 - 3
roxie/udplib/uttest.cpp

@@ -806,12 +806,12 @@ void usage(char *err = NULL)
     fprintf(stderr, " [-destA IP]          : Sets the sender destination ip address to IP (i.e roxie server IP) <default to local host>\n");
     fprintf(stderr, " [-destB IP]          : Sets the sender second destination ip address to IP <default no sec dest>\n");
     fprintf(stderr, " [-multiCast IP]      : Sets the sniffer multicast ip address to IP <default %s>\n", multiCast);
-    fprintf(stderr, " [-udpTimeout sec]    : Sets the sender udpRequestToSendTimeout value  <default %i>\n", udpRequestToSendTimeout);
+    fprintf(stderr, " [-udpTimeout msec]   : Sets the sender udpRequestToSendTimeout value  <default %i>\n", udpRequestToSendTimeout);
     fprintf(stderr, " [-udpMaxTimeouts val]: Sets the sender udpMaxRetryTimedoutReqs value <default %i>\n", udpMaxRetryTimedoutReqs);
     fprintf(stderr, " [-udpNumQs val]      : Sets the sender's number of output queues <default %i>\n", udpNumQs);
     fprintf(stderr, " [-udpQsPriority val] : Sets the sender's output queues priority udpQsPriority <default %i>\n", udpOutQsPriority);
     fprintf(stderr, " [-packerHdrSize val] : Sets the packers header size (like RoxieHeader) <default %i>\n", packerHdrSize);
-    fprintf(stderr, " [-numPackers val]    : Sets the number of packers/unpakcers to create/expect <default %i>\n", numPackers);
+    fprintf(stderr, " [-numPackers val]    : Sets the number of packers/unpackers to create/expect <default %i>\n", numPackers);
     fprintf(stderr, " [-packers val vale .]: Sets a packer specific packet sizes, this option can be repeated as many packers as needed\n");
     fprintf(stderr, " [-numSizes val]      : Sets the number of packet data sizes to try sending/receiving <default %i>\n", numSizes);
     fprintf(stderr, " [-numSends val]]     : Sets the number of msgs per size per packer to send <default %i>\n", numSends);
@@ -839,7 +839,7 @@ int main(int argc, char * argv[] )
     progName = argv[0];
     destA = myIndex = addRoxieNode(GetCachedHostName());
 
-    udpRequestToSendTimeout = 5;
+    udpRequestToSendTimeout = 5000;
     udpMaxRetryTimedoutReqs = 3;
     udpOutQsPriority = 5;
     udpTraceLevel = 1;