|
@@ -233,7 +233,7 @@ bool CActivityInfo::isCachedActivityInfoValid(unsigned timeOutSeconds)
|
|
|
return timeNow.getSimple() <= timeCached.getSimple() + timeOutSeconds;;
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::createActivityInfo()
|
|
|
+void CActivityInfo::createActivityInfo(IEspContext& context)
|
|
|
{
|
|
|
Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
|
|
|
Owned<IConstEnvironment> env = factory->openEnvironment();
|
|
@@ -261,7 +261,7 @@ void CActivityInfo::createActivityInfo()
|
|
|
IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
|
|
|
|
|
|
readTargetClusterInfo(clusters, serverStatusRoot);
|
|
|
- readActiveWUsAndQueuedWUs(envRoot, serverStatusRoot);
|
|
|
+ readActiveWUsAndQueuedWUs(context, envRoot, serverStatusRoot);
|
|
|
|
|
|
timeCached.setNow();
|
|
|
}
|
|
@@ -384,24 +384,24 @@ bool CActivityInfo::findQueueInStatusServer(IPropertyTree* serverStatusRoot, con
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::readActiveWUsAndQueuedWUs(IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
|
|
|
+void CActivityInfo::readActiveWUsAndQueuedWUs(IEspContext& context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
|
|
|
{
|
|
|
- readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTThorLCRCluster);
|
|
|
- readWUsInTargetClusterJobQueues(thorTargetClusters);
|
|
|
- readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTRoxieCluster);
|
|
|
- readWUsInTargetClusterJobQueues(roxieTargetClusters);
|
|
|
- readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTHThorCluster);
|
|
|
- readWUsInTargetClusterJobQueues(hthorTargetClusters);
|
|
|
+ readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTThorLCRCluster);
|
|
|
+ readWUsInTargetClusterJobQueues(context, thorTargetClusters);
|
|
|
+ readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTRoxieCluster);
|
|
|
+ readWUsInTargetClusterJobQueues(context, roxieTargetClusters);
|
|
|
+ readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTHThorCluster);
|
|
|
+ readWUsInTargetClusterJobQueues(context, hthorTargetClusters);
|
|
|
|
|
|
- readRunningWUsOnStatusServer(serverStatusRoot, WsSMCSSTECLagent);
|
|
|
- readRunningWUsAndJobQueueforOtherStatusServers(serverStatusRoot);
|
|
|
+ readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTECLagent);
|
|
|
+ readRunningWUsAndJobQueueforOtherStatusServers(context, serverStatusRoot);
|
|
|
//TODO: add queued WUs for ECLCCServer/ECLServer here. Right now, they are under target clusters.
|
|
|
|
|
|
getDFUServersAndWUs(envRoot, serverStatusRoot);
|
|
|
getDFURecoveryJobs();
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)
|
|
|
+void CActivityInfo::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)
|
|
|
{
|
|
|
const char* serverName = getStatusServerTypeName(statusServerType);
|
|
|
if (!serverName || !*serverName)
|
|
@@ -423,7 +423,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot
|
|
|
ForEach(*wuids)
|
|
|
{
|
|
|
const char* wuid=wuids->query().queryProp(NULL);
|
|
|
- if (!wuid || !*wuid || isDuplicatedECLWUID(wuid))
|
|
|
+ if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
|
|
|
continue;
|
|
|
|
|
|
CWsSMCTargetCluster* targetCluster;
|
|
@@ -453,7 +453,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot
|
|
|
else
|
|
|
queueName.append(targetCluster->queueName.get());
|
|
|
|
|
|
- createActiveWorkUnit(wu, wuid, !strieq(targetClusterName, serverInstance.str()) ? serverInstance.str() : NULL, 0, serverName,
|
|
|
+ createActiveWorkUnit(context, wu, wuid, !strieq(targetClusterName, serverInstance.str()) ? serverInstance.str() : NULL, 0, serverName,
|
|
|
queueName, serverInstance.str(), targetClusterName, false);
|
|
|
|
|
|
if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
|
|
@@ -477,7 +477,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- createActiveWorkUnit(wu, wuid, serverInstance.str(), 0, serverName, serverName, serverInstance.str(), targetClusterName, false);
|
|
|
+ createActiveWorkUnit(context, wu, wuid, serverInstance.str(), 0, serverName, serverName, serverInstance.str(), targetClusterName, false);
|
|
|
|
|
|
if (targetCluster->clusterType == ThorLCRCluster)
|
|
|
wu->setClusterType(CLUSTER_TYPE_THOR);
|
|
@@ -509,7 +509,7 @@ void CActivityInfo::readRunningWUsOnStatusServer(IPropertyTree* serverStatusRoot
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-bool CActivityInfo::isDuplicatedECLWUID(const char* wuid)
|
|
|
+bool CActivityInfo::checkSetUniqueECLWUID(const char* wuid)
|
|
|
{
|
|
|
bool* idFound = uniqueECLWUIDs.getValue(wuid);
|
|
|
if (!idFound || !*idFound)
|
|
@@ -563,7 +563,7 @@ CWsSMCTargetCluster* CActivityInfo::findTargetCluster(const char* clusterName, C
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
|
|
|
+void CActivityInfo::createActiveWorkUnit(IEspContext& context, Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
|
|
|
unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext)
|
|
|
{
|
|
|
try
|
|
@@ -591,36 +591,36 @@ void CActivityInfo::createActiveWorkUnit(Owned<IEspActiveWorkunit>& ownedWU, con
|
|
|
ownedWU->setTargetClusterName(targetClusterName);
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::readWUsInTargetClusterJobQueues(CIArrayOf<CWsSMCTargetCluster>& targetClusters)
|
|
|
+void CActivityInfo::readWUsInTargetClusterJobQueues(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
|
|
|
{
|
|
|
ForEachItemIn(i, targetClusters)
|
|
|
{
|
|
|
CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
|
|
|
if (targetCluster.clusterType == ThorLCRCluster)
|
|
|
- readWUsInTargetClusterJobQueue(targetCluster, targetCluster.clusterQueue, targetCluster.clusterName.get());
|
|
|
+ readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.clusterQueue, targetCluster.clusterName.get());
|
|
|
if (targetCluster.agentQueue.queueName.length())
|
|
|
- readWUsInTargetClusterJobQueue(targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str());
|
|
|
+ readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str());
|
|
|
if (targetCluster.serverQueue.queueName.length()) //TODO: queued WUs for ECLCCServer/ECLServer should not be here.
|
|
|
- readWUsInTargetClusterJobQueue(targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str());
|
|
|
+ readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::readWUsInTargetClusterJobQueue(CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName)
|
|
|
+void CActivityInfo::readWUsInTargetClusterJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName)
|
|
|
{
|
|
|
ForEachItemIn(i, targetCluster.queuedWUIDs)
|
|
|
{
|
|
|
const char* wuid = targetCluster.queuedWUIDs.item(i);
|
|
|
- if (!wuid || !*wuid || isDuplicatedECLWUID(wuid))
|
|
|
+ if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
|
|
|
continue;
|
|
|
|
|
|
Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
|
|
|
+ createActiveWorkUnit(context, wu, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
|
|
|
queueName, NULL, targetCluster.clusterName.get(), false);
|
|
|
aws.append(*wu.getClear());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IPropertyTree* serverStatusRoot)
|
|
|
+void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IEspContext& context, IPropertyTree* serverStatusRoot)
|
|
|
{
|
|
|
BoolHash uniqueServers;
|
|
|
Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
|
|
@@ -641,11 +641,11 @@ void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IPropertyTree
|
|
|
ForEach(*wuids)
|
|
|
{
|
|
|
const char* wuid=wuids->query().queryProp(NULL);
|
|
|
- if (!wuid || !*wuid || isDuplicatedECLWUID(wuid))
|
|
|
+ if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
|
|
|
continue;
|
|
|
|
|
|
Owned<IEspActiveWorkunit> wu;
|
|
|
- createActiveWorkUnit(wu, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
|
|
|
+ createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
|
|
|
aws.append(*wu.getClear());
|
|
|
}
|
|
|
|
|
@@ -1091,8 +1091,8 @@ CActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context)
|
|
|
DBGLOG("CWsSMCEx::getActivityInfo - rebuild cached information");
|
|
|
{
|
|
|
EspTimeSection timer("createActivityInfo");
|
|
|
- activityInfoCache.setown(new CActivityInfo(context));
|
|
|
- activityInfoCache->createActivityInfo();
|
|
|
+ activityInfoCache.setown(new CActivityInfo());
|
|
|
+ activityInfoCache->createActivityInfo(context);
|
|
|
}
|
|
|
|
|
|
return activityInfoCache.getLink();
|