/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "ws_workunitsService.hpp" #include "ws_fs.hpp" #include "jlib.hpp" #include "daclient.hpp" #include "dalienv.hpp" #include "dadfs.hpp" #include "dfuwu.hpp" #include "eclhelper.hpp" const unsigned roxieQueryRoxieTimeOut = 60000; #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short bool isRoxieProcess(const char *process) { if (!process) return false; Owned conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT); if (!conn) return false; VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process); return conn->queryRoot()->hasProp(xpath.str()); } void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy) { try { Owned req = fs.createCopyRequest(); req->setSourceLogicalName(logicalname); req->setDestLogicalName(logicalname); req->setDestGroup(cluster); req->setSuperCopy(supercopy); if (isRoxie) req->setDestGroupRoxie("Yes"); Owned resp = fs.Copy(req); info.setDfuCopyWuid(resp->getResult()); } catch (IException *e) { StringBuffer msg; info.setDfuCopyError(e->errorMessage(msg).str()); } } bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo) { if (isEmpty(cluster)) throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set."); Owned udesc = createUserDescriptor(); udesc->set(context.queryUserId(), context.queryPassword()); IArrayOf foreign; IArrayOf onCluster; IArrayOf notOnCluster; IArrayOf notFound; Owned fs; if (copyLocal) { fs.setown(createFileSprayClient()); VStringBuffer url("http://.:%d/FileSpray", 8010); fs->addServiceUrl(url.str()); } bool isRoxie = isRoxieProcess(cluster); Owned graphs = &cw.getGraphs(GraphTypeActivities); ForEach(*graphs) { Owned xgmml = graphs->query().getXGMMLTree(false); Owned iter = xgmml->getElements(".//node"); ForEach(*iter) { try { IPropertyTree &node = iter->query(); ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone); if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite) continue; if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value")) continue; Owned info = createWULogicalFileCopyInfo(); const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value"); if (logicalname) info->setIsIndex(true); else logicalname = node.queryProp("att[@name='_fileName']/@value"); info->setLogicalName(logicalname); if (logicalname) { if (!strnicmp("~foreign::", logicalname, 10)) foreign.append(*info.getClear()); else { Owned df = queryDistributedFileDirectory().lookup(logicalname, udesc); if(!df) notFound.append(*info.getClear()); else if (df->findCluster(cluster)!=NotFound) { onCluster.append(*info.getClear()); } else { StringArray clusters; df->getClusterNames(clusters); info->setClusters(clusters); if (copyLocal) { StringBuffer wuid; bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, NULL, udesc); doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy); } notOnCluster.append(*info.getClear()); } } } } catch(IException *e) { e->Release(); } } lfinfo.setClusterName(cluster); lfinfo.setNotOnCluster(notOnCluster); lfinfo.setOnCluster(onCluster); lfinfo.setForeign(foreign); lfinfo.setNotFound(notFound); } return true; } void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf &clusterfiles, bool doLocalCopy) { const StringArray &thors = clusterInfo.getThorProcesses(); ForEachItemIn(i, thors) { Owned files = createWUCopyLogicalClusterFileSections(); copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files); clusterfiles.append(*files.getClear()); } SCMStringBuffer roxie; clusterInfo.getRoxieProcess(roxie); if (roxie.length()) { Owned files = createWUCopyLogicalClusterFileSections(); copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files); clusterfiles.append(*files.getClear()); } } bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp) { if (isEmpty(req.getWuid())) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "WUCopyLogicalFiles WUID parameter not set."); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if (!cw) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", req.getWuid()); resp.setWuid(req.getWuid()); SCMStringBuffer cluster; if (notEmpty(req.getCluster())) cluster.set(req.getCluster()); else cw->getClusterName(cluster); Owned clusterInfo = getTargetClusterInfo(cluster.str()); IArrayOf clusterfiles; copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal()); resp.setClusterFiles(clusterfiles); return true; } bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp) { if (isEmpty(req.getWuid())) throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED,"No Workunit ID has been specified."); Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned cw = factory->openWorkUnit(req.getWuid(), false); if (!cw) throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", req.getWuid()); resp.setWuid(req.getWuid()); SCMStringBuffer queryName; if (notEmpty(req.getJobName())) queryName.set(req.getJobName()); else cw->getJobName(queryName).str(); SCMStringBuffer cluster; if (notEmpty(req.getCluster())) cluster.set(req.getCluster()); else cw->getClusterName(cluster); Owned clusterInfo = getTargetClusterInfo(cluster.str()); SCMStringBuffer queryset; clusterInfo->getQuerySetName(queryset); WorkunitUpdate wu(&cw->lock()); if (notEmpty(req.getJobName())) wu->setJobName(req.getJobName()); StringBuffer queryId; addQueryToQuerySet(wu, queryset.str(), queryName.str(), NULL, (WUQueryActivationOptions)req.getActivate(), queryId); wu->commit(); wu.clear(); if (queryId.length()) resp.setQueryId(queryId.str()); resp.setQueryName(queryName.str()); resp.setQuerySet(queryset.str()); if (req.getCopyLocal() || req.getShowFiles()) { IArrayOf clusterfiles; copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal()); resp.setClusterFiles(clusterfiles); } return true; } bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp) { Owned queryRegistry = getQueryRegistryRoot(); if (!queryRegistry) return false; IArrayOf querySets; Owned it = queryRegistry->getElements("QuerySet"); ForEach(*it) { Owned qs = createQuerySet("", ""); qs->setQuerySetName(it->query().queryProp("@id")); querySets.append(*qs.getClear()); } resp.setQuerysets(querySets); return true; } bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp) { resp.setQuerySetName(req.getQuerySetName()); Owned registry = getQueryRegistry(req.getQuerySetName(), true); if (!registry) return false; IArrayOf querySetQueries; Owned queries = registry->getElements("Query"); ForEach(*queries) { IPropertyTree &query = queries->query(); Owned q = createQuerySetQuery("", ""); q->setId(query.queryProp("@id")); q->setName(query.queryProp("@name")); q->setDll(query.queryProp("@dll")); q->setWuid(query.queryProp("@wuid")); q->setSuspended(query.getPropBool("@suspended", false)); querySetQueries.append(*q.getLink()); } resp.setQuerysetQueries(querySetQueries); IArrayOf querySetAliases; Owned aliases = registry->getElements("Alias"); ForEach(*aliases) { IPropertyTree &alias = aliases->query(); Owned a = createQuerySetAlias("", ""); a->setName(alias.queryProp("@name")); a->setId(alias.queryProp("@id")); querySetAliases.append(*a.getClear()); } resp.setQuerysetAliases(querySetAliases); return true; } bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp) { resp.setQuerySetName(req.getQuerySetName()); resp.setAction(req.getAction()); if (isEmpty(req.getQuerySetName())) throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required"); Owned queryset = getQueryRegistry(req.getQuerySetName(), true); if (!queryset) throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName()); IArrayOf results; ForEachItemIn(i, req.getQueries()) { IConstQuerySetQueryActionItem& item=req.getQueries().item(i); Owned result = createQuerySetQueryActionResult(); try { VStringBuffer xpath("Query[@id='%s']", item.getQueryId()); IPropertyTree *query = queryset->queryPropTree(xpath.str()); if (!query) throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), item.getQueryId()); switch (req.getAction()) { case CQuerySetQueryActionTypes_ToggleSuspend: setQuerySuspendedState(queryset, item.getQueryId(), !item.getClientState().getSuspended()); break; case CQuerySetQueryActionTypes_Suspend: setQuerySuspendedState(queryset, item.getQueryId(), true); break; case CQuerySetQueryActionTypes_Unsuspend: setQuerySuspendedState(queryset, item.getQueryId(), false); break; case CQuerySetQueryActionTypes_Activate: setQueryAlias(queryset, query->queryProp("@name"), item.getQueryId()); break; case CQuerySetQueryActionTypes_Delete: removeAliasesFromNamedQuery(queryset, item.getQueryId()); removeNamedQuery(queryset, item.getQueryId()); break; case CQuerySetQueryActionTypes_RemoveAllAliases: removeAliasesFromNamedQuery(queryset, item.getQueryId()); break; } result->setSuccess(true); query = queryset->queryPropTree(xpath.str()); // refresh if (query) result->setSuspended(query->getPropBool("@suspended")); } catch(IException *e) { StringBuffer msg; result->setMessage(e->errorMessage(msg).str()); result->setCode(e->errorCode()); result->setSuccess(false); } results.append(*result.getClear()); } resp.setResults(results); return true; } bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp) { resp.setQuerySetName(req.getQuerySetName()); resp.setAction(req.getAction()); if (isEmpty(req.getQuerySetName())) throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required"); Owned queryset = getQueryRegistry(req.getQuerySetName(), true); if (!queryset) throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName()); IArrayOf results; ForEachItemIn(i, req.getAliases()) { IConstQuerySetAliasActionItem& item=req.getAliases().item(i); Owned result = createQuerySetAliasActionResult(); try { VStringBuffer xpath("Alias[@name='%s']", item.getName()); IPropertyTree *alias = queryset->queryPropTree(xpath.str()); if (!alias) throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName()); switch (req.getAction()) { case CQuerySetAliasActionTypes_Deactivate: removeQuerySetAlias(req.getQuerySetName(), item.getName()); break; } result->setSuccess(true); } catch(IException *e) { StringBuffer msg; result->setMessage(e->errorMessage(msg).str()); result->setCode(e->errorCode()); result->setSuccess(false); } results.append(*result.getClear()); } resp.setResults(results); return true; }