Browse Source

Merge pull request #7664 from richardkchapman/cassandra-waitpaused

HPCC-14043 CWorkunitResumeHandler reads dali workunit branch directly

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
cb4688d1d1

+ 156 - 7
common/workunit/workunit.cpp

@@ -819,6 +819,16 @@ static int getEnum(const IPropertyTree *p, const char *propname, const mapEnums
     return getEnum(p->queryProp(propname),map);
 }
 
+const char * getWorkunitActionStr(WUAction action)
+{
+    return getEnumText(action, actions);
+}
+
+WUAction getWorkunitAction(const char *actionStr)
+{
+    return (WUAction) getEnum(actionStr, actions);
+}
+
 //==========================================================================================
 
 class CLightweightWorkunitInfo : public CInterfaceOf<IConstWorkUnitInfo>
@@ -889,6 +899,106 @@ protected:
     Owned<IRemoteConnection> conn;
 };
 
+CWorkUnitWatcher::CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid) : subscriber(_subscriber)
+{
+    abortId = 0;
+    stateId = 0;
+    actionId = 0;
+    assertex((flags & ~SubscribeOptionAbort) == 0);
+    if (flags & SubscribeOptionAbort)
+    {
+        VStringBuffer xpath("/WorkUnitAborts/%s", wuid);
+        abortId = querySDS().subscribe(xpath.str(), *this);
+    }
+}
+CWorkUnitWatcher::~CWorkUnitWatcher()
+{
+    assertex(abortId==0 && stateId==0 && actionId==0);
+}
+
+void CWorkUnitWatcher::unsubscribe()
+{
+    CriticalBlock b(crit);
+    if (abortId)
+        querySDS().unsubscribe(abortId);
+    if (stateId)
+        querySDS().unsubscribe(stateId);
+    if (actionId)
+        querySDS().unsubscribe(actionId);
+    abortId = 0;
+    stateId = 0;
+    actionId = 0;
+}
+
+void CWorkUnitWatcher::notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+{
+    CriticalBlock b(crit);
+    if (id==stateId)
+        subscriber->notify(SubscribeOptionState);
+    else if (id==actionId)
+        subscriber->notify(SubscribeOptionAction);
+    else if (id==abortId)
+        subscriber->notify(SubscribeOptionAbort);
+}
+
+
+class CDaliWorkUnitWatcher : public CWorkUnitWatcher
+{
+public:
+    CDaliWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
+    : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
+    {
+        if (flags & SubscribeOptionState)
+        {
+            VStringBuffer xpath("/WorkUnits/%s/State", wuid);
+            stateId = querySDS().subscribe(xpath.str(), *this);
+        }
+        if (flags & SubscribeOptionAction)
+        {
+            VStringBuffer xpath("/WorkUnits/%s/Action", wuid);
+            actionId = querySDS().subscribe(xpath.str(), *this);
+        }
+    }
+};
+
+void CPersistedWorkUnit::subscribe(WUSubscribeOptions options)
+{
+    CriticalBlock block(crit);
+    assertex(options==SubscribeOptionAbort);
+    if (!abortWatcher)
+    {
+        abortWatcher.setown(new CWorkUnitWatcher(this, SubscribeOptionAbort, p->queryName()));
+        abortDirty = true;
+    }
+}
+
+void CPersistedWorkUnit::unsubscribe()
+{
+    CriticalBlock block(crit);
+    if (abortWatcher)
+    {
+        abortWatcher->unsubscribe();
+        abortWatcher.clear();
+    }
+}
+
+bool CPersistedWorkUnit::aborting() const
+{
+    CriticalBlock block(crit);
+    if (abortDirty)
+    {
+        StringBuffer apath;
+        apath.append("/WorkUnitAborts/").append(p->queryName());
+        Owned<IRemoteConnection> acon = querySDS().connect(apath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+        if (acon)
+            abortState = acon->queryRoot()->getPropInt(NULL) != 0;
+        else
+            abortState = false;
+        abortDirty = false;
+    }
+    return abortState;
+}
+
 class CDaliWorkUnit : public CPersistedWorkUnit
 {
 public:
@@ -2576,6 +2686,10 @@ public:
     {
         removeShutdownHook(*this);
     }
+    virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
+    {
+        return new CDaliWorkUnitWatcher(subscriber, options, wuid);
+    }
     virtual unsigned validateRepository(bool fixErrors)
     {
         return 0;
@@ -2837,11 +2951,11 @@ public:
 
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
     {
-        StringBuffer wuRoot;
-        getXPath(wuRoot, wuid);
-        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuRoot.str(), SDSNotify_Data);
+        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
         LocalIAbortHandler abortHandler(*waiter);
         WUState ret = WUStateUnknown;
+        StringBuffer wuRoot;
+        getXPath(wuRoot, wuid);
         Owned<IRemoteConnection> conn = sdsManager->connect(wuRoot.str(), session, 0, SDS_LOCK_TIMEOUT);
         if (conn)
         {
@@ -2859,12 +2973,10 @@ public:
                 case WUStateCompleted:
                 case WUStateFailed:
                 case WUStateAborted:
-                    waiter->unsubscribe();
                     return ret;
                 case WUStateWait:
                     if(returnOnWaitState)
                     {
-                        waiter->unsubscribe();
                         return ret;
                     }
                     break;
@@ -2877,7 +2989,6 @@ public:
                     SessionId agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
                     if (checkAbnormalTermination(wuid, ret, agent))
                     {
-                        waiter->unsubscribe();
                         return ret;
                     }
                     break;
@@ -2900,6 +3011,35 @@ public:
                 conn->reload();
             }
         }
+        return ret;
+    }
+
+    virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
+    {
+        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
+        LocalIAbortHandler abortHandler(*waiter);
+        WUAction ret = WUActionUnknown;
+        StringBuffer wuRoot;
+        getXPath(wuRoot, wuid);
+        Owned<IRemoteConnection> conn = sdsManager->connect(wuRoot.str(), session, 0, SDS_LOCK_TIMEOUT);
+        if (conn)
+        {
+            unsigned start = msTick();
+            loop
+            {
+                ret = (WUAction) getEnum(conn->queryRoot(), "Action", actions);
+                if (ret != original)
+                    break;
+                unsigned waited = msTick() - start;
+                waiter->wait(20000);  // recheck state every 20 seconds even if no timeout, in case eclagent has crashed.
+                if (waiter->aborted)
+                {
+                    ret = WUActionUnknown;  // MORE - throw an exception?
+                    break;
+                }
+                conn->reload();
+            }
+        }
         waiter->unsubscribe();
         return ret;
     }
@@ -3010,6 +3150,10 @@ public:
         : baseFactory(_baseFactory), defaultSecMgr(_secMgr), defaultSecUser(_secUser)
     {
     }
+    virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
+    {
+        return baseFactory->getWatcher(subscriber, options, wuid);
+    }
     virtual unsigned validateRepository(bool fix)
     {
         return baseFactory->validateRepository(fix);
@@ -3141,6 +3285,10 @@ public:
     {
         return baseFactory->waitForWorkUnit(wuid, timeout, compiled, returnOnWaitState);
     }
+    virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
+    {
+        return baseFactory->waitForWorkUnitAction(wuid, original);
+    }
 private:
     Owned<IWorkUnitFactory> baseFactory;
     Linked<ISecManager> defaultSecMgr;
@@ -3805,7 +3953,8 @@ void CLocalWorkUnit::setState(WUState value)
             factory->clearAborting(queryWuid());
     }
     CriticalBlock block(crit);
-    setEnum(p, "@state", value, states);
+    setEnum(p, "@state", value, states);  // For historical reasons, we use state to store the state
+    setEnum(p, "State", value, states);   // But we can only subscribe to elements, not attributes
     if (getDebugValueBool("monitorWorkunit", false))
     {
         switch(value)

+ 14 - 0
common/workunit/workunit.hpp

@@ -696,10 +696,20 @@ interface IConstLocalFileUploadIterator : extends IScmIterator
 
 enum WUSubscribeOptions
 {
+    SubscribeOptionState = 1,
     SubscribeOptionAbort = 2,
+    SubscribeOptionAction = 4
 };
 
+interface IWorkUnitSubscriber
+{
+    virtual void notify(WUSubscribeOptions flags) = 0;
+};
 
+interface IWorkUnitWatcher : extends IInterface
+{
+    virtual void unsubscribe() = 0;
+};
 
 interface IWUGraphProgress;
 interface IPropertyTree;
@@ -1260,6 +1270,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual bool isAborting(const char *wuid) const = 0;
     virtual void clearAborting(const char *wuid) = 0;
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
+    virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original) = 0;
 
     virtual unsigned validateRepository(bool fixErrors) = 0;
     virtual void deleteRepository(bool recreate) = 0;
@@ -1267,6 +1278,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual const char *queryStoreType() const = 0; // Returns "Dali" or "Cassandra"
 
     virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const = 0;
+    virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const = 0;
 };
 
 interface IWorkflowScheduleConnection : extends IInterface
