فهرست منبع

Merge pull request #7802 from jakesmith/hpcc-14234

HPCC-14234 WHEN subgraph execution race condition fix

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 سال پیش
والد
کامیت
2da6570af6
3فایلهای تغییر یافته به همراه53 افزوده شده و 111 حذف شده
  1. 51 103
      thorlcr/graph/thgraph.cpp
  2. 1 4
      thorlcr/graph/thgraph.hpp
  3. 1 4
      thorlcr/slave/slavmain.cpp

+ 51 - 103
thorlcr/graph/thgraph.cpp

@@ -1110,7 +1110,6 @@ CGraphBase::CGraphBase(CJobBase &_job) : job(_job)
     startBarrier = waitBarrier = doneBarrier = NULL;
     startBarrier = waitBarrier = doneBarrier = NULL;
     mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
     mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
     executeReplyTag = TAG_NULL;
     executeReplyTag = TAG_NULL;
-    poolThreadHandle = 0;
     parentExtractSz = 0;
     parentExtractSz = 0;
     counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
     counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
 }
 }
@@ -1253,6 +1252,7 @@ bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
 
 
 void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
 void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
 {
 {
+    CriticalBlock b(executeCrit);
     if (job.queryPausing())
     if (job.queryPausing())
         return;
         return;
     Owned<IException> exception;
     Owned<IException> exception;
@@ -1319,12 +1319,6 @@ void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, b
     }
     }
 }
 }
 
 
-void CGraphBase::join()
-{
-    if (poolThreadHandle)
-        queryJob().joinGraph(*this);
-}
-
 void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
 void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
 {
 {
     if (isComplete()) return;
     if (isComplete()) return;
@@ -2137,7 +2131,7 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
     unsigned limit;
     unsigned limit;
     unsigned waitOnRunning;
     unsigned waitOnRunning;
     CriticalSection crit;
     CriticalSection crit;
-    Semaphore sem, runningSem;
+    Semaphore runningSem;
     Owned<IThreadPool> graphPool;
     Owned<IThreadPool> graphPool;
 
 
     class CGraphExecutorFactory : public CInterface, implements IThreadFactory
     class CGraphExecutorFactory : public CInterface, implements IThreadFactory
@@ -2164,24 +2158,35 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
                 }
                 }
                 void main()
                 void main()
                 {
                 {
-                    Linked<CGraphBase> graph = graphInfo->subGraph;
-                    Owned<IException> e;
-                    try
-                    {
-                        graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
-                    }
-                    catch (IException *_e)
-                    {
-                        e.setown(_e);
-                    }
-                    try { graphInfo->executor.graphDone(*graphInfo, e); }
-                    catch (IException *e)
+                    loop
                     {
                     {
-                        GraphPrintLog(graph, e, "graphDone");
-                        e->Release();
+                        Linked<CGraphBase> graph = graphInfo->subGraph;
+                        Owned<IException> e;
+                        try
+                        {
+                            PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
+                            graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
+                        }
+                        catch (IException *_e)
+                        {
+                            e.setown(_e);
+                        }
+                        Owned<CGraphExecutorGraphInfo> nextGraphInfo;
+                        try
+                        {
+                            nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
+                        }
+                        catch (IException *e)
+                        {
+                            GraphPrintLog(graph, e, "graphDone");
+                            e->Release();
+                        }
+                        if (e)
+                            throw e.getClear();
+                        if (!nextGraphInfo)
+                            return;
+                        graphInfo.setown(nextGraphInfo.getClear());
                     }
                     }
-                    if (e)
-                        throw e.getClear();
                 }
                 }
                 bool canReuse() { return true; }
                 bool canReuse() { return true; }
                 bool stop() { return true; }
                 bool stop() { return true; }
