Selaa lähdekoodia

HPCC-12250 Allow pluggable interface to workunit creation

Use a lightweight implementation for iterator results

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 vuotta sitten
vanhempi
commit
d26d92a599

+ 149 - 95
common/workunit/workunit.cpp

@@ -1015,6 +1015,123 @@ template <>  struct CachedTags<CLocalWUAppValue, IConstWUAppValue>
     IArrayOf<IConstWUAppValue> tags;
 };
 
+//==========================================================================================
+
+struct mapEnums { int val; const char *str; };
+
+mapEnums states[] = {
+   { WUStateUnknown, "unknown" },
+   { WUStateCompiled, "compiled" },
+   { WUStateRunning, "running" },
+   { WUStateCompleted, "completed" },
+   { WUStateFailed, "failed" },
+   { WUStateArchived, "archived" },
+   { WUStateAborting, "aborting" },
+   { WUStateAborted, "aborted" },
+   { WUStateBlocked, "blocked" },
+   { WUStateSubmitted, "submitted" },
+   { WUStateScheduled, "scheduled" },
+   { WUStateCompiling, "compiling" },
+   { WUStateWait, "wait" },
+   { WUStateUploadingFiles, "uploading_files" },
+   { WUStateDebugPaused, "debugging" },
+   { WUStateDebugRunning, "debug_running" },
+   { WUStatePaused, "paused" },
+   { WUStateSize, NULL }
+};
+
+const char * getWorkunitStateStr(WUState state)
+{
+    return states[state].str; // MORE - should be using getEnumText
+}
+
+const char *getEnumText(int value, mapEnums *map)
+{
+    const char *defval = map->str;
+    while (map->str)
+    {
+        if (value==map->val)
+            return map->str;
+        map++;
+    }
+    assertex(!"Unexpected value in setEnum");
+    return defval;
+}
+
+void setEnum(IPropertyTree *p, const char *propname, int value, mapEnums *map)
+{
+    const char *defval = map->str;
+    while (map->str)
+    {
+        if (value==map->val)
+        {
+            p->setProp(propname, map->str);
+            return;
+        }
+        map++;
+    }
+    assertex(!"Unexpected value in setEnum");
+    p->setProp(propname, defval);
+}
+
+static int getEnum(const char *v, mapEnums *map)
+{
+    if (v)
+    {
+        while (map->str)
+        {
+            if (stricmp(v, map->str)==0)
+                return map->val;
+            map++;
+        }
+        assertex(!"Unexpected value in getEnum");
+    }
+    return 0;
+}
+
+static int getEnum(const IPropertyTree *p, const char *propname, mapEnums *map)
+{
+    return getEnum(p->queryProp(propname),map);
+}
+
+//==========================================================================================
+
+class CLightweightWorkunitInfo : public CInterfaceOf<IConstWorkUnitInfo>
+{
+public:
+    CLightweightWorkunitInfo(IPropertyTree &p)
+    {
+        wuid.set(p.queryName());
+        user.set(p.queryProp("@submitID"));
+        jobName.set(p.queryProp("@jobName"));
+        clusterName.set(p.queryProp("@clusterName"));
+        timeScheduled.set(p.queryProp("@timeScheduled"));
+        state = (WUState) getEnum(&p, "@state", states);
+        _isProtected = p.getPropBool("@protected", false);
+        _isAborting = false; // MORE - this one is tricky
+    }
+    virtual const char *queryWuid() const { return wuid.str(); }
+    virtual const char *queryUser() const { return user.str(); }
+    virtual const char *queryJobName() const { return jobName.str(); }
+    virtual const char *queryClusterName() const { return clusterName.str(); }
+    virtual WUState getState() const { return state; }
+    virtual const char *queryStateDesc() const { return getEnumText(getState(), states); }
+    virtual bool isProtected() const { return _isProtected; }
+// Not sure about these ones...
+    virtual bool aborting() const { return _isAborting; }
+    virtual IJlibDateTime & getTimeScheduled(IJlibDateTime & val) const
+    {
+        if (timeScheduled.length())
+            val.setGmtString(timeScheduled.str());
+        return val;
+    }
+
+protected:
+    StringAttr wuid, user, jobName, clusterName, timeScheduled;
+    WUState state;
+    bool _isProtected;
+    bool _isAborting;
+};
 
 class CLocalWorkUnit : public CInterface, implements IWorkUnit , implements IExtendedWUInterface
 {
@@ -2268,74 +2385,19 @@ extern WORKUNIT_API bool isSpecialResultSequence(unsigned sequence)
     }
 }
 
