Просмотр исходного кода

HPCC-20124 Return ECLServer queue object in WsSMC.Activity

One ECLServer may listen multiple queues. In the existing WsSMC
Activity method, the queue status for those queues are read and
concatenated into one string. It is difficult for an ESP client
to parse. In this fix, each queue will be an object under an
ECLServer object. The queue object has queue name, status, and
status details.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 7 лет назад
Родитель
Сommit
d4dd7e6b03
3 измененных файлов с 101 добавлено и 43 удалено
  1. 4 3
      esp/scm/ws_smc.ecm
  2. 94 39
      esp/services/ws_smc/ws_smcService.cpp
  3. 3 1
      esp/services/ws_smc/ws_smcService.hpp

+ 4 - 3
esp/scm/ws_smc.ecm

@@ -102,8 +102,9 @@ ESPStruct DFUJob
 
 ESPStruct ServerJobQueue
 {
-    [depr_ver("1.20")] string QueueName;
-    [min_ver("1.20")] ESParray<string, QueueName> QueueNames;
+    string QueueName;
+    [min_ver("1.20"), depr_ver("1.21")] ESParray<string, QueueName> QueueNames;
+    [min_ver("1.21")] ESParray<ESPstruct ServerJobQueue> Queues;
     string ServerName;
     string ServerType;
     string QueueStatus;
@@ -398,7 +399,7 @@ ESPresponse [exceptions_inline] LockQueryResponse
     int NumLocks;
 };
 