@@ -2206,6 +2211,7 @@ public:
     CGraphExecutor(CJobBase &_job) : job(_job)
     CGraphExecutor(CJobBase &_job) : job(_job)
     {
     {
         limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
         limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
+        PROGLOG("CGraphExecutor: limit = %d", limit);
         waitOnRunning = 0;
         waitOnRunning = 0;
         stopped = false;
         stopped = false;
         factory = new CGraphExecutorFactory(*this);
         factory = new CGraphExecutorFactory(*this);
@@ -2214,10 +2220,10 @@ public:
     ~CGraphExecutor()
     ~CGraphExecutor()
     {
     {
         stopped = true;
         stopped = true;
-        sem.signal();
+        graphPool->joinAll();
         factory->Release();
         factory->Release();
     }
     }
-    void graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
+    CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
         running.zap(doneGraphInfo);
         running.zap(doneGraphInfo);
@@ -2231,8 +2237,7 @@ public:
         {
         {
             stopped = true;
             stopped = true;
             stack.kill();
             stack.kill();
-            sem.signal();
-            return;
+            return NULL;
         }
         }
         if (job.queryPausing())
         if (job.queryPausing())
             stack.kill();
             stack.kill();
@@ -2281,7 +2286,20 @@ public:
         }
         }
         job.markWuDirty();
         job.markWuDirty();
         PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
         PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
-        sem.signal();
+
+        while (toRun.ordinality())
+        {
+            if (job.queryPausing())
+                return NULL;
+            Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
+            toRun.remove(0);
+            if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
+            {
+                running.append(*nextGraphInfo.getLink());
+                return nextGraphInfo.getClear();
+            }
+        }
+        return NULL;
     }
     }
 // IGraphExecutor
 // IGraphExecutor
     virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
     virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
@@ -2342,8 +2360,7 @@ public:
             {
             {
                 running.append(*LINK(graphInfo));
                 running.append(*LINK(graphInfo));
                 PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
                 PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
-                PooledThreadHandle h = graphPool->start(graphInfo.getClear());
-                subGraph->poolThreadHandle = h;
+                graphPool->start(graphInfo.getClear());
             }
             }
             else
             else
                 stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
                 stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
@@ -2352,74 +2369,11 @@ public:
             stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
             stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
     }
     }
     virtual IThreadPool &queryGraphPool() { return *graphPool; }
     virtual IThreadPool &queryGraphPool() { return *graphPool; }
-
     virtual void wait()
     virtual void wait()
     {
     {
-        loop
-        {
-            CriticalBlock b(crit);
-            if (stopped || job.queryAborted() || job.queryPausing())
-                break;
-            if (0 == stack.ordinality() && 0 == toRun.ordinality() && 0 == running.ordinality())
-                break;
-            if (job.queryPausing())
-                break; // pending graphs will re-run on resubmission
-
-            bool signalled;
-            {
-                CriticalUnblock b(crit);
-                signalled = sem.wait(MEDIUMTIMEOUT);
-            }
-            if (signalled)
-            {
-                bool added = false;
-                if (running.ordinality() < limit)
-                {
-                    while (toRun.ordinality())
-                    {
-                        Linked<CGraphExecutorGraphInfo> graphInfo = &toRun.item(0);
-                        toRun.remove(0);
-                        running.append(*LINK(graphInfo));
-                        CGraphBase *subGraph = graphInfo->subGraph;
-                        PROGLOG("Wait: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
-                        added = true;
-                        PooledThreadHandle h = graphPool->start(graphInfo.getClear());
-                        subGraph->poolThreadHandle = h;
-                        if (running.ordinality() >= limit)
-                            break;
-                    }
-                }
-                if (!added)
-                    Sleep(1000); // still more to come
-            }
-            else
-                PROGLOG("Waiting on executing graphs to complete.");
-            StringBuffer str("Currently running graphId = ");
-
-            if (running.ordinality())
-            {
-                ForEachItemIn(r, running)
-                {
-                    CGraphExecutorGraphInfo &graphInfo = running.item(r);
-                    str.append(graphInfo.subGraph->queryGraphId());
-                    if (r != running.ordinality()-1)
-                        str.append(", ");
-                }
-                PROGLOG("%s", str.str());
-            }
-            if (stack.ordinality())
-            {
-                str.clear().append("Queued in stack graphId = ");
-                ForEachItemIn(s, stack)
-                {
-                    CGraphExecutorGraphInfo &graphInfo = stack.item(s);
-                    str.append(graphInfo.subGraph->queryGraphId());
-                    if (s != stack.ordinality()-1)
-                        str.append(", ");
-                }
-                PROGLOG("%s", str.str());
-            }
-        }
+        PROGLOG("CGraphExecutor exiting, waiting on graph pool");
+        graphPool->joinAll();
+        PROGLOG("CGraphExecutor graphPool finished");
     }
     }
 };
 };
 
 
