浏览代码

HPCC-12311 Revise based on review

1) changed CJobQueueBase to implement IJobQueueConst;
2) renamed getLastDequeuedInfo to doGetLastDequeuedInfo(qd, );
3) defined getLastDequeuedInfo in CJobQueueConst to explicitly
pass in qdata to call to doGetLastDequeuedInfo
4) moved activeq to CJobQueue and use in overloaded definition
of getLastDequeuedInfo in call to doGetLastDequeuedInfo
5) moved sessionid to CJobQueue;
6) added queryClientRootIndex() to base and updated related calls;
7) added queryClientRootSession() to CJobQueue and updated related
calls;
8) replaced CJobQueueBase::findRank with doFindRank;
9) renamed getItem() to doGetItem() and defined a getItem() to
call doGetItem();
10) replaced 'CJobQueueBase::getHead' with doGetItem();
11) replaced 'CJobQueueBase::getItem' with doGetItem();
12) removed 'CJobQueueBase::' from CJobQueueBase::copyItemsImpl();
13) removed waiting(sQueueData &qd);
14) moved paused() and stopped() to IJobQueueConst.

Haven't made change for copyItemsAndState -- waiting for a reply.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 10 年之前
父节点
当前提交
136053e336
共有 3 个文件被更改,包括 167 次插入184 次删除
  1. 162 178
      common/workunit/wujobq.cpp
  2. 4 4
      common/workunit/wujobq.hpp
  3. 1 2
      esp/services/ws_smc/ws_smcService.cpp

+ 162 - 178
common/workunit/wujobq.cpp

@@ -298,7 +298,7 @@ struct sQueueData
     unsigned lastWaitEdition;
 };
 
-class CJobQueueBase: public CInterface
+class CJobQueueBase: public CInterface, implements IJobQueueConst
 {
     class cOrderedIterator
     {
@@ -387,10 +387,24 @@ class CJobQueueBase: public CInterface
             return *queuet[current];
         }
     };
+protected:
+    bool doGetLastDequeuedInfo(sQueueData *qd, StringAttr &wuid, CDateTime &enqueuedt, int &priority)
+    {
+        priority = 0;
+        if (!qd)
+            return false;
+        const char *w = qd->root->queryProp("@prevwuid");
+        if (!w||!*w)
+            return false;
+        wuid.set(w);
+        StringBuffer dts;
+        if (qd->root->getProp("@prevenqueuedt",dts))
+            enqueuedt.setString(dts.str());
+        priority = qd->root->getPropInt("@prevpriority");
+        return true;
+    }
 public:
     sQueueData *qdata;
-    sQueueData *activeq;
-    SessionId sessionid;
     Semaphore notifysem;
     CriticalSection crit;
 
@@ -416,8 +430,6 @@ public:
                 qdata = qd;
             last = qd;
         }
-        activeq = qdata;
-        sessionid = myProcessSession();
     };
     virtual ~CJobQueueBase()
     {
@@ -442,23 +454,10 @@ public:
         return path;
     }
 
-    IPropertyTree *queryClientRoot(sQueueData &qd,unsigned idx=(unsigned)-1)
+    IPropertyTree *queryClientRootIndex(sQueueData &qd, unsigned idx)
     {
-        StringBuffer path;
-        if (idx==(unsigned)-1)
-            path.appendf("Client[@session=\"%"I64F"d\"]",sessionid);
-        else
-            path.appendf("Client[%d]",idx+1);
-        IPropertyTree *ret = qd.root->queryPropTree(path.str());
-        if (!ret&&(idx==(unsigned)-1))
-        {
-            ret = createPTree("Client");
-            ret = qd.root->addPropTree("Client",ret);
-            ret->setPropInt64("@session",sessionid);
-            StringBuffer eps;
-            ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
-        }
-        return ret;
+        VStringBuffer path("Client[%d]", idx+1);
+        return qd.root->queryPropTree(path);
     }
 
     bool itemOlder(IPropertyTree *qt1, IPropertyTree *qt2)
@@ -473,7 +472,7 @@ public:
         return (strcmp(d1s.str(),d2s.str())<0);
     }
 
