فهرست منبع

HPCC-12311 Change code based on review

The code changes are made to meet the review comments,
including new class CJobQueueBase/CJobQueueConst.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 10 سال پیش
والد
کامیت
95b9288448
4 فایلهای تغییر یافته به همراه 588 افزوده شده و 499 حذف شده
  1. 470 400
      common/workunit/wujobq.cpp
  2. 0 3
      common/workunit/wujobq.hpp
  3. 108 85
      esp/services/ws_smc/ws_smcService.cpp
  4. 10 11
      esp/services/ws_smc/ws_smcService.hpp

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 470 - 400
common/workunit/wujobq.cpp


+ 0 - 3
common/workunit/wujobq.hpp

@@ -71,9 +71,7 @@ interface IJobQueueConst: extends IInterface
     virtual unsigned copyItems(CJobQueueContents &dest)=0;  // takes a snapshot copy of the entire queue (returns number copied)
     virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
     virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0;
-
     virtual void getState(StringBuffer& state, StringBuffer& stateDetails)=0;
-    virtual void getWUIDs(StringArray& ids)=0;
 };
 
 interface IJobQueue: extends IJobQueueConst
@@ -142,7 +140,6 @@ interface IJobQueue: extends IJobQueueConst
 interface IJQSnapshot : extends IInterface
 {
     virtual IJobQueueConst *getJobQueue(const char *name)=0;
-    virtual bool isJQSnapshotValid(unsigned timeOutSeconds)=0;
 };
 
 extern WORKUNIT_API IJQSnapshot *createJQSnapshot();

+ 108 - 85
esp/services/ws_smc/ws_smcService.cpp

@@ -244,27 +244,36 @@ void CActivityInfo::createActivityInfo()
     Owned<IPropertyTree> envRoot= &env->getPTree();
     getEnvironmentClusterInfo(envRoot, clusters);
 
-    jobQueueSnapshot.setown(createJQSnapshot());
+    try
+    {
+        jobQueueSnapshot.setown(createJQSnapshot());
+    }
+    catch(IException* e)
+    {
+        StringBuffer eMsg;
+        ERRLOG("CActivityInfo::createActivityInfo: %s", e->errorMessage(eMsg).str());
+        e->Release();
+    }
 
     Owned<IRemoteConnection> connStatusServers = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
     if (!connStatusServers)
         throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Failed to get status server information.");
 
-    serverStatusRoot = connStatusServers->queryRoot();
+    IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
 
-    readTargetClusterInfo(clusters);
-    readActiveWUsAndQueuedWUs(envRoot);
+    readTargetClusterInfo(clusters, serverStatusRoot);
+    readActiveWUsAndQueuedWUs(envRoot, serverStatusRoot);
 
     timeCached.setNow();
 }
 
-void CActivityInfo::readTargetClusterInfo(CConstWUClusterInfoArray& clusters)
+void CActivityInfo::readTargetClusterInfo(CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot)
 {
     ForEachItemIn(c, clusters)
     {
         IConstWUClusterInfo &cluster = clusters.item(c);
         Owned<CWsSMCTargetCluster> targetCluster = new CWsSMCTargetCluster();
-        readTargetClusterInfo(cluster, targetCluster);
+        readTargetClusterInfo(cluster, serverStatusRoot, targetCluster);
         if (cluster.getPlatform() == ThorLCRCluster)
             thorTargetClusters.append(*targetCluster.getClear());
         else if (cluster.getPlatform() == RoxieCluster)
@@ -274,7 +283,7 @@ void CActivityInfo::readTargetClusterInfo(CConstWUClusterInfoArray& clusters)
     }
 }
 