-ESPservice [auth_feature("DEFERRED"), noforms, version("1.20"), exceptions_inline("./smc_xslt/exceptions.xslt"), use_method_name] WsSMC
+ESPservice [auth_feature("DEFERRED"), noforms, version("1.21"), exceptions_inline("./smc_xslt/exceptions.xslt"), use_method_name] WsSMC
 {
     ESPmethod Index(SMCIndexRequest, SMCIndexResponse);
     ESPmethod [resp_xsl_default("/esp/xslt/index.xslt")] Activity(ActivityRequest, ActivityResponse);

+ 94 - 39
esp/services/ws_smc/ws_smcService.cpp

@@ -867,14 +867,6 @@ void CActivityInfo::getServerJobQueue(IEspContext &context, const char* queueNam
 
     double version = context.getClientVersion();
     Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
-    if (version < 1.20)
-        jobQueue->setQueueName(queueName);
-    else
-    {
-        StringArray queueNames;
-        queueNames.appendListUniq(queueName, ",");
-        jobQueue->setQueueNames(queueNames);
-    }
     jobQueue->setServerName(serverName);
     jobQueue->setServerType(serverType);
     if (networkAddress && *networkAddress)
@@ -883,12 +875,12 @@ void CActivityInfo::getServerJobQueue(IEspContext &context, const char* queueNam
         jobQueue->setPort(port);
     }
 
-    readServerJobQueueStatus(jobQueue);
+    readServerJobQueueStatus(context, queueName, jobQueue);
 
     serverJobQueues.append(*jobQueue.getClear());
 }
 
-void CActivityInfo::readServerJobQueueStatus(IEspServerJobQueue* jobQueue)
+void CActivityInfo::readServerJobQueueStatus(IEspContext &context, const char* queueName, IEspServerJobQueue* jobQueue)
 {
     if (!jobQueueSnapshot)
     {
@@ -897,48 +889,111 @@ void CActivityInfo::readServerJobQueueStatus(IEspServerJobQueue* jobQueue)
     }
 
     StringBuffer queueStateDetails;
-    bool jobQueueFound = false, hasRunning = false,  hasPaused =  false;
+    bool hasStopped = false,  hasPaused =  false;
 
-    StringArray qlist;
-    qlist.appendListUniq(jobQueue->getQueueName(), ",");
-    ForEachItemIn(i, qlist)
-    {
-        const char* qname = qlist.item(i);
-        Owned<IJobQueueConst> queue = jobQueueSnapshot->getJobQueue(qname);
-        if (!queue)
-            continue;
+    StringArray queueNames;
+    queueNames.appendListUniq(queueName, ",");
 
-        jobQueueFound = true;
+    IArrayOf<IEspServerJobQueue> jobQueues;
+    ForEachItemIn(i, queueNames)
+        readServerJobQueueDetails(context, queueNames.item(i), hasStopped, hasPaused, queueStateDetails, jobQueues);
 
-        StringBuffer status, details;
+    double version = context.getClientVersion();
+    if (version < 1.20)
+        jobQueue->setQueueName(queueName);
+    else if (version < 1.21)
+        jobQueue->setQueueNames(queueNames);
+    else
+        jobQueue->setQueues(jobQueues);
+
+    //The hasStopped, hasPaused, and queueStateDetails should be set inside readServerJobQueueDetails().
+    if (hasStopped)
+        jobQueue->setQueueStatus("stopped"); //Some of its job queues is stopped. So, return a warning here.
+    else if (hasPaused)
+        jobQueue->setQueueStatus("paused"); //Some of its job queues is paused. So, return a warning here.
+    else
+        jobQueue->setQueueStatus("running");
+    jobQueue->setStatusDetails(queueStateDetails.str());
+}
+
+void CActivityInfo::readServerJobQueueDetails(IEspContext &context, const char* queueName, bool& hasStopped,
+    bool& hasPaused, StringBuffer& queueStateDetails, IArrayOf<IEspServerJobQueue>& jobQueues)
+{
+    double version = context.getClientVersion();
+    StringBuffer status, details, stateDetailsString;
+    Owned<IJobQueueConst> queue = jobQueueSnapshot->getJobQueue(queueName);
+    if (queue)
         queue->getState(status, details);
-        if (!status || !*status)
-            continue;
+    if (status.isEmpty())
+    {
+        if (version < 1.21)
+        {
+            if (!queue)
+                queueStateDetails.appendf("%s not found in Status Server list; ", queueName);
+            else
+                queueStateDetails.appendf("No status set in Status Server list for %s; ", queueName);
+        }
+        else
+        {
+            Owned<IEspServerJobQueue> jobQueue = createServerJobQueue();
+            jobQueue->setQueueName(queueName);
 
+            if (!queue)
+                stateDetailsString.setf("%s not found in Status Server list", queueName);
+            else
+                stateDetailsString.setf("No status set in Status Server list for %s", queueName);
+
+            queueStateDetails.appendf("%s;", stateDetailsString.str());
+            jobQueue->setStatusDetails(stateDetailsString.str());
+            jobQueues.append(*jobQueue.getClear());
+        }
+        return;
+    }
+
+    if (version < 1.21)
+    {
         if (strieq(status.str(), "paused"))
             hasPaused =  true;
-        else if (!strieq(status.str(), "stopped"))
-            hasRunning =  true;
-
+        else if (strieq(status.str(), "stopped"))
+            hasStopped =  true;
+    
         if (details && *details)
-            queueStateDetails.appendf("%s: queue %s; %s;", qname, status.str(), details.str());
+            queueStateDetails.appendf("%s: queue %s; %s;", queueName, status.str(), details.str());
         else
-            queueStateDetails.appendf("%s: queue %s;", qname, status.str());
+            queueStateDetails.appendf("%s: queue %s;", queueName, status.str());
     }
-
-    if (hasRunning)
-        jobQueue->setQueueStatus("running");
-    else if (hasPaused)
-        jobQueue->setQueueStatus("paused");
     else
     {
-        jobQueue->setQueueStatus("stopped");
-        if (!jobQueueFound)
-            queueStateDetails.setf("%s not found in Status Server list", jobQueue->getQueueName());
-        else if (!queueStateDetails.length())
-            queueStateDetails.setf("No status set in Status Server list for %s", jobQueue->getQueueName());
+        Owned<IEspServerJobQueue> jobQueue = createServerJobQueue();
+        jobQueue->setQueueName(queueName);
+        if (strieq(status.str(), "paused"))
+        {
+            hasPaused =  true;
+            jobQueue->setQueueStatus("paused");
+        }
+        else if (strieq(status.str(), "stopped"))
+        {
+            hasStopped =  true;
+            jobQueue->setQueueStatus("stopped");
+        }
+        else
+        {
+            jobQueue->setQueueStatus("running");
+        }
+    
+        if (details && *details)
+        {
+            queueStateDetails.appendf("%s: queue %s; %s;", queueName, status.str(), details.str());
+            stateDetailsString.setf("%s: queue %s; %s;", queueName, status.str(), details.str());
+        }
+        else
+        {
+            queueStateDetails.appendf("%s: queue %s;", queueName, status.str());
+            stateDetailsString.setf("%s: queue %s;", queueName, status.str());
+        }
+        jobQueue->setStatusDetails(stateDetailsString.str());
+        jobQueues.append(*jobQueue.getClear());
     }
-    jobQueue->setStatusDetails(queueStateDetails.str());
 }
 
 bool CWsSMCEx::onIndex(IEspContext &context, IEspSMCIndexRequest &req, IEspSMCIndexResponse &resp)

+ 3 - 1
esp/services/ws_smc/ws_smcService.hpp

@@ -129,7 +129,9 @@ class CActivityInfo : public CInterface, implements IInterface
     void readDFUWUDetails(const char* queueName, const char* serverName, StringArray& wuidList, unsigned runningWUCount);
     void getDFURecoveryJobs();
     void getServerJobQueue(IEspContext &context, const char* queueName, const char* serverName, const char* serverType, const char* networkAddress, unsigned port);
-    void readServerJobQueueStatus(IEspServerJobQueue* jobQueue);
+    void readServerJobQueueStatus(IEspContext &context, const char* queueName, IEspServerJobQueue* jobQueue);
+    void readServerJobQueueDetails(IEspContext &context, const char* queueName, bool& hasStopped,
+        bool& hasPaused, StringBuffer& queueStateDetails, IArrayOf<IEspServerJobQueue>& jobQueues);
     CWsSMCTargetCluster* findWUClusterInfo(const char* wuid, bool isOnECLAgent, CIArrayOf<CWsSMCTargetCluster>& targetClusters,
         CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2);
     CWsSMCTargetCluster* findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters);