瀏覽代碼

HPCC-23027 Return channels in ESP machine responses

For thor slave, the value is read from @channelsPerSlave.
For roxie, the value is read from @channelsPerNode.

A bug is fixed for retrieving thor slaves (processes) in
ws_machineEx::getThorProcesses(). The existing code uses
thor group name to lookup thor node groups. Then, one
slave process is defined for each node group. That is
incorrect if one thor slave has multiple channels because
each channel has its own node group. The new code calls
the getClusterProcessNodeGroup(). The call returns node
groups, one for each slave node. Then, the code defines
thor slaves in each node using @slavePerNode.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 5 年之前
父節點
當前提交
427f0d7ce1

+ 2 - 1
esp/scm/ws_machine.ecm

@@ -179,6 +179,7 @@ ESPstruct MachineInfoEx
    [min_ver("1.13")] string RoxieStateDetails;
    [min_ver("1.13")] string RoxieStateDetails;
    int    OS;
    int    OS;
    [min_ver("1.10")] int    ProcessNumber;
    [min_ver("1.10")] int    ProcessNumber;
+   [min_ver("1.16")] unsigned Channels;
    ESParray<ESPstruct ProcessorInfo> Processors;
    ESParray<ESPstruct ProcessorInfo> Processors;
    ESParray<ESPstruct StorageInfo> Storage;
    ESParray<ESPstruct StorageInfo> Storage;
    ESParray<ESPstruct SWRunInfo> Running;
    ESParray<ESPstruct SWRunInfo> Running;
@@ -447,7 +448,7 @@ ESPresponse [encode(0), nil_remove, exceptions_inline] GetNodeGroupUsageResponse
 };
 };
 
 
 //-------- service ---------
 //-------- service ---------
-ESPservice [auth_feature("DEFERRED"), version("1.15")] ws_machine
+ESPservice [auth_feature("DEFERRED"), version("1.16")] ws_machine
 {
 {
     ESPmethod [resp_xsl_default("./smc_xslt/clusterprocesses.xslt"), exceptions_inline("./smc_xslt/exceptions.xslt")]
     ESPmethod [resp_xsl_default("./smc_xslt/clusterprocesses.xslt"), exceptions_inline("./smc_xslt/exceptions.xslt")]
        GetTargetClusterInfo(GetTargetClusterInfoRequest, GetTargetClusterInfoResponse);
        GetTargetClusterInfo(GetTargetClusterInfoRequest, GetTargetClusterInfoResponse);

+ 2 - 1
esp/scm/ws_topology.ecm

@@ -32,6 +32,7 @@ ESPStruct TpMachine
     string Path;
     string Path;
     int    Port;
     int    Port;
     [min_ver("1.18")] int    ProcessNumber;
     [min_ver("1.18")] int    ProcessNumber;
+    [min_ver("1.30")] unsigned Channels;
 };
 };
 
 
 //  ===========================================================================
 //  ===========================================================================
@@ -636,7 +637,7 @@ ESPresponse [nil_remove, exceptions_inline] TpDropZoneQueryResponse
     ESParray<ESPstruct TpDropZone>    TpDropZones;
     ESParray<ESPstruct TpDropZone>    TpDropZones;
 };
 };
 
 
-ESPservice [auth_feature("DEFERRED"), noforms, version("1.29"), cache_group("ESPWsTP"), exceptions_inline("./smc_xslt/exceptions.xslt")] WsTopology
+ESPservice [auth_feature("DEFERRED"), noforms, version("1.30"), cache_group("ESPWsTP"), exceptions_inline("./smc_xslt/exceptions.xslt")] WsTopology
 {
 {
     ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/targetclusters.xslt")] TpTargetClusterQuery(TpTargetClusterQueryRequest, TpTargetClusterQueryResponse);
     ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/targetclusters.xslt")] TpTargetClusterQuery(TpTargetClusterQueryRequest, TpTargetClusterQueryResponse);
     ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/topology.xslt")] TpClusterQuery(TpClusterQueryRequest, TpClusterQueryResponse);
     ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/topology.xslt")] TpClusterQuery(TpClusterQueryRequest, TpClusterQueryResponse);

+ 105 - 22
esp/services/ws_machine/ws_machineService.cpp

@@ -42,15 +42,18 @@ public:
     CMachineData&                   m_machineData;          //From request
     CMachineData&                   m_machineData;          //From request
     IArrayOf<IEspMachineInfoEx>&    m_machineInfoTable;     //For response
     IArrayOf<IEspMachineInfoEx>&    m_machineInfoTable;     //For response
     StringArray&                    m_machineInfoColumns;   //For response
     StringArray&                    m_machineInfoColumns;   //For response
+    MapStringTo<int>&               channelsMap;            //For response
 
 
     CMachineInfoThreadParam(Cws_machineEx* pService, IEspContext& context, CGetMachineInfoUserOptions&  options,
     CMachineInfoThreadParam(Cws_machineEx* pService, IEspContext& context, CGetMachineInfoUserOptions&  options,
-        CMachineData& machineData, IArrayOf<IEspMachineInfoEx>& machineInfoTable, StringArray& machineInfoColumns )
+        CMachineData& machineData, IArrayOf<IEspMachineInfoEx>& machineInfoTable, StringArray& machineInfoColumns,
+        MapStringTo<int>& _channelsMap)
        : CWsMachineThreadParam(NULL, NULL, NULL, pService),
        : CWsMachineThreadParam(NULL, NULL, NULL, pService),
          m_context(context),
          m_context(context),
          m_options(options),
          m_options(options),
          m_machineData(machineData),
          m_machineData(machineData),
          m_machineInfoTable(machineInfoTable),
          m_machineInfoTable(machineInfoTable),
-         m_machineInfoColumns(machineInfoColumns)
+         m_machineInfoColumns(machineInfoColumns),
+         channelsMap(_channelsMap)
     {
     {
     }
     }
 
 
@@ -66,6 +69,7 @@ public:
         if (m_machineInfoColumns.find(columnName) == NotFound)
         if (m_machineInfoColumns.find(columnName) == NotFound)
             m_machineInfoColumns.append(columnName);
             m_machineInfoColumns.append(columnName);
     }
     }
