|
@@ -507,7 +507,6 @@ void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
|
|
|
void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
|
|
|
{
|
|
|
if (!onCreateCalled) return;
|
|
|
- mb.append(queryId());
|
|
|
DelayedSizeMarker sizeMark(mb);
|
|
|
queryHelper()->serializeCreateContext(mb);
|
|
|
sizeMark.write();
|
|
@@ -515,7 +514,7 @@ void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
|
|
|
|
|
|
void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
|
|
|
{
|
|
|
- assertex(onStartCalled);
|
|
|
+ if (!onStartCalled) return;
|
|
|
DelayedSizeMarker sizeMark(mb);
|
|
|
queryHelper()->serializeStartContext(mb);
|
|
|
sizeMark.write();
|
|
@@ -625,12 +624,22 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
|
|
|
owner->ifs.append(*this);
|
|
|
// fall through
|
|
|
case TAKif:
|
|
|
+ case TAKifaction:
|
|
|
{
|
|
|
if (_shortCircuit) return true;
|
|
|
onCreate();
|
|
|
onStart(parentExtractSz, parentExtract);
|
|
|
IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
|
|
|
- whichBranch = helper->getCondition() ? 0 : 1; // True argument preceeds false...
|
|
|
+ whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
|
|
|
+ /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
+ * It should be removed, once we are positive there are no issues with in-line conditional actions
|
|
|
+ */
|
|
|
+ if (TAKifaction == getKind())
|
|
|
+ {
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
if (inputs.queryItem(whichBranch))
|
|
|
{
|
|
|
if (!whichBranchBitSet->testSet(whichBranch)) // if not set, new
|
|
@@ -678,6 +687,19 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
|
|
|
return true;
|
|
|
break;
|
|
|
}
|
|
|
+ case TAKsequential:
|
|
|
+ case TAKparallel:
|
|
|
+ {
|
|
|
+ /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
+ * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
|
|
|
+ */
|
|
|
+ for (unsigned s=1; s<=dependsOn.ordinality(); s++)
|
|
|
+ {
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, s, async))
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
@@ -763,6 +785,26 @@ void CGraphElementBase::createActivity(size32_t parentExtractSz, const byte *par
|
|
|
factorySet(TAKnull);
|
|
|
}
|
|
|
break;
|
|
|
+ case TAKifaction:
|
|
|
+ if (inputs.queryItem(whichBranch))
|
|
|
+ {
|
|
|
+ CGraphElementBase *input = inputs.item(whichBranch)->activity;
|
|
|
+ input->createActivity(parentExtractSz, parentExtract);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case TAKsequential:
|
|
|
+ case TAKparallel:
|
|
|
+ {
|
|
|
+ ForEachItemIn(i, inputs)
|
|
|
+ {
|
|
|
+ if (inputs.queryItem(i))
|
|
|
+ {
|
|
|
+ CGraphElementBase *input = inputs.item(i)->activity;
|
|
|
+ input->createActivity(parentExtractSz, parentExtract);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
default:
|
|
|
if (!isEof)
|
|
|
{
|
|
@@ -1060,16 +1102,21 @@ void CGraphBase::clean()
|
|
|
disconnectActivities();
|
|
|
containers.kill();
|
|
|
sinks.kill();
|
|
|
+ connectedSinks.kill();
|
|
|
}
|
|
|
|
|
|
void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
|
|
|
{
|
|
|
DelayedSizeMarker sizeMark(mb);
|
|
|
- Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getIterator() : getTraverseIterator(true); // all if non-global-child, or graph with conditionals
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
ForEach (*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
|
- element.serializeCreateContext(mb);
|
|
|
+ if (element.isOnCreated())
|
|
|
+ {
|
|
|
+ mb.append(element.queryId());
|
|
|
+ element.serializeCreateContext(mb);
|
|
|
+ }
|
|
|
}
|
|
|
mb.append((activity_id)0);
|
|
|
sizeMark.write();
|
|
@@ -1078,12 +1125,15 @@ void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
|
|
|
void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
|
|
|
{
|
|
|
DelayedSizeMarker sizeMark(mb);
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
ForEach (*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
|
- mb.append(element.queryId());
|
|
|
- element.serializeStartContext(mb);
|
|
|
+ if (element.isOnStarted())
|
|
|
+ {
|
|
|
+ mb.append(element.queryId());
|
|
|
+ element.serializeStartContext(mb);
|
|
|
+ }
|
|
|
}
|
|
|
mb.append((activity_id)0);
|
|
|
sizeMark.write();
|
|
@@ -1127,10 +1177,10 @@ void CGraphBase::reset()
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- CGraphElementIterator iterC(containers);
|
|
|
- ForEach(iterC)
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
+ ForEach(*iter)
|
|
|
{
|
|
|
- CGraphElementBase &element = iterC.query();
|
|
|
+ CGraphElementBase &element = iter->query();
|
|
|
element.reset();
|
|
|
}
|
|
|
dependentSubGraphs.kill();
|
|
@@ -1162,7 +1212,7 @@ bool CGraphBase::fireException(IException *e)
|
|
|
|
|
|
bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
{
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
@@ -1270,7 +1320,7 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
|
|
|
{
|
|
|
if (started)
|
|
|
reset();
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
@@ -1349,10 +1399,10 @@ bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bo
|
|
|
|
|
|
void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
{
|
|
|
- CGraphElementIterator iterC(containers);
|
|
|
- ForEach(iterC)
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
+ ForEach(*iter)
|
|
|
{
|
|
|
- CGraphElementBase &element = iterC.query();
|
|
|
+ CGraphElementBase &element = iter->query();
|
|
|
element.clearConnections();
|
|
|
}
|
|
|
ForEachItemIn(s, sinks)
|
|
@@ -1360,13 +1410,20 @@ void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
CGraphElementBase &sink = sinks.item(s);
|
|
|
sink.createActivity(parentExtractSz, parentExtract);
|
|
|
}
|
|
|
+ connectedSinks.kill();
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ CGraphElementBase &element = iter->query();
|
|
|
+ if (element.queryActivity() && 0 == element.connectedOutputs.ordinality())
|
|
|
+ connectedSinks.append(*LINK(&element));
|
|
|
+ }
|
|
|
created = true;
|
|
|
}
|
|
|
|
|
|
void CGraphBase::done()
|
|
|
{
|
|
|
if (aborted) return; // activity done methods only called on success
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach (*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
@@ -1515,6 +1572,7 @@ public:
|
|
|
switch (cur->getKind())
|
|
|
{
|
|
|
case TAKif:
|
|
|
+ case TAKifaction:
|
|
|
case TAKchildif:
|
|
|
case TAKchildcase:
|
|
|
case TAKcase:
|
|
@@ -1552,12 +1610,9 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-IThorActivityIterator *CGraphBase::getTraverseIterator(bool all)
|
|
|
+IThorActivityIterator *CGraphBase::getConnectedIterator()
|
|
|
{
|
|
|
- if (all)
|
|
|
- return new CGraphTraverseIterator(*this); // all sinks + conditionals
|
|
|
- else
|
|
|
- return new CGraphTraverseConnectedIterator(*this);
|
|
|
+ return new CGraphTraverseConnectedIterator(*this);
|
|
|
}
|
|
|
|
|
|
bool CGraphBase::wait(unsigned timeout)
|
|
@@ -1585,7 +1640,7 @@ bool CGraphBase::wait(unsigned timeout)
|
|
|
throw MakeGraphException(graph, 0, "Timed out waiting for graph to end");
|
|
|
}
|
|
|
} waitException(this);
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach (*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = iter->query();
|
|
@@ -1647,7 +1702,7 @@ void CGraphBase::abort(IException *e)
|
|
|
}
|
|
|
if (started && !graphDone)
|
|
|
{
|
|
|
- Owned<IThorActivityIterator> iter = getTraverseIterator();
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach (*iter)
|
|
|
{
|
|
|
iter->query().abort(e); // JCSMORE - could do in parallel, they can take some time to timeout
|
|
@@ -1684,9 +1739,9 @@ void CGraphBase::GraphPrintLog(IException *e)
|
|
|
|
|
|
void CGraphBase::setLogging(bool tf)
|
|
|
{
|
|
|
- CGraphElementIterator iterC(containers);
|
|
|
- ForEach(iterC)
|
|
|
- iterC.query().setLogging(tf);
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
+ ForEach(*iter)
|
|
|
+ iter->query().setLogging(tf);
|
|
|
}
|
|
|
|
|
|
void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph)
|
|
@@ -1767,6 +1822,20 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
|
|
|
CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
|
|
|
target->addInput(targetInput, source, sourceOutput);
|
|
|
}
|
|
|
+ Owned<IThorActivityIterator> iter = getIterator();
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ CGraphElementBase &element = iter->query();
|
|
|
+ if (0 == element.getOutputs())
|
|
|
+ {
|
|
|
+ /* JCSMORE - Making some outputs conditional, will require:
|
|
|
+ * a) Pass through information as to which dependent graph causes this graph (and this sink) to execute)
|
|
|
+ * b) Allow the subgraph to re-executed by other dependent subgraphs and avoid re-executing completed sinks
|
|
|
+ * c) Keep common points (splitters) around (preferably in memory), re-execution of graph will need them
|
|
|
+ */
|
|
|
+ sinks.append(*LINK(&element));
|
|
|
+ }
|
|
|
+ }
|
|
|
init();
|
|
|
}
|
|
|
|
|
@@ -2538,10 +2607,22 @@ static void getGlobalDeps(CGraphBase &graph, CopyCIArrayOf<CGraphDependency> &de
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
|
|
|
+{
|
|
|
+ targetActivity->addDependsOn(sourceGraph, controlId);
|
|
|
+ // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
|
|
|
+ Owned<IPropertyTree> dependencyFor = createPTree();
|
|
|
+ dependencyFor->setPropInt("@id", sourceActivity->queryId());
|
|
|
+ dependencyFor->setPropInt("@graphId", targetGraph->queryGraphId());
|
|
|
+ if (controlId)
|
|
|
+ dependencyFor->setPropInt("@conditionalId", controlId);
|
|
|
+ sourceGraph->queryXGMML().addPropTree("Dependency", dependencyFor.getClear());
|
|
|
+}
|
|
|
+
|
|
|
void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
|
|
|
{
|
|
|
CGraphArrayCopy childGraphs;
|
|
|
- CGraphElementArrayCopy targetActivities;
|
|
|
+ CGraphElementArrayCopy targetActivities, sourceActivities;
|
|
|
|
|
|
Owned<IPropertyTreeIterator> iter = xgmml->getElements("edge");
|
|
|
ForEach(*iter)
|
|
@@ -2577,7 +2658,7 @@ void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
|
|
|
targetActivity = targetGraph->queryElement(targetGraphContext);
|
|
|
}
|
|
|
assertex(targetActivity && sourceActivity);
|
|
|
- targetActivity->addDependsOn(source, controlId);
|
|
|
+ noteDependency(targetActivity, sourceActivity, target, source, controlId);
|
|
|
}
|
|
|
else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
|
|
|
{ /* Ignore it */ }
|
|
@@ -2586,18 +2667,20 @@ void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
|
|
|
// NB: any dependencies of the child acts. are dependencies of this act.
|
|
|
childGraphs.append(*source);
|
|
|
targetActivities.append(*targetActivity);
|
|
|
+ sourceActivities.append(*sourceActivity);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
|
|
|
controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
|
|
|
- targetActivity->addDependsOn(source, controlId);
|
|
|
+ noteDependency(targetActivity, sourceActivity, target, source, controlId);
|
|
|
}
|
|
|
}
|
|
|
ForEachItemIn(c, childGraphs)
|
|
|
{
|
|
|
CGraphBase &childGraph = childGraphs.item(c);
|
|
|
CGraphElementBase &targetActivity = targetActivities.item(c);
|
|
|
+ CGraphElementBase &sourceActivity = sourceActivities.item(c);
|
|
|
if (!childGraph.isGlobal())
|
|
|
{
|
|
|
CopyCIArrayOf<CGraphDependency> globalChildGraphDeps;
|
|
@@ -2605,7 +2688,7 @@ void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
|
|
|
ForEachItemIn(gcd, globalChildGraphDeps)
|
|
|
{
|
|
|
CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
|
|
|
- targetActivity.addDependsOn(globalDep.graph, globalDep.controlId);
|
|
|
+ noteDependency(&targetActivity, &sourceActivity, globalDep.graph, &childGraph, globalDep.controlId);
|
|
|
}
|
|
|
}
|
|
|
}
|