-void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, CWsSMCTargetCluster* targetCluster)
+void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster)
 {
     SCMStringBuffer clusterName;
     cluster.getName(clusterName);
@@ -285,50 +294,68 @@ void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, CWsSMCTa
     cluster.getAgentQueue(targetCluster->agentQueue.queueName);
 
     StringBuffer statusServerName;
-    CWsSMCQueue* jobQueue = NULL;
+    CWsSMCQueue* smcQueue = NULL;
     if (targetCluster->clusterType == ThorLCRCluster)
     {
         statusServerName.set(getStatusServerTypeName(WsSMCSSTThorLCRCluster));
-        jobQueue = &targetCluster->clusterQueue;
-        cluster.getThorQueue(jobQueue->queueName);
+        smcQueue = &targetCluster->clusterQueue;
+        cluster.getThorQueue(smcQueue->queueName);
     }
     else if (targetCluster->clusterType == RoxieCluster)
     {
         statusServerName.set(getStatusServerTypeName(WsSMCSSTRoxieCluster));
-        jobQueue = &targetCluster->agentQueue;
+        smcQueue = &targetCluster->agentQueue;
     }
     else
     {
         statusServerName.set(getStatusServerTypeName(WsSMCSSTHThorCluster));
-        jobQueue = &targetCluster->agentQueue;
+        smcQueue = &targetCluster->agentQueue;
     }
 
     targetCluster->statusServerName.set(statusServerName.str());
-    targetCluster->queueName.set(jobQueue->queueName.str());
+    targetCluster->queueName.set(smcQueue->queueName.str());
+
+    bool validQueue = readJobQueue(smcQueue->queueName.str(), targetCluster->queuedWUIDs, smcQueue->queueState, smcQueue->queueStateDetails);
+    if (validQueue && smcQueue->queueState.length())
+        targetCluster->queueStatus.set(smcQueue->queueState.str());
+
+    if (serverStatusRoot)
+    {
+        smcQueue->foundQueueInStatusServer = findQueueInStatusServer(serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
+        if (!smcQueue->foundQueueInStatusServer)
+            targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
+    }
+}
+
+bool CActivityInfo::readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails)
+{
+    if (!queueName || !*queueName)
+        return false;
 
     if (!jobQueueSnapshot)
-        WARNLOG("CActivityInfo::readTargetClusterInfo: jobQueueSnapshot not found.");
-    else
     {
-        Owned<IJobQueueConst> queue = jobQueueSnapshot->getJobQueue(jobQueue->queueName.str());
-        if (!queue)
-            WARNLOG("CActivityInfo::readTargetClusterInfo: failed to get info for Job queue %s", jobQueue->queueName.str());
-        else
-        {
-            queue->getState(jobQueue->queueState, jobQueue->queueStateDetails);
-            if (jobQueue->queueState.length())
-                targetCluster->queueStatus.set(jobQueue->queueState.str());
-        }
+        WARNLOG("CActivityInfo::readJobQueue: jobQueueSnapshot not found.");
+        return false;
     }
 
-    if (serverStatusRoot)
+    Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
+    if (!jobQueue)
     {
-        jobQueue->foundQueueInStatusServer = findQueueInStatusServer(statusServerName.str(), targetCluster->queueName.get());
-        if (!jobQueue->foundQueueInStatusServer)
-            targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
+        WARNLOG("CActivityInfo::readJobQueue: failed to get info for job queue %s", queueName);
+        return false;
     }
 
-    return;
+    CJobQueueContents queuedJobs;
+    jobQueue->copyItemsAndState(queuedJobs, state, stateDetails);
+
+    Owned<IJobQueueIterator> iter = queuedJobs.getIterator();
+    ForEach(*iter)
+    {
+        const char* wuid = iter->query().queryWUID();
+        if (wuid && *wuid)
+            wuids.append(wuid);
+    }
+    return true;
 }
 
 const char *CActivityInfo::getStatusServerTypeName(WsSMCStatusServerType type)
@@ -336,9 +363,8 @@ const char *CActivityInfo::getStatusServerTypeName(WsSMCStatusServerType type)
     return (type < WsSMCSSTterm) ? WsSMCStatusServerTypeNames[type] : NULL;
 }
 
