Browse Source

HPCC-12250 Allow pluggable interface to workunit creation

More complete split of CWorkUnitFactory - should be ready for writing a
Cassandra version of the factory / workunit classes. Still need to think
about how locking is done.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
5ebda1b262

+ 303 - 284
common/workunit/workunit.cpp

@@ -2111,9 +2111,6 @@ public:
 
 CWorkUnitFactory::CWorkUnitFactory()
 {
-    // Assumes dali client configuration has already been done
-    sdsManager = &querySDS();
-    session = myProcessSession();
     deletedllworkq.setown(createWorkQueueThread());
 }
 
@@ -2122,38 +2119,22 @@ CWorkUnitFactory::~CWorkUnitFactory()
     // deletepool->joinAll();
 }
 
-IWorkUnit* CWorkUnitFactory::getGlobalWorkUnit()
-{
-    StringBuffer wuRoot;
-    getXPath(wuRoot, GLOBAL_WORKUNIT);
-    IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
-    Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, (ISecManager *) NULL, NULL);
-    return &cw->lockRemote(false);
-}
-
-IWorkUnit* CWorkUnitFactory::createNamedWorkUnit(const char *wuid, const char *app, const char *user)
-{
-    return secCreateNamedWorkUnit(wuid, app, user, NULL, NULL);
-}
-
 IWorkUnit* CWorkUnitFactory::secCreateNamedWorkUnit(const char *wuid, const char *app, const char *user, ISecManager *secmgr, ISecUser *secuser)
 {
-    StringBuffer wuRoot;
-    getXPath(wuRoot, wuid);
-    IRemoteConnection *conn;
-    conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_UNIQUE, SDS_LOCK_TIMEOUT);
-    conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
-    conn->queryRoot()->setPropInt("@wuidVersion", WUID_VERSION);
+    Owned<CLocalWorkUnit> cw = _createWorkUnit(wuid, secmgr, secuser);
     if (user)
-        conn->queryRoot()->setProp("@scope", user);  // Note - we do this here rather than calling setWuScope as we don't want the permission check. It will be checked by the lockRemote anyway.
-    Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, secmgr, secuser);
+        cw->setWuScope(user);  // Note - this may check access rights and throw exception. Is that correct? We might prefer to only check access once, and this will check on the lock too...
     IWorkUnit* ret = &cw->lockRemote(false);   // Note - this may throw exception if user does not have rights.
     ret->setDebugValue("CREATED_BY", app, true);
     ret->setDebugValue("CREATED_FOR", user, true);
     return ret;
 }
 
+IWorkUnit* CWorkUnitFactory::createNamedWorkUnit(const char *wuid, const char *app, const char *user)
+{
+    return secCreateNamedWorkUnit(wuid, app, user, NULL, NULL);
+}
+
 IWorkUnit* CWorkUnitFactory::createWorkUnit(const char *app, const char *user)
 {
     return secCreateWorkUnit(app, user, NULL, NULL);
@@ -2177,73 +2158,33 @@ IWorkUnit* CWorkUnitFactory::secCreateWorkUnit(const char *app, const char *user
     return ret;
 }
 
-bool CWorkUnitFactory::secDeleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser, bool raiseexceptions)
+bool CWorkUnitFactory::secDeleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser)
 {
     if (workUnitTraceLevel > 1)
         PrintLog("deleteWorkUnit %s", wuid);
     StringBuffer wuRoot;
     getXPath(wuRoot, wuid);
-    IRemoteConnection *conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
-    if (!conn)
-    {
-        if (workUnitTraceLevel > 0)
-            PrintLog("deleteWorkUnit %s not found", wuid);
+    Owned<CLocalWorkUnit> cw = _updateWorkUnit(wuid, secmgr, secuser);
+    if (secmgr && !checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Full, "delete", true, true))
         return false;
+    try
+    {
+        cw->cleanupAndDelete(true, true);
     }
-    Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, secmgr, secuser); // takes ownership of conn
-    if (secmgr && !checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Full, "delete", true, true)) {
-        if (raiseexceptions) {
-            // perhaps raise exception here?
-        }
+    catch (IException *E)
+    {
+        StringBuffer s;
+        LOG(MCexception(E, MSGCLS_warning), E, s.append("Exception during deleteWorkUnit: ").append(wuid).str());
+        E->Release();
         return false;
     }
-    if (raiseexceptions) {
-        try
-        {
-            cw->cleanupAndDelete(true,true);
-        }
-        catch (IException *E)
-        {
-            StringBuffer s;
-            LOG(MCexception(E, MSGCLS_warning), E, s.append("Exception during deleteWorkUnit: ").append(wuid).str());
-            E->Release();
-            return false;
-        }
-    }
-    else
-        cw->cleanupAndDelete(true,true);
     removeWorkUnitFromAllQueues(wuid); //known active workunits wouldn't make it this far
     return true;
 }
 