@@ -1455,6 +1467,8 @@ void WORKUNIT_API testWorkflow();
 #endif
 
 extern WORKUNIT_API const char * getWorkunitStateStr(WUState state);
+extern WORKUNIT_API const char * getWorkunitActionStr(WUAction action);
+extern WORKUNIT_API WUAction getWorkunitAction(const char * actionStr);
 
 extern WORKUNIT_API void addTimeStamp(IWorkUnit * wu, StatisticScopeType scopeType, const char * scope, StatisticKind kind);
 

+ 37 - 84
common/workunit/workunit.ipp

@@ -563,89 +563,24 @@ protected:
     virtual void _loadExceptions() const;
 };
 
-class CPersistedWorkUnit : public CLocalWorkUnit
+class CPersistedWorkUnit : public CLocalWorkUnit, implements IWorkUnitSubscriber
 {
 public:
     CPersistedWorkUnit(ISecManager *secmgr, ISecUser *secuser) : CLocalWorkUnit(secmgr, secuser)
-{
+    {
         abortDirty = true;
         abortState = false;
     }
-
-    virtual void subscribe(WUSubscribeOptions options)
-    {
-        CriticalBlock block(crit);
-        assertex(options==SubscribeOptionAbort);
-        if (!abortWatcher)
-        {
-            abortWatcher.setown(new CWorkUnitAbortWatcher(this, p->queryName()));
-            abortDirty = true;
-        }
-    }
-    virtual void unsubscribe()
-    {
-        CriticalBlock block(crit);
-        if (abortWatcher)
-        {
-            abortWatcher->unsubscribe();
-            abortWatcher.clear();
-        }
-    }
-
-    virtual bool aborting() const
-    {
-        CriticalBlock block(crit);
-        if (abortDirty)
-        {
-            StringBuffer apath;
-            apath.append("/WorkUnitAborts/").append(p->queryName());
-            Owned<IRemoteConnection> acon = querySDS().connect(apath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-            if (acon)
-                abortState = acon->queryRoot()->getPropInt(NULL) != 0;
-            else
-                abortState = false;
-            abortDirty = false;
-        }
-        return abortState;
-    }
-
+    virtual void subscribe(WUSubscribeOptions options);
+    virtual void unsubscribe();
+    virtual bool aborting() const;
 protected:
-    class CWorkUnitAbortWatcher : public CInterface, implements ISDSSubscription
-    {
-        CPersistedWorkUnit *parent; // not linked - it links me
-        SubscriptionId abort;
-    public:
-        IMPLEMENT_IINTERFACE;
-        CWorkUnitAbortWatcher(CPersistedWorkUnit *_parent, const char *wuid) : parent(_parent)
-        {
-            StringBuffer wuRoot;
-            wuRoot.append("/WorkUnitAborts/").append(wuid);
-            abort = querySDS().subscribe(wuRoot.str(), *this);
-        }
-        ~CWorkUnitAbortWatcher()
-        {
-            assertex(abort==0);
-        }
-
-        void unsubscribe()
-        {
-            querySDS().unsubscribe(abort);
-            abort = 0;
-        }
-
-        void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-        {
-            parent->abortDirty = true;
-        }
-    };
-
-    Owned<CWorkUnitAbortWatcher> abortWatcher;
+    virtual void notify(WUSubscribeOptions) { abortDirty = true; }
+    Owned<IWorkUnitWatcher> abortWatcher;
     mutable bool abortDirty;
     mutable bool abortState;
 };
 
-interface ISDSManager; // MORE - can remove once dali split out
-
 class CWorkUnitFactory : public CInterface, implements IWorkUnitFactory
 {
 public:
@@ -740,24 +675,39 @@ protected:
     unsigned id;
 };
 
-class WorkUnitWaiter : public CInterface, implements ISDSSubscription, implements IAbortHandler
+class CWorkUnitWatcher : public CInterface, implements IWorkUnitWatcher, implements ISDSSubscription
 {
-    Semaphore changed;
-    SubscriptionId change;
-    SDSNotifyFlags watchFor;
+protected:
+    CriticalSection crit;
+    IWorkUnitSubscriber *subscriber; // not linked - it will generally link me
+    SubscriptionId abortId, stateId, actionId;
 public:
     IMPLEMENT_IINTERFACE;
+    CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid);
+    ~CWorkUnitWatcher();
+    void unsubscribe();
+    void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData);
+};
 