-bool CActivityInfo::findQueueInStatusServer(const char* serverName, const char* queueName)
+bool CActivityInfo::findQueueInStatusServer(IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName)
 {
-    bool foundQueue = false;
     VStringBuffer path("Server[@name=\"%s\"]", serverName);
     Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path.str()));
     ForEach(*it)
@@ -353,35 +379,30 @@ bool CActivityInfo::findQueueInStatusServer(const char* serverName, const char*
         ForEachItemIn(q, qlist)
         {
             if (strieq(qlist.item(q), queueName))
-            {
-                foundQueue = true;
-                break;
-            }
+                return true;
         }
-        if (foundQueue)
-            break;
     }
-    return foundQueue;
+    return false;
 }
 
-void CActivityInfo::readActiveWUsAndQueuedWUs(IPropertyTree* envRoot)
+void CActivityInfo::readActiveWUsAndQueuedWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
 {
-    readRunningWUsOnStatusServer(WsSMCSSTThorLCRCluster);
+    readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTThorLCRCluster);
     readWUsInTargetClusterJobQueues(thorTargetClusters);
-    readRunningWUsOnStatusServer(WsSMCSSTRoxieCluster);
+    readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTRoxieCluster);
     readWUsInTargetClusterJobQueues(roxieTargetClusters);
-    readRunningWUsOnStatusServer(WsSMCSSTHThorCluster);
+    readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTHThorCluster);
     readWUsInTargetClusterJobQueues(hthorTargetClusters);
 
-    readRunningWUsOnStatusServer(WsSMCSSTECLagent);
-    readRunningWUsAndJobQueueforOtherStatusServers();
+    readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTECLagent);
+    readRunningWUsAndJobQueueforOtherStatusServers(serverStatusRoot);
     //TODO: add queued WUs for ECLCCServer/ECLServer here. Right now, they are under target clusters.
 
-    getDFUServersAndWUs(envRoot);
+    getDFUServersAndWUs(envRoot, serverStatusRoot);
     getDFURecoveryJobs();
 }
 
-void CActivityInfo::readRunningWUsOnStatusServer(WsSMCStatusServerType statusServerType)
+void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)
 {
     const char* serverName = getStatusServerTypeName(statusServerType);
     if (!serverName || !*serverName)
@@ -472,7 +493,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(WsSMCStatusServerType statusSer
                     const char *extra = wu->getExtra();
                     if (wu->getStateID() != WUStateBlocked || !extra || !*extra)  // Blocked on persist treated as running here
                     {
-                        aws.append(*wu.getLink());
+                        aws.append(*wu.getClear());
                         jobQueue->countQueuedJobs++;
                         continue;
                     }
@@ -483,7 +504,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(WsSMCStatusServerType statusSer
                     wu->setMemoryBlocked(1);
             }
 
-            aws.append(*wu.getLink());
+            aws.append(*wu.getClear());
             jobQueue->countRunningJobs++;
         }
     }
