浏览代码

Merge pull request #6515 from wangkx/h12311b

HPCC-12311 Remove extra calls to Dali for JobQueues

Reviewed-By: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
0026c60037
共有 4 个文件被更改,包括 1355 次插入1158 次删除
  1. 583 442
      common/workunit/wujobq.cpp
  2. 26 19
      common/workunit/wujobq.hpp
  3. 691 654
      esp/services/ws_smc/ws_smcService.cpp
  4. 55 43
      esp/services/ws_smc/ws_smcService.hpp

文件差异内容过多而无法显示
+ 583 - 442
common/workunit/wujobq.cpp


+ 26 - 19
common/workunit/wujobq.hpp

@@ -59,7 +59,26 @@ interface IDynamicPriority
     virtual int get()=0;
 };
 
-interface IJobQueue: extends IInterface
+interface IJobQueueConst: extends IInterface
+{
+    virtual unsigned ordinality()=0;            // number of items on queue
+    virtual unsigned waiting()=0;               // number currently waiting on dequeue
+    virtual IJobQueueItem *getItem(unsigned idx)=0;
+    virtual IJobQueueItem *getHead()=0;
+    virtual IJobQueueItem *getTail()=0;
+    virtual IJobQueueItem *find(const char *wuid)=0;
+    virtual unsigned findRank(const char *wuid)=0;
+    virtual unsigned copyItems(CJobQueueContents &dest)=0;  // takes a snapshot copy of the entire queue (returns number copied)
+    virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
+    virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0;
+    virtual void getState(StringBuffer& state, StringBuffer& stateDetails)=0;
+    virtual bool paused()=0;    // true if paused
+    virtual bool paused(StringBuffer& info)=0;    // true if paused
+    virtual bool stopped()=0;   // true if stopped
+    virtual bool stopped(StringBuffer& info)=0;   // true if stopped
+};
+
+interface IJobQueue: extends IJobQueueConst
 {
 
 // enqueuing
@@ -80,20 +99,6 @@ interface IJobQueue: extends IInterface
     virtual bool waitStatsChange(unsigned timeout)=0;
     virtual void cancelWaitStatsChange()=0;
 
-
-//enquiry
-    virtual unsigned ordinality()=0;            // number of items on queue
-    virtual unsigned waiting()=0;               // number currently waiting on dequeue
-    virtual IJobQueueItem *getItem(unsigned idx)=0;
-    virtual IJobQueueItem *getHead()=0;
-    virtual IJobQueueItem *getTail()=0;
-    virtual IJobQueueItem *find(const char *wuid)=0;
-    virtual unsigned findRank(const char *wuid)=0;
-    virtual unsigned copyItems(CJobQueueContents &dest)=0;  // takes a snapshot copy of the entire queue (returns number copied)
-    virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
-    virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state, StringBuffer& stateDetails)=0;
-
-
 //manipulation
     virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes
     virtual unsigned takeItems(CJobQueueContents &dest)=0;   // takes items and clears queue
@@ -113,12 +118,8 @@ interface IJobQueue: extends IInterface
 // control:
     virtual void pause()=0;     // marks queue as paused - and subsequent dequeues block until resumed
     virtual void pause(const char *info)=0;     // marks queue as paused - and subsequent dequeues block until resumed
-    virtual bool paused()=0;    // true if paused
-    virtual bool paused(StringBuffer& info)=0;    // true if paused
     virtual void stop()=0;      // sets stopped flags - all current and subsequent dequeues return NULL
     virtual void stop(const char *info)=0;      // sets stopped flags - all current and subsequent dequeues return NULL
-    virtual bool stopped()=0;   // true if stopped
-    virtual bool stopped(StringBuffer& info)=0;   // true if stopped
     virtual void resume()=0;    // removes paused or stopped flag
     virtual void resume(const char *info)=0;    // removes paused or stopped flag
 
@@ -136,6 +137,12 @@ interface IJobQueue: extends IInterface
 
 };
 
+interface IJQSnapshot : extends IInterface
+{
+    virtual IJobQueueConst *getJobQueue(const char *name)=0;
+};
+
+extern WORKUNIT_API IJQSnapshot *createJQSnapshot();
 
 extern WORKUNIT_API IJobQueueItem *createJobQueueItem(const char *wuid);
 extern WORKUNIT_API IJobQueueItem *deserializeJobQueueItem(MemoryBuffer &mb); 

文件差异内容过多而无法显示
+ 691 - 654
esp/services/ws_smc/ws_smcService.cpp


+ 55 - 43
esp/services/ws_smc/ws_smcService.hpp

@@ -19,6 +19,7 @@
 #define _ESPWIZ_WsSMC_HPP__
 
 #include "ws_smc_esp.ipp"
+#include "wujobq.hpp"
 #include "TpWrapper.hpp"
 #include "WUXMLInfo.hpp"
 
@@ -65,6 +66,8 @@ public:
     CWsSMCQueue(bool foundQueue = false): countRunningJobs(0), countQueuedJobs(0), statusType(RunningNormal)
     {
         foundQueueInStatusServer = foundQueue;
+        countRunningJobs = 0;
+        countQueuedJobs = 0;
     }
     virtual ~CWsSMCQueue(){};
 };
@@ -84,19 +87,18 @@ public:
     CWsSMCQueue clusterQueue;
     CWsSMCQueue agentQueue;
     CWsSMCQueue serverQueue;
+    StringArray queuedWUIDs;
 
     CWsSMCTargetCluster(){};
     virtual ~CWsSMCTargetCluster(){};
 };
 
-struct ActivityInfo : public CInterface, implements IInterface
+class CActivityInfo : public CInterface, implements IInterface
 {
-    IMPLEMENT_IINTERFACE;
-
-    ActivityInfo() {};
-    bool isCachedActivityInfoValid(unsigned timeOutSeconds);
-
     CDateTime timeCached;
+    BoolHash uniqueECLWUIDs;
+
+    Owned<IJQSnapshot> jobQueueSnapshot;
 
     CIArrayOf<CWsSMCTargetCluster> thorTargetClusters;
     CIArrayOf<CWsSMCTargetCluster> roxieTargetClusters;
@@ -105,6 +107,46 @@ struct ActivityInfo : public CInterface, implements IInterface
     IArrayOf<IEspActiveWorkunit> aws;
     IArrayOf<IEspServerJobQueue> serverJobQueues;
     IArrayOf<IEspDFUJob> DFURecoveryJobs;
+
+    void readTargetClusterInfo(CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot);
+    void readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster);
+    bool findQueueInStatusServer(IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName);
+    const char *getStatusServerTypeName(WsSMCStatusServerType type);
+    bool readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails);
+    void readActiveWUsAndQueuedWUs(IEspContext& context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot);
+    void readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType);
+    void readWUsInTargetClusterJobQueues(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters);
+    void readWUsInTargetClusterJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName);
+    void readRunningWUsAndJobQueueforOtherStatusServers(IEspContext& context, IPropertyTree* serverStatusRoot);
+    void createActiveWorkUnit(IEspContext& context, Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
+        unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext);
+    void getDFUServersAndWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot);
+    unsigned readDFUWUIDs(IPropertyTree* serverStatusRoot, const char* queueName, StringArray& wuidList);
+    void readDFUWUDetails(const char* queueName, const char* serverName, StringArray& wuidList, unsigned runningWUCount);
+    void getDFURecoveryJobs();
+    void getServerJobQueue(const char* queueName, const char* serverName, const char* serverType, const char* networkAddress, unsigned port);
+    void readServerJobQueueStatus(IEspServerJobQueue* jobQueue);
+    CWsSMCTargetCluster* findWUClusterInfo(const char* wuid, bool isOnECLAgent, CIArrayOf<CWsSMCTargetCluster>& targetClusters,
+        CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2);
+    CWsSMCTargetCluster* findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters);
+    bool checkSetUniqueECLWUID(const char* wuid);
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CActivityInfo() {};
+    virtual ~CActivityInfo() { jobQueueSnapshot.clear(); };
+
+    bool isCachedActivityInfoValid(unsigned timeOutSeconds);
+    void createActivityInfo(IEspContext& context);
+
+    inline CIArrayOf<CWsSMCTargetCluster>& queryThorTargetClusters() { return thorTargetClusters; };
+    inline CIArrayOf<CWsSMCTargetCluster>& queryRoxieTargetClusters() { return roxieTargetClusters; };
+    inline CIArrayOf<CWsSMCTargetCluster>& queryHThorTargetClusters() { return hthorTargetClusters; };
+
+    inline IArrayOf<IEspActiveWorkunit>& queryActiveWUs() { return aws; };
+    inline IArrayOf<IEspServerJobQueue>& queryServerJobQueues() { return serverJobQueues; };
+    inline IArrayOf<IEspDFUJob>& queryDFURecoveryJobs() { return DFURecoveryJobs; };
 };
 
 class CWsSMCEx : public CWsSMC
@@ -112,7 +154,7 @@ class CWsSMCEx : public CWsSMC
     long m_counter;
     CTpWrapper m_ClusterStatus;
     CriticalSection getActivityCrit;
-    Owned<ActivityInfo> activityInfoCache;
+    Owned<CActivityInfo> activityInfoCache;
     unsigned activityInfoCacheSeconds;
 
     StringBuffer m_ChatURL;
@@ -151,59 +193,30 @@ public:
     virtual bool onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdRequest &req, IEspRoxieControlCmdResponse &resp);
     virtual bool onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp);
 private:
-    void addCapabilities( IPropertyTree* pFeatureNode, const char* access, 
-                                 IArrayOf<IEspCapability>& capabilities);
+    void addCapabilities( IPropertyTree* pFeatureNode, const char* access, IArrayOf<IEspCapability>& capabilities);
     void addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