-    WorkUnitWaiter(const char *xpath, SDSNotifyFlags _watchFor)
-    : watchFor(_watchFor)
+class WorkUnitWaiter : public CInterface, implements IAbortHandler, implements IWorkUnitSubscriber
+{
+    Semaphore changed;
+    Owned<IWorkUnitWatcher> watcher;
+public:
+    IMPLEMENT_IINTERFACE;
+    WorkUnitWaiter(const char *wuid, WUSubscribeOptions watchFor)
     {
-        change = querySDS().subscribe(xpath, *this, false);
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        watcher.setown(factory->getWatcher(this, watchFor, wuid));
         aborted = false;
     }
-    void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    ~WorkUnitWaiter()
     {
-        if (flags & watchFor)
-            changed.signal();
+        unsubscribe();
+    }
+    void notify(WUSubscribeOptions flags)
+    {
+        changed.signal();
     }
     bool wait(unsigned timeout)
     {
@@ -771,8 +721,11 @@ public:
     }
     void unsubscribe()
     {
-        querySDS().unsubscribe(change);
-        change = 0;
+        if (watcher)
+        {
+            watcher->unsubscribe();
+            watcher.clear();
+        }
     }
     bool aborted;
 };

+ 11 - 64
ecl/eclagent/eclgraph.cpp