-bool CWorkUnitFactory::deleteWorkUnitEx(const char * wuid)
-{
-    return secDeleteWorkUnit(wuid,NULL,NULL,true);
-}
 bool CWorkUnitFactory::deleteWorkUnit(const char * wuid)
 {
-    return secDeleteWorkUnit(wuid,NULL,NULL,false);
-}
-IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByOwner(const char * owner)
-{
-    StringBuffer path("*");
-    if (owner && *owner)
-        path.append("[@submitID=\"").append(owner).append("\"]");
-    return getWorkUnitsByXPath(path.str());
-}
-IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByECL(const char* ecl)
-{
-    StringBuffer path("*");
-    if (ecl && *ecl)
-        path.append("[Query/Text=~\"*").append(ecl).append("*\"]");
-    return getWorkUnitsByXPath(path.str());
-}
-IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByCluster(const char* cluster)
-{
-    StringBuffer path("*");
-    if (cluster && *cluster)
-        path.append("[@clusterName=\"").append(cluster).append("\"]");
-    return getWorkUnitsByXPath(path.str());
+    return secDeleteWorkUnit(wuid,NULL,NULL);
 }
 
 IConstWorkUnit* CWorkUnitFactory::secOpenWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
@@ -2263,21 +2204,12 @@ IConstWorkUnit* CWorkUnitFactory::secOpenWorkUnit(const char *wuid, bool lock, I
 
     if (workUnitTraceLevel > 1)
         PrintLog("openWorkUnit %s", wuidStr.str());
-    StringBuffer wuRoot;
-    getXPath(wuRoot, wuidStr.str());
-    IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, lock ? RTM_LOCK_READ|RTM_LOCK_SUB : 0, SDS_LOCK_TIMEOUT);
-    if (conn)
+    Owned<IConstWorkUnit> wu = _openWorkUnit(wuid, lock, secmgr, secuser);
+    if (wu)
     {
-        CLocalWorkUnit *wu = new CDaliWorkUnit(conn, secmgr, secuser);
-        if (secmgr && wu)
-        {
-            if (!checkWuSecAccess(*wu, *secmgr, secuser, SecAccess_Read, "opening", true, true))
-            {
-                delete wu;
-                return NULL;
-            }
-        }
-        return wu;
+        if (secmgr && !checkWuSecAccess(*wu, *secmgr, secuser, SecAccess_Read, "opening", true, true))
+            return NULL; // Actually throws exception on failure, so won't reach here
+        return wu.getClear();
     }
     else
     {
@@ -2286,26 +2218,22 @@ IConstWorkUnit* CWorkUnitFactory::secOpenWorkUnit(const char *wuid, bool lock, I
         return NULL;
     }
 }
+
 IConstWorkUnit* CWorkUnitFactory::openWorkUnit(const char *wuid, bool lock)
 {
     return secOpenWorkUnit(wuid, lock, NULL, NULL);
 }
+
 IWorkUnit* CWorkUnitFactory::secUpdateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
 {
     if (workUnitTraceLevel > 1)
         PrintLog("updateWorkUnit %s", wuid);
-    StringBuffer wuRoot;
-    getXPath(wuRoot, wuid);
-    IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
-    if (conn)
+    Owned<CLocalWorkUnit> wu = _updateWorkUnit(wuid, secmgr, secuser);
+    if (wu)
     {
-        Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, secmgr, secuser);
-        if (secmgr && cw)
-        {
-            if (!checkWuSecAccess(*cw.get(), *secmgr, secuser, SecAccess_Write, "updating", true, true))
-                return NULL;
-        }
-        return &cw->lockRemote(false);
+        if (secmgr && !checkWuSecAccess(*wu.get(), *secmgr, secuser, SecAccess_Write, "updating", true, true))
+            return NULL;
+        return &wu->lockRemote(false);
     }
     else
     {
@@ -2314,10 +2242,12 @@ IWorkUnit* CWorkUnitFactory::secUpdateWorkUnit(const char *wuid, ISecManager *se
         return NULL;
     }
 }
