|
@@ -95,6 +95,34 @@ struct QueueLock
|
|
|
Linked<IJobQueue> queue;
|
|
|
};
|
|
|
|
|
|
+static int sortTargetClustersByNameDescending(IInterface **L, IInterface **R)
|
|
|
+{
|
|
|
+ IEspTargetCluster *left = (IEspTargetCluster *) *L;
|
|
|
+ IEspTargetCluster *right = (IEspTargetCluster *) *R;
|
|
|
+ return strcmp(right->getClusterName(), left->getClusterName());
|
|
|
+}
|
|
|
+
|
|
|
+static int sortTargetClustersByNameAscending(IInterface **L, IInterface **R)
|
|
|
+{
|
|
|
+ IEspTargetCluster *left = (IEspTargetCluster *) *L;
|
|
|
+ IEspTargetCluster *right = (IEspTargetCluster *) *R;
|
|
|
+ return strcmp(left->getClusterName(), right->getClusterName());
|
|
|
+}
|
|
|
+
|
|
|
+static int sortTargetClustersBySizeDescending(IInterface **L, IInterface **R)
|
|
|
+{
|
|
|
+ IEspTargetCluster *left = (IEspTargetCluster *) *L;
|
|
|
+ IEspTargetCluster *right = (IEspTargetCluster *) *R;
|
|
|
+ return right->getClusterSize() - left->getClusterSize();
|
|
|
+}
|
|
|
+
|
|
|
+static int sortTargetClustersBySizeAscending(IInterface **L, IInterface **R)
|
|
|
+{
|
|
|
+ IEspTargetCluster *left = (IEspTargetCluster *) *L;
|
|
|
+ IEspTargetCluster *right = (IEspTargetCluster *) *R;
|
|
|
+ return left->getClusterSize() - right->getClusterSize();
|
|
|
+}
|
|
|
+
|
|
|
void CWsSMCEx::init(IPropertyTree *cfg, const char *process, const char *service)
|
|
|
{
|
|
|
if (!daliClientActive())
|
|
@@ -131,7 +159,7 @@ static void countProgress(IPropertyTree *t,unsigned &done,unsigned &total)
|
|
|
|
|
|
struct CActiveWorkunitWrapper: public CActiveWorkunit
|
|
|
{
|
|
|
- CActiveWorkunitWrapper(IEspContext &context, const char* wuid,unsigned index=0): CActiveWorkunit("","")
|
|
|
+ CActiveWorkunitWrapper(IEspContext &context, const char* wuid,const char* location = NULL,unsigned index=0): CActiveWorkunit("","")
|
|
|
{
|
|
|
double version = context.getClientVersion();
|
|
|
|
|
@@ -140,11 +168,14 @@ struct CActiveWorkunitWrapper: public CActiveWorkunit
|
|
|
SCMStringBuffer state,owner,jobname;
|
|
|
setWuid(wuid);
|
|
|
wu->getStateDesc(state);
|
|
|
- if(index)
|
|
|
+ if(index && location && *location)
|
|
|
+ stateStr.appendf("queued(%d) [%s on %s]", index, state.str(), location);
|
|
|
+ else if(index)
|
|
|
stateStr.appendf("queued(%d) [%s]", index, state.str());
|
|
|
+ else if(location && *location)
|
|
|
+ stateStr.appendf("%s [on %s]", state.str(), location);
|
|
|
else
|
|
|
stateStr.set(state.str());
|
|
|
-
|
|
|
setState(stateStr.str());
|
|
|
setStateID(wu->getState());
|
|
|
if ((version > 1.09) && (wu->getState() == WUStateFailed))
|
|
@@ -213,6 +244,7 @@ bool isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
|
|
|
return bFound;
|
|
|
}
|
|
|
|
|
|
+//This function will only be called when client version < 1.16
|
|
|
void addQueuedWorkUnits(const char *queueName, CJobQueueContents &contents, IArrayOf<IEspActiveWorkunit> &aws, IEspContext &context, const char *serverName, const char *instanceName)
|
|
|
{
|
|
|
Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
@@ -223,7 +255,7 @@ void addQueuedWorkUnits(const char *queueName, CJobQueueContents &contents, IArr
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),++count));
|
|
|
+ Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),NULL,++count));
|
|
|
wu->setServer(serverName);
|
|
|
wu->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
|
|
|
wu->setQueueName(queueName);
|
|
@@ -683,7 +715,7 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
if (isInWuList(aws, iter->query().queryWUID()))
|
|
|
continue;
|
|
|
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),++count));
|
|
|
+ Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),NULL, ++count));
|
|
|
wu->setServer("ECLCCserver");
|
|
|
wu->setInstance(serverName);
|
|
|
wu->setQueueName(queueName);
|
|
@@ -805,6 +837,501 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
|
|
|
resp.setDFUJobs(jobs);
|
|
|
}
|
|
|
+
|
|
|
+void CWsSMCEx::createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, IEspContext &context, const char* wuid, const char* location,
|
|
|
+ unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName)
|
|
|
+{
|
|
|
+ try
|
|
|
+ {
|
|
|
+ ownedWU.setown(new CActiveWorkunitWrapper(context, wuid, location, index));
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {//Exception may be thrown when the openWorkUnit() is called inside the last CActiveWorkunitWrapper()
|
|
|
+ StringBuffer msg;
|
|
|
+ ownedWU.setown(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
|
|
|
+ ownedWU->setStateID(WUStateUnknown);
|
|
|
+ }
|
|
|
+
|
|
|
+ ownedWU->setServer(serverName);
|
|
|
+ ownedWU->setQueueName(queueName);
|
|
|
+ if (instanceName && *instanceName)
|
|
|
+ ownedWU->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
|
|
|
+ if (targetClusterName && *targetClusterName)
|
|
|
+ ownedWU->setTargetClusterName(targetClusterName);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster,
|
|
|
+ CWsSMCQueue& jobQueue, const char* queueName, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ CJobQueueContents contents;
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(jobQueue.queueName.str());
|
|
|
+ queue->copyItemsAndState(contents, jobQueue.queueState);
|
|
|
+ Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
+ jobQueue.countQueuedJobs=0;
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ const char* wuid = iter->query().queryWUID();
|
|
|
+ if (isInWuList(aws, wuid))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ const char* queue = targetCluster.clusterName.str();
|
|
|
+ if (queueName && *queueName)
|
|
|
+ queue = queueName;
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
|
|
|
+ queue, NULL, targetCluster.clusterName.str());
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readRunningWUsOnServerNode(IEspContext& context, IPropertyTree& serverStatusNode, const char* targetClusterName,
|
|
|
+ unsigned& runningJobsInQueue, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ StringBuffer instance, qname, durationStr, subgraphStr;
|
|
|
+ serverStatusNode.getProp("@queue", qname);
|
|
|
+ const char* serverName = serverStatusNode.queryProp("@name");
|
|
|
+ if (serverName && *serverName)
|
|
|
+ {
|
|
|
+ if (strieq("ThorMaster", serverName))
|
|
|
+ serverStatusNode.getProp("@thorname",instance);
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (strieq(serverName, "ECLAgent"))
|
|
|
+ qname.append(serverName);//use set()??
|
|
|
+ instance.appendf("%s on %s", serverName, serverStatusNode.queryProp("@node"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int sg_duration = serverStatusNode.getPropInt("@sg_duration", -1);
|
|
|
+ const char* graph = serverStatusNode.queryProp("@graph");
|
|
|
+ int subgraph = serverStatusNode.getPropInt("@subgraph", -1);
|
|
|
+ durationStr.appendf("%d min", sg_duration);
|
|
|
+ subgraphStr.appendf("%d", subgraph);
|
|
|
+
|
|
|
+ //get all WUs
|
|
|
+ Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
|
|
|
+ ForEach(*wuids)
|
|
|
+ {
|
|
|
+ const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ uniqueWUIDs.setValue(wuid, true);
|
|
|
+ runningJobsInQueue++;
|
|
|
+
|
|
|
+ StringBuffer queueName;
|
|
|
+ const char* processName = NULL;
|
|
|
+ if (!strieq(targetClusterName, instance.str()))
|
|
|
+ processName = instance.str();
|
|
|
+
|
|
|
+ const char *cluster = serverStatusNode.queryProp("Cluster");
|
|
|
+ if (cluster) // backward compat check.
|
|
|
+ getClusterThorQueueName(queueName, cluster);
|
|
|
+ else
|
|
|
+ queueName.append(qname);
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, processName, 0, serverName, queueName, instance.str(), targetClusterName);
|
|
|
+ if (wu->getStateID() != WUStateRunning)
|
|
|
+ {
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (subgraph > -1 && sg_duration > -1)
|
|
|
+ {
|
|
|
+ wu->setGraphName(graph);
|
|
|
+ wu->setDuration(durationStr.str());
|
|
|
+ wu->setGID(subgraphStr.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
|
|
|
+ wu->setMemoryBlocked(1);
|
|
|
+
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readRunningWUsOnECLAgent(IEspContext& context, IPropertyTreeIterator* itrStatusECLagent, CConstWUClusterInfoArray& clusters,
|
|
|
+ CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ ForEach(*itrStatusECLagent)
|
|
|
+ {
|
|
|
+ IPropertyTree& serverStatusNode = itrStatusECLagent->query();
|
|
|
+ VStringBuffer instance("ECLagent of %s", serverStatusNode.queryProp("@node"));
|
|
|
+
|
|
|
+ Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
|
|
|
+ ForEach(*wuids)
|
|
|
+ {
|
|
|
+ const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ if (!wuid || !*wuid || uniqueWUIDs.getValue(wuid))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ SCMStringBuffer clusterQueue, clusterName;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ CWUWrapper cwu(wuid, context);
|
|
|
+ cwu->getClusterName(clusterName);
|
|
|
+ if (clusterName.length() < 1)
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {//Exception may be thrown when the openWorkUnit() is called inside the CWUWrapper
|
|
|
+ StringBuffer msg;
|
|
|
+ WARNLOG("Failed to open workunit %s: %s", wuid, e->errorMessage(msg).str());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ StringBuffer clusterType;
|
|
|
+ readClusterTypeAndQueueName(clusters, clusterName.str(), clusterType, clusterQueue);
|
|
|
+ if ((targetCluster.clusterType == ThorLCRCluster) && !streq(targetCluster.clusterQueue.queueName.str(), clusterQueue.str()))
|
|
|
+ continue;
|
|
|
+ if ((targetCluster.clusterType != ThorLCRCluster) && !streq(targetCluster.agentQueue.queueName.str(), clusterQueue.str()))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, instance.str(), 0, "ECLagent", "ECLagent", instance.str(), targetCluster.clusterName.str());
|
|
|
+ if (wu->getStateID() != WUStateRunning)
|
|
|
+ {
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ targetCluster.agentQueue.countQueuedJobs++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ targetCluster.agentQueue.countRunningJobs++;
|
|
|
+
|
|
|
+ if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
|
|
|
+ wu->setMemoryBlocked(1);
|
|
|
+
|
|
|
+ wu->setClusterType(clusterType.str());
|
|
|
+ wu->setClusterQueueName(clusterQueue.str());
|
|
|
+
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+bool CWsSMCEx::foundQueueInStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, const char* serverName,
|
|
|
+ const char* processName, const char* processExt)
|
|
|
+{
|
|
|
+ bool foundServer = false;
|
|
|
+ StringBuffer path, queueName;
|
|
|
+ queueName.append(processName).append(processExt);
|
|
|
+ path.appendf("Server[@name=\"%s\"]", serverName);
|
|
|
+ Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path));
|
|
|
+ ForEach(*it)
|
|
|
+ {
|
|
|
+ IPropertyTree& serverStatusNode = it->query();
|
|
|
+ const char* queue = serverStatusNode.queryProp("@queue");
|
|
|
+ if (!queue || !*queue)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringArray qlist;
|
|
|
+ qlist.appendListUniq(queue, ",");
|
|
|
+ ForEachItemIn(q, qlist)
|
|
|
+ {
|
|
|
+ if (strieq(qlist.item(q), queueName.str()))
|
|
|
+ {
|
|
|
+ foundServer = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (foundServer)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return foundServer;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::sortTargetClusters(IArrayOf<IEspTargetCluster>& clusters, const char* sortBy, bool descending)
|
|
|
+{
|
|
|
+ if (!sortBy || !*sortBy || strieq(sortBy, "name"))
|
|
|
+ clusters.sort(descending ? sortTargetClustersByNameDescending : sortTargetClustersByNameAscending);
|
|
|
+ else
|
|
|
+ clusters.sort(descending ? sortTargetClustersBySizeDescending : sortTargetClustersBySizeAscending);
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::setClusterQueueStatus(CWsSMCTargetCluster& targetCluster)
|
|
|
+{
|
|
|
+    CWsSMCQueue& jobQueue = (targetCluster.clusterType != ThorLCRCluster)
|
|
|
+        ? targetCluster.agentQueue
|
|
|
+        : targetCluster.clusterQueue;
|
|
|
+ if (!jobQueue.queueName.length())
|
|
|
+ return;
|
|
|
+
|
|
|
+ targetCluster.clusterStatusDetails.appendf("%s: ", jobQueue.queueName.str());
|
|
|
+
|
|
|
+ bool queuePausedOrStopped = false;
|
|
|
+ unsigned countRunningJobs = jobQueue.countRunningJobs;
|
|
|
+ unsigned countQueuedJobs = jobQueue.countQueuedJobs;
|
|
|
+ if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
+ {
|
|
|
+ countRunningJobs += targetCluster.agentQueue.countRunningJobs;
|
|
|
+ countQueuedJobs += targetCluster.agentQueue.countQueuedJobs;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (jobQueue.queueState.length())
|
|
|
+ {
|
|
|
+ const char* queueState = jobQueue.queueState.str();
|
|
|
+ targetCluster.clusterStatusDetails.appendf("queue %s; ", queueState);
|
|
|
+ if (strieq(queueState,"stopped") || strieq(queueState,"paused"))
|
|
|
+ queuePausedOrStopped = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!jobQueue.foundQueueInStatusServer)
|
|
|
+ {
|
|
|
+ if (queuePausedOrStopped)
|
|
|
+ jobQueue.statusType = QueuePausedOrStoppedNotFound;
|
|
|
+ else
|
|
|
+ jobQueue.statusType = QueueRunningNotFound;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (queuePausedOrStopped)
|
|
|
+ {
|
|
|
+ if (jobQueue.countRunningJobs > 0)
|
|
|
+ jobQueue.statusType = QueuePausedOrStoppedWithJobs;
|
|
|
+ else
|
|
|
+ jobQueue.statusType = QueuePausedOrStoppedWithNoJob;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::setClusterStatus(IEspContext& context, CWsSMCTargetCluster& targetCluster, IEspTargetCluster* returnCluster)
|
|
|
+{
|
|
|
+ setClusterQueueStatus(targetCluster);
|
|
|
+
|
|
|
+ int statusType = (targetCluster.clusterQueue.statusType > targetCluster.agentQueue.statusType) ? targetCluster.clusterQueue.statusType
|
|
|
+ : targetCluster.agentQueue.statusType;
|
|
|
+ returnCluster->setClusterStatus(statusType);
|
|
|
+ //Set 'Warning' which may be displayed beside cluster name
|
|
|
+ if (statusType == QueueRunningNotFound)
|
|
|
+ returnCluster->setWarning("Cluster not attached");
|
|
|
+ else if (statusType == QueuePausedOrStoppedNotFound)
|
|
|
+ returnCluster->setWarning("Queue paused or stopped - Cluster not attached");
|
|
|
+ else if (statusType != RunningNormal)
|
|
|
+ returnCluster->setWarning("Queue paused or stopped");
|
|
|
+ //Set 'StatusDetails' which may be displayed when a mouse is moved over cluster icon
|
|
|
+ if (targetCluster.clusterStatusDetails.length())
|
|
|
+ returnCluster->setStatusDetails(targetCluster.clusterStatusDetails.str());
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getTargetClusterAndWUs(IEspContext& context, CConstWUClusterInfoArray& clusters, IConstWUClusterInfo& cluster,
|
|
|
+ IPropertyTree* serverStatusRoot, IPropertyTreeIterator* itrStatusECLagent, IEspTargetCluster* returnCluster,
|
|
|
+ IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ CWsSMCTargetCluster targetCluster;
|
|
|
+ cluster.getServerQueue(targetCluster.serverQueue.queueName);
|
|
|
+ targetCluster.clusterType = cluster.getPlatform();
|
|
|
+ returnCluster->setClusterName(cluster.getName(targetCluster.clusterName).str());
|
|
|
+ returnCluster->setClusterType(targetCluster.clusterType);
|
|
|
+ returnCluster->setClusterSize(cluster.getSize());
|
|
|
+
|
|
|
+ //get running WUs on cluster
|
|
|
+ BoolHash uniqueWUIDs;
|
|
|
+ if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
+ {
|
|
|
+ targetCluster.statusServerName.set("ThorMaster");
|
|
|
+ cluster.getThorQueue(targetCluster.clusterQueue.queueName);
|
|
|
+ cluster.getAgentQueue(targetCluster.agentQueue.queueName);
|
|
|
+ const StringArray& processes = cluster.getThorProcesses();
|
|
|
+ ForEachItemIn(i, processes)
|
|
|
+ {
|
|
|
+ const char* process = processes.item(i);
|
|
|
+ if (!process || !*process)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringBuffer path;
|
|
|
+ path.appendf("Server[@thorname=\"%s\"][@name=\"%s\"]", process, targetCluster.statusServerName.str());
|
|
|
+ Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path));
|
|
|
+ if (!it->first())
|
|
|
+ targetCluster.clusterStatusDetails.appendf("Thor Process %s not attached; ", process);
|
|
|
+ else
|
|
|
+ targetCluster.clusterQueue.foundQueueInStatusServer = true;
|
|
|
+
|
|
|
+ ForEach(*it)
|
|
|
+ readRunningWUsOnServerNode(context, it->query(), targetCluster.clusterName.str(), targetCluster.clusterQueue.countRunningJobs, uniqueWUIDs, aws);
|
|
|
+ }
|
|
|
+
|
|
|
+ //get queued WUs
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.clusterQueue, NULL, aws);
|
|
|
+ returnCluster->setQueueName(targetCluster.clusterQueue.queueName.str());
|
|
|
+ returnCluster->setQueueStatus(targetCluster.clusterQueue.queueState);
|
|
|
+ }
|
|
|
+ else if (targetCluster.clusterType == RoxieCluster)
|
|
|
+ {
|
|
|
+ targetCluster.statusServerName.set("RoxieServer");
|
|
|
+ targetCluster.clusterQueue.foundQueueInStatusServer = foundQueueInStatusServer(context, serverStatusRoot, targetCluster.statusServerName.str(), targetCluster.clusterName.str(), ".roxie");
|
|
|
+ if (!targetCluster.clusterQueue.foundQueueInStatusServer)
|
|
|
+ targetCluster.clusterStatusDetails.appendf("RoxieServer %s not attached; ", targetCluster.clusterName.str());
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ targetCluster.statusServerName.set("HThorServer");
|
|
|
+ cluster.getAgentQueue(targetCluster.agentQueue.queueName);
|
|
|
+ returnCluster->setQueueName(targetCluster.agentQueue.queueName.str());
|
|
|
+ targetCluster.agentQueue.foundQueueInStatusServer = foundQueueInStatusServer(context, serverStatusRoot, targetCluster.statusServerName.str(), targetCluster.clusterName.str(), ".agent");
|
|
|
+ if (!targetCluster.agentQueue.foundQueueInStatusServer)
|
|
|
+ targetCluster.clusterStatusDetails.appendf("ECLAgent %s%s not attached; ", targetCluster.clusterName.str(), ".agent");
|
|
|
+ }
|
|
|
+
|
|
|
+ //get running WUs on Agent Queue
|
|
|
+ if (targetCluster.agentQueue.queueName.length())
|
|
|
+ {
|
|
|
+ StringBuffer path;
|
|
|
+ path.appendf("Server[@name=\"%s\"]", targetCluster.agentQueue.queueName.str());
|
|
|
+ Owned<IPropertyTreeIterator> itr(serverStatusRoot->getElements(path));
|
|
|
+ if (itr->first())
|
|
|
+ {
|
|
|
+ ForEach(*itr)
|
|
|
+ readRunningWUsOnServerNode(context, itr->query(), targetCluster.clusterName.str(), targetCluster.agentQueue.countRunningJobs, uniqueWUIDs, aws);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {//legacy
|
|
|
+ readRunningWUsOnECLAgent(context, itrStatusECLagent, clusters, targetCluster, uniqueWUIDs, aws);
|
|
|
+ }
|
|
|
+
|
|
|
+ //get queued WUs
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str(), aws);
|
|
|
+ if (targetCluster.clusterType != ThorLCRCluster)
|
|
|
+ returnCluster->setQueueStatus(targetCluster.agentQueue.queueState);
|
|
|
+ }
|
|
|
+
|
|
|
+ //get running WUs on Server Queue
|
|
|
+ if (targetCluster.serverQueue.queueName.length())
|
|
|
+ readWUsAndStateFromJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str(), aws);
|
|
|
+
|
|
|
+ setClusterStatus(context, targetCluster, returnCluster);
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* serverStatusRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues,
|
|
|
+ IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ BoolHash uniqueServers;
|
|
|
+ Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
|
|
|
+ ForEach(*it)
|
|
|
+ {
|
|
|
+ IPropertyTree& serverNode = it->query();
|
|
|
+ const char* serverName = serverNode.queryProp("@name");
|
|
|
+ const char* instance = serverNode.queryProp("@node");
|
|
|
+ if (!serverName || !*serverName || !instance || !*instance)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ bool hasWU = false;
|
|
|
+ StringBuffer queueName;
|
|
|
+ queueName.appendf("%s_on_%s", serverName, instance);
|
|
|
+ Owned<IPropertyTreeIterator> wuids(serverNode.getElements("WorkUnit"));
|
|
|
+ ForEach(*wuids)
|
|
|
+ {
|
|
|
+ const char* wuid=wuids->query().queryProp(NULL);
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (isInWuList(aws, wuid))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu;
|
|
|
+ createActiveWorkUnit(wu, context, wuid, NULL, 0, serverName, queueName.str(), instance, NULL);
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ hasWU = true;
|
|
|
+ }
|
|
|
+ if (hasWU && !uniqueServers.getValue(queueName))
|
|
|
+ {
|
|
|
+ uniqueServers.setValue(queueName, true);
|
|
|
+ addServerJobQueue(serverJobQueues, queueName.str(), serverName, serverName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::readDFUWUs(IEspContext &context, const char* queueName, const char* serverName, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ StringAttrArray wulist;
|
|
|
+ unsigned running = queuedJobs(queueName, wulist);
|
|
|
+ ForEachItemIn(i, wulist)
|
|
|
+ {
|
|
|
+ StringBuffer jname, uname, state, error;
|
|
|
+ const char *wuid = wulist.item(i).text.get();
|
|
|
+ if (i<running)
|
|
|
+ state.set("running");
|
|
|
+ else
|
|
|
+ state.set("queued");
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ Owned<IConstDFUWorkUnit> dfuwu = getDFUWorkUnitFactory()->openWorkUnit(wuid, false);
|
|
|
+ dfuwu->getUser(uname);
|
|
|
+ dfuwu->getJobName(jname);
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ e->errorMessage(error);
|
|
|
+ state.appendf(" (%s)", error.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
|
|
|
+ wu->setServer("DFUserver");
|
|
|
+ wu->setInstance(serverName);
|
|
|
+ wu->setQueueName(queueName);
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues, IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
+{
|
|
|
+ if (!envRoot)
|
|
|
+ return;
|
|
|
+
|
|
|
+ VStringBuffer path("Software/%s", eqDfu);
|
|
|
+ Owned<IPropertyTreeIterator> services = envRoot->getElements(path);
|
|
|
+ ForEach(*services)
|
|
|
+ {
|
|
|
+ IPropertyTree &serviceTree = services->query();
|
|
|
+ const char *qname = serviceTree.queryProp("@queue");
|
|
|
+ const char *serverName = serviceTree.queryProp("@name");
|
|
|
+ if (!qname || !*qname)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringArray queues;
|
|
|
+ queues.appendListUniq(qname, ",");
|
|
|
+ ForEachItemIn(q, queues)
|
|
|
+ {
|
|
|
+ const char *queueName = queues.item(q);
|
|
|
+ readDFUWUs(context, queueName, serverName, aws);
|
|
|
+ addServerJobQueue(serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CWsSMCEx::getDFURecoveryJobs(IEspContext &context, IArrayOf<IEspDFUJob>& jobs)
|
|
|
+{
|
|
|
+ Owned<IRemoteConnection> conn = querySDS().connect("DFU/RECOVERY",myProcessSession(),0, INFINITE);
|
|
|
+ if (!conn)
|
|
|
+ return;
|
|
|
+
|
|
|
+ Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("job"));
|
|
|
+ ForEach(*it)
|
|
|
+ {
|
|
|
+ IPropertyTree &e=it->query();
|
|
|
+ if (!e.getPropBool("Running",false))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringBuffer cmd;
|
|
|
+ unsigned done, total;
|
|
|
+ countProgress(&e,done,total);
|
|
|
+ cmd.append(e.queryProp("@command")).append(" ").append(e.queryProp("@command_parameters"));
|
|
|
+
|
|
|
+ Owned<IEspDFUJob> job = new CDFUJob("","");
|
|
|
+ job->setTimeStarted(e.queryProp("@time_started"));
|
|
|
+ job->setDone(done);
|
|
|
+ job->setTotal(total);
|
|
|
+ job->setCommand(cmd.str());
|
|
|
+ jobs.append(*job.getLink());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// This method reads job information from both /Status/Servers and IJobQueue.
|
|
|
//
|
|
|
// Each server component (a thor cluster, a dfuserver, or an eclagent) is one 'Server' branch under
|
|
@@ -843,6 +1370,7 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
#endif
|
|
|
if(isSuperUser && req.getFromSubmitBtn())
|
|
|
readBannerAndChatRequest(context, req, resp);
|
|
|
+
|
|
|
if (version >= 1.12)
|
|
|
resp.setSuperUser(isSuperUser);
|
|
|
if (version >= 1.06)
|
|
@@ -855,7 +1383,56 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
|
|
|
CConstWUClusterInfoArray clusters;
|
|
|
getEnvironmentClusterInfo(envRoot, clusters);
|
|
|
- getServersAndWUs(context, req, resp, version, envRoot, clusters);
|
|
|
+
|
|
|
+ if (version >= 1.16)
|
|
|
+ {
|
|
|
+ IArrayOf<IEspTargetCluster> ThorClusters;
|
|
|
+ IArrayOf<IEspTargetCluster> HThorClusters;
|
|
|
+ IArrayOf<IEspTargetCluster> RoxieClusters;
|
|
|
+ IArrayOf<IEspServerJobQueue> serverJobQueues;
|
|
|
+ IArrayOf<IEspActiveWorkunit> aws;
|
|
|
+ IArrayOf<IEspDFUJob> DFURecoveryJobs;
|
|
|
+
|
|
|
+ const char* sortBy = req.getSortBy();
|
|
|
+ bool descending = req.getDescending();
|
|
|
+
|
|
|
+ Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
|
|
|
+        IPropertyTree* serverStatusRoot = conn ? conn->queryRoot() : throw MakeStringException(-1, "Failed to get status server information.");
|
|
|
+ Owned<IPropertyTreeIterator> itrStatusECLagent(serverStatusRoot->getElements("Server[@name=\"ECLagent\"]"));
|
|
|
+ ForEachItemIn(c, clusters)
|
|
|
+ {
|
|
|
+ IConstWUClusterInfo &cluster = clusters.item(c);
|
|
|
+ IEspTargetCluster* returnCluster = new CTargetCluster("","");
|
|
|
+ getTargetClusterAndWUs(context, clusters, cluster, serverStatusRoot, itrStatusECLagent, returnCluster, aws);
|
|
|
+ if (cluster.getPlatform() == ThorLCRCluster)
|
|
|
+ ThorClusters.append(*returnCluster);
|
|
|
+ else if (cluster.getPlatform() == RoxieCluster)
|
|
|
+ RoxieClusters.append(*returnCluster);
|
|
|
+ else
|
|
|
+ HThorClusters.append(*returnCluster);
|
|
|
+ }
|
|
|
+ sortTargetClusters(ThorClusters, sortBy, descending);
|
|
|
+ sortTargetClusters(RoxieClusters, sortBy, descending);
|
|
|
+ getWUsNotOnTargetCluster(context, serverStatusRoot, serverJobQueues, aws);
|
|
|
+ getDFUServersAndWUs(context, envRoot, serverJobQueues, aws);
|
|
|
+ getDFURecoveryJobs(context, DFURecoveryJobs);
|
|
|
+
|
|
|
+ SecAccessFlags access;
|
|
|
+ if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
|
|
|
+ resp.setAccessRight("Access_Full");
|
|
|
+ resp.setSortBy(sortBy);
|
|
|
+ resp.setDescending(descending);
|
|
|
+ resp.setThorClusterList(ThorClusters);
|
|
|
+ resp.setRoxieClusterList(RoxieClusters);
|
|
|
+ resp.setHThorClusterList(HThorClusters);
|
|
|
+ resp.setServerJobQueues(serverJobQueues);
|
|
|
+ resp.setRunning(aws);
|
|
|
+ resp.setDFUJobs(DFURecoveryJobs);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {//for backward compatible
|
|
|
+ getServersAndWUs(context, req, resp, version, envRoot, clusters);
|
|
|
+ }
|
|
|
}
|
|
|
catch(IException* e)
|
|
|
{
|