+    int* getChannels(const char* key) { return channelsMap.getValue(key); };
 private:
 private:
     static Mutex s_mutex;
     static Mutex s_mutex;
 };
 };
@@ -77,9 +81,12 @@ class CRoxieStateInfoThreadParam : public CWsMachineThreadParam
 public:
 public:
     StringAttr                      clusterName;
     StringAttr                      clusterName;
     IArrayOf<IEspMachineInfoEx>&    machineInfoTable;     //For response
     IArrayOf<IEspMachineInfoEx>&    machineInfoTable;     //For response
+    MapStringTo<int>&               channelsMap;          //For response
 
 
-    CRoxieStateInfoThreadParam(Cws_machineEx* pService, const char* _clusterName, IArrayOf<IEspMachineInfoEx>& _machineInfoTable)
-       : CWsMachineThreadParam(pService), clusterName(_clusterName), machineInfoTable(_machineInfoTable)
+    CRoxieStateInfoThreadParam(Cws_machineEx* pService, const char* _clusterName,
+        IArrayOf<IEspMachineInfoEx>& _machineInfoTable, MapStringTo<int>& _channelsMap)
+        : CWsMachineThreadParam(pService), clusterName(_clusterName), machineInfoTable(_machineInfoTable),
+        channelsMap(_channelsMap)
     {
     {
     }
     }
 
 
@@ -87,6 +94,7 @@ public:
     {
     {
         m_pService->getRoxieStateInfo(this);
         m_pService->getRoxieStateInfo(this);
     }
     }
+    int* getChannels(const char* key) { return channelsMap.getValue(key); };
 };
 };
 
 
 class CGetMachineUsageThreadParam : public CWsMachineThreadParam
 class CGetMachineUsageThreadParam : public CWsMachineThreadParam
@@ -268,6 +276,47 @@ bool Cws_machineEx::onGetTargetClusterInfo(IEspContext &context, IEspGetTargetCl
     return true;
     return true;
 }
 }
 
 
+void Cws_machineEx::addChannels(CGetMachineInfoData& machineInfoData, IPropertyTree* envRoot,
+    const char* componentType, const char* componentName)
+{
+    VStringBuffer key("%s|%s", componentType, componentName);
+    int* channels = machineInfoData.getChannels(key);
+    if (channels)
+        return;
+
+    StringBuffer path("Software/");
+    if (strieq(componentType, eqThorSlaveProcess))
+        path.append(eqThorCluster);
+    else if (strieq(componentType, eqRoxieServerProcess))
+        path.append(eqRoxieCluster);
+    else
+        throw MakeStringException(ECLWATCH_INVALID_COMPONENT_TYPE, "Invalid %s in Cws_machineEx::addChannels().", componentType);
+
+    path.appendf("[@name=\"%s\"]", componentName);
+
+    Owned<IPropertyTree> component;
+    if (envRoot)
+    {
+        component.setown(envRoot->getPropTree(path));
+    }
+    else
+    {
+        Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
+        Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
+        Owned<IPropertyTree> root = &constEnv->getPTree();
+        component.setown(root->getPropTree(path));
+    }
+    if (!component)
+        throw MakeStringException(ECLWATCH_INVALID_IP_OR_COMPONENT, "%s not found.", componentName);
+
+    StringAttr attr;
+    if (strieq(componentType, eqThorSlaveProcess))
+        attr.set("@channelsPerSlave");
+    else
+        attr.set("@channelsPerNode");
+    if (component->hasProp(attr.get()))
+        machineInfoData.addToChannelsMap(key, component->getPropInt(attr.get()));
+}
 ////////////////////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////////////////
 // Read Machine Infomation request and collect related settings from environment.xml  //
 // Read Machine Infomation request and collect related settings from environment.xml  //
 ////////////////////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////////////////
