فهرست منبع

HPCC-24837 Report Activity Information for containerized HPCC

1. Add new getContainerWUClusterInfo() which is similar to the
getEnvironmentClusterInfo() for bare metal environment. For
now, the API only collects the cluster information needed for
reporting the Activity Information.
2. Call the getContainerWUClusterInfo() to get the cluster
information.
3. Make other changes for reporting the Activity Information
for containerized HPCC.

Revise based on review:
1. Use the IConstWUClusterInfo
2. Implement the getSize()
3. Move the getDFURecoveryJobs() into the #ifndef
4. Add 'virtual' and 'override'

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 4 سال پیش
والد
کامیت
8c8621d71c
3فایلهای تغییر یافته به همراه161 افزوده شده و 2 حذف شده
  1. 16 1
      esp/services/ws_smc/ws_smcService.cpp
  2. 142 1
      esp/smc/SMCLib/TpWrapper.cpp
  3. 3 0
      esp/smc/SMCLib/TpWrapper.hpp

+ 16 - 1
esp/services/ws_smc/ws_smcService.cpp

@@ -237,12 +237,16 @@ struct CActiveWorkunitWrapper: public CActiveWorkunit
 
 void CActivityInfo::createActivityInfo(IEspContext& context)
 {
+    CConstWUClusterInfoArray clusters;
+#ifndef _CONTAINERIZED
     Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
     Owned<IConstEnvironment> env = factory->openEnvironment();
 
-    CConstWUClusterInfoArray clusters;
     Owned<IPropertyTree> envRoot= &env->getPTree();
     getEnvironmentClusterInfo(envRoot, clusters);
+#else
+    getContainerWUClusterInfo(clusters);
+#endif
 
     try
     {
@@ -261,7 +265,11 @@ void CActivityInfo::createActivityInfo(IEspContext& context)
     IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
 
     readTargetClusterInfo(clusters, serverStatusRoot);
+#ifndef _CONTAINERIZED
     readActiveWUsAndQueuedWUs(context, envRoot, serverStatusRoot);
+#else
+    readActiveWUsAndQueuedWUs(context, nullptr, serverStatusRoot);
+#endif
 
     timeCached.setNow();
 }
@@ -322,9 +330,13 @@ void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropert
 
     if (serverStatusRoot)
     {
+#ifndef _CONTAINERIZED
         smcQueue->foundQueueInStatusServer = findQueueInStatusServer(serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
         if (!smcQueue->foundQueueInStatusServer)
             targetCluster->clusterStatusDetails.appendf("Cluster %s not listening for workunits; ", clusterName.str());
+#else
+        smcQueue->foundQueueInStatusServer = true; //Server is launched dynamically.
+#endif
     }
 
     targetCluster->serverQueue.notFoundInJobQueues = !readJobQueue(targetCluster->serverQueue.queueName.str(), targetCluster->wuidsOnServerQueue, targetCluster->serverQueue.queueState, targetCluster->serverQueue.queueStateDetails);
@@ -418,8 +430,11 @@ void CActivityInfo::readActiveWUsAndQueuedWUs(IEspContext& context, IPropertyTre
     readRunningWUsAndJobQueueforOtherStatusServers(context, serverStatusRoot);
     //TODO: add queued WUs for ECLCCServer/ECLServer here. Right now, they are under target clusters.
 
+#ifndef _CONTAINERIZED
     getDFUServersAndWUs(context, envRoot, serverStatusRoot);
     getDFURecoveryJobs();
+    //For containerized HPCC, we do not know how to find out DFU Server queues, as well as running DFU WUs, for now.
+#endif
 }
 
 void CActivityInfo::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)

+ 142 - 1
esp/smc/SMCLib/TpWrapper.cpp

@@ -25,7 +25,7 @@
 #include "workunit.hpp"
 #include "exception_util.hpp"
 #include "portlist.h"
-
+#include "daqueue.hpp"
 
 const char* MSG_FAILED_GET_ENVIRONMENT_INFO = "Failed to get environment information.";
 
@@ -2050,4 +2050,145 @@ extern TPWRAPPER_API IStringIterator* getContainerTargetClusters(const char* pro
         ret->append_unique(targetName);
     }
     return ret.getClear();
+}
+
+class CContainerWUClusterInfo : public CSimpleInterfaceOf<IConstWUClusterInfo>
+{
+    StringAttr name;
+    StringAttr serverQueue;
+    StringAttr agentQueue;
+    StringAttr thorQueue;
+    ClusterType platform;
+    unsigned clusterWidth;
+
+public:
+    CContainerWUClusterInfo(const char* _name, const char* type, unsigned _clusterWidth)
+        : name(_name), clusterWidth(_clusterWidth)
+    {
+        StringBuffer queue;
+        if (strieq(type, "thor"))
+        {
+            thorQueue.set(getClusterThorQueueName(queue.clear(), name));
+            platform = ThorLCRCluster;
+        }
+        else if (strieq(type, "roxie"))
+        {
+            agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
+            platform = RoxieCluster;
+        }
+        else
+        {
+            agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
+            platform = HThorCluster;
+        }
+
+        serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
+    }
+
+    virtual IStringVal& getName(IStringVal& str) const override
+    {
+        str.set(name.get());
+        return str;
+    }
+    virtual IStringVal& getAgentQueue(IStringVal& str) const override
+    {
+        str.set(agentQueue);
+        return str;
+    }
+    virtual IStringVal& getServerQueue(IStringVal& str) const override
+    {
+        str.set(serverQueue);
+        return str;
+    }
+    virtual IStringVal& getThorQueue(IStringVal& str) const override
+    {
+        str.set(thorQueue);
+        return str;
+    }
+    virtual ClusterType getPlatform() const override
+    {
+        return platform;
+    }
+    virtual unsigned getSize() const override
+    {
+        return clusterWidth;
+    }
+    virtual bool isLegacyEclServer() const override
+    {
+        return false;
+    }
+    virtual IStringVal& getScope(IStringVal& str) const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual unsigned getNumberOfSlaveLogs() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual IStringVal & getAgentName(IStringVal & str) const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual IStringVal & getECLSchedulerName(IStringVal & str) const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const StringArray & getECLServerNames() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual IStringVal & getRoxieProcess(IStringVal & str) const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const StringArray & getThorProcesses() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const StringArray & getPrimaryThorProcesses() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const SocketEndpointArray & getRoxieServers() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const char *getLdapUser() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const char *getLdapPassword() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual unsigned getRoxieRedundancy() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual unsigned getChannelsPerNode() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual int getRoxieReplicateOffset() const override
+    {
+        UNIMPLEMENTED;
+    }
+    virtual const char *getAlias() const override
+    {
+        UNIMPLEMENTED;
+    }
+};
+
+extern TPWRAPPER_API unsigned getContainerWUClusterInfo(CConstWUClusterInfoArray& clusters)
+{
+    Owned<IPropertyTreeIterator> queues = queryComponentConfig().getElements("queues");
+    ForEach(*queues)
+    {
+        IPropertyTree& queue = queues->query();
+        Owned<IConstWUClusterInfo> cluster = new CContainerWUClusterInfo(queue.queryProp("@name"),
+            queue.queryProp("@type"), (unsigned) queue.getPropInt("@width", 1));
+        clusters.append(*cluster.getClear());
+    }
+
+    return clusters.ordinality();
 }

+ 3 - 0
esp/smc/SMCLib/TpWrapper.hpp

@@ -211,6 +211,9 @@ private:
 };
 
 extern TPWRAPPER_API ISashaCommand* archiveOrRestoreWorkunits(StringArray& wuids, IProperties* params, bool archive, bool dfu);
+
+extern TPWRAPPER_API unsigned getContainerWUClusterInfo(CConstWUClusterInfoArray& clusters);
+
 extern TPWRAPPER_API IStringIterator *getContainerTargetClusters(const char* processType, const char* processName);
 
 #endif //_ESPWIZ_TpWrapper_HPP__