+
 IWorkUnit* CWorkUnitFactory::updateWorkUnit(const char *wuid)
 {
     return secUpdateWorkUnit(wuid, NULL, NULL);
 }
+
 int CWorkUnitFactory::setTracingLevel(int newLevel)
 {
     if (newLevel)
@@ -2326,10 +2256,6 @@ int CWorkUnitFactory::setTracingLevel(int newLevel)
     workUnitTraceLevel = newLevel;
     return level;
 }
-IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByXPath(const char *xpath)
-{
-    return secGetWorkUnitsByXPath(xpath, NULL, NULL);
-}
 
 void CWorkUnitFactory::descheduleAllWorkUnits()
 {
@@ -2355,150 +2281,22 @@ void CWorkUnitFactory::descheduleAllWorkUnits()
     do more = root->removeProp("*"); while(more);
 }
 
-IConstWorkUnitIterator * CWorkUnitFactory::secGetWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser)
+IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByOwner(const char * owner)
 {
-    Owned<IRemoteConnection> conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT);
-    if (conn)
-    {
-        CDaliVersion serverVersionNeeded("3.2");
-        Owned<IPropertyTreeIterator> iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ?
-            conn->queryRoot()->getElements(xpath) :
-            conn->getElements(xpath));
-        return new CConstWUIterator(iter, secmgr, secuser);
-    }
-    else
-        return NULL;
+    return secGetWorkUnitsByOwner(owner, NULL, NULL);
 }
-
-IConstWorkUnitIterator* CWorkUnitFactory::secGetWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm)
-                                            WUSortField *filters,   // NULL or list of fields to folteron (terminated by WUSFterm)
-                                            const void *filterbuf,  // (appended) string values for filters
-                                            unsigned startoffset,
-                                            unsigned maxnum,
-                                            const char *queryowner,
-                                            __int64 *cachehint,
-                                            ISecManager *secmgr,
-                                            ISecUser *secuser,
-                                            unsigned *total)
+IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByState(WUState state)
 {
-    class CWorkUnitsPager : public CSimpleInterface, implements IElementsPager
-    {
-        StringAttr xPath;
-        StringAttr sortOrder;
-        StringAttr nameFilterLo;
-        StringAttr nameFilterHi;
-        StringArray unknownAttributes;
-
-    public:
-        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-
-        CWorkUnitsPager(const char* _xPath, const char *_sortOrder, const char* _nameFilterLo, const char* _nameFilterHi, StringArray& _unknownAttributes)
-            : xPath(_xPath), sortOrder(_sortOrder), nameFilterLo(_nameFilterLo), nameFilterHi(_nameFilterHi)
-        {
-            ForEachItemIn(x, _unknownAttributes)
-                unknownAttributes.append(_unknownAttributes.item(x));
-        }
-        virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
-        {
-            Owned<IRemoteConnection> conn = querySDS().connect("WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-            if (!conn)
-                return NULL;
-            Owned<IPropertyTreeIterator> iter = conn->getElements(xPath);
-            if (!iter)
-                return NULL;
-            sortElements(iter, sortOrder.get(), nameFilterLo.get(), nameFilterHi.get(), unknownAttributes, elements);
-            return conn.getClear();
-        }
-    };
-    class CScopeChecker : public CSimpleInterface, implements ISortedElementsTreeFilter
-    {
-        UniqueScopes done;
-        ISecManager *secmgr;
-        ISecUser *secuser;
-        CriticalSection crit;
-    public:
-        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-
-        CScopeChecker(ISecManager *_secmgr,ISecUser *_secuser)
-        {
-            secmgr = _secmgr;
-            secuser = _secuser;
-        }
-        bool isOK(IPropertyTree &tree)
-        {
-            const char *scopename = tree.queryProp("@scope");
-            if (!scopename||!*scopename)
-                return true;
-
-            {
-                CriticalBlock block(crit);
-                const bool *b = done.getValue(scopename);
-                if (b)
-                    return *b;
-            }
-            bool ret = checkWuScopeSecAccess(scopename,*secmgr,secuser,SecAccess_Read,"iterating",false,false);
-            {
-                // conceivably could have already been checked and added, but ok.
-                CriticalBlock block(crit);
-                done.setValue(scopename,ret);
-            }
-            return ret;
-        }
-    };
-    Owned<ISortedElementsTreeFilter> sc = new CScopeChecker(secmgr,secuser);
-    StringBuffer query;
-    StringBuffer so;
-    StringAttr namefilter("*");
-    StringAttr namefilterlo;
-    StringAttr namefilterhi;
-    StringArray unknownAttributes;
-    if (filters) {
-        const char *fv = (const char *)filterbuf;
-        for (unsigned i=0;filters[i]!=WUSFterm;i++) {
-            int fmt = filters[i];
-            int subfmt = (fmt&0xff);
-            if (subfmt==WUSFwuid)
-                namefilterlo.set(fv);
-            else if (subfmt==WUSFwuidhigh)
-                namefilterhi.set(fv);
-            else if (subfmt==WUSFwildwuid)
-                namefilter.set(fv);
-            else if (subfmt==WUSFcustom)
-                query.append("[").append(fv).append("]");
-            else if (!fv || !*fv)
-                unknownAttributes.append(getEnumText(subfmt,workunitSortFields));
-            else {
-                query.append('[').append(getEnumText(subfmt,workunitSortFields)).append('=');
-                if (fmt&WUSFnocase)
-                    query.append('?');
-                if (fmt&WUSFwild)
-                    query.append('~');
-                query.append('"').append(fv).append("\"]");
-            }
-            fv = fv + strlen(fv)+1;
-        }
-    }
-    query.insert(0, namefilter.get());
-    if (sortorder) {
-        for (unsigned i=0;sortorder[i]!=WUSFterm;i++) {
-            if (so.length())
-                so.append(',');
-            int fmt = sortorder[i];
-            if (fmt&WUSFreverse)
-                so.append('-');
-            if (fmt&WUSFnocase)
-                so.append('?');
-            if (fmt&WUSFnumeric)
-                so.append('#');
-            so.append(getEnumText(fmt&0xff,workunitSortFields));
-        }
-    }
-    IArrayOf<IPropertyTree> results;
-    Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
-    Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,queryowner,cachehint,results,total);
-    return new CConstWUArrayIterator(results);
+    return secGetWorkUnitsByState(state, NULL, NULL);
+}
+IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByECL(const char * ECL)
+{
+    return secGetWorkUnitsByECL(ECL, NULL, NULL);
+}
+IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByCluster(const char * cluster)
+{
+    return secGetWorkUnitsByCluster(cluster, NULL, NULL);
 }
-
 
 IConstWorkUnitIterator* CWorkUnitFactory::getWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm)
                                             WUSortField *filters,   // NULL or list of fields to filter on (terminated by WUSFterm)
@@ -2677,15 +2475,6 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
     return new CConstQuerySetQueryIterator(results);
 }
 