@@ -275,6 +324,7 @@ bool Cws_machineEx::onGetTargetClusterInfo(IEspContext &context, IEspGetTargetCl
 void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcessorInfo, bool getStorageInfo, bool localFileSystemsOnly, bool getSoftwareInfo, bool applyProcessFilter,
 void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcessorInfo, bool getStorageInfo, bool localFileSystemsOnly, bool getSoftwareInfo, bool applyProcessFilter,
                                            StringArray& processes, const char* addProcessesToFilters, CGetMachineInfoData& machineInfoData)
                                            StringArray& processes, const char* addProcessesToFilters, CGetMachineInfoData& machineInfoData)
 {
 {
+    double version = context.getClientVersion();
     StringBuffer userID, password;
     StringBuffer userID, password;
     context.getUserID(userID);
     context.getUserID(userID);
     context.getPassword(password);
     context.getPassword(password);
@@ -312,6 +362,8 @@ void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcess
         setProcessRequest(machineInfoData, uniqueProcesses, address1.str(), address2.str(), processType.str(), compName.str(), path.str(), processNumber);
         setProcessRequest(machineInfoData, uniqueProcesses, address1.str(), address2.str(), processType.str(), compName.str(), path.str(), processNumber);
         if (strieq(processType.str(), eqRoxieServerProcess))
         if (strieq(processType.str(), eqRoxieServerProcess))
             machineInfoData.appendRoxieClusters(compName.str());
             machineInfoData.appendRoxieClusters(compName.str());
+        if ((version >= 1.16) && (strieq(processType, eqThorSlaveProcess) || strieq(processType, eqRoxieServerProcess)))
+            addChannels(machineInfoData, nullptr, processType, compName);
     }
     }
 }
 }
 
 
@@ -606,28 +658,28 @@ void Cws_machineEx::readSettingsForTargetClusters(IEspContext& context, StringAr
         //Read Cluster processes
         //Read Cluster processes
         if (clusterProcesses && clusterProcesses->first())
         if (clusterProcesses && clusterProcesses->first())
             ForEach(*clusterProcesses)
             ForEach(*clusterProcesses)
-                readTargetClusterProcesses(clusterProcesses->query(), clusterType.str(), uniqueProcesses, machineInfoData, targetClusterOut);
+                readTargetClusterProcesses(context, clusterProcesses->query(), clusterType.str(), uniqueProcesses, machineInfoData, targetClusterOut);
 
 
         //Read eclCCServer process
         //Read eclCCServer process
         if (eclCCServerProcesses->first())
         if (eclCCServerProcesses->first())
-            readTargetClusterProcesses(eclCCServerProcesses->query(), eqEclCCServer, uniqueProcesses, machineInfoData, targetClusterOut);
+            readTargetClusterProcesses(context, eclCCServerProcesses->query(), eqEclCCServer, uniqueProcesses, machineInfoData, targetClusterOut);
 
 
         //Read eclServer process
         //Read eclServer process
         if (eclServerProcesses->first())
         if (eclServerProcesses->first())
-            readTargetClusterProcesses(eclServerProcesses->query(), eqEclServer, uniqueProcesses, machineInfoData, targetClusterOut);
+            readTargetClusterProcesses(context, eclServerProcesses->query(), eqEclServer, uniqueProcesses, machineInfoData, targetClusterOut);
 
 
         //Read eclAgent process
         //Read eclAgent process
         if (eclAgentProcesses->first())
         if (eclAgentProcesses->first())
-            readTargetClusterProcesses(eclAgentProcesses->query(), eqEclAgent, uniqueProcesses, machineInfoData, targetClusterOut);
+            readTargetClusterProcesses(context, eclAgentProcesses->query(), eqEclAgent, uniqueProcesses, machineInfoData, targetClusterOut);
 
 
         //Read eclScheduler process
         //Read eclScheduler process
         if (eclSchedulerProcesses->first())
         if (eclSchedulerProcesses->first())
-            readTargetClusterProcesses(eclSchedulerProcesses->query(), eqEclScheduler, uniqueProcesses, machineInfoData, targetClusterOut);
+            readTargetClusterProcesses(context, eclSchedulerProcesses->query(), eqEclScheduler, uniqueProcesses, machineInfoData, targetClusterOut);
     }
     }
 }
 }
 
 
 //Collect settings for one group of target cluster processes
 //Collect settings for one group of target cluster processes
