|
@@ -326,7 +326,7 @@ bool isDiskInput(ThorActivityKind kind)
|
|
|
|
|
|
void CIOConnection::connect(unsigned which, CActivityBase *destActivity)
|
|
|
{
|
|
|
- destActivity->setInput(which, activity->queryActivity(true), index);
|
|
|
+ destActivity->setInput(which, activity->queryActivity(), index);
|
|
|
}
|
|
|
|
|
|
///////////////////////////////////
|
|
@@ -371,11 +371,9 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
|
|
|
throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
|
|
|
alreadyUpdated = false;
|
|
|
whichBranch = (unsigned)-1;
|
|
|
- whichBranchBitSet.setown(createThreadSafeBitSet());
|
|
|
- newWhichBranch = false;
|
|
|
- hasNullInput = false;
|
|
|
log = true;
|
|
|
sentActInitData.setown(createThreadSafeBitSet());
|
|
|
+ baseHelper.setown(helperFactory());
|
|
|
}
|
|
|
|
|
|
CGraphElementBase::~CGraphElementBase()
|
|
@@ -584,7 +582,6 @@ void CGraphElementBase::onCreate()
|
|
|
if (onCreateCalled)
|
|
|
return;
|
|
|
onCreateCalled = true;
|
|
|
- baseHelper.setown(helperFactory());
|
|
|
if (!nullAct)
|
|
|
{
|
|
|
CGraphElementBase *ownerActivity = owner->queryOwner() ? owner->queryOwner()->queryElement(ownerId) : NULL;
|
|
@@ -629,140 +626,168 @@ bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
|
|
|
+bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async, bool connectOnly)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- bool _shortCircuit = shortCircuit;
|
|
|
- Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
|
|
|
- bool depsDone = true;
|
|
|
- ForEach(*deps)
|
|
|
+ bool create = true;
|
|
|
+ if (connectOnly)
|
|
|
{
|
|
|
- CGraphDependency &dep = deps->query();
|
|
|
- if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
|
|
|
+ if (activity)
|
|
|
+ return true;
|
|
|
+ ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- owner->dependentSubGraphs.append(*dep.graph);
|
|
|
- if (!dep.graph->isComplete())
|
|
|
- depsDone = false;
|
|
|
+ if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, false, false, async, true))
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
- if (depsDone) _shortCircuit = false;
|
|
|
- if (!depsDone && checkDependencies)
|
|
|
+ else
|
|
|
{
|
|
|
- if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
|
|
|
- return false;
|
|
|
- }
|
|
|
- whichBranch = (unsigned)-1;
|
|
|
- hasNullInput = false;
|
|
|
- alreadyUpdated = false;
|
|
|
- switch (getKind())
|
|
|
- {
|
|
|
- case TAKindexwrite:
|
|
|
- case TAKdiskwrite:
|
|
|
- case TAKcsvwrite:
|
|
|
- case TAKxmlwrite:
|
|
|
- case TAKjsonwrite:
|
|
|
- if (_shortCircuit) return true;
|
|
|
- onCreate();
|
|
|
- alreadyUpdated = checkUpdate();
|
|
|
- if (alreadyUpdated)
|
|
|
- return false;
|
|
|
- break;
|
|
|
- case TAKchildif:
|
|
|
- owner->ifs.append(*this);
|
|
|
- // fall through
|
|
|
- case TAKif:
|
|
|
- case TAKifaction:
|
|
|
+ bool _shortCircuit = shortCircuit;
|
|
|
+ Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
|
|
|
+ bool depsDone = true;
|
|
|
+ ForEach(*deps)
|
|
|
{
|
|
|
- if (_shortCircuit) return true;
|
|
|
- onCreate();
|
|
|
- onStart(parentExtractSz, parentExtract);
|
|
|
- IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
|
|
|
- whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
|
|
|
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
- * It should be removed, once we are positive there are no issues with in-line conditional actions
|
|
|
- */
|
|
|
- if (TAKifaction == getKind())
|
|
|
- {
|
|
|
- if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
+ CGraphDependency &dep = deps->query();
|
|
|
+ if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
|
|
|
{
|
|
|
- if (!whichBranchBitSet->testSet(whichBranch)) // if not set, new
|
|
|
- newWhichBranch = true;
|
|
|
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
|
|
|
+ owner->dependentSubGraphs.append(*dep.graph);
|
|
|
+ if (!dep.graph->isComplete())
|
|
|
+ depsDone = false;
|
|
|
}
|
|
|
- return true;
|
|
|
}
|
|
|
- case TAKchildcase:
|
|
|
- owner->ifs.append(*this);
|
|
|
- // fall through
|
|
|
- case TAKcase:
|
|
|
+ if (depsDone) _shortCircuit = false;
|
|
|
+ if (!depsDone && checkDependencies)
|
|
|
{
|
|
|
- if (_shortCircuit) return true;
|
|
|
- onCreate();
|
|
|
- onStart(parentExtractSz, parentExtract);
|
|
|
- IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
|
|
|
- whichBranch = helper->getBranch();
|
|
|
- if (whichBranch >= inputs.ordinality())
|
|
|
- whichBranch = inputs.ordinality()-1;
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
- return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
|
|
|
- return true;
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
|
|
|
+ return false;
|
|
|
}
|
|
|
- case TAKfilter:
|
|
|
- case TAKfiltergroup:
|
|
|
- case TAKfilterproject:
|
|
|
+ whichBranch = (unsigned)-1;
|
|
|
+ alreadyUpdated = false;
|
|
|
+ switch (getKind())
|
|
|
{
|
|
|
- if (_shortCircuit) return true;
|
|
|
- onCreate();
|
|
|
- onStart(parentExtractSz, parentExtract);
|
|
|
- switch (getKind())
|
|
|
+ case TAKindexwrite:
|
|
|
+ case TAKdiskwrite:
|
|
|
+ case TAKcsvwrite:
|
|
|
+ case TAKxmlwrite:
|
|
|
+ case TAKjsonwrite:
|
|
|
+ if (_shortCircuit) return true;
|
|
|
+ onCreate();
|
|
|
+ alreadyUpdated = checkUpdate();
|
|
|
+ if (alreadyUpdated)
|
|
|
+ return false;
|
|
|
+ break;
|
|
|
+ case TAKchildif:
|
|
|
+ case TAKif:
|
|
|
+ case TAKifaction:
|
|
|
{
|
|
|
- case TAKfilter:
|
|
|
- hasNullInput = !((IHThorFilterArg *)baseHelper.get())->canMatchAny();
|
|
|
- break;
|
|
|
- case TAKfiltergroup:
|
|
|
- hasNullInput = !((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny();
|
|
|
- break;
|
|
|
- case TAKfilterproject:
|
|
|
- hasNullInput = !((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny();
|
|
|
- break;
|
|
|
+ if (_shortCircuit) return true;
|
|
|
+ onCreate();
|
|
|
+ onStart(parentExtractSz, parentExtract);
|
|
|
+ IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
|
|
|
+ whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
|
|
|
+ /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
+ * It should be removed, once we are positive there are no issues with in-line conditional actions
|
|
|
+ */
|
|
|
+ if (TAKifaction == getKind())
|
|
|
+ {
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
|
|
|
+ return false;
|
|
|
+ create = false;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case TAKchildcase:
|
|
|
+ case TAKcase:
|
|
|
+ {
|
|
|
+ if (_shortCircuit) return true;
|
|
|
+ onCreate();
|
|
|
+ onStart(parentExtractSz, parentExtract);
|
|
|
+ IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
|
|
|
+ whichBranch = helper->getBranch();
|
|
|
+ if (whichBranch >= inputs.ordinality())
|
|
|
+ whichBranch = inputs.ordinality()-1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case TAKfilter:
|
|
|
+ case TAKfiltergroup:
|
|
|
+ case TAKfilterproject:
|
|
|
+ {
|
|
|
+ if (_shortCircuit) return true;
|
|
|
+ onCreate();
|
|
|
+ onStart(parentExtractSz, parentExtract);
|
|
|
+ switch (getKind())
|
|
|
+ {
|
|
|
+ case TAKfilter:
|
|
|
+ whichBranch = ((IHThorFilterArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
|
|
|
+ break;
|
|
|
+ case TAKfiltergroup:
|
|
|
+ whichBranch = ((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
|
|
|
+ break;
|
|
|
+ case TAKfilterproject:
|
|
|
+ whichBranch = ((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case TAKsequential:
|
|
|
+ case TAKparallel:
|
|
|
+ {
|
|
|
+ /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
+ * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
|
|
|
+ */
|
|
|
+ for (unsigned s=1; s<=dependsOn.ordinality(); s++)
|
|
|
+ executeDependencies(parentExtractSz, parentExtract, s, async);
|
|
|
+ create = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case TAKwhen_dataset:
|
|
|
+ case TAKwhen_action:
|
|
|
+ {
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
|
|
|
+ return false;
|
|
|
+ if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
|
|
|
+ return false;
|
|
|
+ break;
|
|
|
}
|
|
|
- if (hasNullInput)
|
|
|
- return true;
|
|
|
- break;
|
|
|
}
|
|
|
- case TAKsequential:
|
|
|
- case TAKparallel:
|
|
|
+ if (checkDependencies && ((unsigned)-1 != whichBranch))
|
|
|
{
|
|
|
- /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
|
|
|
- * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
|
|
|
- */
|
|
|
- for (unsigned s=1; s<=dependsOn.ordinality(); s++)
|
|
|
+ if (inputs.queryItem(whichBranch))
|
|
|
{
|
|
|
- if (!executeDependencies(parentExtractSz, parentExtract, s, async))
|
|
|
+ if (!queryInput(whichBranch)->prepareContext(parentExtractSz, parentExtract, true, false, async, connectOnly))
|
|
|
return false;
|
|
|
}
|
|
|
- break;
|
|
|
+ ForEachItemIn(i, inputs)
|
|
|
+ {
|
|
|
+ if (i != whichBranch)
|
|
|
+ {
|
|
|
+ if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, false, false, async, true))
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- case TAKwhen_dataset:
|
|
|
- case TAKwhen_action:
|
|
|
+ else
|
|
|
{
|
|
|
- if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
|
|
|
- return false;
|
|
|
- if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
|
|
|
- return false;
|
|
|
- break;
|
|
|
+ ForEachItemIn(i, inputs)
|
|
|
+ {
|
|
|
+ if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async, connectOnly))
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- ForEachItemIn(i, inputs)
|
|
|
+ if (create)
|
|
|
{
|
|
|
- CGraphElementBase *input = inputs.item(i)->activity;
|
|
|
- if (!input->prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
|
|
|
- return false;
|
|
|
+ if (activity) // no need to recreate
|
|
|
+ return true;
|
|
|
+ ForEachItemIn(i2, inputs)
|
|
|
+ {
|
|
|
+ CIOConnection *inputIO = inputs.item(i2);
|
|
|
+ connectInput(i2, inputIO->activity, inputIO->index);
|
|
|
+ }
|
|
|
+ if (isSink())
|
|
|
+ owner->addActiveSink(*this);
|
|
|
+ activity.setown(factory());
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
@@ -790,12 +815,8 @@ void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExt
|
|
|
|
|
|
void CGraphElementBase::initActivity()
|
|
|
{
|
|
|
- CriticalBlock b(crit);
|
|
|
- if (isSink())
|
|
|
- owner->addActiveSink(*this);
|
|
|
- if (activity) // no need to recreate
|
|
|
- return;
|
|
|
- activity.setown(factory());
|
|
|
+ if (!activity)
|
|
|
+ activity.setown(factory());
|
|
|
if (isLoopActivity(*this))
|
|
|
{
|
|
|
unsigned loopId = queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
|
|
@@ -805,113 +826,6 @@ void CGraphElementBase::initActivity()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CGraphElementBase::createActivity(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
-{
|
|
|
- if (connectedInputs.ordinality()) // ensure not traversed twice (e.g. via splitter)
|
|
|
- return;
|
|
|
- try
|
|
|
- {
|
|
|
- switch (getKind())
|
|
|
- {
|
|
|
- case TAKchildif:
|
|
|
- case TAKchildcase:
|
|
|
- {
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
|
|
|
- input->createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- onCreate();
|
|
|
- initActivity();
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
- {
|
|
|
- CIOConnection *inputIO = inputs.item(whichBranch);
|
|
|
- connectInput(whichBranch, inputIO->activity, inputIO->index);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- case TAKif:
|
|
|
- case TAKcase:
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
|
|
|
- input->createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- onCreate();
|
|
|
- if (!activity)
|
|
|
- factorySet(TAKnull);
|
|
|
- }
|
|
|
- break;
|
|
|
- case TAKifaction:
|
|
|
- if (inputs.queryItem(whichBranch))
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputs.item(whichBranch)->activity;
|
|
|
- input->createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- break;
|
|
|
- case TAKsequential:
|
|
|
- case TAKparallel:
|
|
|
- {
|
|
|
- ForEachItemIn(i, inputs)
|
|
|
- {
|
|
|
- if (inputs.queryItem(i))
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputs.item(i)->activity;
|
|
|
- input->createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- default:
|
|
|
- if (!hasNullInput)
|
|
|
- {
|
|
|
- ForEachItemIn(i, inputs)
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputs.item(i)->activity;
|
|
|
- input->createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- onCreate();
|
|
|
- if (isDiskInput(getKind()))
|
|
|
- onStart(parentExtractSz, parentExtract);
|
|
|
- ForEachItemIn(i2, inputs)
|
|
|
- {
|
|
|
- CIOConnection *inputIO = inputs.item(i2);
|
|
|
- loop
|
|
|
- {
|
|
|
- CGraphElementBase *input = inputIO->activity;
|
|
|
- switch (input->getKind())
|
|
|
- {
|
|
|
- case TAKif:
|
|
|
- case TAKcase:
|
|
|
- {
|
|
|
- if (input->whichBranch >= input->getInputs()) // if, will have TAKnull activity, made at create time.
|
|
|
- {
|
|
|
- input = NULL;
|
|
|
- break;
|
|
|
- }
|
|
|
- inputIO = input->inputs.item(input->whichBranch);
|
|
|
- assertex(inputIO);
|
|
|
- break;
|
|
|
- }
|
|
|
- default:
|
|
|
- input = NULL;
|
|
|
- break;
|
|
|
- }
|
|
|
- if (!input)
|
|
|
- break;
|
|
|
- }
|
|
|
- connectInput(i2, inputIO->activity, inputIO->index);
|
|
|
- }
|
|
|
- }
|
|
|
- initActivity();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- catch (IException *e) { ActPrintLog(e); activity.clear(); throw; }
|
|
|
-}
|
|
|
-
|
|
|
ICodeContext *CGraphElementBase::queryCodeContext()
|
|
|
{
|
|
|
return queryOwner().queryCodeContext();
|
|
@@ -1134,13 +1048,8 @@ CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(
|
|
|
parent = owner = NULL;
|
|
|
graphId = 0;
|
|
|
complete = false;
|
|
|
- reinit = false; // should graph reinitialize each time it is called (e.g. in loop graphs)
|
|
|
- // This is currently for 'init' (Create time) info and onStart into
|
|
|
- sentInitData = false;
|
|
|
-// sentStartCtx = false;
|
|
|
- sentStartCtx = true; // JCSMORE - disable for now
|
|
|
parentActivityId = 0;
|
|
|
- created = connected = started = graphDone = aborted = prepared = false;
|
|
|
+ connected = started = graphDone = aborted = prepared = false;
|
|
|
startBarrier = waitBarrier = doneBarrier = NULL;
|
|
|
mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
|
|
|
executeReplyTag = TAG_NULL;
|
|
@@ -1275,6 +1184,7 @@ bool CGraphBase::fireException(IException *e)
|
|
|
|
|
|
bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
{
|
|
|
+ started = true; // causes reset() to be called on all subsequent executions of this subgraph.
|
|
|
Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach(*iter)
|
|
|
{
|
|
@@ -1292,27 +1202,11 @@ void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExt
|
|
|
Owned<IException> exception;
|
|
|
try
|
|
|
{
|
|
|
- if (!prepare(parentExtractSz, parentExtract, false, false, false))
|
|
|
+ if (!queryOwner())
|
|
|
{
|
|
|
- setCompleteEx();
|
|
|
- return;
|
|
|
- }
|
|
|
- try
|
|
|
- {
|
|
|
- if (!queryOwner())
|
|
|
- {
|
|
|
- StringBuffer s;
|
|
|
- toXML(&queryXGMML(), s, 2);
|
|
|
- GraphPrintLog("Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
|
|
|
- }
|
|
|
- create(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- catch (IException *e)
|
|
|
- {
|
|
|
- Owned<IThorException> e2 = MakeGraphException(this, e);
|
|
|
- e2->setAction(tea_abort);
|
|
|
- queryJobChannel().fireException(e2);
|
|
|
- throw;
|
|
|
+ StringBuffer s;
|
|
|
+ toXML(&queryXGMML(), s, 2);
|
|
|
+ GraphPrintLog("Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
|
|
|
}
|
|
|
if (localResults)
|
|
|
localResults->clear();
|
|
@@ -1334,21 +1228,31 @@ void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExt
|
|
|
throw exception.getClear();
|
|
|
}
|
|
|
|
|
|
+void CGraphBase::onCreate()
|
|
|
+{
|
|
|
+ Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ CGraphElementBase &element = iter->query();
|
|
|
+ element.onCreate();
|
|
|
+ element.initActivity();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
|
|
|
{
|
|
|
- parentExtractSz = _parentExtractSz;
|
|
|
if (isComplete())
|
|
|
return;
|
|
|
if (async)
|
|
|
- queryJobChannel().startGraph(*this, queryJobChannel(), checkDependencies, parentExtractSz, parentExtract); // may block if enough running
|
|
|
+ queryJobChannel().startGraph(*this, checkDependencies, _parentExtractSz, parentExtract); // may block if enough running
|
|
|
else
|
|
|
{
|
|
|
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, async, async))
|
|
|
+ if (!prepare(_parentExtractSz, parentExtract, checkDependencies, false, false))
|
|
|
{
|
|
|
setComplete();
|
|
|
return;
|
|
|
}
|
|
|
- executeSubGraph(parentExtractSz, parentExtract);
|
|
|
+ executeSubGraph(_parentExtractSz, parentExtract);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1359,18 +1263,7 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
|
|
|
{
|
|
|
if (abortException)
|
|
|
throw abortException.getLink();
|
|
|
- throw MakeGraphException(this, 0, "subgraph aborted(1)");
|
|
|
- }
|
|
|
- if (!prepare(parentExtractSz, parentExtract, checkDependencies, false, false))
|
|
|
- {
|
|
|
- setComplete();
|
|
|
- return;
|
|
|
- }
|
|
|
- if (queryAborted())
|
|
|
- {
|
|
|
- if (abortException)
|
|
|
- throw abortException.getLink();
|
|
|
- throw MakeGraphException(this, 0, "subgraph aborted(2)");
|
|
|
+ throw MakeGraphException(this, 0, "subgraph aborted");
|
|
|
}
|
|
|
Owned<IException> exception;
|
|
|
try
|
|
@@ -1442,34 +1335,17 @@ bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bo
|
|
|
{
|
|
|
if (isComplete()) return false;
|
|
|
bool needToExecute = false;
|
|
|
- ifs.kill();
|
|
|
ForEachItemIn(s, sinks)
|
|
|
{
|
|
|
CGraphElementBase &sink = sinks.item(s);
|
|
|
- if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
|
|
|
+ if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async, false))
|
|
|
needToExecute = true;
|
|
|
}
|
|
|
// prepared = true;
|
|
|
+ onCreate();
|
|
|
return needToExecute;
|
|
|
}
|
|
|
|
|
|
-void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
-{
|
|
|
- Owned<IThorActivityIterator> iter = getIterator();
|
|
|
- ForEach(*iter)
|
|
|
- {
|
|
|
- CGraphElementBase &element = iter->query();
|
|
|
- element.clearConnections();
|
|
|
- }
|
|
|
- activeSinks.kill(); // NB: activeSinks are added to during activity creation
|
|
|
- ForEachItemIn(s, sinks)
|
|
|
- {
|
|
|
- CGraphElementBase &sink = sinks.item(s);
|
|
|
- sink.createActivity(parentExtractSz, parentExtract);
|
|
|
- }
|
|
|
- created = true;
|
|
|
-}
|
|
|
-
|
|
|
void CGraphBase::done()
|
|
|
{
|
|
|
if (aborted) return; // activity done methods only called on success
|
|
@@ -1539,21 +1415,23 @@ protected:
|
|
|
cur.setown(&others.popGet());
|
|
|
return cur;
|
|
|
}
|
|
|
- CGraphElementBase *setNext(CIOConnectionArray &inputs, unsigned whichInput=((unsigned)-1))
|
|
|
+ void setNext(bool branchOnConditional)
|
|
|
{
|
|
|
- cur.clear();
|
|
|
- unsigned n = inputs.ordinality();
|
|
|
- if (((unsigned)-1) != whichInput)
|
|
|
+ if (branchOnConditional && ((unsigned)-1) != cur->whichBranch)
|
|
|
{
|
|
|
- CIOConnection *io = inputs.queryItem(whichInput);
|
|
|
+ CIOConnection *io = cur->connectedInputs.queryItem(cur->whichBranch);
|
|
|
if (io)
|
|
|
cur.set(io->activity);
|
|
|
+ else
|
|
|
+ cur.clear();
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ CIOConnectionArray &inputs = cur->connectedInputs;
|
|
|
+ cur.clear();
|
|
|
+ unsigned n = inputs.ordinality();
|
|
|
bool first = true;
|
|
|
- unsigned i=0;
|
|
|
- for (; i<n; i++)
|
|
|
+ for (unsigned i=0; i<n; i++)
|
|
|
{
|
|
|
CIOConnection *io = inputs.queryItem(i);
|
|
|
if (io)
|
|
@@ -1571,7 +1449,7 @@ protected:
|
|
|
if (!cur)
|
|
|
{
|
|
|
if (!popNext())
|
|
|
- return NULL;
|
|
|
+ return;
|
|
|
}
|
|
|
// check haven't been here before
|
|
|
loop
|
|
@@ -1587,9 +1465,8 @@ protected:
|
|
|
}
|
|
|
}
|
|
|
if (!popNext())
|
|
|
- return NULL;
|
|
|
+ return;
|
|
|
}
|
|
|
- return cur.get();
|
|
|
}
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
@@ -1623,28 +1500,19 @@ public:
|
|
|
|
|
|
class CGraphTraverseConnectedIterator : public CGraphTraverseIteratorBase
|
|
|
{
|
|
|
+ bool branchOnConditional;
|
|
|
public:
|
|
|
- CGraphTraverseConnectedIterator(CGraphBase &graph) : CGraphTraverseIteratorBase(graph) { }
|
|
|
+ CGraphTraverseConnectedIterator(CGraphBase &graph, bool _branchOnConditional) : CGraphTraverseIteratorBase(graph), branchOnConditional(_branchOnConditional) { }
|
|
|
virtual bool next()
|
|
|
{
|
|
|
- if (cur->hasNullInput)
|
|
|
- {
|
|
|
- do
|
|
|
- {
|
|
|
- if (!popNext())
|
|
|
- return false;
|
|
|
- }
|
|
|
- while (cur->hasNullInput);
|
|
|
- }
|
|
|
- else
|
|
|
- setNext(cur->connectedInputs);
|
|
|
+ setNext(branchOnConditional);
|
|
|
return NULL!=cur.get();
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-IThorActivityIterator *CGraphBase::getConnectedIterator()
|
|
|
+IThorActivityIterator *CGraphBase::getConnectedIterator(bool branchOnConditional)
|
|
|
{
|
|
|
- return new CGraphTraverseConnectedIterator(*this);
|
|
|
+ return new CGraphTraverseConnectedIterator(*this, branchOnConditional);
|
|
|
}
|
|
|
|
|
|
bool CGraphBase::wait(unsigned timeout)
|
|
@@ -2910,9 +2778,9 @@ void CJobChannel::clean()
|
|
|
subGraphs.kill();
|
|
|
}
|
|
|
|
|
|
-void CJobChannel::startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
|
|
|
+void CJobChannel::startGraph(CGraphBase &graph, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
|
|
|
{
|
|
|
- graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
|
|
|
+ graphExecutor->add(&graph, *this, checkDependencies, parentExtractSize, parentExtract);
|
|
|
}
|
|
|
|
|
|
IThorResult *CJobChannel::getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId)
|