瀏覽代碼

HPCC-12250 Allow pluggable interface to workunit creation

Split CLocalWorkUnit from CDaliWorkUnit, and remove unused subscription
mechanism.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
3d09480607
共有 2 個文件被更改,包括 169 次插入251 次删除
  1. 169 245
      common/workunit/workunit.cpp
  2. 0 6
      common/workunit/workunit.hpp

+ 169 - 245
common/workunit/workunit.cpp

@@ -1017,18 +1017,13 @@ template <>  struct CachedTags<CLocalWUAppValue, IConstWUAppValue>
 };
 
 
-class CLocalWorkUnit : public CInterface, implements IWorkUnit , implements ISDSSubscription, implements IExtendedWUInterface
+class CLocalWorkUnit : public CInterface, implements IWorkUnit , implements IExtendedWUInterface
 {
     friend StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool decodeGraphs, bool includeProgress);
     friend void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool decodeGraphs, bool includeProgress);
 
-    // NOTE - order is important - we need to construct connection before p and (especially) destroy after p
-    Owned<IRemoteConnection> connection;
+protected:
     Owned<IPropertyTree> p;
-    bool dirty;
-    bool connectAtRoot;
-    mutable bool abortDirty;
-    mutable bool abortState;
     mutable CriticalSection crit;
     mutable Owned<IWUQuery> query;
     mutable Owned<IWUWebServicesInfo> webServicesInfo;
@@ -1064,9 +1059,8 @@ class CLocalWorkUnit : public CInterface, implements IWorkUnit , implements ISDS
 public:
     IMPLEMENT_IINTERFACE;
 
-    CLocalWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser);
-    CLocalWorkUnit(IRemoteConnection *_conn, IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser);
-    ~CLocalWorkUnit();
+    CLocalWorkUnit(IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser);
+    void beforeDispose();
     CLocalWorkUnit(const char *dummyWuid, const char *xml, ISecManager *secmgr, ISecUser *secuser);
     IPropertyTree *getUnpackedTree(bool includeProgress) const;
 
@@ -1076,7 +1070,7 @@ public:
     void setSecIfcs(ISecManager *mgr, ISecUser*usr){secMgr.set(mgr); secUser.set(usr);}
     
     virtual bool aborting() const;
-    virtual void forceReload();
+    virtual void forceReload() {};
     virtual WUAction getAction() const;
     virtual IStringVal& getActionEx(IStringVal & str) const;
     virtual IStringVal & getApplicationValue(const char * application, const char * propname, IStringVal & str) const;
@@ -1155,7 +1149,6 @@ public:
     virtual bool isProtected() const;
     virtual bool isPausing() const;
     virtual IWorkUnit& lock();
-    virtual bool reload();
     virtual void requestAbort();
     virtual void subscribe(WUSubscribeOptions options);
     virtual unsigned calculateHash(unsigned prevHash);
@@ -1249,7 +1242,6 @@ public:
 
     IWorkUnit &lockRemote(bool commit);
     void unlockRemote();
-    void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL);
     void abort();
     void cleanupAndDelete(bool deldll,bool deleteOwned, const StringArray *deleteExclusions=NULL);
     bool switchThorQueue(const char *cluster, IQueueSwitcher *qs);
@@ -1392,7 +1384,7 @@ protected:
         }
     }
 
-private:
+protected:
     void init();
     IWUGraph *createGraph();
     IWUResult *createResult();
@@ -1407,48 +1399,161 @@ private:
     void unsubscribe();
     void checkAgentRunning(WUState & state);
 
-    // MORE - the two could be a bit more similar...
+    void ensureGraphsUnpacked ()
+    {
+        IPropertyTree *t = p->queryPropTree("PackedGraphs");
+        MemoryBuffer buf;
+        if (t&&t->getPropBin(NULL,buf)) {
+            cachedGraphs.clear();
+            IPropertyTree *st = createPTree(buf);
+            if (st) {
+                p->setPropTree("Graphs",st);
+                p->removeTree(t);
+            }
+        }
+    }
+
+    // Implemented by derived classes
+    virtual void _lockRemote() {};
+    virtual void _unlockRemote() {};
 