-void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const char* nodeType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData,
+void Cws_machineEx::readTargetClusterProcesses(IEspContext& context, IPropertyTree &processNode, const char* nodeType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData,
                                               IPropertyTree* targetClustersOut)
                                               IPropertyTree* targetClustersOut)
 {
 {
     const char* process = processNode.queryProp("@process");
     const char* process = processNode.queryProp("@process");
@@ -643,6 +695,7 @@ void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const
     if (!pEnvironmentSoftware)
     if (!pEnvironmentSoftware)
         throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
         throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
 
 
+    double version = context.getClientVersion();
     IPropertyTree* pClusterProcess = NULL;
     IPropertyTree* pClusterProcess = NULL;
     if (strieq(nodeType, eqThorCluster) || strieq(nodeType, eqRoxieCluster))
     if (strieq(nodeType, eqThorCluster) || strieq(nodeType, eqRoxieCluster))
     {
     {
@@ -679,11 +732,16 @@ void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const
         getProcesses(constEnv, pClusterProcess, process, eqThorMasterProcess, dirStr.str(), machineInfoData, true, uniqueProcesses);
         getProcesses(constEnv, pClusterProcess, process, eqThorMasterProcess, dirStr.str(), machineInfoData, true, uniqueProcesses);
         getThorProcesses(constEnv, pClusterProcess, process, eqThorSlaveProcess, dirStr.str(), machineInfoData, uniqueProcesses);
         getThorProcesses(constEnv, pClusterProcess, process, eqThorSlaveProcess, dirStr.str(), machineInfoData, uniqueProcesses);
         getThorProcesses(constEnv, pClusterProcess, process, eqThorSpareProcess, dirStr.str(), machineInfoData, uniqueProcesses);
         getThorProcesses(constEnv, pClusterProcess, process, eqThorSpareProcess, dirStr.str(), machineInfoData, uniqueProcesses);
+        if (version >= 1.16)
+           addChannels(machineInfoData, pEnvironmentRoot, eqThorSlaveProcess, process);
+
     }
     }
     else if (strieq(nodeType, eqRoxieCluster))
     else if (strieq(nodeType, eqRoxieCluster))
     {
     {
         BoolHash uniqueRoxieProcesses;
         BoolHash uniqueRoxieProcesses;
         getProcesses(constEnv, pClusterProcess, process, eqRoxieServerProcess, dirStr.str(), machineInfoData, true, uniqueProcesses, &uniqueRoxieProcesses);
         getProcesses(constEnv, pClusterProcess, process, eqRoxieServerProcess, dirStr.str(), machineInfoData, true, uniqueProcesses, &uniqueRoxieProcesses);
+        if (version >= 1.16)
+           addChannels(machineInfoData, pEnvironmentRoot, eqRoxieServerProcess, process);
     }
     }
 }
 }
 
 
@@ -693,20 +751,35 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
     if (!constEnv || !cluster)
     if (!constEnv || !cluster)
         return;
         return;
 
 
+    Owned<IGroup> nodeGroup;
     StringBuffer groupName;
     StringBuffer groupName;
     if (strieq(processType, eqThorSlaveProcess))
     if (strieq(processType, eqThorSlaveProcess))
+    {
         getClusterGroupName(*cluster, groupName);
         getClusterGroupName(*cluster, groupName);
-    else if (strieq(processType, eqThorSpareProcess))
+        if (groupName.length() < 1)
+        {
+            OWARNLOG("Cannot find group name for %s", processName);
+            return;
+        }
+        nodeGroup.setown(getClusterProcessNodeGroup(processName, eqThorCluster));
+    }
+    else
+    {
         getClusterSpareGroupName(*cluster, groupName);
         getClusterSpareGroupName(*cluster, groupName);
-
-    if (groupName.length() < 1)
-        return;
-
-    Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName.str());
+        if (groupName.length() < 1)
+        {
+            OWARNLOG("Cannot find group name for %s", processName);
+            return;
+        }
+        nodeGroup.setown(queryNamedGroupStore().lookup(groupName.str()));
+    }
     if (!nodeGroup || (nodeGroup->ordinality() == 0))
     if (!nodeGroup || (nodeGroup->ordinality() == 0))
+    {
+        OWARNLOG("Cannot find node group for %s", processName);
         return;
         return;
+    }
 
 
-    unsigned processNumber = 0;
+    int slavesPerNode = cluster->getPropInt("@slavesPerNode");
     Owned<INodeIterator> gi = nodeGroup->getIterator();
     Owned<INodeIterator> gi = nodeGroup->getIterator();
     ForEach(*gi)
     ForEach(*gi)
     {
     {
@@ -718,8 +791,6 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
             continue;
             continue;
         }
         }
 
 
-        processNumber++;
-
         StringBuffer netAddress;
         StringBuffer netAddress;
         const char* ip = addressRead.str();
         const char* ip = addressRead.str();
         if (!streq(ip, "."))
         if (!streq(ip, "."))
@@ -745,7 +816,10 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
             continue;
             continue;
         }
         }
 
 
-        setProcessRequest(machineInfoData, uniqueProcesses, netAddress.str(), addressRead.str(), processType, processName, directory, processNumber);
+        //Each thor slave is a process. The i is used to check whether the process is running or not.
+        for (unsigned i = 1; i <= slavesPerNode; i++)
+            setProcessRequest(machineInfoData, uniqueProcesses, netAddress.str(), addressRead.str(),
+                processType, processName, directory, i);
     }
     }
 
 
     return;
     return;
