ws_workunitsQuerySets.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "ws_workunitsService.hpp"
  15. #include "ws_fs.hpp"
  16. #include "jlib.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "dadfs.hpp"
  20. #include "dfuwu.hpp"
  21. #include "eclhelper.hpp"
  22. const unsigned roxieQueryRoxieTimeOut = 60000;
  23. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  24. bool isRoxieProcess(const char *process)
  25. {
  26. if (!process)
  27. return false;
  28. Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  29. if (!conn)
  30. return false;
  31. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  32. return conn->queryRoot()->hasProp(xpath.str());
  33. }
  34. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  35. {
  36. try
  37. {
  38. Owned<IClientCopy> req = fs.createCopyRequest();
  39. req->setSourceLogicalName(logicalname);
  40. req->setDestLogicalName(logicalname);
  41. req->setDestGroup(cluster);
  42. req->setSuperCopy(supercopy);
  43. if (isRoxie)
  44. req->setDestGroupRoxie("Yes");
  45. Owned<IClientCopyResponse> resp = fs.Copy(req);
  46. info.setDfuCopyWuid(resp->getResult());
  47. }
  48. catch (IException *e)
  49. {
  50. StringBuffer msg;
  51. info.setDfuCopyError(e->errorMessage(msg).str());
  52. }
  53. }
  54. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  55. {
  56. if (isEmpty(cluster))
  57. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  58. Owned<IUserDescriptor> udesc = createUserDescriptor();
  59. udesc->set(context.queryUserId(), context.queryPassword());
  60. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  61. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  62. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  63. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  64. Owned<IClientFileSpray> fs;
  65. if (copyLocal)
  66. {
  67. fs.setown(createFileSprayClient());
  68. VStringBuffer url("http://.:%d/FileSpray", 8010);
  69. fs->addServiceUrl(url.str());
  70. }
  71. bool isRoxie = isRoxieProcess(cluster);
  72. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  73. ForEach(*graphs)
  74. {
  75. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  76. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  77. ForEach(*iter)
  78. {
  79. try
  80. {
  81. IPropertyTree &node = iter->query();
  82. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  83. if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite)
  84. continue;
  85. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  86. continue;
  87. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  88. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  89. if (logicalname)
  90. info->setIsIndex(true);
  91. else
  92. logicalname = node.queryProp("att[@name='_fileName']/@value");
  93. info->setLogicalName(logicalname);
  94. if (logicalname)
  95. {
  96. if (!strnicmp("~foreign::", logicalname, 10))
  97. foreign.append(*info.getClear());
  98. else
  99. {
  100. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
  101. if(!df)
  102. notFound.append(*info.getClear());
  103. else if (df->findCluster(cluster)!=NotFound)
  104. {
  105. onCluster.append(*info.getClear());
  106. }
  107. else
  108. {
  109. StringArray clusters;
  110. df->getClusterNames(clusters);
  111. info->setClusters(clusters);
  112. if (copyLocal)
  113. {
  114. StringBuffer wuid;
  115. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, NULL, udesc);
  116. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  117. }
  118. notOnCluster.append(*info.getClear());
  119. }
  120. }
  121. }
  122. }
  123. catch(IException *e)
  124. {
  125. e->Release();
  126. }
  127. }
  128. lfinfo.setClusterName(cluster);
  129. lfinfo.setNotOnCluster(notOnCluster);
  130. lfinfo.setOnCluster(onCluster);
  131. lfinfo.setForeign(foreign);
  132. lfinfo.setNotFound(notFound);
  133. }
  134. return true;
  135. }
  136. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  137. {
  138. const StringArray &thors = clusterInfo.getThorProcesses();
  139. ForEachItemIn(i, thors)
  140. {
  141. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  142. copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
  143. clusterfiles.append(*files.getClear());
  144. }
  145. SCMStringBuffer roxie;
  146. clusterInfo.getRoxieProcess(roxie);
  147. if (roxie.length())
  148. {
  149. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  150. copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
  151. clusterfiles.append(*files.getClear());
  152. }
  153. }
  154. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  155. {
  156. if (isEmpty(req.getWuid()))
  157. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "WUCopyLogicalFiles WUID parameter not set.");
  158. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  159. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  160. if (!cw)
  161. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", req.getWuid());
  162. resp.setWuid(req.getWuid());
  163. SCMStringBuffer cluster;
  164. if (notEmpty(req.getCluster()))
  165. cluster.set(req.getCluster());
  166. else
  167. cw->getClusterName(cluster);
  168. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  169. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  170. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  171. resp.setClusterFiles(clusterfiles);
  172. return true;
  173. }
  174. bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
  175. {
  176. if (isEmpty(req.getWuid()))
  177. throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED,"No Workunit ID has been specified.");
  178. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  179. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  180. if (!cw)
  181. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", req.getWuid());
  182. resp.setWuid(req.getWuid());
  183. SCMStringBuffer queryName;
  184. if (notEmpty(req.getJobName()))
  185. queryName.set(req.getJobName());
  186. else
  187. cw->getJobName(queryName).str();
  188. SCMStringBuffer cluster;
  189. if (notEmpty(req.getCluster()))
  190. cluster.set(req.getCluster());
  191. else
  192. cw->getClusterName(cluster);
  193. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  194. SCMStringBuffer queryset;
  195. clusterInfo->getQuerySetName(queryset);
  196. WorkunitUpdate wu(&cw->lock());
  197. if (notEmpty(req.getJobName()))
  198. wu->setJobName(req.getJobName());
  199. StringBuffer queryId;
  200. addQueryToQuerySet(wu, queryset.str(), queryName.str(), NULL, (WUQueryActivationOptions)req.getActivate(), queryId);
  201. wu->commit();
  202. wu.clear();
  203. if (queryId.length())
  204. resp.setQueryId(queryId.str());
  205. resp.setQueryName(queryName.str());
  206. resp.setQuerySet(queryset.str());
  207. if (req.getCopyLocal() || req.getShowFiles())
  208. {
  209. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  210. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  211. resp.setClusterFiles(clusterfiles);
  212. }
  213. return true;
  214. }
  215. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  216. {
  217. Owned<IPropertyTree> queryRegistry = getQueryRegistryRoot();
  218. if (!queryRegistry)
  219. return false;
  220. IArrayOf<IEspQuerySet> querySets;
  221. Owned<IPropertyTreeIterator> it = queryRegistry->getElements("QuerySet");
  222. ForEach(*it)
  223. {
  224. Owned<IEspQuerySet> qs = createQuerySet("", "");
  225. qs->setQuerySetName(it->query().queryProp("@id"));
  226. querySets.append(*qs.getClear());
  227. }
  228. resp.setQuerysets(querySets);
  229. return true;
  230. }
  231. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  232. {
  233. resp.setQuerySetName(req.getQuerySetName());
  234. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  235. if (!registry)
  236. return false;
  237. IArrayOf<IEspQuerySetQuery> querySetQueries;
  238. Owned<IPropertyTreeIterator> queries = registry->getElements("Query");
  239. ForEach(*queries)
  240. {
  241. IPropertyTree &query = queries->query();
  242. Owned<IEspQuerySetQuery> q = createQuerySetQuery("", "");
  243. q->setId(query.queryProp("@id"));
  244. q->setName(query.queryProp("@name"));
  245. q->setDll(query.queryProp("@dll"));
  246. q->setWuid(query.queryProp("@wuid"));
  247. q->setSuspended(query.getPropBool("@suspended", false));
  248. querySetQueries.append(*q.getLink());
  249. }
  250. resp.setQuerysetQueries(querySetQueries);
  251. IArrayOf<IEspQuerySetAlias> querySetAliases;
  252. Owned<IPropertyTreeIterator> aliases = registry->getElements("Alias");
  253. ForEach(*aliases)
  254. {
  255. IPropertyTree &alias = aliases->query();
  256. Owned<IEspQuerySetAlias> a = createQuerySetAlias("", "");
  257. a->setName(alias.queryProp("@name"));
  258. a->setId(alias.queryProp("@id"));
  259. querySetAliases.append(*a.getClear());
  260. }
  261. resp.setQuerysetAliases(querySetAliases);
  262. return true;
  263. }
  264. bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
  265. {
  266. resp.setQuerySetName(req.getQuerySetName());
  267. resp.setAction(req.getAction());
  268. if (isEmpty(req.getQuerySetName()))
  269. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  270. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  271. if (!queryset)
  272. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  273. IArrayOf<IEspQuerySetQueryActionResult> results;
  274. ForEachItemIn(i, req.getQueries())
  275. {
  276. IConstQuerySetQueryActionItem& item=req.getQueries().item(i);
  277. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  278. try
  279. {
  280. VStringBuffer xpath("Query[@id='%s']", item.getQueryId());
  281. IPropertyTree *query = queryset->queryPropTree(xpath.str());
  282. if (!query)
  283. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), item.getQueryId());
  284. switch (req.getAction())
  285. {
  286. case CQuerySetQueryActionTypes_ToggleSuspend:
  287. setQuerySuspendedState(queryset, item.getQueryId(), !item.getClientState().getSuspended());
  288. break;
  289. case CQuerySetQueryActionTypes_Suspend:
  290. setQuerySuspendedState(queryset, item.getQueryId(), true);
  291. break;
  292. case CQuerySetQueryActionTypes_Unsuspend:
  293. setQuerySuspendedState(queryset, item.getQueryId(), false);
  294. break;
  295. case CQuerySetQueryActionTypes_Activate:
  296. setQueryAlias(queryset, query->queryProp("@name"), item.getQueryId());
  297. break;
  298. case CQuerySetQueryActionTypes_Delete:
  299. removeAliasesFromNamedQuery(queryset, item.getQueryId());
  300. removeNamedQuery(queryset, item.getQueryId());
  301. break;
  302. case CQuerySetQueryActionTypes_RemoveAllAliases:
  303. removeAliasesFromNamedQuery(queryset, item.getQueryId());
  304. break;
  305. }
  306. result->setSuccess(true);
  307. query = queryset->queryPropTree(xpath.str()); // refresh
  308. if (query)
  309. result->setSuspended(query->getPropBool("@suspended"));
  310. }
  311. catch(IException *e)
  312. {
  313. StringBuffer msg;
  314. result->setMessage(e->errorMessage(msg).str());
  315. result->setCode(e->errorCode());
  316. result->setSuccess(false);
  317. }
  318. results.append(*result.getClear());
  319. }
  320. resp.setResults(results);
  321. return true;
  322. }
  323. bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
  324. {
  325. resp.setQuerySetName(req.getQuerySetName());
  326. resp.setAction(req.getAction());
  327. if (isEmpty(req.getQuerySetName()))
  328. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  329. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  330. if (!queryset)
  331. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  332. IArrayOf<IEspQuerySetAliasActionResult> results;
  333. ForEachItemIn(i, req.getAliases())
  334. {
  335. IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
  336. Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
  337. try
  338. {
  339. VStringBuffer xpath("Alias[@name='%s']", item.getName());
  340. IPropertyTree *alias = queryset->queryPropTree(xpath.str());
  341. if (!alias)
  342. throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
  343. switch (req.getAction())
  344. {
  345. case CQuerySetAliasActionTypes_Deactivate:
  346. removeQuerySetAlias(req.getQuerySetName(), item.getName());
  347. break;
  348. }
  349. result->setSuccess(true);
  350. }
  351. catch(IException *e)
  352. {
  353. StringBuffer msg;
  354. result->setMessage(e->errorMessage(msg).str());
  355. result->setCode(e->errorCode());
  356. result->setSuccess(false);
  357. }
  358. results.append(*result.getClear());
  359. }
  360. resp.setResults(results);
  361. return true;
  362. }