/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "jlib.hpp"
#include "workunit.hpp"
#include "jptree.hpp"
#include "jlog.hpp"
#include "jregexp.hpp"
#include "workflow.hpp"

//------------------------------------------------------------------------------------------
/* Parallel Workflow explanation

Prerequisite
Understanding how the workflow engine works requires understanding the code generator, in particular the way that queries are translated into a workflow structure.
Many details are important or unexpected. For example, persist items come as a pair, but in the opposite order to what you might expect.
There is a useful description in the WorkUnits.rst devdoc.

Key information
* All items are executed at most once per query (unless they are recovered).
* ECL actions are in one-to-one correspondence with the workflow items that house them.

Ready criteria
The core criterion for adding an item to the task queue is that it has no unperformed dependencies. A second criterion checks that the item is active.
An item that has been popped from the task queue should be executed only if it is both active and alive.
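
A minimal sketch of the ready criterion follows. It is not the engine's code. It assumes a hypothetical Item struct that stands in for the numDependencies and active members of CCloneWorkflowItem. std::atomic::fetch_sub returns the value held before the decrement. As a result, exactly one caller (the one that observes 1) learns that the last dependency has completed.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the scheduling state of one workflow item.
struct Item
{
    std::atomic<unsigned> numDependencies{0};
    std::atomic<bool> active{false};
};

// Called by a worker thread after one of the item's dependencies has finished.
// Returns true if the caller should push the item onto the task queue.
bool dependencyCompleted(Item & item)
{
    // fetch_sub returns the previous value; 1 means the last unperformed
    // dependency has just been completed.
    unsigned previous = item.numDependencies.fetch_sub(1);
    assert(previous > 0);
    // Second criterion: only active items are queued.
    return previous == 1 && item.active;
}
```

Because the decrement is atomic, two workers that finish different dependencies at the same moment cannot both queue the item.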

General process
* The program checks workflow items against a list of exceptions to determine whether parallel execution is supported.
* The program recursively traces through each item's dependencies, constructing the "graph of successors" (see below).
* Items without dependencies are placed on the starting task queue.
* Threads are created.
* Threads perform the thread-specific process.
* Threads finish once all items have been executed.

Thread-specific process
* Wait for an item to be added to the task queue.
* Pop an item from the task queue.
* Execute the item.
* Alert the item's successors (convey that one of their dependencies has been completed).
* Add any successors to the task queue if they meet the "ready criteria".
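
The thread-specific process can be sketched as follows. This is a simplified model, not the engine's implementation, and it rests on several assumptions:
* items are plain indices, and ParallelRunner and its members are hypothetical names;
* execution is a callback;
* the graph is acyclic and every item is reachable;
* activation, aliveness, contingencies and failures are ignored.
One mutex protects the shared task queue and the dependency counters. A condition variable lets idle threads wait either for work or for the "all items executed" signal.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class ParallelRunner
{
public:
    // succ[i] lists the dependent successors of item i;
    // deps[i] is the number of dependencies of item i.
    ParallelRunner(std::vector<std::vector<unsigned>> succ, std::vector<unsigned> deps)
        : successors(std::move(succ)), remaining(std::move(deps))
    {
        assert(successors.size() == remaining.size());
    }

    // Runs every item exactly once on numThreads worker threads.
    void run(unsigned numThreads, const std::function<void(unsigned)> & execute)
    {
        assert(numThreads > 0);
        // Items without dependencies are placed on the starting task queue.
        for (unsigned i = 0; i < remaining.size(); i++)
            if (remaining[i] == 0)
                tasks.push(i);
        toDo = remaining.size();
        std::vector<std::thread> threads;
        for (unsigned t = 0; t < numThreads; t++)
            threads.emplace_back([this, &execute] { workerLoop(execute); });
        for (std::thread & thread : threads)
            thread.join();
    }

private:
    void workerLoop(const std::function<void(unsigned)> & execute)
    {
        for (;;)
        {
            unsigned item;
            {
                std::unique_lock<std::mutex> lock(crit);
                // Wait for an item to be queued, or for all items to be done.
                available.wait(lock, [this] { return !tasks.empty() || toDo == 0; });
                if (tasks.empty())
                    return; // toDo == 0: every item has been executed
                item = tasks.front();
                tasks.pop();
            }
            execute(item); // executed outside the lock, so items run concurrently
            std::lock_guard<std::mutex> lock(crit);
            // Alert successors; queue those whose last dependency just completed.
            for (unsigned next : successors[item])
                if (--remaining[next] == 0)
                    tasks.push(next);
            toDo--;
            available.notify_all();
        }
    }

    std::vector<std::vector<unsigned>> successors;
    std::vector<unsigned> remaining;   // unperformed dependencies per item
    std::queue<unsigned> tasks;        // the shared task queue
    std::size_t toDo = 0;              // items not yet executed
    std::mutex crit;
    std::condition_variable available;
};
```

Holding one mutex across the counter updates keeps the sketch simple. CCloneWorkflowItem instead keeps numDependencies in a std::atomic and decrements it with fetch_sub.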

What is the graph of successors?
The relationship between an item and its dependency is two-directional:
* The dependency must be done first.
* The item should be done second. Therefore, the item is a successor to the dependency.
The term "graph" reflects the fact that the successor of an item may also have its own successors (you could sketch this out visually).
This is a consequence of allowing an item's dependency to also have its own dependencies.
An ECL query always generates at least one item with zero successors, called the "parent item". This is the item to execute last (unless the query fails).
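
A minimal sketch of how dependent successors can be derived from each item's dependency list. It assumes a hypothetical input map from each wfid to the wfids it depends on, where every dependency is also a key. The sketch:
* reverses each dependency edge;
* counts each item's dependencies;
* picks out the starting items (no dependencies) and the parent items (no successors).
SuccessorGraph, buildSuccessorGraph, startingItems and parentItems are illustrative names only.

```cpp
#include <cassert>
#include <map>
#include <vector>

// Hypothetical input: for each wfid, the wfids of its dependencies.
using DependencyMap = std::map<unsigned, std::vector<unsigned>>;

struct SuccessorGraph
{
    std::map<unsigned, std::vector<unsigned>> dependentSuccessors;
    std::map<unsigned, unsigned> numDependencies;
};

// Reverses every "item depends on dep" edge into "item is a dependent successor of dep".
SuccessorGraph buildSuccessorGraph(const DependencyMap & deps)
{
    SuccessorGraph graph;
    for (const auto & entry : deps)
    {
        unsigned wfid = entry.first;
        graph.dependentSuccessors[wfid]; // make sure every item has an entry
        graph.numDependencies[wfid] = static_cast<unsigned>(entry.second.size());
        for (unsigned dep : entry.second)
        {
            assert(deps.count(dep) != 0); // every dependency must itself be an item
            graph.dependentSuccessors[dep].push_back(wfid);
        }
    }
    return graph;
}

// Items without dependencies are placed on the starting task queue.
std::vector<unsigned> startingItems(const SuccessorGraph & graph)
{
    std::vector<unsigned> result;
    for (const auto & entry : graph.numDependencies)
        if (entry.second == 0)
            result.push_back(entry.first);
    return result;
}

// Items with zero successors; a real query has at least one such "parent item".
std::vector<unsigned> parentItems(const SuccessorGraph & graph)
{
    std::vector<unsigned> result;
    for (const auto & entry : graph.dependentSuccessors)
        if (entry.second.empty())
            result.push_back(entry.first);
    return result;
}
```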
Dependent successors are those described above; they are the reverse of a dependency.
Logical successors are a second type of successor. Logical successors have no dependency on their predecessor, yet must execute afterwards.
Logical successors are used in several constructs: ORDERED, SEQUENTIAL, IF and SUCCESS/FAILURE.
There are scenarios where an item will execute before its logical predecessor (this can't happen for a dependent successor).
For example, the ECL code PARALLEL(b, ORDERED(a,b)) may cause action b to be executed before a, even though ORDERED creates a logical successorship from a to b.
You could say that this logical successorship is made obsolete by the encompassing PARALLEL statement.
This example shows that although logical successorships are added to the graph of successors, they may never be used.

PARALLEL, ORDERED and SEQUENTIAL are ways to group actions and to specify whether they have any ordering requirements. (An ordering requirement could be: action 1 needs to be performed before action 2.)
I will describe how they work in terms of the first and second actions in the actionlist, without any loss of generality.
The relationship from the second action to the first is *exactly* the same as the relationship from the third action to the second, and so on.

PARALLEL
The actions in a parallel actionlist have no special ordering requirements. Any actions or their dependencies can be performed concurrently.
In relation to the workflow engine, there are no logical dependencies between the actions or their dependencies.

SEQUENTIAL
The actions in a sequential actionlist have the most constrained ordering requirements. Firstly, the actions must be performed in order. Secondly, the dependencies of an action can only be started once the previous action has finished.
In relation to the workflow engine, the second action in the actionlist has a logical dependency on the first action. Furthermore, each of the second action's dependencies has a logical dependency on the first action.

ORDERED
The actions in an ordered actionlist have a less constrained ordering requirement than sequential. Only the actions in the actionlist must be performed in order; there is no special ordering requirement for their dependencies.
In relation to the workflow engine, the second action in the actionlist has a logical dependency on the first action. This is not true of the second action's dependencies, which can be executed at any point.
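
The PARALLEL, SEQUENTIAL and ORDERED rules above can be summarised as a function that lists the logical successorships an actionlist adds. This is a sketch of the stated rules, not of the code generator's logic. ActionListKind, Edge and logicalSuccessorships are hypothetical names, and only each action's direct dependencies are considered.

```cpp
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

enum class ActionListKind { Parallel, Ordered, Sequential };

// (predecessor, logical successor)
using Edge = std::pair<unsigned, unsigned>;

// dependencies maps an action's wfid to its direct dependencies.
std::vector<Edge> logicalSuccessorships(ActionListKind kind,
                                        const std::vector<unsigned> & actions,
                                        const std::map<unsigned, std::vector<unsigned>> & dependencies)
{
    std::vector<Edge> edges;
    if (kind == ActionListKind::Parallel)
        return edges; // no ordering requirements at all
    for (std::size_t i = 1; i < actions.size(); i++)
    {
        unsigned first = actions[i - 1];
        unsigned second = actions[i];
        // ORDERED and SEQUENTIAL: the second action follows the first.
        edges.emplace_back(first, second);
        if (kind == ActionListKind::Sequential)
        {
            // SEQUENTIAL only: the second action's dependencies also follow the first.
            auto found = dependencies.find(second);
            if (found != dependencies.end())
                for (unsigned dep : found->second)
                    edges.emplace_back(first, dep);
        }
    }
    return edges;
}
```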

Active
Any actions explicitly called in the ECL query are active. For the query to finish, these items must always execute.
Any items that these actions depend on are also marked as active.
Items that start without being active (i.e. those that might not execute during the workunit) are:
* items that are logical successors of other items, and are not activated by another route
These could be:
* items that are contingencies (SUCCESS/FAILURE)
* items that are the trueresult or falseresult of a conditional IF function
If one of these items is going to execute, then it is marked as active.
For example, logical successorships (described above) entail a predecessor "activating" its successor.
An item is active when it has a reason to execute.
Anything that uses activate()/deactivate() is protected by a critical section.
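
A minimal sketch of activation, assuming a hypothetical Activator class that holds each item's dependency list. Activating an item also activates everything it depends on, transitively. A std::mutex stands in for the critical section that serialises activate()/deactivate(). Logical successors and contingencies are not modelled.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

class Activator
{
public:
    explicit Activator(std::map<unsigned, std::vector<unsigned>> deps) : dependencies(std::move(deps)) {}

    // Marks an item, and every item it depends on, as active.
    void activate(unsigned wfid)
    {
        std::lock_guard<std::mutex> lock(crit);
        std::set<unsigned> visited;
        activateLocked(wfid, visited);
    }

    void deactivate(unsigned wfid)
    {
        std::lock_guard<std::mutex> lock(crit);
        active[wfid] = false;
    }

    bool isActive(unsigned wfid)
    {
        std::lock_guard<std::mutex> lock(crit);
        auto found = active.find(wfid);
        return found != active.end() && found->second;
    }

private:
    // Caller must hold crit.
    void activateLocked(unsigned wfid, std::set<unsigned> & visited)
    {
        if (!visited.insert(wfid).second)
            return; // shared dependency already handled during this call
        assert(dependencies.count(wfid) != 0); // only known items can be activated
        active[wfid] = true;
        for (unsigned dep : dependencies.at(wfid))
            activateLocked(dep, visited);
    }

    std::map<unsigned, std::vector<unsigned>> dependencies;
    std::map<unsigned, bool> active;
    std::mutex crit;
};
```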

Alive
When an item starts executing, it has to be "alive". This means that it fulfills the following condition: (!abort || item.queryException() || item.queryContingencyWithin())
If the item is no longer alive, then instead of being processed, it is discarded.
Whenever the global abort is false, all items are alive.
If the global abort is true, then only items that are part of a contingency or have an exception are alive. (For an item to have an exception, either it or its children have failed.)
Items that are part of a contingency **and** have an exception are treated like items with just an exception.
A change to the global abort is the only reason that items can go from alive to dead.
Items are made inactive once they turn dead, so that a different logical predecessor can re-activate them.
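
The alive test and the resulting execute-or-discard decision can be sketched as follows. PoppedItem, isAlive and processPopped are hypothetical names. The two fields stand for item.queryException() returning non-null and item.queryContingencyWithin() returning non-zero. As an assumption of this sketch, a dead item is deactivated at the moment it is discarded.

```cpp
#include <cassert>

// Hypothetical snapshot of the state consulted when an item is popped from the task queue.
struct PoppedItem
{
    bool active;
    bool hasException;          // stands for item.queryException() != nullptr
    unsigned contingencyWithin; // stands for item.queryContingencyWithin()
};

// (!abort || item.queryException() || item.queryContingencyWithin())
bool isAlive(const PoppedItem & item, bool abort)
{
    return !abort || item.hasException || item.contingencyWithin != 0;
}

// Returns true if the item should be executed: it must be both active and alive.
// A dead item is discarded and deactivated, so that a different logical
// predecessor can re-activate it.
bool processPopped(PoppedItem & item, bool abort)
{
    if (!item.active)
        return false;
    if (!isAlive(item, abort))
    {
        item.active = false;
        return false;
    }
    return true;
}
```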

Complications
The algorithm is made more complicated by having to
* co-ordinate when the worker threads stop
* catch any failed workflow items and perform the failure contingencies
* identify whether persist items (and dependencies) are up-to-date
* protect shared resources such as the task queue from race conditions.
*/

EnumMapping wftypes[] =
{
    { WFTypeNormal, "normal" },
    { WFTypeSuccess, "success" },
    { WFTypeFailure, "failure" },
    { WFTypeRecovery, "recovery" },
    { WFTypeWait, "wait" },
    { WFTypeSize, NULL }
};

EnumMapping wfmodes[] =
{
    { WFModeNormal, "normal" },
    { WFModeCondition, "condition" },
    { WFModeSequential, "sequential" },
    { WFModeParallel, "parallel" },
    { WFModePersist, "persist" },
    { WFModeBeginWait, "bwait" },
    { WFModeWait, "wait" },
    { WFModeOnce, "once" },
    { WFModeCritical, "critical" },
    { WFModeOrdered, "ordered" },
    { WFModeConditionExpression, "condition expression" },
    { WFModeSize, NULL}
};

EnumMapping wfstates[] =
{
    { WFStateNull, "null" },
    { WFStateReqd, "reqd" },
    { WFStateDone, "done" },
    { WFStateFail, "fail" },
    { WFStateSkip, "skip" },
    { WFStateWait, "wait" },
    { WFStateBlocked, "block" },
    { WFStateSize, NULL }
};

static void setEnum(IPropertyTree *p, const char *propname, int value, EnumMapping *map)
{
    const char * mapped = getEnumText(value, map, nullptr);
    if (!mapped)
        assertex(!"Unexpected value in setEnum");
    p->setProp(propname, mapped);
}

static int getEnum(IPropertyTree *p, const char *propname, EnumMapping *map)
{
    const char *v = p->queryProp(propname);
    if (v)
        return getEnum(v, map);
    return 0;
}

const char * queryWorkflowTypeText(WFType type)
{
    return getEnumText(type, wftypes);
}

const char * queryWorkflowModeText(WFMode mode)
{
    return getEnumText(mode, wfmodes);
}

const char * queryWorkflowStateText(WFState state)
{
    return getEnumText(state, wfstates);
}

class CWorkflowDependencyIterator : implements IWorkflowDependencyIterator, public CInterface
{
public:
    CWorkflowDependencyIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Dependency")); }
    IMPLEMENT_IINTERFACE;
    bool first() { return iter->first(); }
    bool isValid() { return iter->isValid(); }
    bool next() { return iter->next(); }
    unsigned query() const { return iter->query().getPropInt("@wfid"); }
private:
    Owned<IPropertyTreeIterator> iter;
};

class CWorkflowEvent : public CInterface, implements IWorkflowEvent
{
public:
    CWorkflowEvent(char const * _name, char const * _text) : name(_name), text(_text) {}
    IMPLEMENT_IINTERFACE;
    virtual char const * queryName() const { return name.get(); }
    virtual char const * queryText() const { return text.get(); }
    virtual bool matches(char const * trialName, char const * trialText) const { return((strcmp(trialName, name.get()) == 0) && WildMatch(trialText, text.get(), true)); }
private:
    StringAttr name;
    StringAttr text;
};

class CWorkflowItem : implements IWorkflowItem, public CInterface
{
public:
    CWorkflowItem(IPropertyTree & _tree) { tree.setown(&_tree); }
    CWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
    {
        tree.setown(LINK(ptree->addPropTree("Item")));
        tree->setPropInt("@wfid", wfid);
        setEnum(tree, "@type", type, wftypes);
        setEnum(tree, "@mode", mode, wfmodes);
        if(success) tree->setPropInt("@success", success);
        if(failure) tree->setPropInt("@failure", failure);
        if(recovery && retriesAllowed)
        {
            tree->setPropInt("@recovery", recovery);
            tree->setPropInt("@retriesAllowed", retriesAllowed);
            tree->addPropTree("Dependency")->setPropInt("@wfid", recovery);
        }
        if(contingencyFor) tree->setPropInt("@contingencyFor", contingencyFor);
        reset();
    }
    IMPLEMENT_IINTERFACE;

    //info set at compile time
    virtual unsigned queryWfid() const { return tree->getPropInt("@wfid"); }
    virtual bool isScheduled() const { return tree->hasProp("Schedule"); }
    virtual bool isScheduledNow() const { return (tree->hasProp("Schedule") && !tree->hasProp("Schedule/Event")); }
    virtual IWorkflowEvent * getScheduleEvent() const
    {
        if(tree->hasProp("Schedule/Event"))
            return new CWorkflowEvent(tree->queryProp("Schedule/Event/@name"), tree->queryProp("Schedule/Event/@text"));
        return NULL;
    }
    virtual unsigned querySchedulePriority() const { return (tree->hasProp("Schedule") ? tree->getPropInt("Schedule/@priority", 0) : 0); }
    virtual bool hasScheduleCount() const { return tree->hasProp("Schedule/@count"); }
    virtual unsigned queryScheduleCount() const { assertex(tree->hasProp("Schedule/@count")); return tree->getPropInt("Schedule/@count"); }
    virtual IWorkflowDependencyIterator * getDependencies() const { return new CWorkflowDependencyIterator(tree); }
    virtual WFType queryType() const { return static_cast<WFType>(getEnum(tree, "@type", wftypes)); }
    virtual IStringVal & getLabel(IStringVal & val) const { val.set(tree->queryProp("@label")); return val; }
    virtual WFMode queryMode() const { return static_cast<WFMode>(getEnum(tree, "@mode", wfmodes)); }
    virtual unsigned querySuccess() const { return tree->getPropInt("@success", 0); }
    virtual unsigned queryFailure() const { return tree->getPropInt("@failure", 0); }
    virtual unsigned queryRecovery() const { return tree->getPropInt("@recovery", 0); }
    virtual unsigned queryRetriesAllowed() const { return tree->getPropInt("@retriesAllowed", 0); }
    virtual unsigned queryContingencyFor() const { return tree->getPropInt("@contingencyFor", 0); }
    virtual IStringVal & getPersistName(IStringVal & val) const { val.set(tree->queryProp("@persistName")); return val; }
    virtual unsigned queryPersistWfid() const { return tree->getPropInt("@persistWfid", 0); }
    virtual int queryPersistCopies() const { return tree->getPropInt("@persistCopies", 0); }
    virtual bool queryPersistRefresh() const { return tree->getPropBool("@persistRefresh", true); }
    virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(tree->queryProp("@criticalName")); return val; }
    virtual IStringVal & queryCluster(IStringVal & val) const { val.set(tree->queryProp("@cluster")); return val; }
    virtual void setScheduledNow() { tree->setPropTree("Schedule"); setEnum(tree, "@state", WFStateReqd, wfstates); }
    virtual void setScheduledOn(char const * name, char const * text)
    {
        IPropertyTree * stree = tree->setPropTree("Schedule")->setPropTree("Event");
        stree->setProp("@name", name);
        stree->setProp("@text", text);
        setEnum(tree, "@state", WFStateWait, wfstates);
    }
    virtual void setSchedulePriority(unsigned priority) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@priority", priority); }
    virtual void setScheduleCount(unsigned count) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@count", count); tree->setPropInt("Schedule/@countRemaining", count); }
    virtual void addDependency(unsigned wfid) { tree->addPropTree("Dependency")->setPropInt("@wfid", wfid); }
    virtual void setPersistInfo(char const * name, unsigned wfid, int numPersistInstances, bool refresh)
    {
        tree->setProp("@persistName", name);
        tree->setPropInt("@persistWfid", wfid);
        if (numPersistInstances != 0)
            tree->setPropInt("@persistCopies", (int)numPersistInstances);
        tree->setPropBool("@persistRefresh", refresh);
    }
    virtual void setCriticalInfo(char const * name) { tree->setProp("@criticalName", name); }
    virtual void setCluster(const char * cluster) { tree->setProp("@cluster", cluster); }

    //info set at run time
    virtual unsigned queryScheduleCountRemaining() const { assertex(tree->hasProp("Schedule")); return tree->getPropInt("Schedule/@countRemaining"); }
    virtual WFState queryState() const { return static_cast<WFState>(getEnum(tree, "@state", wfstates)); }
    virtual unsigned queryRetriesRemaining() const { return tree->getPropInt("@retriesRemaining"); }
    virtual int queryFailCode() const { return tree->getPropInt("@failcode"); }
    virtual char const * queryFailMessage() const { return tree->queryProp("@failmsg"); }
    virtual char const * queryEventName() const { return tree->queryProp("@eventname"); }
    virtual char const * queryEventExtra() const { return tree->queryProp("@eventextra"); }
    virtual void setState(WFState state) { setEnum(tree, "@state", state, wfstates); }
    virtual unsigned queryScheduledWfid() const { return tree->getPropInt("@swfid", 0); }
    virtual void setScheduledWfid(unsigned wfid) { tree->setPropInt("@swfid", wfid); }
    virtual void setLabel(const char * label) { tree->setProp("@label", label); }
    virtual bool testAndDecRetries()
    {
        assertex(tree->hasProp("@retriesAllowed"));
        unsigned rem = tree->getPropInt("@retriesRemaining", 0);
        if(rem==0)
            return false;
        tree->setPropInt("@retriesRemaining", rem-1);
        return true;
    }
    virtual bool decAndTestScheduleCountRemaining()
    {
        if(!tree->hasProp("Schedule/@count"))
            return true;
        unsigned rem = tree->getPropInt("Schedule/@countRemaining");
        assertex(rem>0);
        tree->setPropInt("Schedule/@countRemaining", rem-1);
        return (rem>1);
    }
    virtual void incScheduleCount()
    {
        unsigned rem = tree->getPropInt("Schedule/@countRemaining");
        tree->setPropInt("Schedule/@countRemaining", rem+1);
    }
    virtual void setFailInfo(int code, char const * message)
    {
        tree->setPropInt("@failcode", code);
        tree->setProp("@failmsg", message);
    }
    virtual void setEvent(const char * name, const char * extra)
    {
        if (name)
            tree->setProp("@eventname", name);
        if (extra)
            tree->setProp("@eventextra", extra);
    }
    virtual void reset()
    {
        if(tree->hasProp("@retriesAllowed"))
            tree->setPropInt("@retriesRemaining", tree->getPropInt("@retriesAllowed"));
        if(tree->hasProp("Schedule/@count"))
            tree->setPropInt("Schedule/@countRemaining", tree->getPropInt("Schedule/@count"));
        tree->removeProp("@failcode");
        tree->removeProp("@failmsg");
        tree->removeProp("@eventname");
        tree->removeProp("@eventextra"); // setEvent() stores the extra text under @eventextra
        if(isScheduled())
        {
            if(isScheduledNow())
                setState(WFStateReqd);
            else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
                setState(WFStateDone);
            else
                setState(WFStateWait);
        }
        else if(queryType() == WFTypeRecovery)
            setState(WFStateSkip);
        else
            setState(WFStateNull);
    }
    virtual void syncRuntimeData(IConstWorkflowItem const & other)
    {
        WFState state = other.queryState();
        setState(state);
        if(tree->hasProp("@retriesAllowed"))
            tree->setPropInt("@retriesRemaining", other.queryRetriesRemaining());
        if(tree->hasProp("Schedule/@count"))
            tree->setPropInt("Schedule/@countRemaining", other.queryScheduleCountRemaining());
        if(state == WFStateFail)
        {
            tree->setPropInt("@failcode", other.queryFailCode());
            tree->setProp("@failmsg", other.queryFailMessage());
        }
        setEvent(other.queryEventName(), other.queryEventExtra());
    }
private:
    Owned<IPropertyTree> tree;
};

class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
{
private:
    class CCloneSchedule : public CInterface
    {
    private:
        bool now;
        unsigned priority;
        bool counting;
        unsigned count;
        unsigned countRemaining;
        Owned<IWorkflowEvent> event;
    public:
        CCloneSchedule(IConstWorkflowItem const * other)
        {
            now = other->isScheduledNow();
            priority = other->querySchedulePriority();
            counting = other->hasScheduleCount();
            if(counting)
            {
                count = other->queryScheduleCount();
                countRemaining = other->queryScheduleCountRemaining();
            }
            else
            {
                count = 0;
                countRemaining = 0;
            }
            event.setown(other->getScheduleEvent());
        }
        bool isNow() const { return now; }
        unsigned queryPriority() const { return priority; }
        bool hasCount() const { return counting; }
        unsigned queryCount() const { return count; }
        unsigned queryCountRemaining() const { return countRemaining; }
        bool decAndTestCountRemaining()
        {
            if(!counting)
                return true;
            if(countRemaining)
                countRemaining--;
            return (countRemaining>0);
        }
        void incCountRemaining()
        {
            if(counting)
                countRemaining++;
        }
        void resetCount() { if(counting) countRemaining = count; }
        IWorkflowEvent * getEvent() const { return event.getLink(); }
    };

    class CCloneIterator : public CInterface, public IWorkflowDependencyIterator
    {
    public:
        CCloneIterator(IntArray const & _array) : array(_array), idx(0) {}
        IMPLEMENT_IINTERFACE;
        virtual bool first() { idx = 0; return isValid(); }
        virtual bool isValid() { return array.isItem(idx); }
        virtual bool next() { idx++; return isValid(); }
        virtual unsigned query() const { return array.item(idx); }
    private:
        IntArray const & array;
        aindex_t idx;
    };

    unsigned wfid;
    //If an item has an exception, only the failure contingency should execute
    Owned<WorkflowException> thisException;
    Owned<CCloneSchedule> schedule;
    IntArray dependencies;
    IntArray dependentSuccessors;
    //Items that are activated upon completion of this item
    IntArray logicalSuccessors;
    //Number of unperformed dependencies of this item. It is decremented until it reaches 0
    std::atomic<unsigned int> numDependencies{0U};
    //An item will only be executed if it is active
    std::atomic<bool> active{false};
    //Records the runtime context in which the item was added to the task queue (non-zero if it was added as part of a contingency).
    //Before execution starts, it isn't known whether each item will be executed as part of a contingency or otherwise.
    //This is used to catch failures within contingencies
    unsigned withinContingency = 0;
    WFType type = WFTypeNormal;
    WFMode mode = WFModeNormal;
    unsigned success;
    unsigned failure;
    unsigned recovery;
    unsigned retriesAllowed;
    unsigned contingencyFor;
    unsigned scheduledWfid;
    WFState state = WFStateNull;
    unsigned retriesRemaining;
    int failcode;
    StringAttr failmsg;
    SCMStringBuffer persistName;
    SCMStringBuffer clusterName;
    SCMStringBuffer label;
    unsigned persistWfid;
    int persistCopies;
    bool persistRefresh = true;
    SCMStringBuffer criticalName;
    StringAttr eventName;
    StringAttr eventExtra;
public:
    CCloneWorkflowItem(){}
    CCloneWorkflowItem(unsigned _wfid)
    {
        wfid = _wfid;
    }
    IMPLEMENT_IINTERFACE;
    void incNumDependencies()
    {
        numDependencies++;
    }
    unsigned atomicDecNumDependencies()
    {
        return numDependencies.fetch_sub(1);
    }
    unsigned queryNumDependencies() const { return numDependencies; }
    unsigned queryNumDependentSuccessors() const { return dependentSuccessors.ordinality(); }
    unsigned queryNumLogicalSuccessors() const { return logicalSuccessors.ordinality(); }
    bool isDependentSuccessorsEmpty() const
    {
        return dependentSuccessors.empty();
    }
    void addDependentSuccessor(CCloneWorkflowItem * next)
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Workflow item %u has marked workflow item %u as its dependent successor", wfid, next->queryWfid());
#endif
        dependentSuccessors.append(next->queryWfid());
        next->incNumDependencies();
    }
    void addLogicalSuccessor(CCloneWorkflowItem * next)
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Workflow item %u has marked workflow item %u as its logical successor", wfid, next->queryWfid());
#endif
        logicalSuccessors.append(next->queryWfid());
        //note that dependency count is not incremented, since logical successors don't follow as dependents
        //Instead, logical relationships are used to activate the successors
    }
    bool hasLogicalSuccessor(unsigned wfid)
    {
        return (logicalSuccessors.contains(wfid));
    }
    //For condition expression
    void removeLogicalSuccessors()
    {
        if(logicalSuccessors.empty())
            throwUnexpected();
        logicalSuccessors.clear();
    }
    IWorkflowDependencyIterator * getDependentSuccessors() const
    {
        return new CCloneIterator(dependentSuccessors);
    }
    IWorkflowDependencyIterator * getLogicalSuccessors() const
    {
        return new CCloneIterator(logicalSuccessors);
    }
    void activate()
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "workflow item %u [%p] is activated", wfid, this);
#endif
        active = true;
    }
    void deactivate()
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "workflow item %u [%p] is deActivated", wfid, this);
#endif
        active = false;
    }
    bool isActive() const { return active; }
    void setMode(WFMode _mode)
    {
        mode = _mode;
    }
    void setFailureWfid(unsigned _failure)
    {
        failure = _failure;
    }
    void setSuccessWfid(unsigned _success)
    {
        success = _success;
    }
    void setException(WorkflowException * e)
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "workflow item %u [%p] has its exception set", wfid, this);
#endif
        thisException.set(e);
    }
    void setContingencyWithin(unsigned n)
    {
        withinContingency = n;
    }
    void copy(IConstWorkflowItem const * other)
    {
        wfid = other->queryWfid();
        if(other->isScheduled())
            schedule.setown(new CCloneSchedule(other));
        Owned<IWorkflowDependencyIterator> iter = other->getDependencies();
        for(iter->first(); iter->isValid(); iter->next())
            dependencies.append(iter->query());
        type = other->queryType();
        mode = other->queryMode();
        success = other->querySuccess();
        failure = other->queryFailure();
        recovery = other->queryRecovery();
        retriesAllowed = other->queryRetriesAllowed();
        contingencyFor = other->queryContingencyFor();
        state = other->queryState();
        retriesRemaining = other->queryRetriesRemaining();
        if(state == WFStateFail)
        {
            failcode = other->queryFailCode();
            failmsg.set(other->queryFailMessage());
        }
        eventName.set(other->queryEventName());
        eventExtra.set(other->queryEventExtra());
        other->getPersistName(persistName);
        persistWfid = other->queryPersistWfid();
        scheduledWfid = other->queryScheduledWfid();
        persistCopies = other->queryPersistCopies();
        persistRefresh = other->queryPersistRefresh();
        other->getCriticalName(criticalName);
        other->queryCluster(clusterName);
        other->getLabel(label);
    }
    //info set at compile time
    virtual WorkflowException * queryException() const
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "workflow item %u [%p] has its exception queried", wfid, this);
#endif
        return thisException.get();
    }
    virtual unsigned queryContingencyWithin() const { return withinContingency; }
    virtual unsigned queryWfid() const { return wfid; }
    virtual bool isScheduled() const { return schedule.get() != 0; }
    virtual bool isScheduledNow() const { return schedule && schedule->isNow(); }
    virtual IWorkflowEvent * getScheduleEvent() const { if(schedule) return schedule->getEvent(); else return NULL; }
    virtual unsigned querySchedulePriority() const { return schedule ? schedule->queryPriority() : 0; }
    virtual bool hasScheduleCount() const { return schedule ? schedule->hasCount() : false; }
    virtual unsigned queryScheduleCount() const { return schedule ? schedule->queryCount() : 0; }
    virtual IWorkflowDependencyIterator * getDependencies() const { return new CCloneIterator(dependencies); }
    virtual WFType queryType() const { return type; }
    virtual WFMode queryMode() const { return mode; }
    virtual IStringVal & getLabel(IStringVal & val) const { val.set(label.str()); return val; }
    virtual unsigned querySuccess() const { return success; }
    virtual unsigned queryFailure() const { return failure; }
    virtual unsigned queryRecovery() const { return recovery; }
    virtual unsigned queryRetriesAllowed() const { return retriesAllowed; }
    virtual unsigned queryContingencyFor() const { return contingencyFor; }
    virtual IStringVal & getPersistName(IStringVal & val) const { val.set(persistName.str()); return val; }
    virtual unsigned queryPersistWfid() const { return persistWfid; }
    virtual int queryPersistCopies() const { return persistCopies; }
    virtual bool queryPersistRefresh() const { return persistRefresh; }
    virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(criticalName.str()); return val; }
    virtual IStringVal & queryCluster(IStringVal & val) const { val.set(clusterName.str()); return val; }

    //info set at run time
    virtual unsigned queryScheduleCountRemaining() const { return schedule ? schedule->queryCountRemaining() : 0; }
    virtual WFState queryState() const { return state; }
    virtual unsigned queryRetriesRemaining() const { return retriesRemaining; }
    virtual int queryFailCode() const { return failcode; }
    virtual char const * queryFailMessage() const { return failmsg.get(); }
    virtual char const * queryEventName() const { return eventName; }
    virtual char const * queryEventExtra() const { return eventExtra; }
    virtual unsigned queryScheduledWfid() const { return scheduledWfid; }
    virtual void setState(WFState _state) { state = _state; }
    virtual bool testAndDecRetries()
    {
        if(retriesRemaining == 0)
            return false;
        retriesRemaining--;
        return true;
    }
    virtual bool decAndTestScheduleCountRemaining()
    {
        if(!schedule)
            return true;
        return schedule->decAndTestCountRemaining();
    }
    virtual void incScheduleCount()
    {
        if(schedule)
            schedule->incCountRemaining();
    }
    virtual void setFailInfo(int code, char const * message)
    {
        failcode = code;
        failmsg.set(message);
    }
    virtual void setEvent(const char * name, const char * extra)
    {
        eventName.set(name);
        eventExtra.set(extra);
    }
    virtual void reset()
    {
        retriesRemaining = retriesAllowed;
        if(schedule) schedule->resetCount();
        if(isScheduled())
        {
            if(isScheduledNow())
                setState(WFStateReqd);
            else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
                setState(WFStateDone);
            else
                setState(WFStateWait);
        }
        else if(queryType() == WFTypeRecovery)
            setState(WFStateSkip);
        else
            setState(WFStateNull);
    }
};
class CWorkflowItemIterator : public CInterface, implements IWorkflowItemIterator
{
public:
    CWorkflowItemIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Item")); }
    IMPLEMENT_IINTERFACE;
    bool first() { item.clear(); return iter->first(); }
    bool isValid() { return iter->isValid(); }
    bool next() { item.clear(); return iter->next(); }
    IConstWorkflowItem * query() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.get(); }
    IWorkflowItem * get() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.getLink(); }
private:
    Owned<IPropertyTreeIterator> iter;
    mutable Owned<CWorkflowItem> item;
};
class CCloneWorkflowItemArray : public CInterface, implements IWorkflowItemArray
{
private:
    class ListItem
    {
    public:
        ListItem(ListItem * _next, IRuntimeWorkflowItem * _item) : next(_next), item(_item) {}
        ListItem * next;
        IRuntimeWorkflowItem * item;
    };
    class ListItemPtr : public CInterface, implements IRuntimeWorkflowItemIterator
    {
    public:
        ListItemPtr(ListItem * _start) : start(_start) { ptr = NULL; }
        IMPLEMENT_IINTERFACE;
        virtual bool first() { ptr = start; return isValid(); }
        virtual bool isValid() { return ptr != NULL; }
        virtual bool next() { ptr = ptr->next; return isValid(); }
        virtual IConstWorkflowItem * query() const { return ptr->item; }
        virtual IRuntimeWorkflowItem * get() const { return LINK(ptr->item); }
    private:
        ListItem * start;
        ListItem * ptr;
    };
    //Adds a scheduled item to the linked list, keeping the list sorted by descending schedule priority.
    //Items with equal priority keep their insertion order. Unscheduled items are not added to the list.
    void insert(CCloneWorkflowItem * item)
    {
        if(!item->isScheduled())
            return;
        if(!head)
            head = tail = new ListItem(NULL, item);
        else if(item->querySchedulePriority() > head->item->querySchedulePriority())
            head = new ListItem(head, item);
        else if(item->querySchedulePriority() <= tail->item->querySchedulePriority())
        {
            tail->next = new ListItem(NULL, item);
            tail = tail->next;
        }
        else
        {
            ListItem * finger = head;
            while(item->querySchedulePriority() <= finger->next->item->querySchedulePriority())
                finger = finger->next;
            finger->next = new ListItem(finger->next, item);
        }
    }
public:
    CCloneWorkflowItemArray(unsigned _capacity) : capacity(_capacity), head(NULL), tail(NULL)
    {
        array = _capacity ? new CCloneWorkflowItem[_capacity] : NULL;
    }
    ~CCloneWorkflowItemArray()
    {
        ListItem * finger = head;
        while(finger)
        {
            ListItem * del = finger;
            finger = finger->next;
            delete del;
        }
        if (array)
            delete [] array;
    }
    IMPLEMENT_IINTERFACE;
    virtual void addClone(IConstWorkflowItem const * other)
    {
        unsigned wfid = other->queryWfid();
        assertex((wfid > 0) && (wfid <= capacity));
        array[wfid-1].copy(other);
        insert(&array[wfid-1]);
    }
    virtual IRuntimeWorkflowItem & queryWfid(unsigned wfid)
    {
        assertex((wfid > 0) && (wfid <= capacity));
        return array[wfid-1];
    }
    virtual unsigned count() const
    {
        return capacity;
    }
    //Iterates through the scheduled items only (not ALL the items)
    virtual IRuntimeWorkflowItemIterator * getSequenceIterator() { return new ListItemPtr(head); }
    //Returns true if any scheduled item is not scheduled to run immediately
    virtual bool hasScheduling() const
    {
        ListItem * finger = head;
        while(finger)
        {
            if(!finger->item->isScheduledNow())
                return true;
            finger = finger->next;
        }
        return false;
    }
private:
    unsigned capacity;
    CCloneWorkflowItem * array;
    ListItem * head;
    ListItem * tail;
};
//-------------------------------------------------------------------------------------------------
WorkflowMachine::WorkflowMachine()
    : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(queryDummyContextLogger())
{
}
WorkflowMachine::WorkflowMachine(const IContextLogger &_logctx)
    : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(_logctx)
{
}
void WorkflowMachine::addSuccessors()
{
    Owned<IRuntimeWorkflowItemIterator> iter = workflow->getSequenceIterator();
    if (iter->first())
    {
        while (iter->isValid())
        {
            IConstWorkflowItem * item = iter->query();
            if(item->queryState() == WFStateReqd)
            {
                //initial call
                parentWfid = item->queryWfid();
#ifdef TRACE_WORKFLOW
                LOG(MCworkflow, "Item %u has been identified as the 'parent' item, with Reqd state", parentWfid);
#endif
                CCloneWorkflowItem thisItem;
                startItem = &thisItem;
                defineLogicalRelationships(parentWfid, startItem, false);
#ifdef TRACE_WORKFLOW
                LOG(MCworkflow, "Adding initial workflow items");
#endif
                //Logical successors to the startItem are ready to be executed if they have no dependencies
                processLogicalSuccessors(*startItem);
                //startItem points to a stack object, so it must be cleared before thisItem goes out of scope
                startItem = nullptr;
                break;
            }
            if(!iter->next()) break;
        }
    }
    assertex(parentWfid != 0);
#ifdef TRACE_WORKFLOW
    //Outputting debug info about each workflow item.
    unsigned totalDependencies = 0;
    unsigned totalActiveItems = 0;
    unsigned totalInActiveItems = 0;
    unsigned totalDependentSuccessors = 0;
    unsigned totalLogicalSuccessors = 0;
    unsigned totalConditionItems = 0;
    //iterate through the workflow items
    for(unsigned i = 1; i <= workflow->count(); i++)
    {
        CCloneWorkflowItem & cur = queryWorkflowItem(i);
        unsigned numDep = cur.queryNumDependencies();
        unsigned numDepSuc = cur.queryNumDependentSuccessors();
        unsigned numLogSuc = cur.queryNumLogicalSuccessors();
        if(cur.isActive())
            totalActiveItems++;
        else
            totalInActiveItems++;
        totalDependencies += numDep;
        totalDependentSuccessors += numDepSuc;
        totalLogicalSuccessors += numLogSuc;
        LOG(MCworkflow, "Item %u has %u dependencies, %u dependent successors and %u logical successors", cur.queryWfid(), numDep, numDepSuc, numLogSuc);
        if(cur.queryMode() == WFModeCondition)
        {
            totalConditionItems++;
        }
    }
    //iterate through the intermediary (logical) workflow items
    for(size_t i = 0; i < logicalWorkflow.size(); i++)
    {
        IRuntimeWorkflowItem *tmp = logicalWorkflow[i].get();
        CCloneWorkflowItem * cur = static_cast<CCloneWorkflowItem*>(tmp);
        unsigned numDep = cur->queryNumDependencies();
        unsigned numDepSuc = cur->queryNumDependentSuccessors();
        unsigned numLogSuc = cur->queryNumLogicalSuccessors();
        if(cur->isActive())
            totalActiveItems++;
        else
            totalInActiveItems++;
        totalDependencies += numDep;
        totalDependentSuccessors += numDepSuc;
        totalLogicalSuccessors += numLogSuc;
        LOG(MCworkflow, "Runtime item %u has %u dependencies, %u dependent successors and %u logical successors", cur->queryWfid(), numDep, numDepSuc, numLogSuc);
        if(cur->queryMode() == WFModeCondition)
        {
            totalConditionItems++;
        }
    }
    LOG(MCworkflow, "Total dependencies is: %u, total dependent successors is: %u, total logical successors is: %u", totalDependencies, totalDependentSuccessors, totalLogicalSuccessors);
    LOG(MCworkflow, "Total condition items is: %u, total active items is: %u, total inactive items is %u", totalConditionItems, totalActiveItems, totalInActiveItems);
    if(totalDependencies == totalDependentSuccessors)
        LOG(MCworkflow, "dependency and dependent successor count is consistent");
    else
        LOG(MCworkflow, "dependency and dependent successor count is inconsistent");
#endif
}
CCloneWorkflowItem * WorkflowMachine::insertLogicalPredecessor(unsigned successorWfid)
{
    //Intermediary items are numbered after the compiled workflow items; queryWorkflowItem() maps these wfids back into logicalWorkflow
    unsigned wfid = workflow->count() + logicalWorkflow.size()+1;
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "new predecessor workflow item %u has been created", wfid);
#endif
    CCloneWorkflowItem * predecessor = new CCloneWorkflowItem(wfid); //initialise the intermediary
    Owned<IRuntimeWorkflowItem> tmp = predecessor;
    logicalWorkflow.push_back(tmp); //adding it to the workflow array
    defineLogicalRelationships(successorWfid, predecessor, false);
    return predecessor;
}
void WorkflowMachine::defineLogicalRelationships(unsigned int wfid, CCloneWorkflowItem *logicalPredecessor, bool prevOrdered)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Called mark dependents on item %u", wfid);
#endif
    //If this condition is met, then the item will be activated before the start of the execution.
    //Any new logical relationships to activate this item are irrelevant, since they cannot activate an already active item.
    if (startItem->hasLogicalSuccessor(wfid))
        return;
    //If this condition is met, then an identical call to defineLogicalRelationships has been made previously.
    //Processing it twice is redundant.
    if (logicalPredecessor->hasLogicalSuccessor(wfid))
        return;
    CCloneWorkflowItem & item = queryWorkflowItem(wfid);
    bool alreadyProcessed = (!item.isDependentSuccessorsEmpty());
    //Ordered causes the effect of logicalPredecessor to skip a generation (to this item's dependencies)
    if(!prevOrdered)
    {
        logicalPredecessor->addLogicalSuccessor(&item);
    }
    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
    //For Non-Condition items
    if(item.queryMode() != WFModeCondition)
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Item %u is a non-condition item", wfid);
#endif
        CCloneWorkflowItem * prev = nullptr;
        bool thisOrdered = false;
        bool onlyProcessFirst = false;
        for(iter->first(); iter->isValid(); iter->next())
        {
            CCloneWorkflowItem & cur = queryWorkflowItem(iter->query());
            //prev is the logical predecessor to cur.
            switch(item.queryMode())
            {
            case WFModeOrdered:
                if(prev)
                {
                    //Note: thisOrdered is false for the first ORDERED action
                    thisOrdered = true;
                    if(!alreadyProcessed)
                        cur.addLogicalSuccessor(prev);
                }
                //Note: Ordered doesn't change logicalPredecessor
                break;
            case WFModeSequential:
                onlyProcessFirst = alreadyProcessed;
                //Note: Sequential changes logicalPredecessor, so that the dependencies of the cur item also depend on prev.
                if(prev)
                    logicalPredecessor = prev;
                break;
            }
            defineLogicalRelationships(cur.queryWfid(), logicalPredecessor, thisOrdered);
            if(onlyProcessFirst)
                return;
            if(!alreadyProcessed)
                cur.addDependentSuccessor(&item); //this means that alreadyProcessed will be true when next evaluated
            prev = &cur;
        }
    }
    else
    {
        //For Condition items
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Item %u is a condition item", wfid);
#endif
        if(!iter->first())
            throwUnexpected();
        CCloneWorkflowItem & conditionExpression = queryWorkflowItem(iter->query());
        defineLogicalRelationships(conditionExpression.queryWfid(), logicalPredecessor, false);
        if(alreadyProcessed)
            return;
        if(!iter->next())
            throwUnexpected();
        unsigned wfidTrue = iter->query();
        unsigned wfidFalse = 0;
        if(iter->next())
            wfidFalse = iter->query();
        conditionExpression.setMode(WFModeConditionExpression);
        conditionExpression.addLogicalSuccessor(insertLogicalPredecessor(wfidTrue));
        CCloneWorkflowItem & trueSuccessor = queryWorkflowItem(wfidTrue);
        trueSuccessor.addDependentSuccessor(&item);
        if(wfidFalse)
        {
            conditionExpression.addLogicalSuccessor(insertLogicalPredecessor(wfidFalse));
            CCloneWorkflowItem & falseSuccessor = queryWorkflowItem(wfidFalse);
            falseSuccessor.addDependentSuccessor(&item);
            //Decrement item's numDependencies by one, since only one of the two branches will be completed.
            item.atomicDecNumDependencies();
        }
        conditionExpression.addDependentSuccessor(&item);
    }
    //Contingency clauses (belonging to any type of item)
    //Here, an intermediary item is inserted between "item" and its contingency.
    unsigned successWfid = item.querySuccess();
    if(successWfid)
    {
        item.setSuccessWfid(insertLogicalPredecessor(successWfid)->queryWfid());
    }
    unsigned failureWfid = item.queryFailure();
    if(failureWfid)
    {
        item.setFailureWfid(insertLogicalPredecessor(failureWfid)->queryWfid());
    }
}
void WorkflowMachine::addToItemQueue(unsigned wfid)
{
    {
        CriticalBlock thisBlock(queueCritSec);
        wfItemQueue.push(wfid);
    }
    wfItemQueueSem.signal(1);
}
void WorkflowMachine::processDependentSuccessors(CCloneWorkflowItem &item)
{
    //item will never be re-executed, so this function is only called once per item
    if(item.queryWfid() == parentWfid)
    {
        //update stop conditions
        parentReached = true;
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Reached parent");
#endif
        //Evaluate stop conditions. If the workflow has failed, then it needs to check whether there are any pending contingencies
        checkIfDone();
    }
    WorkflowException * e = item.queryException();
    Owned<IWorkflowDependencyIterator> iter = item.getDependentSuccessors();
    //MORE: optionally check "alive" - could increase speed, but may introduce race condition
    for(iter->first(); iter->isValid(); iter->next())
    {
        unsigned thisWfid = iter->query();
        CCloneWorkflowItem & cur = queryWorkflowItem(thisWfid);
        //this must be done even if the workflow item has an exception
        unsigned numPred = cur.atomicDecNumDependencies();
        if(e)
        {
            bool newBranch = false;
            {
                //this protects against two threads adding the same item at the same time
                CriticalBlock thisBlock(exceptionCritSec);
                if(!cur.queryException())
                {
                    cur.setException(e);
                    newBranch = true;
                }
            }
            if(newBranch)
            {
                //only process the exception if cur is active
                if(cur.isActive())
                {
                    branchCount++;
                    addToItemQueue(thisWfid);
                }
            }
        }
        else
        {
            if((numPred == 1) && cur.isActive())
            {
                addToItemQueue(thisWfid);
            }
        }
    }
    if(item.queryException())
    {
        //decrement branch count by one, since this item is already on a failed branch
        branchCount--;
        checkIfDone();
    }
}
void WorkflowMachine::processLogicalSuccessors(CCloneWorkflowItem &item)
{
    Owned<IWorkflowDependencyIterator> iter = item.getLogicalSuccessors();
    for(iter->first(); iter->isValid(); iter->next())
    {
        unsigned thisWfid = iter->query();
        CCloneWorkflowItem & cur = queryWorkflowItem(thisWfid);
        bool itemIsReady = false;
        {
            //this protects against two threads activating the same item at the same time
            CriticalBlock thisBlock(activationCritSec);
            if(!cur.queryContingencyWithin())
            {
                //This may make cur alive if it was dead
                //If cur has already been deactivated by executeItemParallel(), it will soon be re-added to the item queue
                //If not, it may now never become deactivated, since it is part of a contingency (in the case that item is part of a contingency)
                //In the case that item is also not part of a contingency, then no variables have been modified
                cur.setContingencyWithin(item.queryContingencyWithin());
            }
            if(!cur.isActive())
            {
                cur.activate();
                itemIsReady = true;
            }
        }
        if(itemIsReady)
        {
            if(cur.queryNumDependencies() == 0)
            {
                addToItemQueue(thisWfid);
            }
        }
    }
}
bool WorkflowMachine::activateFailureContingency(CCloneWorkflowItem & item)
{
    unsigned failureWfid = item.queryFailure();
    if(failureWfid)
    {
        startContingency();
        CCloneWorkflowItem & failureActivator = queryWorkflowItem(failureWfid);
        failureActivator.setContingencyWithin(item.queryWfid());
        processLogicalSuccessors(failureActivator);
        return true;
    }
    return false;
}
void WorkflowMachine::checkAbort(CCloneWorkflowItem & item, bool depFailed)
{
    if(item.queryContingencyWithin())
        return;
    CriticalBlock thisBlock(exceptionCritSec);
    if(!abort)
    {
        //This stores the error that causes the workflow to abort
        runtimeError.set(item.queryException());
#ifdef TRACE_WORKFLOW
        if(!depFailed)
            LOG(MCworkflow, "Workflow item %u failed. Aborting task", item.queryWfid());
        else
            LOG(MCworkflow, "Dependency of Workflow item %u failed. Aborting task", item.queryWfid());
#endif
        abort = true;
    }
}
void WorkflowMachine::startContingency()
{
    activeContingencies++;
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Starting a new contingency");
#endif
}
void WorkflowMachine::endContingency()
{
    activeContingencies--;
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Ending a contingency");
#endif
}
void WorkflowMachine::executeItemParallel(unsigned wfid)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Beginning workflow item %u", wfid);
#endif
    CCloneWorkflowItem & item = queryWorkflowItem(wfid);
    {
        //the critical section ensures that the item is never abandoned at the same time that a different thread would have added it to the item queue.
        //this would cause a problem where the item never gets performed.
        CriticalBlock thisBlock(activationCritSec);
        bool alive = ((!abort) || item.queryContingencyWithin() || item.queryException());
        if(!alive)
        {
#ifdef TRACE_WORKFLOW
            LOG(MCworkflow, "Ignoring workflow item %u due to abort", wfid);
#endif
            //item is deactivated because it is no longer alive
            item.deactivate();
            return;
        }
    }
    switch(item.queryState())
    {
    case WFStateDone:
    case WFStateFail:
        throw new WorkflowException(WFERR_ExecutingItemMoreThanOnce, "INTERNAL ERROR: attempting to execute workflow item more than once", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateSkip:
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
#endif
        return;
    case WFStateWait:
        throw new WorkflowException(WFERR_ExecutingInWaitState, "INTERNAL ERROR: attempting to execute workflow item in wait state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateBlocked:
        throw new WorkflowException(WFERR_ExecutingInBlockedState, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    }
    if(item.queryException())
    {
        checkAbort(item, true);
        item.setState(WFStateFail);
        bool hasContingency = activateFailureContingency(item);
        if(hasContingency)
            return;
        if(item.queryContingencyFor())
        {
            bool success = false;
            switch(item.queryType())
            {
            case WFTypeSuccess:
                success = true;
                //fall through
            case WFTypeFailure:
                //This item must be the last item in the contingency to execute, since a contingency cannot have its own contingency
                endContingency();
                branchCount--;
                if(checkIfDone())
                    return;
                processDependentSuccessors(queryWorkflowItem(item.queryContingencyFor()));
                if(success)
                    processLogicalSuccessors(queryWorkflowItem(item.queryContingencyFor()));
                return;
            }
        }
        processDependentSuccessors(item);
        return;
    }
    else if(!item.isActive())
    {
        //should never happen
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Ignoring workflow item %u due to inactive state", wfid);
#endif
        throwUnexpected();
    }
    else
    {
        try
        {
            switch(item.queryMode())
            {
            case WFModeNormal:
            case WFModeOnce:
                doExecuteItemParallel(item);
                break;
            case WFModeCondition:
            case WFModeSequential:
            case WFModeParallel:
                break;
            case WFModeConditionExpression:
                doExecuteConditionExpression(item);
                break;
            case WFModePersist:
                doExecutePersistItem(item);
                break;
            case WFModeCritical:
            case WFModeBeginWait:
            case WFModeWait:
                throwUnexpected();
            default:
                throwUnexpected();
            }
            item.setState(WFStateDone);
            unsigned successWfid = item.querySuccess();
            if(successWfid)
            {
                startContingency();
                CCloneWorkflowItem & successActivator = queryWorkflowItem(successWfid);
                successActivator.setContingencyWithin(item.queryWfid());
                processLogicalSuccessors(successActivator);
                return;
            }
        }
        catch(WorkflowException * e)
        {
            Owned<WorkflowException> savedException = e;
            bool hasContingency = handleFailureParallel(item, e);
            //If a failure contingency exists, it must be fully performed before the successors of the current item are processed.
            //Until the clause finishes, any items dependent on the current item shouldn't execute.
            if(hasContingency)
                return;
        }
    }
    if(!done)
    {
        bool success = false;
        bool alive = false;
        switch(item.queryType())
        {
        case WFTypeNormal:
            //NOTE - doesn't need to be protected by the activationCritSec
            alive = ((!abort) || item.queryContingencyWithin() || item.queryException());
            if(alive)
            {
                processDependentSuccessors(item);
                processLogicalSuccessors(item);
            }
            break;
        case WFTypeSuccess:
            success = true;
            //fall through
        case WFTypeFailure:
            //This item must be the last item in the contingency to execute, since a contingency cannot have its own contingency
            endContingency();
            if(item.queryException())
                branchCount--;
            if(checkIfDone())
                return;
            processDependentSuccessors(queryWorkflowItem(item.queryContingencyFor()));
            if(success)
                processLogicalSuccessors(queryWorkflowItem(item.queryContingencyFor()));
            //An item with type Success/Failure has no successors belonging to it
            return;
        }
    }
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Done workflow item %u", wfid);
#endif
}
void WorkflowMachine::doExecuteItemParallel(IRuntimeWorkflowItem & item)
{
    try
    {
        performItemParallel(item.queryWfid());
    }
    catch(WorkflowException * ein)
    {
        if (ein->queryWfid() == 0)
        {
            StringBuffer msg;
            ein->errorMessage(msg);
            WorkflowException * newException = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), ein->queryType(), ein->errorAudience());
            ein->Release();
            ein = newException;
        }
        if(ein->queryType() == WorkflowException::ABORT)
            throw ein;
        //recovery will be added in a subsequent PR (Jira issue HPCC-24261)
        //Until then, the exception is always rethrown, so the Release() after this block is unreachable.
        //if(!attemptRetry(item, 0, scheduledWfid))
        {
            throw ein;
        }
        ein->Release();
    }
    catch(IException * ein)
    {
        checkForAbort(item.queryWfid(), ein);
        //if(!attemptRetry(item, 0, scheduledWfid))
        {
            StringBuffer msg;
            ein->errorMessage(msg);
            WorkflowException::Type type = ((ein != NULL) ? WorkflowException::USER : WorkflowException::SYSTEM);
            WorkflowException * eout = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), type, ein->errorAudience());
            ein->Release();
            throw eout;
        }
        ein->Release();
    }
}
void WorkflowMachine::doExecuteConditionExpression(CCloneWorkflowItem & item)
{
    bool result;
    {
        //To prevent the callback that modifies "condition" from having a race condition
        CriticalBlock thisBlock(conditionCritSec);
        doExecuteItemParallel(item);
        result = condition;
    }
    //index 0 contains true successor, index 1 contains false successor
    Owned<IWorkflowDependencyIterator> iter = item.getLogicalSuccessors();
    if(!iter->first())
        throwUnexpected();
    unsigned wfidTrue = iter->query();
    unsigned wfidFalse = 0;
    if(iter->next())
        wfidFalse = iter->query();
    if(result)
    {
        CCloneWorkflowItem &trueActivator = queryWorkflowItem(wfidTrue);
        trueActivator.setContingencyWithin(item.queryContingencyWithin());
        processLogicalSuccessors(trueActivator);
    }
    else
    {
        if(wfidFalse)
        {
            CCloneWorkflowItem &falseActivator = queryWorkflowItem(wfidFalse);
            falseActivator.setContingencyWithin(item.queryContingencyWithin());
            processLogicalSuccessors(falseActivator);
        }
        else
        {
            //This function will be called again later. It is called twice, so that the parent condition item has the correct number of dependencies decremented.
            processDependentSuccessors(item);
        }
    }
    item.removeLogicalSuccessors();
}
void WorkflowMachine::performItemParallel(unsigned wfid)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Performing workflow item %u", wfid);
#endif
    timestamp_type startTime = getTimeStampNowValue();
    CCycleTimer timer;
    process->perform(ctx, wfid);
    noteTiming(wfid, startTime, timer.elapsedNs());
}
bool WorkflowMachine::handleFailureParallel(CCloneWorkflowItem & item, WorkflowException * e)
{
    item.setException(e);
    branchCount++;
    StringBuffer msg;
    e->errorMessage(msg).append(" (in item ").append(e->queryWfid()).append(")");
    logctx.logOperatorException(NULL, NULL, 0, "%d: %s", e->errorCode(), msg.str());
    item.setFailInfo(e->errorCode(), msg.str());
    item.setState(WFStateFail);
    if(!item.queryContingencyWithin())
    {
        checkAbort(item, false);
    }
    else
    {
        WFState contingencyState = queryWorkflowItem(item.queryContingencyWithin()).queryState();
        if(contingencyState == WFStateDone)
            reportContingencyFailure("SUCCESS", e);
        else if(contingencyState == WFStateFail)
            reportContingencyFailure("FAILURE", e);
        else
            reportContingencyFailure("Unknown", e);
    }
    return activateFailureContingency(item);
}
CCloneWorkflowItem &WorkflowMachine::queryWorkflowItem(unsigned wfid)
{
    if(wfid <= workflow->count())
    {
        return static_cast<CCloneWorkflowItem&>(workflow->queryWfid(wfid));
    }
    else
    {
        unsigned index = wfid - workflow->count() - 1;
        if(index >= logicalWorkflow.size())
            throwUnexpected();
        return static_cast<CCloneWorkflowItem&>(*logicalWorkflow[index].get());
    }
}
bool WorkflowMachine::checkIfDone()
{
    if((activeContingencies == 0) && (parentReached))
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "WorkflowMachine::checkifDone. Final check. Branch count: %u", branchCount.load());
#endif
        if(branchCount == 0)
        {
#ifdef TRACE_WORKFLOW
            LOG(MCworkflow, "workflow done");
#endif
            done = true;
            wfItemQueueSem.signal(numThreads);
            return true;
        }
    }
    return false;
}
void WorkflowMachine::processWfItems()
{
    while(!done)
    {
        wfItemQueueSem.wait();
        if(!done)
        {
            //Local variable; deliberately not named currentWfid, to avoid shadowing the WorkflowMachine member of that name
            unsigned wfid = 0;
            {
                CriticalBlock thisBlock(queueCritSec);
                wfid = wfItemQueue.front();
                wfItemQueue.pop();
            }
            try
            {
                executeItemParallel(wfid);
            }
            //terminate threads on fatal exception and save error
            catch(WorkflowException * e)
            {
                runtimeError.setown(e);
                done = true;
                wfItemQueueSem.signal(numThreads); //MORE: think about interrupting other threads
                break;
            }
        }
    }
}
void WorkflowMachine::performParallel(IGlobalCodeContext *_ctx, IEclProcess *_process)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "starting perform parallel");
#endif
    ctx = _ctx;
    process = _process;
    //relink workflow
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Starting to mark Items with their successors");
#endif
    addSuccessors();
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Finished marking Items with their successors");
    LOG(MCworkflow, "Initialising threads");
#endif
    //initialise thread count: default to 4, and never use more threads than the CPUs in this process's affinity set
    numThreads = getThreadNumFlag();
    if(numThreads < 1)
        numThreads = 4;
    unsigned maxThreads = getAffinityCpus();
    if(numThreads > maxThreads)
        numThreads = maxThreads;
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "num threads = %u", numThreads);
#endif
    std::vector<std::thread *> threads(numThreads);
    //NOTE: Initial work items have already been added to the queue by addSuccessors (above)
    //Start threads
    for(unsigned i=0; i < numThreads; i++)
        threads[i] = new std::thread([this]() { this->processWfItems(); });
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Calling join threads");
#endif
    //wait for threads to process the workflow items, and then exit when all the work is done
    for(unsigned i=0; i < numThreads; i++)
        threads[i]->join();
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Destroying threads");
#endif
    for(unsigned i=0; i < numThreads; i++)
        delete threads[i];
    if(runtimeError)
        throw runtimeError.getClear();
}
bool WorkflowMachine::isParallelViable()
{
    //initialise parallel flag from workunit
    parallel = getParallelFlag();
    if(!parallel)
    {
        return false;
    }
    for(unsigned i = 1; i <= workflow->count(); i++)
    {
        CCloneWorkflowItem & cur = queryWorkflowItem(i);
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Checking Item %u to decide if parallel is viable", i);
#endif
        //list of exceptions for currently unsupported modes/types
        switch(cur.queryMode())
        {
        case WFModeWait:
        case WFModeBeginWait:
        case WFModeCritical:
        case WFModePersist:
        case WFModeOnce:
            return false;
        }
        switch(cur.queryType())
        {
        case WFTypeRecovery:
            return false;
        }
        //switch(cur.queryState())
        if(cur.isScheduled() && (!cur.isScheduledNow()))
            return false;
    }
    return true;
}
//The process parameter defines the c++ task associated with each workflowItem
//These are executed in the context/scope of the 'agent' which calls perform()
void WorkflowMachine::perform(IGlobalCodeContext *_ctx, IEclProcess *_process)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "starting perform");
#endif
    ctx = _ctx;
    process = _process;

    //This is where the 'agent' initialises the workflow engine with an array of workflowItems, with their dependencies
    begin();
    if(isParallelViable())
    {
        performParallel(_ctx, _process);
        return;
    }
    Owned<WorkflowException> error;
    bool scheduling = workflow->hasScheduling();
    if(scheduling)
        schedulingStart();
    bool more = false;
    do
    {
        Owned<IRuntimeWorkflowItem> item;
        Owned<IRuntimeWorkflowItemIterator> iter = workflow->getSequenceIterator();
        itemsWaiting = 0;
        itemsUnblocked = 0;
        if (iter->first())
        {
            while (iter->isValid())
            {
                try
                {
                    item.setown(iter->get());
                    switch(item->queryState())
                    {
                    case WFStateReqd:
                    case WFStateFail:
                        if(!error)
                        {
                            unsigned wfid = item->queryWfid();
                            executeItem(wfid, wfid);
                        }
                        break;
                    }
                }
                catch(WorkflowException * e)
                {
                    error.setown(e);
                }
                if(item->queryState() == WFStateWait) itemsWaiting++;
                if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
                if(scheduling && schedulingPull())
                {
                    itemsWaiting = 0;
                    iter.setown(workflow->getSequenceIterator());
                    if(!iter->first()) break;
                }
                else if(!iter->next())
                    break;
            }
        }
        if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
        if(scheduling)
            more = schedulingPullStop();
    } while(more || itemsUnblocked);
    end();
    if(error)
        throw error.getLink();
}
bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Beginning workflow item %u", wfid);
#endif
    IRuntimeWorkflowItem & item = workflow->queryWfid(wfid);
    switch(item.queryState())
    {
    case WFStateDone:
        if (item.queryMode() == WFModePersist)
        {
#ifdef TRACE_WORKFLOW
            LOG(MCworkflow, "Recheck persist %u", wfid);
#endif
            break;
        }
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
#endif
        return true;
    case WFStateSkip:
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
#endif
        return true;
    case WFStateWait:
        throw new WorkflowException(WFERR_ExecutingInWaitState, "INTERNAL ERROR: attempting to execute workflow item in wait state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateBlocked:
        throw new WorkflowException(WFERR_ExecutingInBlockedState, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateFail:
        item.reset();
        break;
    }
    switch(item.queryMode())
    {
    case WFModeNormal:
    case WFModeOnce:
        if (!doExecuteItemDependencies(item, wfid))
            return false;
        doExecuteItem(item, scheduledWfid);
        break;
    case WFModeCondition:
        if (!doExecuteConditionItem(item, scheduledWfid))
            return false;
        break;
    case WFModeSequential:
    case WFModeParallel:
        if (!doExecuteItemDependencies(item, scheduledWfid))
            return false;
        break;
    case WFModePersist:
        doExecutePersistItem(item);
        break;
    case WFModeCritical:
        doExecuteCriticalItem(item);
        break;
    case WFModeBeginWait:
        doExecuteBeginWaitItem(item, scheduledWfid);
        item.setState(WFStateDone);
        return false;
    case WFModeWait:
        doExecuteEndWaitItem(item);
        break;
    default:
        throwUnexpected();
    }
    switch(item.queryType())
    {
    case WFTypeNormal:
        if(item.isScheduled() && !item.isScheduledNow() && item.decAndTestScheduleCountRemaining())
            item.setState(WFStateWait);
        else
            item.setState(WFStateDone);
        break;
    case WFTypeSuccess:
    case WFTypeFailure:
        item.setState(WFStateNull);
        break;
    case WFTypeRecovery:
        item.setState(WFStateSkip);
        break;
    }
    if(item.querySuccess())
    {
        try
        {
            executeItem(item.querySuccess(), scheduledWfid);
        }
        catch(WorkflowException * ce)
        {
            if(ce->queryType() == WorkflowException::ABORT)
                throw;
            reportContingencyFailure("SUCCESS", ce);
            ce->Release();
        }
    }
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Done workflow item %u", wfid);
#endif
    return true;
}
bool WorkflowMachine::doExecuteItemDependencies(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
{
    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
    for(iter->first(); iter->isValid(); iter->next())
    {
        if (!doExecuteItemDependency(item, iter->query(), scheduledWfid, false))
            return false;
    }
    return true;
}

bool WorkflowMachine::doExecuteItemDependency(IRuntimeWorkflowItem & item, unsigned wfid, unsigned scheduledWfid, bool alwaysEvaluate)
{
    try
    {
        if (alwaysEvaluate)
            workflow->queryWfid(wfid).setState(WFStateNull);
        return executeItem(wfid, scheduledWfid);
    }
    catch(WorkflowException * e)
    {
        if(e->queryType() == WorkflowException::ABORT)
            throw;
        if(!attemptRetry(item, wfid, scheduledWfid))
        {
            handleFailure(item, e, true);
            throw;
        }
        e->Release();
    }
    return true;//more!
}
void WorkflowMachine::doExecuteItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
{
    try
    {
        performItem(item.queryWfid(), scheduledWfid);
    }
    catch(WorkflowException * ein)
    {
        if (ein->queryWfid() == 0)
        {
            StringBuffer msg;
            ein->errorMessage(msg);
            WorkflowException * newException = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), ein->queryType(), ein->errorAudience());
            ein->Release();
            ein = newException;
        }
        if(ein->queryType() == WorkflowException::ABORT)
            throw ein;
        if(!attemptRetry(item, 0, scheduledWfid))
        {
            handleFailure(item, ein, true);
            throw ein;
        }
        ein->Release();
    }
    catch(IException * ein)
    {
        checkForAbort(item.queryWfid(), ein);
        if(!attemptRetry(item, 0, scheduledWfid))
        {
            StringBuffer msg;
            ein->errorMessage(msg);
            WorkflowException::Type type = ((dynamic_cast<IUserException *>(ein) != NULL) ? WorkflowException::USER : WorkflowException::SYSTEM);
            WorkflowException * eout = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), type, ein->errorAudience());
            ein->Release();
            handleFailure(item, eout, false);
            throw eout;
        }
        ein->Release();
    }
}
bool WorkflowMachine::doExecuteConditionItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
{
    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
    if(!iter->first()) throwUnexpected();
    unsigned wfidCondition = iter->query();
    if(!iter->next()) throwUnexpected();
    unsigned wfidTrue = iter->query();
    unsigned wfidFalse = 0;
    if(iter->next()) wfidFalse = iter->query();
    if(iter->next()) throwUnexpected();
    if (!doExecuteItemDependency(item, wfidCondition, scheduledWfid, true))
        return false;
    if(condition)
        return doExecuteItemDependency(item, wfidTrue, scheduledWfid, false);
    else if (wfidFalse)
        return doExecuteItemDependency(item, wfidFalse, scheduledWfid, false);
    return true;
}
void WorkflowMachine::doExecuteBeginWaitItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Begin wait for workflow item %u sched %u", item.queryWfid(), scheduledWfid);
#endif
    //Block execution of the currently executing scheduled item
    IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
    assertex(scheduledItem.queryState() == WFStateReqd);
    scheduledItem.setState(WFStateBlocked);

    //And increment the count on the wait wf item so it becomes active
    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
    if(!iter->first()) throwUnexpected();
    unsigned waitWfid = iter->query();
    if(iter->next()) throwUnexpected();

    IRuntimeWorkflowItem & waitItem = workflow->queryWfid(waitWfid);
    assertex(waitItem.queryState() == WFStateDone);
    waitItem.incScheduleCount();
    waitItem.setState(WFStateWait);
    itemsWaiting++;
}

void WorkflowMachine::doExecuteEndWaitItem(IRuntimeWorkflowItem & item)
{
    //Unblock the scheduled workflow item, which should mean execution continues.
    unsigned scheduledWfid = item.queryScheduledWfid();
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Finished wait for workflow sched %u", scheduledWfid);
#endif
    IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
    assertex(scheduledItem.queryState() == WFStateBlocked);
    scheduledItem.setState(WFStateReqd);
    itemsUnblocked++;
    //Note this would be more efficient implemented more like a state machine
    //(with next processing rather than walking from the top down),
    //but that will require some more work.
}
bool WorkflowMachine::isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item)
{
    time_t thisTime;
    if (!getPersistTime(thisTime, item))
        return false; // if no time must be older than the persist
    return when < thisTime;
}

bool WorkflowMachine::isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item)
{
    Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
    ForEach(*iter)
    {
        unsigned cur = iter->query();

        IRuntimeWorkflowItem & other = workflow->queryWfid(cur);
        if (isPersist(other))
        {
            if (isOlderThanPersist(when, other))
                return true;
        }
        else
        {
            if (isOlderThanInputPersists(when, other))
                return true;
        }
    }
    return false;
}

bool WorkflowMachine::isItemOlderThanInputPersists(IRuntimeWorkflowItem & item)
{
    time_t curWhen;
    if (!getPersistTime(curWhen, item))
        return false; // if no time then old and can't tell

    return isOlderThanInputPersists(curWhen, item);
}
void WorkflowMachine::performItem(unsigned wfid, unsigned scheduledWfid)
{
#ifdef TRACE_WORKFLOW
    if(currentWfid)
        LOG(MCworkflow, "Branching from workflow item %u", currentWfid);
    LOG(MCworkflow, "Performing workflow item %u", wfid);
#endif
    //Save the current item and its scheduled item so both can be restored once this item completes
    wfidStack.append(currentWfid);
    wfidStack.append(currentScheduledWfid);
    currentWfid = wfid;
    currentScheduledWfid = scheduledWfid;
    timestamp_type startTime = getTimeStampNowValue();
    CCycleTimer timer;
    process->perform(ctx, wfid);
    noteTiming(wfid, startTime, timer.elapsedNs());
    currentScheduledWfid = wfidStack.popGet();
    currentWfid = wfidStack.popGet();
    if(currentWfid)
    {
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Returning to workflow item %u", currentWfid);
#endif
    }
}
bool WorkflowMachine::attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid)
{
    unsigned wfid = item.queryWfid();
    unsigned recovery = item.queryRecovery();
    if(!recovery)
        return false;
    while(item.testAndDecRetries())
    {
        bool okay = true;
        try
        {
            workflow->queryWfid(recovery).setState(WFStateNull);
            executeItem(recovery, recovery);
            if(dep)
                executeItem(dep, scheduledWfid);
            else
                performItem(wfid, scheduledWfid);
        }
        catch(WorkflowException * ce)
        {
            okay = false;
            if(ce->queryType() == WorkflowException::ABORT)
                throw;
            reportContingencyFailure("RECOVERY", ce);
            ce->Release();
        }
        catch(IException * ce)
        {
            okay = false;
            checkForAbort(wfid, ce);
            reportContingencyFailure("RECOVERY", ce);
            ce->Release();
        }
        if(okay)
            return true;
    }
    return false;
}
void WorkflowMachine::handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep)
{
    StringBuffer msg;
    e->errorMessage(msg).append(" (in item ").append(e->queryWfid()).append(")");
    if(isDep)
        logctx.logOperatorException(NULL, NULL, 0, "Dependency failure for workflow item %u: %d: %s", item.queryWfid(), e->errorCode(), msg.str());
    else
        logctx.logOperatorException(NULL, NULL, 0, "%d: %s", e->errorCode(), msg.str());
    item.setFailInfo(e->errorCode(), msg.str());
    switch(item.queryType())
    {
    case WFTypeNormal:
        item.setState(WFStateFail);
        break;
    case WFTypeSuccess:
    case WFTypeFailure:
        item.setState(WFStateNull);
        break;
    case WFTypeRecovery:
        item.setState(WFStateSkip);
        break;
    }
    unsigned failureWfid = item.queryFailure();
    if(failureWfid)
    {
        try
        {
            executeItem(failureWfid, failureWfid);
        }
        catch(WorkflowException * ce)
        {
            if(ce->queryType() == WorkflowException::ABORT)
                throw;
            reportContingencyFailure("FAILURE", ce);
            ce->Release();
        }
    }
}
int WorkflowMachine::queryLastFailCode() const
{
    unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
    if(!wfidFor)
        return 0;
    return workflow->queryWfid(wfidFor).queryFailCode();
}

char const * WorkflowMachine::queryLastFailMessage() const
{
    unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
    if(!wfidFor)
        return "";
    char const * ret = workflow->queryWfid(wfidFor).queryFailMessage();
    return ret ? ret : "";
}

const char * WorkflowMachine::queryEventName() const
{
    //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
    return workflow->queryWfid(currentWfid).queryEventName();
}

const char * WorkflowMachine::queryEventExtra() const
{
    //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
    return workflow->queryWfid(currentWfid).queryEventExtra();
}
IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree *p)
{
    return new CWorkflowItemIterator(p);
}

IWorkflowItemArray *createWorkflowItemArray(unsigned size)
{
    return new CCloneWorkflowItemArray(size);
}

IWorkflowItem *createWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
{
    return new CWorkflowItem(ptree, wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor);
}