workflow.cpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jlib.hpp"
  15. #include "workunit.hpp"
  16. #include "jptree.hpp"
  17. #include "jlog.hpp"
  18. #include "jregexp.hpp"
  19. #include "workflow.hpp"
  20. //------------------------------------------------------------------------------------------
  21. // Workflow
  22. struct mapEnums { int val; const char *str; };
  23. mapEnums wftypes[] =
  24. {
  25. { WFTypeNormal, "normal" },
  26. { WFTypeSuccess, "success" },
  27. { WFTypeFailure, "failure" },
  28. { WFTypeRecovery, "recovery" },
  29. { WFTypeWait, "wait" },
  30. { WFTypeSize, NULL }
  31. };
  32. mapEnums wfmodes[] =
  33. {
  34. { WFModeNormal, "normal" },
  35. { WFModeCondition, "condition" },
  36. { WFModeSequential, "sequential" },
  37. { WFModeParallel, "parallel" },
  38. { WFModePersist, "persist" },
  39. { WFModeBeginWait, "bwait" },
  40. { WFModeWait, "wait" },
  41. { WFModeOnce, "once" },
  42. { WFModeSize, NULL}
  43. };
  44. mapEnums wfstates[] =
  45. {
  46. { WFStateNull, "null" },
  47. { WFStateReqd, "reqd" },
  48. { WFStateDone, "done" },
  49. { WFStateFail, "fail" },
  50. { WFStateSkip, "skip" },
  51. { WFStateWait, "wait" },
  52. { WFStateBlocked, "block" },
  53. { WFStateSize, NULL }
  54. };
  55. static void setEnum(IPropertyTree *p, const char *propname, int value, mapEnums *map)
  56. {
  57. const char *defval = map->str;
  58. while (map->str)
  59. {
  60. if (value==map->val)
  61. {
  62. p->setProp(propname, map->str);
  63. return;
  64. }
  65. map++;
  66. }
  67. assertex(!"Unexpected value in setEnum");
  68. p->setProp(propname, defval);
  69. }
  70. static int getEnum(IPropertyTree *p, const char *propname, mapEnums *map)
  71. {
  72. const char *v = p->queryProp(propname);
  73. if (v)
  74. {
  75. while (map->str)
  76. {
  77. if (stricmp(v, map->str)==0)
  78. return map->val;
  79. map++;
  80. }
  81. assertex(!"Unexpected value in getEnum");
  82. }
  83. return 0;
  84. }
  85. class CWorkflowDependencyIterator : public CInterface, implements IWorkflowDependencyIterator
  86. {
  87. public:
  88. CWorkflowDependencyIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Dependency")); }
  89. IMPLEMENT_IINTERFACE;
  90. bool first() { return iter->first(); }
  91. bool isValid() { return iter->isValid(); }
  92. bool next() { return iter->next(); }
  93. unsigned query() const { return iter->query().getPropInt("@wfid"); }
  94. private:
  95. Owned<IPropertyTreeIterator> iter;
  96. };
  97. class CWorkflowEvent : public CInterface, implements IWorkflowEvent
  98. {
  99. public:
  100. CWorkflowEvent(char const * _name, char const * _text) : name(_name), text(_text) {}
  101. IMPLEMENT_IINTERFACE;
  102. virtual char const * queryName() const { return name.get(); }
  103. virtual char const * queryText() const { return text.get(); }
  104. virtual bool matches(char const * trialName, char const * trialText) const { return((strcmp(trialName, name.get()) == 0) && WildMatch(trialText, text.get(), true)); }
  105. private:
  106. StringAttr name;
  107. StringAttr text;
  108. };
  109. class CWorkflowItem : public CInterface, implements IWorkflowItem
  110. {
  111. public:
  112. CWorkflowItem(IPropertyTree & _tree) { tree.setown(&_tree); }
  113. CWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
  114. {
  115. tree.setown(LINK(ptree->addPropTree("Item", createPTree())));
  116. tree->setPropInt("@wfid", wfid);
  117. setEnum(tree, "@type", type, wftypes);
  118. setEnum(tree, "@mode", mode, wfmodes);
  119. if(success) tree->setPropInt("@success", success);
  120. if(failure) tree->setPropInt("@failure", failure);
  121. if(recovery && retriesAllowed)
  122. {
  123. tree->setPropInt("@recovery", recovery);
  124. tree->setPropInt("@retriesAllowed", retriesAllowed);
  125. tree->addPropTree("Dependency", createPTree())->setPropInt("@wfid", recovery);
  126. }
  127. if(contingencyFor) tree->setPropInt("@contingencyFor", contingencyFor);
  128. reset();
  129. }
  130. IMPLEMENT_IINTERFACE;
  131. //info set at compile time
  132. virtual unsigned queryWfid() const { return tree->getPropInt("@wfid"); }
  133. virtual bool isScheduled() const { return tree->hasProp("Schedule"); }
  134. virtual bool isScheduledNow() const { return (tree->hasProp("Schedule") && !tree->hasProp("Schedule/Event")); }
  135. virtual IWorkflowEvent * getScheduleEvent() const { if(tree->hasProp("Schedule/Event")) return new CWorkflowEvent(tree->queryProp("Schedule/Event/@name"), tree->queryProp("Schedule/Event/@text")); else return NULL; }
  136. virtual unsigned querySchedulePriority() const { return (tree->hasProp("Schedule") ? tree->getPropInt("Schedule/@priority", 0) : 0); }
  137. virtual bool hasScheduleCount() const { return tree->hasProp("Schedule/@count"); }
  138. virtual unsigned queryScheduleCount() const { assertex(tree->hasProp("Schedule/@count")); return tree->getPropInt("Schedule/@count"); }
  139. virtual IWorkflowDependencyIterator * getDependencies() const { return new CWorkflowDependencyIterator(tree); }
  140. virtual WFType queryType() const { return static_cast<WFType>(getEnum(tree, "@type", wftypes)); }
  141. virtual WFMode queryMode() const { return static_cast<WFMode>(getEnum(tree, "@mode", wfmodes)); }
  142. virtual unsigned querySuccess() const { return tree->getPropInt("@success", 0); }
  143. virtual unsigned queryFailure() const { return tree->getPropInt("@failure", 0); }
  144. virtual unsigned queryRecovery() const { return tree->getPropInt("@recovery", 0); }
  145. virtual unsigned queryRetriesAllowed() const { return tree->getPropInt("@retriesAllowed", 0); }
  146. virtual unsigned queryContingencyFor() const { return tree->getPropInt("@contingencyFor", 0); }
  147. virtual IStringVal & getPersistName(IStringVal & val) const { val.set(tree->queryProp("@persistName")); return val; }
  148. virtual unsigned queryPersistWfid() const { return tree->getPropInt("@persistWfid", 0); }
  149. virtual IStringVal & queryCluster(IStringVal & val) const { val.set(tree->queryProp("@cluster")); return val; }
  150. virtual void setScheduledNow() { tree->setPropTree("Schedule", createPTree()); setEnum(tree, "@state", WFStateReqd, wfstates); }
  151. virtual void setScheduledOn(char const * name, char const * text) { IPropertyTree * stree = createPTree(); stree->setProp("@name", name); stree->setProp("@text", text); tree->setPropTree("Schedule", createPTree())->setPropTree("Event", stree); setEnum(tree, "@state", WFStateWait, wfstates); }
  152. virtual void setSchedulePriority(unsigned priority) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@priority", priority); }
  153. virtual void setScheduleCount(unsigned count) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@count", count); tree->setPropInt("Schedule/@countRemaining", count); }
  154. virtual void addDependency(unsigned wfid) { tree->addPropTree("Dependency", createPTree())->setPropInt("@wfid", wfid); }
  155. virtual void setPersistInfo(char const * name, unsigned wfid) { tree->setProp("@persistName", name); tree->setPropInt("@persistWfid", wfid); }
  156. virtual void setCluster(const char * cluster) { tree->setProp("@cluster", cluster); }
  157. //info set at run time
  158. virtual unsigned queryScheduleCountRemaining() const { assertex(tree->hasProp("Schedule")); return tree->getPropInt("Schedule/@countRemaining"); }
  159. virtual WFState queryState() const { return static_cast<WFState>(getEnum(tree, "@state", wfstates)); }
  160. virtual unsigned queryRetriesRemaining() const { return tree->getPropInt("@retriesRemaining"); }
  161. virtual int queryFailCode() const { return tree->getPropInt("@failcode"); }
  162. virtual char const * queryFailMessage() const { return tree->queryProp("@failmsg"); }
  163. virtual char const * queryEventName() const { return tree->queryProp("@eventname"); }
  164. virtual char const * queryEventExtra() const { return tree->queryProp("@eventextra"); }
  165. virtual void setState(WFState state) { setEnum(tree, "@state", state, wfstates); }
  166. virtual unsigned queryScheduledWfid() const { return tree->getPropInt("@swfid", 0); }
  167. virtual void setScheduledWfid(unsigned wfid) { tree->setPropInt("@swfid", wfid); }
  168. virtual bool testAndDecRetries()
  169. {
  170. assertex(tree->hasProp("@retriesAllowed"));
  171. unsigned rem = tree->getPropInt("@retriesRemaining", 0);
  172. if(rem==0)
  173. return false;
  174. tree->setPropInt("@retriesRemaining", rem-1);
  175. return true;
  176. }
  177. virtual bool decAndTestScheduleCountRemaining()
  178. {
  179. if(!tree->hasProp("Schedule/@count"))
  180. return true;
  181. unsigned rem = tree->getPropInt("Schedule/@countRemaining");
  182. assertex(rem>0);
  183. tree->setPropInt("Schedule/@countRemaining", rem-1);
  184. return (rem>1);
  185. }
  186. virtual void incScheduleCount()
  187. {
  188. unsigned rem = tree->getPropInt("Schedule/@countRemaining");
  189. tree->setPropInt("Schedule/@countRemaining", rem+1);
  190. }
  191. virtual void setFailInfo(int code, char const * message)
  192. {
  193. tree->setPropInt("@failcode", code);
  194. tree->setProp("@failmsg", message);
  195. }
  196. virtual void setEvent(const char * name, const char * extra)
  197. {
  198. if (name)
  199. tree->setProp("@eventname", name);
  200. if (extra)
  201. tree->setProp("@eventextra", extra);
  202. }
  203. virtual void reset()
  204. {
  205. if(tree->hasProp("@retriesAllowed"))
  206. tree->setPropInt("@retriesRemaining", tree->getPropInt("@retriesAllowed"));
  207. if(tree->hasProp("Schedule/@count"))
  208. tree->setPropInt("Schedule/@countRemaining", tree->getPropInt("Schedule/@count"));
  209. tree->removeProp("@failcode");
  210. tree->removeProp("@failmsg");
  211. tree->removeProp("@eventname");
  212. tree->removeProp("@eventtext");
  213. if(isScheduled())
  214. {
  215. if(isScheduledNow())
  216. setState(WFStateReqd);
  217. else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
  218. setState(WFStateDone);
  219. else
  220. setState(WFStateWait);
  221. }
  222. else if(queryType() == WFTypeRecovery)
  223. setState(WFStateSkip);
  224. else
  225. setState(WFStateNull);
  226. }
  227. virtual void syncRuntimeData(IConstWorkflowItem const & other)
  228. {
  229. WFState state = other.queryState();
  230. setState(state);
  231. if(tree->hasProp("@retriesAllowed"))
  232. tree->setPropInt("@retriesRemaining", other.queryRetriesRemaining());
  233. if(tree->hasProp("Schedule/@count"))
  234. tree->setPropInt("Schedule/@countRemaining", other.queryScheduleCountRemaining());
  235. if(state == WFStateFail)
  236. {
  237. tree->setPropInt("@failcode", other.queryFailCode());
  238. tree->setProp("@failmsg", other.queryFailMessage());
  239. }
  240. setEvent(other.queryEventName(), other.queryEventExtra());
  241. }
  242. private:
  243. Owned<IPropertyTree> tree;
  244. };
  245. class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
  246. {
  247. private:
  248. class CCloneSchedule : public CInterface
  249. {
  250. private:
  251. bool now;
  252. unsigned priority;
  253. bool counting;
  254. unsigned count;
  255. unsigned countRemaining;
  256. Owned<IWorkflowEvent> event;
  257. public:
  258. CCloneSchedule(IConstWorkflowItem const * other)
  259. {
  260. now = other->isScheduledNow();
  261. priority = other->querySchedulePriority();
  262. counting = other->hasScheduleCount();
  263. if(counting)
  264. {
  265. count = other->queryScheduleCount();
  266. countRemaining = other->queryScheduleCountRemaining();
  267. }
  268. else
  269. {
  270. count = 0;
  271. countRemaining = 0;
  272. }
  273. event.setown(other->getScheduleEvent());
  274. }
  275. bool isNow() const { return now; }
  276. unsigned queryPriority() const { return priority; }
  277. bool hasCount() const { return counting; }
  278. unsigned queryCount() const { return count; }
  279. unsigned queryCountRemaining() const { return countRemaining; }
  280. bool decAndTestCountRemaining()
  281. {
  282. if(!counting)
  283. return true;
  284. if(countRemaining)
  285. countRemaining--;
  286. return (countRemaining>0);
  287. }
  288. void incCountRemaining()
  289. {
  290. if(counting)
  291. countRemaining++;
  292. }
  293. void resetCount() { if(counting) countRemaining = count; }
  294. IWorkflowEvent * getEvent() const { return event.getLink(); }
  295. };
  296. class CCloneIterator : public CInterface, public IWorkflowDependencyIterator
  297. {
  298. public:
  299. CCloneIterator(IntArray const & _array) : array(_array), idx(0) {}
  300. IMPLEMENT_IINTERFACE;
  301. virtual bool first() { idx = 0; return isValid(); }
  302. virtual bool isValid() { return array.isItem(idx); }
  303. virtual bool next() { idx++; return isValid(); }
  304. virtual unsigned query() const { return array.item(idx); }
  305. private:
  306. IntArray const & array;
  307. aindex_t idx;
  308. };
  309. unsigned wfid;
  310. Owned<CCloneSchedule> schedule;
  311. IntArray dependencies;
  312. WFType type;
  313. WFMode mode;
  314. unsigned success;
  315. unsigned failure;
  316. unsigned recovery;
  317. unsigned retriesAllowed;
  318. unsigned contingencyFor;
  319. unsigned scheduledWfid;
  320. WFState state;
  321. unsigned retriesRemaining;
  322. int failcode;
  323. StringAttr failmsg;
  324. SCMStringBuffer persistName;
  325. SCMStringBuffer clusterName;
  326. unsigned persistWfid;
  327. StringAttr eventName;
  328. StringAttr eventExtra;
  329. public:
  330. CCloneWorkflowItem() {}
  331. IMPLEMENT_IINTERFACE;
  332. void copy(IConstWorkflowItem const * other)
  333. {
  334. wfid = other->queryWfid();
  335. if(other->isScheduled())
  336. schedule.setown(new CCloneSchedule(other));
  337. Owned<IWorkflowDependencyIterator> iter = other->getDependencies();
  338. for(iter->first(); iter->isValid(); iter->next())
  339. dependencies.append(iter->query());
  340. type = other->queryType();
  341. mode = other->queryMode();
  342. success = other->querySuccess();
  343. failure = other->queryFailure();
  344. recovery = other->queryRecovery();
  345. retriesAllowed = other->queryRetriesAllowed();
  346. contingencyFor = other->queryContingencyFor();
  347. state = other->queryState();
  348. retriesRemaining = other->queryRetriesRemaining();
  349. if(state == WFStateFail)
  350. {
  351. failcode = other->queryFailCode();
  352. failmsg.set(other->queryFailMessage());
  353. }
  354. eventName.set(other->queryEventName());
  355. eventExtra.set(other->queryEventExtra());
  356. other->getPersistName(persistName);
  357. persistWfid = other->queryPersistWfid();
  358. scheduledWfid = other->queryScheduledWfid();
  359. other->queryCluster(clusterName);
  360. }
  361. //info set at compile time
  362. virtual unsigned queryWfid() const { return wfid; }
  363. virtual bool isScheduled() const { return schedule.get() != 0; }
  364. virtual bool isScheduledNow() const { return schedule && schedule->isNow(); }
  365. virtual IWorkflowEvent * getScheduleEvent() const { if(schedule) return schedule->getEvent(); else return NULL; }
  366. virtual unsigned querySchedulePriority() const { return schedule ? schedule->queryPriority() : 0; }
  367. virtual bool hasScheduleCount() const { return schedule ? schedule->hasCount() : false; }
  368. virtual unsigned queryScheduleCount() const { return schedule ? schedule->queryCount() : 0; }
  369. virtual IWorkflowDependencyIterator * getDependencies() const { return new CCloneIterator(dependencies); }
  370. virtual WFType queryType() const { return type; }
  371. virtual WFMode queryMode() const { return mode; }
  372. virtual unsigned querySuccess() const { return success; }
  373. virtual unsigned queryFailure() const { return failure; }
  374. virtual unsigned queryRecovery() const { return recovery; }
  375. virtual unsigned queryRetriesAllowed() const { return retriesAllowed; }
  376. virtual unsigned queryContingencyFor() const { return contingencyFor; }
  377. virtual IStringVal & getPersistName(IStringVal & val) const { val.set(persistName.str()); return val; }
  378. virtual unsigned queryPersistWfid() const { return persistWfid; }
  379. virtual IStringVal & queryCluster(IStringVal & val) const { val.set(clusterName.str()); return val; }
  380. //info set at run time
  381. virtual unsigned queryScheduleCountRemaining() const { return schedule ? schedule->queryCountRemaining() : 0; }
  382. virtual WFState queryState() const { return state; }
  383. virtual unsigned queryRetriesRemaining() const { return retriesRemaining; }
  384. virtual int queryFailCode() const { return failcode; }
  385. virtual char const * queryFailMessage() const { return failmsg.get(); }
  386. virtual char const * queryEventName() const { return eventName; }
  387. virtual char const * queryEventExtra() const { return eventExtra; }
  388. virtual unsigned queryScheduledWfid() const { return scheduledWfid; }
  389. virtual void setState(WFState _state) { state = _state; }
  390. virtual bool testAndDecRetries()
  391. {
  392. if(retriesRemaining == 0)
  393. return false;
  394. retriesRemaining--;
  395. return true;
  396. }
  397. virtual bool decAndTestScheduleCountRemaining()
  398. {
  399. if(!schedule)
  400. return true;
  401. return schedule->decAndTestCountRemaining();
  402. }
  403. virtual void incScheduleCount()
  404. {
  405. if(schedule)
  406. schedule->incCountRemaining();
  407. }
  408. virtual void setFailInfo(int code, char const * message)
  409. {
  410. failcode = code;
  411. failmsg.set(message);
  412. }
  413. virtual void setEvent(const char * name, const char * extra)
  414. {
  415. eventName.set(name);
  416. eventExtra.set(extra);
  417. }
  418. virtual void reset()
  419. {
  420. retriesRemaining = retriesAllowed;
  421. if(schedule) schedule->resetCount();
  422. if(isScheduled())
  423. {
  424. if(isScheduledNow())
  425. setState(WFStateReqd);
  426. else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
  427. setState(WFStateDone);
  428. else
  429. setState(WFStateWait);
  430. }
  431. else if(queryType() == WFTypeRecovery)
  432. setState(WFStateSkip);
  433. else
  434. setState(WFStateNull);
  435. }
  436. };
  437. class CWorkflowItemIterator : public CInterface, implements IWorkflowItemIterator
  438. {
  439. public:
  440. CWorkflowItemIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Item")); }
  441. IMPLEMENT_IINTERFACE;
  442. bool first() { item.clear(); return iter->first(); }
  443. bool isValid() { return iter->isValid(); }
  444. bool next() { item.clear(); return iter->next(); }
  445. IConstWorkflowItem * query() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.get(); }
  446. IWorkflowItem * get() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.getLink(); }
  447. private:
  448. Owned<IPropertyTreeIterator> iter;
  449. mutable Owned<CWorkflowItem> item;
  450. };
  451. class CCloneWorkflowItemArray : public CInterface, implements IWorkflowItemArray
  452. {
  453. private:
  454. class ListItem
  455. {
  456. public:
  457. ListItem(ListItem * _next, IRuntimeWorkflowItem * _item) : next(_next), item(_item) {}
  458. ListItem * next;
  459. IRuntimeWorkflowItem * item;
  460. };
  461. class ListItemPtr : public CInterface, implements IRuntimeWorkflowItemIterator
  462. {
  463. public:
  464. ListItemPtr(ListItem * _start) : start(_start) { ptr = NULL; }
  465. IMPLEMENT_IINTERFACE;
  466. virtual bool first() { ptr = start; return isValid(); }
  467. virtual bool isValid() { return ptr != NULL; }
  468. virtual bool next() { ptr = ptr->next; return isValid(); }
  469. virtual IConstWorkflowItem * query() const { return ptr->item; }
  470. virtual IRuntimeWorkflowItem * get() const { return LINK(ptr->item); }
  471. private:
  472. ListItem * start;
  473. ListItem * ptr;
  474. };
  475. void insert(CCloneWorkflowItem * item)
  476. {
  477. if(!item->isScheduled())
  478. return;
  479. if(!head)
  480. head = tail = new ListItem(NULL, item);
  481. else if(item->querySchedulePriority() > head->item->querySchedulePriority())
  482. head = new ListItem(head, item);
  483. else if(item->querySchedulePriority() <= tail->item->querySchedulePriority())
  484. {
  485. tail->next = new ListItem(NULL, item);
  486. tail = tail->next;
  487. }
  488. else
  489. {
  490. ListItem * finger = head;
  491. while(item->querySchedulePriority() <= finger->next->item->querySchedulePriority())
  492. finger = finger->next;
  493. finger->next = new ListItem(finger->next, item);
  494. }
  495. }
  496. public:
  497. CCloneWorkflowItemArray(unsigned _capacity) : capacity(_capacity), head(NULL), tail(NULL)
  498. {
  499. array = _capacity ? new CCloneWorkflowItem[_capacity] : NULL;
  500. }
  501. ~CCloneWorkflowItemArray()
  502. {
  503. ListItem * finger = head;
  504. while(finger)
  505. {
  506. ListItem * del = finger;
  507. finger = finger->next;
  508. delete del;
  509. }
  510. if (array)
  511. delete [] array;
  512. }
  513. IMPLEMENT_IINTERFACE;
  514. virtual void addClone(IConstWorkflowItem const * other)
  515. {
  516. unsigned wfid = other->queryWfid();
  517. assertex((wfid > 0) && (wfid <= capacity));
  518. array[wfid-1].copy(other);
  519. insert(&array[wfid-1]);
  520. }
  521. virtual IRuntimeWorkflowItem & queryWfid(unsigned wfid)
  522. {
  523. assertex((wfid > 0) && (wfid <= capacity));
  524. return array[wfid-1];
  525. }
  526. virtual unsigned count() const
  527. {
  528. return capacity;
  529. }
  530. virtual IRuntimeWorkflowItemIterator * getSequenceIterator() { return new ListItemPtr(head); }
  531. virtual bool hasScheduling() const
  532. {
  533. ListItem * finger = head;
  534. while(finger)
  535. {
  536. if(!finger->item->isScheduledNow())
  537. return true;
  538. finger = finger->next;
  539. }
  540. return false;
  541. }
  542. private:
  543. unsigned capacity;
  544. CCloneWorkflowItem * array;
  545. ListItem * head;
  546. ListItem * tail;
  547. };
  548. //-------------------------------------------------------------------------------------------------
  549. #ifdef TRACE_WORKFLOW
  550. const LogMsgCategory MCworkflow = MCprogress(50); // Category used to inform enqueue/start/finish of workflow item
  551. #endif
  552. WorkflowMachine::WorkflowMachine()
  553. : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(queryDummyContextLogger())
  554. {
  555. }
  556. WorkflowMachine::WorkflowMachine(const IContextLogger &_logctx)
  557. : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(_logctx)
  558. {
  559. }
  560. void WorkflowMachine::perform(IGlobalCodeContext *_ctx, IEclProcess *_process)
  561. {
  562. ctx = _ctx;
  563. process = _process;
  564. Owned<WorkflowException> error;
  565. begin();
  566. bool scheduling = workflow->hasScheduling();
  567. if(scheduling)
  568. schedulingStart();
  569. bool more = false;
  570. do
  571. {
  572. Owned<IRuntimeWorkflowItem> item;
  573. Owned<IRuntimeWorkflowItemIterator> iter = workflow->getSequenceIterator();
  574. itemsWaiting = 0;
  575. itemsUnblocked = 0;
  576. if (iter->first())
  577. {
  578. while (iter->isValid())
  579. {
  580. try
  581. {
  582. item.setown(iter->get());
  583. switch(item->queryState())
  584. {
  585. case WFStateReqd:
  586. case WFStateFail:
  587. if(!error)
  588. {
  589. unsigned wfid = item->queryWfid();
  590. executeItem(wfid, wfid);
  591. }
  592. break;
  593. }
  594. }
  595. catch(WorkflowException * e)
  596. {
  597. error.setown(e);
  598. }
  599. if(item->queryState() == WFStateWait) itemsWaiting++;
  600. if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
  601. if(scheduling && schedulingPull())
  602. {
  603. itemsWaiting = 0;
  604. iter.setown(workflow->getSequenceIterator());
  605. if(!iter->first()) break;
  606. }
  607. else
  608. if(!iter->next()) break;
  609. }
  610. }
  611. if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
  612. if(scheduling)
  613. more = schedulingPullStop();
  614. } while(more || itemsUnblocked);
  615. end();
  616. if(error)
  617. throw error.getLink();
  618. }
  619. bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
  620. {
  621. #ifdef TRACE_WORKFLOW
  622. LOG(MCworkflow, "Beginning workflow item %u", wfid);
  623. #endif
  624. IRuntimeWorkflowItem & item = workflow->queryWfid(wfid);
  625. switch(item.queryState())
  626. {
  627. case WFStateDone:
  628. case WFStateSkip:
  629. #ifdef TRACE_WORKFLOW
  630. LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
  631. #endif
  632. return true;
  633. case WFStateWait:
  634. throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in wait state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
  635. case WFStateBlocked:
  636. throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
  637. case WFStateFail:
  638. item.reset(); //fall through
  639. }
  640. switch(item.queryMode())
  641. {
  642. case WFModeNormal:
  643. case WFModeOnce:
  644. if (!doExecuteItemDependencies(item, wfid))
  645. return false;
  646. doExecuteItem(item, scheduledWfid);
  647. break;
  648. case WFModeCondition:
  649. if (!doExecuteConditionItem(item, scheduledWfid))
  650. return false;
  651. break;
  652. case WFModeSequential:
  653. case WFModeParallel:
  654. if (!doExecuteItemDependencies(item, scheduledWfid))
  655. return false;
  656. break;
  657. case WFModePersist:
  658. doExecutePersistItem(item);
  659. break;
  660. case WFModeBeginWait:
  661. doExecuteBeginWaitItem(item, scheduledWfid);
  662. item.setState(WFStateDone);
  663. return false;
  664. case WFModeWait:
  665. doExecuteEndWaitItem(item);
  666. break;
  667. default:
  668. throwUnexpected();
  669. }
  670. switch(item.queryType())
  671. {
  672. case WFTypeNormal:
  673. if(item.isScheduled() && !item.isScheduledNow() && item.decAndTestScheduleCountRemaining())
  674. item.setState(WFStateWait);
  675. else
  676. item.setState(WFStateDone);
  677. break;
  678. case WFTypeSuccess:
  679. case WFTypeFailure:
  680. item.setState(WFStateNull);
  681. break;
  682. case WFTypeRecovery:
  683. item.setState(WFStateSkip);
  684. break;
  685. }
  686. if(item.querySuccess())
  687. {
  688. try
  689. {
  690. executeItem(item.querySuccess(), scheduledWfid);
  691. }
  692. catch(WorkflowException * ce)
  693. {
  694. if(ce->queryType() == WorkflowException::ABORT)
  695. throw;
  696. reportContingencyFailure("SUCCESS", ce);
  697. ce->Release();
  698. }
  699. }
  700. #ifdef TRACE_WORKFLOW
  701. LOG(MCworkflow, "Done workflow item %u", wfid);
  702. #endif
  703. return true;
  704. }
  705. bool WorkflowMachine::doExecuteItemDependencies(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  706. {
  707. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  708. for(iter->first(); iter->isValid(); iter->next())
  709. {
  710. if (!doExecuteItemDependency(item, iter->query(), scheduledWfid))
  711. return false;
  712. }
  713. return true;
  714. }
  715. bool WorkflowMachine::doExecuteItemDependency(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid)
  716. {
  717. try
  718. {
  719. return executeItem(dep, scheduledWfid);
  720. }
  721. catch(WorkflowException * e)
  722. {
  723. if(e->queryType() == WorkflowException::ABORT)
  724. throw;
  725. if(!attemptRetry(item, dep, scheduledWfid))
  726. {
  727. handleFailure(item, e, true);
  728. throw;
  729. }
  730. e->Release();
  731. }
  732. return true;//more!
  733. }
  734. void WorkflowMachine::doExecuteItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  735. {
  736. try
  737. {
  738. performItem(item.queryWfid(), scheduledWfid);
  739. }
  740. catch(WorkflowException * ein)
  741. {
  742. if(ein->queryType() == WorkflowException::ABORT)
  743. throw;
  744. if(!attemptRetry(item, 0, scheduledWfid))
  745. {
  746. handleFailure(item, ein, true);
  747. throw;
  748. }
  749. ein->Release();
  750. }
  751. catch(IException * ein)
  752. {
  753. checkForAbort(item.queryWfid(), ein);
  754. if(!attemptRetry(item, 0, scheduledWfid))
  755. {
  756. StringBuffer msg;
  757. ein->errorMessage(msg);
  758. WorkflowException::Type type = ((dynamic_cast<IUserException *>(ein) != NULL) ? WorkflowException::USER : WorkflowException::SYSTEM);
  759. WorkflowException * eout = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), type, ein->errorAudience());
  760. ein->Release();
  761. handleFailure(item, eout, false);
  762. throw eout;
  763. }
  764. ein->Release();
  765. }
  766. }
  767. bool WorkflowMachine::doExecuteConditionItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  768. {
  769. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  770. if(!iter->first()) throwUnexpected();
  771. unsigned wfidCondition = iter->query();
  772. if(!iter->next()) throwUnexpected();
  773. unsigned wfidTrue = iter->query();
  774. unsigned wfidFalse = 0;
  775. if(iter->next()) wfidFalse = iter->query();
  776. if(iter->next()) throwUnexpected();
  777. if (!doExecuteItemDependency(item, wfidCondition, scheduledWfid))
  778. return false;
  779. if(condition)
  780. return doExecuteItemDependency(item, wfidTrue, scheduledWfid);
  781. else if (wfidFalse)
  782. return doExecuteItemDependency(item, wfidFalse, scheduledWfid);
  783. return true;
  784. }
  785. void WorkflowMachine::doExecuteBeginWaitItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  786. {
  787. #ifdef TRACE_WORKFLOW
  788. LOG(MCworkflow, "Begin wait for workflow item %u sched %u", item.queryWfid(), scheduledWfid);
  789. #endif
  790. //Block execution of the currently executing scheduled item
  791. IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
  792. assertex(scheduledItem.queryState() == WFStateReqd);
  793. scheduledItem.setState(WFStateBlocked);
  794. //And increment the count on the wait wf item so it becomes active
  795. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  796. if(!iter->first()) throwUnexpected();
  797. unsigned waitWfid = iter->query();
  798. if(iter->next()) throwUnexpected();
  799. IRuntimeWorkflowItem & waitItem = workflow->queryWfid(waitWfid);
  800. assertex(waitItem.queryState() == WFStateDone);
  801. waitItem.incScheduleCount();
  802. waitItem.setState(WFStateWait);
  803. itemsWaiting++;
  804. }
  805. void WorkflowMachine::doExecuteEndWaitItem(IRuntimeWorkflowItem & item)
  806. {
  807. //Unblock the scheduled workflow item, which should mean execution continues.
  808. unsigned scheduledWfid = item.queryScheduledWfid();
  809. #ifdef TRACE_WORKFLOW
  810. LOG(MCworkflow, "Finished wait for workflow sched %u", scheduledWfid);
  811. #endif
  812. IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
  813. assertex(scheduledItem.queryState() == WFStateBlocked);
  814. scheduledItem.setState(WFStateReqd);
  815. itemsUnblocked++;
  816. //Note this would be more efficient implemented more like a state machine
  817. //(with next processing rather than walking from the top down),
  818. //but that will require some more work.
  819. }
  820. void WorkflowMachine::performItem(unsigned wfid, unsigned scheduledWfid)
  821. {
  822. #ifdef TRACE_WORKFLOW
  823. if(currentWfid)
  824. LOG(MCworkflow, "Branching from workflow item %u", currentWfid);
  825. LOG(MCworkflow, "Performing workflow item %u", wfid);
  826. #endif
  827. wfidStack.append(currentWfid);
  828. wfidStack.append(scheduledWfid);
  829. currentWfid = wfid;
  830. currentScheduledWfid = scheduledWfid;
  831. process->perform(ctx, wfid);
  832. scheduledWfid = wfidStack.pop();
  833. currentWfid = wfidStack.pop();
  834. if(currentWfid)
  835. {
  836. #ifdef TRACE_WORKFLOW
  837. LOG(MCworkflow, "Returning to workflow item %u", currentWfid);
  838. #endif
  839. }
  840. }
  841. bool WorkflowMachine::attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid)
  842. {
  843. unsigned wfid = item.queryWfid();
  844. unsigned recovery = item.queryRecovery();
  845. if(!recovery)
  846. return false;
  847. while(item.testAndDecRetries())
  848. {
  849. bool okay = true;
  850. try
  851. {
  852. workflow->queryWfid(recovery).setState(WFStateNull);
  853. executeItem(recovery, recovery);
  854. if(dep)
  855. executeItem(dep, scheduledWfid);
  856. else
  857. performItem(wfid, scheduledWfid);
  858. }
  859. catch(WorkflowException * ce)
  860. {
  861. okay = false;
  862. if(ce->queryType() == WorkflowException::ABORT)
  863. throw;
  864. reportContingencyFailure("RECOVERY", ce);
  865. ce->Release();
  866. }
  867. catch(IException * ce)
  868. {
  869. okay = false;
  870. checkForAbort(wfid, ce);
  871. reportContingencyFailure("RECOVERY", ce);
  872. ce->Release();
  873. }
  874. if(okay)
  875. return true;
  876. }
  877. return false;
  878. }
  879. void WorkflowMachine::handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep)
  880. {
  881. StringBuffer msg;
  882. e->errorMessage(msg).append(" (in item ").append(e->queryWfid()).append(")");
  883. if(isDep)
  884. logctx.logOperatorException(NULL, NULL, 0, "Dependency failure for workflow item %u: %d: %s", item.queryWfid(), e->errorCode(), msg.str());
  885. else
  886. logctx.logOperatorException(NULL, NULL, 0, "%d: %s", e->errorCode(), msg.str());
  887. item.setFailInfo(e->errorCode(), msg.str());
  888. switch(item.queryType())
  889. {
  890. case WFTypeNormal:
  891. item.setState(WFStateFail);
  892. break;
  893. case WFTypeSuccess:
  894. case WFTypeFailure:
  895. item.setState(WFStateNull);
  896. break;
  897. case WFTypeRecovery:
  898. item.setState(WFStateSkip);
  899. break;
  900. }
  901. unsigned failureWfid = item.queryFailure();
  902. if(failureWfid)
  903. {
  904. try
  905. {
  906. executeItem(failureWfid, failureWfid);
  907. }
  908. catch(WorkflowException * ce)
  909. {
  910. if(ce->queryType() == WorkflowException::ABORT)
  911. throw;
  912. reportContingencyFailure("FAILURE", ce);
  913. ce->Release();
  914. }
  915. }
  916. }
  917. int WorkflowMachine::queryLastFailCode() const
  918. {
  919. unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
  920. if(!wfidFor)
  921. return 0;
  922. return workflow->queryWfid(wfidFor).queryFailCode();
  923. }
  924. char const * WorkflowMachine::queryLastFailMessage() const
  925. {
  926. unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
  927. if(!wfidFor)
  928. return "";
  929. char const * ret = workflow->queryWfid(wfidFor).queryFailMessage();
  930. return ret ? ret : "";
  931. }
  932. const char * WorkflowMachine::queryEventName() const
  933. {
  934. //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
  935. return workflow->queryWfid(currentWfid).queryEventName();
  936. }
  937. const char * WorkflowMachine::queryEventExtra() const
  938. {
  939. //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
  940. return workflow->queryWfid(currentWfid).queryEventExtra();
  941. }
  942. ECLRTL_API IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree *p)
  943. {
  944. return new CWorkflowItemIterator(p);
  945. }
  946. ECLRTL_API IWorkflowItemArray *createWorkflowItemArray(unsigned size)
  947. {
  948. return new CCloneWorkflowItemArray(size);
  949. }
  950. ECLRTL_API IWorkflowItem *createWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
  951. {
  952. return new CWorkflowItem(ptree, wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor);
  953. }