浏览代码

HPCC-14234 WHEN subgraph execution race condition fix

A race condition during execution of dependent WHEN subgraphs
can cause Thor to stall. If the slave begins to execute before
the master graph has triggered the completio of the parent graph,
the slave graph executor would queue the pending graph and never
pull it from the queue.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 年之前
父节点
当前提交
fa89c2652f
共有 3 个文件被更改,包括 53 次插入111 次删除
  1. 51 103
      thorlcr/graph/thgraph.cpp
  2. 1 4
      thorlcr/graph/thgraph.hpp
  3. 1 4
      thorlcr/slave/slavmain.cpp

+ 51 - 103
thorlcr/graph/thgraph.cpp

@@ -1110,7 +1110,6 @@ CGraphBase::CGraphBase(CJobBase &_job) : job(_job)
     startBarrier = waitBarrier = doneBarrier = NULL;
     mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
     executeReplyTag = TAG_NULL;
-    poolThreadHandle = 0;
     parentExtractSz = 0;
     counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
 }
@@ -1253,6 +1252,7 @@ bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
 
 void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
 {
+    CriticalBlock b(executeCrit);
     if (job.queryPausing())
         return;
     Owned<IException> exception;
@@ -1319,12 +1319,6 @@ void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, b
     }
 }
 
-void CGraphBase::join()
-{
-    if (poolThreadHandle)
-        queryJob().joinGraph(*this);
-}
-
 void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
 {
     if (isComplete()) return;
@@ -2137,7 +2131,7 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
     unsigned limit;
     unsigned waitOnRunning;
     CriticalSection crit;
-    Semaphore sem, runningSem;
+    Semaphore runningSem;
     Owned<IThreadPool> graphPool;
 
     class CGraphExecutorFactory : public CInterface, implements IThreadFactory
@@ -2164,24 +2158,35 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
                 }
                 void main()
                 {
-                    Linked<CGraphBase> graph = graphInfo->subGraph;
-                    Owned<IException> e;
-                    try
-                    {
-                        graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
-                    }
-                    catch (IException *_e)
-                    {
-                        e.setown(_e);
-                    }
-                    try { graphInfo->executor.graphDone(*graphInfo, e); }
-                    catch (IException *e)
+                    loop
                     {
-                        GraphPrintLog(graph, e, "graphDone");
-                        e->Release();
+                        Linked<CGraphBase> graph = graphInfo->subGraph;
+                        Owned<IException> e;
+                        try
+                        {
+                            PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
+                            graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
+                        }
+                        catch (IException *_e)
+                        {
+                            e.setown(_e);
+                        }
+                        Owned<CGraphExecutorGraphInfo> nextGraphInfo;
+                        try
+                        {
+                            nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
+                        }
+                        catch (IException *e)
+                        {
+                            GraphPrintLog(graph, e, "graphDone");
+                            e->Release();
+                        }
+                        if (e)
+                            throw e.getClear();
+                        if (!nextGraphInfo)
+                            return;
+                        graphInfo.setown(nextGraphInfo.getClear());
                     }
-                    if (e)
-                        throw e.getClear();
                 }
                 bool canReuse() { return true; }
                 bool stop() { return true; }
@@ -2206,6 +2211,7 @@ public:
     CGraphExecutor(CJobBase &_job) : job(_job)
     {
         limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
+        PROGLOG("CGraphExecutor: limit = %d", limit);
         waitOnRunning = 0;
         stopped = false;
         factory = new CGraphExecutorFactory(*this);
@@ -2214,10 +2220,10 @@ public:
     ~CGraphExecutor()
     {
         stopped = true;
-        sem.signal();
+        graphPool->joinAll();
         factory->Release();
     }
-    void graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
+    CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
     {
         CriticalBlock b(crit);
         running.zap(doneGraphInfo);
@@ -2231,8 +2237,7 @@ public:
         {
             stopped = true;
             stack.kill();
-            sem.signal();
-            return;
+            return NULL;
         }
         if (job.queryPausing())
             stack.kill();
@@ -2281,7 +2286,20 @@ public:
         }
         job.markWuDirty();
         PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
-        sem.signal();
+
+        while (toRun.ordinality())
+        {
+            if (job.queryPausing())
+                return NULL;
+            Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
+            toRun.remove(0);
+            if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
+            {
+                running.append(*nextGraphInfo.getLink());
+                return nextGraphInfo.getClear();
+            }
+        }
+        return NULL;
     }
 // IGraphExecutor
     virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
@@ -2342,8 +2360,7 @@ public:
             {
                 running.append(*LINK(graphInfo));
                 PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
-                PooledThreadHandle h = graphPool->start(graphInfo.getClear());
-                subGraph->poolThreadHandle = h;
+                graphPool->start(graphInfo.getClear());
             }
             else
                 stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
