Ver código fonte

Merge pull request #1261 from jakesmith/gh-1159-fix

Fix graph executor, needs to block if running
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 13 anos atrás
pai
commit
1b285807a5
1 arquivos alterados com 44 adições e 10 exclusões
  1. 44 10
      thorlcr/graph/thgraph.cpp

+ 44 - 10
thorlcr/graph/thgraph.cpp

@@ -2015,8 +2015,9 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
     UnsignedArray seen;
     bool stopped;
     unsigned limit;
+    unsigned waitOnRunning;
     CriticalSection crit;
-    Semaphore sem;
+    Semaphore sem, runningSem;
     Owned<IThreadPool> graphPool;
 
     class CGraphExecutorFactory : public CInterface, implements IThreadFactory
@@ -2068,12 +2069,24 @@ class CGraphExecutor : public CInterface, implements IGraphExecutor
             return new CGraphExecutorThread();
         }
     } *factory;
+
+    CGraphExecutorGraphInfo *findRunning(graph_id gid)
+    {
+        ForEachItemIn(r, running)
+        {
+            CGraphExecutorGraphInfo *graphInfo = &running.item(r);
+            if (gid == graphInfo->subGraph->queryGraphId())
+                return graphInfo;
+        }
+        return NULL;
+    }
 public:
     IMPLEMENT_IINTERFACE;
 
     CGraphExecutor(CJobBase &_job) : job(_job)
     {
         limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
+        waitOnRunning = 0;
         stopped = false;
         factory = new CGraphExecutorFactory(*this);
         graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &job, limit));
@@ -2088,6 +2101,11 @@ public:
     {
         CriticalBlock b(crit);
         running.zap(doneGraphInfo);
+        if (waitOnRunning)
+        {
+            runningSem.signal(waitOnRunning);
+            waitOnRunning = 0;
+        }
 
         if (e || job.queryAborted())
         {
@@ -2148,24 +2166,39 @@ public:
 // IGraphExecutor
     virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
     {
-        CriticalBlock b(crit);
-        if (job.queryPausing())
-            return;
-        if (subGraph->isComplete())
+        bool alreadyRunning;
+        {
+            CriticalBlock b(crit);
+            if (job.queryPausing())
+                return;
+            if (subGraph->isComplete())
+                return;
+            alreadyRunning = NULL != findRunning(subGraph->queryGraphId());
+            if (alreadyRunning)
+                ++waitOnRunning;
+        }
+        if (alreadyRunning)
+        {
+            loop
+            {
+                PROGLOG("Waiting on subgraph %"GIDPF"d", subGraph->queryGraphId());
+                if (runningSem.wait(MEDIUMTIMEOUT) || job.queryAborted() || job.queryPausing())
+                    break;
+            }
             return;
-        ForEachItemIn(s, seen)
+        }
+        else
         {
-            if (subGraph->queryGraphId() == seen.item(s))
+            CriticalBlock b(crit);
+            if (seen.contains(subGraph->queryGraphId()))
                 return; // already queued;
+            seen.append(subGraph->queryGraphId());
         }
-        seen.append(subGraph->queryGraphId());
-
         if (!subGraph->prepare(parentExtractSz, parentExtract, checkDependencies, true, true))
         {
             subGraph->setComplete();
             return;
         }
-
         if (subGraph->dependentSubGraphs.ordinality())
         {
             bool dependenciesDone = true;
@@ -2182,6 +2215,7 @@ public:
                 subGraph->dependentSubGraphs.kill(); // none to track anymore
         }
         Owned<CGraphExecutorGraphInfo> graphInfo = new CGraphExecutorGraphInfo(*this, subGraph, callback, parentExtract, parentExtractSz);
+        CriticalBlock b(crit);
         if (0 == subGraph->dependentSubGraphs.ordinality())
         {
             if (running.ordinality()<limit)