ws_workunitsQuerySets.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "ws_workunitsService.hpp"
  15. #include "ws_fs.hpp"
  16. #include "jlib.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "dadfs.hpp"
  20. #include "dfuwu.hpp"
  21. #include "eclhelper.hpp"
  22. const unsigned roxieQueryRoxieTimeOut = 60000;
  23. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  24. bool isRoxieProcess(const char *process)
  25. {
  26. if (!process)
  27. return false;
  28. Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  29. if (!conn)
  30. return false;
  31. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  32. return conn->queryRoot()->hasProp(xpath.str());
  33. }
  34. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  35. {
  36. try
  37. {
  38. Owned<IClientCopy> req = fs.createCopyRequest();
  39. req->setSourceLogicalName(logicalname);
  40. req->setDestLogicalName(logicalname);
  41. req->setDestGroup(cluster);
  42. req->setSuperCopy(supercopy);
  43. if (isRoxie)
  44. req->setDestGroupRoxie("Yes");
  45. Owned<IClientCopyResponse> resp = fs.Copy(req);
  46. info.setDfuCopyWuid(resp->getResult());
  47. }
  48. catch (IException *e)
  49. {
  50. StringBuffer msg;
  51. info.setDfuCopyError(e->errorMessage(msg).str());
  52. }
  53. }
  54. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  55. {
  56. if (isEmpty(cluster))
  57. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  58. Owned<IUserDescriptor> udesc = createUserDescriptor();
  59. udesc->set(context.queryUserId(), context.queryPassword());
  60. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  61. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  62. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  63. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  64. Owned<IClientFileSpray> fs;
  65. if (copyLocal)
  66. {
  67. fs.setown(createFileSprayClient());
  68. VStringBuffer url("http://.:%d/FileSpray", 8010);
  69. fs->addServiceUrl(url.str());
  70. }
  71. bool isRoxie = isRoxieProcess(cluster);
  72. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  73. ForEach(*graphs)
  74. {
  75. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  76. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  77. ForEach(*iter)
  78. {
  79. try
  80. {
  81. IPropertyTree &node = iter->query();
  82. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  83. if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite)
  84. continue;
  85. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  86. continue;
  87. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  88. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  89. if (logicalname)
  90. info->setIsIndex(true);
  91. else
  92. logicalname = node.queryProp("att[@name='_fileName']/@value");
  93. info->setLogicalName(logicalname);
  94. if (logicalname)
  95. {
  96. if (!strnicmp("~foreign::", logicalname, 10))
  97. foreign.append(*info.getClear());
  98. else
  99. {
  100. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
  101. if(!df)
  102. notFound.append(*info.getClear());
  103. else if (df->findCluster(cluster)!=NotFound)
  104. {
  105. onCluster.append(*info.getClear());
  106. }
  107. else
  108. {
  109. StringArray clusters;
  110. df->getClusterNames(clusters);
  111. info->setClusters(clusters);
  112. if (copyLocal)
  113. {
  114. StringBuffer wuid;
  115. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, NULL, udesc);
  116. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  117. }
  118. notOnCluster.append(*info.getClear());
  119. }
  120. }
  121. }
  122. }
  123. catch(IException *e)
  124. {
  125. e->Release();
  126. }
  127. }
  128. lfinfo.setClusterName(cluster);
  129. lfinfo.setNotOnCluster(notOnCluster);
  130. lfinfo.setOnCluster(onCluster);
  131. lfinfo.setForeign(foreign);
  132. lfinfo.setNotFound(notFound);
  133. }
  134. return true;
  135. }
  136. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  137. {
  138. const StringArray &thors = clusterInfo.getThorProcesses();
  139. ForEachItemIn(i, thors)
  140. {
  141. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  142. copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
  143. clusterfiles.append(*files.getClear());
  144. }
  145. SCMStringBuffer roxie;
  146. clusterInfo.getRoxieProcess(roxie);
  147. if (roxie.length())
  148. {
  149. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  150. copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
  151. clusterfiles.append(*files.getClear());
  152. }
  153. }
  154. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  155. {
  156. if (isEmpty(req.getWuid()))
  157. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "WUCopyLogicalFiles WUID parameter not set.");
  158. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  159. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  160. if (!cw)
  161. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", req.getWuid());
  162. resp.setWuid(req.getWuid());
  163. SCMStringBuffer cluster;
  164. if (notEmpty(req.getCluster()))
  165. cluster.set(req.getCluster());
  166. else
  167. cw->getClusterName(cluster);
  168. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  169. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  170. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  171. resp.setClusterFiles(clusterfiles);
  172. return true;
  173. }
  174. bool CWsWorkunitsEx::onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkunitRequest & req, IEspWUDeployWorkunitResponse & resp)
  175. {
  176. if (isEmpty(req.getWuid()))
  177. throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED,"No Workunit ID has been specified.");
  178. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  179. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  180. if (!cw)
  181. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", req.getWuid());
  182. resp.setWuid(req.getWuid());
  183. SCMStringBuffer queryName;
  184. if (notEmpty(req.getJobName()))
  185. queryName.set(req.getJobName());
  186. else
  187. cw->getJobName(queryName).str();
  188. SCMStringBuffer cluster;
  189. cw->getClusterName(cluster);
  190. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  191. SCMStringBuffer queryset;
  192. clusterInfo->getQuerySetName(queryset);
  193. WorkunitUpdate wu(&cw->lock());
  194. if (notEmpty(req.getJobName()))
  195. wu->setJobName(req.getJobName());
  196. StringBuffer queryId;
  197. addQueryToQuerySet(wu, queryset.str(), queryName.str(), NULL, (WUQueryActivationOptions)req.getActivate(), queryId);
  198. wu->commit();
  199. wu.clear();
  200. if (req.getCopyLocal() || req.getShowFiles())
  201. {
  202. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  203. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  204. resp.setClusterFiles(clusterfiles);
  205. }
  206. return true;
  207. }
  208. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  209. {
  210. Owned<IPropertyTree> queryRegistry = getQueryRegistryRoot();
  211. if (!queryRegistry)
  212. return false;
  213. IArrayOf<IEspQuerySet> querySets;
  214. Owned<IPropertyTreeIterator> it = queryRegistry->getElements("QuerySet");
  215. ForEach(*it)
  216. {
  217. Owned<IEspQuerySet> qs = createQuerySet("", "");
  218. qs->setQuerySetName(it->query().queryProp("@id"));
  219. querySets.append(*qs.getClear());
  220. }
  221. resp.setQuerysets(querySets);
  222. return true;
  223. }
  224. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  225. {
  226. resp.setQuerySetName(req.getQuerySetName());
  227. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  228. if (!registry)
  229. return false;
  230. IArrayOf<IEspQuerySetQuery> querySetQueries;
  231. Owned<IPropertyTreeIterator> queries = registry->getElements("Query");
  232. ForEach(*queries)
  233. {
  234. IPropertyTree &query = queries->query();
  235. Owned<IEspQuerySetQuery> q = createQuerySetQuery("", "");
  236. q->setId(query.queryProp("@id"));
  237. q->setName(query.queryProp("@name"));
  238. q->setDll(query.queryProp("@dll"));
  239. q->setWuid(query.queryProp("@wuid"));
  240. q->setSuspended(query.getPropBool("@suspended", false));
  241. querySetQueries.append(*q.getLink());
  242. }
  243. resp.setQuerysetQueries(querySetQueries);
  244. IArrayOf<IEspQuerySetAlias> querySetAliases;
  245. Owned<IPropertyTreeIterator> aliases = registry->getElements("Alias");
  246. ForEach(*aliases)
  247. {
  248. IPropertyTree &alias = aliases->query();
  249. Owned<IEspQuerySetAlias> a = createQuerySetAlias("", "");
  250. a->setName(alias.queryProp("@name"));
  251. a->setId(alias.queryProp("@id"));
  252. querySetAliases.append(*a.getClear());
  253. }
  254. resp.setQuerysetAliases(querySetAliases);
  255. return true;
  256. }
  257. bool CWsWorkunitsEx::onWUQuerysetActionQueries(IEspContext &context, IEspWUQuerySetActionQueriesRequest & req, IEspWUQuerySetActionQueriesResponse & resp)
  258. {
  259. resp.setQuerySetName(req.getQuerySetName());
  260. resp.setRemove(req.getRemove());
  261. Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
  262. IArrayOf<IEspQuerySetQueryAction> actions;
  263. ForEachItemIn(qa, req.getQuerysetQueryActions())
  264. {
  265. IConstQuerySetQueryAction& item=req.getQuerysetQueryActions().item(qa);
  266. if(notEmpty(item.getId()))
  267. {
  268. if (req.getRemove())
  269. {
  270. removeAliasesFromNamedQuery(queryRegistry, item.getId());
  271. removeNamedQuery(queryRegistry, item.getId());
  272. Owned<IEspQuerySetQueryAction> action = createQuerySetQueryAction("", "");
  273. action->setId(item.getId());
  274. action->setStatus("Completed");
  275. actions.append(*action.getClear());
  276. }
  277. if (req.getToggleSuspend())
  278. {
  279. setQuerySuspendedState(queryRegistry, item.getId(), !item.getSuspended());
  280. Owned<IEspQuerySetQueryAction> action = createQuerySetQueryAction("", "");
  281. action->setId(item.getId());
  282. action->setStatus("Completed");
  283. actions.append(*action.getClear());
  284. }
  285. }
  286. }
  287. resp.setQuerysetQueryActions(actions);
  288. return true;
  289. }
  290. bool CWsWorkunitsEx::onWUQuerysetActionAliases(IEspContext &context, IEspWUQuerySetActionAliasesRequest & req, IEspWUQuerySetActionAliasesResponse & resp)
  291. {
  292. resp.setQuerySetName(req.getQuerySetName());
  293. resp.setRemove(req.getRemove());
  294. Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
  295. IArrayOf<IEspQuerySetAliasAction> actions;
  296. ForEachItemIn(aa, req.getQuerysetAliasActions())
  297. {
  298. IConstQuerySetAliasAction& item=req.getQuerysetAliasActions().item(aa);
  299. if (req.getRemove())
  300. {
  301. if(notEmpty(item.getId()))
  302. {
  303. removeAliasesFromNamedQuery(queryRegistry, item.getId());
  304. Owned<IEspQuerySetAliasAction> action = createQuerySetAliasAction("", "");
  305. action->setId(item.getId());
  306. action->setStatus("Completed");
  307. actions.append(*action.getClear());
  308. }
  309. }
  310. }
  311. resp.setQuerysetAliasActions(actions);
  312. return true;
  313. }