-unsigned CWorkUnitFactory::numWorkUnits()
-{
-    Owned<IRemoteConnection> conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT);
-    if (!conn)
-        return 0;
-    IPropertyTree *root = conn->queryRoot();
-    return root->numChildren();
-}
-
 bool CWorkUnitFactory::isAborting(const char *wuid) const
 {
     VStringBuffer apath("/WorkUnitAborts/%s", wuid);
@@ -2708,7 +2497,7 @@ void CWorkUnitFactory::clearAborting(const char *wuid)
     VStringBuffer apath("/WorkUnitAborts/%s", wuid);
     try
     {
-        Owned<IRemoteConnection> acon = sdsManager->connect(apath.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
+        Owned<IRemoteConnection> acon = querySDS().connect(apath.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
         if (acon)
             acon->close(true);
     }
@@ -2752,13 +2541,257 @@ public:
     IMPLEMENT_IINTERFACE_USING(CWorkUnitFactory);
     CDaliWorkUnitFactory()
     {
+        // Assumes dali client configuration has already been done
+        sdsManager = &querySDS();
+        session = myProcessSession();
         addShutdownHook(*this);
     }
     ~CDaliWorkUnitFactory()
     {
         removeShutdownHook(*this);
     }
+
+    virtual CLocalWorkUnit *_createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer wuRoot;
+        getXPath(wuRoot, wuid);
+        IRemoteConnection *conn;
+        conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_UNIQUE, SDS_LOCK_TIMEOUT);
+        conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
+        conn->queryRoot()->setPropInt("@wuidVersion", WUID_VERSION);
+        return new CDaliWorkUnit(conn, secmgr, secuser);
+    }
+
+    virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer wuRoot;
+        getXPath(wuRoot, wuid);
+        IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, lock ? RTM_LOCK_READ|RTM_LOCK_SUB : 0, SDS_LOCK_TIMEOUT);
+        if (conn)
+            return new CDaliWorkUnit(conn, secmgr, secuser);
+        else
+            return NULL;
+    }
+
+    virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer wuRoot;
+        getXPath(wuRoot, wuid);
+        IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_LOCK_SUB, SDS_LOCK_TIMEOUT);
+        if (conn)
+            return new CDaliWorkUnit(conn, secmgr, secuser);
+        else
+            return NULL;
+    }
+
+    virtual IWorkUnit* getGlobalWorkUnit()
+    {
+        StringBuffer wuRoot;
+        getXPath(wuRoot, GLOBAL_WORKUNIT);
+        IRemoteConnection* conn = sdsManager->connect(wuRoot.str(), session, RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
+        conn->queryRoot()->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
+        Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, (ISecManager *) NULL, NULL);
+        return &cw->lockRemote(false);
+    }
+
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer path("*");
+        if (owner && *owner)
+            path.append("[@submitID=\"").append(owner).append("\"]");
+        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
+    }
+    IConstWorkUnitIterator* secGetWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer path("*");
+        path.append("[@state=\"").append(getEnumText(state, states)).append("\"]");
+        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
+    }
+    IConstWorkUnitIterator* secGetWorkUnitsByECL(const char* ecl, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer path("*");
+        if (ecl && *ecl)
+            path.append("[Query/Text=~\"*").append(ecl).append("*\"]");
+        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
+    }
+    IConstWorkUnitIterator* secGetWorkUnitsByCluster(const char* cluster, ISecManager *secmgr, ISecUser *secuser)
+    {
+        StringBuffer path("*");
+        if (cluster && *cluster)
+            path.append("[@clusterName=\"").append(cluster).append("\"]");
+        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
+    }
+
+    IConstWorkUnitIterator* getWorkUnitsByXPath(const char *xpath)
+    {
+        // NOTE - this is deprecated - we want to get rid of it (daliadmin MAY be allowed to use it, but nothing else should)
+        return _getWorkUnitsByXPath(xpath, NULL, NULL);
+    }
+
+    IConstWorkUnitIterator* secGetWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser)
+    {
+        // NOTE - this is deprecated - we want to get rid of it (daliadmin MAY be allowed to use it, but nothing else should)
+        return _getWorkUnitsByXPath(xpath, secmgr, secuser);
+    }
+
     virtual void clientShutdown();
