|
@@ -60,42 +60,10 @@ void AccessFailure(IEspContext& context, char const * msg,...)
|
|
|
|
|
|
struct QueueWrapper
|
|
|
{
|
|
|
- QueueWrapper(const char* cluster)
|
|
|
+ QueueWrapper(const char* targetName, const char* queueName)
|
|
|
{
|
|
|
StringBuffer name;
|
|
|
- name<<cluster<<".thor";
|
|
|
- queue.setown(createJobQueue(name.str()));
|
|
|
- }
|
|
|
-
|
|
|
- QueueWrapper(int clusterType, const char* cluster)
|
|
|
- {
|
|
|
- if (!cluster || !*cluster)
|
|
|
- return;
|
|
|
-
|
|
|
- const char* type = eqThorCluster;
|
|
|
- if (clusterType < 1)
|
|
|
- type = eqRoxieCluster;
|
|
|
-
|
|
|
- CTpWrapper dummy;
|
|
|
- IArrayOf<IEspTpLogicalCluster> clusters;
|
|
|
- dummy.getTargetClusterList(clusters, type, cluster);
|
|
|
- if (clusters.length() < 1)
|
|
|
- return;
|
|
|
-
|
|
|
- IEspTpLogicalCluster &cluster0 = clusters.item(0);
|
|
|
- const char *name0 = cluster0.getName();
|
|
|
- if (!name0 || !*name0)
|
|
|
- return;
|
|
|
-
|
|
|
- StringBuffer name;
|
|
|
- name<<name0<<".thor";
|
|
|
- queue.setown(createJobQueue(name.str()));
|
|
|
- }
|
|
|
-
|
|
|
- QueueWrapper(const char* clusterName, const char* queueName)
|
|
|
- {
|
|
|
- StringBuffer name;
|
|
|
- name<<clusterName<<"."<<queueName;
|
|
|
+ name<<targetName<<"."<<queueName;
|
|
|
queue.setown(createJobQueue(name.str()));
|
|
|
}
|
|
|
|
|
@@ -206,7 +174,7 @@ static int stringcmp(const char **a, const char **b)
|
|
|
return strcmp(*a, *b);
|
|
|
}
|
|
|
|
|
|
-bool CWsSMCEx::isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
|
|
|
+bool isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
|
|
|
{
|
|
|
bool bFound = false;
|
|
|
if (wuid && *wuid && (aws.length() > 0))
|
|
@@ -227,6 +195,80 @@ bool CWsSMCEx::isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
|
|
|
return bFound;
|
|
|
}
|
|
|
|
|
|
+void addQueuedWorkUnits(const char *queueName, IJobQueue *queue, IArrayOf<IEspActiveWorkunit> &aws, IEspContext &context, const char *serverName, const char *instanceName)
|
|
|
+{
|
|
|
+ CJobQueueContents contents;
|
|
|
+ queue->copyItems(contents);
|
|
|
+ Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
+ unsigned count=0;
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ if (!isInWuList(aws, iter->query().queryWUID()))
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),++count));
|
|
|
+ wu->setServer(serverName);
|
|
|
+ wu->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
|
|
|
+ wu->setQueueName(queueName);
|
|
|
+
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+ catch (IException *e)
|
|
|
+ {
|
|
|
+ // JCSMORE->KWang what is this handling? Why would this succeeed and above fail?
|
|
|
+ StringBuffer msg;
|
|
|
+ Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(iter->query().queryWUID(), "", "", e->errorMessage(msg).str(), "normal"));
|
|
|
+ wu->setServer(serverName);
|
|
|
+ wu->setInstance(instanceName);
|
|
|
+ wu->setQueueName(queueName);
|
|
|
+
|
|
|
+ aws.append(*wu.getLink());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const char *getQueueState(IJobQueue *queue, int runningJobsInQueue, int *colorTypePtr)
|
|
|
+{
|
|
|
+ int qStatus = 1;
|
|
|
+ const char *queueState = NULL;
|
|
|
+ if (queue->stopped())
|
|
|
+ {
|
|
|
+ queueState = "stopped";
|
|
|
+ qStatus = 3;
|
|
|
+ }
|
|
|
+ else if (queue->paused())
|
|
|
+ {
|
|
|
+ queueState = "paused";
|
|
|
+ qStatus = 2;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ queueState = "running";
|
|
|
+ if (NULL != colorTypePtr)
|
|
|
+ {
|
|
|
+ int &color_type = *colorTypePtr;
|
|
|
+ color_type = 6;
|
|
|
+ if (NotFound == runningJobsInQueue)
|
|
|
+ {
|
|
|
+ if (qStatus > 1)
|
|
|
+ color_type = 3;
|
|
|
+ else
|
|
|
+ color_type = 5;
|
|
|
+ }
|
|
|
+ else if (runningJobsInQueue > 0)
|
|
|
+ {
|
|
|
+ if (qStatus > 1)
|
|
|
+ color_type = 1;
|
|
|
+ else
|
|
|
+ color_type = 4;
|
|
|
+ }
|
|
|
+ else if (qStatus > 1)
|
|
|
+ color_type = 2;
|
|
|
+ }
|
|
|
+ return queueState;
|
|
|
+}
|
|
|
+
|
|
|
bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp)
|
|
|
{
|
|
|
context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, true);
|
|
@@ -321,8 +363,7 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
|
|
|
Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
|
|
|
|
|
|
- StringBuffer runningQueueNames[256];
|
|
|
- int runningQueues = 0;
|
|
|
+ StringArray runningQueueNames;
|
|
|
int runningJobsInQueue[256];
|
|
|
for (int i = 0; i < 256; i++)
|
|
|
runningJobsInQueue[i] = 0;
|
|
@@ -339,58 +380,37 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
int serverID = -1;
|
|
|
IPropertyTree& node = it->query();
|
|
|
const char* name = node.queryProp("@name");
|
|
|
- if(node.hasProp("@queue"))
|
|
|
+ if (name && *name)
|
|
|
{
|
|
|
- const char* queue=node.queryProp("@queue");
|
|
|
- const char* thor=strstr(queue,".thor");
|
|
|
- if(thor)
|
|
|
+ if (0 == stricmp("ThorMaster", name))
|
|
|
{
|
|
|
- qname.append(thor-queue,queue);
|
|
|
+ node.getProp("@queue", qname);
|
|
|
node.getProp("@thorname",instance);
|
|
|
}
|
|
|
- else
|
|
|
- qname.append(queue);
|
|
|
- }
|
|
|
- else if (name && !stricmp(name, "ECLAgent"))
|
|
|
- {
|
|
|
- qname.append(name);
|
|
|
- }
|
|
|
- if((instance.length()==0)&& name && *name)
|
|
|
- {
|
|
|
- instance.append( !strcmp(name, "ECLagent") ? "ECL agent" : name);
|
|
|
- instance.append(" on ").append(node.queryProp("@node"));
|
|
|
+ else if (0 == stricmp(name, "ECLAgent"))
|
|
|
+ {
|
|
|
+ qname.append(name);
|
|
|
+ }
|
|
|
+ if ((instance.length()==0))
|
|
|
+ {
|
|
|
+ instance.append( !strcmp(name, "ECLagent") ? "ECL agent" : name);
|
|
|
+ instance.append(" on ").append(node.queryProp("@node"));
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
if (qname.length() > 0)
|
|
|
{
|
|
|
- int i = 0;
|
|
|
- bool bFound = false;
|
|
|
- while (i < runningQueues)
|
|
|
+ serverID = runningQueueNames.find(qname);
|
|
|
+ if (NotFound == serverID)
|
|
|
{
|
|
|
- const char* serverName = runningQueueNames[i].str();
|
|
|
- if (serverName && !stricmp(serverName, qname.str()))
|
|
|
- {
|
|
|
- bFound = true;
|
|
|
- serverID = i;
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- i++;
|
|
|
- }
|
|
|
-
|
|
|
- if (!bFound)
|
|
|
- {
|
|
|
- runningQueueNames[runningQueues] = qname;
|
|
|
- runningQueues++;
|
|
|
- serverID = runningQueues - 1;
|
|
|
+ serverID = runningQueueNames.ordinality(); // i.e. last
|
|
|
+ runningQueueNames.append(qname);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
Owned<IPropertyTreeIterator> wuids(node.getElements("WorkUnit"));
|
|
|
- ForEach(*wuids)
|
|
|
+ ForEach(*wuids)
|
|
|
{
|
|
|
const char* wuid=wuids->query().queryProp(NULL);
|
|
|
- if(!wuid)
|
|
|
+ if (!wuid)
|
|
|
continue;
|
|
|
|
|
|
try
|
|
@@ -449,200 +469,64 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
SecAccessFlags access;
|
|
|
bool doCommand=(context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full);
|
|
|
|
|
|
- CTpWrapper dummy;
|
|
|
- IArrayOf<IEspTpCluster> clusters;
|
|
|
- dummy.getClusterProcessList(eqThorCluster,clusters,true);
|
|
|
-
|
|
|
IArrayOf<IEspThorCluster> ThorClusters;
|
|
|
- ForEachItemIn(x, clusters)
|
|
|
- {
|
|
|
- IEspTpCluster& cluster = clusters.item(x);
|
|
|
- IEspThorCluster* returnCluster = new CThorCluster("","");
|
|
|
-
|
|
|
- returnCluster->setClusterName(cluster.getName());
|
|
|
- returnCluster->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- if (version > 1.08)
|
|
|
- {
|
|
|
- bool bThorLCR = dummy.getClusterLCR(eqThorCluster, cluster.getName());
|
|
|
- if (bThorLCR)
|
|
|
- returnCluster->setThorLCR("withLCR");
|
|
|
- else
|
|
|
- returnCluster->setThorLCR("noLCR");
|
|
|
- }
|
|
|
+ IArrayOf<IEspRoxieCluster> RoxieClusters;
|
|
|
|
|
|
- int i = 0;
|
|
|
- int serverID = -1;
|
|
|
- const char* queueName = cluster.getQueueName();
|
|
|
- if (queueName && (runningQueues > 0))
|
|
|
- {
|
|
|
- for (int i = 0; i < runningQueues; i++)
|
|
|
- {
|
|
|
- const char* serverName = runningQueueNames[i].str();
|
|
|
- if (serverName && !stricmp(serverName, queueName))
|
|
|
- {
|
|
|
- serverID = i;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- IArrayOf<IEspTpLogicalCluster> clusters1;
|
|
|
- dummy.getTargetClusterList(clusters1, eqThorCluster, cluster.getName());
|
|
|
- const char* queuename1 = cluster.getQueueName();
|
|
|
- if (clusters1.length() > 0)
|
|
|
- {
|
|
|
- IEspTpLogicalCluster& logicalCluster = clusters1.item(0);
|
|
|
- queuename1 = logicalCluster.getName();
|
|
|
- }
|
|
|
-
|
|
|
- QueueWrapper queue(queuename1);
|
|
|
- CJobQueueContents contents;
|
|
|
- queue->copyItems(contents);
|
|
|
- Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
- unsigned count=0;
|
|
|
- ForEach(*iter)
|
|
|
- {
|
|
|
- if (!isInWuList(aws, iter->query().queryWUID()))
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),++count));
|
|
|
- wu->setServer("ThorMaster");
|
|
|
- wu->setInstance(cluster.getName());
|
|
|
- wu->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- aws.append(*wu.getLink());
|
|
|
- }
|
|
|
- catch (IException *e)
|
|
|
- {
|
|
|
- StringBuffer msg;
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(iter->query().queryWUID(), "", "", e->errorMessage(msg).str(), "normal"));
|
|
|
- wu->setServer("ThorMaster");
|
|
|
- wu->setInstance(cluster.getName());
|
|
|
- wu->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- aws.append(*wu.getLink());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- int qStatus = 1;
|
|
|
- if(queue->stopped())
|
|
|
- {
|
|
|
- returnCluster->setQueueStatus("stopped");
|
|
|
- qStatus = 3;
|
|
|
- }
|
|
|
- else if (queue->paused())
|
|
|
- {
|
|
|
- returnCluster->setQueueStatus("paused");
|
|
|
- qStatus = 2;
|
|
|
- }
|
|
|
- else
|
|
|
+ CConstWUClusterInfoArray clusters;
|
|
|
+ getEnvironmentClusterInfo(clusters);
|
|
|
+ ForEachItemIn(c, clusters)
|
|
|
+ {
|
|
|
+ IConstWUClusterInfo &cluster = clusters.item(c);
|
|
|
+ SCMStringBuffer str;
|
|
|
+ if (cluster.getThorProcesses().ordinality())
|
|
|
{
|
|
|
- returnCluster->setQueueStatus("running");
|
|
|
+ IEspThorCluster* returnCluster = new CThorCluster("","");
|
|
|
+ returnCluster->setThorLCR(0 == strcmp("thorlcr", cluster.getPlatform(str).str()) ? "withLCR" : "noLCR");
|
|
|
+ str.clear();
|
|
|
+ returnCluster->setClusterName(cluster.getName(str).str());
|
|
|
+ str.clear();
|
|
|
+ returnCluster->setQueueName(cluster.getThorQueue(str).str());
|
|
|
+ str.clear();
|
|
|
+ const char *queueName = cluster.getThorQueue(str).str();
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
+ addQueuedWorkUnits(queueName, queue, aws, context, "ThorMaster", NULL);
|
|
|
+
|
|
|
+ int serverID = runningQueueNames.find(queueName);
|
|
|
+ int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
|
|
|
+ int color_type;
|
|
|
+ const char *queueState = getQueueState(queue, numRunningJobsInQueue, &color_type);
|
|
|
+ returnCluster->setQueueStatus(queueState);
|
|
|
+ if (version > 1.06)
|
|
|
+ returnCluster->setQueueStatus2(color_type);
|
|
|
+ returnCluster->setDoCommand(doCommand);
|
|
|
+ ThorClusters.append(*returnCluster);
|
|
|
}
|
|
|
-
|
|
|
- if (version > 1.06)
|
|
|
+ if (version > 1.06) // JCSMORE->WANGKX , is this necessary?
|
|
|
{
|
|
|
- int color_type = 6;
|
|
|
- if (serverID < 0)
|
|
|
- {
|
|
|
- if (qStatus > 1)
|
|
|
- color_type = 3;
|
|
|
- else
|
|
|
- color_type = 5;
|
|
|
- }
|
|
|
- else if (runningJobsInQueue[serverID] > 0)
|
|
|
+ str.clear();
|
|
|
+ if (cluster.getRoxieProcess(str).length())
|
|
|
{
|
|
|
- if (qStatus > 1)
|
|
|
- color_type = 1;
|
|
|
- else
|
|
|
- color_type = 4;
|
|
|
+ IEspRoxieCluster* returnCluster = new CRoxieCluster("","");
|
|
|
+ str.clear();
|
|
|
+ returnCluster->setClusterName(cluster.getName(str).str());
|
|
|
+ str.clear();
|
|
|
+ returnCluster->setQueueName(cluster.getAgentQueue(str).str());
|
|
|
+ str.clear();
|
|
|
+ const char *queueName = cluster.getAgentQueue(str).str();
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
+ addQueuedWorkUnits(queueName, queue, aws, context, "RoxieServer", NULL);
|
|
|
+ const char *queueState = getQueueState(queue, -1, NULL);
|
|
|
+ returnCluster->setQueueStatus(queueState);
|
|
|
+
|
|
|
+ RoxieClusters.append(*returnCluster);
|
|
|
}
|
|
|
- else if (qStatus > 1)
|
|
|
- {
|
|
|
- color_type = 2;
|
|
|
- }
|
|
|
- returnCluster->setQueueStatus2(color_type);
|
|
|
}
|
|
|
-
|
|
|
- returnCluster->setDoCommand(doCommand);
|
|
|
- ThorClusters.append(*returnCluster);
|
|
|
- }
|
|
|
-
|
|
|
- if (version > 1.06)
|
|
|
- {
|
|
|
- IArrayOf<IEspRoxieCluster> RoxieClusters;
|
|
|
- IArrayOf<IEspTpCluster> clusters1;
|
|
|
- dummy.getClusterProcessList(eqRoxieCluster,clusters1,true);
|
|
|
- ForEachItemIn(x1, clusters1)
|
|
|
- {
|
|
|
- IEspTpCluster& cluster = clusters1.item(x1);
|
|
|
- IEspRoxieCluster* returnCluster = new CRoxieCluster("","");
|
|
|
-
|
|
|
- returnCluster->setClusterName(cluster.getName());
|
|
|
- returnCluster->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- IArrayOf<IEspTpLogicalCluster> clusters1;
|
|
|
- dummy.getTargetClusterList(clusters1, eqRoxieCluster, cluster.getName());
|
|
|
- const char* queuename1 = cluster.getQueueName();
|
|
|
- if (clusters1.length() > 0)
|
|
|
- {
|
|
|
- IEspTpLogicalCluster& logicalCluster = clusters1.item(0);
|
|
|
- queuename1 = logicalCluster.getName();
|
|
|
- }
|
|
|
-
|
|
|
- QueueWrapper queue(queuename1);
|
|
|
-
|
|
|
- CJobQueueContents contents;
|
|
|
- queue->copyItems(contents);
|
|
|
- Owned<IJobQueueIterator> iter = contents.getIterator();
|
|
|
- unsigned count=0;
|
|
|
- ForEach(*iter)
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(context, iter->query().queryWUID(),++count));
|
|
|
- wu->setServer("RoxieServer");
|
|
|
- wu->setInstance(cluster.getName());
|
|
|
- wu->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- aws.append(*wu.getLink());
|
|
|
- }
|
|
|
- catch (IException *e)
|
|
|
- {
|
|
|
- StringBuffer msg;
|
|
|
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(iter->query().queryWUID(), "", "", e->errorMessage(msg).str(), "normal"));
|
|
|
- wu->setServer("RoxieServer");
|
|
|
- wu->setInstance(cluster.getName());
|
|
|
- wu->setQueueName(cluster.getQueueName());
|
|
|
-
|
|
|
- aws.append(*wu.getLink());
|
|
|
- }
|
|
|
- }
|
|
|
- int qStatus = 1;
|
|
|
- if(queue->stopped())
|
|
|
- {
|
|
|
- returnCluster->setQueueStatus("stopped");
|
|
|
- qStatus = 3;
|
|
|
- }
|
|
|
- else if (queue->paused())
|
|
|
- {
|
|
|
- returnCluster->setQueueStatus("paused");
|
|
|
- qStatus = 2;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- returnCluster->setQueueStatus("running");
|
|
|
- }
|
|
|
-
|
|
|
- RoxieClusters.append(*returnCluster);
|
|
|
- }
|
|
|
- resp.setRoxieClusters(RoxieClusters);
|
|
|
}
|
|
|
+ resp.setThorClusters(ThorClusters);
|
|
|
+ resp.setRoxieClusters(RoxieClusters);
|
|
|
|
|
|
IArrayOf<IConstTpEclServer> eclccservers;
|
|
|
+ CTpWrapper dummy;
|
|
|
dummy.getTpEclCCServers(eclccservers);
|
|
|
ForEachItemIn(x1, eclccservers)
|
|
|
{
|
|
@@ -764,23 +648,8 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
|
|
|
}
|
|
|
} while (services->next());
|
|
|
}
|
|
|
- resp.setThorClusters(ThorClusters);
|
|
|
resp.setRunning(aws);
|
|
|
|
|
|
- clusters.kill();
|
|
|
- dummy.getClusterProcessList(eqHoleCluster,clusters);
|
|
|
- IArrayOf<IEspHoleCluster> HoleClusters;
|
|
|
- ForEachItemIn(y, clusters)
|
|
|
- {
|
|
|
- IEspTpCluster& cluster = clusters.item(y);
|
|
|
- IEspHoleCluster* returnCluster = new CHoleCluster("","");
|
|
|
-
|
|
|
- returnCluster->setClusterName(cluster.getName());
|
|
|
- returnCluster->setDataModel(cluster.getDataModel());
|
|
|
- HoleClusters.append(*returnCluster);
|
|
|
- }
|
|
|
- resp.setHoleClusters(HoleClusters);
|
|
|
-
|
|
|
IArrayOf<IEspDFUJob> jobs;
|
|
|
conn.setown(querySDS().connect("DFU/RECOVERY",myProcessSession(),0, INFINITE));
|
|
|
if (conn)
|
|
@@ -851,7 +720,7 @@ bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspS
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
unsigned index=queue->findRank(req.getWuid());
|
|
|
if(index<queue->ordinality())
|
|
@@ -878,7 +747,7 @@ bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
unsigned index=queue->findRank(req.getWuid());
|
|
|
if(index>0 && index<queue->ordinality())
|
|
@@ -905,7 +774,7 @@ bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspS
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
|
|
|
unsigned index=queue->findRank(req.getWuid());
|
|
@@ -948,7 +817,7 @@ bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEsp
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
|
|
|
unsigned index=queue->findRank(req.getWuid());
|
|
@@ -993,7 +862,7 @@ bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
|
|
|
|
|
|
secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
|
|
|
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
|
|
|
unsigned index=queue->findRank(req.getWuid());
|
|
@@ -1019,7 +888,8 @@ bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspS
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper(req.getClusterType(), req.getCluster())->stop();
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
+ queue->stop();
|
|
|
AccessSuccess(context, "Stopped queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1036,7 +906,8 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper(req.getClusterType(), req.getCluster())->resume();
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
+ queue->resume();
|
|
|
AccessSuccess(context, "Resumed queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1053,7 +924,8 @@ bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
|
|
|
- QueueWrapper(req.getClusterType(), req.getCluster())->pause();
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
+ queue->pause();
|
|
|
AccessSuccess(context, "Paused queue %s",req.getCluster());
|
|
|
resp.setRedirectUrl("/WsSMC/");
|
|
|
}
|
|
@@ -1069,7 +941,7 @@ bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
|
|
|
try
|
|
|
{
|
|
|
checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
{
|
|
|
QueueLock lock(queue);
|
|
|
for(unsigned i=0;i<queue->ordinality();i++)
|
|
@@ -1102,7 +974,7 @@ bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &re
|
|
|
|
|
|
// set job priority
|
|
|
int priority = lw->getPriorityValue();
|
|
|
- QueueWrapper queue(req.getClusterType(), req.getCluster());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(req.getCluster());
|
|
|
QueueLock lock(queue);
|
|
|
queue->changePriority(req.getWuid(),priority);
|
|
|
|
|
@@ -1122,20 +994,20 @@ bool CWsSMCEx::onGetThorQueueAvailability(IEspContext &context, IEspGetThorQueue
|
|
|
if (!context.validateFeatureAccess(FEATURE_URL, SecAccess_Read, false))
|
|
|
throw MakeStringException(ECLWATCH_SMC_ACCESS_DENIED, "Failed to get Thor Queue availability. Permission denied.");
|
|
|
|
|
|
- CTpWrapper dummy;
|
|
|
- IArrayOf<IEspTpCluster> clusters;
|
|
|
- dummy.getClusterProcessList(eqThorCluster,clusters,true);
|
|
|
+ StringArray thorNames, groupNames, targetNames, queueNames;
|
|
|
+ getEnvironmentThorClusterNames(thorNames, groupNames, targetNames, queueNames);
|
|
|
|
|
|
IArrayOf<IEspThorCluster> ThorClusters;
|
|
|
- ForEachItemIn(x, clusters)
|
|
|
+ ForEachItemIn(x, thorNames)
|
|
|
{
|
|
|
- IEspTpCluster& cluster = clusters.item(x);
|
|
|
+ const char* targetName = targetNames.item(x);
|
|
|
+ const char* queueName = queueNames.item(x);
|
|
|
IEspThorCluster* returnCluster = new CThorCluster("","");
|
|
|
|
|
|
- returnCluster->setClusterName(cluster.getName());
|
|
|
- returnCluster->setQueueName(cluster.getQueueName());
|
|
|
+ returnCluster->setClusterName(targetName);
|
|
|
+ returnCluster->setQueueName(queueName);
|
|
|
|
|
|
- QueueWrapper queue(cluster.getQueueName());
|
|
|
+ Owned<IJobQueue> queue = createJobQueue(queueName);
|
|
|
if(queue->stopped())
|
|
|
returnCluster->setQueueStatus("stopped");
|
|
|
else if (queue->paused())
|