-        const char* serverType, const char* networkAddress, unsigned port);
+        const char* serverType, const char* networkAddress, unsigned port, IJQSnapshot* jobQueueSnapshot);
     void addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
         const char* serverType, const char* networkAddress, unsigned port, const char* queueState, const char* queueStateDetails);
     void readBannerAndChatRequest(IEspContext& context, IEspActivityRequest &req, IEspActivityResponse& resp);
     void setBannerAndChatData(double version, IEspActivityResponse& resp);
     void getServersAndWUs(IEspContext &context, IEspActivityRequest &req, IPropertyTree* envRoot, CConstWUClusterInfoArray& clusters,
-        ActivityInfo* activityInfo);
+        CActivityInfo* activityInfo);
 
     void sortTargetClusters(IArrayOf<IEspTargetCluster>& clusters, const char* sortBy, bool descending);
-    void createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, IEspContext &context, const char* wuid, const char* location,
-        unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext);
-    void readDFUWUs(IEspContext &context, const char* queueName, const char* serverName, IArrayOf<IEspActiveWorkunit>& aws);
-    void readRunningWUsOnECLAgent(IEspContext& context, IPropertyTreeIterator* itStatusECLagent, CConstWUClusterInfoArray& clusters,
-         CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
-    void readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, CWsSMCQueue& queue, const char* listQueue, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
     void addToTargetClusterList(IArrayOf<IEspTargetCluster>& clusters, IEspTargetCluster* cluster, const char* sortBy, bool descending);
-    bool findQueueInStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName);
     void getClusterQueueStatus(const CWsSMCTargetCluster& targetCluster, ClusterStatusType& queueStatusType, StringBuffer& queueStatusDetails);
     void setClusterStatus(IEspContext& context, const CWsSMCTargetCluster& targetCluster, IEspTargetCluster* returnCluster);
     void getTargetClusterAndWUs(IEspContext& context, CConstWUClusterInfoArray& clusters, IConstWUClusterInfo& cluster,
-         IPropertyTree* serverStatusRoot, IPropertyTreeIterator* itStatusECLagent, IEspTargetCluster* returnCluster, IArrayOf<IEspActiveWorkunit>& aws);
-    void getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* serverStatusRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspActiveWorkunit>& aws);
-    void getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspActiveWorkunit>& aws);
-    void getDFURecoveryJobs(IEspContext &context, const IPropertyTree* dfuRecoveryRoot, IArrayOf<IEspDFUJob>& jobs);
+        IPropertyTree* serverStatusRoot, IPropertyTreeIterator* itStatusECLagent, IEspTargetCluster* returnCluster, IArrayOf<IEspActiveWorkunit>& aws);
     const char* createQueueActionInfo(IEspContext &context, const char* action, IEspSMCQueueRequest &req, StringBuffer& info);
-    void setServerJobQueueStatus(double version, IEspServerJobQueue* jobQueue, const char* status, const char* details);
-    void setServerJobQueueStatus(IEspServerJobQueue* jobQueue, const char* status, const char* details);
-    void setServerJobQueueStatusDetails(IEspServerJobQueue* jobQueue, const char* status, const char* details);
     void setJobPriority(IEspContext &context, IWorkUnitFactory* factory, const char* wuid, const char* queue, WUPriorityClass& priority);
 
-    void readTargetClusterInfo(IEspContext &context, CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot,
-        ActivityInfo* activityInfo);
-    void readTargetClusterInfo(IEspContext& context, IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster);
-    void readRunningWUsAndQueuedWUs(IEspContext &context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot,
-        IPropertyTree* dfuRecoveryRoot, ActivityInfo* activityInfo);
-    CWsSMCTargetCluster* findWUClusterInfo(IEspContext& context, const char* wuid, bool isOnECLAgent,
-            CIArrayOf<CWsSMCTargetCluster>& targetClusters, CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2);
-    CWsSMCTargetCluster* findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters);
-    void readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType,
-            CIArrayOf<CWsSMCTargetCluster>& targetClusters, CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2,
-            BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
-    void readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
-    void readWUsAndStateFromJobQueue(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
     void setESPTargetClusters(IEspContext& context, const CIArrayOf<CWsSMCTargetCluster>& targetClusters, IArrayOf<IEspTargetCluster>& respTargetClusters);
-    ActivityInfo* createActivityInfo(IEspContext &context);
     void clearActivityInfoCache();
-    ActivityInfo* getActivityInfo(IEspContext &context);
-    void setActivityResponse(IEspContext &context, ActivityInfo* activityInfo, IEspActivityRequest &req, IEspActivityResponse& resp);
+    CActivityInfo* getActivityInfo(IEspContext &context);
+    void setActivityResponse(IEspContext &context, CActivityInfo* activityInfo, IEspActivityRequest &req, IEspActivityResponse& resp);
     void addWUsToResponse(IEspContext &context, const IArrayOf<IEspActiveWorkunit>& aws, IEspActivityResponse& resp);
-    const char *getStatusServerTypeName(WsSMCStatusServerType type);
 
     void getStatusServerInfo(IEspContext &context, const char *serverType, const char *serverName, const char *networkAddress, unsigned port, IEspStatusServerInfo& statusServerInfo);
     void setTargetClusterInfo(IEspContext &context, const char *serverType, const char *serverName, const CIArrayOf<CWsSMCTargetCluster>& targetClusters,
@@ -282,4 +295,3 @@ public:
 };
 
 #endif //_ESPWIZ_WsSMC_HPP__
-