+
+    virtual unsigned numWorkUnits()
+    {
+        Owned<IRemoteConnection> conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT);
+        if (!conn)
+            return 0;
+        IPropertyTree *root = conn->queryRoot();
+        return root->numChildren();
+    }
+
+    IConstWorkUnitIterator* secGetWorkUnitsSorted( WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm)
+                                                WUSortField *filters,   // NULL or list of fields to folteron (terminated by WUSFterm)
+                                                const void *filterbuf,  // (appended) string values for filters
+                                                unsigned startoffset,
+                                                unsigned maxnum,
+                                                const char *queryowner,
+                                                __int64 *cachehint,
+                                                ISecManager *secmgr,
+                                                ISecUser *secuser,
+                                                unsigned *total)
+    {
+        class CWorkUnitsPager : public CSimpleInterface, implements IElementsPager
+        {
+            StringAttr xPath;
+            StringAttr sortOrder;
+            StringAttr nameFilterLo;
+            StringAttr nameFilterHi;
+            StringArray unknownAttributes;
+
+        public:
+            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+            CWorkUnitsPager(const char* _xPath, const char *_sortOrder, const char* _nameFilterLo, const char* _nameFilterHi, StringArray& _unknownAttributes)
+                : xPath(_xPath), sortOrder(_sortOrder), nameFilterLo(_nameFilterLo), nameFilterHi(_nameFilterHi)
+            {
+                ForEachItemIn(x, _unknownAttributes)
+                    unknownAttributes.append(_unknownAttributes.item(x));
+            }
+            virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
+            {
+                Owned<IRemoteConnection> conn = querySDS().connect("WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+                if (!conn)
+                    return NULL;
+                Owned<IPropertyTreeIterator> iter = conn->getElements(xPath);
+                if (!iter)
+                    return NULL;
+                sortElements(iter, sortOrder.get(), nameFilterLo.get(), nameFilterHi.get(), unknownAttributes, elements);
+                return conn.getClear();
+            }
+        };
+        class CScopeChecker : public CSimpleInterface, implements ISortedElementsTreeFilter
+        {
+            UniqueScopes done;
+            ISecManager *secmgr;
+            ISecUser *secuser;
+            CriticalSection crit;
+        public:
+            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+
+            CScopeChecker(ISecManager *_secmgr,ISecUser *_secuser)
+            {
+                secmgr = _secmgr;
+                secuser = _secuser;
+            }
+            bool isOK(IPropertyTree &tree)
+            {
+                const char *scopename = tree.queryProp("@scope");
+                if (!scopename||!*scopename)
+                    return true;
+
+                {
+                    CriticalBlock block(crit);
+                    const bool *b = done.getValue(scopename);
+                    if (b)
+                        return *b;
+                }
+                bool ret = checkWuScopeSecAccess(scopename,*secmgr,secuser,SecAccess_Read,"iterating",false,false);
+                {
+                    // conceivably could have already been checked and added, but ok.
+                    CriticalBlock block(crit);
+                    done.setValue(scopename,ret);
+                }
+                return ret;
+            }
+        };
+        Owned<ISortedElementsTreeFilter> sc = new CScopeChecker(secmgr,secuser);
+        StringBuffer query;
+        StringBuffer so;
+        StringAttr namefilter("*");
+        StringAttr namefilterlo;
+        StringAttr namefilterhi;
+        StringArray unknownAttributes;
+        if (filters) {
+            const char *fv = (const char *)filterbuf;
+            for (unsigned i=0;filters[i]!=WUSFterm;i++) {
+                int fmt = filters[i];
+                int subfmt = (fmt&0xff);
+                if (subfmt==WUSFwuid)
+                    namefilterlo.set(fv);
+                else if (subfmt==WUSFwuidhigh)
+                    namefilterhi.set(fv);
+                else if (subfmt==WUSFwildwuid)
+                    namefilter.set(fv);
+                else if (subfmt==WUSFcustom)
+                    query.append("[").append(fv).append("]");
+                else if (!fv || !*fv)
+                    unknownAttributes.append(getEnumText(subfmt,workunitSortFields));
+                else {
+                    query.append('[').append(getEnumText(subfmt,workunitSortFields)).append('=');
+                    if (fmt&WUSFnocase)
+                        query.append('?');
+                    if (fmt&WUSFwild)
+                        query.append('~');
+                    query.append('"').append(fv).append("\"]");
+                }
+                fv = fv + strlen(fv)+1;
+            }
+        }
+        query.insert(0, namefilter.get());
+        if (sortorder) {
+            for (unsigned i=0;sortorder[i]!=WUSFterm;i++) {
+                if (so.length())
+                    so.append(',');
+                int fmt = sortorder[i];
+                if (fmt&WUSFreverse)
+                    so.append('-');
+                if (fmt&WUSFnocase)
+                    so.append('?');
+                if (fmt&WUSFnumeric)
+                    so.append('#');
+                so.append(getEnumText(fmt&0xff,workunitSortFields));
+            }
+        }
+        IArrayOf<IPropertyTree> results;
+        Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
+        Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,queryowner,cachehint,results,total);
+        return new CConstWUArrayIterator(results);
+    }
+
+protected:
+    IConstWorkUnitIterator * _getWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser)
+    {
+        Owned<IRemoteConnection> conn = sdsManager->connect("/WorkUnits", session, 0, SDS_LOCK_TIMEOUT);
+        if (conn)
+        {
+            CDaliVersion serverVersionNeeded("3.2");
+            Owned<IPropertyTreeIterator> iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ?
+                conn->queryRoot()->getElements(xpath) :
+                conn->getElements(xpath));
+            return new CConstWUIterator(iter, secmgr, secuser);
+        }
+        else
+            return NULL;
+    }
+
+    ISDSManager *sdsManager;
+    SessionId session;
 };
 
 static Owned<CDaliWorkUnitFactory> factory;
