ws_workunitsQuerySets.cpp 47 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_workunitsService.hpp"
  14. #include "ws_fs.hpp"
  15. #include "jlib.hpp"
  16. #include "daclient.hpp"
  17. #include "dalienv.hpp"
  18. #include "dadfs.hpp"
  19. #include "dfuwu.hpp"
  20. #include "eclhelper.hpp"
  21. #include "roxiecontrol.hpp"
  22. #include "dfuutil.hpp"
  23. #include "dautils.hpp"
  24. #include "referencedfilelist.hpp"
  25. #define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1) // 15 seconds
  26. const unsigned roxieQueryRoxieTimeOut = 60000;
  27. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  28. bool isRoxieProcess(const char *process)
  29. {
  30. if (!process)
  31. return false;
  32. Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  33. if (!conn)
  34. return false;
  35. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  36. return conn->queryRoot()->hasProp(xpath.str());
  37. }
  38. void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
  39. {
  40. if (!ip || !*ip)
  41. return;
  42. ep.set(ip, 7070);
  43. if (ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.'))
  44. ep.ipset(esp);
  45. }
  46. void fetchRemoteWorkunit(IEspContext &context, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer)
  47. {
  48. Owned<IClientWsWorkunits> ws;
  49. ws.setown(createWsWorkunitsClient());
  50. VStringBuffer url("http://%s%s/WsWorkunits", netAddress, (!strchr(netAddress, ':')) ? ":8010" : "");
  51. ws->addServiceUrl(url.str());
  52. if (context.queryUserId() && *context.queryUserId())
  53. ws->setUsernameToken(context.queryUserId(), context.queryPassword(), NULL);
  54. Owned<IClientWULogFileRequest> req = ws->createWUFileRequest();
  55. if (queryset && *queryset)
  56. req->setQuerySet(queryset);
  57. if (query && *query)
  58. req->setQuery(query);
  59. if (wuid && *wuid)
  60. req->setWuid(wuid);
  61. req->setType("xml");
  62. Owned<IClientWULogFileResponse> resp = ws->WUFile(req);
  63. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  64. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit");
  65. xml.append(resp->getThefile().length(), resp->getThefile().toByteArray());
  66. req->setType("dll");
  67. resp.setown(ws->WUFile(req));
  68. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  69. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit shared object");
  70. dll.append(resp->getThefile());
  71. dllname.append(resp->getFileName());
  72. name.append(resp->getQueryName());
  73. SocketEndpoint ep;
  74. checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
  75. if (!ep.isNull())
  76. ep.getUrlStr(daliServer);
  77. }
  78. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  79. {
  80. try
  81. {
  82. Owned<IClientCopy> req = fs.createCopyRequest();
  83. req->setSourceLogicalName(logicalname);
  84. req->setDestLogicalName(logicalname);
  85. req->setDestGroup(cluster);
  86. req->setSuperCopy(supercopy);
  87. if (isRoxie)
  88. req->setDestGroupRoxie("Yes");
  89. Owned<IClientCopyResponse> resp = fs.Copy(req);
  90. info.setDfuCopyWuid(resp->getResult());
  91. }
  92. catch (IException *e)
  93. {
  94. StringBuffer msg;
  95. info.setDfuCopyError(e->errorMessage(msg).str());
  96. }
  97. }
  98. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  99. {
  100. if (isEmpty(cluster))
  101. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  102. Owned<IUserDescriptor> udesc = createUserDescriptor();
  103. udesc->set(context.queryUserId(), context.queryPassword());
  104. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  105. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  106. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  107. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  108. Owned<IClientFileSpray> fs;
  109. if (copyLocal)
  110. {
  111. fs.setown(createFileSprayClient());
  112. VStringBuffer url("http://.:%d/FileSpray", 8010);
  113. fs->addServiceUrl(url.str());
  114. }
  115. bool isRoxie = isRoxieProcess(cluster);
  116. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  117. ForEach(*graphs)
  118. {
  119. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  120. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  121. ForEach(*iter)
  122. {
  123. try
  124. {
  125. IPropertyTree &node = iter->query();
  126. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  127. if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite)
  128. continue;
  129. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  130. continue;
  131. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  132. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  133. if (logicalname)
  134. info->setIsIndex(true);
  135. else
  136. logicalname = node.queryProp("att[@name='_fileName']/@value");
  137. info->setLogicalName(logicalname);
  138. if (logicalname)
  139. {
  140. if (!strnicmp("~foreign::", logicalname, 10))
  141. foreign.append(*info.getClear());
  142. else
  143. {
  144. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc);
  145. if(!df)
  146. notFound.append(*info.getClear());
  147. else if (df->findCluster(cluster)!=NotFound)
  148. {
  149. onCluster.append(*info.getClear());
  150. }
  151. else
  152. {
  153. StringArray clusters;
  154. df->getClusterNames(clusters);
  155. info->setClusters(clusters);
  156. if (copyLocal)
  157. {
  158. StringBuffer wuid;
  159. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, udesc, NULL);
  160. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  161. }
  162. notOnCluster.append(*info.getClear());
  163. }
  164. }
  165. }
  166. }
  167. catch(IException *e)
  168. {
  169. e->Release();
  170. }
  171. }
  172. lfinfo.setClusterName(cluster);
  173. lfinfo.setNotOnCluster(notOnCluster);
  174. lfinfo.setOnCluster(onCluster);
  175. lfinfo.setForeign(foreign);
  176. lfinfo.setNotFound(notFound);
  177. }
  178. return true;
  179. }
  180. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  181. {
  182. const StringArray &thors = clusterInfo.getThorProcesses();
  183. ForEachItemIn(i, thors)
  184. {
  185. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  186. copyWULogicalFiles(context, cw, thors.item(i), doLocalCopy, *files);
  187. clusterfiles.append(*files.getClear());
  188. }
  189. SCMStringBuffer roxie;
  190. clusterInfo.getRoxieProcess(roxie);
  191. if (roxie.length())
  192. {
  193. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  194. copyWULogicalFiles(context, cw, roxie.str(), doLocalCopy, *files);
  195. clusterfiles.append(*files.getClear());
  196. }
  197. }
  198. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  199. {
  200. StringBuffer wuid = req.getWuid();
  201. checkAndTrimWorkunit("WUCopyLogicalFiles", wuid);
  202. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  203. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  204. if (!cw)
  205. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", wuid.str());
  206. resp.setWuid(wuid.str());
  207. SCMStringBuffer cluster;
  208. if (notEmpty(req.getCluster()))
  209. cluster.set(req.getCluster());
  210. else
  211. cw->getClusterName(cluster);
  212. if (!isValidCluster(req.getCluster()))
  213. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster.str());
  214. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster.str());
  215. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  216. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  217. resp.setClusterFiles(clusterfiles);
  218. return true;
  219. }
  220. static inline unsigned remainingMsWait(unsigned wait, unsigned start)
  221. {
  222. if (wait==0 || wait==(unsigned)-1)
  223. return wait;
  224. unsigned waited = msTick()-start;
  225. return (wait>waited) ? wait-waited : 0;
  226. }
  227. bool reloadCluster(IConstWUClusterInfo *clusterInfo, unsigned wait)
  228. {
  229. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  230. return true;
  231. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  232. if (addrs.length())
  233. {
  234. try
  235. {
  236. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), "<control:reload/>", false, wait);
  237. const char *status = result->queryProp("Endpoint[1]/Status");
  238. if (!status || !strieq(status, "ok"))
  239. return false;
  240. }
  241. catch(IMultiException *me)
  242. {
  243. StringBuffer err;
  244. DBGLOG("ERROR control:reloading roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  245. me->Release();
  246. return false;
  247. }
  248. catch(IException *e)
  249. {
  250. StringBuffer err;
  251. DBGLOG("ERROR control:reloading roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  252. e->Release();
  253. return false;
  254. }
  255. }
  256. return true;
  257. }
  258. bool reloadCluster(const char *cluster, unsigned wait)
  259. {
  260. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  261. return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
  262. }
  263. static inline void updateQuerySetting(bool ignore, IPropertyTree *queryTree, const char *xpath, int value)
  264. {
  265. if (ignore || !queryTree)
  266. return;
  267. if (value!=0)
  268. queryTree->setPropInt(xpath, value);
  269. else
  270. queryTree->removeProp(xpath);
  271. }
  272. static inline unsigned __int64 memoryLimitUInt64FromString(const char *value)
  273. {
  274. if (!value || !*value || !isdigit(*value))
  275. return 0;
  276. unsigned __int64 result = (*value - '0');
  277. const char *s = value+1;
  278. while (isdigit(*s))
  279. {
  280. result = 10 * result + ((*s) - '0');
  281. s++;
  282. }
  283. if (*s)
  284. {
  285. const char unit = toupper(*s++);
  286. if (*s && !strieq("B", s)) //more?
  287. return 0;
  288. switch (unit)
  289. {
  290. case 'E':
  291. result <<=60;
  292. break;
  293. case 'P':
  294. result <<=50;
  295. break;
  296. case 'T':
  297. result <<=40;
  298. break;
  299. case 'G':
  300. result <<=30;
  301. break;
  302. case 'M':
  303. result <<=20;
  304. break;
  305. case 'K':
  306. result <<=10;
  307. break;
  308. case 'B':
  309. break;
  310. default:
  311. return 0;
  312. }
  313. }
  314. return result;
  315. }
  316. const char memUnitAbbrev[] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
  317. #define MAX_MEMUNIT_ABBREV 6
  318. static inline StringBuffer &memoryLimitStringFromUInt64(StringBuffer &s, unsigned __int64 in)
  319. {
  320. if (!in)
  321. return s;
  322. unsigned __int64 value = in;
  323. unsigned char unit = 0;
  324. while (!(value & 0x3FF) && unit < MAX_MEMUNIT_ABBREV)
  325. {
  326. value >>= 10;
  327. unit++;
  328. }
  329. return s.append(value).append(memUnitAbbrev[unit]);
  330. }
  331. static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char *value)
  332. {
  333. if (!value || !queryTree)
  334. return;
  335. unsigned __int64 limit = memoryLimitUInt64FromString(value);
  336. if (0==limit)
  337. queryTree->removeProp("@memoryLimit");
  338. else
  339. queryTree->setPropInt64("@memoryLimit", limit);
  340. }
  341. enum QueryPriority {
  342. QueryPriorityNone = -1,
  343. QueryPriorityLow = 0,
  344. QueryPriorityHigh = 1,
  345. QueryPrioritySLA = 2,
  346. QueryPriorityInvalid = 3
  347. };
  348. static inline const char *getQueryPriorityName(int value)
  349. {
  350. switch (value)
  351. {
  352. case QueryPriorityLow:
  353. return "LOW";
  354. case QueryPriorityHigh:
  355. return "HIGH";
  356. case QueryPrioritySLA:
  357. return "SLA";
  358. case QueryPriorityNone:
  359. return "NONE";
  360. }
  361. return "INVALID";
  362. }
  363. static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value)
  364. {
  365. if (!value || !*value || !queryTree)
  366. return;
  367. int priority = QueryPriorityInvalid;
  368. if (strieq("LOW", value))
  369. priority=QueryPriorityLow;
  370. else if (strieq("HIGH", value))
  371. priority=QueryPriorityHigh;
  372. else if (strieq("SLA", value))
  373. priority=QueryPrioritySLA;
  374. else if (strieq("NONE", value))
  375. priority=QueryPriorityNone;
  376. switch (priority)
  377. {
  378. case QueryPriorityInvalid:
  379. break;
  380. case QueryPriorityNone:
  381. queryTree->removeProp("@priority");
  382. break;
  383. default:
  384. queryTree->setPropInt("@priority", priority);
  385. break;
  386. }
  387. }
  388. void copyQueryFilesToCluster(IEspContext &context, IConstWorkUnit *cw, const char *remoteIP, const char *target, const char *queryid, bool overwrite)
  389. {
  390. if (!target || !*target)
  391. return;
  392. SCMStringBuffer process;
  393. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  394. if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
  395. {
  396. clusterInfo->getRoxieProcess(process);
  397. if (!process.length())
  398. return;
  399. Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(), context.queryPassword());
  400. Owned<IHpccPackageSet> ps = createPackageSet(process.str());
  401. wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, queryid);
  402. wufiles->resolveFiles(process.str(), remoteIP, !overwrite, true);
  403. wufiles->cloneAllInfo(overwrite, true);
  404. }
  405. }
  406. bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo *clusterInfo, unsigned wait, StringBuffer& errorMessage)
  407. {
  408. try
  409. {
  410. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  411. return false;
  412. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  413. if (addrs.length() < 1)
  414. return false;
  415. StringBuffer control;
  416. control.appendf("<control:queries><Query id='%s'/></control:queries>", query);
  417. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), control.str(), false, wait);
  418. if (!result)
  419. return false;
  420. Owned<IPropertyTreeIterator> suspendedQueries = result->getElements("Endpoint/Queries/Query[@suspended='1']");
  421. if (!suspendedQueries->first())
  422. return false;
  423. errorMessage.set(suspendedQueries->query().queryProp("@error"));
  424. return true;
  425. }
  426. catch(IMultiException *me)
  427. {
  428. StringBuffer err;
  429. DBGLOG("ERROR control:queries roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  430. me->Release();
  431. return false;
  432. }
  433. catch(IException *e)
  434. {
  435. StringBuffer err;
  436. DBGLOG("ERROR control:queries roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  437. e->Release();
  438. return false;
  439. }
  440. }
  441. bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
  442. {
  443. StringBuffer wuid = req.getWuid();
  444. checkAndTrimWorkunit("WUPublishWorkunit", wuid);
  445. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  446. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  447. if (!cw)
  448. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid.str());
  449. resp.setWuid(wuid.str());
  450. SCMStringBuffer queryName;
  451. if (notEmpty(req.getJobName()))
  452. queryName.set(req.getJobName());
  453. else
  454. cw->getJobName(queryName).str();
  455. if (!queryName.length())
  456. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid.str());
  457. SCMStringBuffer target;
  458. if (notEmpty(req.getCluster()))
  459. target.set(req.getCluster());
  460. else
  461. cw->getClusterName(target);
  462. if (!target.length())
  463. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid.str());
  464. if (!isValidCluster(target.str()))
  465. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
  466. copyQueryFilesToCluster(context, cw, req.getRemoteDali(), target.str(), queryName.str(), false);
  467. WorkunitUpdate wu(&cw->lock());
  468. if (req.getUpdateWorkUnitName() && notEmpty(req.getJobName()))
  469. wu->setJobName(req.getJobName());
  470. StringBuffer queryId;
  471. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  472. addQueryToQuerySet(wu, target.str(), queryName.str(), NULL, activate, queryId, context.queryUserId());
  473. if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
  474. {
  475. Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
  476. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  477. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  478. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  479. updateQueryPriority(queryTree, req.getPriority());
  480. if (req.getComment())
  481. queryTree->setProp("@comment", req.getComment());
  482. }
  483. wu->commit();
  484. wu.clear();
  485. if (queryId.length())
  486. resp.setQueryId(queryId.str());
  487. resp.setQueryName(queryName.str());
  488. resp.setQuerySet(target.str());
  489. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
  490. bool reloadFailed = false;
  491. if (0!=req.getWait() && !req.getNoReload())
  492. reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
  493. resp.setReloadFailed(reloadFailed);
  494. double version = context.getClientVersion();
  495. if (version > 1.38)
  496. {
  497. StringBuffer errorMessage;
  498. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
  499. {
  500. resp.setSuspended(true);
  501. resp.setErrorMessage(errorMessage);
  502. }
  503. }
  504. return true;
  505. }
  506. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  507. {
  508. IArrayOf<IEspQuerySet> querySets;
  509. Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
  510. SCMStringBuffer target;
  511. ForEach(*targets)
  512. {
  513. Owned<IEspQuerySet> qs = createQuerySet();
  514. qs->setQuerySetName(targets->str(target).str());
  515. querySets.append(*qs.getClear());
  516. }
  517. resp.setQuerysets(querySets);
  518. return true;
  519. }
  520. void gatherQuerySetQueryDetails(IPropertyTree *query, IEspQuerySetQuery *queryInfo, const char *cluster, IPropertyTree *queriesOnCluster)
  521. {
  522. queryInfo->setId(query->queryProp("@id"));
  523. queryInfo->setName(query->queryProp("@name"));
  524. queryInfo->setDll(query->queryProp("@dll"));
  525. queryInfo->setWuid(query->queryProp("@wuid"));
  526. queryInfo->setSuspended(query->getPropBool("@suspended", false));
  527. if (query->hasProp("@memoryLimit"))
  528. {
  529. StringBuffer s;
  530. memoryLimitStringFromUInt64(s, query->getPropInt64("@memoryLimit"));
  531. queryInfo->setMemoryLimit(s);
  532. }
  533. if (query->hasProp("@timeLimit"))
  534. queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
  535. if (query->hasProp("@warnTimeLimit"))
  536. queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
  537. if (query->hasProp("@priority"))
  538. queryInfo->setPriority(getQueryPriorityName(query->getPropInt("@priority")));
  539. if (query->hasProp("@comment"))
  540. queryInfo->setComment(query->queryProp("@comment"));
  541. if (queriesOnCluster)
  542. {
  543. IArrayOf<IEspClusterQueryState> clusters;
  544. Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
  545. clusterState->setCluster(cluster);
  546. VStringBuffer xpath("Endpoint/Queries/Query[@id='%s']", query->queryProp("@id"));
  547. IPropertyTree *aQuery = queriesOnCluster->getBranch(xpath.str());
  548. if (!aQuery)
  549. {
  550. clusterState->setState("Not Found");
  551. }
  552. else if (aQuery->getPropBool("@suspended", false))
  553. {
  554. clusterState->setState("Suspended");
  555. }
  556. else
  557. {
  558. clusterState->setState("Available");
  559. }
  560. clusters.append(*clusterState.getClear());
  561. queryInfo->setClusters(clusters);
  562. }
  563. }
  564. void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
  565. {
  566. aliasInfo->setName(alias->queryProp("@name"));
  567. aliasInfo->setId(alias->queryProp("@id"));
  568. }
  569. void retrieveAllQuerysetDetails(IPropertyTree *registry, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL, const char *type=NULL, const char *value=NULL)
  570. {
  571. Owned<IPropertyTreeIterator> regQueries = registry->getElements("Query");
  572. ForEach(*regQueries)
  573. {
  574. IPropertyTree &query = regQueries->query();
  575. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  576. gatherQuerySetQueryDetails(&query, q, cluster, queriesOnCluster);
  577. if (isEmpty(cluster) || isEmpty(type) || isEmpty(value) || !strieq(type, "Status"))
  578. queries.append(*q.getClear());
  579. else
  580. {
  581. IArrayOf<IConstClusterQueryState>& cs = q->getClusters();
  582. ForEachItemIn(i, cs)
  583. {
  584. IConstClusterQueryState& c = cs.item(i);
  585. if (strieq(c.getCluster(), cluster) && (strieq(value, "All") || strieq(c.getState(), value)))
  586. {
  587. queries.append(*q.getClear());
  588. break;
  589. }
  590. }
  591. }
  592. }
  593. Owned<IPropertyTreeIterator> regAliases = registry->getElements("Alias");
  594. ForEach(*regAliases)
  595. {
  596. IPropertyTree &alias = regAliases->query();
  597. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  598. gatherQuerySetAliasDetails(&alias, a);
  599. aliases.append(*a.getClear());
  600. }
  601. }
  602. void retrieveQuerysetDetailsFromAlias(IPropertyTree *registry, const char *name, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster, IPropertyTree *queriesOnCluster)
  603. {
  604. StringBuffer xpath;
  605. xpath.append("Alias[@name='").append(name).append("']");
  606. IPropertyTree *alias = registry->queryPropTree(xpath);
  607. if (!alias)
  608. {
  609. DBGLOG("Alias %s not found", name);
  610. return;
  611. }
  612. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  613. gatherQuerySetAliasDetails(alias, a);
  614. xpath.clear().append("Query[@id='").append(a->getId()).append("']");
  615. aliases.append(*a.getClear());
  616. IPropertyTree *query = registry->queryPropTree(xpath);
  617. if (!query)
  618. {
  619. DBGLOG("No matching Query %s found for Alias %s", a->getId(), name);
  620. return;
  621. }
  622. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  623. gatherQuerySetQueryDetails(query, q, cluster, queriesOnCluster);
  624. queries.append(*q.getClear());
  625. }
  626. void retrieveQuerysetDetailsFromQuery(IPropertyTree *registry, const char *value, const char *type, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  627. {
  628. if (!strieq(type, "Id") && !strieq(type, "Name"))
  629. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Unrecognized queryset filter type %s", type);
  630. StringBuffer attributeName(type);
  631. StringBuffer xpath;
  632. xpath.clear().append("Query[@").append(attributeName.toLowerCase()).append("='").append(value).append("']");
  633. IPropertyTree *query = registry->queryPropTree(xpath);
  634. if (!query)
  635. {
  636. DBGLOG("No matching Query %s found for %s", value, type);
  637. return;
  638. }
  639. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  640. gatherQuerySetQueryDetails(query, q, cluster, queriesOnCluster);
  641. xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
  642. queries.append(*q.getClear());
  643. Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
  644. ForEach(*regAliases)
  645. {
  646. IPropertyTree &alias = regAliases->query();
  647. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  648. gatherQuerySetAliasDetails(&alias, a);
  649. aliases.append(*a.getClear());
  650. }
  651. }
  652. void retrieveQuerysetDetails(IPropertyTree *registry, const char *type, const char *value, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  653. {
  654. if (strieq(type, "All"))
  655. return retrieveAllQuerysetDetails(registry, queries, aliases, cluster, queriesOnCluster);
  656. if (!value || !*value)
  657. return;
  658. if (strieq(type, "Alias"))
  659. return retrieveQuerysetDetailsFromAlias(registry, value, queries, aliases, cluster, queriesOnCluster);
  660. if (strieq(type, "Status") && !isEmpty(cluster))
  661. return retrieveAllQuerysetDetails(registry, queries, aliases, cluster, queriesOnCluster, type, value);
  662. return retrieveQuerysetDetailsFromQuery(registry, value, type, queries, aliases, cluster, queriesOnCluster);
  663. }
  664. void retrieveQuerysetDetails(IArrayOf<IEspWUQuerySetDetail> &details, IPropertyTree *registry, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  665. {
  666. if (!registry)
  667. return;
  668. IArrayOf<IEspQuerySetQuery> queries;
  669. IArrayOf<IEspQuerySetAlias> aliases;
  670. retrieveQuerysetDetails(registry, type, value, queries, aliases, cluster, queriesOnCluster);
  671. Owned<IEspWUQuerySetDetail> detail = createWUQuerySetDetail();
  672. detail->setQuerySetName(registry->queryProp("@id"));
  673. detail->setQueries(queries);
  674. detail->setAliases(aliases);
  675. details.append(*detail.getClear());
  676. }
  677. void retrieveQuerysetDetails(IArrayOf<IEspWUQuerySetDetail> &details, const char *queryset, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  678. {
  679. if (!queryset || !*queryset)
  680. return;
  681. Owned<IPropertyTree> registry = getQueryRegistry(queryset, true);
  682. if (!registry)
  683. return;
  684. retrieveQuerysetDetails(details, registry, type, value, cluster, queriesOnCluster);
  685. }
  686. void retrieveQuerysetDetailsByCluster(IArrayOf<IEspWUQuerySetDetail> &details, const char *target, const char *queryset, const char *type, const char *value)
  687. {
  688. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  689. if (!info)
  690. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster %s not found", target);
  691. if (queryset && *queryset && !strieq(target, queryset))
  692. throw MakeStringException(ECLWATCH_QUERYSET_NOT_ON_CLUSTER, "Target %s and QuerySet %s should match", target, queryset);
  693. Owned<IPropertyTree> queriesOnCluster;
  694. if (info->getPlatform()==RoxieCluster)
  695. {
  696. const SocketEndpointArray &eps = info->getRoxieServers();
  697. if (eps.length())
  698. {
  699. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), 5);
  700. queriesOnCluster.setown(sendRoxieControlQuery(sock, "<control:queries/>", 5));
  701. }
  702. }
  703. retrieveQuerysetDetails(details, target, type, value, target, queriesOnCluster);
  704. }
  705. void retrieveAllQuerysetDetails(IArrayOf<IEspWUQuerySetDetail> &details, const char *type, const char *value)
  706. {
  707. Owned<IPropertyTree> root = getQueryRegistryRoot();
  708. if (!root)
  709. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
  710. Owned<IPropertyTreeIterator> querysets = root->getElements("QuerySet");
  711. if (!root)
  712. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
  713. ForEach(*querysets)
  714. retrieveQuerysetDetails(details, &querysets->query(), type, value);
  715. }
  716. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  717. {
  718. resp.setQuerySetName(req.getQuerySetName());
  719. double version = context.getClientVersion();
  720. if (version > 1.36)
  721. {
  722. Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
  723. resp.setClusterName(req.getClusterName());
  724. resp.setFilter(req.getFilter());
  725. resp.setFilterType(req.getFilterType());
  726. }
  727. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  728. if (!registry)
  729. return false;
  730. IArrayOf<IEspQuerySetQuery> respQueries;
  731. IArrayOf<IEspQuerySetAlias> respAliases;
  732. if (isEmpty(req.getClusterName()) || isEmpty(req.getFilterTypeAsString()) || !strieq(req.getFilterTypeAsString(), "Status") || isEmpty(req.getFilter()))
  733. {
  734. retrieveQuerysetDetails(registry, req.getFilterTypeAsString(), req.getFilter(), respQueries, respAliases);
  735. resp.setQuerysetQueries(respQueries);
  736. resp.setQuerysetAliases(respAliases);
  737. }
  738. else
  739. {
  740. IArrayOf<IEspWUQuerySetDetail> respDetails;
  741. retrieveQuerysetDetailsByCluster(respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
  742. if (respDetails.ordinality())
  743. {
  744. IEspWUQuerySetDetail& detail = respDetails.item(0);
  745. resp.setQuerysetQueries(detail.getQueries());
  746. resp.setQuerysetAliases(detail.getAliases());
  747. }
  748. }
  749. return true;
  750. }
  751. bool CWsWorkunitsEx::onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest & req, IEspWUMultiQuerySetDetailsResponse & resp)
  752. {
  753. IArrayOf<IEspWUQuerySetDetail> respDetails;
  754. if (notEmpty(req.getClusterName()))
  755. retrieveQuerysetDetailsByCluster(respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
  756. else if (notEmpty(req.getQuerySetName()))
  757. retrieveQuerysetDetails(respDetails, req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
  758. else
  759. retrieveAllQuerysetDetails(respDetails, req.getFilterTypeAsString(), req.getFilter());
  760. resp.setQuerysets(respDetails);
  761. return true;
  762. }
  763. bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp)
  764. {
  765. const char* querySet = req.getQuerySet();
  766. const char* queryId = req.getQueryId();
  767. if (!querySet || !*querySet)
  768. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet not specified");
  769. if (!queryId || !*queryId)
  770. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
  771. resp.setQueryId(queryId);
  772. resp.setQuerySet(querySet);
  773. Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySet, false);
  774. StringBuffer xpath;
  775. xpath.clear().append("Query[@id='").append(queryId).append("']");
  776. IPropertyTree *query = queryRegistry->queryPropTree(xpath);
  777. if (!query)
  778. {
  779. DBGLOG("No matching Query");
  780. return false;
  781. }
  782. resp.setQueryName(query->queryProp("@name"));
  783. resp.setWuid(query->queryProp("@wuid"));
  784. resp.setDll(query->queryProp("@dll"));
  785. resp.setPublishedBy(query->queryProp("@publishedBy"));
  786. resp.setSuspended(query->getPropBool("@suspended", false));
  787. resp.setSuspendedBy(query->queryProp("@suspendedBy"));
  788. resp.setComment(query->queryProp("@comment"));
  789. StringArray logicalFiles;
  790. getQueryFiles(queryId, querySet, logicalFiles);
  791. if (logicalFiles.length())
  792. resp.setLogicalFiles(logicalFiles);
  793. return true;
  794. }
  795. bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, StringArray& logicalFiles)
  796. {
  797. try
  798. {
  799. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  800. if (!info || (info->getPlatform()!=RoxieCluster))
  801. return false;
  802. const SocketEndpointArray &eps = info->getRoxieServers();
  803. if (eps.empty())
  804. return false;
  805. StringBuffer control;
  806. control.appendf("<control:getQueryXrefInfo full='1'><Query id='%s'/></control:getQueryXrefInfo>", query);
  807. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), 5);
  808. Owned<IPropertyTree> result = sendRoxieControlQuery(sock, control.str(), 5);
  809. if (!result)
  810. return false;
  811. Owned<IPropertyTreeIterator> files = result->getElements("Endpoint/Queries/Query/File");
  812. ForEach (*files)
  813. {
  814. IPropertyTree &file = files->query();
  815. const char* fileName = file.queryProp("@name");
  816. if (fileName && *fileName)
  817. logicalFiles.append(fileName);
  818. }
  819. return true;
  820. }
  821. catch(IMultiException *me)
  822. {
  823. StringBuffer err;
  824. DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  825. me->Release();
  826. return false;
  827. }
  828. catch(IException *e)
  829. {
  830. StringBuffer err;
  831. DBGLOG("ERROR control:getQueryXrefInfo roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  832. e->Release();
  833. return false;
  834. }
  835. }
  836. inline void verifyQueryActionAllowsWild(bool &allowWildChecked, CQuerySetQueryActionTypes action)
  837. {
  838. if (allowWildChecked)
  839. return;
  840. switch (action)
  841. {
  842. case CQuerySetQueryActionTypes_ToggleSuspend:
  843. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for toggling suspended state");
  844. case CQuerySetQueryActionTypes_Activate:
  845. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for Activating queries");
  846. }
  847. allowWildChecked=true;
  848. }
  849. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, IArrayOf<IConstQuerySetQueryActionItem> &items, CQuerySetQueryActionTypes action)
  850. {
  851. bool allowWildChecked=false;
  852. Owned<IPropertyTreeIterator> queries = queryset->getElements("Query");
  853. ForEachItemIn(i, items)
  854. {
  855. const char *itemId = items.item(i).getQueryId();
  856. if (!isWildString(itemId))
  857. queryIds->setProp(itemId, (int) items.item(i).getClientState().getSuspended());
  858. else
  859. {
  860. verifyQueryActionAllowsWild(allowWildChecked, action);
  861. ForEach(*queries)
  862. {
  863. const char *queryId = queries->query().queryProp("@id");
  864. if (queryId && WildMatch(queryId, itemId))
  865. queryIds->setProp(queryId, 0);
  866. }
  867. }
  868. }
  869. }
  870. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, const char *id, CQuerySetQueryActionTypes action)
  871. {
  872. IArrayOf<IConstQuerySetQueryActionItem> items;
  873. Owned<IEspQuerySetQueryActionItem> item = createQuerySetQueryActionItem();
  874. item->setQueryId(id);
  875. items.append(*(IConstQuerySetQueryActionItem*)item.getClear());
  876. expandQueryActionTargetList(queryIds, queryset, items, action);
  877. }
  878. bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest & req, IEspWUQueryConfigResponse & resp)
  879. {
  880. StringAttr target(req.getTarget());
  881. if (target.isEmpty())
  882. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  883. if (!isValidCluster(target))
  884. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target.get());
  885. Owned<IPropertyTree> queryset = getQueryRegistry(target.get(), false);
  886. if (!queryset)
  887. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target Queryset %s not found", req.getTarget());
  888. Owned<IProperties> queryIds = createProperties();
  889. expandQueryActionTargetList(queryIds, queryset, req.getQueryId(), QuerySetQueryActionTypes_Undefined);
  890. IArrayOf<IEspWUQueryConfigResult> results;
  891. Owned<IPropertyIterator> it = queryIds->getIterator();
  892. ForEach(*it)
  893. {
  894. Owned<IEspWUQueryConfigResult> result = createWUQueryConfigResult();
  895. result->setQueryId(it->getPropKey());
  896. VStringBuffer xpath("Query[@id='%s']", it->getPropKey());
  897. IPropertyTree *queryTree = queryset->queryPropTree(xpath);
  898. if (queryTree)
  899. {
  900. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  901. updateQueryPriority(queryTree, req.getPriority());
  902. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  903. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  904. if (req.getComment())
  905. queryTree->setProp("@comment", req.getComment());
  906. }
  907. results.append(*result.getClear());
  908. }
  909. resp.setResults(results);
  910. bool reloadFailed = false;
  911. if (0!=req.getWait() && !req.getNoReload())
  912. reloadFailed = !reloadCluster(target.get(), (unsigned)req.getWait());
  913. resp.setReloadFailed(reloadFailed);
  914. return true;
  915. }
  916. bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
  917. {
  918. resp.setQuerySetName(req.getQuerySetName());
  919. resp.setAction(req.getAction());
  920. if (isEmpty(req.getQuerySetName()))
  921. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  922. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  923. if (!queryset)
  924. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  925. Owned<IProperties> queryIds = createProperties();
  926. expandQueryActionTargetList(queryIds, queryset, req.getQueries(), req.getAction());
  927. IArrayOf<IEspQuerySetQueryActionResult> results;
  928. Owned<IPropertyIterator> it = queryIds->getIterator();
  929. ForEach(*it)
  930. {
  931. const char *id = it->getPropKey();
  932. VStringBuffer xpath("Query[@id='%s']", id);
  933. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  934. result->setQueryId(id);
  935. try
  936. {
  937. switch (req.getAction())
  938. {
  939. case CQuerySetQueryActionTypes_ToggleSuspend:
  940. setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
  941. break;
  942. case CQuerySetQueryActionTypes_Suspend:
  943. setQuerySuspendedState(queryset, id, true, context.queryUserId());
  944. break;
  945. case CQuerySetQueryActionTypes_Unsuspend:
  946. setQuerySuspendedState(queryset, id, false, NULL);
  947. break;
  948. case CQuerySetQueryActionTypes_Activate:
  949. {
  950. IPropertyTree *query = queryset->queryPropTree(xpath);
  951. if (!query)
  952. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), id);
  953. setQueryAlias(queryset, query->queryProp("@name"), id);
  954. break;
  955. }
  956. case CQuerySetQueryActionTypes_Delete:
  957. removeNamedQuery(queryset, id);
  958. break;
  959. case CQuerySetQueryActionTypes_RemoveAllAliases:
  960. removeAliasesFromNamedQuery(queryset, id);
  961. break;
  962. }
  963. result->setSuccess(true);
  964. IPropertyTree *query = queryset->queryPropTree(xpath);
  965. if (query)
  966. result->setSuspended(query->getPropBool("@suspended"));
  967. }
  968. catch(IException *e)
  969. {
  970. StringBuffer msg;
  971. result->setMessage(e->errorMessage(msg).str());
  972. result->setCode(e->errorCode());
  973. result->setSuccess(false);
  974. }
  975. results.append(*result.getClear());
  976. }
  977. resp.setResults(results);
  978. return true;
  979. }
  980. bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
  981. {
  982. resp.setQuerySetName(req.getQuerySetName());
  983. resp.setAction(req.getAction());
  984. if (isEmpty(req.getQuerySetName()))
  985. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  986. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  987. if (!queryset)
  988. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  989. IArrayOf<IEspQuerySetAliasActionResult> results;
  990. ForEachItemIn(i, req.getAliases())
  991. {
  992. IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
  993. Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
  994. try
  995. {
  996. VStringBuffer xpath("Alias[@name='%s']", item.getName());
  997. IPropertyTree *alias = queryset->queryPropTree(xpath.str());
  998. if (!alias)
  999. throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
  1000. switch (req.getAction())
  1001. {
  1002. case CQuerySetAliasActionTypes_Deactivate:
  1003. removeQuerySetAlias(req.getQuerySetName(), item.getName());
  1004. break;
  1005. }
  1006. result->setSuccess(true);
  1007. }
  1008. catch(IException *e)
  1009. {
  1010. StringBuffer msg;
  1011. result->setMessage(e->errorMessage(msg).str());
  1012. result->setCode(e->errorCode());
  1013. result->setSuccess(false);
  1014. }
  1015. results.append(*result.getClear());
  1016. }
  1017. resp.setResults(results);
  1018. return true;
  1019. }
  1020. #define QUERYPATH_SEP_CHAR '/'
  1021. bool nextQueryPathNode(const char *&path, StringBuffer &node)
  1022. {
  1023. if (*path==QUERYPATH_SEP_CHAR)
  1024. path++;
  1025. while (*path && *path!=QUERYPATH_SEP_CHAR)
  1026. node.append(*path++);
  1027. return (*path && *++path);
  1028. }
  1029. bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &queryset, StringBuffer &query)
  1030. {
  1031. if (!path || !*path)
  1032. return false;
  1033. if (*path==QUERYPATH_SEP_CHAR && path[1]==QUERYPATH_SEP_CHAR)
  1034. {
  1035. path+=2;
  1036. if (!nextQueryPathNode(path, netAddress))
  1037. return false;
  1038. }
  1039. if (!nextQueryPathNode(path, queryset))
  1040. return false;
  1041. if (nextQueryPathNode(path, query))
  1042. return false; //query path too deep
  1043. return true;
  1044. }
  1045. bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
  1046. {
  1047. unsigned start = msTick();
  1048. const char *source = req.getSource();
  1049. if (!source || !*source)
  1050. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source query specified");
  1051. const char *target = req.getTarget();
  1052. if (!target || !*target)
  1053. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination specified");
  1054. if (strchr(target, '/')) //for future use
  1055. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target queryset name");
  1056. if (req.getCluster() && *req.getCluster() && !strieq(req.getCluster(), target)) //backward compatability check
  1057. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target cluster and queryset must match");
  1058. if (!isValidCluster(target))
  1059. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid target name: %s", target);
  1060. StringBuffer srcAddress, srcQuerySet, srcQuery;
  1061. if (!splitQueryPath(source, srcAddress, srcQuerySet, srcQuery))
  1062. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
  1063. StringBuffer remoteIP;
  1064. StringBuffer queryName;
  1065. StringBuffer wuid;
  1066. if (srcAddress.length())
  1067. {
  1068. StringBuffer xml;
  1069. MemoryBuffer dll;
  1070. StringBuffer dllname;
  1071. fetchRemoteWorkunit(context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP);
  1072. deploySharedObject(context, wuid, dllname.str(), target, queryName.str(), dll, queryDirectory.str(), xml.str());
  1073. }
  1074. else
  1075. {
  1076. Owned<IPropertyTree> queryset = getQueryRegistry(srcQuerySet.str(), true);
  1077. if (!queryset)
  1078. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", srcQuery.str());
  1079. IPropertyTree *query = resolveQueryAlias(queryset, srcQuery.str());
  1080. if (!query)
  1081. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source query %s not found", source);
  1082. wuid.set(query->queryProp("@wuid"));
  1083. queryName.set(query->queryProp("@name"));
  1084. }
  1085. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1086. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1087. if (!req.getDontCopyFiles())
  1088. {
  1089. const char *reqDali = req.getDaliServer();
  1090. copyQueryFilesToCluster(context, cw, (reqDali && *reqDali) ? reqDali : remoteIP.str(), target, queryName.str(), req.getOverwrite());
  1091. }
  1092. WorkunitUpdate wu(&cw->lock());
  1093. if (!wu)
  1094. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
  1095. StringBuffer targetQueryId;
  1096. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  1097. addQueryToQuerySet(wu, target, queryName.str(), NULL, activate, targetQueryId, context.queryUserId());
  1098. if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || ! req.getWarnTimeLimit_isNull() || req.getPriority())
  1099. {
  1100. Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
  1101. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  1102. updateQueryPriority(queryTree, req.getPriority());
  1103. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  1104. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  1105. if (req.getComment())
  1106. queryTree->setProp("@comment", req.getComment());
  1107. }
  1108. wu.clear();
  1109. resp.setQueryId(targetQueryId.str());
  1110. if (0!=req.getWait() && !req.getNoReload())
  1111. reloadCluster(target, remainingMsWait(req.getWait(), start));
  1112. return true;
  1113. }