|
@@ -754,7 +754,7 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork
|
|
|
|
|
|
StringBuffer queryId;
|
|
|
WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
|
|
|
- addQueryToQuerySet(wu, target.str(), queryName.str(), NULL, activate, queryId, context.queryUserId());
|
|
|
+ addQueryToQuerySet(wu, target.str(), queryName.str(), activate, queryId, context.queryUserId());
|
|
|
if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
|
|
|
{
|
|
|
Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
|
|
@@ -1797,6 +1797,170 @@ bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &qu
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+class QueryCloner
|
|
|
+{
|
|
|
+public:
|
|
|
+ QueryCloner(IEspContext *_context, const char *source, const char *_target) :
|
|
|
+ context(_context), cloneFilesEnabled(false), target(_target), overwriteDfs(false)
|
|
|
+ {
|
|
|
+ srcQuerySet.setown(getQueryRegistry(source, true));
|
|
|
+ if (!srcQuerySet)
|
|
|
+ throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", source);
|
|
|
+
|
|
|
+ destQuerySet.setown(getQueryRegistry(target, false));
|
|
|
+ if (!destQuerySet) // getQueryRegistry should have created if not found
|
|
|
+ throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.sget());
|
|
|
+
|
|
|
+ factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
|
|
|
+ }
|
|
|
+ void clone(const char *name, const char *id, bool makeActive)
|
|
|
+ {
|
|
|
+ Owned<IPropertyTree> query = getQueryById(srcQuerySet, id);
|
|
|
+ if (!query)
|
|
|
+ return;
|
|
|
+ const char *wuid = query->queryProp("@wuid");
|
|
|
+ if (!wuid || !*wuid)
|
|
|
+ return;
|
|
|
+ SCMStringBuffer existingQueryId;
|
|
|
+ queryIdFromQuerySetWuid(destQuerySet, wuid, existingQueryId);
|
|
|
+ if (existingQueryId.length())
|
|
|
+ {
|
|
|
+ existingQueryIds.append(existingQueryId.str());
|
|
|
+ if (makeActive)
|
|
|
+ activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, name, existingQueryId.str(), context->queryUserId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ StringBuffer newQueryId;
|
|
|
+ Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
|
|
|
+ addQueryToQuerySet(workunit, destQuerySet, name, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
|
|
|
+ copiedQueryIds.append(newQueryId);
|
|
|
+ Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
|
|
|
+ if (destQuery)
|
|
|
+ {
|
|
|
+ Owned<IAttributeIterator> aiter = query->getAttributes();
|
|
|
+ ForEach(*aiter)
|
|
|
+ {
|
|
|
+ const char *atname = aiter->queryName();
|
|
|
+ if (!destQuery->hasProp(atname))
|
|
|
+ destQuery->setProp(atname, aiter->queryValue());
|
|
|
+ }
|
|
|
+ if (cloneFilesEnabled && wufiles)
|
|
|
+ wufiles->addFilesFromQuery(workunit, pm, newQueryId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void cloneActive(bool makeActive)
|
|
|
+ {
|
|
|
+ Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements("Alias");
|
|
|
+ ForEach(*activeQueries)
|
|
|
+ {
|
|
|
+ IPropertyTree &alias = activeQueries->query();
|
|
|
+ const char *name = alias.queryProp("@name");
|
|
|
+ const char *id = alias.queryProp("@id");
|
|
|
+ clone(name, id, makeActive);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void cloneAll(bool cloneActiveState)
|
|
|
+ {
|
|
|
+ Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements("Query");
|
|
|
+ ForEach(*allQueries)
|
|
|
+ {
|
|
|
+ IPropertyTree &query = allQueries->query();
|
|
|
+ const char *name = query.queryProp("@name");
|
|
|
+ const char *id = query.queryProp("@id");
|
|
|
+ bool makeActive = false;
|
|
|
+ if (cloneActiveState)
|
|
|
+ {
|
|
|
+ VStringBuffer xpath("Alias[@id='%s']", id);
|
|
|
+ makeActive = srcQuerySet->hasProp(xpath);
|
|
|
+ }
|
|
|
+ clone(name, id, makeActive);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ void enableFileCloning(const char *dfsServer, const char *destProcess, const char *sourceProcess, bool _overwriteDfs, bool allowForeign)
|
|
|
+ {
|
|
|
+ cloneFilesEnabled = true;
|
|
|
+ overwriteDfs = _overwriteDfs;
|
|
|
+ splitDerivedDfsLocation(dfsServer, srcCluster, dfsIP, srcPrefix, sourceProcess, sourceProcess, NULL, NULL);
|
|
|
+ wufiles.setown(createReferencedFileList(context->queryUserId(), context->queryPassword(), allowForeign));
|
|
|
+ Owned<IHpccPackageSet> ps = createPackageSet(destProcess);
|
|
|
+ pm.set(ps->queryActiveMap(target));
|
|
|
+ process.set(destProcess);
|
|
|
+ }
|
|
|
+
|
|
|
+ void cloneFiles()
|
|
|
+ {
|
|
|
+ if (cloneFilesEnabled)
|
|
|
+ {
|
|
|
+ wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !overwriteDfs, true);
|
|
|
+ Owned<IDFUhelper> helper = createIDFUhelper();
|
|
|
+ wufiles->cloneAllInfo(helper, overwriteDfs, true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+private:
|
|
|
+ Linked<IEspContext> context;
|
|
|
+ Linked<IWorkUnitFactory> factory;
|
|
|
+ Owned<IPropertyTree> destQuerySet;
|
|
|
+ Owned<IPropertyTree> srcQuerySet;
|
|
|
+ Owned<IReferencedFileList> wufiles;
|
|
|
+ Owned<const IHpccPackageMap> pm;
|
|
|
+ StringBuffer dfsIP;
|
|
|
+ StringBuffer srcCluster;
|
|
|
+ StringBuffer srcPrefix;
|
|
|
+ StringAttr target;
|
|
|
+ StringAttr process;
|
|
|
+ bool cloneFilesEnabled;
|
|
|
+ bool overwriteDfs;
|
|
|
+
|
|
|
+public:
|
|
|
+ StringArray existingQueryIds;
|
|
|
+ StringArray copiedQueryIds;
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
+bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
|
|
|
+{
|
|
|
+ const char *source = req.getSource();
|
|
|
+ if (!source || !*source)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source target specified");
|
|
|
+ if (!isValidCluster(source))
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid source target name: %s", source);
|
|
|
+
|
|
|
+ const char *target = req.getTarget();
|
|
|
+ if (!target || !*target)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination target specified");
|
|
|
+ if (!isValidCluster(target))
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid destination target name: %s", target);
|
|
|
+
|
|
|
+ QueryCloner cloner(&context, source, target);
|
|
|
+
|
|
|
+ SCMStringBuffer process;
|
|
|
+ if (req.getCopyFiles())
|
|
|
+ {
|
|
|
+ Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
|
|
|
+ if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
|
|
|
+ {
|
|
|
+ clusterInfo->getRoxieProcess(process);
|
|
|
+ if (!process.length())
|
|
|
+ throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
|
|
|
+ cloner.enableFileCloning(req.getDfsServer(), process.str(), req.getSourceProcess(), req.getOverwriteDfs(), req.getAllowForeignFiles());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (req.getActiveOnly())
|
|
|
+ cloner.cloneActive(req.getCloneActiveState());
|
|
|
+ else
|
|
|
+ cloner.cloneAll(req.getCloneActiveState());
|
|
|
+
|
|
|
+ cloner.cloneFiles();
|
|
|
+
|
|
|
+ resp.setCopiedQueries(cloner.copiedQueryIds);
|
|
|
+ resp.setExistingQueries(cloner.existingQueryIds);
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
|
|
|
{
|
|
|
unsigned start = msTick();
|
|
@@ -1865,7 +2029,7 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
|
|
|
|
|
|
StringBuffer targetQueryId;
|
|
|
WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
|
|
|
- addQueryToQuerySet(wu, target, queryName.str(), NULL, activate, targetQueryId, context.queryUserId());
|
|
|
+ addQueryToQuerySet(wu, target, queryName.str(), activate, targetQueryId, context.queryUserId());
|
|
|
|
|
|
Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
|
|
|
if (queryTree)
|