workflow.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef workflow_incl
  14. #define workflow_incl
#include "thorhelper.hpp"
#include "workunit.hpp"
#include "jlog.hpp"
#include "eclhelper.hpp"

#include <atomic>
#include <queue>
#include <thread>
#include <vector>
  22. #ifdef _DEBUG
  23. // #define TRACE_WORKFLOW
  24. #endif
  25. #define WFERR_ExecutingInWaitState 5100
  26. #define WFERR_ExecutingInBlockedState 5101
  27. #define WFERR_ExecutingItemMoreThanOnce 5103
  28. class WORKUNIT_API WorkflowException : public IException, public CInterface
  29. {
  30. public:
  31. typedef enum { SYSTEM, USER, ABORT } Type;
  32. WorkflowException(int _code, char const * _msg, unsigned _wfid, Type _type, MessageAudience _audience) : code(_code), msg(_msg), wfid(_wfid), type(_type), audience(_audience) {}
  33. virtual ~WorkflowException() {}
  34. IMPLEMENT_IINTERFACE;
  35. virtual int errorCode() const { return code; }
  36. virtual StringBuffer & errorMessage(StringBuffer & buff) const { return buff.append(msg.get()); }
  37. virtual MessageAudience errorAudience() const { return audience; }
  38. unsigned queryWfid() const { return wfid; }
  39. Type queryType() const { return type; }
  40. private:
  41. int code;
  42. StringAttr msg;
  43. unsigned wfid;
  44. Type type;
  45. MessageAudience audience;
  46. };
  47. /** This is the main work-flow interface. The dependency tree is kept in the
  48. * IWorkflowItemArray object and each item is executed in order, or recursed
  49. * in case of dependencies.
  50. *
* The workflow object is created with the global function createWorkflowItemArray
* and populated via createWorkflowItem. TODO(review): should these be static
* members instead, or would a builder/factory pattern be a better fit?
  54. *
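* Calling perform() then executes the whole dependency graph recursively,
* depth-first, taking care of workunit scheduling and the machine-specific
* prologue/epilogue. A parallel algorithm (performParallel) is also declared;
* it is selected by the optional debug value "parallelWorkflow".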
  57. *
  58. * The main features are:
  59. * - Allow a graph of dependent workflow items
  60. * - Allow actions to be performed on success/failure
  61. * - Allow recovery actions before retrying, with limits on number of retries.
  62. * - Ensure that workflow items inside SEQUENTIAL actions are executed correctly.
  63. * - Allow workflow items to be triggered on events.
  64. * - Support once, stored, persist workflow items.
  65. *
  66. */
  67. class CCloneWorkflowItem;
  68. class WORKUNIT_API WorkflowMachine : public CInterface
  69. {
  70. public:
  71. WorkflowMachine();
  72. WorkflowMachine(const IContextLogger &logctx);
  73. void perform(IGlobalCodeContext *_ctx, IEclProcess *_process);
  74. int queryLastFailCode() const;
  75. char const * queryLastFailMessage() const;
  76. const char * queryEventName() const;
  77. const char * queryEventExtra() const;
  78. bool hasItemsWaiting() const { return (itemsWaiting > 0); }
  79. void setCondition(bool value) { condition = value; }
  80. bool isItemOlderThanInputPersists(IRuntimeWorkflowItem & item);
  81. protected:
  82. // Machine specific prologue/epilogue
  83. virtual void begin() = 0;
  84. virtual void end() = 0;
  85. // Workflow specific scheduling
  86. virtual void schedulingStart() = 0;
  87. virtual bool schedulingPull() = 0;
  88. virtual bool schedulingPullStop() = 0;
  89. // Error handling
  90. virtual void reportContingencyFailure(char const * type, IException * e) = 0;
  91. virtual void checkForAbort(unsigned wfid, IException * handling) = 0;
  92. // Persistence styles varies from machine to machine
  93. virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) = 0;
  94. virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item) = 0;
  95. virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item) = 0;
  96. virtual void noteTiming(unsigned wfid, timestamp_type startTime, stat_type elapsedNs) = 0;
  97. // Check conditions, item type and call operations below based on type
  98. bool executeItem(unsigned wfid, unsigned scheduledWfid);
  99. // Iterate through dependencies and execute them
  100. bool doExecuteItemDependencies(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  101. bool doExecuteItemDependency(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid, bool alwaysEvaluate);
  102. // Execute an item (wrapper to deal with exceptions)
  103. void doExecuteItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  104. // Actually executes item: calls process->perform()
  105. void performItem(unsigned wfid, unsigned scheduledWfid);
  106. // Conditional dependency execution
  107. bool doExecuteConditionItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  108. // Block execution of the currently executing scheduled item
  109. void doExecuteBeginWaitItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  110. // Unblock the scheduled workflow item, which should mean execution continues.
  111. void doExecuteEndWaitItem(IRuntimeWorkflowItem & item);
  112. //Used for checking if a persist is older than its inputs
  113. bool isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item);
  114. bool isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item);
  115. bool attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid);
  116. void handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep);
  117. void addSuccessors();
  118. //This function defines the implicit dependencies of the workflow, by creating logical successorships.
  119. //It traverses through all the items, by recursing through their dependencies.
  120. //It also reverses dependencies, so that items point to their successors.
  121. void defineLogicalRelationships(unsigned wfid, CCloneWorkflowItem * logicalPredecessor, bool prevOrdered);
  122. CCloneWorkflowItem & queryWorkflowItem(unsigned wfid);
  123. //This creates a new node which is inserted as a predecessor to the successor item.
  124. //This new runtime item is a logical predecessor - one that may activate the successor.
  125. //The logical predecessor can also activate any of the successor's children.
  126. //The pointer to the runtime item is returned.
  127. CCloneWorkflowItem * insertLogicalPredecessor(unsigned successorWfid);
  128. void performParallel(IGlobalCodeContext *_ctx, IEclProcess *_process);
  129. void processWfItems();
  130. void executeItemParallel(unsigned wfid);
  131. void doExecuteItemParallel(IRuntimeWorkflowItem & item);
  132. void doExecuteConditionExpression(CCloneWorkflowItem & item);
  133. void performItemParallel(unsigned wfid);
  134. //Returns true if a failure contingency has been queued
  135. bool handleFailureParallel(CCloneWorkflowItem & item, WorkflowException * e);
  136. //Returns true if a failure contingency has been queued
  137. bool activateFailureContingency(CCloneWorkflowItem & item);
  138. void checkAbort(CCloneWorkflowItem & item, bool depFailed);
  139. void startContingency();
  140. void endContingency();
  141. void processDependentSuccessors(CCloneWorkflowItem &item);
  142. void processLogicalSuccessors(CCloneWorkflowItem &item);
  143. //when an item fails, this marks dependentSuccessors with the exception belonging to their predecessor
  144. void failDependentSuccessors(CCloneWorkflowItem &item);
  145. void addToItemQueue(unsigned wfid);
  146. bool checkIfDone();
  147. virtual bool getParallelFlag() const = 0;
  148. virtual unsigned getThreadNumFlag() const = 0;
  149. bool isParallelViable();
  150. protected:
  151. const IContextLogger &logctx;
  152. Owned<IWorkflowItemArray> workflow;
  153. //contains extra workflow items that are created at runtime. These support logical successorships
  154. std::vector<Shared<IRuntimeWorkflowItem>> logicalWorkflow;
  155. std::queue<unsigned> wfItemQueue;
  156. Semaphore wfItemQueueSem;
  157. //used to pop/add items to the queue
  158. CriticalSection queueCritSec;
  159. //optional debug value "parallelThreads" to select number of threads
  160. unsigned numThreads = 1U;
  161. //the wfid of the parent item. It has no successors, only dependents.
  162. unsigned parentWfid = 0U;
  163. //If startItem has an item as its logical successor, then that item will be active before the start.
  164. //Any items that are active from the start don't need to perform the defineLogicalRelationships algorithm more than once.
  165. CCloneWorkflowItem * startItem = nullptr;
  166. //flag is set when the "parent" item is reached. There may still be pending contingencies
  167. std::atomic<bool> parentReached{false};
  168. //flag is set once the workflow is completed
  169. std::atomic<bool> done{false};
  170. //flag is set when a workflowItem fails and is not successfully recovered
  171. std::atomic<bool> abort{false};
  172. //This protects against a race condition between activate() and deactivate()
  173. CriticalSection activationCritSec;
  174. //This protects each item from having its exception set twice, in a race condition
  175. CriticalSection exceptionCritSec;
  176. //This counts the active contingency clauses (that haven't finished being executed)
  177. //This ensures that the query doesn't finish without completing the contingencies.
  178. std::atomic<unsigned> activeContingencies{0U};
  179. //The number of branches is the number of dependent successors to the failed workflow item.
  180. //Each successor then fails its own successors, so the branch count increases.
  181. //In order to verify that all possible contingency clauses have been reached, the number of open-ended "branches" that haven't yet reached the parent item must be tracked.
  182. //The query is finished when there are zero open-ended branches.
  183. std::atomic<unsigned> branchCount{0U};
  184. //optional debug value "parallelWorkflow" to select parallel algorithm
  185. bool parallel = false;
  186. Owned<WorkflowException> runtimeError;
  187. IGlobalCodeContext *ctx;
  188. IEclProcess *process;
  189. IntArray wfidStack;
  190. unsigned currentWfid;
  191. unsigned currentScheduledWfid;
  192. unsigned itemsWaiting;
  193. unsigned itemsUnblocked;
  194. //allows the condition result to be returned from a process in a thread-safe way
  195. CriticalSection conditionCritSec;
  196. unsigned condition;
  197. };
  198. extern WORKUNIT_API IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree *p);
  199. extern WORKUNIT_API IWorkflowItemArray *createWorkflowItemArray(unsigned size);
  200. extern WORKUNIT_API IWorkflowItem *createWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor);
  201. extern WORKUNIT_API IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree * ptree);
  202. extern WORKUNIT_API const char * queryWorkflowTypeText(WFType type);
  203. extern WORKUNIT_API const char * queryWorkflowModeText(WFMode mode);
  204. extern WORKUNIT_API const char * queryWorkflowStateText(WFState state);
  205. #ifdef TRACE_WORKFLOW
  206. constexpr LogMsgCategory MCworkflow = MCprogress(50); // Category used to inform enqueue/start/finish of workflow item
  207. #endif
  208. #endif