-    class CWorkUnitWatcher : public CInterface, implements ISDSSubscription
+};
+
+class CDaliWorkUnit : public CLocalWorkUnit
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    CDaliWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser)
+    : connection(_conn), CLocalWorkUnit(_conn->getRoot(), secmgr, secuser)
     {
-        ISDSSubscription *parent; // not linked - it links me
-        SubscriptionId change;
-        bool sub;
-    public:
-        IMPLEMENT_IINTERFACE;
-        CWorkUnitWatcher(ISDSSubscription *_parent, const char *wuid, bool _sub) : parent(_parent), sub(_sub)
+        abortDirty = true;
+        abortState = false;
+    }
+    ~CDaliWorkUnit()
+    {
+        // NOTE - order is important - we need to construct connection before p and (especially) destroy after p
+        // We use the beforeDIspose() in base class to help ensure this
+        p.clear();
+    }
+
+    virtual void forceReload()
+    {
+        synchronized sync(locked); // protect locked workunits (uncommited writes) from reload
+        StringBuffer wuRoot;
+        getXPath(wuRoot, p->queryName());
+        IRemoteConnection *newconn = querySDS().connect(wuRoot.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+        if (!newconn)
+            throw MakeStringException(WUERR_ConnectFailed, "Could not connect to workunit %s (deleted?)",p->queryName());
+        CriticalBlock block(crit);
+        connection.setown(newconn);
+        init();
+        abortDirty = true;
+        p.setown(connection->getRoot());
+    }
+
+    virtual void cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
+    {
+        CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
+        connection->close(true);
+        PROGLOG("WUID %s removed",p->queryName());
+        connection.clear();
+    }
+
+    virtual void commit()
+    {
+        CLocalWorkUnit::commit();
+        if (connection)
+            connection->commit();
+    }
+
+    virtual void _lockRemote()
+    {
+        StringBuffer wuRoot;
+        getXPath(wuRoot, p->queryName());
+        if (connection)
+            connection->changeMode(RTM_LOCK_WRITE,SDS_LOCK_TIMEOUT);
+        else
+            connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
+        if (!connection)
+            throw MakeStringException(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
+        init();
+        abortDirty = true;
+        p.setown(connection->getRoot());
+    }
+
+    virtual void _unlockRemote()
+    {
+        try
         {
-            StringBuffer wuRoot;
-            getXPath(wuRoot, wuid);
-            change = querySDS().subscribe(wuRoot.str(), *this, sub);
+            //MORE: I'm not convinced this is useful...
+            setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, NULL, StWhenWorkunitModified, NULL, getTimeStampNowValue(), 1, 0, StatsMergeReplace);
+            try
+            {
+                connection->commit();
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, "Error during workunit commit");
+                connection->rollback();
+                connection->changeMode(0, SDS_LOCK_TIMEOUT);
+                throw;
+            }
+            connection->changeMode(0, SDS_LOCK_TIMEOUT);
         }
-        ~CWorkUnitWatcher()
+        catch (IException *E)
         {
-            assertex(change==0);
+            StringBuffer s;
+            PrintLog("Failed to release write lock on workunit: %s", E->errorMessage(s).str());
+            throw;
         }
-        bool watchingChildren()
+    }
+
+    virtual void subscribe(WUSubscribeOptions options)
+    {
+        CriticalBlock block(crit);
+        assertex(options==SubscribeOptionAbort);
+        if (!abortWatcher)
         {
-            return sub;
+            abortWatcher.setown(new CWorkUnitAbortWatcher(this, p->queryName()));
+            abortDirty = true;
         }
-        void unsubscribe()
+    }
+
+    virtual void unsubscribe()
+    {
+        CriticalBlock block(crit);
+        if (abortWatcher)
         {
-            querySDS().unsubscribe(change);
-            change = 0;
+            abortWatcher->unsubscribe();
+            abortWatcher.clear();
         }
+    }
 
-        void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    virtual bool aborting() const
+    {
+        CriticalBlock block(crit);
+        if (abortDirty)
         {
-            parent->notify(id, xpath, flags, valueLen, valueData);
+            StringBuffer apath;
+            apath.append("/WorkUnitAborts/").append(p->queryName());
+            Owned<IRemoteConnection> acon = querySDS().connect(apath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+            if (acon)
+                abortState = acon->queryRoot()->getPropInt(NULL) != 0;
+            else
+                abortState = false;
+            abortDirty = false;
         }
-    };
+        return abortState;
+    }
 
