|
@@ -582,10 +582,10 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
const char *queueName = cluster.getThorQueue(str).str();
|
|
|
returnCluster->setQueueName(queueName);
|
|
|
|
|
|
- StringBuffer queueState;
|
|
|
+ StringBuffer queueState, queueStateDetails;
|
|
|
CJobQueueContents contents;
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- queue->copyItemsAndState(contents, queueState);
|
|
|
+ queue->copyItemsAndState(contents, queueState, queueStateDetails);
|
|
|
addQueuedWorkUnits(queueName, contents, aws, context, "ThorMaster", NULL);
|
|
|
|
|
|
BulletType bulletType = bulletGreen;
|
|
@@ -593,12 +593,12 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
|
|
|
getQueueState(numRunningJobsInQueue, queueState, bulletType);
|
|
|
|
|
|
- StringBuffer agentQueueState;
|
|
|
+ StringBuffer agentQueueState, agentQueueStateDetails;
|
|
|
CJobQueueContents agentContents;
|
|
|
SCMStringBuffer str1;
|
|
|
const char *agentQueueName = cluster.getAgentQueue(str1).str();
|
|
|
Owned<IJobQueue> agentQueue = createJobQueue(agentQueueName);
|
|
|
- agentQueue->copyItemsAndState(agentContents, agentQueueState);
|
|
|
+ agentQueue->copyItemsAndState(agentContents, agentQueueState, agentQueueStateDetails);
|
|
|
//Use the same 'queueName' because the job belongs to the same cluster
|
|
|
addQueuedWorkUnits(queueName, agentContents, aws, context, "ThorMaster", NULL);
|
|
|
if (bulletType == bulletGreen)
|
|
@@ -628,10 +628,10 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
returnCluster->setQueueName(cluster.getAgentQueue(str).str());
|
|
|
str.clear();
|
|
|
const char *queueName = cluster.getAgentQueue(str).str();
|
|
|
- StringBuffer queueState;
|
|
|
+ StringBuffer queueState, queueStateDetails;
|
|
|
CJobQueueContents contents;
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- queue->copyItemsAndState(contents, queueState);
|
|
|
+ queue->copyItemsAndState(contents, queueState, queueStateDetails);
|
|
|
addQueuedWorkUnits(queueName, contents, aws, context, "RoxieServer", NULL);
|
|
|
|
|
|
BulletType bulletType = bulletGreen;
|
|
@@ -655,10 +655,10 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
returnCluster->setQueueName(cluster.getAgentQueue(str).str());
|
|
|
str.clear();
|
|
|
const char *queueName = cluster.getAgentQueue(str).str();
|
|
|
- StringBuffer queueState;
|
|
|
+ StringBuffer queueState, queueStateDetails;
|
|
|
CJobQueueContents contents;
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- queue->copyItemsAndState(contents, queueState);
|
|
|
+ queue->copyItemsAndState(contents, queueState, queueStateDetails);
|
|
|
addQueuedWorkUnits(queueName, contents, aws, context, "HThorServer", NULL);
|
|
|
|
|
|
BulletType bulletType = bulletGreen;
|
|
@@ -706,11 +706,11 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
targetClusters->str(targetCluster);
|
|
|
|
|
|
StringBuffer queueName;
|
|
|
- StringBuffer queueState;
|
|
|
+ StringBuffer queueState, queueStateDetails;
|
|
|
CJobQueueContents contents;
|
|
|
getClusterEclCCServerQueueName(queueName, targetCluster.str());
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- queue->copyItemsAndState(contents, queueState);
|
|
|
+ queue->copyItemsAndState(contents, queueState, queueStateDetails);
|
|
|
unsigned count=0;
|
|
|
Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
ForEach(*iter)
|
|
@@ -726,7 +726,7 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
aws.append(*wu.getLink());
|
|
|
}
|
|
|
|
|
|
- addServerJobQueue(serverJobQueues, queueName, queueState.str(), serverName, "ECLCCserver");
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "ECLCCserver", queueState.str(), queueStateDetails.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -803,7 +803,7 @@ void CWsSMCEx::getServersAndWUs(IEspContext &context, IEspActivityRequest &req,
|
|
|
e->Release();
|
|
|
}
|
|
|
}
|
|
|
- addServerJobQueue(serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
}
|
|
|
}
|
|
|
} while (services->next());
|
|
@@ -872,7 +872,7 @@ void CWsSMCEx::readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetClu
|
|
|
{
|
|
|
CJobQueueContents contents;
|
|
|
Owned<IJobQueue> queue = createJobQueue(jobQueue.queueName.str());
|
|
|
- queue->copyItemsAndState(contents, jobQueue.queueState);
|
|
|
+ queue->copyItemsAndState(contents, jobQueue.queueState, jobQueue.queueStateDetails);
|
|
|
Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
jobQueue.countQueuedJobs=0;
|
|
|
ForEach(*iter)
|
|
@@ -1081,7 +1081,11 @@ void CWsSMCEx::setClusterQueueStatus(CWsSMCTargetCluster& targetCluster)
|
|
|
if (jobQueue.queueState.length())
|
|
|
{
|
|
|
const char* queueState = jobQueue.queueState.str();
|
|
|
- targetCluster.clusterStatusDetails.appendf("queue %s; ", queueState);
|
|
|
+ const char* queueStateDetails = jobQueue.queueStateDetails.str();
|
|
|
+ if (queueStateDetails && *queueStateDetails)
|
|
|
+ targetCluster.clusterStatusDetails.appendf("queue %s; %s;", queueState, queueStateDetails);
|
|
|
+ else
|
|
|
+ targetCluster.clusterStatusDetails.appendf("queue %s; ", queueState);
|
|
|
if (strieq(queueState,"stopped") || strieq(queueState,"paused"))
|
|
|
queuePausedOrStopped = true;
|
|
|
}
|
|
@@ -1128,6 +1132,7 @@ void CWsSMCEx::getTargetClusterAndWUs(IEspContext& context, CConstWUClusterInfoA
|
|
|
IPropertyTree* serverStatusRoot, IPropertyTreeIterator* itrStatusECLagent, IEspTargetCluster* returnCluster,
|
|
|
IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
{
|
|
|
+ double version = context.getClientVersion();
|
|
|
CWsSMCTargetCluster targetCluster;
|
|
|
cluster.getServerQueue(targetCluster.serverQueue.queueName);
|
|
|
targetCluster.clusterType = cluster.getPlatform();
|
|
@@ -1169,8 +1174,10 @@ void CWsSMCEx::getTargetClusterAndWUs(IEspContext& context, CConstWUClusterInfoA
|
|
|
else if (targetCluster.clusterType == RoxieCluster)
|
|
|
{
|
|
|
targetCluster.statusServerName.set("RoxieServer");
|
|
|
- targetCluster.clusterQueue.foundQueueInStatusServer = foundQueueInStatusServer(context, serverStatusRoot, targetCluster.statusServerName.str(), targetCluster.clusterName.str(), ".roxie");
|
|
|
- if (!targetCluster.clusterQueue.foundQueueInStatusServer)
|
|
|
+ cluster.getAgentQueue(targetCluster.agentQueue.queueName);
|
|
|
+ returnCluster->setQueueName(targetCluster.agentQueue.queueName.str());
|
|
|
+ targetCluster.agentQueue.foundQueueInStatusServer = foundQueueInStatusServer(context, serverStatusRoot, targetCluster.statusServerName.str(), targetCluster.clusterName.str(), ".roxie");
|
|
|
+ if (!targetCluster.agentQueue.foundQueueInStatusServer)
|
|
|
targetCluster.clusterStatusDetails.appendf("RoxieServer %s not attached; ", targetCluster.clusterName.str());
|
|
|
}
|
|
|
else
|
|
@@ -1216,6 +1223,7 @@ void CWsSMCEx::getTargetClusterAndWUs(IEspContext& context, CConstWUClusterInfoA
|
|
|
void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* serverStatusRoot, IArrayOf<IEspServerJobQueue>& serverJobQueues,
|
|
|
IArrayOf<IEspActiveWorkunit>& aws)
|
|
|
{
|
|
|
+ double version = context.getClientVersion();
|
|
|
BoolHash uniqueServers;
|
|
|
Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
|
|
|
ForEach(*it)
|
|
@@ -1247,7 +1255,7 @@ void CWsSMCEx::getWUsNotOnTargetCluster(IEspContext &context, IPropertyTree* ser
|
|
|
if (hasWU && !uniqueServers.getValue(queueName))
|
|
|
{
|
|
|
uniqueServers.setValue(queueName, true);
|
|
|
- addServerJobQueue(serverJobQueues, queueName.str(), serverName, serverName);
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName.str(), serverName, serverName);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1293,6 +1301,8 @@ void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot,
|
|
|
if (!envRoot)
|
|
|
return;
|
|
|
|
|
|
+ double version = context.getClientVersion();
|
|
|
+
|
|
|
VStringBuffer path("Software/%s", eqDfu);
|
|
|
Owned<IPropertyTreeIterator> services = envRoot->getElements(path);
|
|
|
ForEach(*services)
|
|
@@ -1309,7 +1319,7 @@ void CWsSMCEx::getDFUServersAndWUs(IEspContext &context, IPropertyTree* envRoot,
|
|
|
{
|
|
|
const char *queueName = queues.item(q);
|
|
|
readDFUWUs(context, queueName, serverName, aws);
|
|
|
- addServerJobQueue(serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
+ addServerJobQueue(version, serverJobQueues, queueName, serverName, "DFUserver");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1451,45 +1461,54 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType)
|
|
|
+void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType)
|
|
|
{
|
|
|
if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
|
|
|
return;
|
|
|
|
|
|
- Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
|
|
|
- jobQueue->setQueueName(queueName);
|
|
|
- jobQueue->setServerName(serverName);
|
|
|
- jobQueue->setServerType(serverType);
|
|
|
-
|
|
|
+ StringBuffer queueState;
|
|
|
+ StringBuffer queueStateDetails;
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- if (queue->stopped())
|
|
|
- jobQueue->setQueueStatus("stopped");
|
|
|
- else if (queue->paused())
|
|
|
- jobQueue->setQueueStatus("paused");
|
|
|
+ if (queue->stopped(queueStateDetails))
|
|
|
+ queueState.set("stopped");
|
|
|
+ else if (queue->paused(queueStateDetails))
|
|
|
+ queueState.set("paused");
|
|
|
else
|
|
|
- jobQueue->setQueueStatus("running");
|
|
|
-
|
|
|
- jobQueues.append(*jobQueue.getClear());
|
|
|
+ queueState.set("running");
|
|
|
+ addServerJobQueue(version, jobQueues, queueName, serverName, serverType, queueState.str(), queueStateDetails.str());
|
|
|
}
|
|
|
|
|
|
-void CWsSMCEx::addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* queueState, const char* serverName, const char* serverType)
|
|
|
+void CWsSMCEx::addServerJobQueue(double version, IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType, const char* queueState, const char* queueStateDetails)
|
|
|
{
|
|
|
if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
|
|
|
return;
|
|
|
|
|
|
+ if (!queueState || !*queueState)
|
|
|
+ queueState = "running";
|
|
|
+
|
|
|
Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
|
|
|
jobQueue->setQueueName(queueName);
|
|
|
jobQueue->setServerName(serverName);
|
|
|
jobQueue->setServerType(serverType);
|
|
|
-
|
|
|
- if (queueState && (strieq(queueState,"stopped") || strieq(queueState,"paused")))
|
|
|
- jobQueue->setQueueStatus(queueState);
|
|
|
- else
|
|
|
- jobQueue->setQueueStatus("running");
|
|
|
+ setServerJobQueueStatus(version, jobQueue, queueState, queueStateDetails);
|
|
|
|
|
|
jobQueues.append(*jobQueue.getClear());
|
|
|
}
|
|
|
|
|
|
+void CWsSMCEx::setServerJobQueueStatus(double version, IEspServerJobQueue* jobQueue, const char* status, const char* details)
|
|
|
+{
|
|
|
+ if (!status || !*status)
|
|
|
+ return;
|
|
|
+ StringBuffer queueState;
|
|
|
+ if (details && *details)
|
|
|
+ queueState.appendf("queue %s; %s;", status, details);
|
|
|
+ else
|
|
|
+ queueState.appendf("queue %s;", status);
|
|
|
+ jobQueue->setQueueStatus(status);
|
|
|
+ if (version >= 1.17)
|
|
|
+ jobQueue->setStatusDetails(queueState.str());
|
|
|
+}
|
|
|
+
|
|
|
void CWsSMCEx::addToThorClusterList(IArrayOf<IEspThorCluster>& clusters, IEspThorCluster* cluster, const char* sortBy, bool descending)
|
|
|
{
|
|
|
if (clusters.length() < 1)
|
|
@@ -1767,8 +1786,9 @@ bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspS
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
+ StringBuffer info;
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
- queue->stop();
|
|
|
+ queue->stop(createQueueActionInfo(context, "stopped", req, info));
|
|
|
AccessSuccess(context, "Stopped queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1785,8 +1805,9 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
+ StringBuffer info;
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
- queue->resume();
|
|
|
+ queue->resume(createQueueActionInfo(context, "resumed", req, info));
|
|
|
AccessSuccess(context, "Resumed queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1797,14 +1818,32 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+const char* CWsSMCEx::createQueueActionInfo(IEspContext &context, const char* state, IEspSMCQueueRequest &req, StringBuffer& info)
|
|
|
+{
|
|
|
+ StringBuffer peer, currentTime;
|
|
|
+ context.getPeer(peer);
|
|
|
+ const char* userId = context.queryUserId();
|
|
|
+ if (!userId || !*userId)
|
|
|
+ userId = "Unknown user";
|
|
|
+ CDateTime now;
|
|
|
+ now.setNow();
|
|
|
+ now.getString(currentTime);
|
|
|
+ info.appendf("%s by <%s> at <%s> from <%s>", state, userId, currentTime.str(), peer.str());
|
|
|
+ const char* comment = req.getComment();
|
|
|
+ if (comment && *comment)
|
|
|
+ info.append(": ' ").append(comment).append("'");
|
|
|
+ return info.str();
|
|
|
+}
|
|
|
+
|
|
|
bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
+ StringBuffer info;
|
|
|
Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
|
|
|
- queue->pause();
|
|
|
+ queue->pause(createQueueActionInfo(context, "paused", req, info));
|
|
|
AccessSuccess(context, "Paused queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1886,10 +1925,11 @@ bool CWsSMCEx::onGetThorQueueAvailability(IEspContext &context, IEspGetThorQueue
|
|
|
returnCluster->setClusterName(targetName);
|
|
|
returnCluster->setQueueName(queueName);
|
|
|
|
|
|
+ StringBuffer info;
|
|
|
Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
- if(queue->stopped())
|
|
|
+ if(queue->stopped(info))
|
|
|
returnCluster->setQueueStatus("stopped");
|
|
|
- else if (queue->paused())
|
|
|
+ else if (queue->paused(info))
|
|
|
returnCluster->setQueueStatus("paused");
|
|
|
else
|
|
|
returnCluster->setQueueStatus("running");
|