@@ -1003,7 +1077,8 @@ void Cws_machineEx::getMachineInfo(IEspContext& context, bool getRoxieState, CGe
         ForEachItemIn(idx, machines)
         ForEachItemIn(idx, machines)
         {
         {
             Owned<CMachineInfoThreadParam> pThreadReq = new CMachineInfoThreadParam(this, context, machineInfoData.getOptions(),
             Owned<CMachineInfoThreadParam> pThreadReq = new CMachineInfoThreadParam(this, context, machineInfoData.getOptions(),
-                machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns());
+                machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns(),
+                machineInfoData.getChannelsMap());
             PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
             PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
             threadHandles.append(handle);
             threadHandles.append(handle);
         }
         }
@@ -1014,7 +1089,7 @@ void Cws_machineEx::getMachineInfo(IEspContext& context, bool getRoxieState, CGe
         ForEachItemIn(i, roxieClusters)
         ForEachItemIn(i, roxieClusters)
         {
         {
             Owned<CRoxieStateInfoThreadParam> pThreadReq = new CRoxieStateInfoThreadParam(this, roxieClusters.item(i),
             Owned<CRoxieStateInfoThreadParam> pThreadReq = new CRoxieStateInfoThreadParam(this, roxieClusters.item(i),
-                machineInfoData.getMachineInfoTable());
+                machineInfoData.getMachineInfoTable(), machineInfoData.getChannelsMap());
             PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
             PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
             threadHandles.append(handle);
             threadHandles.append(handle);
         }
         }
@@ -1642,6 +1717,14 @@ void Cws_machineEx::setProcessInfo(IEspContext& context, CMachineInfoThreadParam
         pMachineInfo->setProcessNumber(process.getProcessNumber());
         pMachineInfo->setProcessNumber(process.getProcessNumber());
     }
     }
 
 
+    if ((version >= 1.16) && (strieq(process.getType(), eqThorSlaveProcess) || strieq(process.getType(), eqRoxieServerProcess)))
+    {
+        VStringBuffer key("%s|%s", process.getType(), process.getName());
+        int* channels = pParam->getChannels(key);
+        if (channels)
+            pMachineInfo->setChannels(*channels);
+    }
+
     if (error != 0 || !response || !*response)
     if (error != 0 || !response || !*response)
     {
     {
         StringBuffer description;
         StringBuffer description;

+ 7 - 1
esp/services/ws_machine/ws_machineService.hpp

@@ -683,6 +683,7 @@ class CGetMachineInfoData
 
 
     StringArray                     roxieClusters;
     StringArray                     roxieClusters;
     BoolHash                        uniqueRoxieClusters;
     BoolHash                        uniqueRoxieClusters;
+    MapStringTo<int>                channelsMap;
 
 
 public:
 public:
     CGetMachineInfoUserOptions& getOptions()
     CGetMachineInfoUserOptions& getOptions()
@@ -719,6 +720,10 @@ public:
         roxieClusters.append(clusterName);
         roxieClusters.append(clusterName);
         uniqueRoxieClusters.setValue(clusterName, true);
         uniqueRoxieClusters.setValue(clusterName, true);
     }
     }
+
+    MapStringTo<int>& getChannelsMap() { return channelsMap; };
+    void addToChannelsMap(const char* key, int channels) { channelsMap.setValue(key, channels); };
+    int* getChannels(const char* key) { return channelsMap.getValue(key); };
 };
 };
 
 
 const unsigned MACHINE_USAGE_MAX_CACHE_SIZE = 8;
 const unsigned MACHINE_USAGE_MAX_CACHE_SIZE = 8;
@@ -903,7 +908,7 @@ private:
     void getThorProcesses(IConstEnvironment* constEnv,  IPropertyTree* cluster, const char* processName, const char* processType, const char* directory, CGetMachineInfoData& machineInfoData, BoolHash& uniqueProcesses);
     void getThorProcesses(IConstEnvironment* constEnv,  IPropertyTree* cluster, const char* processName, const char* processType, const char* directory, CGetMachineInfoData& machineInfoData, BoolHash& uniqueProcesses);
     const char* getProcessTypeFromMachineType(const char* machineType);
     const char* getProcessTypeFromMachineType(const char* machineType);
     void readSettingsForTargetClusters(IEspContext& context, StringArray& targetClusters, CGetMachineInfoData& machineInfoData, IPropertyTree* targetClustersOut);
     void readSettingsForTargetClusters(IEspContext& context, StringArray& targetClusters, CGetMachineInfoData& machineInfoData, IPropertyTree* targetClustersOut);
-    void readTargetClusterProcesses(IPropertyTree& targetClusters, const char* processType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData, IPropertyTree* targetClustersOut);
+    void readTargetClusterProcesses(IEspContext& context, IPropertyTree& targetClusters, const char* processType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData, IPropertyTree* targetClustersOut);
     void setTargetClusterInfo(IPropertyTree* pTargetClusterTree, IArrayOf<IEspMachineInfoEx>& machineArray, IArrayOf<IEspTargetClusterInfo>& targetClusterInfoList);
     void setTargetClusterInfo(IPropertyTree* pTargetClusterTree, IArrayOf<IEspMachineInfoEx>& machineArray, IArrayOf<IEspTargetClusterInfo>& targetClusterInfoList);
 
 
     void buildPreflightCommand(IEspContext& context, CMachineInfoThreadParam* pParam, StringBuffer& preflightCommand);
     void buildPreflightCommand(IEspContext& context, CMachineInfoThreadParam* pParam, StringBuffer& preflightCommand);
@@ -983,6 +988,7 @@ private:
     bool readComponentUsageCache(IEspContext& context, const char* id, IEspGetComponentUsageResponse& resp);
     bool readComponentUsageCache(IEspContext& context, const char* id, IEspGetComponentUsageResponse& resp);
     bool readTargetClusterUsageCache(IEspContext& context, const char* id, IEspGetTargetClusterUsageResponse& resp);
     bool readTargetClusterUsageCache(IEspContext& context, const char* id, IEspGetTargetClusterUsageResponse& resp);
     bool readNodeGroupUsageCache(IEspContext& context, const char* id, IEspGetNodeGroupUsageResponse& resp);
     bool readNodeGroupUsageCache(IEspContext& context, const char* id, IEspGetNodeGroupUsageResponse& resp);