@@ -2352,74 +2369,11 @@ public:
             stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
     }
     virtual IThreadPool &queryGraphPool() { return *graphPool; }
-
     virtual void wait()
     {
-        loop
-        {
-            CriticalBlock b(crit);
-            if (stopped || job.queryAborted() || job.queryPausing())
-                break;
-            if (0 == stack.ordinality() && 0 == toRun.ordinality() && 0 == running.ordinality())
-                break;
-            if (job.queryPausing())
-                break; // pending graphs will re-run on resubmission
-
-            bool signalled;
-            {
-                CriticalUnblock b(crit);
-                signalled = sem.wait(MEDIUMTIMEOUT);
-            }
-            if (signalled)
-            {
-                bool added = false;
-                if (running.ordinality() < limit)
-                {
-                    while (toRun.ordinality())
-                    {
-                        Linked<CGraphExecutorGraphInfo> graphInfo = &toRun.item(0);
-                        toRun.remove(0);
-                        running.append(*LINK(graphInfo));
-                        CGraphBase *subGraph = graphInfo->subGraph;
-                        PROGLOG("Wait: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
-                        added = true;
-                        PooledThreadHandle h = graphPool->start(graphInfo.getClear());
-                        subGraph->poolThreadHandle = h;
-                        if (running.ordinality() >= limit)
-                            break;
-                    }
-                }
-                if (!added)
-                    Sleep(1000); // still more to come
-            }
-            else
-                PROGLOG("Waiting on executing graphs to complete.");
-            StringBuffer str("Currently running graphId = ");
-
-            if (running.ordinality())
-            {
-                ForEachItemIn(r, running)
-                {
-                    CGraphExecutorGraphInfo &graphInfo = running.item(r);
-                    str.append(graphInfo.subGraph->queryGraphId());
-                    if (r != running.ordinality()-1)
-                        str.append(", ");
-                }
-                PROGLOG("%s", str.str());
-            }
-            if (stack.ordinality())
-            {
-                str.clear().append("Queued in stack graphId = ");
-                ForEachItemIn(s, stack)
-                {
-                    CGraphExecutorGraphInfo &graphInfo = stack.item(s);
-                    str.append(graphInfo.subGraph->queryGraphId());
-                    if (s != stack.ordinality()-1)
-                        str.append(", ");
-                }
-                PROGLOG("%s", str.str());
-            }
-        }
+        PROGLOG("CGraphExecutor exiting, waiting on graph pool");
+        graphPool->joinAll();
+        PROGLOG("CGraphExecutor graphPool finished");
     }
 };
 
@@ -2738,12 +2692,6 @@ void CJobBase::startGraph(CGraphBase &graph, IGraphCallback &callback, bool chec
     graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
 }
 
-void CJobBase::joinGraph(CGraphBase &graph)
-{
-    if (graph.poolThreadHandle)
-        graphExecutor->queryGraphPool().join(graph.poolThreadHandle);
-}
-
 ICodeContext &CJobBase::queryCodeContext() const
 {
     return *codeCtx;

+ 1 - 4
thorlcr/graph/thgraph.hpp

@@ -421,7 +421,7 @@ interface IPropertyTree;
 class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, implements IThorChildGraph, implements IExceptionHandler
 {
     mutable CriticalSection crit;
-    CriticalSection evaluateCrit;
+    CriticalSection evaluateCrit, executeCrit;
     CGraphElementTable containers;
     CGraphElementArray sinks, activeSinks;
     bool sink, complete, global, localChild;
@@ -553,7 +553,6 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    PooledThreadHandle poolThreadHandle;
     CGraphArrayCopy dependentSubGraphs;
 
     CGraphBase(CJobBase &job);
@@ -608,7 +607,6 @@ public:
         }
     }
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
-    void join();
     virtual void execute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async);
     IThorActivityIterator *getIterator()
     {
@@ -814,7 +812,6 @@ public:
     virtual void startJob();
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual CGraphBase *createGraph() = 0;
-    void joinGraph(CGraphBase &graph);
     void startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract);
 
     void addDependencies(IPropertyTree *xgmml, bool failIfMissing=true);

+ 1 - 4
thorlcr/slave/slavmain.cpp

@@ -366,10 +366,7 @@ public:
                             msg.append(false);
                             Owned<CSlaveGraph> graph = (CSlaveGraph *)job->getGraph(gid);
                             if (graph)
-                            {
-                                graph->getDone(msg);
-                                graph->join(); // graph will wind-up.
-                            }
+                                graph->getDone(msg); // graph will start to wind-up
                             else
                             {
                                 msg.clear();