@@ -587,43 +608,20 @@ void CActivityInfo::readWUsInTargetClusterJobQueues(CIArrayOf<CWsSMCTargetCluste
 
 void CActivityInfo::readWUsInTargetClusterJobQueue(CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName)
 {
-    StringArray wuidList;
-    readWUIDsInJobQueue(jobQueue.queueName.str(), wuidList);
-
-    ForEachItemIn(i, wuidList)
+    ForEachItemIn(i, targetCluster.queuedWUIDs)
     {
-        const char* wuid = wuidList.item(i);
+        const char* wuid = targetCluster.queuedWUIDs.item(i);
         if (!wuid || !*wuid || isDuplicatedECLWUID(wuid))
             continue;
 
         Owned<IEspActiveWorkunit> wu;
         createActiveWorkUnit(wu, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
             queueName, NULL, targetCluster.clusterName.get(), false);
-        aws.append(*wu.getLink());
-    }
-}
-
-void CActivityInfo::readWUIDsInJobQueue(const char* queueName, StringArray& wuidList)
-{
-    if (!queueName || !*queueName)
-    {
-        WARNLOG("CActivityInfo::readWUIDsInJobQueue: job queue not specified.");
-        return;
-    }
-    if (!jobQueueSnapshot)
-    {
-        WARNLOG("CActivityInfo::readWUIDsInJobQueue: jobQueueSnapshot not found.");
-        return;
+        aws.append(*wu.getClear());
     }
-
-    Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
-    if (!jobQueue)
-        WARNLOG("CActivityInfo::readWUsInJobQueue: failed to get info for job queue %s.", queueName);
-    else
-        jobQueue->getWUIDs(wuidList);
 }
 
-void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers()
+void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IPropertyTree* serverStatusRoot)
 {
     BoolHash uniqueServers;
     Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
@@ -649,7 +647,7 @@ void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers()
 
             Owned<IEspActiveWorkunit> wu;
             createActiveWorkUnit(wu, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
-            aws.append(*wu.getLink());
+            aws.append(*wu.getClear());
         }
 
         bool* found = uniqueServers.getValue(instanceName);
@@ -663,7 +661,7 @@ void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers()
     return;
 }
 
-void CActivityInfo::getDFUServersAndWUs(IPropertyTree* envRoot)
+void CActivityInfo::getDFUServersAndWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
 {
     if (!envRoot)
         return;
@@ -684,14 +682,20 @@ void CActivityInfo::getDFUServersAndWUs(IPropertyTree* envRoot)
         {
             StringArray wuidList;
             const char *queueName = queues.item(q);
-            readDFUWUDetails(queueName, serverName, wuidList, readDFUWUIDs(queueName, wuidList));
+            readDFUWUDetails(queueName, serverName, wuidList, readDFUWUIDs(serverStatusRoot, queueName, wuidList));
             getServerJobQueue(queueName, serverName, STATUS_SERVER_DFUSERVER, NULL, 0);
         }
     }
 }
 
-unsigned CActivityInfo::readDFUWUIDs(const char* queueName, StringArray& wuidList)
+unsigned CActivityInfo::readDFUWUIDs(IPropertyTree* serverStatusRoot, const char* queueName, StringArray& wuidList)
 {
+    if (!queueName || !*queueName)
+    {
+        WARNLOG("CActivityInfo::readDFUWUIDs: queue name not specified");
+        return 0;
+    }
+
     unsigned runningWUCount = 0;
     VStringBuffer path("Server[@name=\"DFUserver\"]/Queue[@name=\"%s\"]",queueName);
     Owned<IPropertyTreeIterator> iter = serverStatusRoot->getElements(path.str());
@@ -709,7 +713,26 @@ unsigned CActivityInfo::readDFUWUIDs(const char* queueName, StringArray& wuidLis
         }
     }
 
-    readWUIDsInJobQueue(queueName, wuidList);
+    if (!jobQueueSnapshot)
+        return runningWUCount;
+
+    //Read queued jobs
+    Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
+    if (!jobQueue)
+    {
+        WARNLOG("CActivityInfo::readDFUWUIDs: failed to get info for job queue %s.", queueName);
+        return runningWUCount;
+    }
+
+    CJobQueueContents jobList;
+    jobQueue->copyItems(jobList);
+    Owned<IJobQueueIterator> iterq = jobList.getIterator();
+    ForEach(*iterq)
+    {
+        const char* wuid = iterq->query().queryWUID();
+        if (wuid && *wuid)
+            wuidList.append(wuid);
+    }
     return runningWUCount;
 }
 
@@ -742,7 +765,7 @@ void CActivityInfo::readDFUWUDetails(const char* queueName, const char* serverNa
         wu->setServer(STATUS_SERVER_DFUSERVER);
         wu->setInstance(serverName);
         wu->setQueueName(queueName);
-        aws.append(*wu.getLink());
+        aws.append(*wu.getClear());
     }
 }
 