+protected:
     class CWorkUnitAbortWatcher : public CInterface, implements ISDSSubscription
     {
-        CLocalWorkUnit *parent; // not linked - it links me
+        CDaliWorkUnit *parent; // not linked - it links me
         SubscriptionId abort;
     public:
         IMPLEMENT_IINTERFACE;
-        CWorkUnitAbortWatcher(CLocalWorkUnit *_parent, const char *wuid) : parent(_parent)
+        CWorkUnitAbortWatcher(CDaliWorkUnit *_parent, const char *wuid) : parent(_parent)
         {
             StringBuffer wuRoot;
             wuRoot.append("/WorkUnitAborts/").append(wuid);
@@ -1467,26 +1572,15 @@ private:
 
         void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
         {
-            parent->abort();
+            parent->abortDirty = true;
         }
     };
-    Owned<CWorkUnitAbortWatcher> abortWatcher;
-    Owned<CWorkUnitWatcher> changeWatcher;
 
-    void ensureGraphsUnpacked ()
-    {
-        IPropertyTree *t = p->queryPropTree("PackedGraphs");
-        MemoryBuffer buf;
-        if (t&&t->getPropBin(NULL,buf)) {
-            cachedGraphs.clear();
-            IPropertyTree *st = createPTree(buf);
-            if (st) {
-                p->setPropTree("Graphs",st);
-                p->removeTree(t);
-            }
-        }
-    }
+    Owned<IRemoteConnection> connection;
+    Owned<CWorkUnitAbortWatcher> abortWatcher;
 
+    mutable bool abortDirty;
+    mutable bool abortState;
 };
 
 class CLockedWorkUnit : public CInterface, implements ILocalWorkUnit, implements IExtendedWUInterface
@@ -2239,18 +2333,16 @@ class CConstWUArrayIterator : public CInterface, implements IConstWorkUnitIterat
     IArrayOf<IPropertyTree> trees;
     Owned<IConstWorkUnit> cur;
     unsigned curTreeNum;
-    Linked<IRemoteConnection> conn;
-    Linked<ISecManager> secmgr;
-    Linked<ISecUser> secuser;
+    Linked<IWorkUnitFactory> factory;
 
     void setCurrent()
     {
-        cur.setown(new CLocalWorkUnit(LINK(conn), LINK(&trees.item(curTreeNum)), secmgr, secuser));
+        cur.setown(factory->openWorkUnit(trees.item(curTreeNum).queryName(), false));
     }
 public:
     IMPLEMENT_IINTERFACE;
-    CConstWUArrayIterator(IRemoteConnection *_conn, IArrayOf<IPropertyTree> &_trees, ISecManager *_secmgr=NULL, ISecUser *_secuser=NULL)
-        : conn(_conn), secmgr(_secmgr), secuser(_secuser)
+    CConstWUArrayIterator(IWorkUnitFactory *_factory, IArrayOf<IPropertyTree> &_trees)
+        : factory(_factory)
     {
         ForEachItemIn(t, _trees)
             trees.append(*LINK(&_trees.item(t)));
@@ -2476,7 +2568,7 @@ public:
         getXPath(wuRoot, name);
         IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
         conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
-        Owned<CLocalWorkUnit> cw = new CLocalWorkUnit(conn, (ISecManager *)NULL, NULL);
+        Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, (ISecManager *)NULL, NULL);
         return &cw->lockRemote(false);
     }
 
@@ -2491,7 +2583,7 @@ public:
             conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE, SDS_LOCK_TIMEOUT);
         conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
         conn->queryRoot()->setPropInt("@wuidVersion", WUID_VERSION);
-        Owned<CLocalWorkUnit> cw = new CLocalWorkUnit(conn, (ISecManager*)NULL, NULL);
+        Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, (ISecManager*)NULL, NULL);
         IWorkUnit* ret = &cw->lockRemote(false);
         ret->setDebugValue("CREATED_BY", app, true);
         ret->setDebugValue("CREATED_FOR", user, true);
@@ -2535,7 +2627,7 @@ public:
                 PrintLog("deleteWorkUnit %s not found", wuid);
             return false;
         }
