Explorar o código

HPCC-25441 Fix race condition whilst starting progress handler

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith %!s(int64=4) %!d(string=hai) anos
pai
achega
a4a20e6557
Modificáronse 3 ficheiros con 41 adicións e 32 borrados
  1. 6 4
      thorlcr/graph/thgraphmaster.cpp
  2. 28 25
      thorlcr/slave/slwatchdog.cpp
  3. 7 3
      thorlcr/thorutil/thormisc.hpp

+ 6 - 4
thorlcr/graph/thgraphmaster.cpp

@@ -70,8 +70,9 @@ public:
 
     CFatalHandler(unsigned timeout) : CTimeoutTrigger(timeout, "EXCEPTION")
     {
+        start();
     }
-    virtual bool action()
+    virtual bool action() override
     {
         StringBuffer s("FAILED TO RECOVER FROM EXCEPTION, STOPPING THOR");
         FLLOG(MCoperatorWarning, thorJob, exception, s.str());
@@ -84,11 +85,11 @@ public:
         return true;
     }
 // IFatalHandler
-    virtual void inform(IException *e)
+    virtual void inform(IException *e) override
     {
         CTimeoutTrigger::inform(e);
     }
-    virtual void clear()
+    virtual void clear() override
     {
         CTimeoutTrigger::clear();
     }
@@ -1758,9 +1759,10 @@ bool CJobMaster::go()
     public:
         CQueryTimeoutHandler(CJobMaster &_job, unsigned timeout) : CTimeoutTrigger(timeout, "QUERY"), job(_job)
         {
+            start();
             inform(MakeThorException(TE_QueryTimeoutError, "Query took greater than %d seconds", timeout));
         }
-        virtual bool action()
+        virtual bool action() override
         {
             job.fireException(exception);
             return true;

+ 28 - 25
thorlcr/slave/slwatchdog.cpp

@@ -28,11 +28,12 @@
 #include "slwatchdog.hpp"
 #include "thgraphslave.hpp"
 
-class CGraphProgressHandlerBase : public CSimpleInterface, implements ISlaveWatchdog, implements IThreaded
+class CGraphProgressHandlerBase : public CInterfaceOf<ISlaveWatchdog>, implements IThreaded
 {
     mutable CriticalSection crit;
     CGraphArray activeGraphs;
-    bool stopped, progressEnabled;
+    bool stopped = true;
+    bool progressEnabled = false;
     CThreaded threaded;
     SocketEndpoint self;
 
@@ -55,36 +56,24 @@ class CGraphProgressHandlerBase : public CSimpleInterface, implements ISlaveWatc
     virtual void sendData(MemoryBuffer &mb) = 0;
 
 public:
-    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CGraphProgressHandlerBase() : threaded("CGraphProgressHandler")
+    CGraphProgressHandlerBase() : threaded("CGraphProgressHandler", this)
     {
         self = queryMyNode()->endpoint();
-        stopped = true;
 
         progressEnabled = globals->getPropBool("@watchdogProgressEnabled");
-        stopped = false;
 #ifdef _WIN32
         threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
 #endif
-        threaded.init(this);
     }
-    ~CGraphProgressHandlerBase()
+    void start()
     {
-        stop();
+        stopped = false;
+        threaded.start();
     }
-    virtual void stop()
+    virtual void beforeDispose() override
     {
-        if (!stopped)
-        {
-#ifdef _WIN32
-            threaded.adjustPriority(0); // restore to normal before stopping
-#endif
-            stopped = true;
-            threaded.join();
-            LOG(MCdebugProgress, thorJob, "Stopped watchdog");
-        }
+        stop();
     }
-
     size32_t gatherData(MemoryBuffer &mb)
     {
         CriticalBlock b(crit);
@@ -112,14 +101,14 @@ public:
     }
 
 // ISlaveWatchdog impl.
-    void startGraph(CGraphBase &graph)
+    virtual void startGraph(CGraphBase &graph) override
     {
         CriticalBlock b(crit);
         activeGraphs.append(*LINK(&graph));
         StringBuffer str("Watchdog: Start Job ");
         LOG(MCthorDetailedDebugInfo, thorJob, "%s", str.append(graph.queryGraphId()).str());
     }
-    void stopGraph(CGraphBase &graph, MemoryBuffer *mb)
+    virtual void stopGraph(CGraphBase &graph, MemoryBuffer *mb) override
     {
         CriticalBlock b(crit);
         if (NotFound != activeGraphs.find(graph))
@@ -135,7 +124,19 @@ public:
             activeGraphs.zap(graph);
         }
     }
-    virtual void debugRequest(MemoryBuffer &msg, const char *request) const
+    virtual void stop() override
+    {
+        if (!stopped)
+        {
+#ifdef _WIN32
+            threaded.adjustPriority(0); // restore to normal before stopping
+#endif
+            stopped = true;
+            threaded.join();
+            LOG(MCdebugProgress, thorJob, "Stopped watchdog");
+        }
+    }
+    virtual void debugRequest(MemoryBuffer &msg, const char *request) const override
     {
         Owned<IPTree> req = createPTreeFromXMLString(request);
 
@@ -195,8 +196,9 @@ public:
         StringBuffer ipStr;
         queryMasterNode().endpoint().getIpText(ipStr);
         sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
+        start();
     }
-    virtual void sendData(MemoryBuffer &mb)
+    virtual void sendData(MemoryBuffer &mb) override
     {
         HeartBeatPacketHeader hb;
         //Cast is to avoid warning about writing to an object with non trivial copy assignment
@@ -217,8 +219,9 @@ class CGraphProgressMPHandler : public CGraphProgressHandlerBase
 public:
     CGraphProgressMPHandler()
     {
+        start();
     }
-    virtual void sendData(MemoryBuffer &mb)
+    virtual void sendData(MemoryBuffer &mb) override
     {
         CMessageBuffer msg;
         msg.swapWith(mb);

+ 7 - 3
thorlcr/thorutil/thormisc.hpp

@@ -228,12 +228,16 @@ protected:
     Owned<IException> exception;
 
 public:
-    CTimeoutTrigger(unsigned _timeout, const char *_description) : timeout(_timeout), description(_description), threaded("TimeoutTrigger")
+    CTimeoutTrigger(unsigned _timeout, const char *_description) : timeout(_timeout), description(_description), threaded("TimeoutTrigger", this)
+    {
+    }
+    void start()
     {
         running = (timeout!=0);
-        threaded.init(this);
+        if (running)
+            threaded.start();
     }
-    virtual ~CTimeoutTrigger()
+    virtual void beforeDispose() override
     {
         stop();
         threaded.join();