浏览代码

Fix gh-851 Thor graph execute race condition

The graph executor thread could cause a pending graph to execute too
early. The consequence was that on the slave this early subgraph was
queued, but nothing polled to run it, and the master stalled waiting
for it to initialize.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 年之前
父节点
当前提交
d5985473a1
共有 2 个文件被更改,包括 15 次插入13 次删除
  1. 14 12
      thorlcr/graph/thgraph.cpp
  2. 1 1
      thorlcr/slave/slavmain.cpp

+ 14 - 12
thorlcr/graph/thgraph.cpp

@@ -2235,20 +2235,22 @@ public:
                     break;
                     break;
                 }
                 }
                 if (job.queryPausing()) return; // pending graphs will re-run on resubmission
                 if (job.queryPausing()) return; // pending graphs will re-run on resubmission
-                assertex(running.ordinality() <= limit);
                 bool added = false;
                 bool added = false;
-                while (toRun.ordinality())
+                if (running.ordinality() < limit)
                 {
                 {
-                    Linked<CGraphExecutorGraphInfo> graphInfo = &toRun.item(0);
-                    toRun.remove(0);
-                    running.append(*LINK(graphInfo));
-                    CGraphBase *subGraph = graphInfo->subGraph;
-                    PROGLOG("Wait: Launching graph thread for graphId=%"GIDPF"d", subGraph->queryGraphId());
-                    added = true;
-                    PooledThreadHandle h = graphPool->start(graphInfo.getClear());
-                    subGraph->poolThreadHandle = h;
-                    if (running.ordinality() >= limit)
-                        break;
+                    while (toRun.ordinality())
+                    {
+                        Linked<CGraphExecutorGraphInfo> graphInfo = &toRun.item(0);
+                        toRun.remove(0);
+                        running.append(*LINK(graphInfo));
+                        CGraphBase *subGraph = graphInfo->subGraph;
+                        PROGLOG("Wait: Launching graph thread for graphId=%"GIDPF"d", subGraph->queryGraphId());
+                        added = true;
+                        PooledThreadHandle h = graphPool->start(graphInfo.getClear());
+                        subGraph->poolThreadHandle = h;
+                        if (running.ordinality() >= limit)
+                            break;
+                    }
                 }
                 }
                 if (!added)
                 if (!added)
                     Sleep(1000); // still more to come
                     Sleep(1000); // still more to come

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -258,10 +258,10 @@ public:
                         CJobSlave *job = jobs.find(jobKey.get());
                         CJobSlave *job = jobs.find(jobKey.get());
                         if (!job)
                         if (!job)
                             throw MakeStringException(0, "Job not found: %s", jobKey.get());
                             throw MakeStringException(0, "Job not found: %s", jobKey.get());
-                        PROGLOG("GraphInit: %s", jobKey.get());
                         Owned<IPropertyTree> graphNode = createPTree(msg);
                         Owned<IPropertyTree> graphNode = createPTree(msg);
                         Owned<CSlaveGraph> subGraph = (CSlaveGraph *)job->createGraph();
                         Owned<CSlaveGraph> subGraph = (CSlaveGraph *)job->createGraph();
                         subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
                         subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
+                        PROGLOG("GraphInit: %s, graphId=%"GIDPF"d", jobKey.get(), subGraph->queryGraphId());
                         subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
                         subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
                         unsigned len;
                         unsigned len;
                         msg.read(len);
                         msg.read(len);