ws_workunitsQuerySets.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "ws_workunitsService.hpp"
  15. #include "ws_fs.hpp"
  16. #include "jlib.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "dadfs.hpp"
  20. #include "dfuwu.hpp"
  21. #include "eclhelper.hpp"
  22. const unsigned roxieQueryRoxieTimeOut = 60000;
  23. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  24. bool isRoxieProcess(const char *process)
  25. {
  26. if (!process)
  27. return false;
  28. Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  29. if (!conn)
  30. return false;
  31. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  32. return conn->queryRoot()->hasProp(xpath.str());
  33. }
  34. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  35. {
  36. try
  37. {
  38. Owned<IClientCopy> req = fs.createCopyRequest();
  39. req->setSourceLogicalName(logicalname);
  40. req->setDestLogicalName(logicalname);
  41. req->setDestGroup(cluster);
  42. req->setSuperCopy(supercopy);
  43. if (isRoxie)
  44. req->setDestGroupRoxie("Yes");
  45. Owned<IClientCopyResponse> resp = fs.Copy(req);
  46. info.setDfuCopyWuid(resp->getResult());
  47. }
  48. catch (IException *e)
  49. {
  50. StringBuffer msg;
  51. info.setDfuCopyError(e->errorMessage(msg).str());
  52. }
  53. }
  54. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  55. {
  56. if (isEmpty(cluster))
  57. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  58. Owned<IUserDescriptor> udesc = createUserDescriptor();
  59. udesc->set(context.queryUserId(), context.queryPassword());
  60. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  61. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  62. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  63. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  64. Owned<IClientFileSpray> fs;
  65. if (copyLocal)
  66. {
  67. fs.setown(createFileSprayClient());
  68. VStringBuffer url("http://.:%d/FileSpray", 8010);
  69. fs->addServiceUrl(url.str());
  70. }
  71. bool isRoxie = isRoxieProcess(cluster);
  72. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  73. ForEach(*graphs)
  74. {
  75. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  76. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  77. ForEach(*iter)
  78. {
  79. try
  80. {
  81. IPropertyTree &node = iter->query();
  82. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  83. if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite)
  84. continue;
  85. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  86. continue;
  87. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  88. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  89. if (logicalname)
  90. info->setIsIndex(true);
  91. else
  92. logicalname = node.queryProp("att[@name='_fileName']/@value");
  93. info->setLogicalName(logicalname);
  94. if (logicalname)
  95. {
  96. if (!strnicmp("~foreign::", logicalname, 10))
  97. foreign.append(*info.getClear());
  98. else
  99. {
  100. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
  101. if(!df)
  102. notFound.append(*info.getClear());
  103. else if (df->findCluster(cluster)!=NotFound)
  104. {
  105. onCluster.append(*info.getClear());
  106. }
  107. else
  108. {
  109. StringArray clusters;
  110. df->getClusterNames(clusters);
  111. info->setClusters(clusters);
  112. if (copyLocal)
  113. {
  114. StringBuffer wuid;
  115. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, NULL, udesc);
  116. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  117. }
  118. notOnCluster.append(*info.getClear());
  119. }
  120. }
  121. }
  122. }
  123. catch(IException *e)
  124. {
  125. e->Release();
  126. }
  127. }
  128. lfinfo.setClusterName(cluster);
  129. lfinfo.setNotOnCluster(notOnCluster);
  130. lfinfo.setOnCluster(onCluster);
  131. lfinfo.setForeign(foreign);
  132. lfinfo.setNotFound(notFound);
  133. }
  134. return true;
  135. }
  136. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  137. {
  138. const StringArray &thors = clusterInfo.getThorProcesses();
  139. ForEachItemIn(i, thors)
  140. {
  141. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  142. copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
  143. clusterfiles.append(*files.getClear());
  144. }
  145. SCMStringBuffer roxie;
  146. clusterInfo.getRoxieProcess(roxie);
  147. if (roxie.length())
  148. {
  149. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  150. copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
  151. clusterfiles.append(*files.getClear());
  152. }
  153. }
  154. void gatherQuerySetQueryDetails(IPropertyTree *query, IEspQuerySetQuery *queryInfo)
  155. {
  156. queryInfo->setId(query->queryProp("@id"));
  157. queryInfo->setName(query->queryProp("@name"));
  158. queryInfo->setDll(query->queryProp("@dll"));
  159. queryInfo->setWuid(query->queryProp("@wuid"));
  160. queryInfo->setSuspended(query->getPropBool("@suspended", false));
  161. }
  162. void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
  163. {
  164. aliasInfo->setName(alias->queryProp("@name"));
  165. aliasInfo->setId(alias->queryProp("@id"));
  166. }
  167. void retrieveAllQuerysetDetails(IPropertyTree *registry, IEspWUQuerySetDetailsResponse &resp)
  168. {
  169. IArrayOf<IEspQuerySetQuery> querySetQueries;
  170. Owned<IPropertyTreeIterator> queries = registry->getElements("Query");
  171. ForEach(*queries)
  172. {
  173. IPropertyTree &query = queries->query();
  174. Owned<IEspQuerySetQuery> q = createQuerySetQuery("", "");
  175. gatherQuerySetQueryDetails(&query, q);
  176. querySetQueries.append(*q.getClear());
  177. }
  178. resp.setQuerysetQueries(querySetQueries);
  179. IArrayOf<IEspQuerySetAlias> querySetAliases;
  180. Owned<IPropertyTreeIterator> aliases = registry->getElements("Alias");
  181. ForEach(*aliases)
  182. {
  183. IPropertyTree &alias = aliases->query();
  184. Owned<IEspQuerySetAlias> a = createQuerySetAlias("", "");
  185. gatherQuerySetAliasDetails(&alias, a);
  186. querySetAliases.append(*a.getClear());
  187. }
  188. }
  189. void retrieveQuerysetDetailsFromAlias(IPropertyTree *registry, const char *filter, IEspWUQuerySetDetailsResponse &resp)
  190. {
  191. StringBuffer xpath;
  192. xpath.append("Alias[@name='").append(filter).append("']");
  193. IPropertyTree *alias = registry->queryPropTree(xpath);
  194. if (!alias)
  195. {
  196. DBGLOG("Alias %s not found", filter);
  197. return;
  198. }
  199. IArrayOf<IEspQuerySetAlias> querySetAliases;
  200. Owned<IEspQuerySetAlias> a = createQuerySetAlias("", "");
  201. gatherQuerySetAliasDetails(alias, a);
  202. querySetAliases.append(*a.getClear());
  203. xpath.clear().append("Query[@id='").append(a->getId()).append("']");
  204. IPropertyTree *query = registry->queryPropTree(xpath);
  205. if (!query)
  206. {
  207. DBGLOG("No matching Query %s found for Alias %s", a->getId(), filter);
  208. return;
  209. }
  210. IArrayOf<IEspQuerySetQuery> querySetQueries;
  211. Owned<IEspQuerySetQuery> q = createQuerySetQuery("", "");
  212. gatherQuerySetQueryDetails(query, q);
  213. querySetQueries.append(*q.getClear());
  214. resp.setQuerysetQueries(querySetQueries);
  215. }
  216. void retrieveQuerysetDetailsFromQuery(IPropertyTree *registry, const char *filter, const char *filterType, IEspWUQuerySetDetailsResponse &resp)
  217. {
  218. StringBuffer xpath;
  219. xpath.clear().append("Query[@").append(filterType).append("='").append(filter).append("']");
  220. IPropertyTree *query = registry->queryPropTree(xpath);
  221. if (!query)
  222. {
  223. DBGLOG("No matching Query %s found for %s", filter, filterType);
  224. return;
  225. }
  226. IArrayOf<IEspQuerySetQuery> querySetQueries;
  227. Owned<IEspQuerySetQuery> q = createQuerySetQuery("", "");
  228. gatherQuerySetQueryDetails(query, q);
  229. querySetQueries.append(*q.getClear());
  230. resp.setQuerysetQueries(querySetQueries);
  231. IArrayOf<IEspQuerySetAlias> querySetAliases;
  232. xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
  233. Owned<IPropertyTreeIterator> aliases = registry->getElements(xpath.str());
  234. ForEach(*aliases)
  235. {
  236. IPropertyTree &alias = aliases->query();
  237. Owned<IEspQuerySetAlias> a = createQuerySetAlias("", "");
  238. gatherQuerySetAliasDetails(&alias, a);
  239. querySetAliases.append(*a.getClear());
  240. }
  241. }
  242. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  243. {
  244. if (isEmpty(req.getWuid()))
  245. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "WUCopyLogicalFiles WUID parameter not set.");
  246. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  247. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  248. if (!cw)
  249. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", req.getWuid());
  250. resp.setWuid(req.getWuid());
  251. SCMStringBuffer cluster;
  252. if (notEmpty(req.getCluster()))
  253. cluster.set(req.getCluster());
  254. else
  255. cw->getClusterName(cluster);
  256. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  257. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  258. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  259. resp.setClusterFiles(clusterfiles);
  260. return true;
  261. }
  262. bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
  263. {
  264. if (isEmpty(req.getWuid()))
  265. throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED,"No Workunit ID has been specified.");
  266. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  267. Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
  268. if (!cw)
  269. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", req.getWuid());
  270. resp.setWuid(req.getWuid());
  271. SCMStringBuffer queryName;
  272. if (notEmpty(req.getJobName()))
  273. queryName.set(req.getJobName());
  274. else
  275. cw->getJobName(queryName).str();
  276. if (!queryName.length())
  277. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", req.getWuid());
  278. SCMStringBuffer cluster;
  279. if (notEmpty(req.getCluster()))
  280. cluster.set(req.getCluster());
  281. else
  282. cw->getClusterName(cluster);
  283. if (!cluster.length())
  284. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", req.getWuid());
  285. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  286. SCMStringBuffer queryset;
  287. clusterInfo->getQuerySetName(queryset);
  288. WorkunitUpdate wu(&cw->lock());
  289. if (notEmpty(req.getJobName()))
  290. wu->setJobName(req.getJobName());
  291. StringBuffer queryId;
  292. addQueryToQuerySet(wu, queryset.str(), queryName.str(), NULL, (WUQueryActivationOptions)req.getActivate(), queryId);
  293. wu->commit();
  294. wu.clear();
  295. if (queryId.length())
  296. resp.setQueryId(queryId.str());
  297. resp.setQueryName(queryName.str());
  298. resp.setQuerySet(queryset.str());
  299. if (req.getCopyLocal() || req.getShowFiles())
  300. {
  301. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  302. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  303. resp.setClusterFiles(clusterfiles);
  304. }
  305. return true;
  306. }
  307. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  308. {
  309. Owned<IPropertyTree> queryRegistry = getQueryRegistryRoot();
  310. if (!queryRegistry)
  311. return false;
  312. IArrayOf<IEspQuerySet> querySets;
  313. Owned<IPropertyTreeIterator> it = queryRegistry->getElements("QuerySet");
  314. ForEach(*it)
  315. {
  316. Owned<IEspQuerySet> qs = createQuerySet("", "");
  317. qs->setQuerySetName(it->query().queryProp("@id"));
  318. querySets.append(*qs.getClear());
  319. }
  320. resp.setQuerysets(querySets);
  321. return true;
  322. }
  323. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  324. {
  325. resp.setQuerySetName(req.getQuerySetName());
  326. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  327. if (!registry)
  328. return false;
  329. const char *filterType = req.getFilterTypeAsString();
  330. if (strieq(filterType, "All"))
  331. retrieveAllQuerysetDetails(registry, resp);
  332. else if (strieq(filterType, "Alias"))
  333. {
  334. const char *filter = req.getFilter();
  335. if (!filter || !*filter)
  336. {
  337. DBGLOG("Need to specify an Alias name for lookup");
  338. return false;
  339. }
  340. retrieveQuerysetDetailsFromAlias(registry, filter, resp);
  341. }
  342. else
  343. {
  344. const char *filter = req.getFilter();
  345. if (!filter || !*filter)
  346. {
  347. DBGLOG("Need to specify an Alias name for lookup");
  348. return false;
  349. }
  350. if (strieq(filterType, "Id"))
  351. retrieveQuerysetDetailsFromQuery(registry, filter, "id", resp);
  352. else if (strieq(filterType, "Name"))
  353. retrieveQuerysetDetailsFromQuery(registry, filter, "name", resp);
  354. else
  355. {
  356. DBGLOG("invalid filterType %s specified", filter);
  357. return false;
  358. }
  359. }
  360. return true;
  361. }
  362. bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
  363. {
  364. resp.setQuerySetName(req.getQuerySetName());
  365. resp.setAction(req.getAction());
  366. if (isEmpty(req.getQuerySetName()))
  367. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  368. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  369. if (!queryset)
  370. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  371. IArrayOf<IEspQuerySetQueryActionResult> results;
  372. ForEachItemIn(i, req.getQueries())
  373. {
  374. IConstQuerySetQueryActionItem& item=req.getQueries().item(i);
  375. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  376. try
  377. {
  378. VStringBuffer xpath("Query[@id='%s']", item.getQueryId());
  379. IPropertyTree *query = queryset->queryPropTree(xpath.str());
  380. if (!query)
  381. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), item.getQueryId());
  382. switch (req.getAction())
  383. {
  384. case CQuerySetQueryActionTypes_ToggleSuspend:
  385. setQuerySuspendedState(queryset, item.getQueryId(), !item.getClientState().getSuspended());
  386. break;
  387. case CQuerySetQueryActionTypes_Suspend:
  388. setQuerySuspendedState(queryset, item.getQueryId(), true);
  389. break;
  390. case CQuerySetQueryActionTypes_Unsuspend:
  391. setQuerySuspendedState(queryset, item.getQueryId(), false);
  392. break;
  393. case CQuerySetQueryActionTypes_Activate:
  394. setQueryAlias(queryset, query->queryProp("@name"), item.getQueryId());
  395. break;
  396. case CQuerySetQueryActionTypes_Delete:
  397. removeAliasesFromNamedQuery(queryset, item.getQueryId());
  398. removeNamedQuery(queryset, item.getQueryId());
  399. break;
  400. case CQuerySetQueryActionTypes_RemoveAllAliases:
  401. removeAliasesFromNamedQuery(queryset, item.getQueryId());
  402. break;
  403. }
  404. result->setSuccess(true);
  405. query = queryset->queryPropTree(xpath.str()); // refresh
  406. if (query)
  407. result->setSuspended(query->getPropBool("@suspended"));
  408. }
  409. catch(IException *e)
  410. {
  411. StringBuffer msg;
  412. result->setMessage(e->errorMessage(msg).str());
  413. result->setCode(e->errorCode());
  414. result->setSuccess(false);
  415. }
  416. results.append(*result.getClear());
  417. }
  418. resp.setResults(results);
  419. return true;
  420. }
  421. bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
  422. {
  423. resp.setQuerySetName(req.getQuerySetName());
  424. resp.setAction(req.getAction());
  425. if (isEmpty(req.getQuerySetName()))
  426. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  427. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  428. if (!queryset)
  429. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  430. IArrayOf<IEspQuerySetAliasActionResult> results;
  431. ForEachItemIn(i, req.getAliases())
  432. {
  433. IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
  434. Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
  435. try
  436. {
  437. VStringBuffer xpath("Alias[@name='%s']", item.getName());
  438. IPropertyTree *alias = queryset->queryPropTree(xpath.str());
  439. if (!alias)
  440. throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
  441. switch (req.getAction())
  442. {
  443. case CQuerySetAliasActionTypes_Deactivate:
  444. removeQuerySetAlias(req.getQuerySetName(), item.getName());
  445. break;
  446. }
  447. result->setSuccess(true);
  448. }
  449. catch(IException *e)
  450. {
  451. StringBuffer msg;
  452. result->setMessage(e->errorMessage(msg).str());
  453. result->setCode(e->errorCode());
  454. result->setSuccess(false);
  455. }
  456. results.append(*result.getClear());
  457. }
  458. resp.setResults(results);
  459. return true;
  460. }