@@ -1473,74 +1473,21 @@ void EclAgent::executeThorGraph(const char * graphName)
     Owned<IJobQueue> jq = createJobQueue(queueName.str());
 
     bool resubmit;
+    Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
     do // loop if pause interrupted graph and needs resubmitting on resume
     {
         resubmit = false; // set if job interrupted in thor
-        class CWorkunitResumeHandler : public CInterface, implements ISDSSubscription
-        {
-            IConstWorkUnit &wu;
-            StringBuffer xpath;
-            StringAttr wuid;
-            SubscriptionId subId;
-            CriticalSection crit;
-            Semaphore sem;
-            
-            void unsubscribe()
-            {
-                CriticalBlock b(crit);
-                if (subId)
-                {
-                    SubscriptionId _subId = subId;
-                    subId = 0;
-                    querySDS().unsubscribe(_subId);
-                }
-            }
-        public:
-            IMPLEMENT_IINTERFACE;
-            CWorkunitResumeHandler(IConstWorkUnit &_wu) : wu(_wu)
-            {
-                xpath.append("/WorkUnits/");
-                wuid.set(wu.queryWuid());
-                xpath.append(wuid.get()).append("/Action");
-                subId = 0;
-            }
-            ~CWorkunitResumeHandler()
-            {
-                unsubscribe();
-            }
-            void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-            {
-                CriticalBlock b(crit);
-                if (0 == subId) return;
-                if (valueLen==strlen("resume") && (0 == strncmp("resume", (const char *)valueData, valueLen)))
-                    sem.signal();
-            }
-            bool wait()
-            {
-                subId = querySDS().subscribe(xpath.str(), *this, false, true);
-                assertex(subId);
-                PROGLOG("Job %s paused, waiting for resume/abort", wuid.get());
-                bool ret = true;
-                while (!sem.wait(10000))
-                {
-                    wu.forceReload();
-                    if (WUStatePaused != wu.getState() || wu.aborting())
-                    {
-                        PROGLOG("Aborting pause job %s, state : %s", wuid.get(), wu.queryStateDesc());
-                        ret = false;
-                        break;
-                    }
-                }
-                unsubscribe();
-                return ret;
-            }
-        } workunitResumeHandler(*queryWorkUnit());
-
         unlockWorkUnit();
         if (WUStatePaused == queryWorkUnit()->getState()) // check initial state - and wait if paused
         {
-            if (!workunitResumeHandler.wait())
-                throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
+            loop
+            {
+                WUAction action = wuFactory->waitForWorkUnitAction(wuid, queryWorkUnit()->getAction());
+                if (action == WUActionUnknown)
+                    throw new WorkflowException(0, "Workunit aborting", 0, WorkflowException::ABORT, MSGAUD_user);
+                if (action != WUActionPause && action != WUActionPauseNow)
+                    break;
+            }
         }
         {
             Owned <IWorkUnit> w = updateWorkUnit();
@@ -1691,8 +1638,8 @@ void EclAgent::executeThorGraph(const char * graphName)
     while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
 }
 
-//In case of logfile rollover, update workunit logfile name(s) stored
-//in SDS/WorkUnits/{WUID}/Process/EclAgent/myeclagent<log>
+//In case of logfile rollover, update logfile name(s) stored in workunit
+
 void EclAgent::updateWULogfile()
 {
     if (logMsgHandler && config->hasProp("@name"))

+ 115 - 38
plugins/cassandra/cassandrawu.cpp

@@ -865,7 +865,7 @@ static const CassandraXmlMapping workunitsMappings [] =
 
     // These are catchalls for anything not processed above or in a child table
 
-    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
+    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Files@FilesRead@Graphs@Results@Statistics@Plugins@Query@State@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
     {"subtrees", "map<text, text>", "@DiskUsageStats@Parameters@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
 
     { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
@@ -2125,6 +2125,8 @@ public:
         loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
         memset(childLoaded, 0, sizeof(childLoaded));
         allDirty = false;
+        actionChanged = false;
+        stateChanged = false;
         abortDirty = true;
     }
 
@@ -2233,9 +2235,25 @@ public:
                 DBGLOG("Executing batch");
             CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
             futureBatch.wait("execute");
+            if (stateChanged)
+            {
+                // Signal changes to state to anyone that might be watching via Dali
+                VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
+                Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
+                conn->queryRoot()->setProp(NULL, p->queryProp("@state"));
+            }
+            if (actionChanged)
+            {
+                // Signal changes to action to anyone that might be watching via Dali
+                VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
+                Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
+                conn->queryRoot()->setProp(NULL, p->queryProp("Action"));
+            }
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED))); // Commit leaves it locked...
             prev.clear();
             allDirty = false;