-struct mapEnums { int val; const char *str; };
-
-const char *getEnumText(int value, mapEnums *map) 
-{
-    const char *defval = map->str;
-    while (map->str)
-    {
-        if (value==map->val)
-            return map->str;
-        map++;
-    }
-    assertex(!"Unexpected value in setEnum");
-    return defval;
-}
-
-void setEnum(IPropertyTree *p, const char *propname, int value, mapEnums *map) 
-{
-    const char *defval = map->str;
-    while (map->str)
-    {
-        if (value==map->val)
-        {
-            p->setProp(propname, map->str);
-            return;
-        }
-        map++;
-    }
-    assertex(!"Unexpected value in setEnum");
-    p->setProp(propname, defval);
-}
-
-static int getEnum(const char *v, mapEnums *map) 
-{
-    if (v)
-    {
-        while (map->str)
-        {
-            if (stricmp(v, map->str)==0)
-                return map->val;
-            map++;
-        }
-        assertex(!"Unexpected value in getEnum");
-    }
-    return 0;
-}
-
-static int getEnum(const IPropertyTree *p, const char *propname, mapEnums *map)
-{
-    return getEnum(p->queryProp(propname),map);
-}
-
-//==========================================================================================
-
 class CConstWUArrayIterator : public CInterface, implements IConstWorkUnitIterator
 {
     IArrayOf<IPropertyTree> trees;
-    Owned<IConstWorkUnit> cur;
+    Owned<IConstWorkUnitInfo> cur;
     unsigned curTreeNum;
-    Linked<IWorkUnitFactory> factory;
 
     void setCurrent()
     {
-        cur.setown(factory->openWorkUnit(trees.item(curTreeNum).queryName(), false));
+        cur.setown(new CLightweightWorkunitInfo(trees.item(curTreeNum)));
     }
 public:
     IMPLEMENT_IINTERFACE;
-    CConstWUArrayIterator(IWorkUnitFactory *_factory, IArrayOf<IPropertyTree> &_trees)
-        : factory(_factory)
+    CConstWUArrayIterator(IArrayOf<IPropertyTree> &_trees)
     {
         ForEachItemIn(t, _trees)
             trees.append(*LINK(&_trees.item(t)));
@@ -2361,7 +2423,7 @@ public:
         ++curTreeNum;
         return true;
     }
-    IConstWorkUnit & query() { return *cur; }
+    IConstWorkUnitInfo & query() { return *cur; }
 };
 //==========================================================================================
 
@@ -2797,7 +2859,7 @@ public:
             Owned<IPropertyTreeIterator> iter(queryDaliServerVersion().compare(serverVersionNeeded) < 0 ? 
                 conn->queryRoot()->getElements(xpath) : 
                 conn->getElements(xpath));
-            return new CConstWUIterator(this, iter, secmgr, secuser);
+            return new CConstWUIterator(iter, secmgr, secuser);
         }
         else
             return NULL;
@@ -2929,7 +2991,7 @@ public:
         IArrayOf<IPropertyTree> results;
         Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
         Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,queryowner,cachehint,results,total);
-        return new CConstWUArrayIterator(this, results);
+        return new CConstWUArrayIterator(results);
     }
 
     
@@ -3119,6 +3181,23 @@ public:
         return root->numChildren();
     }
 
+    virtual bool isAborting(const char *wuid) const
+    {
+        VStringBuffer apath("/WorkUnitAborts/%s", wuid);
+        try
+        {
+            Owned<IRemoteConnection> acon = querySDS().connect(apath.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+            if (acon)
+                return acon->queryRoot()->getPropInt(NULL) != 0;
+        }
+        catch (IException *E)
+        {
+            EXCLOG(E);
+            E->Release();
+        }
+        return false;
+    }
+
     virtual unsigned numWorkUnitsFiltered(WUSortField *filters,
                                         const void *filterbuf,
                                         ISecManager *secmgr, 
@@ -3164,14 +3243,13 @@ private:
     }
     class CConstWUIterator : public CInterface, implements IConstWorkUnitIterator
     {
-        Owned<IConstWorkUnit> cur;
-        Linked<IWorkUnitFactory> factory;
+        Owned<IConstWorkUnitInfo> cur;
         Linked<IPropertyTreeIterator> ptreeIter;
         Owned<ISecResourceList> scopes;
 
         void setCurrent()
         {
-            cur.setown(factory->openWorkUnit(ptreeIter->query().queryName(), false));
+            cur.setown(new CLightweightWorkunitInfo(ptreeIter->query()));
         }
         bool getNext() // scan for a workunit with permissions
         {
@@ -3195,8 +3273,8 @@ private:
         }
     public:
         IMPLEMENT_IINTERFACE;
-        CConstWUIterator(IWorkUnitFactory *_factory, IPropertyTreeIterator *_ptreeIter, ISecManager *secmgr=NULL, ISecUser *secuser=NULL)
-            : factory(_factory), ptreeIter(_ptreeIter)
+        CConstWUIterator(IPropertyTreeIterator *_ptreeIter, ISecManager *secmgr=NULL, ISecUser *secuser=NULL)
+            : ptreeIter(_ptreeIter)
         {
             UniqueScopes us;
             if (secmgr /* && secmgr->authTypeRequired(RT_WORKUNIT_SCOPE) tbd */)
@@ -3239,13 +3317,12 @@ private:
             }
             return getNext();
         }
-        IConstWorkUnit & query() { return *cur; }
+        IConstWorkUnitInfo & query() { return *cur; }
     };
     IRemoteConnection* connect(const char *xpath, unsigned flags)
     {
         return sdsManager->connect(xpath, session, flags, SDS_LOCK_TIMEOUT);
     }
