workflow.cpp 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "workunit.hpp"
  15. #include "jptree.hpp"
  16. #include "jlog.hpp"
  17. #include "jregexp.hpp"
  18. #include "workflow.hpp"
  19. //------------------------------------------------------------------------------------------
  20. // Workflow
// Name table for WFType values. setEnum()/getEnum() use it to translate
// between the enum and the string stored in an item's "@type" attribute.
// The entry with a NULL string terminates the table.
mapEnums wftypes[] =
{
    { WFTypeNormal, "normal" },
    { WFTypeSuccess, "success" },
    { WFTypeFailure, "failure" },
    { WFTypeRecovery, "recovery" },
    { WFTypeWait, "wait" },
    { WFTypeSize, NULL }
};
// Name table for WFMode values. setEnum()/getEnum() use it to translate
// between the enum and the string stored in an item's "@mode" attribute.
// The entry with a NULL string terminates the table.
mapEnums wfmodes[] =
{
    { WFModeNormal, "normal" },
    { WFModeCondition, "condition" },
    { WFModeSequential, "sequential" },
    { WFModeParallel, "parallel" },
    { WFModePersist, "persist" },
    { WFModeBeginWait, "bwait" },
    { WFModeWait, "wait" },
    { WFModeOnce, "once" },
    { WFModeCritical, "critical" },
    { WFModeSize, NULL}
};
// Name table for WFState values. setEnum()/getEnum() use it to translate
// between the enum and the string stored in an item's "@state" attribute.
// The entry with a NULL string terminates the table.
mapEnums wfstates[] =
{
    { WFStateNull, "null" },
    { WFStateReqd, "reqd" },
    { WFStateDone, "done" },
    { WFStateFail, "fail" },
    { WFStateSkip, "skip" },
    { WFStateWait, "wait" },
    { WFStateBlocked, "block" },
    { WFStateSize, NULL }
};
  54. static void setEnum(IPropertyTree *p, const char *propname, int value, mapEnums *map)
  55. {
  56. const char *defval = map->str;
  57. while (map->str)
  58. {
  59. if (value==map->val)
  60. {
  61. p->setProp(propname, map->str);
  62. return;
  63. }
  64. map++;
  65. }
  66. assertex(!"Unexpected value in setEnum");
  67. p->setProp(propname, defval);
  68. }
  69. static int getEnum(IPropertyTree *p, const char *propname, mapEnums *map)
  70. {
  71. const char *v = p->queryProp(propname);
  72. if (v)
  73. {
  74. while (map->str)
  75. {
  76. if (stricmp(v, map->str)==0)
  77. return map->val;
  78. map++;
  79. }
  80. assertex(!"Unexpected value in getEnum");
  81. }
  82. return 0;
  83. }
  84. class CWorkflowDependencyIterator : implements IWorkflowDependencyIterator, public CInterface
  85. {
  86. public:
  87. CWorkflowDependencyIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Dependency")); }
  88. IMPLEMENT_IINTERFACE;
  89. bool first() { return iter->first(); }
  90. bool isValid() { return iter->isValid(); }
  91. bool next() { return iter->next(); }
  92. unsigned query() const { return iter->query().getPropInt("@wfid"); }
  93. private:
  94. Owned<IPropertyTreeIterator> iter;
  95. };
  96. class CWorkflowEvent : public CInterface, implements IWorkflowEvent
  97. {
  98. public:
  99. CWorkflowEvent(char const * _name, char const * _text) : name(_name), text(_text) {}
  100. IMPLEMENT_IINTERFACE;
  101. virtual char const * queryName() const { return name.get(); }
  102. virtual char const * queryText() const { return text.get(); }
  103. virtual bool matches(char const * trialName, char const * trialText) const { return((strcmp(trialName, name.get()) == 0) && WildMatch(trialText, text.get(), true)); }
  104. private:
  105. StringAttr name;
  106. StringAttr text;
  107. };
  108. class CWorkflowItem : implements IWorkflowItem, public CInterface
  109. {
  110. public:
  111. CWorkflowItem(IPropertyTree & _tree) { tree.setown(&_tree); }
  112. CWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
  113. {
  114. tree.setown(LINK(ptree->addPropTree("Item", createPTree())));
  115. tree->setPropInt("@wfid", wfid);
  116. setEnum(tree, "@type", type, wftypes);
  117. setEnum(tree, "@mode", mode, wfmodes);
  118. if(success) tree->setPropInt("@success", success);
  119. if(failure) tree->setPropInt("@failure", failure);
  120. if(recovery && retriesAllowed)
  121. {
  122. tree->setPropInt("@recovery", recovery);
  123. tree->setPropInt("@retriesAllowed", retriesAllowed);
  124. tree->addPropTree("Dependency", createPTree())->setPropInt("@wfid", recovery);
  125. }
  126. if(contingencyFor) tree->setPropInt("@contingencyFor", contingencyFor);
  127. reset();
  128. }
  129. IMPLEMENT_IINTERFACE;
  130. //info set at compile time
  131. virtual unsigned queryWfid() const { return tree->getPropInt("@wfid"); }
  132. virtual bool isScheduled() const { return tree->hasProp("Schedule"); }
  133. virtual bool isScheduledNow() const { return (tree->hasProp("Schedule") && !tree->hasProp("Schedule/Event")); }
  134. virtual IWorkflowEvent * getScheduleEvent() const { if(tree->hasProp("Schedule/Event")) return new CWorkflowEvent(tree->queryProp("Schedule/Event/@name"), tree->queryProp("Schedule/Event/@text")); else return NULL; }
  135. virtual unsigned querySchedulePriority() const { return (tree->hasProp("Schedule") ? tree->getPropInt("Schedule/@priority", 0) : 0); }
  136. virtual bool hasScheduleCount() const { return tree->hasProp("Schedule/@count"); }
  137. virtual unsigned queryScheduleCount() const { assertex(tree->hasProp("Schedule/@count")); return tree->getPropInt("Schedule/@count"); }
  138. virtual IWorkflowDependencyIterator * getDependencies() const { return new CWorkflowDependencyIterator(tree); }
  139. virtual WFType queryType() const { return static_cast<WFType>(getEnum(tree, "@type", wftypes)); }
  140. virtual WFMode queryMode() const { return static_cast<WFMode>(getEnum(tree, "@mode", wfmodes)); }
  141. virtual unsigned querySuccess() const { return tree->getPropInt("@success", 0); }
  142. virtual unsigned queryFailure() const { return tree->getPropInt("@failure", 0); }
  143. virtual unsigned queryRecovery() const { return tree->getPropInt("@recovery", 0); }
  144. virtual unsigned queryRetriesAllowed() const { return tree->getPropInt("@retriesAllowed", 0); }
  145. virtual unsigned queryContingencyFor() const { return tree->getPropInt("@contingencyFor", 0); }
  146. virtual IStringVal & getPersistName(IStringVal & val) const { val.set(tree->queryProp("@persistName")); return val; }
  147. virtual unsigned queryPersistWfid() const { return tree->getPropInt("@persistWfid", 0); }
  148. virtual int queryPersistCopies() const { return tree->getPropInt("@persistCopies", 0); }
  149. virtual bool queryPersistRefresh() const { return tree->getPropBool("@persistRefresh", true); }
  150. virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(tree->queryProp("@criticalName")); return val; }
  151. virtual IStringVal & queryCluster(IStringVal & val) const { val.set(tree->queryProp("@cluster")); return val; }
  152. virtual void setScheduledNow() { tree->setPropTree("Schedule", createPTree()); setEnum(tree, "@state", WFStateReqd, wfstates); }
  153. virtual void setScheduledOn(char const * name, char const * text) { IPropertyTree * stree = createPTree(); stree->setProp("@name", name); stree->setProp("@text", text); tree->setPropTree("Schedule", createPTree())->setPropTree("Event", stree); setEnum(tree, "@state", WFStateWait, wfstates); }
  154. virtual void setSchedulePriority(unsigned priority) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@priority", priority); }
  155. virtual void setScheduleCount(unsigned count) { assertex(tree->hasProp("Schedule")); tree->setPropInt("Schedule/@count", count); tree->setPropInt("Schedule/@countRemaining", count); }
  156. virtual void addDependency(unsigned wfid) { tree->addPropTree("Dependency", createPTree())->setPropInt("@wfid", wfid); }
  157. virtual void setPersistInfo(char const * name, unsigned wfid, int numPersistInstances, bool refresh)
  158. {
  159. tree->setProp("@persistName", name);
  160. tree->setPropInt("@persistWfid", wfid);
  161. if (numPersistInstances != 0)
  162. tree->setPropInt("@persistCopies", (int)numPersistInstances);
  163. tree->setPropBool("@persistRefresh", refresh);
  164. }
  165. virtual void setCriticalInfo(char const * name) { tree->setProp("@criticalName", name);}
  166. virtual void setCluster(const char * cluster) { tree->setProp("@cluster", cluster); }
  167. //info set at run time
  168. virtual unsigned queryScheduleCountRemaining() const { assertex(tree->hasProp("Schedule")); return tree->getPropInt("Schedule/@countRemaining"); }
  169. virtual WFState queryState() const { return static_cast<WFState>(getEnum(tree, "@state", wfstates)); }
  170. virtual unsigned queryRetriesRemaining() const { return tree->getPropInt("@retriesRemaining"); }
  171. virtual int queryFailCode() const { return tree->getPropInt("@failcode"); }
  172. virtual char const * queryFailMessage() const { return tree->queryProp("@failmsg"); }
  173. virtual char const * queryEventName() const { return tree->queryProp("@eventname"); }
  174. virtual char const * queryEventExtra() const { return tree->queryProp("@eventextra"); }
  175. virtual void setState(WFState state) { setEnum(tree, "@state", state, wfstates); }
  176. virtual unsigned queryScheduledWfid() const { return tree->getPropInt("@swfid", 0); }
  177. virtual void setScheduledWfid(unsigned wfid) { tree->setPropInt("@swfid", wfid); }
  178. virtual bool testAndDecRetries()
  179. {
  180. assertex(tree->hasProp("@retriesAllowed"));
  181. unsigned rem = tree->getPropInt("@retriesRemaining", 0);
  182. if(rem==0)
  183. return false;
  184. tree->setPropInt("@retriesRemaining", rem-1);
  185. return true;
  186. }
  187. virtual bool decAndTestScheduleCountRemaining()
  188. {
  189. if(!tree->hasProp("Schedule/@count"))
  190. return true;
  191. unsigned rem = tree->getPropInt("Schedule/@countRemaining");
  192. assertex(rem>0);
  193. tree->setPropInt("Schedule/@countRemaining", rem-1);
  194. return (rem>1);
  195. }
  196. virtual void incScheduleCount()
  197. {
  198. unsigned rem = tree->getPropInt("Schedule/@countRemaining");
  199. tree->setPropInt("Schedule/@countRemaining", rem+1);
  200. }
  201. virtual void setFailInfo(int code, char const * message)
  202. {
  203. tree->setPropInt("@failcode", code);
  204. tree->setProp("@failmsg", message);
  205. }
  206. virtual void setEvent(const char * name, const char * extra)
  207. {
  208. if (name)
  209. tree->setProp("@eventname", name);
  210. if (extra)
  211. tree->setProp("@eventextra", extra);
  212. }
  213. virtual void reset()
  214. {
  215. if(tree->hasProp("@retriesAllowed"))
  216. tree->setPropInt("@retriesRemaining", tree->getPropInt("@retriesAllowed"));
  217. if(tree->hasProp("Schedule/@count"))
  218. tree->setPropInt("Schedule/@countRemaining", tree->getPropInt("Schedule/@count"));
  219. tree->removeProp("@failcode");
  220. tree->removeProp("@failmsg");
  221. tree->removeProp("@eventname");
  222. tree->removeProp("@eventtext");
  223. if(isScheduled())
  224. {
  225. if(isScheduledNow())
  226. setState(WFStateReqd);
  227. else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
  228. setState(WFStateDone);
  229. else
  230. setState(WFStateWait);
  231. }
  232. else if(queryType() == WFTypeRecovery)
  233. setState(WFStateSkip);
  234. else
  235. setState(WFStateNull);
  236. }
  237. virtual void syncRuntimeData(IConstWorkflowItem const & other)
  238. {
  239. WFState state = other.queryState();
  240. setState(state);
  241. if(tree->hasProp("@retriesAllowed"))
  242. tree->setPropInt("@retriesRemaining", other.queryRetriesRemaining());
  243. if(tree->hasProp("Schedule/@count"))
  244. tree->setPropInt("Schedule/@countRemaining", other.queryScheduleCountRemaining());
  245. if(state == WFStateFail)
  246. {
  247. tree->setPropInt("@failcode", other.queryFailCode());
  248. tree->setProp("@failmsg", other.queryFailMessage());
  249. }
  250. setEvent(other.queryEventName(), other.queryEventExtra());
  251. }
  252. private:
  253. Owned<IPropertyTree> tree;
  254. };
  255. class CCloneWorkflowItem : public CInterface, implements IRuntimeWorkflowItem
  256. {
  257. private:
  258. class CCloneSchedule : public CInterface
  259. {
  260. private:
  261. bool now;
  262. unsigned priority;
  263. bool counting;
  264. unsigned count;
  265. unsigned countRemaining;
  266. Owned<IWorkflowEvent> event;
  267. public:
  268. CCloneSchedule(IConstWorkflowItem const * other)
  269. {
  270. now = other->isScheduledNow();
  271. priority = other->querySchedulePriority();
  272. counting = other->hasScheduleCount();
  273. if(counting)
  274. {
  275. count = other->queryScheduleCount();
  276. countRemaining = other->queryScheduleCountRemaining();
  277. }
  278. else
  279. {
  280. count = 0;
  281. countRemaining = 0;
  282. }
  283. event.setown(other->getScheduleEvent());
  284. }
  285. bool isNow() const { return now; }
  286. unsigned queryPriority() const { return priority; }
  287. bool hasCount() const { return counting; }
  288. unsigned queryCount() const { return count; }
  289. unsigned queryCountRemaining() const { return countRemaining; }
  290. bool decAndTestCountRemaining()
  291. {
  292. if(!counting)
  293. return true;
  294. if(countRemaining)
  295. countRemaining--;
  296. return (countRemaining>0);
  297. }
  298. void incCountRemaining()
  299. {
  300. if(counting)
  301. countRemaining++;
  302. }
  303. void resetCount() { if(counting) countRemaining = count; }
  304. IWorkflowEvent * getEvent() const { return event.getLink(); }
  305. };
  306. class CCloneIterator : public CInterface, public IWorkflowDependencyIterator
  307. {
  308. public:
  309. CCloneIterator(IntArray const & _array) : array(_array), idx(0) {}
  310. IMPLEMENT_IINTERFACE;
  311. virtual bool first() { idx = 0; return isValid(); }
  312. virtual bool isValid() { return array.isItem(idx); }
  313. virtual bool next() { idx++; return isValid(); }
  314. virtual unsigned query() const { return array.item(idx); }
  315. private:
  316. IntArray const & array;
  317. aindex_t idx;
  318. };
  319. unsigned wfid;
  320. Owned<CCloneSchedule> schedule;
  321. IntArray dependencies;
  322. WFType type;
  323. WFMode mode;
  324. unsigned success;
  325. unsigned failure;
  326. unsigned recovery;
  327. unsigned retriesAllowed;
  328. unsigned contingencyFor;
  329. unsigned scheduledWfid;
  330. WFState state;
  331. unsigned retriesRemaining;
  332. int failcode;
  333. StringAttr failmsg;
  334. SCMStringBuffer persistName;
  335. SCMStringBuffer clusterName;
  336. unsigned persistWfid;
  337. int persistCopies;
  338. bool persistRefresh;
  339. SCMStringBuffer criticalName;
  340. StringAttr eventName;
  341. StringAttr eventExtra;
  342. public:
  343. CCloneWorkflowItem() : persistRefresh(true) {}
  344. IMPLEMENT_IINTERFACE;
  345. void copy(IConstWorkflowItem const * other)
  346. {
  347. wfid = other->queryWfid();
  348. if(other->isScheduled())
  349. schedule.setown(new CCloneSchedule(other));
  350. Owned<IWorkflowDependencyIterator> iter = other->getDependencies();
  351. for(iter->first(); iter->isValid(); iter->next())
  352. dependencies.append(iter->query());
  353. type = other->queryType();
  354. mode = other->queryMode();
  355. success = other->querySuccess();
  356. failure = other->queryFailure();
  357. recovery = other->queryRecovery();
  358. retriesAllowed = other->queryRetriesAllowed();
  359. contingencyFor = other->queryContingencyFor();
  360. state = other->queryState();
  361. retriesRemaining = other->queryRetriesRemaining();
  362. if(state == WFStateFail)
  363. {
  364. failcode = other->queryFailCode();
  365. failmsg.set(other->queryFailMessage());
  366. }
  367. eventName.set(other->queryEventName());
  368. eventExtra.set(other->queryEventExtra());
  369. other->getPersistName(persistName);
  370. persistWfid = other->queryPersistWfid();
  371. scheduledWfid = other->queryScheduledWfid();
  372. persistCopies = other->queryPersistCopies();
  373. persistRefresh = other->queryPersistRefresh();
  374. other->getCriticalName(criticalName);
  375. other->queryCluster(clusterName);
  376. }
  377. //info set at compile time
  378. virtual unsigned queryWfid() const { return wfid; }
  379. virtual bool isScheduled() const { return schedule.get() != 0; }
  380. virtual bool isScheduledNow() const { return schedule && schedule->isNow(); }
  381. virtual IWorkflowEvent * getScheduleEvent() const { if(schedule) return schedule->getEvent(); else return NULL; }
  382. virtual unsigned querySchedulePriority() const { return schedule ? schedule->queryPriority() : 0; }
  383. virtual bool hasScheduleCount() const { return schedule ? schedule->hasCount() : false; }
  384. virtual unsigned queryScheduleCount() const { return schedule ? schedule->queryCount() : 0; }
  385. virtual IWorkflowDependencyIterator * getDependencies() const { return new CCloneIterator(dependencies); }
  386. virtual WFType queryType() const { return type; }
  387. virtual WFMode queryMode() const { return mode; }
  388. virtual unsigned querySuccess() const { return success; }
  389. virtual unsigned queryFailure() const { return failure; }
  390. virtual unsigned queryRecovery() const { return recovery; }
  391. virtual unsigned queryRetriesAllowed() const { return retriesAllowed; }
  392. virtual unsigned queryContingencyFor() const { return contingencyFor; }
  393. virtual IStringVal & getPersistName(IStringVal & val) const { val.set(persistName.str()); return val; }
  394. virtual unsigned queryPersistWfid() const { return persistWfid; }
  395. virtual int queryPersistCopies() const { return persistCopies; }
  396. virtual bool queryPersistRefresh() const { return persistRefresh; }
  397. virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(criticalName.str()); return val; }
  398. virtual IStringVal & queryCluster(IStringVal & val) const { val.set(clusterName.str()); return val; }
  399. //info set at run time
  400. virtual unsigned queryScheduleCountRemaining() const { return schedule ? schedule->queryCountRemaining() : 0; }
  401. virtual WFState queryState() const { return state; }
  402. virtual unsigned queryRetriesRemaining() const { return retriesRemaining; }
  403. virtual int queryFailCode() const { return failcode; }
  404. virtual char const * queryFailMessage() const { return failmsg.get(); }
  405. virtual char const * queryEventName() const { return eventName; }
  406. virtual char const * queryEventExtra() const { return eventExtra; }
  407. virtual unsigned queryScheduledWfid() const { return scheduledWfid; }
  408. virtual void setState(WFState _state) { state = _state; }
  409. virtual bool testAndDecRetries()
  410. {
  411. if(retriesRemaining == 0)
  412. return false;
  413. retriesRemaining--;
  414. return true;
  415. }
  416. virtual bool decAndTestScheduleCountRemaining()
  417. {
  418. if(!schedule)
  419. return true;
  420. return schedule->decAndTestCountRemaining();
  421. }
  422. virtual void incScheduleCount()
  423. {
  424. if(schedule)
  425. schedule->incCountRemaining();
  426. }
  427. virtual void setFailInfo(int code, char const * message)
  428. {
  429. failcode = code;
  430. failmsg.set(message);
  431. }
  432. virtual void setEvent(const char * name, const char * extra)
  433. {
  434. eventName.set(name);
  435. eventExtra.set(extra);
  436. }
  437. virtual void reset()
  438. {
  439. retriesRemaining = retriesAllowed;
  440. if(schedule) schedule->resetCount();
  441. if(isScheduled())
  442. {
  443. if(isScheduledNow())
  444. setState(WFStateReqd);
  445. else if (hasScheduleCount() && (queryScheduleCountRemaining() == 0))
  446. setState(WFStateDone);
  447. else
  448. setState(WFStateWait);
  449. }
  450. else if(queryType() == WFTypeRecovery)
  451. setState(WFStateSkip);
  452. else
  453. setState(WFStateNull);
  454. }
  455. };
  456. class CWorkflowItemIterator : public CInterface, implements IWorkflowItemIterator
  457. {
  458. public:
  459. CWorkflowItemIterator(IPropertyTree * tree) { iter.setown(tree->getElements("Item")); }
  460. IMPLEMENT_IINTERFACE;
  461. bool first() { item.clear(); return iter->first(); }
  462. bool isValid() { return iter->isValid(); }
  463. bool next() { item.clear(); return iter->next(); }
  464. IConstWorkflowItem * query() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.get(); }
  465. IWorkflowItem * get() const { if(!item) item.setown(new CWorkflowItem(iter->get())); return item.getLink(); }
  466. private:
  467. Owned<IPropertyTreeIterator> iter;
  468. mutable Owned<CWorkflowItem> item;
  469. };
  470. class CCloneWorkflowItemArray : public CInterface, implements IWorkflowItemArray
  471. {
  472. private:
  473. class ListItem
  474. {
  475. public:
  476. ListItem(ListItem * _next, IRuntimeWorkflowItem * _item) : next(_next), item(_item) {}
  477. ListItem * next;
  478. IRuntimeWorkflowItem * item;
  479. };
  480. class ListItemPtr : public CInterface, implements IRuntimeWorkflowItemIterator
  481. {
  482. public:
  483. ListItemPtr(ListItem * _start) : start(_start) { ptr = NULL; }
  484. IMPLEMENT_IINTERFACE;
  485. virtual bool first() { ptr = start; return isValid(); }
  486. virtual bool isValid() { return ptr != NULL; }
  487. virtual bool next() { ptr = ptr->next; return isValid(); }
  488. virtual IConstWorkflowItem * query() const { return ptr->item; }
  489. virtual IRuntimeWorkflowItem * get() const { return LINK(ptr->item); }
  490. private:
  491. ListItem * start;
  492. ListItem * ptr;
  493. };
  494. void insert(CCloneWorkflowItem * item)
  495. {
  496. if(!item->isScheduled())
  497. return;
  498. if(!head)
  499. head = tail = new ListItem(NULL, item);
  500. else if(item->querySchedulePriority() > head->item->querySchedulePriority())
  501. head = new ListItem(head, item);
  502. else if(item->querySchedulePriority() <= tail->item->querySchedulePriority())
  503. {
  504. tail->next = new ListItem(NULL, item);
  505. tail = tail->next;
  506. }
  507. else
  508. {
  509. ListItem * finger = head;
  510. while(item->querySchedulePriority() <= finger->next->item->querySchedulePriority())
  511. finger = finger->next;
  512. finger->next = new ListItem(finger->next, item);
  513. }
  514. }
  515. public:
  516. CCloneWorkflowItemArray(unsigned _capacity) : capacity(_capacity), head(NULL), tail(NULL)
  517. {
  518. array = _capacity ? new CCloneWorkflowItem[_capacity] : NULL;
  519. }
  520. ~CCloneWorkflowItemArray()
  521. {
  522. ListItem * finger = head;
  523. while(finger)
  524. {
  525. ListItem * del = finger;
  526. finger = finger->next;
  527. delete del;
  528. }
  529. if (array)
  530. delete [] array;
  531. }
  532. IMPLEMENT_IINTERFACE;
  533. virtual void addClone(IConstWorkflowItem const * other)
  534. {
  535. unsigned wfid = other->queryWfid();
  536. assertex((wfid > 0) && (wfid <= capacity));
  537. array[wfid-1].copy(other);
  538. insert(&array[wfid-1]);
  539. }
  540. virtual IRuntimeWorkflowItem & queryWfid(unsigned wfid)
  541. {
  542. assertex((wfid > 0) && (wfid <= capacity));
  543. return array[wfid-1];
  544. }
  545. virtual unsigned count() const
  546. {
  547. return capacity;
  548. }
  549. virtual IRuntimeWorkflowItemIterator * getSequenceIterator() { return new ListItemPtr(head); }
  550. virtual bool hasScheduling() const
  551. {
  552. ListItem * finger = head;
  553. while(finger)
  554. {
  555. if(!finger->item->isScheduledNow())
  556. return true;
  557. finger = finger->next;
  558. }
  559. return false;
  560. }
  561. private:
  562. unsigned capacity;
  563. CCloneWorkflowItem * array;
  564. ListItem * head;
  565. ListItem * tail;
  566. };
  567. //-------------------------------------------------------------------------------------------------
