|
@@ -754,6 +754,12 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
+ if (isActivitySink(getKind()))
|
|
|
+ {
|
|
|
+ // Suppress executing internal sinks with 0 generated dependencies
|
|
|
+ if (queryXGMML().getPropBool("att[@name='_internal']/@value", false) && (0 == owner->queryDependents(id)))
|
|
|
+ return false;
|
|
|
+ }
|
|
|
if (checkDependencies && ((unsigned)-1 != whichBranch))
|
|
|
{
|
|
|
if (inputs.queryItem(whichBranch))
|
|
@@ -1067,18 +1073,6 @@ static void getGlobalDeps(CGraphBase &graph, CICopyArrayOf<CGraphDependency> &de
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
|
|
|
-{
|
|
|
- targetActivity->addDependsOn(sourceGraph, controlId);
|
|
|
- // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
|
|
|
- Owned<IPropertyTree> dependencyFor = createPTree();
|
|
|
- dependencyFor->setPropInt("@id", sourceActivity->queryId());
|
|
|
- dependencyFor->setPropInt("@graphId", targetGraph->queryGraphId());
|
|
|
- if (controlId)
|
|
|
- dependencyFor->setPropInt("@conditionalId", controlId);
|
|
|
- sourceGraph->queryXGMML().addPropTree("Dependency", dependencyFor.getClear());
|
|
|
-}
|
|
|
-
|
|
|
static void addDependencies(IPropertyTree *xgmml, bool failIfMissing, CGraphTableCopy &graphs)
|
|
|
{
|
|
|
CGraphArrayCopy dependentchildGraphs;
|
|
@@ -1128,7 +1122,7 @@ static void addDependencies(IPropertyTree *xgmml, bool failIfMissing, CGraphTabl
|
|
|
targetActivity = targetGraph->queryElement(targetGraphContext);
|
|
|
}
|
|
|
assertex(targetActivity && sourceActivity);
|
|
|
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
|
|
|
+ source->noteDependency(targetActivity, sourceActivity, controlId, true);
|
|
|
}
|
|
|
else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
|
|
|
{ /* Ignore it */ }
|
|
@@ -1143,7 +1137,7 @@ static void addDependencies(IPropertyTree *xgmml, bool failIfMissing, CGraphTabl
|
|
|
{
|
|
|
if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
|
|
|
controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
|
|
|
- noteDependency(targetActivity, sourceActivity, target, source, controlId);
|
|
|
+ source->noteDependency(targetActivity, sourceActivity, controlId, true);
|
|
|
}
|
|
|
}
|
|
|
ForEachItemIn(c, dependentchildGraphs)
|
|
@@ -1158,7 +1152,7 @@ static void addDependencies(IPropertyTree *xgmml, bool failIfMissing, CGraphTabl
|
|
|
ForEachItemIn(gcd, globalChildGraphDeps)
|
|
|
{
|
|
|
CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
|
|
|
- noteDependency(&targetActivity, &sourceActivity, globalDep.graph, &childGraph, globalDep.controlId);
|
|
|
+ childGraph.noteDependency(&targetActivity, &sourceActivity, globalDep.controlId, true);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1207,6 +1201,7 @@ CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(
|
|
|
parentExtractSz = 0;
|
|
|
counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
|
|
|
loopBodySubgraph = false;
|
|
|
+ sourceActDependents.setown(createPTree());
|
|
|
}
|
|
|
|
|
|
CGraphBase::~CGraphBase()
|
|
@@ -1323,6 +1318,21 @@ IThorGraphStubIterator *CGraphBase::getChildStubIterator() const
|
|
|
return new CIter(childGraphsTable);
|
|
|
}
|
|
|
|
|
|
+void CGraphBase::noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, unsigned controlId, bool interGraph)
|
|
|
+{
|
|
|
+ if (interGraph)
|
|
|
+ targetActivity->addDependsOn(this, controlId);
|
|
|
+ // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
|
|
|
+ VStringBuffer srcActStr("act%u", sourceActivity->queryId());
|
|
|
+ sourceActDependents->setPropInt(srcActStr, sourceActDependents->getPropInt(srcActStr)+1);
|
|
|
+}
|
|
|
+
|
|
|
+unsigned CGraphBase::queryDependents(unsigned sourceActId)
|
|
|
+{
|
|
|
+ VStringBuffer srcActStr("act%u", sourceActId);
|
|
|
+ return sourceActDependents->getPropInt(srcActStr);
|
|
|
+}
|
|
|
+
|
|
|
IThorGraphIterator *CGraphBase::getChildGraphIterator() const
|
|
|
{
|
|
|
CriticalBlock b(crit);
|
|
@@ -2019,6 +2029,10 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
|
|
|
CGraphElementBase *source = queryElement(edge.getPropInt("@source"));
|
|
|
CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
|
|
|
target->addInput(targetInput, source, sourceOutput);
|
|
|
+
|
|
|
+ int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
|
|
|
+ if (controlId != 0)
|
|
|
+ noteDependency(target, source, controlId, false);
|
|
|
}
|
|
|
Owned<IThorActivityIterator> iter = getIterator();
|
|
|
ForEach(*iter)
|