+            stateChanged = false;
+            actionChanged = false;
             dirtyPaths.kill();
             dirtyResults.kill();
         }
@@ -2300,8 +2318,17 @@ public:
     virtual void setState(WUState state)
     {
         if (trackSecondaryChange(getWorkunitStateStr(state), "@state"))
+        {
+            stateChanged = true;
             CPersistedWorkUnit::setState(state);
+        }
+    }
+    virtual void setAction(WUAction action)
+    {
+        actionChanged = true;
+        CPersistedWorkUnit::setAction(action);
     }
+
     virtual void setApplicationValue(const char *app, const char *propname, const char *value, bool overwrite)
     {
         VStringBuffer xpath("Application/%s/%s", app, propname);
@@ -2376,6 +2403,8 @@ public:
         CPersistedWorkUnit::copyWorkUnit(cached, all);
         memset(childLoaded, 1, sizeof(childLoaded));
         allDirty = true;
+        actionChanged = true;
+        stateChanged = true;
     }
     virtual void noteFileRead(IDistributedFile *file)
     {
@@ -2911,6 +2940,8 @@ protected:
     Linked<const ICassandraSession> sessionCache;
     mutable bool childLoaded[ChildTablesSize];
     bool allDirty;
+    bool stateChanged;
+    bool actionChanged;
     Owned<IPTree> prev;
 
     Owned<CassandraBatch> batch;
@@ -2919,6 +2950,25 @@ protected:
     Owned<IRemoteConnection> daliLock;  // We still use dali for locking
 };
 