@@ -2738,12 +2692,6 @@ void CJobBase::startGraph(CGraphBase &graph, IGraphCallback &callback, bool chec
     graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
     graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
 }
 }
 
 
-void CJobBase::joinGraph(CGraphBase &graph)
-{
-    if (graph.poolThreadHandle)
-        graphExecutor->queryGraphPool().join(graph.poolThreadHandle);
-}
-
 ICodeContext &CJobBase::queryCodeContext() const
 ICodeContext &CJobBase::queryCodeContext() const
 {
 {
     return *codeCtx;
     return *codeCtx;

+ 1 - 4
thorlcr/graph/thgraph.hpp

@@ -421,7 +421,7 @@ interface IPropertyTree;
 class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, implements IThorChildGraph, implements IExceptionHandler
 class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, implements IThorChildGraph, implements IExceptionHandler
 {
 {
     mutable CriticalSection crit;
     mutable CriticalSection crit;
-    CriticalSection evaluateCrit;
+    CriticalSection evaluateCrit, executeCrit;
     CGraphElementTable containers;
     CGraphElementTable containers;
     CGraphElementArray sinks, activeSinks;
     CGraphElementArray sinks, activeSinks;
     bool sink, complete, global, localChild;
     bool sink, complete, global, localChild;
@@ -553,7 +553,6 @@ protected:
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
 
 
-    PooledThreadHandle poolThreadHandle;
     CGraphArrayCopy dependentSubGraphs;
     CGraphArrayCopy dependentSubGraphs;
 
 
     CGraphBase(CJobBase &job);
     CGraphBase(CJobBase &job);
@@ -608,7 +607,6 @@ public:
         }
         }
     }
     }
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
-    void join();
     virtual void execute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async);
     virtual void execute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async);
     IThorActivityIterator *getIterator()
     IThorActivityIterator *getIterator()
     {
     {
@@ -814,7 +812,6 @@ public:
     virtual void startJob();
     virtual void startJob();
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual CGraphBase *createGraph() = 0;
     virtual CGraphBase *createGraph() = 0;
-    void joinGraph(CGraphBase &graph);
     void startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract);
     void startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract);
 
 
     void addDependencies(IPropertyTree *xgmml, bool failIfMissing=true);
     void addDependencies(IPropertyTree *xgmml, bool failIfMissing=true);

+ 1 - 4
thorlcr/slave/slavmain.cpp

@@ -366,10 +366,7 @@ public:
                             msg.append(false);
                             msg.append(false);
                             Owned<CSlaveGraph> graph = (CSlaveGraph *)job->getGraph(gid);
                             Owned<CSlaveGraph> graph = (CSlaveGraph *)job->getGraph(gid);
                             if (graph)
                             if (graph)
-                            {
-                                graph->getDone(msg);
-                                graph->join(); // graph will wind-up.
-                            }
+                                graph->getDone(msg); // graph will start to wind-up
                             else
                             else
                             {
                             {
                                 msg.clear();
                                 msg.clear();