+    void addChannels(CGetMachineInfoData& machineInfoData, IPropertyTree* envRoot, const char* componentType, const char* componentName);
 
 
     //Still used in StartStop/Rexec, so keep them for now.
     //Still used in StartStop/Rexec, so keep them for now.
     enum OpSysType { OS_Windows, OS_Solaris, OS_Linux };
     enum OpSysType { OS_Windows, OS_Solaris, OS_Linux };

+ 61 - 50
esp/smc/SMCLib/TpWrapper.cpp

@@ -83,7 +83,7 @@ void CTpWrapper::getClusterMachineList(double clientVersion,
         if (strcmp(eqTHORMACHINES,ClusterType) == 0)
         if (strcmp(eqTHORMACHINES,ClusterType) == 0)
         {
         {
             bool multiSlaves = false;
             bool multiSlaves = false;
-            getMachineList(eqThorMasterProcess,path.str(),"", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqThorMasterProcess, path.str(), "", ClusterDirectory, MachineList);
             getThorSlaveMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
             getThorSlaveMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
             unsigned count = MachineList.length();
             unsigned count = MachineList.length();
             getThorSpareMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
             getThorSpareMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
@@ -95,24 +95,24 @@ void CTpWrapper::getClusterMachineList(double clientVersion,
         }
         }
         else if (strcmp(eqHOLEMACHINES,ClusterType) == 0)
         else if (strcmp(eqHOLEMACHINES,ClusterType) == 0)
         {
         {
-            getMachineList(eqHoleSocketProcess,path.str(),"", ClusterDirectory, MachineList);
-            getMachineList(eqHoleProcessorProcess,path.str(),"", ClusterDirectory, MachineList);
-            getMachineList(eqHoleControlProcess,path.str(),"", ClusterDirectory, MachineList);
-            getMachineList(eqHoleCollatorProcess,path.str(),"", ClusterDirectory, MachineList);
-            getMachineList(eqHoleStandbyProcess,path.str(),"", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleSocketProcess, path.str(), "", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleProcessorProcess, path.str(), "", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleControlProcess, path.str(), "", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleCollatorProcess, path.str(), "", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleStandbyProcess, path.str(), "", ClusterDirectory, MachineList);
         }
         }
         else if (strcmp(eqROXIEMACHINES,ClusterType) == 0)
         else if (strcmp(eqROXIEMACHINES,ClusterType) == 0)
         {
         {
-            getMachineList("RoxieServerProcess",path.str(),"", ClusterDirectory, MachineList, &machineNames);
+            getMachineList(clientVersion, "RoxieServerProcess", path.str(), "", ClusterDirectory, MachineList, &machineNames);
         }
         }
         else if (strcmp(eqMACHINES,ClusterType) == 0)
         else if (strcmp(eqMACHINES,ClusterType) == 0)
         {
         {
             //load a list of available machines.......
             //load a list of available machines.......
-            getMachineList("Computer","/Environment/Hardware","", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, "Computer", "/Environment/Hardware", "", ClusterDirectory, MachineList);
         }
         }
         else if (strcmp("AVAILABLEMACHINES",ClusterType) == 0)
         else if (strcmp("AVAILABLEMACHINES",ClusterType) == 0)
         {
         {
-            getMachineList("Computer","/Environment/Hardware",eqMachineAvailablability, ClusterDirectory, MachineList);
+            getMachineList(clientVersion, "Computer", "/Environment/Hardware", eqMachineAvailablability, ClusterDirectory, MachineList);
         }
         }
         else if (strcmp("DROPZONE",ClusterType) == 0)
         else if (strcmp("DROPZONE",ClusterType) == 0)
         {
         {
@@ -121,7 +121,7 @@ void CTpWrapper::getClusterMachineList(double clientVersion,
         else if (strcmp("STANDBYNNODE",ClusterType) == 0)
         else if (strcmp("STANDBYNNODE",ClusterType) == 0)
         {
         {
             getThorSpareMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
             getThorSpareMachineList(clientVersion, ClusterName, ClusterDirectory, MachineList);
-            getMachineList(eqHoleStandbyProcess,path.str(),"", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleStandbyProcess, path.str(), "", ClusterDirectory, MachineList);
         }
         }
         else if (strcmp("THORSPARENODES",ClusterType) == 0)
         else if (strcmp("THORSPARENODES",ClusterType) == 0)
         {
         {
@@ -129,7 +129,7 @@ void CTpWrapper::getClusterMachineList(double clientVersion,
         }
         }
         else if (strcmp("HOLESTANDBYNODES",ClusterType) == 0)
         else if (strcmp("HOLESTANDBYNODES",ClusterType) == 0)
         {
         {
-            getMachineList(eqHoleStandbyProcess,path.str(),"", ClusterDirectory, MachineList);
+            getMachineList(clientVersion, eqHoleStandbyProcess, path.str(), "", ClusterDirectory, MachineList);
         }
         }
     }
     }
     catch(IException* e){   
     catch(IException* e){   
@@ -1439,8 +1439,8 @@ bool CTpWrapper::checkMultiSlavesFlag(const char* clusterName)
     return cluster->getPropBool("@multiSlaves");
     return cluster->getPropBool("@multiSlaves");
 }
 }
 
 
-void CTpWrapper::appendMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
-    const char* machineType, unsigned& processNumber, const char* directory, IArrayOf<IEspTpMachine>& machineList)
+void CTpWrapper::appendThorMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
+    const char* machineType, unsigned& processNumber, unsigned channels, const char* directory, IArrayOf<IEspTpMachine>& machineList)
 {
 {
     StringBuffer netAddress;
     StringBuffer netAddress;
     node.endpoint().getIpText(netAddress);
     node.endpoint().getIpText(netAddress);
@@ -1473,6 +1473,9 @@ void CTpWrapper::appendMachineList(double clientVersion, IConstEnvironment* cons
         machineInfo->setOS(MachineOsUnknown);
         machineInfo->setOS(MachineOsUnknown);
     }
     }
 
 
+    if (clientVersion >= 1.30)
+        machineInfo->setChannels(channels);
+
     machineList.append(*machineInfo.getLink());
     machineList.append(*machineInfo.getLink());
 }
 }
 
 
@@ -1480,16 +1483,7 @@ void CTpWrapper::getThorSlaveMachineList(double clientVersion, const char* clust
 {
 {
     try
     try
     {
     {
-        Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
-        Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
-        Owned<IGroup> nodeGroup = getClusterProcessNodeGroup(clusterName, "ThorCluster");
-        if (!nodeGroup || (nodeGroup->ordinality() == 0))
-            return;
-
-        unsigned processNumber = 0;
-        Owned<INodeIterator> gi = nodeGroup->getIterator();
-        ForEach(*gi)
-            appendMachineList(clientVersion, constEnv, gi->query(), clusterName, eqThorSlaveProcess, processNumber, directory, machineList);
+        getThorMachineList(clientVersion, clusterName, directory, true, machineList);
     }
     }
     catch(IException* e)
     catch(IException* e)
     {
     {
@@ -1511,27 +1505,7 @@ void CTpWrapper::getThorSpareMachineList(double clientVersion, const char* clust
     try
     try
     {
     {
         Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
         Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
-        Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
-        Owned<IPropertyTree> root = &constEnv->getPTree();
-
-        VStringBuffer path("Software/ThorCluster[@name=\"%s\"]", clusterName);
-        Owned<IPropertyTree> cluster= root->getPropTree(path.str());
-        if (!cluster)
-            throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
-
-        StringBuffer groupName;
-        getClusterSpareGroupName(*cluster, groupName);
-        if (groupName.length() < 1)
-            return;
-
-        Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName.str());
-        if (!nodeGroup || (nodeGroup->ordinality() == 0))
-            return;
-
-        unsigned processNumber = 0;
-        Owned<INodeIterator> gi = nodeGroup->getIterator();
-        ForEach(*gi)
-            appendMachineList(clientVersion, constEnv, gi->query(), clusterName, eqThorSpareProcess, processNumber, directory, machineList);
+        getThorMachineList(clientVersion, clusterName, directory, false, machineList);
     }
     }
     catch(IException* e)
     catch(IException* e)
     {
     {
@@ -1548,12 +1522,44 @@ void CTpWrapper::getThorSpareMachineList(double clientVersion, const char* clust
     return;
     return;
 }
 }
 
 
-void CTpWrapper::getMachineList(const char* MachineType,
-                                const char* ParentPath,
-                                const char* Status,
-                                          const char* Directory,
-                                IArrayOf<IEspTpMachine> &MachineList,
-                                set<string>* pMachineNames/*=NULL*/)
+void CTpWrapper::getThorMachineList(double clientVersion, const char* clusterName, const char* directory,
+    bool slaveNode, IArrayOf<IEspTpMachine>& machineList)
+{
+    Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
+    Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
+    Owned<IPropertyTree> root = &constEnv->getPTree();
+
+    VStringBuffer path("Software/%s[@name=\"%s\"]", eqThorCluster, clusterName);
+    Owned<IPropertyTree> cluster= root->getPropTree(path.str());
+    if (!cluster)
+        throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
+
+    Owned<IGroup> nodeGroup;
+    if (slaveNode)
+    {
+        nodeGroup.setown(getClusterProcessNodeGroup(clusterName, eqThorCluster));
+    }
+    else
+    {
+        StringBuffer groupName;
+        getClusterSpareGroupName(*cluster, groupName);
+        if (groupName.length() < 1)
+            return;
+        nodeGroup.setown(queryNamedGroupStore().lookup(groupName.str()));
+    }
+    if (!nodeGroup || (nodeGroup->ordinality() == 0))
+        return;
+
+    unsigned processNumber = 0;
+    unsigned channels = cluster->getPropInt("@channelsPerSlave", 1);
+    Owned<INodeIterator> gi = nodeGroup->getIterator();
+    ForEach(*gi)
+        appendThorMachineList(clientVersion, constEnv, gi->query(), clusterName,
+            slaveNode? eqThorSlaveProcess : eqThorSpareProcess, processNumber, channels, directory, machineList);
+}
+
+void CTpWrapper::getMachineList(double clientVersion, const char* MachineType, const char* ParentPath,
+    const char* Status, const char* Directory, IArrayOf<IEspTpMachine>& MachineList, set<string>* pMachineNames/*=NULL*/)
 {
 {
     try
     try
     {
     {
@@ -1570,6 +1576,9 @@ void CTpWrapper::getMachineList(const char* MachineType,
         if (!root)
         if (!root)
             throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
             throw MakeStringExceptionDirect(ECLWATCH_CANNOT_GET_ENV_INFO, MSG_FAILED_GET_ENVIRONMENT_INFO);
 
 
+        bool hasPropChannelsPerNode = root->hasProp("@channelsPerNode");
+        int channels = root->getPropInt("@channelsPerNode");
+
         Owned<IPropertyTreeIterator> machines= root->getElements(MachineType);
         Owned<IPropertyTreeIterator> machines= root->getElements(MachineType);
         const char* nodenametag = getNodeNameTag(MachineType);
         const char* nodenametag = getNodeNameTag(MachineType);
         if (machines->first()) {
         if (machines->first()) {
@@ -1595,6 +1604,8 @@ void CTpWrapper::getMachineList(const char* MachineType,
 
 
                     if (Directory && *Directory)
                     if (Directory && *Directory)
                         machineInfo.setDirectory(Directory);
                         machineInfo.setDirectory(Directory);
+                    if (hasPropChannelsPerNode && (clientVersion >= 1.30))
+                        machineInfo.setChannels(channels);
 
 
                     MachineList.append(machineInfo);
                     MachineList.append(machineInfo);
                 }
                 }

+ 6 - 8
esp/smc/SMCLib/TpWrapper.hpp

@@ -139,6 +139,10 @@ private:
     void appendTpDropZone(double clientVersion, IConstEnvironment* constEnv, IConstDropZoneInfo& dropZoneInfo, IArrayOf<IConstTpDropZone>& list);
     void appendTpDropZone(double clientVersion, IConstEnvironment* constEnv, IConstDropZoneInfo& dropZoneInfo, IArrayOf<IConstTpDropZone>& list);
     void appendTpSparkThor(double clientVersion, IConstEnvironment* constEnv, IConstSparkThorInfo& sparkThorInfo, IArrayOf<IConstTpSparkThor>& list);
     void appendTpSparkThor(double clientVersion, IConstEnvironment* constEnv, IConstSparkThorInfo& sparkThorInfo, IArrayOf<IConstTpSparkThor>& list);
     void appendTpMachine(double clientVersion, IConstEnvironment* constEnv, IConstInstanceInfo& instanceInfo, IArrayOf<IConstTpMachine>& machines);
     void appendTpMachine(double clientVersion, IConstEnvironment* constEnv, IConstInstanceInfo& instanceInfo, IArrayOf<IConstTpMachine>& machines);
+    void getThorMachineList(double clientVersion, const char* clusterName, const char* directory,
+        bool slaveNode, IArrayOf<IEspTpMachine>& machineList);
+    void appendThorMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
+         const char* machineType, unsigned& processNumber, unsigned channels, const char* directory, IArrayOf<IEspTpMachine>& machineList);
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
@@ -152,16 +156,10 @@ public:
     void getCluster(const char* ClusterType,IPropertyTree& returnRoot);
     void getCluster(const char* ClusterType,IPropertyTree& returnRoot);
     void getClusterMachineList(double clientVersion, const char* ClusterType,const char* ClusterPath, const char* ClusterDirectory, 
     void getClusterMachineList(double clientVersion, const char* ClusterType,const char* ClusterPath, const char* ClusterDirectory, 
                                         IArrayOf<IEspTpMachine> &MachineList, bool& hasThorSpareProcess, const char* ClusterName = NULL);
                                         IArrayOf<IEspTpMachine> &MachineList, bool& hasThorSpareProcess, const char* ClusterName = NULL);
-    void getMachineList( const char* MachineType,
-                        const char* MachinePath,
-                        const char* Status,
-                                const char* Directory,
-                        IArrayOf<IEspTpMachine> &MachineList, 
-                        set<string>* pMachineNames=NULL);
+    void getMachineList(double clientVersion, const char* MachineType, const char* MachinePath, const char* Status,
+        const char* Directory, IArrayOf<IEspTpMachine>& MachineList, set<string>* pMachineNames=nullptr);
     void getThorSpareMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList);
     void getThorSpareMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList);
     void getThorSlaveMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList);
     void getThorSlaveMachineList(double clientVersion, const char* clusterName, const char* directory, IArrayOf<IEspTpMachine>& machineList);
-    void appendMachineList(double clientVersion, IConstEnvironment* constEnv, INode& node, const char* clusterName,
-         const char* machineType, unsigned& processNumber, const char* directory, IArrayOf<IEspTpMachine>& machineList);
     bool checkMultiSlavesFlag(const char* clusterName);
     bool checkMultiSlavesFlag(const char* clusterName);
     void getDropZoneMachineList(double clientVersion, bool ECLWatchVisibleOnly, IArrayOf<IEspTpMachine> &MachineList);
     void getDropZoneMachineList(double clientVersion, bool ECLWatchVisibleOnly, IArrayOf<IEspTpMachine> &MachineList);
     void setMachineInfo(const char* name,const char* type,IEspTpMachine& machine);
     void setMachineInfo(const char* name,const char* type,IEspTpMachine& machine);