|
@@ -2485,6 +2485,9 @@ protected:
|
|
|
Owned<ITimeReporter> myTimer;
|
|
|
mutable MapStringToMyClass<IResolvedFile> fileCache;
|
|
|
SafeSocket *client;
|
|
|
+ StringArray clusterNames;
|
|
|
+ int clusterWidth = -1;
|
|
|
+
|
|
|
bool isBlocked;
|
|
|
bool isHttp;
|
|
|
bool trim;
|
|
@@ -2573,6 +2576,8 @@ protected:
|
|
|
}
|
|
|
else
|
|
|
wu->setState(WUStateRunning);
|
|
|
+ clusterNames.append(workUnit->queryClusterName());
|
|
|
+ clusterWidth = -1;
|
|
|
}
|
|
|
|
|
|
void initDebugMode(bool breakAtStart, const char *debugUID)
|
|
@@ -2830,6 +2835,8 @@ public:
|
|
|
{
|
|
|
WorkunitUpdate w(&workUnit->lock());
|
|
|
w->setState(aborted ? WUStateAborted : (failed ? WUStateFailed : WUStateCompleted));
|
|
|
+ while (clusterNames.ordinality())
|
|
|
+ restoreCluster();
|
|
|
addTimeStamp(w, SSTglobal, NULL, StWhenQueryFinished);
|
|
|
updateWorkunitTimings(w, myTimer);
|
|
|
Owned<IStatisticGatherer> gatherer = createGlobalStatisticGatherer(w);
|
|
@@ -3579,7 +3586,49 @@ public:
|
|
|
return strdup(querySetName.str()); // StringAttr::str() will return "" rather than NULL
|
|
|
}
|
|
|
}
|
|
|
- virtual char *getGroupName() { throwUnexpected(); }
|
|
|
+ virtual char *getGroupName()
|
|
|
+ {
|
|
|
+ StringBuffer groupName;
|
|
|
+ if (workUnit && clusterNames.length())
|
|
|
+ {
|
|
|
+ const char * cluster = clusterNames.tos();
|
|
|
+ Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
|
|
|
+ if (!clusterInfo)
|
|
|
+ throw MakeStringException(-1, "Unknown cluster '%s'", cluster);
|
|
|
+ const StringArray &thors = clusterInfo->getThorProcesses();
|
|
|
+ if (thors.length())
|
|
|
+ {
|
|
|
+ StringArray envClusters, envGroups, envTargets, envQueues;
|
|
|
+ getEnvironmentThorClusterNames(envClusters, envGroups, envTargets, envQueues);
|
|
|
+ ForEachItemIn(i, thors)
|
|
|
+ {
|
|
|
+ const char *thorName = thors.item(i);
|
|
|
+ ForEachItemIn(j, envClusters)
|
|
|
+ {
|
|
|
+ if (strieq(thorName, envClusters.item(j)))
|
|
|
+ {
|
|
|
+ const char *envGroup = envGroups.item(j);
|
|
|
+ if (groupName.length())
|
|
|
+ {
|
|
|
+ if (!strieq(groupName, envGroup))
|
|
|
+ throw MakeStringException(-1, "getGroupName(): ambiguous groups %s, %s", groupName.str(), envGroup);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ groupName.append(envGroup);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringBufferAdaptor a(groupName);
|
|
|
+ clusterInfo->getRoxieProcess(a);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return groupName.detach();
|
|
|
+ }
|
|
|
virtual char *queryIndexMetaData(char const * lfn, char const * xpath) { throwUnexpected(); }
|
|
|
virtual char *getEnv(const char *name, const char *defaultValue) const
|
|
|
{
|
|
@@ -3596,7 +3645,16 @@ public:
|
|
|
virtual char *getJobOwner() { throwUnexpected(); }
|
|
|
virtual char *getPlatform()
|
|
|
{
|
|
|
- return strdup("roxie");
|
|
|
+ if (clusterNames.length())
|
|
|
+ {
|
|
|
+ const char * cluster = clusterNames.tos();
|
|
|
+ Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
|
|
|
+ if (!clusterInfo)
|
|
|
+ throw MakeStringException(-1, "Unknown Cluster '%s'", cluster);
|
|
|
+ return strdup(clusterTypeString(clusterInfo->getPlatform(), false));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ return strdup("roxie");
|
|
|
}
|
|
|
virtual char *getWuid()
|
|
|
{
|
|
@@ -3693,7 +3751,23 @@ public:
|
|
|
virtual bool fileExists(const char * filename) { throwUnexpected(); }
|
|
|
virtual void deleteFile(const char * logicalName) { throwUnexpected(); }
|
|
|
|
|
|
- virtual unsigned getNodes() { return numChannels; }
|
|
|
+ virtual unsigned getNodes()
|
|
|
+ {
|
|
|
+ if (clusterNames.length())
|
|
|
+ {
|
|
|
+ if (clusterWidth == -1)
|
|
|
+ {
|
|
|
+ const char * cluster = clusterNames.tos();
|
|
|
+ Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
|
|
|
+ if (!clusterInfo)
|
|
|
+ throw MakeStringException(-1, "Unknown cluster '%s'", cluster);
|
|
|
+ clusterWidth = clusterInfo->getSize();
|
|
|
+ }
|
|
|
+ return clusterWidth;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ return numChannels;
|
|
|
+ }
|
|
|
virtual unsigned getNodeNum() { return 0; }
|
|
|
virtual char *getFilePart(const char *logicalPart, bool create=false) { UNIMPLEMENTED; }
|
|
|
virtual unsigned __int64 getFileOffset(const char *logicalPart) { throwUnexpected(); }
|
|
@@ -3710,9 +3784,33 @@ public:
|
|
|
virtual IConstWorkUnit *queryWorkUnit() const { return workUnit; }
|
|
|
virtual bool outputResultsToSocket() const { return client != NULL; }
|
|
|
|
|
|
- virtual void selectCluster(const char * cluster) { throwUnexpected(); }
|
|
|
- virtual void restoreCluster() { throwUnexpected(); }
|
|
|
-
|
|
|
+ virtual void selectCluster(const char * newCluster)
|
|
|
+ {
|
|
|
+ if (workUnit)
|
|
|
+ {
|
|
|
+ const char *oldCluster = workUnit->queryClusterName();
|
|
|
+ SCMStringBuffer bStr;
|
|
|
+ ClusterType targetClusterType = getClusterType(workUnit->getDebugValue("targetClusterType", bStr).str(), RoxieCluster);
|
|
|
+ if (targetClusterType==RoxieCluster)
|
|
|
+ {
|
|
|
+ if (!streq(oldCluster, newCluster))
|
|
|
+ throw MakeStringException(-1, "Error - cannot switch cluster if not targetting thor jobs");
|
|
|
+ }
|
|
|
+ clusterNames.append(oldCluster);
|
|
|
+ WorkunitUpdate wu = updateWorkUnit();
|
|
|
+ if (wu)
|
|
|
+ wu->setClusterName(newCluster);
|
|
|
+ clusterWidth = -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ virtual void restoreCluster()
|
|
|
+ {
|
|
|
+ WorkunitUpdate wu = updateWorkUnit();
|
|
|
+ if (wu)
|
|
|
+ wu->setClusterName(clusterNames.item(clusterNames.length()-1));
|
|
|
+ clusterNames.pop();
|
|
|
+ clusterWidth = -1;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
//================================================================================================
|