-        Owned<CLocalWorkUnit> cw = new CLocalWorkUnit(conn, secmgr, secuser); // takes ownership of conn
+        Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, secmgr, secuser); // takes ownership of conn
         if (secmgr && !checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Full, "delete", true, true)) {
             if (raiseexceptions) {
                 // perhaps raise exception here?
@@ -2620,7 +2712,7 @@ public:
         IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, lock ? RTM_LOCK_READ|RTM_LOCK_SUB : 0, SDS_LOCK_TIMEOUT);
         if (conn)
         {
-            CLocalWorkUnit *wu = new CLocalWorkUnit(conn, secmgr, secuser);
+            CLocalWorkUnit *wu = new CDaliWorkUnit(conn, secmgr, secuser);
             if (secmgr && wu)
             {
                 if (!checkWuSecAccess(*wu, *secmgr, secuser, SecAccess_Read, "opening", true, true))
@@ -2651,7 +2743,7 @@ public:
         IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
         if (conn)
         {
-            Owned<CLocalWorkUnit> cw = new CLocalWorkUnit(conn, secmgr, secuser);
+            Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, secmgr, secuser);
             if (secmgr && cw)
             {
                 if (!checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Write, "updating", true, true))
@@ -2716,7 +2808,7 @@ public:
             Owned<IPropertyTreeIterator> iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ? 
                 conn->queryRoot()->getElements(xpath) : 
                 conn->getElements(xpath));
-            return new CConstWUIterator(conn, iter, secmgr, secuser);
+            return new CConstWUIterator(this, iter, secmgr, secuser);
         }
         else
             return NULL;
@@ -2848,7 +2940,7 @@ public:
         IArrayOf<IPropertyTree> results;
         Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
         Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,queryowner,cachehint,results,total);
-        return new CConstWUArrayIterator(conn, results, secmgr, secuser);
+        return new CConstWUArrayIterator(this, results);
     }
 
     
@@ -3084,15 +3176,13 @@ private:
     class CConstWUIterator : public CInterface, implements IConstWorkUnitIterator
     {
         Owned<IConstWorkUnit> cur;
-        Linked<IRemoteConnection> conn;
+        Linked<IWorkUnitFactory> factory;
         Linked<IPropertyTreeIterator> ptreeIter;
-        Linked<ISecManager> secmgr;
-        Linked<ISecUser> secuser;
         Owned<ISecResourceList> scopes;
 
         void setCurrent()
         {
-            cur.setown(new CLocalWorkUnit(LINK(conn), LINK(&ptreeIter->query()), secmgr, secuser));
+            cur.setown(factory->openWorkUnit(ptreeIter->query().queryName(), false));
         }
         bool getNext() // scan for a workunit with permissions
         {
@@ -3116,8 +3206,8 @@ private:
         }
     public:
         IMPLEMENT_IINTERFACE;
-        CConstWUIterator(IRemoteConnection *_conn, IPropertyTreeIterator *_ptreeIter, ISecManager *_secmgr=NULL, ISecUser *_secuser=NULL)
-            : conn(_conn), ptreeIter(_ptreeIter), secmgr(_secmgr), secuser(_secuser)
+        CConstWUIterator(IWorkUnitFactory *_factory, IPropertyTreeIterator *_ptreeIter, ISecManager *secmgr=NULL, ISecUser *secuser=NULL)
+            : factory(_factory), ptreeIter(_ptreeIter)
         {
             UniqueScopes us;
             if (secmgr /* && secmgr->authTypeRequired(RT_WORKUNIT_SCOPE) tbd */)
@@ -3376,19 +3466,8 @@ public:
 };
 //==========================================================================================
 
-CLocalWorkUnit::CLocalWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser) : connection(_conn)
+CLocalWorkUnit::CLocalWorkUnit(IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser)
 {
-    connectAtRoot = true;
-    init();
-    p.setown(connection->getRoot());
-    secMgr.set(secmgr);
-    secUser.set(secuser);
-}
-
-CLocalWorkUnit::CLocalWorkUnit(IRemoteConnection *_conn, IPropertyTree* root, ISecManager *secmgr, ISecUser *secuser) : connection(_conn)
-{
-    connectAtRoot = false;
-    init();
     p.setown(root);
     secMgr.set(secmgr);
     secUser.set(secuser);
@@ -3419,15 +3498,11 @@ void CLocalWorkUnit::init()
     activitiesCached = false;
     webServicesInfoCached = false;
     roxieQueryInfoCached = false;
-    dirty = false;
-    abortDirty = true;
-    abortState = false;
 }
 
 // Dummy workunit support
 CLocalWorkUnit::CLocalWorkUnit(const char *_wuid, const char *xml, ISecManager *secmgr, ISecUser *secuser)
 {
-    connectAtRoot = true;
     init();
     if (xml)
         p.setown(createPTreeFromXMLString(xml));
@@ -3440,12 +3515,8 @@ CLocalWorkUnit::CLocalWorkUnit(const char *_wuid, const char *xml, ISecManager *
     secUser.set(secuser);
 }
 
-CLocalWorkUnit::~CLocalWorkUnit() 
+void CLocalWorkUnit::beforeDispose()
 {
-    if (workUnitTraceLevel > 1)
-    {
-        PrintLog("Releasing workunit %s mode %x", p->queryName(), connection ? connection->queryMode() :0);
-    }
     try
     {
         unsubscribe();
@@ -3469,7 +3540,6 @@ CLocalWorkUnit::~CLocalWorkUnit()
         secUser.clear();
         cachedGraphs.clear();
         p.clear();
-        connection.clear();
     }
     catch (IException *E) { LOG(MCexception(E, MSGCLS_warning), E, "Exception during ~CLocalWorkUnit"); E->Release(); }
 }
@@ -3478,7 +3548,6 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll, bool deleteOwned, const Strin
 {
     TIME_SECTION("WUDELETE cleanupAndDelete total");
     // Delete any related things in SDS etc that might otherwise be forgotten
-    assertex(connectAtRoot); // make sure we don't delete entire workunit tree!
     if (p->getPropBool("@protected", false))
         throw MakeStringException(WUERR_WorkunitProtected, "%s: Workunit is protected",p->queryName());
     switch (getState())
@@ -3559,9 +3628,6 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll, bool deleteOwned, const Strin
         WARNLOG("Unknown exception during cleanupAndDelete: %s", p->queryName()); 
     }
     CConstGraphProgress::deleteWuidProgress(p->queryName());
-    connection->close(true);  
-    PROGLOG("WUID %s removed",p->queryName());
-    connection.clear();
 }
 
 void CLocalWorkUnit::setTimeScheduled(const IJlibDateTime &val)
