浏览代码

Merge pull request #2113 from jakesmith/globalloopsubgraphs

Ensure subgraphs of a global loop are global

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 年之前
父节点
当前提交
68a10c5a5d
共有 4 个文件被更改,包括 57 次插入40 次删除
  1. 33 23
      thorlcr/graph/thgraph.cpp
  2. 1 1
      thorlcr/graph/thgraph.hpp
  3. 22 16
      thorlcr/graph/thgraphmaster.cpp
  4. 1 0
      thorlcr/graph/thgraphmaster.ipp

+ 33 - 23
thorlcr/graph/thgraph.cpp

@@ -539,6 +539,12 @@ void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct
     inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
 }
 
+void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
+{
+    if (!associatedChildGraphs.contains(*childGraph))
+        associatedChildGraphs.append(*LINK(childGraph));
+}
+
 void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
 {
     if (!onCreateCalled) return;
@@ -1672,7 +1678,28 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
         resultsGraph = this;
         tmpHandler.setown(queryJob().createTempHandler());
     }
-    
+
+    bool localChild = false;
+    if (owner && parentActivityId)
+    {
+        CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
+        parentElement->addAssociatedChildGraph(this);
+        switch (parentElement->getKind())
+        {
+            case TAKlooprow:
+            case TAKloopcount:
+            case TAKloopdataset:
+            case TAKgraphloop:
+            case TAKparallelgraphloop:
+                if (!parentElement->queryLocal())
+                    global = true;
+                break;
+            default:
+                localChild = true;
+                break;
+        }
+    }
+
     Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
     ForEach(*nodes)
     {
@@ -1688,29 +1715,12 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
         }
         else
         {
-            if (owner && parentActivityId)
+            if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
             {
-                CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
-                parentElement->addAssociatedChildGraph(this);
-                switch (parentElement->getKind())
-                {
-                    case TAKlooprow:
-                    case TAKloopcount:
-                    case TAKloopdataset:
-                    case TAKgraphloop:
-                    case TAKparallelgraphloop:
-                        break;
-                    default:
-                        // not a loop graph, force it to be local child graph
-                        if (!e.getPropBool("att[@name=\"coLocal\"]/@value", false))
-                        {
-                            IPropertyTree *att = createPTree("att");
-                            att->setProp("@name", "coLocal");
-                            att->setPropBool("@value", true);
-                            e.addPropTree("att", att);
-                        }
-                        break;
-                }
+                IPropertyTree *att = createPTree("att");
+                att->setProp("@name", "coLocal");
+                att->setPropBool("@value", true);
+                e.addPropTree("att", att);
             }
             CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
             addActivity(act);

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -270,7 +270,7 @@ public:
     void clearConnections();
     virtual void connectInput(unsigned which, CGraphElementBase *input, unsigned inputOutIdx);
     void setResultsGraph(CGraphBase *_resultsGraph) { resultsGraph = _resultsGraph; }
-    void addAssociatedChildGraph(CGraphBase *childGraph) { associatedChildGraphs.append(*LINK(childGraph)); }
+    void addAssociatedChildGraph(CGraphBase *childGraph);
     void releaseIOs();
     void addDependsOn(CGraphBase *graph, int controlId);
     IThorGraphDependencyIterator *getDependsIterator() const;

+ 22 - 16
thorlcr/graph/thgraphmaster.cpp

@@ -192,14 +192,17 @@ void CSlaveMessageHandler::main()
                         DBGLOG("%s", msg.str());
                         parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
                     }
-                    Owned<IThorActivityIterator> iter = graph->getIterator();
-                    // onCreate all
-                    ForEach (*iter)
                     {
-                        CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
-                        element.onCreate();
-                        if (isDiskInput(element.getKind()))
-                            element.onStart(parentExtractSz, parentExtract);
+                        CriticalBlock b(graph->queryCreateLock());
+                        Owned<IThorActivityIterator> iter = graph->getIterator();
+                        // onCreate all
+                        ForEach (*iter)
+                        {
+                            CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
+                            element.onCreate();
+                            if (isDiskInput(element.getKind()))
+                                element.onStart(parentExtractSz, parentExtract);
+                        }
                     }
                     msg.clear();
                     graph->serializeCreateContexts(msg);
@@ -213,16 +216,19 @@ void CSlaveMessageHandler::main()
                     Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
                     assertex(graph);
                     CGraphElementArray toSerialize;
-                    loop
                     {
-                        activity_id id;
-                        msg.read(id);
-                        if (!id)
-                            break;
-                        CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
-                        assertex(element);
-                        element->doCreateActivity();
-                        toSerialize.append(*LINK(element));
+                        CriticalBlock b(graph->queryCreateLock());
+                        loop
+                        {
+                            activity_id id;
+                            msg.read(id);
+                            if (!id)
+                                break;
+                            CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
+                            assertex(element);
+                            element->doCreateActivity();
+                            toSerialize.append(*LINK(element));
+                        }
                     }
                     msg.clear();
                     CMessageBuffer replyMsg;

+ 1 - 0
thorlcr/graph/thgraphmaster.ipp

@@ -63,6 +63,7 @@ public:
 
     virtual void init();
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
+    CriticalSection &queryCreateLock() { return createdCrit; }
     void handleSlaveDone(unsigned node, MemoryBuffer &mb);
     void serializeCreateContexts(MemoryBuffer &mb);
     void serializeStartCtxs(MemoryBuffer &mb);