|
@@ -42,6 +42,10 @@
|
|
|
#define STATUS_SERVER_ECLCCSERVER "ECLCCserver"
|
|
|
#define STATUS_SERVER_ECLAGENT "ECLagent"
|
|
|
|
|
|
+#define CLUSTER_TYPE_THOR "Thor"
|
|
|
+#define CLUSTER_TYPE_HTHOR "HThor"
|
|
|
+#define CLUSTER_TYPE_ROXIE "Roxie"
|
|
|
+
|
|
|
static const char* FEATURE_URL = "SmcAccess";
|
|
|
const char* THORQUEUE_FEATURE = "ThorQueueAccess";
|
|
|
static const char* ROXIE_CONTROL_URL = "RoxieControlAccess";
|
|
@@ -533,7 +537,7 @@ void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* ser
|
|
|
const char* instance = serverNode.queryProp("@node");
|
|
|
const char* queueName = serverNode.queryProp("@queue");
|
|
|
unsigned port = serverNode.getPropInt("@mpport", 0);
|
|
|
- if (!serverName || !*serverName || !instance || !*instance || strieq(serverName, "DFUserver") ||//DFUServer already handled separately
|
|
|
+ if (!serverName || !*serverName || !instance || !*instance || strieq(serverName, STATUS_SERVER_DFUSERVER) ||//DFUServer already handled separately
|
|
|
strieq(serverName, "ThorMaster") || strieq(serverName, "RoxieServer") || strieq(serverName, "HThorServer"))//target clusters already handled separately
|
|
|
continue;
|
|
|
|
|
@@ -549,7 +553,7 @@ void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* ser
|
|
|
continue;
|
|
|
|
|
|
Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName, instance, NULL, false);
|
|
|
+ createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
|
|
|
aws.append(*wu.getLink());
|
|
|
}
|
|
|
if (!uniqueServers.getValue(instanceName))
|
|
@@ -589,7 +593,7 @@ void CWsSMCEx::readDFUWUs(IEspContext &context, const char* queueName, const cha
|
|
|
}
|
|
|
|
|
|
Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
|
|
|
- wu->setServer("DFUserver");
|
|
|
+ wu->setServer(STATUS_SERVER_DFUSERVER);
|
|
|
wu->setInstance(serverName);
|
|
|
wu->setQueueName(queueName);
|
|
|
aws.append(*wu.getLink());
|
|
@@ -617,7 +621,7 @@ void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot,
|
|
|
{
|
|
|
const char *queueName = queues.item(q);
|
|
|
readDFUWUs(context, queueName, serverName, aws);
|
|
|
- addServerJobQueue(serverJobQueues, queueName, serverName, "DFUserver", NULL, 0);
|
|
|
+ addServerJobQueue(serverJobQueues, queueName, serverName, STATUS_SERVER_DFUSERVER, NULL, 0);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -661,7 +665,7 @@ void CWsSMCEx::clearActivityInfoCache()
|
|
|
activityInfoCache.clear();
|
|
|
}
|
|
|
|
|
|
-ActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context, IEspActivityRequest &req)
|
|
|
+ActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context)
|
|
|
{
|
|
|
CriticalBlock b(getActivityCrit);
|
|
|
|
|
@@ -670,11 +674,11 @@ ActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context, IEspActivityReques
|
|
|
|
|
|
DBGLOG("CWsSMCEx::getActivityInfo - rebuild cached information");
|
|
|
|
|
|
- activityInfoCache.setown(createActivityInfo(context, req));
|
|
|
+ activityInfoCache.setown(createActivityInfo(context));
|
|
|
return activityInfoCache.getLink();
|
|
|
}
|
|
|
|
|
|
-ActivityInfo* CWsSMCEx::createActivityInfo(IEspContext &context, IEspActivityRequest &req)
|
|
|
+ActivityInfo* CWsSMCEx::createActivityInfo(IEspContext &context)
|
|
|
{
|
|
|
Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
|
|
|
Owned<IConstEnvironment> env = factory->openEnvironment();
|
|
@@ -745,7 +749,7 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
if (version >= 1.06)
|
|
|
setBannerAndChatData(version, resp);
|
|
|
|
|
|
- Owned<ActivityInfo> activityInfo = getActivityInfo(context, req);
|
|
|
+ Owned<ActivityInfo> activityInfo = getActivityInfo(context);
|
|
|
setActivityResponse(context, activityInfo, req, resp);
|
|
|
}
|
|
|
catch(IException* e)
|
|
@@ -922,11 +926,11 @@ void CWsSMCEx::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree*
|
|
|
createActiveWorkUnit(wu, context, wuid, instance.str(), 0, serverName, serverName, instance.str(), targetClusterName, false);
|
|
|
|
|
|
if (targetCluster->clusterType == ThorLCRCluster)
|
|
|
- wu->setClusterType("Thor");
|
|
|
+ wu->setClusterType(CLUSTER_TYPE_THOR);
|
|
|
else if (targetCluster->clusterType == RoxieCluster)
|
|
|
- wu->setClusterType("Roxie");
|
|
|
+ wu->setClusterType(CLUSTER_TYPE_ROXIE);
|
|
|
else
|
|
|
- wu->setClusterType("HThor");
|
|
|
+ wu->setClusterType(CLUSTER_TYPE_HTHOR);
|
|
|
wu->setClusterQueueName(targetCluster->queueName.get());
|
|
|
|
|
|
if (wu->getStateID() != WUStateRunning)
|
|
@@ -1162,14 +1166,8 @@ void CWsSMCEx::setESPTargetClusters(IEspContext& context, CIArrayOf<CWsSMCTarget
|
|
|
{
|
|
|
ForEachItemIn(i, targetClusters)
|
|
|
{
|
|
|
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
|
|
|
Owned<IEspTargetCluster> respTargetCluster = new CTargetCluster("", "");
|
|
|
- respTargetCluster->setClusterName(targetCluster.clusterName.get());
|
|
|
- respTargetCluster->setClusterSize(targetCluster.clusterSize);
|
|
|
- respTargetCluster->setClusterType(targetCluster.clusterType);
|
|
|
- respTargetCluster->setQueueName(targetCluster.queueName.get());
|
|
|
- respTargetCluster->setQueueStatus(targetCluster.queueStatus.get());
|
|
|
- setClusterStatus(context, targetCluster, respTargetCluster);
|
|
|
+ setESPTargetCluster(context, targetClusters.item(i), respTargetCluster);
|
|
|
respTargetClusters.append(*respTargetCluster.getClear());
|
|
|
}
|
|
|
}
|
|
@@ -2044,253 +2042,186 @@ void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char *serverType,
|
|
|
if (!serverType || !*serverType)
|
|
|
throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server type not specified.");
|
|
|
|
|
|
- if (strieq(serverType,STATUS_SERVER_THOR) || strieq(serverType,STATUS_SERVER_HTHOR) || strieq(serverType,STATUS_SERVER_ROXIE))
|
|
|
+ Owned<ActivityInfo> activityInfo = getActivityInfo(context);
|
|
|
+ if (!activityInfo)
|
|
|
+ throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get Activity Info cache.");
|
|
|
+
|
|
|
+ if (strieq(serverType,STATUS_SERVER_THOR))
|
|
|
+ {
|
|
|
+ setTargetClusterInfo(context, serverType, server, activityInfo->thorTargetClusters, activityInfo->aws, statusServerInfo);
|
|
|
+ }
|
|
|
+ else if (strieq(serverType,STATUS_SERVER_ROXIE))
|
|
|
+ {
|
|
|
+ setTargetClusterInfo(context, serverType, server, activityInfo->roxieTargetClusters, activityInfo->aws, statusServerInfo);
|
|
|
+ }
|
|
|
+ else if (strieq(serverType,STATUS_SERVER_HTHOR))
|
|
|
{
|
|
|
- if (!server || !*server)
|
|
|
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "cluster not specified.");
|
|
|
- getStatusServerInfo(context, server, statusServerInfo);
|
|
|
+ setTargetClusterInfo(context, serverType, server, activityInfo->hthorTargetClusters, activityInfo->aws, statusServerInfo);
|
|
|
}
|
|
|
- else if (!strieq(serverType,STATUS_SERVER_DFUSERVER))
|
|
|
+ else if (strieq(serverType,STATUS_SERVER_DFUSERVER))
|
|
|
{
|
|
|
- if (!networkAddress || !*networkAddress)
|
|
|
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server network address not specified.");
|
|
|
- getStatusServerInfo(context, serverType, networkAddress, port, statusServerInfo);
|
|
|
+ setServerQueueInfo(context, serverType, server, activityInfo->serverJobQueues, activityInfo->aws, statusServerInfo);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- if (!server || !*server)
|
|
|
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server not specified.");
|
|
|
- getDFUServerInfo(context, server, statusServerInfo);
|
|
|
+ setServerQueueInfo(context, serverType, networkAddress, port, activityInfo->serverJobQueues, activityInfo->aws, statusServerInfo);
|
|
|
}
|
|
|
- return;
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* clusteName, IEspStatusServerInfo& statusServerInfo)
|
|
|
+void CWsSMCEx::setTargetClusterInfo(IEspContext &context, const char *serverType, const char *clusterName, const CIArrayOf<CWsSMCTargetCluster>& targetClusters,
|
|
|
+ const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
|
|
|
{
|
|
|
- double version = context.getClientVersion();
|
|
|
+ if (!clusterName || !*clusterName)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not specified.");
|
|
|
|
|
|
- Owned<IConstWUClusterInfo> info = getTargetClusterInfo(clusteName);
|
|
|
- if (!info)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get target cluster information.");
|
|
|
- CWsSMCTargetCluster targetCluster;
|
|
|
- readTargetClusterInfo(context, *info, NULL, &targetCluster);
|
|
|
-
|
|
|
- bool foundQueueInStatusServer = false;
|
|
|
- Owned<IPropertyTree> statusServerTree = getStatusServerTree(info);
|
|
|
- if (statusServerTree)
|
|
|
- {
|
|
|
- foundQueueInStatusServer = true;
|
|
|
-
|
|
|
- BoolHash uniqueWUIDs;
|
|
|
- IArrayOf<IEspActiveWorkunit> aws;
|
|
|
- StringBuffer networkAddress;
|
|
|
- statusServerTree->getProp("@node", networkAddress);
|
|
|
- unsigned port = statusServerTree->getPropInt("@mpport");
|
|
|
- readRunningWUsOnCluster(context, clusteName, networkAddress.str(), port, targetCluster, statusServerTree, uniqueWUIDs, aws);
|
|
|
- readWUsAndStateFromJobQueue(context, targetCluster, uniqueWUIDs, aws);
|
|
|
- statusServerInfo.setWorkunits(aws);
|
|
|
- }
|
|
|
-
|
|
|
- IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
|
|
|
- clusterInfo.setClusterName(targetCluster.clusterName.get());
|
|
|
- clusterInfo.setClusterSize(targetCluster.clusterSize);
|
|
|
- clusterInfo.setClusterType(targetCluster.clusterType);
|
|
|
- clusterInfo.setQueueName(targetCluster.queueName.get());
|
|
|
- clusterInfo.setQueueStatus(targetCluster.queueStatus.get());
|
|
|
- if (targetCluster.clusterType != ThorLCRCluster)
|
|
|
- targetCluster.agentQueue.foundQueueInStatusServer = foundQueueInStatusServer;
|
|
|
- else
|
|
|
- targetCluster.clusterQueue.foundQueueInStatusServer = foundQueueInStatusServer;
|
|
|
- setClusterStatus(context, targetCluster, &clusterInfo);
|
|
|
+ IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
|
|
|
+ ForEachItemIn(i, targetClusters)
|
|
|
+ {
|
|
|
+ CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
|
|
|
+ const char* name = targetCluster.clusterName.get();
|
|
|
+ if (name && strieq(name, clusterName))
|
|
|
+ {
|
|
|
+ setESPTargetCluster(context, targetCluster, &clusterInfo);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ setActiveWUs(context, serverType, clusterName, clusterInfo.getQueueName(), aws, statusServerInfo);
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char* type, const char *networkAddress, unsigned port, IEspStatusServerInfo& statusServerInfo)
|
|
|
+void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *serverName, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
|
|
|
+ const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
|
|
|
{
|
|
|
- double version = context.getClientVersion();
|
|
|
-
|
|
|
- Owned<IPropertyTree> statusServerTree = getStatusServerTree(networkAddress, port);
|
|
|
- if (!statusServerTree)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Server not attached");
|
|
|
-
|
|
|
- IEspServerJobQueue& serverInfo = statusServerInfo.updateServerInfo();
|
|
|
- serverInfo.setNetworkAddress(networkAddress);
|
|
|
- serverInfo.setPort(port);
|
|
|
-
|
|
|
- StringBuffer queueName, instance;
|
|
|
- statusServerTree->getProp("@queue", queueName);
|
|
|
- setServerJobQueue(version, type, NULL, queueName.str(), serverInfo);
|
|
|
- instance.appendf("%s on %s:%d", type, networkAddress, port);
|
|
|
+ if (!serverName || !*serverName)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server name not specified.");
|
|
|
|
|
|
- IArrayOf<IEspActiveWorkunit> aws;
|
|
|
- Owned<IPropertyTreeIterator> wuids(statusServerTree->getElements("WorkUnit"));
|
|
|
- ForEach(*wuids)
|
|
|
+ ForEachItemIn(i, serverJobQueues)
|
|
|
{
|
|
|
- const char* wuid=wuids->query().queryProp(NULL);
|
|
|
- if (!wuid || !*wuid)
|
|
|
- continue;
|
|
|
-
|
|
|
- Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, context, wuid, NULL, 0, type, queueName.str(), instance, NULL, true);
|
|
|
- aws.append(*wu.getLink());
|
|
|
+ IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
|
|
|
+ const char* name = serverJobQueue.getServerName();
|
|
|
+ if (name && strieq(name, serverName))
|
|
|
+ {
|
|
|
+ IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
|
|
|
+ serverQueue.copy(serverJobQueue);
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
- statusServerInfo.setWorkunits(aws);
|
|
|
+
|
|
|
+ setActiveWUs(context, serverType, serverName, aws, statusServerInfo);
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::getDFUServerInfo(IEspContext &context, const char* serverName, IEspStatusServerInfo& statusServerInfo)
|
|
|
+void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *networkAddress, unsigned port, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
|
|
|
+ const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
|
|
|
{
|
|
|
- double version = context.getClientVersion();
|
|
|
+ if (!networkAddress || !*networkAddress)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Network address not specified.");
|
|
|
|
|
|
- VStringBuffer xpath("/Environment/Software/%s[@name=\"%s\"]", eqDfu, serverName);
|
|
|
- Owned<IRemoteConnection> connEnv = querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
|
|
|
- if (!connEnv)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
|
|
|
- IPropertyTree* serviceTree = connEnv->queryRoot();
|
|
|
- if (!serviceTree)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Failed to get environment information.");
|
|
|
- const char *queueName = serviceTree->queryProp("@queue");
|
|
|
- if (!queueName || !*queueName)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Server queue not found.");
|
|
|
+ ForEachItemIn(i, serverJobQueues)
|
|
|
+ {
|
|
|
+ IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
|
|
|
+ const char* ipAddress = serverJobQueue.getNetworkAddress();
|
|
|
+ unsigned thePort = serverJobQueue.getPort();
|
|
|
+ if (ipAddress && strieq(ipAddress, networkAddress) && (thePort == port))
|
|
|
+ {
|
|
|
+ IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
|
|
|
+ serverQueue.copy(serverJobQueue);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- setServerJobQueue(version, "DFUserver", serverName, queueName, statusServerInfo.updateServerInfo());
|
|
|
+ VStringBuffer instance("%s_on_%s:%d", serverType, networkAddress, port);
|
|
|
+ setActiveWUs(context, serverType, instance.str(), aws, statusServerInfo);
|
|
|
+}
|
|
|
|
|
|
- IArrayOf<IEspActiveWorkunit> aws;
|
|
|
- readDFUWUs(context, queueName, serverName, aws);
|
|
|
- statusServerInfo.setWorkunits(aws);
|
|
|
+void CWsSMCEx::setESPTargetCluster(IEspContext &context, CWsSMCTargetCluster& targetCluster, IEspTargetCluster* espTargetCluster)
|
|
|
+{
|
|
|
+ espTargetCluster->setClusterName(targetCluster.clusterName.get());
|
|
|
+ espTargetCluster->setClusterSize(targetCluster.clusterSize);
|
|
|
+ espTargetCluster->setClusterType(targetCluster.clusterType);
|
|
|
+ espTargetCluster->setQueueName(targetCluster.queueName.get());
|
|
|
+ espTargetCluster->setQueueStatus(targetCluster.queueStatus.get());
|
|
|
+ setClusterStatus(context, targetCluster, espTargetCluster);
|
|
|
}
|
|
|
|
|
|
-IPropertyTree* CWsSMCEx::getStatusServerTree(IConstWUClusterInfo* info)
|
|
|
+void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *clusterName, const char *queueName, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
|
|
|
{
|
|
|
- SCMStringBuffer str;
|
|
|
- StringBuffer xpath;
|
|
|
- if (info->getPlatform() != HThorCluster)
|
|
|
- {
|
|
|
- if (info->getPlatform() == ThorLCRCluster)
|
|
|
- xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTThorLCRCluster), info->getThorProcesses().item(0));
|
|
|
- else
|
|
|
- xpath.setf("/Status/Servers/Server[@name=\"%s\"][@cluster=\"%s\"]", getStatusServerTypeName(WsSMCSSTRoxieCluster), info->getRoxieProcess(str).str());
|
|
|
- Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
- if (!connStatusServer)
|
|
|
- return NULL;
|
|
|
+ const char* clusterType = CLUSTER_TYPE_THOR;
|
|
|
+ if (strieq(serverType,STATUS_SERVER_ROXIE))
|
|
|
+ clusterType = CLUSTER_TYPE_ROXIE;
|
|
|
+ else if (strieq(serverType,STATUS_SERVER_HTHOR))
|
|
|
+ clusterType = CLUSTER_TYPE_HTHOR;
|
|
|
|
|
|
- Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
|
|
|
- return retServerTree.getClear();
|
|
|
- }
|
|
|
- else
|
|
|
+ IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
|
|
|
+ ForEachItemIn(i, aws)
|
|
|
{
|
|
|
- Owned<IRemoteConnection> connStatusServer = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
- if (!connStatusServer)
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Status servers not found");
|
|
|
+ IEspActiveWorkunit& wu = aws.item(i);
|
|
|
+ const char* wuid = wu.getWuid();
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ continue;
|
|
|
|
|
|
- info->getAgentQueue(str);
|
|
|
- xpath.setf("Server[@name=\"%s\"]", getStatusServerTypeName(WsSMCSSTHThorCluster));
|
|
|
- Owned<IPropertyTreeIterator> it(connStatusServer->queryRoot()->getElements(xpath));
|
|
|
- ForEach(*it)
|
|
|
+ const char* wuServerType = wu.getServer();
|
|
|
+ const char* wuClusterName = wu.getTargetClusterName();
|
|
|
+ if (!wuServerType || !wuClusterName || !strieq(serverType, wuServerType) || !strieq(clusterName, wuClusterName))
|
|
|
{
|
|
|
- IPropertyTree &serverTree = it->query();
|
|
|
- const char *queueNames = serverTree.queryProp("@queue");
|
|
|
- if (!queueNames || !*queueNames)
|
|
|
+ const char* wuClusterType = wu.getClusterType();
|
|
|
+ const char* wuClusterQueueName = wu.getClusterQueueName();
|
|
|
+ if (!wuClusterType || !wuClusterQueueName || !strieq(clusterType, wuClusterType) || !strieq(queueName, wuClusterQueueName))
|
|
|
continue;
|
|
|
-
|
|
|
- StringArray qlist;
|
|
|
- qlist.appendListUniq(queueNames, ",");
|
|
|
- ForEachItemIn(q, qlist)
|
|
|
- {
|
|
|
- if (!strieq(qlist.item(q), str.str()))
|
|
|
- continue;
|
|
|
-
|
|
|
- Owned<IPropertyTree> retServerTree = serverTree.getBranch(NULL);
|
|
|
- return retServerTree.getClear();
|
|
|
- }
|
|
|
}
|
|
|
- }
|
|
|
- return NULL;
|
|
|
-}
|
|
|
|
|
|
-IPropertyTree* CWsSMCEx::getStatusServerTree(const char *networkAddress, unsigned port)
|
|
|
-{
|
|
|
- if (!networkAddress || !*networkAddress)
|
|
|
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Network Address not specified");
|
|
|
-
|
|
|
- VStringBuffer xpath("/Status/Servers/Server[@node=\"%s\"][@mpport=\"%d\"]", networkAddress, port);
|
|
|
- Owned<IRemoteConnection> connStatusServer = querySDS().connect(xpath.str(),myProcessSession(),RTM_LOCK_READ,SDS_LOCK_TIMEOUT);
|
|
|
- if (!connStatusServer)
|
|
|
- return NULL;
|
|
|
-
|
|
|
- Owned<IPropertyTree> retServerTree = connStatusServer->queryRoot()->getBranch(NULL);
|
|
|
- return retServerTree.getClear();
|
|
|
+ Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
|
|
|
+ setActiveWUs(context, wu, wuOnThisQueue);
|
|
|
+ awsOnThisQueue.append(*wuOnThisQueue.getLink());
|
|
|
+ }
|
|
|
+ statusServerInfo.setWorkunits(awsOnThisQueue);
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::readRunningWUsOnCluster(IEspContext& context, const char* serverName, const char* node, unsigned port,
|
|
|
- CWsSMCTargetCluster& targetCluster, IPropertyTree* statusServerNode, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *instance, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
|
|
|
{
|
|
|
- const char *cluster = statusServerNode->queryProp("Cluster");
|
|
|
- StringBuffer queueName;
|
|
|
- if (cluster) // backward compat check.
|
|
|
- getClusterThorQueueName(queueName, cluster);
|
|
|
- else
|
|
|
- queueName.append(targetCluster.queueName.get());
|
|
|
-
|
|
|
- CWsSMCQueue* jobQueue;
|
|
|
- if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
- jobQueue = &targetCluster.clusterQueue;
|
|
|
- else
|
|
|
- jobQueue = &targetCluster.agentQueue;
|
|
|
-
|
|
|
- StringBuffer instance;
|
|
|
- if ((targetCluster.clusterType == ThorLCRCluster) || (targetCluster.clusterType == RoxieCluster))
|
|
|
- statusServerNode->getProp("@cluster", instance);
|
|
|
- else
|
|
|
- instance.appendf("%s on %s:%d", serverName, node, port);
|
|
|
-
|
|
|
- const char* targetClusterName = targetCluster.clusterName.get();
|
|
|
- Owned<IPropertyTreeIterator> wuids(statusServerNode->getElements("WorkUnit"));
|
|
|
- ForEach(*wuids)
|
|
|
+ IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
|
|
|
+ ForEachItemIn(i, aws)
|
|
|
{
|
|
|
- const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ IEspActiveWorkunit& wu = aws.item(i);
|
|
|
+ const char* wuid = wu.getWuid();
|
|
|
if (!wuid || !*wuid)
|
|
|
continue;
|
|
|
|
|
|
- Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, context, wuid, !strieq(targetClusterName, instance.str()) ? instance.str() : NULL, 0, serverName,
|
|
|
- queueName, instance.str(), targetClusterName, true);
|
|
|
- if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
|
|
|
- {
|
|
|
- StringBuffer durationStr, subgraphStr;
|
|
|
- int sgDuration = statusServerNode->getPropInt("@sg_duration", -1);
|
|
|
- int subgraph = statusServerNode->getPropInt("@subgraph", -1);
|
|
|
- const char* graph = statusServerNode->queryProp("@graph");
|
|
|
- durationStr.appendf("%d min", sgDuration);
|
|
|
- subgraphStr.appendf("%d", subgraph);
|
|
|
- if (subgraph > -1 && sgDuration > -1)
|
|
|
- {
|
|
|
- wu->setGraphName(graph);
|
|
|
- wu->setDuration(durationStr.str());
|
|
|
- wu->setGID(subgraphStr.str());
|
|
|
- }
|
|
|
-
|
|
|
- if (statusServerNode->getPropInt("@memoryBlocked ", 0) != 0)
|
|
|
- wu->setMemoryBlocked(1);
|
|
|
- }
|
|
|
+ const char* wuInstance = wu.getInstance();
|
|
|
+ if (!wuInstance || !strieq(wuInstance, instance))
|
|
|
+ continue;
|
|
|
|
|
|
- aws.append(*wu.getLink());
|
|
|
- jobQueue->countRunningJobs++;
|
|
|
+ Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
|
|
|
+ setActiveWUs(context, wu, wuOnThisQueue);
|
|
|
+ awsOnThisQueue.append(*wuOnThisQueue.getLink());
|
|
|
}
|
|
|
+ statusServerInfo.setWorkunits(awsOnThisQueue);
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::setServerJobQueue(double version, const char* serverType, const char* serverName, const char* queueName, IEspServerJobQueue& serverInfo)
|
|
|
+void CWsSMCEx::setActiveWUs(IEspContext &context, IEspActiveWorkunit& wu, IEspActiveWorkunit* wuToSet)
|
|
|
{
|
|
|
- StringBuffer queueState, queueStateDetails;
|
|
|
- Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- if (queue->stopped(queueStateDetails))
|
|
|
- queueState.set("stopped");
|
|
|
- else if (queue->paused(queueStateDetails))
|
|
|
- queueState.set("paused");
|
|
|
- else
|
|
|
- queueState.set("running");
|
|
|
+ try
|
|
|
+ {
|
|
|
+ const char* user = context.queryUserId();
|
|
|
+ const char* owner = wu.getOwner();
|
|
|
|
|
|
- serverInfo.setQueueName(queueName);
|
|
|
- serverInfo.setServerType(serverType);
|
|
|
- if (serverName && *serverName)
|
|
|
- serverInfo.setServerName(serverName);
|
|
|
- setServerJobQueueStatus(version, &serverInfo, queueState, queueStateDetails);
|
|
|
+ //if no access, throw an exception and go to the 'catch' section.
|
|
|
+ context.validateFeatureAccess((!owner || !*owner || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS, SecAccess_Read, true);
|
|
|
+ wuToSet->copy(wu);
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
|
|
|
+ //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
|
|
|
+ //with the exception.
|
|
|
+ wuToSet->setStateID(WUStateUnknown);
|
|
|
+ wuToSet->setServer(wu.getServer());
|
|
|
+ wuToSet->setQueueName(wu.getQueueName());
|
|
|
+ const char* instanceName = wu.getInstance();
|
|
|
+ const char* targetClusterName = wu.getTargetClusterName();
|
|
|
+ if (instanceName && *instanceName)
|
|
|
+ wuToSet->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
|
|
|
+ if (targetClusterName && *targetClusterName)
|
|
|
+ wuToSet->setTargetClusterName(targetClusterName);
|
|
|
+
|
|
|
+ e->Release();
|
|
|
+ }
|
|
|
}
|