|
@@ -32,6 +32,15 @@
|
|
|
#include "exception_util.hpp"
|
|
|
|
|
|
#include "roxiecontrol.hpp"
|
|
|
+#include "workunit.hpp"
|
|
|
+
|
|
|
+#define STATUS_SERVER_THOR "ThorMaster"
|
|
|
+#define STATUS_SERVER_HTHOR "HThorServer"
|
|
|
+#define STATUS_SERVER_ROXIE "RoxieServer"
|
|
|
+#define STATUS_SERVER_DFUSERVER "DFUserver"
|
|
|
+#define STATUS_SERVER_ECLSERVER "ECLserver"
|
|
|
+#define STATUS_SERVER_ECLCCSERVER "ECLCCserver"
|
|
|
+#define STATUS_SERVER_ECLAGENT "ECLagent"
|
|
|
|
|
|
static const char* FEATURE_URL = "SmcAccess";
|
|
|
const char* THORQUEUE_FEATURE = "ThorQueueAccess";
|
|
@@ -732,7 +741,7 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
aws.append(*wu.getLink());
|
|
|
}
|
|
|
|
|
|
- addServerJobQueue(version, serverJobQueues, queueName, serverName, "ECLCCserver", queueState.str(), queueStateDetails.str());
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "ECLCCserver", NULL, 0, queueState.str(), queueStateDetails.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -809,7 +818,7 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
e->Release();
|
|
|
}
|
|
|
}
|
|
|
- addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver", NULL, 0);
|
|
|
}
|
|
|
}
|
|
|
} while (services->next());
|
|
@@ -1089,12 +1098,12 @@ void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* ser
|
|
|
IPropertyTree& serverNode = it->query();
|
|
|
const char* serverName = serverNode.queryProp("@name");
|
|
|
const char* instance = serverNode.queryProp("@node");
|
|
|
- if (!serverName || !*serverName || !instance || !*instance)
|
|
|
+ const char* queueName = serverNode.queryProp("@queue");
|
|
|
+ unsigned port = serverNode.getPropInt("@mpport", 0);
|
|
|
+ if (!serverName || !*serverName || !instance || !*instance || strieq(serverName, "DFUserver"))//DFUServer already handled separately
|
|
|
continue;
|
|
|
|
|
|
- bool hasWU = false;
|
|
|
- StringBuffer queueName;
|
|
|
- queueName.appendf("%s_on_%s", serverName, instance);
|
|
|
+ VStringBuffer instanceName("%s_on_%s:%d", serverName, instance, port);
|
|
|
Owned<IPropertyTreeIterator> wuids(serverNode.getElements("WorkUnit"));
|
|
|
ForEach(*wuids)
|
|
|
{
|
|
@@ -1106,14 +1115,13 @@ void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* ser
|
|
|
continue;
|
|
|
|
|
|
Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName.str(), instance, NULL);
|
|
|
+ createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName, instance, NULL);
|
|
|
aws.append(*wu.getLink());
|
|
|
- hasWU = true;
|
|
|
}
|
|
|
- if (hasWU && !uniqueServers.getValue(queueName))
|
|
|
+ if (!uniqueServers.getValue(instanceName))
|
|
|
{
|
|
|
- uniqueServers.setValue(queueName, true);
|
|
|
- addServerJobQueue(version, serverJobQueues, queueName.str(), serverName, serverName);
|
|
|
+ uniqueServers.setValue(instanceName, true);
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, instanceName, serverName, instance, port);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1177,7 +1185,7 @@ void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot,
|
|
|
{
|
|
|
const char *queueName = queues.item(q);
|
|
|
readDFUWUs(context, queueName, serverName, aws);
|
|
|
- addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver", NULL, 0);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1346,9 +1354,12 @@ void CWsSMCEx::readTargetClusterInfo(IEspContext& context, IConstWUClusterInfo&
|
|
|
targetCluster->statusServerName.set(statusServerName.str());
|
|
|
targetCluster->queueName.set(jobQueue->queueName.str());
|
|
|
|
|
|
- jobQueue->foundQueueInStatusServer = findQueueInStatusServer(context, serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
|
|
|
- if (!jobQueue->foundQueueInStatusServer)
|
|
|
- targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
|
|
|
+ if (serverStatusRoot)
|
|
|
+ {
|
|
|
+ jobQueue->foundQueueInStatusServer = findQueueInStatusServer(context, serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
|
|
|
+ if (!jobQueue->foundQueueInStatusServer)
|
|
|
+ targetCluster->clusterStatusDetails.appendf("Cluster %s not attached; ", clusterName.str());
|
|
|
+ }
|
|
|
|
|
|
return;
|
|
|
}
|
|
@@ -1486,22 +1497,24 @@ void CWsSMCEx::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree*
|
|
|
void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
{
|
|
|
ForEachItemIn(i, targetClusters)
|
|
|
+ readWUsAndStateFromJobQueue(context, targetClusters.item(i), uniqueWUIDs, aws);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
{
|
|
|
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
|
|
|
- if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
- {
|
|
|
- readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.clusterQueue, NULL, uniqueWUIDs, aws);
|
|
|
- targetCluster.queueStatus.set(targetCluster.clusterQueue.queueState);
|
|
|
- }
|
|
|
- if (targetCluster.agentQueue.queueName.length())
|
|
|
- {
|
|
|
- readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str(), uniqueWUIDs, aws);
|
|
|
- if (targetCluster.clusterType != ThorLCRCluster)
|
|
|
- targetCluster.queueStatus.set(targetCluster.agentQueue.queueState);
|
|
|
- }
|
|
|
- if (targetCluster.serverQueue.queueName.length())
|
|
|
- readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str(), uniqueWUIDs, aws);
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.clusterQueue, NULL, uniqueWUIDs, aws);
|
|
|
+ targetCluster.queueStatus.set(targetCluster.clusterQueue.queueState);
|
|
|
}
|
|
|
+ if (targetCluster.agentQueue.queueName.length())
|
|
|
+ {
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str(), uniqueWUIDs, aws);
|
|
|
+ if (targetCluster.clusterType != ThorLCRCluster)
|
|
|
+ targetCluster.queueStatus.set(targetCluster.agentQueue.queueState);
|
|
|
+ }
|
|
|
+ if (targetCluster.serverQueue.queueName.length())
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str(), uniqueWUIDs, aws);
|
|
|
}
|
|
|
|
|
|
CWsSMCTargetCluster* CWsSMCEx::findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
|
|
@@ -1593,7 +1606,8 @@ void CWsSMCEx::setESPTargetClusters(IEspContext& context, CIArrayOf<CWsSMCTarget
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType)
|
|
|
+void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
|
|
|
+ const char* serverType, const char* networkAddress, unsigned port)
|
|
|
{
|
|
|
if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
|
|
|
return;
|
|
@@ -1607,10 +1621,11 @@ void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& j
|
|
|
queueState.set("paused");
|
|
|
else
|
|
|
queueState.set("running");
|
|
|
- addServerJobQueue(version, jobQueues, queueName, serverName, serverType, queueState.str(), queueStateDetails.str());
|
|
|
+ addServerJobQueue(version, jobQueues, queueName, serverName, serverType, networkAddress, port, queueState.str(), queueStateDetails.str());
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType, const char* queueState, const char* queueStateDetails)
|
|
|
+void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
|
|
|
+ const char* serverType, const char* networkAddress, unsigned port, const char* queueState, const char* queueStateDetails)
|
|
|
{
|
|
|
if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
|
|
|
return;
|
|
@@ -1622,6 +1637,11 @@ void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& j
|
|
|
jobQueue->setQueueName(queueName);
|
|
|
jobQueue->setServerName(serverName);
|
|
|
jobQueue->setServerType(serverType);
|
|
|
+ if ((version >= 1.19) && networkAddress && *networkAddress)
|
|
|
+ {
|
|
|
+ jobQueue->setNetworkAddress(networkAddress);
|
|
|
+ jobQueue->setPort(port);
|
|
|
+ }
|
|
|
setServerJobQueueStatus(version, jobQueue, queueState, queueStateDetails);
|
|
|
|
|
|
jobQueues.append(*jobQueue.getClear());
|
|
@@ -1922,6 +1942,10 @@ bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspS
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
queue->stop(createQueueActionInfo(context, "stopped", req, info));
|
|
|
AccessSuccess(context, "Stopped queue %s",req.getCluster());
|
|
|
+ double version = context.getClientVersion();
|
|
|
+ if (version >= 1.19)
|
|
|
+ getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
|
|
|
+
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
|
catch(IException* e)
|
|
@@ -1941,6 +1965,10 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
queue->resume(createQueueActionInfo(context, "resumed", req, info));
|
|
|
AccessSuccess(context, "Resumed queue %s",req.getCluster());
|
|
|
+ double version = context.getClientVersion();
|
|
|
+ if (version >= 1.19)
|
|
|
+ getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
|
|
|
+
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
|
catch(IException* e)
|
|
@@ -1977,6 +2005,10 @@ bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
queue->pause(createQueueActionInfo(context, "paused", req, info));
|
|
|
AccessSuccess(context, "Paused queue %s",req.getCluster());
|
|
|
+ double version = context.getClientVersion();
|
|
|
+ if (version >= 1.19)
|
|
|
+ getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
|
|
|
+
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
|
catch(IException* e)
|
|
@@ -1999,6 +2031,10 @@ bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
|
|
|
queue->clear();
|
|
|
}
|
|
|
AccessSuccess(context, "Cleared queue %s",req.getCluster());
|
|
|
+ double version = context.getClientVersion();
|
|
|
+ if (version >= 1.19)
|
|
|
+ getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
|
|
|
+
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
|
catch(IException* e)
|
|
@@ -2480,3 +2516,265 @@ bool CWsSMCEx::onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdReques
|
|
|
resp.setEndpoints(respEndpoints);
|
|
|
return true;
|
|
|
}
|
|
|
+
|
|
|
+bool CWsSMCEx::onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp)
|
|
|
+{
|
|
|
+ getStatusServerInfo(context, req.getServerType(), req.getServerName(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char *serverType, const char *server, const char *networkAddress, unsigned port,
|
|
|
+ IEspStatusServerInfo& statusServerInfo)
|
|
|
+{
|
|
|
+ if (!serverType || !*serverType)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server type not specified.");
|
|
|
+
|
|
|
+ if (strieq(serverType,STATUS_SERVER_THOR) || strieq(serverType,STATUS_SERVER_HTHOR) || strieq(serverType,STATUS_SERVER_ROXIE))
|
|
|
+ {
|
|
|
+ if (!server || !*server)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "cluster not specified.");
|
|
|
+ getStatusServerInfo(context, server, statusServerInfo);
|
|
|
+ }
|
|
|
+ else if (!strieq(serverType,STATUS_SERVER_DFUSERVER))
|
|
|
+ {
|
|
|
+ if (!networkAddress || !*networkAddress)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server network address not specified.");
|
|
|
+ getStatusServerInfo(context, serverType, networkAddress, port, statusServerInfo);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (!server || !*server)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server not specified.");
|
|
|
+ getDFUServerInfo(context, server, statusServerInfo);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* clusteName, IEspStatusServerInfo& statusServerInfo)
|
|
|
+{
|
|
|
+ double version = context.getClientVersion();
|
|
|
+
|
|
|
+ Owned<IConstWUClusterInfo> info = getTargetClusterInfo(clusteName);
|
|
|
+ if (!info)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get target cluster information.");
|
|
|
+ CWsSMCTargetCluster targetCluster;
|
|
|
+ readTargetClusterInfo(context, *info, NULL, &targetCluster);
|
|
|
+
|
|
|
+ bool foundQueueInStatusServer = false;
|
|
|
+ Owned<IPropertyTree> statusServerTree = getStatusServerTree(info);
|
|
|
+ if (statusServerTree)
|
|
|
+ {
|
|
|
+ foundQueueInStatusServer = true;
|
|
|
+
|
|
|
+ BoolHash uniqueWUIDs;
|
|
|
+ IArrayOf<IEspActiveWorkunit> aws;
|
|
|
+ StringBuffer networkAddress;
|
|
|
+ statusServerTree->getProp("@node", networkAddress);
|
|
|
+ unsigned port = statusServerTree->getPropInt("@mpport");
|
|
|
+ readRunningWUsOnCluster(context, clusteName, networkAddress.str(), port, targetCluster, statusServerTree, uniqueWUIDs, aws);
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, uniqueWUIDs, aws);
|
|
|
+ statusServerInfo.setWorkunits(aws);
|
|
|
+ }
|
|
|
+
|
|
|
+ IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
|
|
|
+ clusterInfo.setClusterName(targetCluster.clusterName.get());
|
|
|
+ clusterInfo.setClusterSize(targetCluster.clusterSize);
|
|
|
+ clusterInfo.setClusterType(targetCluster.clusterType);
|
|
|
+ clusterInfo.setQueueName(targetCluster.queueName.get());
|
|
|
+ clusterInfo.setQueueStatus(targetCluster.queueStatus.get());
|
|
|
+ if (targetCluster.clusterType != ThorLCRCluster)
|
|
|
+ targetCluster.agentQueue.foundQueueInStatusServer = foundQueueInStatusServer;
|
|
|
+ else
|
|
|
+ targetCluster.clusterQueue.foundQueueInStatusServer = foundQueueInStatusServer;
|
|
|
+ setClusterStatus(context, targetCluster, &clusterInfo);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* type, const char *networkAddress, unsigned port, IEspStatusServerInfo& statusServerInfo)
|
|
|
+{
|
|
|
+ double version = context.getClientVersion();
|
|
|
+
|
|
|
+ Owned<IPropertyTree> statusServerTree = getStatusServerTree(networkAddress, port);
|
|
|
+ if (!statusServerTree)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Server not attached");
|
|
|
+
|
|
|
+ IEspServerJobQueue& serverInfo = statusServerInfo.updateServerInfo();
|
|
|
+ serverInfo.setNetworkAddress(networkAddress);
|
|
|
+ serverInfo.setPort(port);
|
|
|
+
|
|
|
+ StringBuffer queueName, instance;
|
|
|
+ statusServerTree->getProp("@queue", queueName);
|
|
|
+ setServerJobQueue(version, type, NULL, queueName.str(), serverInfo);
|
|
|
+ instance.appendf("%s on %s:%d", type, networkAddress, port);
|
|
|
+
|
|
|
+ IArrayOf<IEspActiveWorkunit> aws;
|
|
|
+ Owned<IPropertyTreeIterator> wuids(statusServerTree->getElements("WorkUnit"));
|
|
|
+ ForEach(*wuids)
|
|
|
+ {
|
|
|
+ const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, NULL, 0, type, queueName.str(), instance, NULL);
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+ statusServerInfo.setWorkunits(aws);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getDFUServerInfo(IEspContext &context, const char* serverName, IEspStatusServerInfo& statusServerInfo)
|
|
|
+{
|
|
|
+ double version = context.getClientVersion();
|
|
|
+
|
|
|
+ VStringBuffer xpath("/Environment/Software/%s[@name=\"%s\"]", eqDfu, serverName);
|
|
|
+ Owned<IRemoteConnection> connEnv = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
|
|
|
+ if (!connEnv)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
|
|
|
+ IPropertyTree* serviceTree = connEnv->queryRoot();
|
|
|
+ if (!serviceTree)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
|
|
|
+ const char *queueName = serviceTree->queryProp("@queue");
|
|
|
+ if (!queueName || !*queueName)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Server queue not found.");
|
|
|
+
|
|
|
+ setServerJobQueue(version, "DFUserver", serverName, queueName, statusServerInfo.updateServerInfo());
|
|
|
+
|
|
|
+ IArrayOf<IEspActiveWorkunit> aws;
|
|
|
+ readDFUWUs(context, queueName, serverName, aws);
|
|
|
+ statusServerInfo.setWorkunits(aws);
|
|
|
+}
|
|
|
+
|
|
|
+IPropertyTree* CWsSMCEx::getStatusServerTree(IConstWUClusterInfo* info)
|
|
|
+{
|
|
|
+ SCMStringBuffer str;
|
|
|
+ StringBuffer xpath;
|
|
|
+ if (info->getPlatform() != HThorCluster)
|
|
|
+ {
|
|
|
+ if (info->getPlatform() == ThorLCRCluster)
|
|
|
+ xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTThorLCRCluster), info->getThorProcesses().item(0));
|
|
|
+ else
|
|
|
+ xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTRoxieCluster), info->getRoxieProcess(str).str());
|
|
|
+ Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
+ if (!connStatusServer)
|
|
|
+ return NULL;
|
|
|
+
|
|
|
+ Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
|
|
|
+ return retServerTree.getClear();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Owned<IRemoteConnection> connStatusServer = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
+ if (!connStatusServer)
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Status servers not found");
|
|
|
+
|
|
|
+ info->getAgentQueue(str);
|
|
|
+ xpath.setf("Server[@name=\"%s\"]", getStatusServerTypeName(WsSMCSSTHThorCluster));
|
|
|
+ Owned<IPropertyTreeIterator> it(connStatusServer->queryRoot()->getElements(xpath));
|
|
|
+ ForEach(*it)
|
|
|
+ {
|
|
|
+ IPropertyTree &serverTree = it->query();
|
|
|
+ const char *queueNames = serverTree.queryProp("@queue");
|
|
|
+ if (!queueNames || !*queueNames)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringArray qlist;
|
|
|
+ qlist.appendListUniq(queueNames, ",");
|
|
|
+ ForEachItemIn(q, qlist)
|
|
|
+ {
|
|
|
+ if (!strieq(qlist.item(q), str.str()))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IPropertyTree> retServerTree = serverTree.getBranch(NULL);
|
|
|
+ return retServerTree.getClear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+IPropertyTree* CWsSMCEx::getStatusServerTree(const char *networkAddress, unsigned port)
|
|
|
+{
|
|
|
+ if (!networkAddress || !*networkAddress)
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_INPUT, "Network Address not specified");
|
|
|
+
|
|
|
+ VStringBuffer xpath("/Status/Servers/Server[@node=\"%s\"][@mpport=\"%d\"]", networkAddress, port);
|
|
|
+ Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
+ if (!connStatusServer)
|
|
|
+ return NULL;
|
|
|
+
|
|
|
+ Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
|
|
|
+ return retServerTree.getClear();
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readRunningWUsOnCluster(IEspContext& context, const char* serverName, const char* node, unsigned port,
|
|
|
+ CWsSMCTargetCluster& targetCluster, IPropertyTree* statusServerNode, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ const char *cluster = statusServerNode->queryProp("Cluster");
|
|
|
+ StringBuffer queueName;
|
|
|
+ if (cluster) // backward compat check.
|
|
|
+ getClusterThorQueueName(queueName, cluster);
|
|
|
+ else
|
|
|
+ queueName.append(targetCluster.queueName.get());
|
|
|
+
|
|
|
+ CWsSMCQueue* jobQueue;
|
|
|
+ if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
+ jobQueue = &targetCluster.clusterQueue;
|
|
|
+ else
|
|
|
+ jobQueue = &targetCluster.agentQueue;
|
|
|
+
|
|
|
+ StringBuffer instance;
|
|
|
+ if ((targetCluster.clusterType == ThorLCRCluster) || (targetCluster.clusterType == RoxieCluster))
|
|
|
+ statusServerNode->getProp("@cluster", instance);
|
|
|
+ else
|
|
|
+ instance.appendf("%s on %s:%d", serverName, node, port);
|
|
|
+
|
|
|
+ const char* targetClusterName = targetCluster.clusterName.get();
|
|
|
+ Owned<IPropertyTreeIterator> wuids(statusServerNode->getElements("WorkUnit"));
|
|
|
+ ForEach(*wuids)
|
|
|
+ {
|
|
|
+ const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, !strieq(targetClusterName, instance.str()) ? instance.str() : NULL, 0, serverName, queueName, instance.str(), targetClusterName);
|
|
|
+ if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
|
|
|
+ {
|
|
|
+ StringBuffer durationStr, subgraphStr;
|
|
|
+ int sgDuration = statusServerNode->getPropInt("@sg_duration", -1);
|
|
|
+ int subgraph = statusServerNode->getPropInt("@subgraph", -1);
|
|
|
+ const char* graph = statusServerNode->queryProp("@graph");
|
|
|
+ durationStr.appendf("%d min", sgDuration);
|
|
|
+ subgraphStr.appendf("%d", subgraph);
|
|
|
+ if (subgraph > -1 && sgDuration > -1)
|
|
|
+ {
|
|
|
+ wu->setGraphName(graph);
|
|
|
+ wu->setDuration(durationStr.str());
|
|
|
+ wu->setGID(subgraphStr.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statusServerNode->getPropInt("@memoryBlocked ", 0) != 0)
|
|
|
+ wu->setMemoryBlocked(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ jobQueue->countRunningJobs++;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::setServerJobQueue(double version, const char* serverType, const char* serverName, const char* queueName, IEspServerJobQueue& serverInfo)
|
|
|
+{
|
|
|
+ StringBuffer queueState, queueStateDetails;
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
+ if (queue->stopped(queueStateDetails))
|
|
|
+ queueState.set("stopped");
|
|
|
+ else if (queue->paused(queueStateDetails))
|
|
|
+ queueState.set("paused");
|
|
|
+ else
|
|
|
+ queueState.set("running");
|
|
|
+
|
|
|
+ serverInfo.setQueueName(queueName);
|
|
|
+ serverInfo.setServerType(serverType);
|
|
|
+ if (serverName && *serverName)
|
|
|
+ serverInfo.setServerName(serverName);
|
|
|
+ setServerJobQueueStatus(version, &serverInfo, queueState, queueStateDetails);
|
|
|
+}
|