@@ -2800,13 +2833,9 @@ public:
         checkWuScopeSecAccess(user, *secMgr.get(), secUser.get(), SecAccess_Write, "Create", true, true);
         return baseFactory->secCreateWorkUnit(app, user, secMgr.get(), secUser.get());
     }
-    virtual bool deleteWorkUnitEx(const char * wuid)
-    {
-        return baseFactory->secDeleteWorkUnit(wuid, secMgr.get(), secUser.get(), true);
-    }
     virtual bool deleteWorkUnit(const char * wuid)
     {
-        return baseFactory->secDeleteWorkUnit(wuid, secMgr.get(), secUser.get(), false);
+        return baseFactory->secDeleteWorkUnit(wuid, secMgr.get(), secUser.get());
     }
     virtual IConstWorkUnit* openWorkUnit(const char *wuid, bool lock)
     {
@@ -2825,22 +2854,20 @@ public:
     //make cached workunits a non secure pass through for now.
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner)
     {
-        StringBuffer path("*");
-        if (owner && *owner)
-            path.append("[@submitID=\"").append(owner).append("\"]");
-        return baseFactory->secGetWorkUnitsByXPath(path.str(), secMgr.get(), secUser.get());
+        return baseFactory->secGetWorkUnitsByOwner(owner, secMgr.get(), secUser.get());
+    }
+    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state)
+    {
+        return baseFactory->secGetWorkUnitsByState(state, secMgr.get(), secUser.get());
     }
-    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state);
     virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char* ecl)
     {
-        // MORE - why no security?
-        return baseFactory->getWorkUnitsByECL(ecl);
+        return baseFactory->secGetWorkUnitsByECL(ecl, secMgr.get(), secUser.get());
     }
 
     virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char* cluster)
     {   
-        // MORE - why no security?
-        return baseFactory->getWorkUnitsByCluster(cluster);
+        return baseFactory->secGetWorkUnitsByCluster(cluster, secMgr.get(), secUser.get());
     }
 
     virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath)
@@ -3730,19 +3757,6 @@ WUPriorityClass CLocalWorkUnit::getPriority() const
     return (WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses);
 }
 
-IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByState(WUState state)
-{
-    StringBuffer path("*");
-    path.append("[@state=\"").append(getEnumText(state, states)).append("\"]");
-    return getWorkUnitsByXPath(path.str());
-}
-IConstWorkUnitIterator * CSecureWorkUnitFactory::getWorkUnitsByState(WUState state)
-{
-    StringBuffer path("*");
-    path.append("[@state=\"").append(getEnumText(state, states)).append("\"]");
-    return factory->secGetWorkUnitsByXPath(path.str(), secMgr.get(), secUser.get());
-}
-
 void CLocalWorkUnit::setState(WUState value) 
 {
     if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait)
@@ -5486,6 +5500,11 @@ IWULibrary* CLocalWorkUnit::updateLibraryByName(const char *qname)
     return q;
 }
 
+void CLocalWorkUnit::unsubscribe()
+{
+    // Only overriding versions need to do anything
+}
+
 void CLocalWorkUnit::loadExceptions() const
 {
     CriticalBlock block(crit);

+ 0 - 1
common/workunit/workunit.hpp

@@ -1276,7 +1276,6 @@ interface IWorkUnitFactory : extends IInterface
     virtual unsigned numWorkUnits() = 0;
     virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf) = 0;
     virtual void descheduleAllWorkUnits() = 0;
-    virtual bool deleteWorkUnitEx(const char * wuid) = 0;
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
     virtual bool isAborting(const char *wuid) const = 0;
     virtual void clearAborting(const char *wuid) = 0;

+ 20 - 13
common/workunit/workunit.ipp

