|
@@ -22,9 +22,17 @@
|
|
|
#include "workunit.hpp"
|
|
|
#include "jlog.hpp"
|
|
|
#include "eclhelper.hpp"
|
|
|
+#include <queue>
|
|
|
+#include <thread>
|
|
|
+#include <atomic>
|
|
|
+
|
|
|
+#ifdef _DEBUG
|
|
|
+// #define TRACE_WORKFLOW
|
|
|
+#endif
|
|
|
|
|
|
#define WFERR_ExecutingInWaitState 5100
|
|
|
#define WFERR_ExecutingInBlockedState 5101
|
|
|
+#define WFERR_ExecutingItemMoreThanOnce 5103
|
|
|
|
|
|
class WORKUNIT_API WorkflowException : public IException, public CInterface
|
|
|
{
|
|
@@ -66,6 +74,7 @@ private:
|
|
|
* - Support once, stored, persist workflow items.
|
|
|
*
|
|
|
*/
|
|
|
+class CCloneWorkflowItem;
|
|
|
class WORKUNIT_API WorkflowMachine : public CInterface
|
|
|
{
|
|
|
public:
|
|
@@ -121,9 +130,83 @@ protected:
|
|
|
bool attemptRetry(IRuntimeWorkflowItem & item, unsigned dep, unsigned scheduledWfid);
|
|
|
void handleFailure(IRuntimeWorkflowItem & item, WorkflowException const * e, bool isDep);
|
|
|
|
|
|
+ void addSuccessors();
|
|
|
+ //This function defines the implicit dependencies of the workflow, by creating logical successorships.
|
|
|
+ //It traverses through all the items, by recursing through their dependencies.
|
|
|
+ //It also reverses dependencies, so that items point to their successors.
|
|
|
+ void defineLogicalRelationships(unsigned wfid, CCloneWorkflowItem * logicalPredecessor, bool prevOrdered);
|
|
|
+ CCloneWorkflowItem & queryWorkflowItem(unsigned wfid);
|
|
|
+ //This creates a new node which is inserted as a predecessor to the successor item.
|
|
|
+ //This new runtime item is a logical predecessor - one that may activate the successor.
|
|
|
+ //The logical predecessor can also activate any of the successor's children.
|
|
|
+ //The pointer to the runtime item is returned.
|
|
|
+ CCloneWorkflowItem * insertLogicalPredecessor(unsigned successorWfid);
|
|
|
+
|
|
|
+ void performParallel(IGlobalCodeContext *_ctx, IEclProcess *_process);
|
|
|
+ void processWfItems();
|
|
|
+ void executeItemParallel(unsigned wfid);
|
|
|
+ void doExecuteItemParallel(IRuntimeWorkflowItem & item);
|
|
|
+ void doExecuteConditionExpression(CCloneWorkflowItem & item);
|
|
|
+ void performItemParallel(unsigned wfid);
|
|
|
+ //Returns true if a failure contingency has been queued
|
|
|
+ bool handleFailureParallel(CCloneWorkflowItem & item, WorkflowException * e);
|
|
|
+ //Returns true if a failure contingency has been queued
|
|
|
+ bool activateFailureContingency(CCloneWorkflowItem & item);
|
|
|
+ void checkAbort(CCloneWorkflowItem & item, bool depFailed);
|
|
|
+ void startContingency();
|
|
|
+ void endContingency();
|
|
|
+
|
|
|
+ void processDependentSuccessors(CCloneWorkflowItem &item);
|
|
|
+ void processLogicalSuccessors(CCloneWorkflowItem &item);
|
|
|
+ //when an item fails, this marks dependentSuccessors with the exception belonging to their predecessor
|
|
|
+ void failDependentSuccessors(CCloneWorkflowItem &item);
|
|
|
+
|
|
|
+ void addToItemQueue(unsigned wfid);
|
|
|
+ bool checkIfDone();
|
|
|
+
|
|
|
+ virtual bool getParallelFlag() const = 0;
|
|
|
+ virtual unsigned getThreadNumFlag() const = 0;
|
|
|
+ bool isParallelViable();
|
|
|
+
|
|
|
+
|
|
|
protected:
|
|
|
const IContextLogger &logctx;
|
|
|
Owned<IWorkflowItemArray> workflow;
|
|
|
+ //contains extra workflow items that are created at runtime. These support logical successorships
|
|
|
+ std::vector<Shared<IRuntimeWorkflowItem>> logicalWorkflow;
|
|
|
+ std::queue<unsigned> wfItemQueue;
|
|
|
+ Semaphore wfItemQueueSem;
|
|
|
+ //used to pop/add items to the queue
|
|
|
+ CriticalSection queueCritSec;
|
|
|
+ //optional debug value "parallelThreads" to select number of threads
|
|
|
+ unsigned numThreads = 1U;
|
|
|
+ //the wfid of the parent item. It has no successors, only dependents.
|
|
|
+ unsigned parentWfid = 0U;
|
|
|
+ //If startItem has an item as its logical successor, then that item will be active before the start.
|
|
|
+ //Any items that are active from the start don't need to perform the defineLogicalRelationships algorithm more than once.
|
|
|
+ CCloneWorkflowItem * startItem = nullptr;
|
|
|
+ //flag is set when the "parent" item is reached. There may still be pending contingencies
|
|
|
+ std::atomic<bool> parentReached{false};
|
|
|
+ //flag is set once the workflow is completed
|
|
|
+ std::atomic<bool> done{false};
|
|
|
+ //flag is set when a workflowItem fails and is not successfully recovered
|
|
|
+ std::atomic<bool> abort{false};
|
|
|
+ //This protects against a race condition between activate() and deactivate()
|
|
|
+ CriticalSection activationCritSec;
|
|
|
+ //This protects each item from having its exception set twice, in a race condition
|
|
|
+ CriticalSection exceptionCritSec;
|
|
|
+ //This counts the active contingency clauses (that haven't finished being executed)
|
|
|
+ //This ensures that the query doesn't finish without completing the contingencies.
|
|
|
+ std::atomic<unsigned> activeContingencies{0U};
|
|
|
+ //The number of branches is the number of dependent successors to the failed workflow item.
|
|
|
+ //Each successor then fails its own successors, so the branch count increases.
|
|
|
+ //In order to verify that all possible contingency clauses have been reached, the number of open-ended "branches" that haven't yet reached the parent item must be tracked.
|
|
|
+ //The query is finished when there are zero open-ended branches.
|
|
|
+ std::atomic<unsigned> branchCount{0U};
|
|
|
+ //optional debug value "parallelWorkflow" to select parallel algorithm
|
|
|
+ bool parallel = false;
|
|
|
+ Owned<WorkflowException> runtimeError;
|
|
|
+
|
|
|
IGlobalCodeContext *ctx;
|
|
|
IEclProcess *process;
|
|
|
IntArray wfidStack;
|
|
@@ -131,6 +214,9 @@ protected:
|
|
|
unsigned currentScheduledWfid;
|
|
|
unsigned itemsWaiting;
|
|
|
unsigned itemsUnblocked;
|
|
|
+
|
|
|
+ //allows the condition result to be returned from a process in a thread-safe way
|
|
|
+ CriticalSection conditionCritSec;
|
|
|
unsigned condition;
|
|
|
};
|
|
|
|