-
 };
 
 static Owned<CWorkUnitFactory> factory;
@@ -3395,7 +3472,10 @@ public:
         return factory->numWorkUnitsFiltered(filters,filterbuf,secMgr.get(), secUser.get());
     }
 
-
+    virtual bool isAborting(const char *wuid) const
+    {
+        return factory->isAborting(wuid);
+    }
 private:
     Owned<CWorkUnitFactory> base_factory;
     Owned<ISecManager> secMgr;
@@ -4235,32 +4315,6 @@ WUPriorityClass CLocalWorkUnit::getPriority() const
     return (WUPriorityClass) getEnum(p, "@priorityClass", priorityClasses);
 }
 
-mapEnums states[] = {
-   { WUStateUnknown, "unknown" },
-   { WUStateCompiled, "compiled" },
-   { WUStateRunning, "running" },
-   { WUStateCompleted, "completed" },
-   { WUStateFailed, "failed" },
-   { WUStateArchived, "archived" },
-   { WUStateAborting, "aborting" },
-   { WUStateAborted, "aborted" },
-   { WUStateBlocked, "blocked" },
-   { WUStateSubmitted, "submitted" },
-   { WUStateScheduled, "scheduled" },
-   { WUStateCompiling, "compiling" },
-   { WUStateWait, "wait" },
-   { WUStateUploadingFiles, "uploading_files" },
-   { WUStateDebugPaused, "debugging" },
-   { WUStateDebugRunning, "debug_running" },
-   { WUStatePaused, "paused" },
-   { WUStateSize, NULL }
-};
-
-const char * getWorkunitStateStr(WUState state)
-{
-    return states[state].str;
-}
-
 IConstWorkUnitIterator * CWorkUnitFactory::getWorkUnitsByState(WUState state)
 {
     StringBuffer path("*");

+ 2 - 2
common/workunit/workunit.hpp

@@ -983,13 +983,12 @@ interface IConstWorkUnitInfo : extends IInterface
     virtual WUState getState() const = 0;
     virtual const char *queryStateDesc() const = 0;
     virtual bool isProtected() const = 0;
-// Not sure about these ones...
-    virtual bool aborting() const = 0;
     virtual IJlibDateTime & getTimeScheduled(IJlibDateTime & val) const = 0;
 };
 
 interface IConstWorkUnit : extends IConstWorkUnitInfo
 {
+    virtual bool aborting() const = 0;
     virtual void forceReload() = 0;
     virtual WUAction getAction() const = 0;
     virtual IStringVal& getActionEx(IStringVal & str) const = 0;
@@ -1279,6 +1278,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual void descheduleAllWorkUnits() = 0;
     virtual bool deleteWorkUnitEx(const char * wuid) = 0;
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
+    virtual bool isAborting(const char *wuid) const = 0;
 };
 
 

+ 2 - 3
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -2753,18 +2753,17 @@ int WUSchedule::run()
                     try
                     {
                         IConstWorkUnitInfo & cw = itr->query();
-                        if (cw.aborting())
+                        if (factory->isAborting(cw.queryWuid()))
                         {
                             WorkunitUpdate wu(factory->updateWorkUnit(cw.queryWuid()));
                             wu->setState(WUStateAborted);
                             continue;
                         }
-
                         WsWuDateTime dt, now;
                         now.setNow();
                         cw.getTimeScheduled(dt);
                         if (now.compare(dt)>=0)
-                            runWorkUnit(cw.queryWuid());
+                            runWorkUnit(cw.queryWuid(), cw.queryClusterName());
                     }
                     catch(IException *e)
                     {