|
@@ -42,15 +42,18 @@ public:
|
|
|
CMachineData& m_machineData; //From request
|
|
|
IArrayOf<IEspMachineInfoEx>& m_machineInfoTable; //For response
|
|
|
StringArray& m_machineInfoColumns; //For response
|
|
|
+ MapStringTo<int>& channelsMap; //For response
|
|
|
|
|
|
CMachineInfoThreadParam(Cws_machineEx* pService, IEspContext& context, CGetMachineInfoUserOptions& options,
|
|
|
- CMachineData& machineData, IArrayOf<IEspMachineInfoEx>& machineInfoTable, StringArray& machineInfoColumns )
|
|
|
+ CMachineData& machineData, IArrayOf<IEspMachineInfoEx>& machineInfoTable, StringArray& machineInfoColumns,
|
|
|
+ MapStringTo<int>& _channelsMap)
|
|
|
: CWsMachineThreadParam(NULL, NULL, NULL, pService),
|
|
|
m_context(context),
|
|
|
m_options(options),
|
|
|
m_machineData(machineData),
|
|
|
m_machineInfoTable(machineInfoTable),
|
|
|
- m_machineInfoColumns(machineInfoColumns)
|
|
|
+ m_machineInfoColumns(machineInfoColumns),
|
|
|
+ channelsMap(_channelsMap)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -66,6 +69,7 @@ public:
|
|
|
if (m_machineInfoColumns.find(columnName) == NotFound)
|
|
|
m_machineInfoColumns.append(columnName);
|
|
|
}
|
|
|
+ int* getChannels(const char* key) { return channelsMap.getValue(key); };
|
|
|
private:
|
|
|
static Mutex s_mutex;
|
|
|
};
|
|
@@ -77,9 +81,12 @@ class CRoxieStateInfoThreadParam : public CWsMachineThreadParam
|
|
|
public:
|
|
|
StringAttr clusterName;
|
|
|
IArrayOf<IEspMachineInfoEx>& machineInfoTable; //For response
|
|
|
+ MapStringTo<int>& channelsMap; //For response
|
|
|
|
|
|
- CRoxieStateInfoThreadParam(Cws_machineEx* pService, const char* _clusterName, IArrayOf<IEspMachineInfoEx>& _machineInfoTable)
|
|
|
- : CWsMachineThreadParam(pService), clusterName(_clusterName), machineInfoTable(_machineInfoTable)
|
|
|
+ CRoxieStateInfoThreadParam(Cws_machineEx* pService, const char* _clusterName,
|
|
|
+ IArrayOf<IEspMachineInfoEx>& _machineInfoTable, MapStringTo<int>& _channelsMap)
|
|
|
+ : CWsMachineThreadParam(pService), clusterName(_clusterName), machineInfoTable(_machineInfoTable),
|
|
|
+ channelsMap(_channelsMap)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -87,6 +94,7 @@ public:
|
|
|
{
|
|
|
m_pService->getRoxieStateInfo(this);
|
|
|
}
|
|
|
+ int* getChannels(const char* key) { return channelsMap.getValue(key); };
|
|
|
};
|
|
|
|
|
|
class CGetMachineUsageThreadParam : public CWsMachineThreadParam
|
|
@@ -268,6 +276,47 @@ bool Cws_machineEx::onGetTargetClusterInfo(IEspContext &context, IEspGetTargetCl
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+void Cws_machineEx::addChannels(CGetMachineInfoData& machineInfoData, IPropertyTree* envRoot,
|
|
|
+ const char* componentType, const char* componentName)
|
|
|
+{
|
|
|
+ VStringBuffer key("%s|%s", componentType, componentName);
|
|
|
+ int* channels = machineInfoData.getChannels(key);
|
|
|
+ if (channels)
|
|
|
+ return;
|
|
|
+
|
|
|
+ StringBuffer path("Software/");
|
|
|
+ if (strieq(componentType, eqThorSlaveProcess))
|
|
|
+ path.append(eqThorCluster);
|
|
|
+ else if (strieq(componentType, eqRoxieServerProcess))
|
|
|
+ path.append(eqRoxieCluster);
|
|
|
+ else
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_COMPONENT_TYPE, "Invalid %s in Cws_machineEx::addChannels().", componentType);
|
|
|
+
|
|
|
+ path.appendf("[@name=\"%s\"]", componentName);
|
|
|
+
|
|
|
+ Owned<IPropertyTree> component;
|
|
|
+ if (envRoot)
|
|
|
+ {
|
|
|
+ component.setown(envRoot->getPropTree(path));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory(true);
|
|
|
+ Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
|
|
|
+ Owned<IPropertyTree> root = &constEnv->getPTree();
|
|
|
+ component.setown(root->getPropTree(path));
|
|
|
+ }
|
|
|
+ if (!component)
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_IP_OR_COMPONENT, "%s not found.", componentName);
|
|
|
+
|
|
|
+ StringAttr attr;
|
|
|
+ if (strieq(componentType, eqThorSlaveProcess))
|
|
|
+ attr.set("@channelsPerSlave");
|
|
|
+ else
|
|
|
+ attr.set("@channelsPerNode");
|
|
|
+ if (component->hasProp(attr.get()))
|
|
|
+ machineInfoData.addToChannelsMap(key, component->getPropInt(attr.get()));
|
|
|
+}
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
// Read Machine Infomation request and collect related settings from environment.xml //
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
@@ -275,6 +324,7 @@ bool Cws_machineEx::onGetTargetClusterInfo(IEspContext &context, IEspGetTargetCl
|
|
|
void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcessorInfo, bool getStorageInfo, bool localFileSystemsOnly, bool getSoftwareInfo, bool applyProcessFilter,
|
|
|
StringArray& processes, const char* addProcessesToFilters, CGetMachineInfoData& machineInfoData)
|
|
|
{
|
|
|
+ double version = context.getClientVersion();
|
|
|
StringBuffer userID, password;
|
|
|
context.getUserID(userID);
|
|
|
context.getPassword(password);
|
|
@@ -312,6 +362,8 @@ void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcess
|
|
|
setProcessRequest(machineInfoData, uniqueProcesses, address1.str(), address2.str(), processType.str(), compName.str(), path.str(), processNumber);
|
|
|
if (strieq(processType.str(), eqRoxieServerProcess))
|
|
|
machineInfoData.appendRoxieClusters(compName.str());
|
|
|
+ if ((version >= 1.16) && (strieq(processType, eqThorSlaveProcess) || strieq(processType, eqRoxieServerProcess)))
|
|
|
+ addChannels(machineInfoData, nullptr, processType, compName);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -606,28 +658,28 @@ void Cws_machineEx::readSettingsForTargetClusters(IEspContext& context, StringAr
|
|
|
//Read Cluster processes
|
|
|
if (clusterProcesses && clusterProcesses->first())
|
|
|
ForEach(*clusterProcesses)
|
|
|
- readTargetClusterProcesses(clusterProcesses->query(), clusterType.str(), uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
+ readTargetClusterProcesses(context, clusterProcesses->query(), clusterType.str(), uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
|
|
|
//Read eclCCServer process
|
|
|
if (eclCCServerProcesses->first())
|
|
|
- readTargetClusterProcesses(eclCCServerProcesses->query(), eqEclCCServer, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
+ readTargetClusterProcesses(context, eclCCServerProcesses->query(), eqEclCCServer, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
|
|
|
//Read eclServer process
|
|
|
if (eclServerProcesses->first())
|
|
|
- readTargetClusterProcesses(eclServerProcesses->query(), eqEclServer, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
+ readTargetClusterProcesses(context, eclServerProcesses->query(), eqEclServer, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
|
|
|
//Read eclAgent process
|
|
|
if (eclAgentProcesses->first())
|
|
|
- readTargetClusterProcesses(eclAgentProcesses->query(), eqEclAgent, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
+ readTargetClusterProcesses(context, eclAgentProcesses->query(), eqEclAgent, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
|
|
|
//Read eclScheduler process
|
|
|
if (eclSchedulerProcesses->first())
|
|
|
- readTargetClusterProcesses(eclSchedulerProcesses->query(), eqEclScheduler, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
+ readTargetClusterProcesses(context, eclSchedulerProcesses->query(), eqEclScheduler, uniqueProcesses, machineInfoData, targetClusterOut);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//Collect settings for one group of target cluster processes
|
|
|
-void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const char* nodeType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData,
|
|
|
+void Cws_machineEx::readTargetClusterProcesses(IEspContext& context, IPropertyTree &processNode, const char* nodeType, BoolHash& uniqueProcesses, CGetMachineInfoData& machineInfoData,
|
|
|
IPropertyTree* targetClustersOut)
|
|
|
{
|
|
|
const char* process = processNode.queryProp("@process");
|
|
@@ -643,6 +695,7 @@ void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const
|
|
|
if (!pEnvironmentSoftware)
|
|
|
throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
|
|
|
|
|
|
+ double version = context.getClientVersion();
|
|
|
IPropertyTree* pClusterProcess = NULL;
|
|
|
if (strieq(nodeType, eqThorCluster) || strieq(nodeType, eqRoxieCluster))
|
|
|
{
|
|
@@ -679,11 +732,16 @@ void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const
|
|
|
getProcesses(constEnv, pClusterProcess, process, eqThorMasterProcess, dirStr.str(), machineInfoData, true, uniqueProcesses);
|
|
|
getThorProcesses(constEnv, pClusterProcess, process, eqThorSlaveProcess, dirStr.str(), machineInfoData, uniqueProcesses);
|
|
|
getThorProcesses(constEnv, pClusterProcess, process, eqThorSpareProcess, dirStr.str(), machineInfoData, uniqueProcesses);
|
|
|
+ if (version >= 1.16)
|
|
|
+ addChannels(machineInfoData, pEnvironmentRoot, eqThorSlaveProcess, process);
|
|
|
+
|
|
|
}
|
|
|
else if (strieq(nodeType, eqRoxieCluster))
|
|
|
{
|
|
|
BoolHash uniqueRoxieProcesses;
|
|
|
getProcesses(constEnv, pClusterProcess, process, eqRoxieServerProcess, dirStr.str(), machineInfoData, true, uniqueProcesses, &uniqueRoxieProcesses);
|
|
|
+ if (version >= 1.16)
|
|
|
+ addChannels(machineInfoData, pEnvironmentRoot, eqRoxieServerProcess, process);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -693,20 +751,35 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
|
|
|
if (!constEnv || !cluster)
|
|
|
return;
|
|
|
|
|
|
+ Owned<IGroup> nodeGroup;
|
|
|
StringBuffer groupName;
|
|
|
if (strieq(processType, eqThorSlaveProcess))
|
|
|
+ {
|
|
|
getClusterGroupName(*cluster, groupName);
|
|
|
- else if (strieq(processType, eqThorSpareProcess))
|
|
|
+ if (groupName.length() < 1)
|
|
|
+ {
|
|
|
+ OWARNLOG("Cannot find group name for %s", processName);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ nodeGroup.setown(getClusterProcessNodeGroup(processName, eqThorCluster));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
getClusterSpareGroupName(*cluster, groupName);
|
|
|
-
|
|
|
- if (groupName.length() < 1)
|
|
|
- return;
|
|
|
-
|
|
|
- Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName.str());
|
|
|
+ if (groupName.length() < 1)
|
|
|
+ {
|
|
|
+ OWARNLOG("Cannot find group name for %s", processName);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ nodeGroup.setown(queryNamedGroupStore().lookup(groupName.str()));
|
|
|
+ }
|
|
|
if (!nodeGroup || (nodeGroup->ordinality() == 0))
|
|
|
+ {
|
|
|
+ OWARNLOG("Cannot find node group for %s", processName);
|
|
|
return;
|
|
|
+ }
|
|
|
|
|
|
- unsigned processNumber = 0;
|
|
|
+ int slavesPerNode = cluster->getPropInt("@slavesPerNode");
|
|
|
Owned<INodeIterator> gi = nodeGroup->getIterator();
|
|
|
ForEach(*gi)
|
|
|
{
|
|
@@ -718,8 +791,6 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- processNumber++;
|
|
|
-
|
|
|
StringBuffer netAddress;
|
|
|
const char* ip = addressRead.str();
|
|
|
if (!streq(ip, "."))
|
|
@@ -745,7 +816,10 @@ void Cws_machineEx::getThorProcesses(IConstEnvironment* constEnv, IPropertyTree*
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- setProcessRequest(machineInfoData, uniqueProcesses, netAddress.str(), addressRead.str(), processType, processName, directory, processNumber);
|
|
|
+ //Each thor slave is a process. The i is used to check whether the process is running or not.
|
|
|
+ for (unsigned i = 1; i <= slavesPerNode; i++)
|
|
|
+ setProcessRequest(machineInfoData, uniqueProcesses, netAddress.str(), addressRead.str(),
|
|
|
+ processType, processName, directory, i);
|
|
|
}
|
|
|
|
|
|
return;
|
|
@@ -1003,7 +1077,8 @@ void Cws_machineEx::getMachineInfo(IEspContext& context, bool getRoxieState, CGe
|
|
|
ForEachItemIn(idx, machines)
|
|
|
{
|
|
|
Owned<CMachineInfoThreadParam> pThreadReq = new CMachineInfoThreadParam(this, context, machineInfoData.getOptions(),
|
|
|
- machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns());
|
|
|
+ machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns(),
|
|
|
+ machineInfoData.getChannelsMap());
|
|
|
PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
|
|
|
threadHandles.append(handle);
|
|
|
}
|
|
@@ -1014,7 +1089,7 @@ void Cws_machineEx::getMachineInfo(IEspContext& context, bool getRoxieState, CGe
|
|
|
ForEachItemIn(i, roxieClusters)
|
|
|
{
|
|
|
Owned<CRoxieStateInfoThreadParam> pThreadReq = new CRoxieStateInfoThreadParam(this, roxieClusters.item(i),
|
|
|
- machineInfoData.getMachineInfoTable());
|
|
|
+ machineInfoData.getMachineInfoTable(), machineInfoData.getChannelsMap());
|
|
|
PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
|
|
|
threadHandles.append(handle);
|
|
|
}
|
|
@@ -1642,6 +1717,14 @@ void Cws_machineEx::setProcessInfo(IEspContext& context, CMachineInfoThreadParam
|
|
|
pMachineInfo->setProcessNumber(process.getProcessNumber());
|
|
|
}
|
|
|
|
|
|
+ if ((version >= 1.16) && (strieq(process.getType(), eqThorSlaveProcess) || strieq(process.getType(), eqRoxieServerProcess)))
|
|
|
+ {
|
|
|
+ VStringBuffer key("%s|%s", process.getType(), process.getName());
|
|
|
+ int* channels = pParam->getChannels(key);
|
|
|
+ if (channels)
|
|
|
+ pMachineInfo->setChannels(*channels);
|
|
|
+ }
|
|
|
+
|
|
|
if (error != 0 || !response || !*response)
|
|
|
{
|
|
|
StringBuffer description;
|