+class CCassandraWorkUnitWatcher : public CWorkUnitWatcher
+{
+public:
+    CCassandraWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid)
+    : CWorkUnitWatcher(_subscriber, (WUSubscribeOptions) (flags & SubscribeOptionAbort), wuid)
+    {
+        if (flags & SubscribeOptionState)
+        {
+            VStringBuffer xpath("/WorkUnitStates/%s/State", wuid);
+            stateId = querySDS().subscribe(xpath.str(), *this);
+        }
+        if (flags & SubscribeOptionAction)
+        {
+            VStringBuffer xpath("/WorkUnitStates/%s/Action", wuid);
+            actionId = querySDS().subscribe(xpath.str(), *this);
+        }
+    }
+};
+
 class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
 {
     IMPLEMENT_IINTERFACE;
@@ -2950,29 +3000,35 @@ public:
             cluster.setKeySpace("hpcc");
         cluster.connect();
         Owned<IPTree> versionInfo = getVersionInfo();
-        int major = versionInfo->getPropInt("@major", 0);
-        int minor = versionInfo->getPropInt("@minor", 0);
-        if (major && minor)
+        if (versionInfo)
         {
-            // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
-            if (major != majorVersion)
-                throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
-            if (minor != minorVersion)
+            int major = versionInfo->getPropInt("@major", 0);
+            int minor = versionInfo->getPropInt("@minor", 0);
+            if (major && minor)
             {
-                if (minor < minorVersion)
+                // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
+                if (major != majorVersion)
+                    throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
+                if (minor != minorVersion)
                 {
-                    DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
-                    switch (minor)
+                    if (minor < minorVersion)
                     {
-                    // Add code here to create any columns that we need to to get from version "minor" to expected layout
+                        DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
+                        switch (minor)
+                        {
+                        // Add code here to create any columns that we need to to get from version "minor" to expected layout
+                        }
                     }
+                    else
+                        DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
                 }
-                else
-                    DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
             }
         }
         else
+        {
             DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
+            cluster.disconnect();
+        }
         cacheRetirer.start();
     }
 
@@ -2983,6 +3039,10 @@ public:
         if (traceLevel)
             DBGLOG("CCasssandraWorkUnitFactory destroyed");
     }
+    virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
+    {
+        return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
+    }
 
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
@@ -3411,8 +3471,7 @@ public:
     */
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
     {
-        VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
-        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuRoot, SDSNotify_Deleted); // We subscribe to the dali lock branch to give us hints about when wu may have changed
+        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
         LocalIAbortHandler abortHandler(*waiter);
         CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
         statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
@@ -3425,16 +3484,10 @@ public:
             CassandraResult result(cass_future_get_result(future));
             const CassRow *row = cass_result_first_row(result);
             if (!row)
-            {
-                waiter->unsubscribe();
                 return WUStateUnknown;
-            }
             const CassValue *stateVal = cass_row_get_column(row, 0);
             if (!stateVal)
-            {
-                waiter->unsubscribe();
                 return WUStateUnknown;
-            }
             StringBuffer stateStr;
             getCassString(stateStr, stateVal);
             WUState state = getWorkUnitState(stateStr);
@@ -3443,22 +3496,15 @@ public:
             case WUStateCompiled:
             case WUStateUploadingFiles:
                 if (compiled)
-                {
-                    waiter->unsubscribe();
                     return state;
-                }
                 break;
             case WUStateCompleted:
             case WUStateFailed:
             case WUStateAborted:
-                waiter->unsubscribe();
                 return state;
             case WUStateWait:
                 if (returnOnWaitState)
-                {
-                    waiter->unsubscribe();
                     return state;
-                }
                 break;
             case WUStateCompiling:
             case WUStateRunning:
@@ -3468,10 +3514,7 @@ public:
             case WUStateAborting:
                 SessionId agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
                 if (agent && checkAbnormalTermination(wuid, state, agent))
-                {
-                    waiter->unsubscribe();
                     return state;
-                }
                 break;
             }
             unsigned waited = msTick() - start;
@@ -3479,17 +3522,51 @@ public:
             {
                 waiter->wait(20000);  // recheck state every 20 seconds even if no timeout, in case eclagent has crashed.
                 if (waiter->aborted)
-                {
-                    waiter->unsubscribe();
                     return WUStateUnknown;  // MORE - throw an exception?
-                }
             }
             else if (waited > timeout || !waiter->wait(timeout-waited))
-            {
-                waiter->unsubscribe();
                 return WUStateUnknown;  // MORE - throw an exception?
+        }
+    }
+
+    virtual WUAction waitForWorkUnitAction(const char * wuid, WUAction original)
+    {
+        StringAttr origStr(getWorkunitActionStr(original));
+        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionAction);
+        LocalIAbortHandler abortHandler(*waiter);
+        CassandraStatement statement(prepareStatement("select action from workunits where partition=? and wuid=?;"));
+        statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        statement.bindString(1, wuid);
+        WUAction ret = WUActionUnknown;
+        loop
+        {
+            CassandraFuture future(cass_session_execute(querySession(), statement));
+            future.wait("Lookup wu action");
+            CassandraResult result(cass_future_get_result(future));
+            const CassRow *row = cass_result_first_row(result);
+            if (!row)
+            {
+                PROGLOG("While waiting for job %s, WU no longer exists", wuid);
+                break;
             }
+            const CassValue *actionVal = cass_row_get_column(row, 0);
+            if (!actionVal)
+            {
+                PROGLOG("While waiting for job %s, WU action cannot be read", wuid);
+                break;
+            }
+            StringBuffer actionStr;
+            getCassString(actionStr, actionVal);
+            if (!streq(actionStr, origStr))
+            {
+                ret = getWorkunitAction(actionStr);
+                break;
+            }
+            waiter->wait(10000);  // recheck state every 20 seconds even if no notifications... just because we used to before
+            if (waiter->aborted)
+                break;
         }
+        return ret;
     }
 
     unsigned validateRepository(bool fix)
@@ -3621,7 +3698,7 @@ private:
         {
             DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
         }
-        return createPTreeFromXMLString("<Version/>");
+        return NULL;
     }
 
     bool checkWuExists(const char *wuid)

+ 8 - 62
roxie/ccd/ccdcontext.cpp

@@ -2049,75 +2049,21 @@ protected:
         SCMStringBuffer queueName;
         c->getThorQueue(queueName);
         Owned<IJobQueue> jq = createJobQueue(queueName.str());
-
+        Owned<IWorkUnitFactory> wuFactory = getWorkUnitFactory();
         bool resubmit;
         do // loop if pause interrupted graph and needs resubmitting on resume
         {
             resubmit = false; // set if job interrupted in thor
-            class CWorkunitResumeHandler : public CInterface, implements ISDSSubscription
+            if (WUStatePaused == workUnit->getState()) // check initial state - and wait if paused
             {
-                IConstWorkUnit &wu;
-                StringBuffer xpath;
-                StringAttr wuid;
-                SubscriptionId subId;
-                CriticalSection crit;
-                Semaphore sem;
-
-                void unsubscribe()
-                {
-                    CriticalBlock b(crit);
-                    if (subId)
-                    {
-                        SubscriptionId _subId = subId;
-                        subId = 0;
-                        querySDS().unsubscribe(_subId);
-                    }
-                }
-            public:
-                IMPLEMENT_IINTERFACE;
-                CWorkunitResumeHandler(IConstWorkUnit &_wu) : wu(_wu)
-                {
-                    xpath.append("/WorkUnits/");
-                    wuid.set(wu.queryWuid());
-                    xpath.append(wuid.get()).append("/Action");
-                    subId = 0;
-                }
-                ~CWorkunitResumeHandler()
-                {
-                    unsubscribe();
-                }
-                void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+                loop
                 {
-                    CriticalBlock b(crit);
-                    if (0 == subId) return;
-                    if (valueLen==strlen("resume") && (0 == strncmp("resume", (const char *)valueData, valueLen)))
-                        sem.signal();
-                }
-                bool wait()
-                {
-                    subId = querySDS().subscribe(xpath.str(), *this, false, true);
-                    assertex(subId);
-                    PROGLOG("Job %s paused, waiting for resume/abort", wuid.get());
-                    bool ret = true;
-                    while (!sem.wait(10000))
-                    {
-                        wu.forceReload();
-                        if (WUStatePaused != wu.getState() || wu.aborting())
-                        {
-                            PROGLOG("Aborting pause job %s, state : %s", wuid.get(), wu.queryStateDesc());
-                            ret = false;
-                            break;
-                        }
-                    }
-                    unsubscribe();
-                    return ret;
+                    WUAction action = wuFactory->waitForWorkUnitAction(wuid, queryWorkUnit()->getAction());
+                    if (action == WUActionUnknown)
+                        throw new WorkflowException(0, "Workunit aborting", 0, WorkflowException::ABORT, MSGAUD_user);
+                    if (action != WUActionPause && action != WUActionPauseNow)
+                        break;
                 }
-            } workunitResumeHandler(*workUnit);
-
-            if (WUStatePaused == workUnit->getState()) // check initial state - and wait if paused
-            {
-                if (!workunitResumeHandler.wait())
-                    throw new WorkflowException(0,"User abort requested", 0, WorkflowException::ABORT, MSGAUD_user);
             }
             setWUState(WUStateBlocked);
 

+ 39 - 68
thorlcr/graph/thgraphmaster.cpp

@@ -1569,96 +1569,67 @@ void CJobMaster::saveSpills()
 
 bool CJobMaster::go()
 {
-    class CWorkunitAbortHandler : public CInterface, implements IThreaded
+    class CWorkunitPauseHandler : public CInterface, implements IWorkUnitSubscriber
     {
         CJobMaster &job;
         IConstWorkUnit &wu;
-        CThreaded threaded;
-        bool running;
-        Semaphore sem;
-    public:
-        CWorkunitAbortHandler(CJobMaster &_job, IConstWorkUnit &_wu)
-            : job(_job), wu(_wu), threaded("WorkunitAbortHandler")
-        {
-            running = true;
-            wu.subscribe(SubscribeOptionAbort);
-            threaded.init(this);
-        }
-        ~CWorkunitAbortHandler()
-        {
-            stop();
-            threaded.join();
-        }
-        virtual void main()
-        {
-            while (running)
-            {
-                if (sem.wait(5000))
-                    break; // signalled aborted
-                if (wu.aborting())
-                {
-                    LOG(MCwarning, thorJob, "ABORT detected from user");
-                    Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
-                    job.fireException(e);
-                    break;
-                }
-            }
-        }
-        void stop() { running = false; sem.signal(); }
-    } wuAbortHandler(*this, *workunit);
-    class CWorkunitPauseHandler : public CInterface, implements ISDSSubscription
-    {
-        CJobMaster &job;
-        IConstWorkUnit &wu;
-        SubscriptionId subId;
-        bool subscribed;
+        Owned<IWorkUnitWatcher> watcher;
         CriticalSection crit;
     public:
         IMPLEMENT_IINTERFACE;
 
         CWorkunitPauseHandler(CJobMaster &_job, IConstWorkUnit &_wu) : job(_job), wu(_wu)
         {
-            StringBuffer xpath("/WorkUnits/");
-            xpath.append(wu.queryWuid()).append("/Action");  // MORE - this should not be done here!
-            subId = querySDS().subscribe(xpath.str(), *this, false, true);
-            subscribed = true;
+            Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+            watcher.setown(factory->getWatcher(this, (WUSubscribeOptions) (SubscribeOptionAction | SubscribeOptionAbort), wu.queryWuid()));
         }
         ~CWorkunitPauseHandler() { stop(); }
         void stop()
         {
             CriticalBlock b(crit);
-            if (subscribed)
+            if (watcher)
             {
-                subscribed = false;
-                querySDS().unsubscribe(subId);
+                watcher->unsubscribe();
+                watcher.clear();
             }
         }
-        void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+        void notify(WUSubscribeOptions flags)
         {
             CriticalBlock b(crit);
-            if (!subscribed) return;
-            job.markWuDirty();
-            bool abort = false;
-            bool pause = false;
-            if (valueLen && valueLen==strlen("pause") && (0 == strncmp("pause", (const char *)valueData, valueLen)))
-            {
-                // pause after current subgraph
-                pause = true;
-            }
-            else if (valueLen && valueLen==strlen("pausenow") && (0 == strncmp("pausenow", (const char *)valueData, valueLen)))
+            if (!watcher)
+                return;
+            if (flags & SubscribeOptionAbort)
             {
-                // abort current subgraph
-                abort = true;
-                pause = true;
-            }
-            else
-            {
-                abort = pause = false;
+                if (wu.aborting())
+                {
+                    LOG(MCwarning, thorJob, "ABORT detected from user");
+                    Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
+                    job.fireException(e);
+                }
             }
-            if (pause)
+            if (flags & SubscribeOptionAction)
             {
-                PROGLOG("Pausing job%s", abort?" [now]":"");
-                job.pause(abort);
+                job.markWuDirty();
+                bool abort = false;
+                bool pause = false;
+                wu.forceReload();
+                WUAction action = wu.getAction();
+                if (action==WUActionPause)
+                {
+                    // pause after current subgraph
+                    pause = true;
+                }
+                else if (action==WUActionPauseNow)
+                {
+                    // abort current subgraph
+                    abort = true;
+                    pause = true;
+                }
+                if (pause)
+                {
+                    PROGLOG("Pausing job%s", abort?" [now]":"");
+                    job.pause(abort);
+                }
             }
         }
     } workunitPauseHandler(*this, *workunit);