#ifdef TRACE_WORKFLOW
// Only compiled when TRACE_WORKFLOW is defined: log category passed to the
// LOG() calls that trace workflow item execution in this file.
const LogMsgCategory MCworkflow = MCprogress(50); // Category used to inform enqueue/start/finish of workflow item
#endif
  571. WorkflowMachine::WorkflowMachine()
  572. : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(queryDummyContextLogger())
  573. {
  574. }
  575. WorkflowMachine::WorkflowMachine(const IContextLogger &_logctx)
  576. : ctx(NULL), process(NULL), currentWfid(0), currentScheduledWfid(0), itemsWaiting(0), itemsUnblocked(0), condition(false), logctx(_logctx)
  577. {
  578. }
// Runs the workflow: repeatedly walks the scheduled items (in the order given
// by workflow->getSequenceIterator()) and executes every item that is in the
// Reqd or Fail state, until nothing more is pending.
//
// _ctx / _process are stored in the members ctx / process before anything
// else runs.
//
// The first WorkflowException raised while executing an item is captured,
// the walk is abandoned, end() is still called, and the exception is then
// rethrown to the caller.
void WorkflowMachine::perform(IGlobalCodeContext *_ctx, IEclProcess *_process)
{
    ctx = _ctx;
    process = _process;
    Owned<WorkflowException> error;
    begin();
    // Scheduling hooks are only used when at least one item waits on an event.
    bool scheduling = workflow->hasScheduling();
    if(scheduling)
        schedulingStart();
    bool more = false;
    do
    {
        Owned<IRuntimeWorkflowItem> item;
        Owned<IRuntimeWorkflowItemIterator> iter = workflow->getSequenceIterator();
        itemsWaiting = 0;
        itemsUnblocked = 0;
        if (iter->first())
        {
            while (iter->isValid())
            {
                try
                {
                    item.setown(iter->get());
                    switch(item->queryState())
                    {
                    case WFStateReqd:
                    case WFStateFail:
                        if(!error)
                        {
                            // A top-level item is its own scheduled wfid.
                            unsigned wfid = item->queryWfid();
                            executeItem(wfid, wfid);
                        }
                        break;
                    }
                }
                catch(WorkflowException * e)
                {
                    // Keep the exception so it can be rethrown after end().
                    error.setown(e);
                }
                if(item->queryState() == WFStateWait) itemsWaiting++;
                if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
                if(scheduling && schedulingPull())
                {
                    // schedulingPull() returned true: restart the walk from the
                    // first item (presumably because item states changed — TODO confirm).
                    itemsWaiting = 0;
                    iter.setown(workflow->getSequenceIterator());
                    if(!iter->first()) break;
                }
                else
                    if(!iter->next()) break;
            }
        }
        if(error) break; //MORE: will not want to break in situations where there might be pending contingency clauses
        if(scheduling)
            more = schedulingPullStop();
    } while(more || itemsUnblocked);
    end();
    if(error)
        throw error.getLink();
}
// Executes workflow item `wfid` on behalf of the scheduled item
// `scheduledWfid`.
//
// Steps:
//   1. Check the item's current state: Done/Skip items return immediately
//      (except Done persist items, which are rechecked), Wait/Blocked items
//      are an internal error, Fail items are reset before being rerun.
//   2. Dispatch on the item's mode to run its dependencies and/or body.
//   3. Set the resulting state according to the item's type.
//   4. Run the SUCCESS contingency, if any.
//
// Returns true when the item completed (or needed no work); false when one
// of the mode handlers reported that execution could not continue, and
// always false after a BeginWait item has been started.
//
// Throws WorkflowException* for internal errors, and lets exceptions from the
// mode handlers propagate. Non-ABORT exceptions from the SUCCESS contingency
// are reported and swallowed.
bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
{
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Beginning workflow item %u", wfid);
#endif
    IRuntimeWorkflowItem & item = workflow->queryWfid(wfid);
    switch(item.queryState())
    {
    case WFStateDone:
        if (item.queryMode() == WFModePersist)
        {
#ifdef TRACE_WORKFLOW
            LOG(MCworkflow, "Recheck persist %u", wfid);
#endif
            // Fall out of the switch so the persist item is processed again.
            break;
        }
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
#endif
        return true;
    case WFStateSkip:
#ifdef TRACE_WORKFLOW
        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
#endif
        return true;
    case WFStateWait:
        throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in wait state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateBlocked:
        throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
    case WFStateFail:
        // Previously failed item: restore its initial run-time state before retrying.
        item.reset();
        break;
    }
    switch(item.queryMode())
    {
    case WFModeNormal:
    case WFModeOnce:
        // NOTE(review): this passes `wfid` as the scheduled wfid for the
        // dependencies, whereas the Sequential/Parallel branch below passes
        // `scheduledWfid` — confirm whether the difference is intentional.
        if (!doExecuteItemDependencies(item, wfid))
            return false;
        doExecuteItem(item, scheduledWfid);
        break;
    case WFModeCondition:
        if (!doExecuteConditionItem(item, scheduledWfid))
            return false;
        break;
    case WFModeSequential:
    case WFModeParallel:
        // These modes have no body of their own here; only the dependencies are run.
        if (!doExecuteItemDependencies(item, scheduledWfid))
            return false;
        break;
    case WFModePersist:
        doExecutePersistItem(item);
        break;
    case WFModeCritical:
        doExecuteCriticalItem(item);
        break;
    case WFModeBeginWait:
        // The item is marked done, but false is returned to the caller and
        // the type-based state update and SUCCESS contingency below are skipped.
        doExecuteBeginWaitItem(item, scheduledWfid);
        item.setState(WFStateDone);
        return false;
    case WFModeWait:
        doExecuteEndWaitItem(item);
        break;
    default:
        throwUnexpected();
    }
    switch(item.queryType())
    {
    case WFTypeNormal:
        // An event-scheduled item goes back to waiting while it still has
        // runs remaining; otherwise it is finished.
        if(item.isScheduled() && !item.isScheduledNow() && item.decAndTestScheduleCountRemaining())
            item.setState(WFStateWait);
        else
            item.setState(WFStateDone);
        break;
    case WFTypeSuccess:
    case WFTypeFailure:
        item.setState(WFStateNull);
        break;
    case WFTypeRecovery:
        item.setState(WFStateSkip);
        break;
    }
    if(item.querySuccess())
    {
        try
        {
            executeItem(item.querySuccess(), scheduledWfid);
        }
        catch(WorkflowException * ce)
        {
            // ABORT always propagates; any other contingency failure is
            // reported and the exception released.
            if(ce->queryType() == WorkflowException::ABORT)
                throw;
            reportContingencyFailure("SUCCESS", ce);
            ce->Release();
        }
    }
#ifdef TRACE_WORKFLOW
    LOG(MCworkflow, "Done workflow item %u", wfid);
#endif
    return true;
}
  739. bool WorkflowMachine::doExecuteItemDependencies(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  740. {
  741. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  742. for(iter->first(); iter->isValid(); iter->next())
  743. {
  744. if (!doExecuteItemDependency(item, iter->query(), scheduledWfid, false))
  745. return false;
  746. }
  747. return true;
  748. }
  749. bool WorkflowMachine::doExecuteItemDependency(IRuntimeWorkflowItem & item, unsigned wfid, unsigned scheduledWfid, bool alwaysEvaluate)
  750. {
  751. try
  752. {
  753. if (alwaysEvaluate)
  754. workflow->queryWfid(wfid).setState(WFStateNull);
  755. return executeItem(wfid, scheduledWfid);
  756. }
  757. catch(WorkflowException * e)
  758. {
  759. if(e->queryType() == WorkflowException::ABORT)
  760. throw;
  761. if(!attemptRetry(item, wfid, scheduledWfid))
  762. {
  763. handleFailure(item, e, true);
  764. throw;
  765. }
  766. e->Release();
  767. }
  768. return true;//more!
  769. }
  770. void WorkflowMachine::doExecuteItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  771. {
  772. try
  773. {
  774. performItem(item.queryWfid(), scheduledWfid);
  775. }
  776. catch(WorkflowException * ein)
  777. {
  778. if(ein->queryType() == WorkflowException::ABORT)
  779. throw;
  780. if(!attemptRetry(item, 0, scheduledWfid))
  781. {
  782. handleFailure(item, ein, true);
  783. throw;
  784. }
  785. ein->Release();
  786. }
  787. catch(IException * ein)
  788. {
  789. checkForAbort(item.queryWfid(), ein);
  790. if(!attemptRetry(item, 0, scheduledWfid))
  791. {
  792. StringBuffer msg;
  793. ein->errorMessage(msg);
  794. WorkflowException::Type type = ((dynamic_cast<IUserException *>(ein) != NULL) ? WorkflowException::USER : WorkflowException::SYSTEM);
  795. WorkflowException * eout = new WorkflowException(ein->errorCode(), msg.str(), item.queryWfid(), type, ein->errorAudience());
  796. ein->Release();
  797. handleFailure(item, eout, false);
  798. throw eout;
  799. }
  800. ein->Release();
  801. }
  802. }
  803. bool WorkflowMachine::doExecuteConditionItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  804. {
  805. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  806. if(!iter->first()) throwUnexpected();
  807. unsigned wfidCondition = iter->query();
  808. if(!iter->next()) throwUnexpected();
  809. unsigned wfidTrue = iter->query();
  810. unsigned wfidFalse = 0;
  811. if(iter->next()) wfidFalse = iter->query();
  812. if(iter->next()) throwUnexpected();
  813. if (!doExecuteItemDependency(item, wfidCondition, scheduledWfid, true))
  814. return false;
  815. if(condition)
  816. return doExecuteItemDependency(item, wfidTrue, scheduledWfid, false);
  817. else if (wfidFalse)
  818. return doExecuteItemDependency(item, wfidFalse, scheduledWfid, false);
  819. return true;
  820. }
  821. void WorkflowMachine::doExecuteBeginWaitItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid)
  822. {
  823. #ifdef TRACE_WORKFLOW
  824. LOG(MCworkflow, "Begin wait for workflow item %u sched %u", item.queryWfid(), scheduledWfid);
  825. #endif
  826. //Block execution of the currently executing scheduled item
  827. IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
  828. assertex(scheduledItem.queryState() == WFStateReqd);
  829. scheduledItem.setState(WFStateBlocked);
  830. //And increment the count on the wait wf item so it becomes active
  831. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  832. if(!iter->first()) throwUnexpected();
  833. unsigned waitWfid = iter->query();
  834. if(iter->next()) throwUnexpected();
  835. IRuntimeWorkflowItem & waitItem = workflow->queryWfid(waitWfid);
  836. assertex(waitItem.queryState() == WFStateDone);
  837. waitItem.incScheduleCount();
  838. waitItem.setState(WFStateWait);
  839. itemsWaiting++;
  840. }
  841. void WorkflowMachine::doExecuteEndWaitItem(IRuntimeWorkflowItem & item)
  842. {
  843. //Unblock the scheduled workflow item, which should mean execution continues.
  844. unsigned scheduledWfid = item.queryScheduledWfid();
  845. #ifdef TRACE_WORKFLOW
  846. LOG(MCworkflow, "Finished wait for workflow sched %u", scheduledWfid);
  847. #endif
  848. IRuntimeWorkflowItem & scheduledItem = workflow->queryWfid(scheduledWfid);
  849. assertex(scheduledItem.queryState() == WFStateBlocked);
  850. scheduledItem.setState(WFStateReqd);
  851. itemsUnblocked++;
  852. //Note this would be more efficient implemented more like a state machine
  853. //(with next processing rather than walking from the top down),
  854. //but that will require some more work.
  855. }
  856. bool WorkflowMachine::isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item)
  857. {
  858. time_t thisTime;
  859. if (!getPersistTime(thisTime, item))
  860. return false; // if no time must be older than the persist
  861. return when < thisTime;
  862. }
  863. bool WorkflowMachine::isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item)
  864. {
  865. Owned<IWorkflowDependencyIterator> iter = item.getDependencies();
  866. ForEach(*iter)
  867. {
  868. unsigned cur = iter->query();
  869. IRuntimeWorkflowItem & other = workflow->queryWfid(cur);
  870. if (isPersist(other))
  871. {
  872. if (isOlderThanPersist(when, other))
  873. return true;
  874. }
  875. else
  876. {
  877. if (isOlderThanInputPersists(when, other))
  878. return true;
  879. }
  880. }
  881. return false;
  882. }
  883. bool WorkflowMachine::isItemOlderThanInputPersists(IRuntimeWorkflowItem & item)
  884. {
  885. time_t curWhen;
  886. if (!getPersistTime(curWhen, item))
  887. return false; // if no time then old and can't tell
  888. return isOlderThanInputPersists(curWhen, item);
  889. }
  890. void WorkflowMachine::performItem(unsigned wfid, unsigned scheduledWfid)
  891. {
  892. #ifdef TRACE_WORKFLOW
  893. if(currentWfid)
  894. LOG(MCworkflow, "Branching from workflow item %u", currentWfid);
  895. LOG(MCworkflow, "Performing workflow item %u", wfid);
  896. #endif
  897. wfidStack.append(currentWfid);
  898. wfidStack.append(scheduledWfid);
  899. currentWfid = wfid;
  900. currentScheduledWfid = scheduledWfid;
  901. process->perform(ctx, wfid);
  902. scheduledWfid = wfidStack.popGet();
  903. currentWfid = wfidStack.popGet();
  904. if(currentWfid)
  905. {
  906. #ifdef TRACE_WORKFLOW
  907. LOG(MCworkflow, "Returning to workflow item %u", currentWfid);
  908. #endif
  909. }
  910. }
  911. bool WorkflowMachine::attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid)
  912. {
  913. unsigned wfid = item.queryWfid();
  914. unsigned recovery = item.queryRecovery();
  915. if(!recovery)
  916. return false;
  917. while(item.testAndDecRetries())
  918. {
  919. bool okay = true;
  920. try
  921. {
  922. workflow->queryWfid(recovery).setState(WFStateNull);
  923. executeItem(recovery, recovery);
  924. if(dep)
  925. executeItem(dep, scheduledWfid);
  926. else
  927. performItem(wfid, scheduledWfid);
  928. }
  929. catch(WorkflowException * ce)
  930. {
  931. okay = false;
  932. if(ce->queryType() == WorkflowException::ABORT)
  933. throw;
  934. reportContingencyFailure("RECOVERY", ce);
  935. ce->Release();
  936. }
  937. catch(IException * ce)
  938. {
  939. okay = false;
  940. checkForAbort(wfid, ce);
  941. reportContingencyFailure("RECOVERY", ce);
  942. ce->Release();
  943. }
  944. if(okay)
  945. return true;
  946. }
  947. return false;
  948. }
  949. void WorkflowMachine::handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep)
  950. {
  951. StringBuffer msg;
  952. e->errorMessage(msg).append(" (in item ").append(e->queryWfid()).append(")");
  953. if(isDep)
  954. logctx.logOperatorException(NULL, NULL, 0, "Dependency failure for workflow item %u: %d: %s", item.queryWfid(), e->errorCode(), msg.str());
  955. else
  956. logctx.logOperatorException(NULL, NULL, 0, "%d: %s", e->errorCode(), msg.str());
  957. item.setFailInfo(e->errorCode(), msg.str());
  958. switch(item.queryType())
  959. {
  960. case WFTypeNormal:
  961. item.setState(WFStateFail);
  962. break;
  963. case WFTypeSuccess:
  964. case WFTypeFailure:
  965. item.setState(WFStateNull);
  966. break;
  967. case WFTypeRecovery:
  968. item.setState(WFStateSkip);
  969. break;
  970. }
  971. unsigned failureWfid = item.queryFailure();
  972. if(failureWfid)
  973. {
  974. try
  975. {
  976. executeItem(failureWfid, failureWfid);
  977. }
  978. catch(WorkflowException * ce)
  979. {
  980. if(ce->queryType() == WorkflowException::ABORT)
  981. throw;
  982. reportContingencyFailure("FAILURE", ce);
  983. ce->Release();
  984. }
  985. }
  986. }
  987. int WorkflowMachine::queryLastFailCode() const
  988. {
  989. unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
  990. if(!wfidFor)
  991. return 0;
  992. return workflow->queryWfid(wfidFor).queryFailCode();
  993. }
  994. char const * WorkflowMachine::queryLastFailMessage() const
  995. {
  996. unsigned wfidFor = workflow->queryWfid(currentWfid).queryContingencyFor();
  997. if(!wfidFor)
  998. return "";
  999. char const * ret = workflow->queryWfid(wfidFor).queryFailMessage();
  1000. return ret ? ret : "";
  1001. }
  1002. const char * WorkflowMachine::queryEventName() const
  1003. {
  1004. //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
  1005. return workflow->queryWfid(currentWfid).queryEventName();
  1006. }
  1007. const char * WorkflowMachine::queryEventExtra() const
  1008. {
  1009. //MORE: This doesn't work so well once we've done SEQUENTIAL transforms if they split a wf item into 2
  1010. return workflow->queryWfid(currentWfid).queryEventExtra();
  1011. }
  1012. IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree *p)
  1013. {
  1014. return new CWorkflowItemIterator(p);
  1015. }
  1016. IWorkflowItemArray *createWorkflowItemArray(unsigned size)
  1017. {
  1018. return new CCloneWorkflowItemArray(size);
  1019. }
  1020. IWorkflowItem *createWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor)
  1021. {
  1022. return new CWorkflowItem(ptree, wfid, type, mode, success, failure, recovery, retriesAllowed, contingencyFor);
  1023. }