@@ -3933,16 +3999,6 @@ void CLocalWorkUnit::deserialize(MemoryBuffer &src)
     loadXML(value);
 }
 
-void CLocalWorkUnit::notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-{
-    dirty = true;
-}
-
-void CLocalWorkUnit::abort()
-{
-    abortDirty = true;
-}
-
 void CLocalWorkUnit::requestAbort()
 {
     CriticalBlock block(crit);
@@ -3951,90 +4007,10 @@ void CLocalWorkUnit::requestAbort()
 
 void CLocalWorkUnit::subscribe(WUSubscribeOptions options)
 {
-    CriticalBlock block(crit);
-    bool subscribeAbort = false;
-    bool subscribeChange = false;
-    bool needChildren = true;
-    switch (options)
-    {
-    case SubscribeOptionAbort:
-        subscribeAbort = true;
-        break;
-    case SubscribeOptionRunningState:
-        needChildren = false;
-    case SubscribeOptionAnyState:
-        subscribeAbort = true;
-        subscribeChange = true;
-        break;
-    case SubscribeOptionProgress:
-    case SubscribeOptionAll:
-        subscribeChange = true;
-        break;
-    }
-    if (subscribeChange)
-    {
-        if (changeWatcher && changeWatcher->watchingChildren() != needChildren)
-        {
-            changeWatcher->unsubscribe();
-            changeWatcher.clear();
-        }
-        if (!changeWatcher)
-        {
-            changeWatcher.setown(new CWorkUnitWatcher(this, p->queryName(), needChildren));
-            dirty = true;
-        }
-    }
-    if (subscribeAbort && !abortWatcher)
-    {
-        abortWatcher.setown(new CWorkUnitAbortWatcher(this, p->queryName()));
-        abortDirty = true;
-    }
-}
-
-void CLocalWorkUnit::forceReload()
-{
-    dirty = true;
-    reload();
-}
-
-bool CLocalWorkUnit::reload()
-{
-    synchronized sync(locked); // protect locked workunits (uncommited writes) from reload
-    CriticalBlock block(crit);
-    if (dirty)
-    {
-        if (!connectAtRoot)
-        {
-            StringBuffer wuRoot;
-            getXPath(wuRoot, p->queryName());
-            IRemoteConnection *newconn = factory->sdsManager->connect(wuRoot.str(), factory->session, 0, SDS_LOCK_TIMEOUT);
-            if (!newconn)
-                throw MakeStringException(WUERR_ConnectFailed, "Could not connect to workunit %s (deleted?)",p->queryName());
-            connection.setown(newconn);
-            connectAtRoot = true;
-        }
-        else
-            connection->reload();
-        init();
-        p.setown(connection->getRoot());
-        return true;
-    }
-    return false;
 }
 
 void CLocalWorkUnit::unsubscribe()
 {
-    CriticalBlock block(crit);
-    if (abortWatcher)
-    {
-        abortWatcher->unsubscribe();
-        abortWatcher.clear();
-    }
-    if (changeWatcher)
-    {
-        changeWatcher->unsubscribe();
-        changeWatcher.clear();
-    }
 }
 
 void CLocalWorkUnit::unlockRemote()
@@ -4043,27 +4019,7 @@ void CLocalWorkUnit::unlockRemote()
     locked.unlock();
     if (IsShared())  // Is this right? Doesn't feel right! Commit on last unlock would seem smarter
     {
-        try
-        {
-            assertex(connectAtRoot);
-            //MORE: I'm not convinced this is useful...
-            setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, NULL, StWhenWorkunitModified, NULL, getTimeStampNowValue(), 1, 0, StatsMergeReplace);
-            try { connection->commit(); }
-            catch (IException *e)
-            { 
-                EXCLOG(e, "Error during workunit commit");
-                connection->rollback();
-                connection->changeMode(0, SDS_LOCK_TIMEOUT);
-                throw;
-            }
-            connection->changeMode(0, SDS_LOCK_TIMEOUT);
-        }
-        catch (IException *E)
-        {
-            StringBuffer s;
-            PrintLog("Failed to release write lock on workunit: %s", E->errorMessage(s).str());
-            throw;
-        }
+        _unlockRemote();
     }
 }
 
