Browse Source

HPCC-12552 Show queued job under eclserver/eclccserver

The code is added to read queued jobs from the .eclserver
JobQueues into buffers. The queued jobs in the buffers are
added under each eclserver/eclccserver if the server is
listenning to the job queue.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 10 years ago
parent
commit
812d19d244
2 changed files with 35 additions and 0 deletions
  1. 33 0
      esp/services/ws_smc/ws_smcService.cpp
  2. 2 0
      esp/services/ws_smc/ws_smcService.hpp

+ 33 - 0
esp/services/ws_smc/ws_smcService.cpp

@@ -324,6 +324,8 @@ void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropert
         if (!smcQueue->foundQueueInStatusServer)
             targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
     }
+
+    readJobQueue(targetCluster->serverQueue.queueName.str(), targetCluster->wuidsOnServerQueue, targetCluster->serverQueue.queueState, targetCluster->serverQueue.queueStateDetails);
 }
 
 bool CActivityInfo::readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails)
@@ -620,6 +622,27 @@ void CActivityInfo::readWUsInTargetClusterJobQueue(IEspContext& context, CWsSMCT
     }
 }
 
+void CActivityInfo::addQueuedServerQueueJob(IEspContext& context, const char* serverName, const char* queueName, const char* instanceName,  CIArrayOf<CWsSMCTargetCluster>& targetClusters)
+{
+    ForEachItemIn(i, targetClusters)
+    {
+        CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
+        if (!targetCluster.wuidsOnServerQueue.length() || !strieq(queueName, targetCluster.serverQueue.queueName.str()))
+            continue;
+
+        ForEachItemIn(i1, targetCluster.wuidsOnServerQueue)
+        {
+            const char* wuid = targetCluster.wuidsOnServerQueue.item(i1);
+            if (!wuid || !*wuid) //Multiple servers may monitor one queue. The WU may be shown under the multiple servers.
+                continue;
+
+            Owned<IEspActiveWorkunit> wu;
+            createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName, NULL, false);
+            aws.append(*wu.getClear());
+        }
+    }
+}
+
 void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IEspContext& context, IPropertyTree* serverStatusRoot)
 {
     BoolHash uniqueServers;
@@ -654,6 +677,16 @@ void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IEspContext&
         {
             uniqueServers.setValue(instanceName, true);
             getServerJobQueue(queueName, instanceName, serverName, node, port);
+
+            //Now, we found a new server. we need to add queued jobs from the queues the server is monitoring.
+            StringArray qList;
+            qList.appendListUniq(queueName, ",");
+            ForEachItemIn(q, qList)
+            {
+                addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), thorTargetClusters);
+                addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), roxieTargetClusters);
+                addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), hthorTargetClusters);
+            }
         }
     }
 

+ 2 - 0
esp/services/ws_smc/ws_smcService.hpp

@@ -88,6 +88,7 @@ public:
     CWsSMCQueue agentQueue;
     CWsSMCQueue serverQueue;
     StringArray queuedWUIDs;
+    StringArray wuidsOnServerQueue;
 
     CWsSMCTargetCluster(){};
     virtual ~CWsSMCTargetCluster(){};
@@ -130,6 +131,7 @@ class CActivityInfo : public CInterface, implements IInterface
         CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2);
     CWsSMCTargetCluster* findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters);
     bool checkSetUniqueECLWUID(const char* wuid);
+    void addQueuedServerQueueJob(IEspContext& context, const char* serverName, const char* queueName, const char* instanceName, CIArrayOf<CWsSMCTargetCluster>& targetClusters);
 
 public:
     IMPLEMENT_IINTERFACE;