workflow.hpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef workflow_incl
  14. #define workflow_incl
  15. #include "thorhelper.hpp"
  16. #include "workunit.hpp"
  17. #include "jlog.hpp"
  18. #include "eclhelper.hpp"
  19. class WORKUNIT_API WorkflowException : public CInterface, public IException
  20. {
  21. public:
  22. typedef enum { SYSTEM, USER, ABORT } Type;
  23. WorkflowException(int _code, char const * _msg, unsigned _wfid, Type _type, MessageAudience _audience) : code(_code), msg(_msg), wfid(_wfid), type(_type), audience(_audience) {}
  24. virtual ~WorkflowException() {}
  25. IMPLEMENT_IINTERFACE;
  26. virtual int errorCode() const { return code; }
  27. virtual StringBuffer & errorMessage(StringBuffer & buff) const { return buff.append(msg.get()); }
  28. virtual MessageAudience errorAudience() const { return audience; }
  29. unsigned queryWfid() const { return wfid; }
  30. Type queryType() const { return type; }
  31. private:
  32. int code;
  33. StringAttr msg;
  34. unsigned wfid;
  35. Type type;
  36. MessageAudience audience;
  37. };
  38. /** This is the main work-flow interface. The dependency tree is kept in the
  39. * IWorkflowItemArray object and each item is executed in order, or recursed
  40. * in case of dependencies.
  41. *
  42. * The workflow object is created with a global function createWorkflowItemArray
  43. * and populated via the createWorkflowItem. Shouldn't be static member? Or better,
  44. * using a builder or factory pattern?
  45. *
  46. * Calling the method perform will then execute the whole dependency graph recursively,
  47. * depth-first, and account for workunits' scheduling and machine epilogue/prologue.
  48. *
  49. * The main features are:
  50. * - Allow a graph of dependent workflow items
  51. * - Allow actions to be performed on success/failure
  52. * - Allow recovery actions before retrying, with limits on number of retries.
  53. * - Ensure that workflow items inside SEQUENTIAL actions are executed correctly.
  54. * - Allow workflow items to be triggered on events.
  55. * - Support once, stored, persist workflow items.
  56. *
  57. */
  58. class WORKUNIT_API WorkflowMachine : public CInterface
  59. {
  60. public:
  61. WorkflowMachine();
  62. WorkflowMachine(const IContextLogger &logctx);
  63. void perform(IGlobalCodeContext *_ctx, IEclProcess *_process);
  64. unsigned queryCurrentWfid() const { return currentWfid; }
  65. int queryLastFailCode() const;
  66. char const * queryLastFailMessage() const;
  67. const char * queryEventName() const;
  68. const char * queryEventExtra() const;
  69. bool hasItemsWaiting() const { return (itemsWaiting > 0); }
  70. void setCondition(bool value) { condition = value; }
  71. bool isItemOlderThanInputPersists(IRuntimeWorkflowItem & item);
  72. protected:
  73. // Machine specific prologue/epilogue
  74. virtual void begin() = 0;
  75. virtual void end() = 0;
  76. // Workflow specific scheduling
  77. virtual void schedulingStart() = 0;
  78. virtual bool schedulingPull() = 0;
  79. virtual bool schedulingPullStop() = 0;
  80. // Error handling
  81. virtual void reportContingencyFailure(char const * type, IException * e) = 0;
  82. virtual void checkForAbort(unsigned wfid, IException * handling) = 0;
  83. // Persistence styles varies from machine to machine
  84. virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) = 0;
  85. virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item) = 0;
  86. virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item) = 0;
  87. // Check conditions, item type and call operations below based on type
  88. bool executeItem(unsigned wfid, unsigned scheduledWfid);
  89. // Iterate through dependencies and execute them
  90. bool doExecuteItemDependencies(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  91. bool doExecuteItemDependency(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid, bool alwaysEvaluate);
  92. // Execute an item (wrapper to deal with exceptions)
  93. void doExecuteItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  94. // Actually executes item: calls process->perform()
  95. void performItem(unsigned wfid, unsigned scheduledWfid);
  96. // Conditional dependency execution
  97. bool doExecuteConditionItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  98. // Block execution of the currently executing scheduled item
  99. void doExecuteBeginWaitItem(IRuntimeWorkflowItem & item, unsigned scheduledWfid);
  100. // Unblock the scheduled workflow item, which should mean execution continues.
  101. void doExecuteEndWaitItem(IRuntimeWorkflowItem & item);
  102. //Used for checking if a persist is older than its inputs
  103. bool isOlderThanPersist(time_t when, IRuntimeWorkflowItem & item);
  104. bool isOlderThanInputPersists(time_t when, IRuntimeWorkflowItem & item);
  105. bool attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid);
  106. void handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep);
  107. protected:
  108. const IContextLogger &logctx;
  109. Owned<IWorkflowItemArray> workflow;
  110. IGlobalCodeContext *ctx;
  111. IEclProcess *process;
  112. IntArray wfidStack;
  113. unsigned currentWfid;
  114. unsigned currentScheduledWfid;
  115. unsigned itemsWaiting;
  116. unsigned itemsUnblocked;
  117. unsigned condition;
  118. };
  119. extern WORKUNIT_API IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree *p);
  120. extern WORKUNIT_API IWorkflowItemArray *createWorkflowItemArray(unsigned size);
  121. extern WORKUNIT_API IWorkflowItem *createWorkflowItem(IPropertyTree * ptree, unsigned wfid, WFType type, WFMode mode, unsigned success, unsigned failure, unsigned recovery, unsigned retriesAllowed, unsigned contingencyFor);
  122. extern WORKUNIT_API IWorkflowItemIterator *createWorkflowItemIterator(IPropertyTree * ptree);
  #if defined(TRACE_WORKFLOW)
  // Log category used to report enqueue/start/finish of a workflow item.
  // Only declared when workflow tracing is compiled in.
  extern const WORKUNIT_API LogMsgCategory MCworkflow;
  #endif // TRACE_WORKFLOW
  126. #endif