@@ -4077,17 +4033,7 @@ IWorkUnit &CLocalWorkUnit::lockRemote(bool commit)
     {
         try
         {
-            StringBuffer wuRoot;
-            getXPath(wuRoot, p->queryName());
-            if (connection&&connectAtRoot) 
-                connection->changeMode(RTM_LOCK_WRITE,SDS_LOCK_TIMEOUT);
-            else 
-                connection.setown(factory->sdsManager->connect(wuRoot.str(), factory->session, RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
-            if (!connection)
-                throw MakeStringException(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
-            connectAtRoot = true;
-            init();
-            p.setown(connection->getRoot());
+            _lockRemote();
         }
         catch (IException *E)
         {
@@ -4102,10 +4048,7 @@ IWorkUnit &CLocalWorkUnit::lockRemote(bool commit)
 
 void CLocalWorkUnit::commit()
 {
-    CriticalBlock block(crit);
-    assertex(connectAtRoot);
-    if (connection)
-        connection->commit();
+    // Nothing to do if not backed by a persistent store
 }
 
 IWorkUnit& CLocalWorkUnit::lock()
@@ -4348,11 +4291,7 @@ void CLocalWorkUnit::setState(WUState value)
     CriticalBlock block(crit);
     if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait)
     {
-        if (abortWatcher)
-        {
-            abortWatcher->unsubscribe();
-            abortWatcher.clear();
-        }
+        unsubscribe();
         StringBuffer apath;
         apath.append("/WorkUnitAborts/").append(p->queryName());
         if(factory)
@@ -4395,22 +4334,7 @@ void CLocalWorkUnit::setAgentSession(__int64 sessionId)
 
 bool CLocalWorkUnit::aborting() const 
 {
-    CriticalBlock block(crit);
-    if (abortDirty)
-    {
-        if (factory)
-        {
-            StringBuffer apath;
-            apath.append("/WorkUnitAborts/").append(p->queryName());
-            Owned<IRemoteConnection> acon = factory->sdsManager->connect(apath.str(), factory->session, 0, SDS_LOCK_TIMEOUT);
-            if (acon)
-                abortState = acon->queryRoot()->getPropInt(NULL)!=0;
-            else
-                abortState = false;
-        }
-        abortDirty = false;
-    }
-    return abortState;
+    return false;
 }
 
 bool CLocalWorkUnit::getIsQueryService() const 

+ 0 - 6
common/workunit/workunit.hpp

@@ -721,12 +721,7 @@ interface IConstLocalFileUploadIterator : extends IScmIterator
 
 enum WUSubscribeOptions
 {
-    SubscribeOptionRunningState = 0,
-    SubscribeOptionAnyState = 1,
     SubscribeOptionAbort = 2,
-    SubscribeOptionProgress = 3,
-    SubscribeOptionAll = 4,
-    SubscribeOptionSize = 5
 };
 
 
@@ -1045,7 +1040,6 @@ interface IConstWorkUnit : extends IInterface
     virtual bool isProtected() const = 0;
     virtual bool isPausing() const = 0;
     virtual IWorkUnit & lock() = 0;
-    virtual bool reload() = 0;
     virtual void requestAbort() = 0;
     virtual void subscribe(WUSubscribeOptions options) = 0;
     virtual unsigned queryFileUsage(const char * filename) const = 0;