-    IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
+    IJobQueueItem *doGetItem(sQueueData &qd,unsigned idx)
     {
         if (idx==(unsigned)-1)
         {
@@ -489,6 +488,11 @@ public:
         return new CJobQueueItem(item);
     }
 
+    IJobQueueItem *getItem(sQueueData &qd,unsigned idx)
+    {
+        return doGetItem(qd, idx);
+    }
+
     IJobQueueItem *getHead(sQueueData &qd)
     {
         return getItem(qd,0);
@@ -535,7 +539,7 @@ public:
         return ret;
     }
 
-    void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
+    virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
     {
         assertex(qdata);
         assertex(qdata->root);
@@ -567,14 +571,14 @@ public:
         return NULL;
     }
 
-    unsigned waiting()
+    virtual unsigned waiting()
     {
         unsigned ret = 0;
         ForEachQueue(qd)
         {
             for (unsigned i=0;;i++)
             {
-                IPropertyTree *croot = queryClientRoot(*qd,i);
+                IPropertyTree *croot = queryClientRootIndex(*qd,i);
                 if (!croot)
                     break;
                 ret += croot->getPropInt("@waiting");
@@ -583,7 +587,7 @@ public:
         return ret;
     }
 
-    unsigned findRank(const char *wuid)
+    virtual unsigned findRank(const char *wuid)
     {
         assertex(qdata);
         if (!qdata->next)
@@ -600,7 +604,7 @@ public:
         return (unsigned)-1;
     }
 
-    unsigned copyItems(CJobQueueContents &dest)
+    virtual unsigned copyItems(CJobQueueContents &dest)
     {
         assertex(qdata);
         if (!qdata->next)
@@ -615,7 +619,7 @@ public:
         return ret;
     }
 
-    IJobQueueItem *getItem(unsigned idx)
+    virtual IJobQueueItem *getItem(unsigned idx)
     {
         if (!qdata)
             return NULL;
@@ -640,7 +644,7 @@ public:
         return NULL;
     }
 
-    IJobQueueItem *getHead()
+    virtual IJobQueueItem *getHead()
     {
         if (!qdata)
             return NULL;
@@ -649,7 +653,7 @@ public:
         return getItem(0);
     }
 
-    IJobQueueItem *getTail()
+    virtual IJobQueueItem *getTail()
     {
         if (!qdata)
             return NULL;
@@ -658,7 +662,7 @@ public:
         return getItem((unsigned)-1);
     }
 
-    IJobQueueItem *find(const char *wuid)
+    virtual IJobQueueItem *find(const char *wuid)
     {
         if (!qdata)
             return NULL;
@@ -668,7 +672,79 @@ public:
         return find(*qd,wuid);
     }
 
-    unsigned ordinality()
+    virtual bool paused()
+    {
+        // true if all paused
+        ForEachQueue(qd)
+        {
+            if (qd->root)
+            {
+                const char *state = qd->root->queryProp("@state");
+                if (state&&(strcmp(state,"paused")!=0))
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    virtual bool paused(StringBuffer& info)
+    {
+        // true if all paused
+        ForEachQueue(qd)
+        {
+            if (qd->root)
+            {
+                const char *state = qd->root->queryProp("@state");
+                if (state&&(strcmp(state,"paused")!=0))
+                    return false;
+                if (state&&!info.length())
+                {
+                    const char *stateDetails = qd->root->queryProp("@stateDetails");
+                    if (stateDetails && *stateDetails)
+                        info.set(stateDetails);
+                }
+            }
+        }
+        return true;
+    }
+
+    virtual bool stopped()
+    {
+        // true if all stopped
+        ForEachQueue(qd)
+        {
+            if (qd->root)
+            {
+                const char *state = qd->root->queryProp("@state");
+                if (state&&(strcmp(state,"stopped")!=0))
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    virtual bool stopped(StringBuffer& info)
+    {
+        // true if all stopped
+        ForEachQueue(qd)
+        {
+            if (qd->root)
+            {
+                const char *state = qd->root->queryProp("@state");
+                if (state&&(strcmp(state,"stopped")!=0))
+                    return false;
+                if (state&&!info.length())
+                {
+                    const char *stateDetails = qd->root->queryProp("@stateDetails");
+                    if (stateDetails && *stateDetails)
+                        info.set(stateDetails);
+                }
+            }
+        }
+        return true;
+    }
+
+    virtual unsigned ordinality()
     {
         unsigned ret = 0;
         ForEachQueue(qd)
@@ -679,24 +755,13 @@ public:
         return ret;
     }
 
-    bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
+    virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
     {
-        priority = 0;
-        if (!activeq)
-            return false;
-        const char *w = activeq->root->queryProp("@prevwuid");
-        if (!w||!*w)
-            return false;
-        wuid.set(w);
-        StringBuffer dts;
-        if (activeq->root->getProp("@prevenqueuedt",dts))
-            enqueuedt.setString(dts.str());
-        priority = activeq->root->getPropInt("@prevpriority");
-        return true;
+        return doGetLastDequeuedInfo(qdata, wuid, enqueuedt, priority);
     }
 
     //Similar to copyItemsAndState(), this method returns the state information for one queue.
-    void getState(StringBuffer& state, StringBuffer& stateDetails)
+    virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
     {
         if (!qdata->root)
             return;
@@ -710,7 +775,7 @@ public:
     }
 };
 
-class CJobQueueConst: public CJobQueueBase, implements IJobQueueConst
+class CJobQueueConst: public CJobQueueBase
 {
     Owned<IPropertyTree> jobQueueSnapshot;
 
@@ -731,55 +796,13 @@ public:
                 throw MakeStringException(-1, "No job queue found for %s", qd->qname.get());
         }
     };
-    virtual unsigned ordinality()
-    {
-        return CJobQueueBase::ordinality();
-    }
-    virtual unsigned waiting()
-    {
-        return CJobQueueBase::waiting();
-    }
-    virtual IJobQueueItem *getItem(unsigned idx)
-    {
-        return CJobQueueBase::getItem(idx);
-    }
-    virtual IJobQueueItem *getHead()
-    {
-        return CJobQueueBase::getHead();
-    }
-    virtual IJobQueueItem *getTail()
-    {
-        return CJobQueueBase::getTail();
-    }
-    virtual IJobQueueItem *find(const char *wuid)
-    {
-        return CJobQueueBase::find(wuid);
-    }
-    virtual unsigned findRank(const char *wuid)
-    {
-        return CJobQueueBase::findRank(wuid);
-    }
-    virtual unsigned copyItems(CJobQueueContents &dest)
-    {
-        return CJobQueueBase::copyItems(dest);
-    }
-    virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
-    {
-        return CJobQueueBase::getLastDequeuedInfo(wuid, enqueuedt, priority);
-    }
-    virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
-    {
-        CJobQueueBase::copyItemsAndState(contents, state, stateDetails);
-    }
-    virtual void getState(StringBuffer& state, StringBuffer& stateDetails)
-    {
-        CJobQueueBase::getState(state, stateDetails);
-    }
 };
 
 class CJobQueue: public CJobQueueBase, implements IJobQueue
 {
 public:
+    sQueueData *activeq;
+    SessionId sessionid;
     unsigned locknest;
     bool writemode;
     bool connected;
@@ -809,6 +832,8 @@ public:
 
     CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
     {
+        activeq = qdata;
+        sessionid = myProcessSession();
         validateitemsessions = false;
         writemode = false;
         locknest = 0;
@@ -1058,6 +1083,21 @@ public:
         }
     }
 
+    IPropertyTree *queryClientRootSession(sQueueData &qd)
+    {
+        VStringBuffer path("Client[@session=\"%"I64F"d\"]", sessionid);
+        IPropertyTree *ret = qd.root->queryPropTree(path.str());
+        if (!ret)
+        {
+            ret = createPTree("Client");
+            ret = qd.root->addPropTree("Client",ret);
+            ret->setPropInt64("@session",sessionid);
+            StringBuffer eps;
+            ret->setProp("@node",queryMyNode()->endpoint().getUrlStr(eps).str());
+        }
+        return ret;
+    }
+
     void connect(bool _validateitemsessions)
     {
         Cconnlockblock block(this,true);
@@ -1070,7 +1110,7 @@ public:
             unsigned waiting;
             unsigned count;
             getStats(*qd,connected,waiting,count); // clear any duff clients
-            IPropertyTree *croot = queryClientRoot(*qd);
+            IPropertyTree *croot = queryClientRootSession(*qd);
             croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)+1);
         }
         connected = true;
@@ -1082,7 +1122,7 @@ public:
         if (connected) {
             dounsubscribe();
             ForEachQueue(qd) {
-                IPropertyTree *croot = queryClientRoot(*qd);
+                IPropertyTree *croot = queryClientRootSession(*qd);
                 croot->setPropInt64("@connected",croot->getPropInt64("@connected",0)-1);
             }
             connected = false;
@@ -1124,7 +1164,7 @@ public:
     void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
     {
         for (unsigned i=0; i<numqueues; i++) {
-            IPropertyTree *croot = queryClientRoot(*queues[i]);
+            IPropertyTree *croot = queryClientRootSession(*queues[i]);
             croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
         }
     }
@@ -1293,14 +1333,14 @@ public:
     void enqueueBefore(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
     {
         Cconnlockblock block(this,true);
-        placeonqueue(qd,qitem,CJobQueueBase::findRank(qd,wuid));
+        placeonqueue(qd,qitem,doFindRank(qd,wuid));
     }
 
 
     void enqueueAfter(sQueueData &qd,IJobQueueItem *qitem,const char *wuid)
     {
         Cconnlockblock block(this,true);
-        unsigned idx = CJobQueueBase::findRank(qd,wuid);
+        unsigned idx = doFindRank(qd,wuid);
         if (idx!=(unsigned)-1)
             idx++;
         placeonqueue(qd,qitem,idx);
@@ -1319,7 +1359,7 @@ public:
     void enqueueHead(sQueueData &qd,IJobQueueItem *qitem)
     {
         Cconnlockblock block(this,true);
-        Owned<IJobQueueItem> qi = CJobQueueBase::getHead(qd);
+        Owned<IJobQueueItem> qi = doGetItem(qd, 0);
         if (qi)
             enqueueBefore(qd,qitem,qi->queryWUID());
         else
@@ -1332,23 +1372,9 @@ public:
         return qd.root->getPropInt("@count");
     }
 
-
-    unsigned waiting(sQueueData &qd)
-    {
-        Cconnlockblock block(this,false);
-        unsigned ret = 0;
-        for (unsigned i=0;;i++) {
-            IPropertyTree *croot = queryClientRoot(qd,i);
-            if (!croot)
-                break;
-            ret += croot->getPropInt("@waiting");
-        }
-        return ret;
-    }
-
     IJobQueueItem *getTail(sQueueData &qd)
     {
-        return CJobQueueBase::getItem(qd,(unsigned)-1);
+        return doGetItem(qd,(unsigned)-1);
     }
 
     IJobQueueItem *loadItem(sQueueData &qd,IJobQueueItem *qi)
@@ -1400,7 +1426,7 @@ public:
     unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
     {
         Cconnlockblock block(this,true);
-        unsigned ret = CJobQueueBase::copyItemsImpl(qd,dest);
+        unsigned ret = copyItemsImpl(qd,dest);
         clear(qd);
         return ret;
     }
@@ -1611,7 +1637,7 @@ public:
         waiting = 0;
         unsigned i=0;
         loop {
-            IPropertyTree *croot = queryClientRoot(qd,i);
+            IPropertyTree *croot = queryClientRootIndex(qd,i);
             if (!croot)
                 break;
             if (!validSession(croot)) {
@@ -1699,7 +1725,7 @@ public:
         Cconnlockblock block(this,true);
         unsigned ret = 0;
         ForEachQueue(qd) {
-            ret += CJobQueueBase::copyItemsImpl(*qd,dest);
+            ret += copyItemsImpl(*qd,dest);
             clear(*qd);
         }
         return ret;
@@ -1879,37 +1905,6 @@ public:
             }
         }
     }
-    bool paused()
-    {
-        // true if all paused
-        Cconnlockblock block(this,false);
-        ForEachQueue(qd) {
-            if (qd->root) {
-                const char *state = qd->root->queryProp("@state");
-                if (state&&(strcmp(state,"paused")!=0))
-                    return false;
-            }
-        }
-        return true;
-    }
-    bool paused(StringBuffer& info)
-    {
-        // true if all paused
-        Cconnlockblock block(this,false);
-        ForEachQueue(qd) {
-            if (qd->root) {
-                const char *state = qd->root->queryProp("@state");
-                if (state&&(strcmp(state,"paused")!=0))
-                    return false;
-                if (state&&!info.length()) {
-                    const char *stateDetails = qd->root->queryProp("@stateDetails");
-                    if (stateDetails && *stateDetails)
-                        info.set(stateDetails);
-                }
-            }
-        }
-        return true;
-    }
     void stop()
     {
         Cconnlockblock block(this,true);
@@ -1929,37 +1924,6 @@ public:
             }
         }
     }
-    bool stopped()
-    {
-        // true if all stopped
-        Cconnlockblock block(this,false);
-        ForEachQueue(qd) {
-            if (qd->root) {
-                const char *state = qd->root->queryProp("@state");
-                if (state&&(strcmp(state,"stopped")!=0))
-                    return false;
-            }
-        }
-        return true;
-    }
-    bool stopped(StringBuffer& info)
-    {
-        // true if all stopped
-        Cconnlockblock block(this,false);
-        ForEachQueue(qd) {
-            if (qd->root) {
-                const char *state = qd->root->queryProp("@state");
-                if (state&&(strcmp(state,"stopped")!=0))
-                    return false;
-                if (state&&!info.length()) {
-                    const char *stateDetails = qd->root->queryProp("@stateDetails");
-                    if (stateDetails && *stateDetails)
-                        info.set(stateDetails);
-                }
-            }
-        }
-        return true;
-    }
 
     void resume()
     {
@@ -2023,6 +1987,26 @@ public:
         return NULL;
     }
 
+    virtual bool paused()
+    {
+        Cconnlockblock block(this,false);
+        return CJobQueueBase::paused();
+    }
+    virtual bool paused(StringBuffer& info)
+    {
+        Cconnlockblock block(this,false);
+        return CJobQueueBase::paused(info);
+    }
+    virtual bool stopped()
+    {
+        Cconnlockblock block(this,false);
+        return CJobQueueBase::stopped();
+    }
+    virtual bool stopped(StringBuffer& info)
+    {
+        Cconnlockblock block(this,false);
+        return CJobQueueBase::stopped(info);
+    }
     virtual unsigned ordinality()
     {
         Cconnlockblock block(this,false);
@@ -2066,7 +2050,7 @@ public:
     virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)
     {
         Cconnlockblock block(this,false);
-        return CJobQueueBase::getLastDequeuedInfo(wuid, enqueuedt, priority);
+        return CJobQueueBase::doGetLastDequeuedInfo(activeq, wuid, enqueuedt, priority);
     }
     virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)
     {

+ 4 - 4
common/workunit/wujobq.hpp

@@ -72,6 +72,10 @@ interface IJobQueueConst: extends IInterface
     virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
     virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0;
     virtual void getState(StringBuffer& state, StringBuffer& stateDetails)=0;
+    virtual bool paused()=0;    // true if paused
+    virtual bool paused(StringBuffer& info)=0;    // true if paused
+    virtual bool stopped()=0;   // true if stopped
+    virtual bool stopped(StringBuffer& info)=0;   // true if stopped
 };
 
 interface IJobQueue: extends IJobQueueConst
@@ -114,12 +118,8 @@ interface IJobQueue: extends IJobQueueConst
 // control:
     virtual void pause()=0;     // marks queue as paused - and subsequent dequeues block until resumed
     virtual void pause(const char *info)=0;     // marks queue as paused - and subsequent dequeues block until resumed
-    virtual bool paused()=0;    // true if paused
-    virtual bool paused(StringBuffer& info)=0;    // true if paused
     virtual void stop()=0;      // sets stopped flags - all current and subsequent dequeues return NULL
     virtual void stop(const char *info)=0;      // sets stopped flags - all current and subsequent dequeues return NULL
-    virtual bool stopped()=0;   // true if stopped
-    virtual bool stopped(StringBuffer& info)=0;   // true if stopped
     virtual void resume()=0;    // removes paused or stopped flag
     virtual void resume(const char *info)=0;    // removes paused or stopped flag
 

+ 1 - 2
esp/services/ws_smc/ws_smcService.cpp

@@ -250,8 +250,7 @@ void CActivityInfo::createActivityInfo()
     }
     catch(IException* e)
     {
-        StringBuffer eMsg;
-        ERRLOG("CActivityInfo::createActivityInfo: %s", e->errorMessage(eMsg).str());
+        EXCLOG(e, "CActivityInfo::createActivityInfo");
         e->Release();
     }