@@ -777,7 +800,7 @@ void CActivityInfo::getDFURecoveryJobs()
         job->setDone(done);
         job->setTotal(total);
         job->setCommand(cmd.str());
-        DFURecoveryJobs.append(*job.getLink());
+        DFURecoveryJobs.append(*job.getClear());
     }
 }
 
@@ -1113,7 +1136,7 @@ void CWsSMCEx::addWUsToResponse(IEspContext &context, const IArrayOf<IEspActiveW
                 cw->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
             if (targetClusterName && *targetClusterName)
                 cw->setTargetClusterName(targetClusterName);
-            awsReturned.append(*cw.getLink());
+            awsReturned.append(*cw.getClear());
 
             e->Release();
         }
@@ -2177,7 +2200,7 @@ void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const
 
         Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
         setActiveWUs(context, wu, wuOnThisQueue);
-        awsOnThisQueue.append(*wuOnThisQueue.getLink());
+        awsOnThisQueue.append(*wuOnThisQueue.getClear());
     }
     statusServerInfo.setWorkunits(awsOnThisQueue);
 }
@@ -2198,7 +2221,7 @@ void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const
 
         Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
         setActiveWUs(context, wu, wuOnThisQueue);
-        awsOnThisQueue.append(*wuOnThisQueue.getLink());
+        awsOnThisQueue.append(*wuOnThisQueue.getClear());
     }
     statusServerInfo.setWorkunits(awsOnThisQueue);
 }

+ 10 - 11
esp/services/ws_smc/ws_smcService.hpp

@@ -87,6 +87,7 @@ public:
     CWsSMCQueue clusterQueue;
     CWsSMCQueue agentQueue;
     CWsSMCQueue serverQueue;
+    StringArray queuedWUIDs;
 
     CWsSMCTargetCluster(){};
     virtual ~CWsSMCTargetCluster(){};
@@ -108,22 +109,20 @@ class CActivityInfo : public CInterface, implements IInterface
     IArrayOf<IEspServerJobQueue> serverJobQueues;
     IArrayOf<IEspDFUJob> DFURecoveryJobs;
 
-    IPropertyTree* serverStatusRoot;
-
-    void readTargetClusterInfo(CConstWUClusterInfoArray& clusters);
-    void readTargetClusterInfo(IConstWUClusterInfo& cluster, CWsSMCTargetCluster* targetCluster);
-    bool findQueueInStatusServer(const char* serverName, const char* queueName);
+    void readTargetClusterInfo(CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot);
+    void readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster);
+    bool findQueueInStatusServer(IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName);
     const char *getStatusServerTypeName(WsSMCStatusServerType type);
-    void readActiveWUsAndQueuedWUs(IPropertyTree* envRoot);
-    void readRunningWUsOnStatusServer(WsSMCStatusServerType statusServerType);
+    bool readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails);
+    void readActiveWUsAndQueuedWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot);
+    void readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType);
     void readWUsInTargetClusterJobQueues(CIArrayOf<CWsSMCTargetCluster>& targetClusters);
     void readWUsInTargetClusterJobQueue(CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName);
-    void readWUIDsInJobQueue(const char* queueName, StringArray& wuidList);
-    void readRunningWUsAndJobQueueforOtherStatusServers();
+    void readRunningWUsAndJobQueueforOtherStatusServers(IPropertyTree* serverStatusRoot);
     void createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
         unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext);
-    void getDFUServersAndWUs(IPropertyTree* envRoot);
-    unsigned readDFUWUIDs(const char* queueName, StringArray& wuidList);
+    void getDFUServersAndWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot);
+    unsigned readDFUWUIDs(IPropertyTree* serverStatusRoot, const char* queueName, StringArray& wuidList);
     void readDFUWUDetails(const char* queueName, const char* serverName, StringArray& wuidList, unsigned runningWUCount);
     void getDFURecoveryJobs();
     void getServerJobQueue(const char* queueName, const char* serverName, const char* serverType, const char* networkAddress, unsigned port);