@@ -569,38 +569,42 @@ public:
     CWorkUnitFactory();
     ~CWorkUnitFactory();
 
-    // interface IWorkUnitFactory
+    // interface IWorkUnitFactory - some are left for derived classes
 
     virtual IWorkUnit * createWorkUnit(const char * app, const char * user);
     virtual bool deleteWorkUnit(const char * wuid);
     virtual IConstWorkUnit * openWorkUnit(const char * wuid, bool lock);
-    virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner);
     virtual IWorkUnit * updateWorkUnit(const char * wuid);
     virtual int setTracingLevel(int newlevel);
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char * user);
-    virtual IWorkUnit * getGlobalWorkUnit();
+    virtual IWorkUnit * getGlobalWorkUnit() = 0;
+    virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner);
     virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state);
     virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl);
     virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster);
-    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath);
+    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath) = 0;  // deprecated
     virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField * sortorder, WUSortField * filters, const void * filterbuf, unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total);
-    virtual unsigned numWorkUnits();
+    virtual unsigned numWorkUnits() = 0;
     virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf);
     virtual void descheduleAllWorkUnits();
-    virtual bool deleteWorkUnitEx(const char * wuid);
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
     virtual bool isAborting(const char *wuid) const;
     virtual void clearAborting(const char *wuid);
 
     // Secure variants, mostly used from the secure factory wrapper
 
-    IWorkUnit* secCreateWorkUnit(const char *app, const char *user, ISecManager *secmgr, ISecUser *secuser);
-    bool secDeleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser, bool raiseexceptions);
+    virtual IWorkUnit* secCreateWorkUnit(const char *app, const char *user, ISecManager *secmgr, ISecUser *secuser);
+    bool secDeleteWorkUnit(const char * wuid, ISecManager *secmgr, ISecUser *secuser);
     IConstWorkUnit* secOpenWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser);
     IWorkUnit* secCreateNamedWorkUnit(const char *wuid, const char *app, const char *user, ISecManager *secmgr, ISecUser *secuser);
     IWorkUnit* secUpdateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser);
-    IConstWorkUnitIterator* secGetWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser);
-    IConstWorkUnitIterator* secGetWorkUnitsSorted(WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm)
+
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByXPath(const char * xpath, ISecManager *secmgr, ISecUser *secuser) = 0;  // deprecated
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser) = 0;
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser) = 0;
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByECL(const char * ecl, ISecManager *secmgr, ISecUser *secuser) = 0;
+    virtual IConstWorkUnitIterator* secGetWorkUnitsByCluster(const char * cluster, ISecManager *secmgr, ISecUser *secuser) = 0;
+    virtual IConstWorkUnitIterator* secGetWorkUnitsSorted(WUSortField *sortorder, // list of fields to sort by (terminated by WUSFterm)
                                                 WUSortField *filters,   // NULL or list of fields to filter on (terminated by WUSFterm)
                                                 const void *filterbuf,  // (appended) string values for filters
                                                 unsigned startoffset,
@@ -609,7 +613,7 @@ public:
                                                 __int64 *cachehint,
                                                 ISecManager *secmgr,
                                                 ISecUser *secuser,
-                                                unsigned *total);
+                                                unsigned *total) = 0;
     unsigned secNumWorkUnitsFiltered(WUSortField *filters, const void *filterbuf, ISecManager *secmgr, ISecUser *secuser);
 
     // Misc other stuff called from CLocalWorkUnit class
@@ -618,7 +622,10 @@ public:
     void asyncRemoveFile(const char * ip, const char * name);
 
 protected:
-    ISDSManager *sdsManager;
-    SessionId session;
+    // These need to be implemented by the derived classes
+    virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
+    virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser) = 0;  // for read access
+    virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;  // for write access
+
 };
 #endif

+ 0 - 1
plugins/cassandra/cassandraembed.cpp

@@ -3054,7 +3054,6 @@ public:
     virtual unsigned numWorkUnits() { UNIMPLEMENTED; }
     virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf) { UNIMPLEMENTED; }
     virtual void descheduleAllWorkUnits() { UNIMPLEMENTED; }
-    virtual bool deleteWorkUnitEx(const char * wuid) { UNIMPLEMENTED; }
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
 private:
     CassandraCluster cluster;