Преглед изворни кода

HPCC-20910 Prevent slave initialization aborting too early

Extend timeout for activity initialization time and make it
configurable, plus log why waiting in logs.
Prior to this change, if a query was blocked e.g. due to a file
locking issue in the master during initialization, it would cause
the slaves to fail after timeout (25 mins).
Not it will by default continue for up to 2hrs, configurable
via global/workunit option "actInitWaitTimeMins"

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith пре 6 година
родитељ
комит
7474d1f8ad

+ 1 - 0
thorlcr/graph/thgraph.hpp

@@ -28,6 +28,7 @@
 
 #define LONGTIMEOUT (25*60*1000)
 #define MEDIUMTIMEOUT 30000
+#define DEFAULT_MAX_ACTINITWAITTIME_MINS (2*60) // 2hrs
 
 #include "jlib.hpp"
 #include "jarray.hpp"

+ 14 - 2
thorlcr/graph/thgraphslave.cpp

@@ -949,8 +949,18 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
 
     if (syncInitData())
     {
-        if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
-            throw MakeStringException(0, "Error receiving actinit data for graph: %" GIDPF "d", graphId);
+        CTimeMon timer;
+        while (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, MEDIUMTIMEOUT))
+        {
+            if (graphCancelHandler.isCancelled())
+                throw MakeStringException(0, "Aborted whilst waiting to receive actinit data for graph: %" GIDPF "d", graphId);
+            // put an upper limit on time Thor can be stalled here
+            unsigned mins = timer.elapsed()/60000;
+            if (mins >= jobS->queryActInitWaitTimeMins())
+                throw MakeStringException(0, "Timed out after %u minutes, waiting to receive actinit data for graph: %" GIDPF "u", mins, graphId);
+
+            GraphPrintLogEx(this, thorlog_null, MCwarning, "Waited %u minutes for activity initialization message (Master may be blocked on a file lock?).", mins);
+        }
         replyTag = msg.getReplyTag();
         msg.read(len);
     }
@@ -1659,6 +1669,8 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
     getOpt("remoteCompressedOutput", remoteCompressedOutput);
     if (remoteCompressedOutput.length())
         setRemoteOutputCompressionDefault(remoteCompressedOutput);
+
+    actInitWaitTimeMins = getOptInt(THOROPT_ACTINIT_WAITTIME_MINS, DEFAULT_MAX_ACTINITWAITTIME_MINS);
 }
 
 void CJobSlave::addChannel(IMPServer *mpServer)

+ 2 - 0
thorlcr/graph/thgraphslave.hpp

@@ -474,6 +474,7 @@ class graphslave_decl CJobSlave : public CJobBase
     Owned<IPropertyTree> workUnitInfo;
     size32_t oldNodeCacheMem;
     unsigned channelMemoryMB;
+    unsigned actInitWaitTimeMins = DEFAULT_MAX_ACTINITWAITTIME_MINS;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -484,6 +485,7 @@ public:
     virtual void startJob() override;
     virtual void endJob() override;
     const char *queryFindString() const { return key.get(); } // for string HT
+    unsigned queryActInitWaitTimeMins() const { return actInitWaitTimeMins; }
 
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing);
     ISlaveWatchdog *queryProgressHandler() { return watchdog; }

+ 2 - 0
thorlcr/thorutil/thormisc.hpp

@@ -95,6 +95,7 @@
 #define THOROPT_KEYLOOKUP_COMPRESS_MESSAGES "keyedJoinCompressMsgs" // compress key and fetch request messages                                   (default = true)
 #define THOROPT_FORCE_REMOTE_DISABLED "forceRemoteDisabled"     // disable remote (via dafilesrv) reads (NB: takes precedence over forceRemoteRead) (default = false)
 #define THOROPT_FORCE_REMOTE_READ     "forceRemoteRead"         // force remote (via dafilesrv) read (NB: takes precedence over environment.conf setting) (default = false)
+#define THOROPT_ACTINIT_WAITTIME_MINS "actInitWaitTimeMins"     // max time to wait for slave activity initialization message from master
 
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning
@@ -140,6 +141,7 @@ public:
     {
         reset();
     }
+    bool isCancelled() const { return cancelled; }
     void reset()
     {
         clear();