ws_workunitsQuerySets.cpp 156 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_workunitsService.hpp"
  14. #include "ws_fs.hpp"
  15. #include "jlib.hpp"
  16. #include "jflz.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "dadfs.hpp"
  20. #include "dfuwu.hpp"
  21. #include "eclhelper.hpp"
  22. #include "roxiecontrol.hpp"
  23. #include "dfuutil.hpp"
  24. #include "dautils.hpp"
  25. #include "httpclient.hpp"
  26. #include "portlist.h" //ROXIE_SERVER_PORT
  27. #include "TpWrapper.hpp"
  28. #define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1) // 15 seconds
  29. //The CQuerySetQueryActionTypes[] has to match with the ESPenum QuerySetQueryActionTypes in the ecm file.
  30. static unsigned NumOfQuerySetQueryActionTypes = 7;
  31. static const char *QuerySetQueryActionTypes[] = { "Suspend", "Unsuspend", "ToggleSuspend", "Activate",
  32. "Delete", "RemoveAllAliases", "ResetQueryStats", NULL };
  33. //The CQuerySetAliasActionTypes[] has to match with the ESPenum QuerySetAliasActionTypes in the ecm file.
  34. static unsigned NumOfQuerySetAliasActionTypes = 1;
  35. static const char *QuerySetAliasActionTypes[] = { "Deactivate", NULL };
  36. void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
  37. {
  38. if (!ip || !*ip)
  39. return;
  40. ep.set(ip, 7070);
  41. if (ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.'))
  42. ep.ipset(esp);
  43. }
  44. void ensureInputString(const char* input, bool lowerCase, StringBuffer& inputStr, int code, const char* msg)
  45. {
  46. inputStr.set(input).trim();
  47. if (inputStr.isEmpty())
  48. throw MakeStringException(code, "%s", msg);
  49. if (lowerCase)
  50. inputStr.toLowerCase();
  51. }
  52. static IClientWsWorkunits *ensureWsWorkunitsClient(IClientWsWorkunits *ws, IEspContext *ctx, const char *netAddress, bool useSSL)
  53. {
  54. if (ws)
  55. return LINK(ws);
  56. StringBuffer url;
  57. if (netAddress && *netAddress)
  58. url.appendf("%s://%s%s/WsWorkunits", useSSL ? "https" : "http", netAddress, (!strchr(netAddress, ':')) ? ":8010" : "");
  59. else
  60. {
  61. if (!ctx)
  62. throw MakeStringException(ECLWATCH_INVALID_IP, "Missing WsWorkunits service address");
  63. StringBuffer ip;
  64. short port = 0;
  65. ctx->getServAddress(ip, port);
  66. url.appendf("%s://%s:%d/WsWorkunits", useSSL ? "https" : "http", ip.str(), port);
  67. }
  68. Owned<IClientWsWorkunits> cws = createWsWorkunitsClient();
  69. cws->addServiceUrl(url);
  70. if (ctx && ctx->queryUserId() && *ctx->queryUserId())
  71. cws->setUsernameToken(ctx->queryUserId(), ctx->queryPassword(), NULL);
  72. return cws.getClear();
  73. }
  74. IClientWUQuerySetDetailsResponse *fetchQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *target, const char *queryid, bool useSSL)
  75. {
  76. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress, useSSL);
  77. //using existing WUQuerysetDetails rather than extending WUQueryDetails, to support copying query meta data from prior releases
  78. Owned<IClientWUQuerySetDetailsRequest> reqQueryInfo = ws->createWUQuerysetDetailsRequest();
  79. reqQueryInfo->setClusterName(target);
  80. reqQueryInfo->setQuerySetName(target);
  81. reqQueryInfo->setFilter(queryid);
  82. reqQueryInfo->setFilterType("Id");
  83. return ws->WUQuerysetDetails(reqQueryInfo);
  84. }
  85. void fetchRemoteWorkunit(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer, bool useSSL)
  86. {
  87. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress, useSSL);
  88. Owned<IClientWULogFileRequest> req = ws->createWUFileRequest();
  89. if (queryset && *queryset)
  90. req->setQuerySet(queryset);
  91. if (query && *query)
  92. req->setQuery(query);
  93. if (wuid && *wuid)
  94. req->setWuid(wuid);
  95. req->setErrorMessageFormat(CErrorMessageFormat_XML);
  96. req->setType("xml");
  97. Owned<IClientWULogFileResponse> resp = ws->WUFile(req);
  98. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  99. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit");
  100. xml.append(resp->getThefile().length(), resp->getThefile().toByteArray());
  101. req->setType("dll");
  102. resp.setown(ws->WUFile(req));
  103. if (!resp || resp->getExceptions().ordinality() || !resp->getThefile().length())
  104. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Cannot retrieve remote workunit shared object");
  105. dll.append(resp->getThefile());
  106. dllname.append(resp->getFileName());
  107. name.append(resp->getQueryName());
  108. SocketEndpoint ep;
  109. checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
  110. if (!ep.isNull())
  111. ep.getUrlStr(daliServer);
  112. }
  113. void fetchRemoteWorkunitAndQueryDetails(IClientWsWorkunits *_ws, IEspContext *ctx, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer, Owned<IClientWUQuerySetDetailsResponse> &respQueryInfo, bool useSSL)
  114. {
  115. Owned<IClientWsWorkunits> ws = ensureWsWorkunitsClient(_ws, ctx, netAddress, useSSL);
  116. fetchRemoteWorkunit(ws, ctx, netAddress, queryset, query, wuid, name, xml, dllname, dll, daliServer, useSSL);
  117. respQueryInfo.setown(fetchQueryDetails(ws, ctx, netAddress, queryset, query, useSSL));
  118. }
  119. void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
  120. {
  121. try
  122. {
  123. Owned<IClientCopy> req = fs.createCopyRequest();
  124. req->setSourceLogicalName(logicalname);
  125. req->setDestLogicalName(logicalname);
  126. req->setDestGroup(cluster);
  127. req->setSuperCopy(supercopy);
  128. if (isRoxie)
  129. req->setDestGroupRoxie("Yes");
  130. Owned<IClientCopyResponse> resp = fs.Copy(req);
  131. info.setDfuCopyWuid(resp->getResult());
  132. }
  133. catch (IException *e)
  134. {
  135. StringBuffer msg;
  136. info.setDfuCopyError(e->errorMessage(msg).str());
  137. e->Release();
  138. }
  139. }
  140. bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cluster, bool isRoxie, bool copyLocal, IEspWUCopyLogicalClusterFileSections &lfinfo)
  141. {
  142. if (isEmpty(cluster))
  143. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "copyWULogicalFiles Cluster parameter not set.");
  144. Owned<IUserDescriptor> udesc = createUserDescriptor();
  145. udesc->set(context.queryUserId(), context.queryPassword(), context.querySignature());
  146. IArrayOf<IEspWULogicalFileCopyInfo> foreign;
  147. IArrayOf<IEspWULogicalFileCopyInfo> onCluster;
  148. IArrayOf<IEspWULogicalFileCopyInfo> notOnCluster;
  149. IArrayOf<IEspWULogicalFileCopyInfo> notFound;
  150. Owned<IClientFileSpray> fs;
  151. if (copyLocal)
  152. {
  153. fs.setown(createFileSprayClient());
  154. VStringBuffer url("http://.:%d/FileSpray", 8010);
  155. fs->addServiceUrl(url.str());
  156. }
  157. Owned<IConstWUGraphIterator> graphs = &cw.getGraphs(GraphTypeActivities);
  158. ForEach(*graphs)
  159. {
  160. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false, false);
  161. Owned<IPropertyTreeIterator> iter = xgmml->getElements(".//node");
  162. ForEach(*iter)
  163. {
  164. try
  165. {
  166. IPropertyTree &node = iter->query();
  167. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  168. if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
  169. continue;
  170. if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
  171. continue;
  172. Owned<IEspWULogicalFileCopyInfo> info = createWULogicalFileCopyInfo();
  173. const char *logicalname = node.queryProp("att[@name='_indexFileName']/@value");
  174. if (logicalname)
  175. info->setIsIndex(true);
  176. else
  177. logicalname = node.queryProp("att[@name='_fileName']/@value");
  178. info->setLogicalName(logicalname);
  179. if (logicalname)
  180. {
  181. if (!strnicmp("~foreign::", logicalname, 10))
  182. foreign.append(*info.getClear());
  183. else
  184. {
  185. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalname, udesc, false, false, false, nullptr, defaultPrivilegedUser);
  186. if(!df)
  187. notFound.append(*info.getClear());
  188. else if (df->findCluster(cluster)!=NotFound)
  189. {
  190. onCluster.append(*info.getClear());
  191. }
  192. else
  193. {
  194. StringArray clusters;
  195. df->getClusterNames(clusters);
  196. info->setClusters(clusters);
  197. if (copyLocal)
  198. {
  199. StringBuffer wuid;
  200. bool supercopy = queryDistributedFileDirectory().isSuperFile(logicalname, udesc, NULL);
  201. doWuFileCopy(*fs, *info, logicalname, cluster, isRoxie, supercopy);
  202. }
  203. notOnCluster.append(*info.getClear());
  204. }
  205. }
  206. }
  207. }
  208. catch(IException *e)
  209. {
  210. e->Release();
  211. }
  212. }
  213. lfinfo.setClusterName(cluster);
  214. lfinfo.setNotOnCluster(notOnCluster);
  215. lfinfo.setOnCluster(onCluster);
  216. lfinfo.setForeign(foreign);
  217. lfinfo.setNotFound(notFound);
  218. }
  219. return true;
  220. }
  221. void copyWULogicalFilesToTarget(IEspContext &context, IConstWUClusterInfo &clusterInfo, IConstWorkUnit &cw, IArrayOf<IConstWUCopyLogicalClusterFileSections> &clusterfiles, bool doLocalCopy)
  222. {
  223. const StringArray &thors = clusterInfo.getThorProcesses();
  224. ForEachItemIn(i, thors)
  225. {
  226. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  227. copyWULogicalFiles(context, cw, thors.item(i), false, doLocalCopy, *files);
  228. clusterfiles.append(*files.getClear());
  229. }
  230. SCMStringBuffer roxie;
  231. clusterInfo.getRoxieProcess(roxie);
  232. if (roxie.length())
  233. {
  234. Owned<IEspWUCopyLogicalClusterFileSections> files = createWUCopyLogicalClusterFileSections();
  235. copyWULogicalFiles(context, cw, roxie.str(), true, doLocalCopy, *files);
  236. clusterfiles.append(*files.getClear());
  237. }
  238. }
  239. void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned flags)
  240. {
  241. if (!target || !*target)
  242. return;
  243. Owned<IConstWUClusterInfo> clusterInfo = getWUClusterInfoByName(target);
  244. if (!clusterInfo || !(clusterInfo->getPlatform() == RoxieCluster))
  245. return;
  246. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
  247. if (!queryRegistry)
  248. return;
  249. SCMStringBuffer process;
  250. clusterInfo->getRoxieProcess(process);
  251. if (!process.length())
  252. return;
  253. Owned<IHpccPackageSet> ps = createPackageSet(process.str());
  254. const IHpccPackageMap *pm = (ps) ? ps->queryActiveMap(target) : NULL;
  255. const char *pmid = (pm) ? pm->queryPackageId() : NULL;
  256. VStringBuffer xpath("%s/@pmid", target);
  257. const char *pmidPrev = t->queryProp(xpath);
  258. if ((flags & UFO_RELOAD_TARGETS_CHANGED_PMID) && (pmid || pmidPrev))
  259. {
  260. if (!(pmid && pmidPrev) || !streq(pmid, pmidPrev))
  261. t->removeProp(target);
  262. }
  263. IPropertyTree *targetTree = ensurePTree(t, target);
  264. if (pm)
  265. targetTree->setProp("@pmid", pmid);
  266. if (flags & UFO_REMOVE_QUERIES_NOT_IN_QUERYSET)
  267. {
  268. Owned<IPropertyTreeIterator> cachedQueries = targetTree->getElements("Query");
  269. ForEach(*cachedQueries)
  270. {
  271. IPropertyTree &cachedQuery = cachedQueries->query();
  272. VStringBuffer xpath("Query[@id='%s']", cachedQuery.queryProp("@id"));
  273. if (!queryRegistry->hasProp(xpath))
  274. targetTree->removeTree(&cachedQuery);
  275. }
  276. }
  277. Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
  278. ForEach(*queries)
  279. {
  280. if (aborting)
  281. return;
  282. IPropertyTree &query = queries->query();
  283. const char *queryid = query.queryProp("@id");
  284. if (!queryid || !*queryid)
  285. continue;
  286. const char *wuid = query.queryProp("@wuid");
  287. if (!wuid || !*wuid)
  288. continue;
  289. const char *pkgid=NULL;
  290. if (pm)
  291. {
  292. const IHpccPackage *pkg = pm->matchPackage(queryid);
  293. if (pkg)
  294. pkgid = pkg->queryId();
  295. }
  296. VStringBuffer xpath("Query[@id='%s']", queryid);
  297. IPropertyTree *queryTree = targetTree->queryPropTree(xpath);
  298. if (queryTree)
  299. {
  300. const char *cachedPkgid = queryTree->queryProp("@pkgid");
  301. if (pkgid && *pkgid)
  302. {
  303. if (!(flags & UFO_RELOAD_MAPPED_QUERIES) && (cachedPkgid && streq(pkgid, cachedPkgid)))
  304. continue;
  305. }
  306. else if (!cachedPkgid || !*cachedPkgid)
  307. continue;
  308. targetTree->removeTree(queryTree);
  309. queryTree = NULL;
  310. }
  311. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  312. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  313. if (!cw)
  314. continue;
  315. queryTree = targetTree->addPropTree("Query", createPTree("Query"));
  316. queryTree->setProp("@target", target); //for reference when searching across targets
  317. queryTree->setProp("@id", queryid);
  318. if (pkgid && *pkgid)
  319. queryTree->setProp("@pkgid", pkgid);
  320. IUserDescriptor **roxieUser = roxieUserMap.getValue(target);
  321. Owned<IReferencedFileList> wufiles = createReferencedFileList(roxieUser ? *roxieUser : NULL, true, true);
  322. wufiles->addFilesFromQuery(cw, pm, queryid);
  323. if (aborting)
  324. return;
  325. StringArray locations;
  326. locations.append(process.str());
  327. wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, false);
  328. Owned<IReferencedFileIterator> files = wufiles->getFiles();
  329. ForEach(*files)
  330. {
  331. if (aborting)
  332. return;
  333. IReferencedFile &rf = files->query();
  334. //if (!(rf.getFlags() & RefSubFile))
  335. // continue;
  336. const char *lfn = rf.getLogicalName();
  337. if (!lfn || !*lfn)
  338. continue;
  339. if (!queryTree->hasProp(xpath.setf("File[@lfn='%s']", lfn)))
  340. {
  341. IPropertyTree *fileTree = queryTree->addPropTree("File", createPTree("File"));
  342. fileTree->setProp("@lfn", lfn);
  343. if (rf.getFlags() & RefFileSuper)
  344. {
  345. fileTree->setPropBool("@super", true);
  346. ForEachItemIn(i, rf.getSubFileNames())
  347. {
  348. IPropertyTree *subfileTree = fileTree->addPropTree("SubFile");
  349. subfileTree->setProp("@lfn", rf.getSubFileNames().item(i));
  350. }
  351. }
  352. if (rf.getFlags() & RefFileNotFound)
  353. fileTree->setPropBool("@notFound", true);
  354. const char *fpkgid = rf.queryPackageId();
  355. if (fpkgid && *fpkgid)
  356. fileTree->setProp("@pkgid", fpkgid);
  357. if (rf.getFileSize())
  358. fileTree->setPropInt64("@size", rf.getFileSize());
  359. if (rf.getNumParts())
  360. fileTree->setPropInt("@numparts", rf.getNumParts());
  361. }
  362. }
  363. }
  364. }
  365. void QueryFilesInUse::loadTargets(IPropertyTree *t, unsigned flags)
  366. {
  367. #ifdef _CONTAINERIZED
  368. Owned<IStringIterator> targets = getContainerTargetClusters("roxie", nullptr);
  369. #else
  370. Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", nullptr);
  371. #endif
  372. SCMStringBuffer s;
  373. ForEach(*targets)
  374. {
  375. if (aborting)
  376. return;
  377. loadTarget(t, targets->str(s).str(), flags);
  378. }
  379. }
  380. IPropertyTreeIterator *QueryFilesInUse::findAllQueriesUsingFile(const char *lfn)
  381. {
  382. if (!lfn || !*lfn)
  383. return NULL;
  384. Owned<IPropertyTree> t = getTree();
  385. VStringBuffer xpath("*/Query[File/@lfn='%s']", lfn);
  386. return t->getElements(xpath);
  387. }
  388. IPropertyTreeIterator *QueryFilesInUse::findQueriesUsingFile(const char *target, const char *lfn, StringAttr &pmid)
  389. {
  390. if (!lfn || !*lfn)
  391. return NULL;
  392. if (!target || !*target)
  393. return findAllQueriesUsingFile(lfn);
  394. Owned<IPropertyTree> t = getTree();
  395. IPropertyTree *targetTree = t->queryPropTree(target);
  396. if (!targetTree)
  397. return NULL;
  398. pmid.set(targetTree->queryProp("@pmid"));
  399. VStringBuffer xpath("Query[File/@lfn='%s']", lfn);
  400. return targetTree->getElements(xpath);
  401. }
  402. bool CWsWorkunitsEx::onWUCopyLogicalFiles(IEspContext &context, IEspWUCopyLogicalFilesRequest &req, IEspWUCopyLogicalFilesResponse &resp)
  403. {
  404. StringBuffer wuid(req.getWuid());
  405. WsWuHelpers::checkAndTrimWorkunit("WUCopyLogicalFiles", wuid);
  406. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  407. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  408. if (!cw)
  409. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s", wuid.str());
  410. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  411. resp.setWuid(wuid.str());
  412. StringAttr cluster;
  413. if (notEmpty(req.getCluster()))
  414. cluster.set(req.getCluster());
  415. else
  416. cluster.set(cw->queryClusterName());
  417. validateTargetName(cluster);
  418. Owned<IConstWUClusterInfo> clusterInfo = getWUClusterInfoByName(cluster);
  419. IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
  420. PROGLOG("WUCopyLogicalFiles: %s", wuid.str());
  421. copyWULogicalFilesToTarget(context, *clusterInfo, *cw, clusterfiles, req.getCopyLocal());
  422. resp.setClusterFiles(clusterfiles);
  423. return true;
  424. }
  425. static inline unsigned remainingMsWait(unsigned wait, unsigned start)
  426. {
  427. if (wait==0 || wait==(unsigned)-1)
  428. return wait;
  429. unsigned waited = msTick()-start;
  430. return (wait>waited) ? wait-waited : 0;
  431. }
  432. #ifndef _CONTAINERIZED
  433. bool reloadCluster(IConstWUClusterInfo *clusterInfo, unsigned wait)
  434. #else
  435. bool reloadCluster(MapStringToMyClass<ISmartSocketFactory> &roxieConnMap, const char *target, unsigned wait)
  436. #endif
  437. {
  438. #ifndef _CONTAINERIZED
  439. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  440. return true;
  441. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  442. if (addrs.length() == 0)
  443. return true;
  444. #else
  445. if (0==wait)
  446. return true;
  447. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  448. if (!conn)
  449. return true;
  450. if (!isActiveK8sService(target))
  451. return true;
  452. #endif
  453. try
  454. {
  455. #ifndef _CONTAINERIZED
  456. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), "<control:reload/>", false, wait);
  457. #else
  458. Owned<IPropertyTree> result = sendRoxieControlAllNodes(conn, "<control:reload/>", false, wait, ROXIECONNECTIONTIMEOUT);
  459. #endif
  460. const char *status = result->queryProp("Endpoint[1]/Status");
  461. if (!status || !strieq(status, "ok"))
  462. return false;
  463. }
  464. catch(IMultiException *me)
  465. {
  466. StringBuffer err;
  467. IERRLOG("ERROR control:reloading roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  468. me->Release();
  469. return false;
  470. }
  471. catch(IException *e)
  472. {
  473. StringBuffer err;
  474. IERRLOG("ERROR control:reloading roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  475. e->Release();
  476. return false;
  477. }
  478. return true;
  479. }
  480. #ifndef _CONTAINERIZED
  481. bool reloadCluster(const char *cluster, unsigned wait)
  482. {
  483. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(cluster);
  484. return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
  485. }
  486. #endif
  487. static inline void updateQuerySetting(bool ignore, IPropertyTree *queryTree, const char *xpath, int value)
  488. {
  489. if (ignore || !queryTree)
  490. return;
  491. if (value!=0)
  492. queryTree->setPropInt(xpath, value);
  493. else
  494. queryTree->removeProp(xpath);
  495. }
  496. static inline void updateTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
  497. {
  498. if (valueNotSet && srcInfo && !srcInfo->getTimeLimit_isNull())
  499. {
  500. value = srcInfo->getTimeLimit();
  501. valueNotSet=false;
  502. }
  503. updateQuerySetting(valueNotSet, queryTree, "@timeLimit", value);
  504. }
  505. static inline void updateWarnTimeLimitSetting(IPropertyTree *queryTree, bool valueNotSet, int value, IConstQuerySetQuery *srcInfo=NULL)
  506. {
  507. if (valueNotSet && srcInfo && !srcInfo->getWarnTimeLimit_isNull())
  508. {
  509. value = srcInfo->getWarnTimeLimit();
  510. valueNotSet=false;
  511. }
  512. updateQuerySetting(valueNotSet, queryTree, "@warnTimeLimit", value);
  513. }
  514. static inline unsigned __int64 memoryLimitUInt64FromString(const char *value)
  515. {
  516. if (!value || !*value || !isdigit(*value))
  517. return 0;
  518. unsigned __int64 result = (*value - '0');
  519. const char *s = value+1;
  520. while (isdigit(*s))
  521. {
  522. result = 10 * result + ((*s) - '0');
  523. s++;
  524. }
  525. if (*s)
  526. {
  527. const char unit = toupper(*s++);
  528. if (*s && !strieq("B", s)) //more?
  529. return 0;
  530. switch (unit)
  531. {
  532. case 'E':
  533. result <<=60;
  534. break;
  535. case 'P':
  536. result <<=50;
  537. break;
  538. case 'T':
  539. result <<=40;
  540. break;
  541. case 'G':
  542. result <<=30;
  543. break;
  544. case 'M':
  545. result <<=20;
  546. break;
  547. case 'K':
  548. result <<=10;
  549. break;
  550. case 'B':
  551. break;
  552. default:
  553. return 0;
  554. }
  555. }
  556. return result;
  557. }
  558. const char memUnitAbbrev[] = {'B', 'K', 'M', 'G', 'T', 'P', 'E'};
  559. #define MAX_MEMUNIT_ABBREV 6
  560. static inline StringBuffer &memoryLimitStringFromUInt64(StringBuffer &s, unsigned __int64 in)
  561. {
  562. if (!in)
  563. return s;
  564. unsigned __int64 value = in;
  565. unsigned char unit = 0;
  566. while (!(value & 0x3FF) && unit < MAX_MEMUNIT_ABBREV)
  567. {
  568. value >>= 10;
  569. unit++;
  570. }
  571. return s.append(value).append(memUnitAbbrev[unit]);
  572. }
  573. static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
  574. {
  575. if (!queryTree)
  576. return;
  577. if (!value && srcInfo)
  578. value = srcInfo->getMemoryLimit();
  579. if (!value)
  580. return;
  581. unsigned __int64 limit = memoryLimitUInt64FromString(value);
  582. if (0==limit)
  583. queryTree->removeProp("@memoryLimit");
  584. else
  585. queryTree->setPropInt64("@memoryLimit", limit);
  586. }
  587. enum QueryPriority {
  588. QueryPriorityNone = -1,
  589. QueryPriorityLow = 0,
  590. QueryPriorityHigh = 1,
  591. QueryPrioritySLA = 2,
  592. QueryPriorityInvalid = 3
  593. };
  594. static inline const char *getQueryPriorityName(int value)
  595. {
  596. switch (value)
  597. {
  598. case QueryPriorityLow:
  599. return "LOW";
  600. case QueryPriorityHigh:
  601. return "HIGH";
  602. case QueryPrioritySLA:
  603. return "SLA";
  604. case QueryPriorityNone:
  605. return "NONE";
  606. }
  607. return "INVALID";
  608. }
  609. static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value, IConstQuerySetQuery *srcInfo=NULL)
  610. {
  611. if (!queryTree)
  612. return;
  613. if ((!value || !*value) && srcInfo)
  614. value = srcInfo->getPriority();
  615. if (!value || !*value)
  616. return;
  617. int priority = QueryPriorityInvalid;
  618. if (strieq("LOW", value))
  619. priority=QueryPriorityLow;
  620. else if (strieq("HIGH", value))
  621. priority=QueryPriorityHigh;
  622. else if (strieq("SLA", value))
  623. priority=QueryPrioritySLA;
  624. else if (strieq("NONE", value))
  625. priority=QueryPriorityNone;
  626. switch (priority)
  627. {
  628. case QueryPriorityInvalid:
  629. break;
  630. case QueryPriorityNone:
  631. queryTree->removeProp("@priority");
  632. break;
  633. default:
  634. queryTree->setPropInt("@priority", priority);
  635. break;
  636. }
  637. }
  638. void gatherFileErrors(IReferencedFileList *files, IArrayOf<IConstLogicalFileError> &errors)
  639. {
  640. if (!files)
  641. return;
  642. Owned<IReferencedFileIterator> it = files->getFiles();
  643. ForEach(*it)
  644. {
  645. IReferencedFile &file = it->query();
  646. unsigned flags = file.getFlags();
  647. if (!(flags & (RefFileNotFound | RefFileCopyInfoFailed)))
  648. continue;
  649. StringBuffer msg;
  650. if (flags & RefFileOptional)
  651. msg.append("OPT ");
  652. if (flags & RefFileNotFound)
  653. msg.append("Not Found");
  654. else
  655. msg.append("Copy Failed");
  656. Owned<IEspLogicalFileError> error = createLogicalFileError();
  657. error->setLogicalName(file.getLogicalName());
  658. error->setError(msg);
  659. errors.append(*static_cast<IConstLogicalFileError*>(error.getClear()));
  660. }
  661. }
  662. class QueryFileCopier
  663. {
  664. public:
  665. QueryFileCopier(const char *target_) : target(target_) {}
  666. void init(IEspContext &context, bool allowForeignFiles)
  667. {
  668. files.setown(createReferencedFileList(context.queryUserId(), context.queryPassword(), allowForeignFiles, false));
  669. #ifndef _CONTAINERIZED
  670. clusterInfo.setown(getTargetClusterInfo(target));
  671. StringBufferAdaptor sba(process);
  672. if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
  673. clusterInfo->getRoxieProcess(sba);
  674. if (!process.length())
  675. return;
  676. #else
  677. process.set(target);
  678. #endif
  679. ps.setown(createPackageSet(process.str()));
  680. if (ps)
  681. pm = ps->queryActiveMap(target);
  682. }
  683. void copy(IConstWorkUnit *cw, unsigned updateFlags)
  684. {
  685. StringBuffer queryid;
  686. if (queryname && *queryname)
  687. queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
  688. files->addFilesFromQuery(cw, pm, queryname);
  689. StringArray locations;
  690. #ifdef _CONTAINERIZED
  691. StringBuffer targetPlane;
  692. getRoxieDirectAccessPlanes(locations, targetPlane, target, true);
  693. const char * targetPlaneOrGroup = targetPlane;
  694. #else
  695. const char * targetPlaneOrGroup = process;
  696. locations.append(targetPlaneOrGroup);
  697. #endif
  698. files->resolveFiles(locations, remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
  699. Owned<IDFUhelper> helper = createIDFUhelper();
  700. #ifdef _CONTAINERIZED
  701. files->cloneAllInfo(targetPlaneOrGroup, updateFlags, helper, true, true, 0, 1, 0, nullptr);
  702. #else
  703. StringBuffer defReplicateFolder;
  704. getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
  705. files->cloneAllInfo(targetPlaneOrGroup, updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
  706. #endif
  707. }
  708. void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
  709. {
  710. ::gatherFileErrors(files, errors);
  711. }
  712. private:
  713. #ifndef _CONTAINERIZED
  714. Owned <IConstWUClusterInfo> clusterInfo;
  715. #endif
  716. Owned<IHpccPackageSet> ps;
  717. const IHpccPackageMap *pm = nullptr;
  718. StringAttr target;
  719. public:
  720. Owned<IReferencedFileList> files;
  721. StringBuffer process;
  722. StringAttr remoteIP;
  723. StringAttr remotePrefix;
  724. StringAttr srcCluster;
  725. StringAttr queryname;
  726. };
  727. #ifndef _CONTAINERIZED
  728. bool CWsWorkunitsEx::isQuerySuspended(const char* query, IConstWUClusterInfo* clusterInfo, unsigned wait, StringBuffer& errorMessage)
  729. #else
  730. bool CWsWorkunitsEx::isQuerySuspended(const char* query, const char* target, unsigned wait, StringBuffer& errorMessage)
  731. #endif
  732. {
  733. try
  734. {
  735. #ifndef _CONTAINERIZED
  736. if (0==wait || !clusterInfo || clusterInfo->getPlatform()!=RoxieCluster)
  737. return false;
  738. const SocketEndpointArray &addrs = clusterInfo->getRoxieServers();
  739. if (addrs.length() < 1)
  740. return false;
  741. #else
  742. if (0==wait)
  743. return false;
  744. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  745. if (!conn)
  746. return false;
  747. if (!isActiveK8sService(target))
  748. return false;
  749. #endif
  750. StringBuffer control;
  751. control.appendf("<control:queries><Query id='%s'/></control:queries>", query);
  752. #ifndef _CONTAINERIZED
  753. Owned<IPropertyTree> result = sendRoxieControlAllNodes(addrs.item(0), control.str(), false, wait);
  754. #else
  755. Owned<IPropertyTree> result = sendRoxieControlAllNodes(conn, control, false, wait, ROXIECONNECTIONTIMEOUT);
  756. #endif
  757. if (!result)
  758. return false;
  759. Owned<IPropertyTreeIterator> suspendedQueries = result->getElements("Endpoint/Queries/Query[@suspended='1']");
  760. if (!suspendedQueries->first())
  761. return false;
  762. errorMessage.set(suspendedQueries->query().queryProp("@error"));
  763. return true;
  764. }
  765. catch(IMultiException *me)
  766. {
  767. StringBuffer err;
  768. IERRLOG("ERROR control:queries roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  769. me->Release();
  770. return false;
  771. }
  772. catch(IException *e)
  773. {
  774. StringBuffer err;
  775. IERRLOG("ERROR control:queries roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  776. e->Release();
  777. return false;
  778. }
  779. }
  780. bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
  781. {
  782. StringBuffer wuid(req.getWuid());
  783. WsWuHelpers::checkAndTrimWorkunit("WUPublishWorkunit", wuid);
  784. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  785. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  786. if (!cw)
  787. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid.str());
  788. ensureWsWorkunitAccess(context, *cw, SecAccess_Full);
  789. resp.setWuid(wuid.str());
  790. StringAttr queryName;
  791. if (notEmpty(req.getQueryName()))
  792. queryName.set(req.getQueryName());
  793. else if (notEmpty(req.getJobName()))
  794. queryName.set(req.getJobName());
  795. else
  796. queryName.set(cw->queryJobName());
  797. if (!queryName.length())
  798. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid.str());
  799. StringAttr target;
  800. if (notEmpty(req.getCluster()))
  801. target.set(req.getCluster());
  802. else
  803. target.set(cw->queryClusterName());
  804. validateTargetName(target);
  805. DBGLOG("%s publishing wuid %s to target %s as query %s", nullText(context.queryUserId()), wuid.str(), target.str(), queryName.str());
  806. StringBuffer daliIP;
  807. StringBuffer srcCluster;
  808. StringBuffer srcPrefix;
  809. splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
  810. if (srcCluster.length())
  811. {
  812. if (!validateDataPlaneName(daliIP, srcCluster))
  813. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
  814. }
  815. unsigned updateFlags = 0;
  816. if (req.getUpdateDfs())
  817. updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
  818. if (req.getUpdateCloneFrom())
  819. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  820. if (req.getUpdateSuperFiles())
  821. updateFlags |= DALI_UPDATEF_SUPERFILES;
  822. if (req.getAppendCluster())
  823. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  824. if (!req.getDontCopyFiles())
  825. {
  826. QueryFileCopier cpr(target);
  827. cpr.init(context, req.getAllowForeignFiles());
  828. cpr.remoteIP.set(daliIP);
  829. cpr.remotePrefix.set(srcPrefix);
  830. cpr.srcCluster.set(srcCluster);
  831. cpr.queryname.set(queryName);
  832. cpr.copy(cw, updateFlags);
  833. if (req.getIncludeFileErrors())
  834. cpr.gatherFileErrors(resp.getFileErrors());
  835. }
  836. WorkunitUpdate wu(&cw->lock());
  837. if (notEmpty(req.getWorkUnitJobName()))
  838. wu->setJobName(req.getWorkUnitJobName());
  839. else if (req.getUpdateWorkUnitName() && notEmpty(req.getJobName()))
  840. wu->setJobName(req.getJobName());
  841. StringBuffer queryId;
  842. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  843. addQueryToQuerySet(wu, target.str(), queryName.str(), activate, queryId, context.queryUserId());
  844. if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
  845. {
  846. Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
  847. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  848. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  849. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  850. updateQueryPriority(queryTree, req.getPriority());
  851. if (req.getComment())
  852. queryTree->setProp("@comment", req.getComment());
  853. }
  854. wu->commit();
  855. wu.clear();
  856. if (queryId.length())
  857. resp.setQueryId(queryId.str());
  858. resp.setQueryName(queryName.str());
  859. resp.setQuerySet(target.str());
  860. bool reloadFailed = false;
  861. #ifndef _CONTAINERIZED
  862. Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
  863. #endif
  864. if (0!=req.getWait() && !req.getNoReload())
  865. #ifndef _CONTAINERIZED
  866. reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
  867. #else
  868. reloadFailed = !reloadCluster(roxieConnMap, target, (unsigned)req.getWait());
  869. #endif
  870. resp.setReloadFailed(reloadFailed);
  871. double version = context.getClientVersion();
  872. if (version > 1.38)
  873. {
  874. StringBuffer errorMessage;
  875. #ifndef _CONTAINERIZED
  876. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
  877. #else
  878. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryName.str(), target, (unsigned)req.getWait(), errorMessage))
  879. #endif
  880. {
  881. resp.setSuspended(true);
  882. resp.setErrorMessage(errorMessage);
  883. }
  884. }
  885. return true;
  886. }
  887. bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest & req, IEspWUQuerysetsResponse & resp)
  888. {
  889. IArrayOf<IEspQuerySet> querySets;
  890. #ifdef _CONTAINERIZED
  891. Owned<IStringIterator> targets = getContainerTargetClusters(nullptr, nullptr);
  892. #else
  893. Owned<IStringIterator> targets = getTargetClusters(nullptr, nullptr);
  894. #endif
  895. SCMStringBuffer target;
  896. ForEach(*targets)
  897. {
  898. Owned<IEspQuerySet> qs = createQuerySet();
  899. qs->setQuerySetName(targets->str(target).str());
  900. querySets.append(*qs.getClear());
  901. }
  902. resp.setQuerysets(querySets);
  903. return true;
  904. }
  905. void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
  906. {
  907. if (queriesOnCluster)
  908. queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
  909. if (!queriesOnCluster)
  910. return;
  911. int reporting = queriesOnCluster->getPropInt("@reporting");
  912. Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
  913. clusterState->setCluster(target);
  914. VStringBuffer xpath("Query[@id='%s']", id);
  915. IPropertyTree *query = queriesOnCluster->queryPropTree(xpath.str());
  916. if (!query)
  917. clusterState->setState("Not Found");
  918. else
  919. {
  920. int suspended = query->getPropInt("@suspended");
  921. const char* error = query->queryProp("@error");
  922. if (0==suspended)
  923. clusterState->setState("Available");
  924. else
  925. {
  926. clusterState->setState("Suspended");
  927. if (suspended<reporting)
  928. clusterState->setMixedNodeStates(true);
  929. }
  930. if (error && *error)
  931. clusterState->setErrors(error);
  932. }
  933. clusterStates.append(*clusterState.getClear());
  934. }
  935. template<typename T>
  936. void checkAndSetQueryPriority(double version, IPropertyTree *query, T *ret)
  937. {
  938. if (!query->hasProp("@priority"))
  939. return;
  940. int priorityID = query->getPropInt("@priority");
  941. ret->setPriority(getQueryPriorityName(priorityID));
  942. if (version >= 1.83)
  943. ret->setPriorityID(priorityID);
  944. }
  945. void gatherQuerySetQueryDetails(IEspContext &context, IPropertyTree *query, IEspQuerySetQuery *queryInfo, const char *cluster, IPropertyTree *queriesOnCluster)
  946. {
  947. double version = context.getClientVersion();
  948. queryInfo->setId(query->queryProp("@id"));
  949. queryInfo->setName(query->queryProp("@name"));
  950. queryInfo->setDll(query->queryProp("@dll"));
  951. queryInfo->setWuid(query->queryProp("@wuid"));
  952. queryInfo->setSuspended(query->getPropBool("@suspended", false));
  953. if (query->hasProp("@memoryLimit"))
  954. {
  955. StringBuffer s;
  956. memoryLimitStringFromUInt64(s, query->getPropInt64("@memoryLimit"));
  957. queryInfo->setMemoryLimit(s);
  958. }
  959. if (query->hasProp("@timeLimit"))
  960. queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
  961. if (query->hasProp("@warnTimeLimit"))
  962. queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
  963. checkAndSetQueryPriority(version, query, queryInfo);
  964. if (query->hasProp("@comment"))
  965. queryInfo->setComment(query->queryProp("@comment"));
  966. if (query->hasProp("@snapshot"))
  967. queryInfo->setSnapshot(query->queryProp("@snapshot"));
  968. if (version >= 1.46)
  969. {
  970. queryInfo->setPublishedBy(query->queryProp("@publishedBy"));
  971. queryInfo->setIsLibrary(query->getPropBool("@isLibrary"));
  972. }
  973. if (queriesOnCluster)
  974. {
  975. IArrayOf<IEspClusterQueryState> clusters;
  976. addClusterQueryStates(queriesOnCluster, cluster, query->queryProp("@id"), clusters, version);
  977. queryInfo->setClusters(clusters);
  978. }
  979. }
  980. void gatherQuerySetAliasDetails(IPropertyTree *alias, IEspQuerySetAlias *aliasInfo)
  981. {
  982. aliasInfo->setName(alias->queryProp("@name"));
  983. aliasInfo->setId(alias->queryProp("@id"));
  984. }
  985. void retrieveAllQuerysetDetails(IEspContext &context, IPropertyTree *registry, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL, const char *type=NULL, const char *value=NULL)
  986. {
  987. Owned<IPropertyTreeIterator> regQueries = registry->getElements("Query");
  988. ForEach(*regQueries)
  989. {
  990. IPropertyTree &query = regQueries->query();
  991. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  992. gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
  993. if (isEmpty(cluster) || isEmpty(type) || isEmpty(value) || !strieq(type, "Status"))
  994. queries.append(*q.getClear());
  995. else
  996. {
  997. IArrayOf<IConstClusterQueryState>& cs = q->getClusters();
  998. ForEachItemIn(i, cs)
  999. {
  1000. IConstClusterQueryState& c = cs.item(i);
  1001. if (strieq(c.getCluster(), cluster) && (strieq(value, "All") || strieq(c.getState(), value)))
  1002. {
  1003. queries.append(*q.getClear());
  1004. break;
  1005. }
  1006. }
  1007. }
  1008. }
  1009. Owned<IPropertyTreeIterator> regAliases = registry->getElements("Alias");
  1010. ForEach(*regAliases)
  1011. {
  1012. IPropertyTree &alias = regAliases->query();
  1013. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  1014. gatherQuerySetAliasDetails(&alias, a);
  1015. aliases.append(*a.getClear());
  1016. }
  1017. }
  1018. void retrieveQuerysetDetailsFromAlias(IEspContext &context, IPropertyTree *registry, const char *name, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster, IPropertyTree *queriesOnCluster)
  1019. {
  1020. StringBuffer xpath;
  1021. xpath.append("Alias[@name='").append(name).append("']");
  1022. Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
  1023. if (!regAliases->first())
  1024. {
  1025. UWARNLOG("Alias %s not found", name);
  1026. return;
  1027. }
  1028. ForEach(*regAliases)
  1029. {
  1030. IPropertyTree& alias = regAliases->query();
  1031. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  1032. gatherQuerySetAliasDetails(&alias, a);
  1033. xpath.clear().append("Query[@id='").append(a->getId()).append("']");
  1034. aliases.append(*a.getClear());
  1035. IPropertyTree *query = registry->queryPropTree(xpath);
  1036. if (!query)
  1037. {
  1038. UWARNLOG("No matching Query %s found for Alias %s", a->getId(), name);
  1039. return;
  1040. }
  1041. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  1042. gatherQuerySetQueryDetails(context, query, q, cluster, queriesOnCluster);
  1043. queries.append(*q.getClear());
  1044. }
  1045. }
  1046. void retrieveQuerysetDetailsFromQuery(IEspContext &context, IPropertyTree *registry, const char *value, const char *type, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1047. {
  1048. if (!strieq(type, "Id") && !strieq(type, "Name"))
  1049. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Unrecognized queryset filter type %s", type);
  1050. StringBuffer attributeName(type);
  1051. StringBuffer xpath;
  1052. xpath.clear().append("Query[@").append(attributeName.toLowerCase()).append("='").append(value).append("']");
  1053. Owned<IPropertyTreeIterator> regQueries = registry->getElements(xpath.str());
  1054. if (!regQueries->first())
  1055. {
  1056. UWARNLOG("No matching Query %s found for %s", value, type);
  1057. return;
  1058. }
  1059. ForEach(*regQueries)
  1060. {
  1061. IPropertyTree& query = regQueries->query();
  1062. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  1063. gatherQuerySetQueryDetails(context, &query, q, cluster, queriesOnCluster);
  1064. xpath.clear().append("Alias[@id='").append(q->getId()).append("']");
  1065. queries.append(*q.getClear());
  1066. Owned<IPropertyTreeIterator> regAliases = registry->getElements(xpath.str());
  1067. ForEach(*regAliases)
  1068. {
  1069. IPropertyTree &alias = regAliases->query();
  1070. Owned<IEspQuerySetAlias> a = createQuerySetAlias();
  1071. gatherQuerySetAliasDetails(&alias, a);
  1072. aliases.append(*a.getClear());
  1073. }
  1074. }
  1075. }
  1076. void retrieveQuerysetDetails(IEspContext &context, IPropertyTree *registry, const char *type, const char *value, IArrayOf<IEspQuerySetQuery> &queries, IArrayOf<IEspQuerySetAlias> &aliases, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1077. {
  1078. if (strieq(type, "All"))
  1079. return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster);
  1080. if (!value || !*value)
  1081. return;
  1082. if (strieq(type, "Alias"))
  1083. return retrieveQuerysetDetailsFromAlias(context, registry, value, queries, aliases, cluster, queriesOnCluster);
  1084. if (strieq(type, "Status") && !isEmpty(cluster))
  1085. return retrieveAllQuerysetDetails(context, registry, queries, aliases, cluster, queriesOnCluster, type, value);
  1086. return retrieveQuerysetDetailsFromQuery(context, registry, value, type, queries, aliases, cluster, queriesOnCluster);
  1087. }
  1088. void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, IPropertyTree *registry, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1089. {
  1090. if (!registry)
  1091. return;
  1092. IArrayOf<IEspQuerySetQuery> queries;
  1093. IArrayOf<IEspQuerySetAlias> aliases;
  1094. retrieveQuerysetDetails(context, registry, type, value, queries, aliases, cluster, queriesOnCluster);
  1095. Owned<IEspWUQuerySetDetail> detail = createWUQuerySetDetail();
  1096. detail->setQuerySetName(registry->queryProp("@id"));
  1097. detail->setQueries(queries);
  1098. detail->setAliases(aliases);
  1099. details.append(*detail.getClear());
  1100. }
  1101. void retrieveQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *queryset, const char *type, const char *value, const char *cluster=NULL, IPropertyTree *queriesOnCluster=NULL)
  1102. {
  1103. if (!queryset || !*queryset)
  1104. return;
  1105. Owned<IPropertyTree> registry = getQueryRegistry(queryset, true);
  1106. if (!registry)
  1107. return;
  1108. retrieveQuerysetDetails(context, details, registry, type, value, cluster, queriesOnCluster);
  1109. }
  1110. #ifndef _CONTAINERIZED
  1111. IPropertyTree *getQueriesOnCluster(const char *target, const char *queryset, StringArray *queryIDs, bool checkAllNodes)
  1112. #else
  1113. IPropertyTree *getQueriesOnCluster(const char *target, const char *queryset, StringArray *queryIDs, bool checkAllNodes,
  1114. MapStringToMyClass<ISmartSocketFactory> &roxieConnMap)
  1115. #endif
  1116. {
  1117. if (isEmpty(target))
  1118. target = queryset;
  1119. else if (queryset && *queryset && !strieq(target, queryset))
  1120. throw makeStringExceptionV(ECLWATCH_QUERYSET_NOT_ON_CLUSTER, "Target %s and QuerySet %s should match", target, queryset);
  1121. #ifndef _CONTAINERIZED
  1122. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  1123. if (!info)
  1124. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster %s not found", target);
  1125. if (info->getPlatform()!=RoxieCluster)
  1126. return NULL;
  1127. const SocketEndpointArray &eps = info->getRoxieServers();
  1128. if (!eps.length())
  1129. return NULL;
  1130. #else
  1131. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  1132. if (!conn)
  1133. return nullptr;
  1134. if (!isActiveK8sService(target))
  1135. return nullptr;
  1136. #endif
  1137. try
  1138. {
  1139. StringBuffer control;
  1140. if (!queryIDs || (queryIDs->ordinality() == 0))
  1141. control.append("<control:queries/>");
  1142. else
  1143. {
  1144. control.append("<control:queries>");
  1145. ForEachItemIn(i, *queryIDs)
  1146. control.appendf("<Query id='%s'/>", queryIDs->item(i));
  1147. control.append("</control:queries>");
  1148. }
  1149. #ifndef _CONTAINERIZED
  1150. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
  1151. if (checkAllNodes)
  1152. return sendRoxieControlAllNodes(sock, control, false, ROXIECONTROLQUERIESTIMEOUT);
  1153. else
  1154. return sendRoxieControlQuery(sock, control, ROXIECONTROLQUERIESTIMEOUT);
  1155. #else
  1156. if (checkAllNodes)
  1157. return sendRoxieControlAllNodes(conn, control, false, ROXIECONTROLQUERIESTIMEOUT, ROXIECONNECTIONTIMEOUT);
  1158. else
  1159. return sendRoxieControlQuery(conn, control, ROXIECONTROLQUERIESTIMEOUT, ROXIECONNECTIONTIMEOUT);
  1160. #endif
  1161. }
  1162. catch(IException* e)
  1163. {
  1164. StringBuffer err;
  1165. DBGLOG("Get exception in control:queries: %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  1166. e->Release();
  1167. return NULL;
  1168. }
  1169. }
  1170. #ifndef _CONTAINERIZED
  1171. void retrieveQuerysetDetailsByCluster(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *target, const char *queryset, const char *type, const char *value, bool checkAllNodes)
  1172. {
  1173. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target, queryset, nullptr, checkAllNodes);
  1174. #else
  1175. void retrieveQuerysetDetailsByCluster(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details,
  1176. const char *target, const char *queryset, const char *type, const char *value, bool checkAllNodes,
  1177. MapStringToMyClass<ISmartSocketFactory> &roxieConnMap)
  1178. {
  1179. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target, queryset, nullptr, checkAllNodes, roxieConnMap);
  1180. #endif
  1181. retrieveQuerysetDetails(context, details, target, type, value, target, queriesOnCluster);
  1182. }
  1183. void retrieveAllQuerysetDetails(IEspContext &context, IArrayOf<IEspWUQuerySetDetail> &details, const char *type, const char *value)
  1184. {
  1185. Owned<IPropertyTree> root = getQueryRegistryRoot();
  1186. if (!root)
  1187. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet Registry not found");
  1188. Owned<IPropertyTreeIterator> querysets = root->getElements("QuerySet");
  1189. ForEach(*querysets)
  1190. retrieveQuerysetDetails(context, details, &querysets->query(), type, value);
  1191. }
  1192. bool CWsWorkunitsEx::onWUQuerysetDetails(IEspContext &context, IEspWUQuerySetDetailsRequest & req, IEspWUQuerySetDetailsResponse & resp)
  1193. {
  1194. resp.setQuerySetName(req.getQuerySetName());
  1195. double version = context.getClientVersion();
  1196. if (version > 1.36)
  1197. {
  1198. Owned<IPropertyTree> queryRegistry = getQueryRegistry(req.getQuerySetName(), false);
  1199. resp.setClusterName(req.getClusterName());
  1200. resp.setFilter(req.getFilter());
  1201. resp.setFilterType(req.getFilterType());
  1202. }
  1203. Owned<IPropertyTree> registry = getQueryRegistry(req.getQuerySetName(), true);
  1204. if (!registry)
  1205. return false;
  1206. PROGLOG("WUQuerysetDetails for queryset %s", req.getQuerySetName());
  1207. IArrayOf<IEspQuerySetQuery> respQueries;
  1208. IArrayOf<IEspQuerySetAlias> respAliases;
  1209. if (isEmpty(req.getClusterName()) || isEmpty(req.getFilterTypeAsString()) || !strieq(req.getFilterTypeAsString(), "Status") || isEmpty(req.getFilter()))
  1210. {
  1211. const char* cluster = req.getClusterName();
  1212. if (isEmpty(cluster))
  1213. cluster = req.getQuerySetName();
  1214. #ifndef _CONTAINERIZED
  1215. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, req.getQuerySetName(), nullptr, req.getCheckAllNodes());
  1216. #else
  1217. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, req.getQuerySetName(), nullptr, req.getCheckAllNodes(), roxieConnMap);
  1218. #endif
  1219. retrieveQuerysetDetails(context, registry, req.getFilterTypeAsString(), req.getFilter(), respQueries, respAliases, cluster, queriesOnCluster);
  1220. resp.setQuerysetQueries(respQueries);
  1221. resp.setQuerysetAliases(respAliases);
  1222. }
  1223. else
  1224. {
  1225. IArrayOf<IEspWUQuerySetDetail> respDetails;
  1226. #ifndef _CONTAINERIZED
  1227. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), false);
  1228. #else
  1229. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), false,
  1230. roxieConnMap);
  1231. #endif
  1232. if (respDetails.ordinality())
  1233. {
  1234. IEspWUQuerySetDetail& detail = respDetails.item(0);
  1235. resp.setQuerysetQueries(detail.getQueries());
  1236. resp.setQuerysetAliases(detail.getAliases());
  1237. }
  1238. }
  1239. return true;
  1240. }
  1241. bool CWsWorkunitsEx::onWUMultiQuerysetDetails(IEspContext &context, IEspWUMultiQuerySetDetailsRequest & req, IEspWUMultiQuerySetDetailsResponse & resp)
  1242. {
  1243. IArrayOf<IEspWUQuerySetDetail> respDetails;
  1244. if (notEmpty(req.getClusterName()))
  1245. {
  1246. PROGLOG("WUMultiQuerysetDetails for cluster %s", req.getClusterName());
  1247. #ifndef _CONTAINERIZED
  1248. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), req.getCheckAllNodes());
  1249. #else
  1250. retrieveQuerysetDetailsByCluster(context, respDetails, req.getClusterName(), req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter(), req.getCheckAllNodes(),
  1251. roxieConnMap);
  1252. #endif
  1253. }
  1254. else if (notEmpty(req.getQuerySetName()))
  1255. {
  1256. PROGLOG("WUMultiQuerysetDetails for queryset %s", req.getQuerySetName());
  1257. retrieveQuerysetDetails(context, respDetails, req.getQuerySetName(), req.getFilterTypeAsString(), req.getFilter());
  1258. }
  1259. else
  1260. {
  1261. VStringBuffer logMsg("WUMultiQuerysetDetails: FilterType %s", req.getFilterTypeAsString());
  1262. if (notEmpty(req.getFilter()))
  1263. logMsg.append(", Filter ").append(req.getFilter());
  1264. PROGLOG("%s", logMsg.str());
  1265. retrieveAllQuerysetDetails(context, respDetails, req.getFilterTypeAsString(), req.getFilter());
  1266. }
  1267. resp.setQuerysets(respDetails);
  1268. return true;
  1269. }
  1270. bool addWUQSQueryFilter(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, const char* value, WUQuerySortField name)
  1271. {
  1272. if (isEmpty(value))
  1273. return false;
  1274. filters[count++] = name;
  1275. buff.append(value);
  1276. return true;
  1277. }
  1278. bool addWUQSQueryFilterInt(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, int value, WUQuerySortField name)
  1279. {
  1280. VStringBuffer vBuf("%d", value);
  1281. filters[count++] = name;
  1282. buff.append(vBuf.str());
  1283. return true;
  1284. }
  1285. bool addWUQSQueryFilterInt64(WUQuerySortField *filters, unsigned short &count, MemoryBuffer &buff, __int64 value, WUQuerySortField name)
  1286. {
  1287. VStringBuffer vBuf("%" I64F "d", value);
  1288. filters[count++] = name;
  1289. buff.append(vBuf.str());
  1290. return true;
  1291. }
  1292. unsigned CWsWorkunitsEx::getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds)
  1293. {
  1294. if (!target || !*target)
  1295. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  1296. if (!queryId || !*queryId)
  1297. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
  1298. #ifndef _CONTAINERIZED
  1299. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  1300. if (!info || (info->getPlatform()!=RoxieCluster)) //Only roxie query has query graph.
  1301. return 0;
  1302. const SocketEndpointArray &eps = info->getRoxieServers();
  1303. if (eps.empty())
  1304. return 0;
  1305. #else
  1306. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  1307. if (!conn)
  1308. return 0;
  1309. if (!isActiveK8sService(target))
  1310. return 0;
  1311. #endif
  1312. VStringBuffer xpath("<control:querystats><Query id='%s'/></control:querystats>", queryId);
  1313. #ifndef _CONTAINERIZED
  1314. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), ROXIECONNECTIONTIMEOUT);
  1315. Owned<IPropertyTree> querystats = sendRoxieControlQuery(sock, xpath.str(), ROXIECONTROLQUERYTIMEOUT);
  1316. #else
  1317. Owned<IPropertyTree> querystats = sendRoxieControlQuery(conn, xpath.str(), ROXIECONTROLQUERYTIMEOUT, ROXIECONNECTIONTIMEOUT);
  1318. #endif
  1319. if (!querystats)
  1320. return 0;
  1321. Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
  1322. ForEach(*graphs)
  1323. {
  1324. IPropertyTree &graph = graphs->query();
  1325. const char* graphId = graph.queryProp("@id");
  1326. if (graphId && *graphId)
  1327. graphIds.appendUniq(graphId);
  1328. }
  1329. return graphIds.length();
  1330. }
  1331. //This method is thread safe because a query belongs to a single queryset. The method may be called by different threads.
  1332. //Since one thread is for one queryset and a query only belongs to a single queryset, it is impossible for different threads
  1333. //to update the same query object.
  1334. void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
  1335. {
  1336. try
  1337. {
  1338. double version = context.getClientVersion();
  1339. if (isEmpty(cluster))
  1340. cluster = querySetId;
  1341. StringArray queryIDs;
  1342. ForEachItemIn(j, queries)
  1343. {
  1344. IEspQuerySetQuery& query = queries.item(j);
  1345. const char* queryId = query.getId();
  1346. const char* querySetId0 = query.getQuerySetId();
  1347. if (queryId && querySetId0 && strieq(querySetId0, querySetId))
  1348. queryIDs.append(queryId);
  1349. }
  1350. if (queryIDs.ordinality() == 0)
  1351. return;
  1352. #ifndef _CONTAINERIZED
  1353. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId, &queryIDs, checkAllNodes);
  1354. #else
  1355. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId, &queryIDs, checkAllNodes, roxieConnMap);
  1356. #endif
  1357. if (!queriesOnCluster)
  1358. {
  1359. UWARNLOG("getQueriesOnCluster() returns NULL for cluster<%s> and querySetId<%s>", cluster, querySetId);
  1360. return;
  1361. }
  1362. ForEachItemIn(i, queries)
  1363. {
  1364. IEspQuerySetQuery& query = queries.item(i);
  1365. const char* queryId = query.getId();
  1366. const char* querySetId0 = query.getQuerySetId();
  1367. if (!queryId || !querySetId0 || !strieq(querySetId0, querySetId))
  1368. continue;
  1369. IArrayOf<IEspClusterQueryState> clusters;
  1370. addClusterQueryStates(queriesOnCluster, cluster, queryId, clusters, version);
  1371. query.setClusters(clusters);
  1372. }
  1373. }
  1374. catch(IException *e)
  1375. {
  1376. EXCLOG(e, "CWsWorkunitsEx::checkAndSetClusterQueryState: Failed to read Query State On Cluster");
  1377. e->Release();
  1378. }
  1379. }
  1380. void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, StringArray& querySetIds, IArrayOf<IEspQuerySetQuery>& queries, bool checkAllNodes)
  1381. {
  1382. UnsignedArray threadHandles;
  1383. ForEachItemIn(i, querySetIds)
  1384. {
  1385. const char* querySetId = querySetIds.item(i);
  1386. if(!querySetId || !*querySetId)
  1387. continue;
  1388. Owned<CClusterQueryStateParam> threadReq = new CClusterQueryStateParam(this, context, cluster, querySetId, queries, checkAllNodes);
  1389. PooledThreadHandle handle = clusterQueryStatePool->start( threadReq.getClear() );
  1390. threadHandles.append(handle);
  1391. }
  1392. //block for worker threads to finish, if necessary and then collect results
  1393. //Not use joinAll() because multiple threads may call this method. Each call uses the pool to create
  1394. //its own threads of checking query state. Each call should only join the ones created by that call.
  1395. ForEachItemIn(ii, threadHandles)
  1396. clusterQueryStatePool->join(threadHandles.item(ii));
  1397. }
  1398. bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequest & req, IEspWUListQueriesResponse & resp)
  1399. {
  1400. bool descending = req.getDescending();
  1401. const char *sortBy = req.getSortby();
  1402. WUQuerySortField sortOrder[2] = {WUQSFId, WUQSFterm};
  1403. if(notEmpty(sortBy))
  1404. {
  1405. if (strieq(sortBy, "Name"))
  1406. sortOrder[0] = WUQSFname;
  1407. else if (strieq(sortBy, "WUID"))
  1408. sortOrder[0] = WUQSFwuid;
  1409. else if (strieq(sortBy, "DLL"))
  1410. sortOrder[0] = WUQSFdll;
  1411. else if (strieq(sortBy, "Activated"))
  1412. sortOrder[0] = WUQSFActivited;
  1413. else if (strieq(sortBy, "MemoryLimit"))
  1414. sortOrder[0] = (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric);
  1415. else if (strieq(sortBy, "TimeLimit"))
  1416. sortOrder[0] = (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric);
  1417. else if (strieq(sortBy, "WarnTimeLimit"))
  1418. sortOrder[0] = (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric);
  1419. else if (strieq(sortBy, "Priority"))
  1420. sortOrder[0] = (WUQuerySortField) (WUQSFpriority | WUQSFnumeric);
  1421. else if (strieq(sortBy, "PublishedBy"))
  1422. sortOrder[0] = WUQSFPublishedBy;
  1423. else if (strieq(sortBy, "QuerySetId"))
  1424. sortOrder[0] = WUQSFQuerySet;
  1425. else
  1426. sortOrder[0] = WUQSFId;
  1427. sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFnocase);
  1428. if (descending)
  1429. sortOrder[0] = (WUQuerySortField) (sortOrder[0] | WUQSFreverse);
  1430. }
  1431. WUQuerySortField filters[16];
  1432. unsigned short filterCount = 0;
  1433. MemoryBuffer filterBuf;
  1434. const char* clusterReq = req.getClusterName();
  1435. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQuerySetName(), WUQSFQuerySet);
  1436. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryID(), (WUQuerySortField) (WUQSFId | WUQSFwild | WUSFnocase));
  1437. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getQueryName(), (WUQuerySortField) (WUQSFname | WUQSFwild | WUSFnocase));
  1438. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getWUID(), (WUQuerySortField) (WUQSFwuid | WUSFnocase));
  1439. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getLibraryName(), (WUQuerySortField) (WUQSFLibrary | WUQSFnocase));
  1440. addWUQSQueryFilter(filters, filterCount, filterBuf, req.getPublishedBy(), (WUQuerySortField) (WUQSFPublishedBy | WUQSFwild | WUSFnocase));
  1441. if (!req.getMemoryLimitLow_isNull())
  1442. addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitLow(), (WUQuerySortField) (WUQSFmemoryLimit | WUQSFnumeric));
  1443. if (!req.getMemoryLimitHigh_isNull())
  1444. addWUQSQueryFilterInt64(filters, filterCount, filterBuf, req.getMemoryLimitHigh(), (WUQuerySortField) (WUQSFmemoryLimitHi | WUQSFnumeric));
  1445. if (!req.getTimeLimitLow_isNull())
  1446. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitLow(), (WUQuerySortField) (WUQSFtimeLimit | WUQSFnumeric));
  1447. if (!req.getTimeLimitHigh_isNull())
  1448. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getTimeLimitHigh(), (WUQuerySortField) (WUQSFtimeLimitHi | WUQSFnumeric));
  1449. if (!req.getWarnTimeLimitLow_isNull())
  1450. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitLow(), (WUQuerySortField) (WUQSFwarnTimeLimit | WUQSFnumeric));
  1451. if (!req.getWarnTimeLimitHigh_isNull())
  1452. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getWarnTimeLimitHigh(), (WUQuerySortField) (WUQSFwarnTimeLimitHi | WUQSFnumeric));
  1453. if (!req.getPriorityLow_isNull())
  1454. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityLow(), (WUQuerySortField) (WUQSFpriority | WUQSFnumeric));
  1455. if (!req.getPriorityHigh_isNull())
  1456. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityHigh(), (WUQuerySortField) (WUQSFpriorityHi | WUQSFnumeric));
  1457. if (!req.getActivated_isNull())
  1458. addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getActivated(), (WUQuerySortField) (WUQSFActivited | WUQSFnumeric));
  1459. MapStringTo<bool> suspendedQueriesByCluster;
  1460. CWUQueryFilterSuspendedType suspendedType = req.getSuspendedFilter();
  1461. if (suspendedType != WUQueryFilterSuspendedType_Undefined)
  1462. {
  1463. addWUQSQueryFilterInt(filters, filterCount, filterBuf, suspendedType, (WUQuerySortField) (WUQSFSuspendedFilter | WUQSFnumeric));
  1464. if (suspendedType == CWUQueryFilterSuspendedType_SUSPDBYFirstNode)
  1465. {
  1466. getSuspendedQueriesByCluster(suspendedQueriesByCluster, req.getQuerySetName(), req.getQueryID(), false);
  1467. }
  1468. else if ((suspendedType == CWUQueryFilterSuspendedType_SUSPDBYAnyNode) || (suspendedType == CWUQueryFilterSuspendedType_NOTSUSPD)
  1469. || (suspendedType == CWUQueryFilterSuspendedType_SUSPD))
  1470. {
  1471. getSuspendedQueriesByCluster(suspendedQueriesByCluster, req.getQuerySetName(), req.getQueryID(), true);
  1472. }
  1473. }
  1474. else if (!req.getSuspendedByUser_isNull()) //For the client before version 1.78
  1475. addWUQSQueryFilterInt(filters, filterCount, filterBuf, CWUQueryFilterSuspendedType_SUSPDBYUSER, (WUQuerySortField) (WUQSFSuspendedFilter | WUQSFnumeric));
  1476. filters[filterCount] = WUQSFterm;
  1477. unsigned numberOfQueries = 0;
  1478. unsigned pageSize = req.getPageSize();
  1479. unsigned pageStartFrom = req.getPageStartFrom();
  1480. if(pageSize < 1)
  1481. pageSize = 100;
  1482. __int64 cacheHint = 0;
  1483. if (!req.getCacheHint_isNull())
  1484. cacheHint = req.getCacheHint();
  1485. Owned<MapStringTo<bool> > queriesUsingFileMap;
  1486. const char *lfn = req.getFileName();
  1487. if (lfn && *lfn)
  1488. {
  1489. queriesUsingFileMap.setown(new MapStringTo<bool>());
  1490. StringAttr dummy;
  1491. Owned<IPropertyTreeIterator> queriesUsingFile = filesInUse.findQueriesUsingFile(clusterReq, lfn, dummy);
  1492. ForEach (*queriesUsingFile)
  1493. {
  1494. IPropertyTree &queryUsingFile = queriesUsingFile->query();
  1495. const char *queryTarget = queryUsingFile.queryProp("@target");
  1496. const char *queryId = queryUsingFile.queryProp("@id");
  1497. if (queryTarget && *queryTarget && queryId && *queryId)
  1498. {
  1499. VStringBuffer targetQuery("%s/%s", queryTarget, queryId);
  1500. queriesUsingFileMap->setValue(targetQuery, true);
  1501. }
  1502. }
  1503. }
  1504. PROGLOG("WUListQueries: getQuerySetQueriesSorted");
  1505. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1506. Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(),
  1507. pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap, &suspendedQueriesByCluster);
  1508. resp.setCacheHint(cacheHint);
  1509. PROGLOG("WUListQueries: getQuerySetQueriesSorted done");
  1510. StringArray querySetIds;
  1511. IArrayOf<IEspQuerySetQuery> queries;
  1512. double version = context.getClientVersion();
  1513. ForEach(*it)
  1514. {
  1515. IPropertyTree &query=it->query();
  1516. const char *queryId = query.queryProp("@id");
  1517. const char *queryTarget = query.queryProp("@querySetId");
  1518. Owned<IEspQuerySetQuery> q = createQuerySetQuery();
  1519. q->setId(queryId);
  1520. q->setQuerySetId(queryTarget);
  1521. q->setName(query.queryProp("@name"));
  1522. q->setDll(query.queryProp("@dll"));
  1523. q->setWuid(query.queryProp("@wuid"));
  1524. q->setActivated(query.getPropBool("@activated", false));
  1525. q->setSuspended(query.getPropBool("@suspended", false));
  1526. if (query.hasProp("@memoryLimit"))
  1527. {
  1528. StringBuffer s;
  1529. memoryLimitStringFromUInt64(s, query.getPropInt64("@memoryLimit"));
  1530. q->setMemoryLimit(s);
  1531. }
  1532. if (query.hasProp("@timeLimit"))
  1533. q->setTimeLimit(query.getPropInt("@timeLimit"));
  1534. if (query.hasProp("@warnTimeLimit"))
  1535. q->setWarnTimeLimit(query.getPropInt("@warnTimeLimit"));
  1536. checkAndSetQueryPriority(version, &query, q.get());
  1537. if (query.hasProp("@comment"))
  1538. q->setComment(query.queryProp("@comment"));
  1539. if (version >= 1.46)
  1540. {
  1541. q->setPublishedBy(query.queryProp("@publishedBy"));
  1542. q->setIsLibrary(query.getPropBool("@isLibrary"));
  1543. }
  1544. if (!querySetIds.contains(queryTarget))
  1545. querySetIds.append(queryTarget);
  1546. queries.append(*q.getClear());
  1547. }
  1548. if (queries.ordinality() > 0)
  1549. checkAndSetClusterQueryState(context, clusterReq, querySetIds, queries, req.getCheckAllNodes());
  1550. resp.setQuerysetQueries(queries);
  1551. resp.setNumberOfQueries(numberOfQueries);
  1552. return true;
  1553. }
  1554. void CWsWorkunitsEx::getSuspendedQueriesByCluster(MapStringTo<bool> &suspendedQueries, const char *querySet, const char *queryID, bool checkAllNodes)
  1555. {
  1556. StringArray queryIDs;
  1557. if (!isEmptyString(queryID))
  1558. queryIDs.append(queryID);
  1559. if (!isEmptyString(querySet))
  1560. {
  1561. #ifndef _CONTAINERIZED
  1562. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIDs, checkAllNodes);
  1563. #else
  1564. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIDs, checkAllNodes, roxieConnMap);
  1565. #endif
  1566. addSuspendedQueryIDs(suspendedQueries, queriesOnCluster, querySet);
  1567. }
  1568. else
  1569. {
  1570. #ifdef _CONTAINERIZED
  1571. Owned<IStringIterator> targets = getContainerTargetClusters("roxie", nullptr);
  1572. #else
  1573. Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", nullptr);
  1574. #endif
  1575. ForEach(*targets)
  1576. {
  1577. SCMStringBuffer target;
  1578. targets->str(target);
  1579. #ifndef _CONTAINERIZED
  1580. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target.str(), target.str(), &queryIDs, checkAllNodes);
  1581. #else
  1582. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target.str(), target.str(), &queryIDs, checkAllNodes, roxieConnMap);
  1583. #endif
  1584. addSuspendedQueryIDs(suspendedQueries, queriesOnCluster, target.str());
  1585. }
  1586. }
  1587. }
  1588. void CWsWorkunitsEx::addSuspendedQueryIDs(MapStringTo<bool> &suspendedQueryIDs, IPropertyTree *queriesOnCluster, const char *querySet)
  1589. {
  1590. if (!queriesOnCluster)
  1591. throw makeStringExceptionV(ECLWATCH_INTERNAL_ERROR, "getQueriesOnCluster() returns nullptr for target <%s>", querySet);
  1592. Owned<IPropertyTreeIterator> queries = queriesOnCluster->getElements("Endpoint/Queries/Query");
  1593. ForEach(*queries)
  1594. {
  1595. IPropertyTree &query = queries->query();
  1596. const char *id = query.queryProp("@id");
  1597. if (isEmptyString(id))
  1598. continue; //Should not happen
  1599. if (query.getPropInt("@suspended"))
  1600. {
  1601. VStringBuffer suspendedID("%s/%s", querySet, id);
  1602. suspendedQueryIDs.setValue(suspendedID, true);
  1603. }
  1604. }
  1605. }
  1606. bool CWsWorkunitsEx::onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp)
  1607. {
  1608. const char *target = req.getTarget();
  1609. const char *process = req.getProcess();
  1610. StringBuffer lfn(req.getFileName());
  1611. resp.setFileName(lfn.toLowerCase());
  1612. resp.setProcess(process);
  1613. if (lfn.isEmpty())
  1614. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "FileName required");
  1615. VStringBuffer logMsg("WUListQueriesUsingFile: %s", lfn.str());
  1616. StringArray targets;
  1617. if (target && *target)
  1618. {
  1619. targets.append(target);
  1620. logMsg.append(", target ").append(target);
  1621. }
  1622. else // if (process && *process)
  1623. {
  1624. SCMStringBuffer targetStr;
  1625. #ifdef _CONTAINERIZED
  1626. Owned<IStringIterator> targetClusters = getContainerTargetClusters("roxie", process);
  1627. #else
  1628. Owned<IStringIterator> targetClusters = getTargetClusters("RoxieCluster", process);
  1629. #endif
  1630. ForEach(*targetClusters)
  1631. targets.append(targetClusters->str(targetStr).str());
  1632. logMsg.append(", process ").append(process);
  1633. }
  1634. PROGLOG("%s", logMsg.str());
  1635. IArrayOf<IEspTargetQueriesUsingFile> respTargets;
  1636. ForEachItemIn(i, targets)
  1637. {
  1638. target = targets.item(i);
  1639. Owned<IEspTargetQueriesUsingFile> respTarget = createTargetQueriesUsingFile();
  1640. respTarget->setTarget(target);
  1641. StringAttr pmid;
  1642. Owned<IPropertyTreeIterator> queries = filesInUse.findQueriesUsingFile(target, lfn, pmid);
  1643. if (!pmid.isEmpty())
  1644. respTarget->setPackageMap(pmid);
  1645. if (queries)
  1646. {
  1647. IArrayOf<IEspQueryUsingFile> respQueries;
  1648. ForEach(*queries)
  1649. {
  1650. IPropertyTree &query = queries->query();
  1651. Owned<IEspQueryUsingFile> q = createQueryUsingFile();
  1652. q->setId(query.queryProp("@id"));
  1653. VStringBuffer xpath("File[@lfn='%s']/@pkgid", lfn.str());
  1654. if (query.hasProp(xpath))
  1655. q->setPackage(query.queryProp(xpath));
  1656. respQueries.append(*q.getClear());
  1657. }
  1658. respTarget->setQueries(respQueries);
  1659. }
  1660. respTargets.append(*respTarget.getClear());
  1661. }
  1662. resp.setTargets(respTargets);
  1663. return true;
  1664. }
  1665. void addQueryFiles(IPropertyTree *queryTree, StringBuffer &queryid, IArrayOf<IEspFileUsedByQuery> &referencedFiles, IArrayOf<IEspQuerySuperFile> &referencedSuperFiles)
  1666. {
  1667. queryid.set(queryTree->queryProp("@id"));
  1668. Owned<IPropertyTreeIterator> files = queryTree->getElements("File");
  1669. ForEach(*files)
  1670. {
  1671. IPropertyTree &file = files->query();
  1672. if (file.getPropBool("@super"))
  1673. {
  1674. Owned<IEspQuerySuperFile> superFile = createQuerySuperFile();
  1675. superFile->setName(file.queryProp("@lfn"));
  1676. Owned<IPropertyTreeIterator> subfiles = file.getElements("SubFile");
  1677. ForEach(*subfiles)
  1678. superFile->getSubFiles().append(subfiles->query().queryProp("@lfn"));
  1679. referencedSuperFiles.append(*superFile.getClear());
  1680. }
  1681. else
  1682. {
  1683. Owned<IEspFileUsedByQuery> respFile = createFileUsedByQuery();
  1684. respFile->setFileName(file.queryProp("@lfn"));
  1685. respFile->setFileSize(file.getPropInt64("@size"));
  1686. respFile->setNumberOfParts(file.getPropInt("@numparts"));
  1687. referencedFiles.append(*respFile.getClear());
  1688. }
  1689. }
  1690. }
  1691. void addQueriesFiles(IPropertyTreeIterator *queriesTrees, IArrayOf<IEspQueryFilesUsed> &queriesFiles)
  1692. {
  1693. ForEach(*queriesTrees)
  1694. {
  1695. IPropertyTree &queryTree = queriesTrees->get();
  1696. StringBuffer id;
  1697. IArrayOf<IEspFileUsedByQuery> referencedFiles;
  1698. IArrayOf<IEspQuerySuperFile> referencedSuperFiles;
  1699. addQueryFiles(&queryTree, id, referencedFiles, referencedSuperFiles);
  1700. Owned<IEspQueryFilesUsed> queryFilesUsed = createQueryFilesUsed();
  1701. queryFilesUsed->setQueryId(id);
  1702. queryFilesUsed->setFiles(referencedFiles);
  1703. queryFilesUsed->setSuperFiles(referencedSuperFiles);
  1704. queriesFiles.append(*queryFilesUsed.getClear());
  1705. }
  1706. }
  1707. bool CWsWorkunitsEx::onWUQueryFiles(IEspContext &context, IEspWUQueryFilesRequest &req, IEspWUQueryFilesResponse &resp)
  1708. {
  1709. const char *target = req.getTarget();
  1710. validateTargetName(target);
  1711. StringAttr queryid;
  1712. const char *query = req.getQueryId();
  1713. if (!query || !*query)
  1714. {
  1715. if (context.getClientVersion()<1.86)
  1716. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
  1717. PROGLOG("WUQueryFiles: target %s, all queries", target);
  1718. }
  1719. else
  1720. {
  1721. Owned<IPropertyTree> registeredQuery = resolveQueryAlias(target, query, true);
  1722. if (!registeredQuery)
  1723. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found");
  1724. PROGLOG("WUQueryFiles: target %s, query %s", target, query);
  1725. queryid.set(registeredQuery->queryProp("@id"));
  1726. }
  1727. Owned<IPropertyTree> tree = filesInUse.getTree();
  1728. if (queryid.length())
  1729. {
  1730. VStringBuffer xpath("%s/Query[@id='%s']", target, queryid.get());
  1731. IPropertyTree *queryTree = tree->queryPropTree(xpath);
  1732. if (!queryTree)
  1733. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query not found in file cache (%s)", xpath.str());
  1734. IArrayOf<IEspFileUsedByQuery> referencedFiles;
  1735. IArrayOf<IEspQuerySuperFile> referencedSuperFiles;
  1736. StringBuffer id;
  1737. addQueryFiles(queryTree, id, referencedFiles, referencedSuperFiles);
  1738. resp.setFiles(referencedFiles);
  1739. resp.setSuperFiles(referencedSuperFiles);
  1740. }
  1741. else
  1742. {
  1743. //return entire queryset
  1744. VStringBuffer xpath("%s/Query", target);
  1745. Owned<IPropertyTreeIterator> queryTrees = tree->getElements(xpath);
  1746. IArrayOf<IEspQueryFilesUsed> queriesFiles;
  1747. addQueriesFiles(queryTrees, queriesFiles);
  1748. resp.setQueries(queriesFiles);
  1749. }
  1750. return true;
  1751. }
  1752. void copyWorkunitForRecompile(IEspContext &context, IWorkUnitFactory *factory, const char *srcWuid, StringAttr &wuid, StringAttr &jobname)
  1753. {
  1754. Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid));
  1755. if (!src)
  1756. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", srcWuid);
  1757. WsWuInfo info(context, src);
  1758. StringBuffer archiveText;
  1759. info.getWorkunitArchiveQuery(archiveText); //archive required, fail otherwise
  1760. if (!isArchiveQuery(archiveText))
  1761. throw MakeStringException(ECLWATCH_RESOURCE_NOT_FOUND,"Cannot retrieve workunit ECL archive %s.", srcWuid);
  1762. SCMStringBuffer mainDefinition;
  1763. Owned <IConstWUQuery> query = src->getQuery();
  1764. if (query)
  1765. query->getQueryMainDefinition(mainDefinition);
  1766. NewWsWorkunit wu(factory, context);
  1767. wuid.set(wu->queryWuid());
  1768. wu->setAction(WUActionCompile);
  1769. jobname.set(src->queryJobName());
  1770. if (jobname.length())
  1771. wu->setJobName(jobname);
  1772. wu.setQueryText(archiveText.str());
  1773. if (mainDefinition.length())
  1774. wu.setQueryMain(mainDefinition.str());
  1775. wu->setResultLimit(src->getResultLimit());
  1776. IStringIterator &names = src->getDebugValues();
  1777. ForEach(names)
  1778. {
  1779. SCMStringBuffer name, value;
  1780. names.str(name);
  1781. if (0==strncmp(name.str(), "eclcc", 5))
  1782. wu->setDebugValue(name.str(), src->getDebugValue(name.str(), value).str(), true);
  1783. }
  1784. }
  1785. bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQueryRequest &req, IEspWURecreateQueryResponse &resp)
  1786. {
  1787. try
  1788. {
  1789. const char* srcTarget = req.getTarget();
  1790. const char* queryIdOrAlias = req.getQueryId();
  1791. if (!srcTarget || !*srcTarget)
  1792. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  1793. if (!queryIdOrAlias || !*queryIdOrAlias)
  1794. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
  1795. const char *target = req.getDestTarget();
  1796. if (isEmptyString(target))
  1797. target = srcTarget;
  1798. Owned<IPropertyTree> queryRegistry = getQueryRegistry(srcTarget, false);
  1799. Owned<IPropertyTree> srcQueryTree = resolveQueryAlias(queryRegistry, queryIdOrAlias);
  1800. if (!srcQueryTree)
  1801. {
  1802. DBGLOG("WURecreateQuery - No matching Query");
  1803. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND,"No matching query for given id or alias %s.", queryIdOrAlias);
  1804. }
  1805. resp.setPriority(isEmptyString(req.getPriority()) ? srcQueryTree->queryProp("@priority") : req.getPriority());
  1806. resp.setComment(isEmptyString(req.getComment()) ? srcQueryTree->queryProp("@comment") : req.getComment());
  1807. resp.setMemoryLimit(isEmptyString(req.getMemoryLimit()) ? srcQueryTree->queryProp("@memoryLimit") : req.getMemoryLimit());
  1808. resp.setTimeLimit(req.getTimeLimit_isNull() ? srcQueryTree->getPropInt("@timeLimit") : req.getTimeLimit());
  1809. resp.setWarnTimeLimit(req.getWarnTimeLimit_isNull() ? srcQueryTree->getPropInt("@warnTimeLimit") : req.getWarnTimeLimit());
  1810. StringAttr wuid;
  1811. StringAttr jobname;
  1812. const char* srcQueryId = srcQueryTree->queryProp("@id");
  1813. const char* srcQueryName = srcQueryTree->queryProp("@name");
  1814. const char *srcWuid = srcQueryTree->queryProp("@wuid");
  1815. PROGLOG("WURecreateQuery: QuerySet %s, query %s, wuid %s", srcTarget, srcQueryId, srcWuid);
  1816. ensureWsWorkunitAccess(context, srcWuid, SecAccess_Write);
  1817. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1818. copyWorkunitForRecompile(context, factory, srcWuid, wuid, jobname);
  1819. resp.setWuid(wuid);
  1820. WsWuHelpers::submitWsWorkunit(context, wuid.str(), target, nullptr, 0, 0, true, false, false, nullptr, nullptr, &req.getDebugValues(), nullptr);
  1821. waitForWorkUnitToCompile(wuid.str(), req.getWait());
  1822. Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str()));
  1823. if (!cw)
  1824. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open recreated workunit %s.",wuid.str());
  1825. if (jobname.length())
  1826. {
  1827. StringBuffer name;
  1828. origValueChanged(jobname.str(), cw->queryJobName(), name, false);
  1829. if (name.length()) //non generated user specified name, so override #Workunit('name')
  1830. {
  1831. WorkunitUpdate wx(&cw->lock());
  1832. wx->setJobName(name.str());
  1833. }
  1834. }
  1835. PROGLOG("WURecreateQuery generated: %s", wuid.str());
  1836. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  1837. queryRegistry.clear();
  1838. srcQueryTree.clear();
  1839. if (req.getRepublish())
  1840. {
  1841. if (!req.getDontCopyFiles())
  1842. {
  1843. StringBuffer daliIP;
  1844. StringBuffer srcCluster;
  1845. StringBuffer srcPrefix;
  1846. splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL);
  1847. if (srcCluster.length())
  1848. {
  1849. if (!validateDataPlaneName(daliIP, srcCluster))
  1850. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local");
  1851. }
  1852. unsigned updateFlags = 0;
  1853. if (req.getUpdateDfs())
  1854. updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM);
  1855. if (req.getUpdateCloneFrom())
  1856. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  1857. if (req.getUpdateSuperFiles())
  1858. updateFlags |= DALI_UPDATEF_SUPERFILES;
  1859. if (req.getAppendCluster())
  1860. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  1861. QueryFileCopier cpr(target);
  1862. cpr.init(context, req.getAllowForeignFiles());
  1863. cpr.remoteIP.set(daliIP);
  1864. cpr.remotePrefix.set(srcPrefix);
  1865. cpr.srcCluster.set(srcCluster);
  1866. cpr.queryname.set(srcQueryName);
  1867. cpr.copy(cw, updateFlags);
  1868. if (req.getIncludeFileErrors())
  1869. cpr.gatherFileErrors(resp.getFileErrors());
  1870. }
  1871. StringBuffer queryId;
  1872. WorkunitUpdate wu(&cw->lock());
  1873. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  1874. addQueryToQuerySet(wu, target, srcQueryName, activate, queryId, context.queryUserId());
  1875. {
  1876. Owned<IPropertyTree> queryTree = getQueryById(target, queryId, false);
  1877. if (queryTree)
  1878. {
  1879. queryTree->setProp("@priority", resp.getPriority());
  1880. updateMemoryLimitSetting(queryTree, resp.getMemoryLimit());
  1881. updateQuerySetting(resp.getTimeLimit_isNull(), queryTree, "@timeLimit", resp.getTimeLimit());
  1882. updateQuerySetting(resp.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", resp.getWarnTimeLimit());
  1883. updateQueryPriority(queryTree, resp.getPriority());
  1884. queryTree->setProp("@comment", resp.getComment());
  1885. }
  1886. }
  1887. wu->commit();
  1888. wu.clear();
  1889. PROGLOG("WURecreateQuery published: %s as %s/%s", wuid.str(), target, queryId.str());
  1890. resp.setQuerySet(target);
  1891. resp.setQueryName(srcQueryName);
  1892. resp.setQueryId(queryId.str());
  1893. bool reloadFailed = false;
  1894. #ifndef _CONTAINERIZED
  1895. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target);
  1896. #endif
  1897. if (0!=req.getWait() && !req.getNoReload())
  1898. #ifndef _CONTAINERIZED
  1899. reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
  1900. #else
  1901. reloadFailed = !reloadCluster(roxieConnMap, target, (unsigned)req.getWait());
  1902. #endif
  1903. resp.setReloadFailed(reloadFailed);
  1904. StringBuffer errorMessage;
  1905. #ifndef _CONTAINERIZED
  1906. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId.str(), clusterInfo, (unsigned)req.getWait(), errorMessage))
  1907. #else
  1908. if (!reloadFailed && !req.getNoReload() && isQuerySuspended(queryId, target, (unsigned)req.getWait(), errorMessage))
  1909. #endif
  1910. {
  1911. resp.setSuspended(true);
  1912. resp.setErrorMessage(errorMessage);
  1913. }
  1914. }
  1915. }
  1916. catch(IException* e)
  1917. {
  1918. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1919. }
  1920. return true;
  1921. }
  1922. bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRequest & req, IEspWUQueryDetailsResponse & resp)
  1923. {
  1924. try
  1925. {
  1926. CWUQueryDetailsReq request(req);
  1927. getWUQueryDetails(context, request, resp);
  1928. }
  1929. catch(IException* e)
  1930. {
  1931. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1932. }
  1933. return true;
  1934. }
  1935. void CWsWorkunitsEx::getWUQueryDetails(IEspContext &context, CWUQueryDetailsReq &req, IEspWUQueryDetailsResponse &resp)
  1936. {
  1937. const char *querySet = req.getQuerySet();
  1938. if (isEmptyString(querySet))
  1939. throw makeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet not specified");
  1940. const char *queryIdOrAlias = req.getQueryIdOrAlias();
  1941. if (isEmptyString(queryIdOrAlias))
  1942. throw makeStringException(ECLWATCH_QUERYID_NOT_FOUND, "QueryId not specified");
  1943. Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySet, false);
  1944. Owned<IPropertyTree> query = resolveQueryAlias(queryRegistry, queryIdOrAlias);
  1945. if (!query)
  1946. throw makeStringExceptionV(ECLWATCH_QUERYID_NOT_FOUND, "No matching query for given id or alias %s.", queryIdOrAlias);
  1947. const char* queryId = query->queryProp("@id");
  1948. resp.setQueryId(queryId);
  1949. resp.setQuerySet(querySet);
  1950. PROGLOG("WUQueryDetails: QuerySet %s, query %s", querySet, queryId);
  1951. const char* wuid = query->queryProp("@wuid");
  1952. resp.setQueryName(query->queryProp("@name"));
  1953. resp.setWuid(wuid);
  1954. resp.setDll(query->queryProp("@dll"));
  1955. resp.setPublishedBy(query->queryProp("@publishedBy"));
  1956. resp.setSuspended(query->getPropBool("@suspended", false));
  1957. resp.setSuspendedBy(query->queryProp("@suspendedBy"));
  1958. resp.setComment(query->queryProp("@comment"));
  1959. double version = context.getClientVersion();
  1960. if (version >= 1.46)
  1961. {
  1962. checkAndSetQueryPriority(version, query, &resp);
  1963. resp.setIsLibrary(query->getPropBool("@isLibrary"));
  1964. if (version < 1.64)
  1965. {
  1966. StringArray graphIds;
  1967. unsigned numGraphIds = getGraphIdsByQueryId(querySet, queryId, graphIds);
  1968. resp.setCountGraphs(numGraphIds);
  1969. if (numGraphIds > 0)
  1970. resp.setGraphIds(graphIds);
  1971. }
  1972. if (req.getIncludeWUDetails())
  1973. {
  1974. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1975. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  1976. if (!cw)
  1977. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid);
  1978. ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
  1979. SCMStringBuffer s;
  1980. resp.setWUSnapShot(cw->getSnapshot(s).str()); //Label
  1981. stat_type whenCompiled;
  1982. if (cw->getStatistic(whenCompiled, "", StWhenCompiled))
  1983. {
  1984. formatStatistic(s.s.clear(), whenCompiled, StWhenCompiled);
  1985. resp.setCompileTime(s.str());
  1986. }
  1987. StringArray libUsed;
  1988. Owned<IConstWULibraryIterator> libs = &cw->getLibraries();
  1989. ForEach(*libs)
  1990. libUsed.append(libs->query().getName(s).str());
  1991. if (libUsed.length())
  1992. resp.setLibrariesUsed(libUsed);
  1993. if (version >= 1.50)
  1994. {
  1995. WsWuInfo winfo(context, cw);
  1996. resp.setResourceURLCount(winfo.getResourceURLCount());
  1997. if (version >= 1.64)
  1998. {
  1999. IArrayOf<IEspECLTimer> timers;
  2000. winfo.doGetTimers(timers); //Graph Duration
  2001. if (timers.length())
  2002. resp.setWUTimers(timers);
  2003. IArrayOf<IEspECLGraph> graphs;
  2004. winfo.doGetGraphs(graphs); //Graph Name, Label, Started, Finished, Type
  2005. unsigned numGraphIds = graphs.length();
  2006. resp.setCountGraphs(numGraphIds);
  2007. if (numGraphIds > 0)
  2008. resp.setWUGraphs(graphs);
  2009. }
  2010. }
  2011. }
  2012. }
  2013. if (req.getIncludeWUQueryFiles())
  2014. {
  2015. StringArray logicalFiles;
  2016. IArrayOf<IEspQuerySuperFile> superFiles;
  2017. getQueryFiles(context, wuid, queryId, querySet, logicalFiles, req.getIncludeSuperFiles() ? &superFiles : nullptr);
  2018. if (logicalFiles.length())
  2019. resp.setLogicalFiles(logicalFiles);
  2020. if (superFiles.length())
  2021. resp.setSuperFiles(superFiles);
  2022. }
  2023. if (version >= 1.42)
  2024. {
  2025. VStringBuffer xpath("Alias[@id='%s']", queryId);
  2026. IPropertyTree *alias = queryRegistry->queryPropTree(xpath.str());
  2027. if (!alias)
  2028. resp.setActivated(false);
  2029. else
  2030. resp.setActivated(true);
  2031. }
  2032. if (req.getIncludeStateOnClusters() && (version >= 1.43))
  2033. {
  2034. StringArray queryIds;
  2035. queryIds.append(queryId);
  2036. #ifndef _CONTAINERIZED
  2037. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIds, req.getCheckAllNodes());
  2038. #else
  2039. Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIds, req.getCheckAllNodes(), roxieConnMap);
  2040. #endif
  2041. if (queriesOnCluster)
  2042. {
  2043. IArrayOf<IEspClusterQueryState> clusterStates;
  2044. addClusterQueryStates(queriesOnCluster, querySet, queryId, clusterStates, version);
  2045. resp.setClusters(clusterStates);
  2046. }
  2047. }
  2048. if (req.getIncludeWsEclAddresses())
  2049. {
  2050. StringArray wseclAddresses;
  2051. #ifndef _CONTAINERIZED
  2052. Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
  2053. Owned<IConstEnvironment> env = factory->openEnvironment();
  2054. Owned<IPropertyTree> root = &env->getPTree();
  2055. Owned<IPropertyTreeIterator> services = root->getElements("Software/EspService[Properties/@type='ws_ecl']");
  2056. StringArray serviceNames;
  2057. VStringBuffer xpath("Target[@name='%s']", querySet);
  2058. ForEach(*services)
  2059. {
  2060. IPropertyTree &service = services->query();
  2061. if (!service.hasProp("Target") || service.hasProp(xpath))
  2062. serviceNames.append(service.queryProp("@name"));
  2063. }
  2064. Owned<IPropertyTreeIterator> processes = root->getElements("Software/EspProcess");
  2065. ForEach(*processes)
  2066. {
  2067. StringArray netAddrs;
  2068. IPropertyTree &process = processes->query();
  2069. Owned<IPropertyTreeIterator> instances = process.getElements("Instance");
  2070. ForEach(*instances)
  2071. {
  2072. IPropertyTree &instance = instances->query();
  2073. const char *netAddr = instance.queryProp("@netAddress");
  2074. if (!netAddr || !*netAddr)
  2075. continue;
  2076. if (streq(netAddr, "."))
  2077. netAddrs.appendUniq(envLocalAddress); //not necessarily local to this server
  2078. else
  2079. netAddrs.appendUniq(netAddr);
  2080. }
  2081. Owned<IPropertyTreeIterator> bindings = process.getElements("EspBinding");
  2082. ForEach(*bindings)
  2083. {
  2084. IPropertyTree &binding = bindings->query();
  2085. const char *srvName = binding.queryProp("@service");
  2086. if (!serviceNames.contains(srvName))
  2087. continue;
  2088. const char *port = binding.queryProp("@port"); //should always be an integer, but we're just concatenating strings
  2089. if (!port || !*port)
  2090. continue;
  2091. ForEachItemIn(i, netAddrs)
  2092. {
  2093. VStringBuffer wseclAddr("%s:%s", netAddrs.item(i), port);
  2094. wseclAddresses.append(wseclAddr);
  2095. }
  2096. }
  2097. }
  2098. #else
  2099. IArrayOf<IConstHPCCService> eclservices;
  2100. CTpWrapper tpWrapper;
  2101. tpWrapper.getServices(version, "eclqueries", nullptr, eclservices);
  2102. ForEachItemIn(i, eclservices)
  2103. {
  2104. IConstHPCCService& eclservice = eclservices.item(i);
  2105. VStringBuffer wseclAddr("%s:%u", eclservice.getName(), eclservice.getPort());
  2106. wseclAddresses.append(wseclAddr);
  2107. }
  2108. #endif
  2109. resp.setWsEclAddresses(wseclAddresses);
  2110. }
  2111. }
  2112. bool CWsWorkunitsEx::onWUQueryDetailsLightWeight(IEspContext &context, IEspWUQueryDetailsLightWeightRequest & req, IEspWUQueryDetailsResponse & resp)
  2113. {
  2114. try
  2115. {
  2116. CWUQueryDetailsReq request(req);
  2117. getWUQueryDetails(context, request, resp);
  2118. }
  2119. catch(IException* e)
  2120. {
  2121. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  2122. }
  2123. return true;
  2124. }
  2125. CWUQueryDetailsReq::CWUQueryDetailsReq(IEspWUQueryDetailsRequest &req)
  2126. {
  2127. querySet = req.getQuerySet();
  2128. queryIdOrAlias = req.getQueryId();
  2129. includeWUDetails = req.getIncludeWUDetails();
  2130. IncludeWUQueryFiles = req.getIncludeWUQueryFiles();
  2131. includeSuperFiles = req.getIncludeSuperFiles();
  2132. includeWsEclAddresses = req.getIncludeWsEclAddresses();
  2133. includeStateOnClusters = req.getIncludeStateOnClusters();
  2134. checkAllNodes = req.getCheckAllNodes();
  2135. }
  2136. CWUQueryDetailsReq::CWUQueryDetailsReq(IEspWUQueryDetailsLightWeightRequest &req)
  2137. {
  2138. querySet = req.getQuerySet();
  2139. queryIdOrAlias = req.getQueryId();
  2140. includeWUDetails = req.getIncludeWUDetails();
  2141. IncludeWUQueryFiles = req.getIncludeWUQueryFiles();
  2142. includeSuperFiles = req.getIncludeSuperFiles();
  2143. includeWsEclAddresses = req.getIncludeWsEclAddresses();
  2144. includeStateOnClusters = req.getIncludeStateOnClusters();
  2145. checkAllNodes = req.getCheckAllNodes();
  2146. }
  2147. int EspQuerySuperFileCompareFunc(IInterface * const *i1, IInterface * const *i2)
  2148. {
  2149. if (!i1 || !*i1 || !i2 || !*i2)
  2150. return 0;
  2151. IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
  2152. IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
  2153. if (!sf1 || !sf2)
  2154. return 0;
  2155. const char *name1 = sf1->getName();
  2156. const char *name2 = sf2->getName();
  2157. if (!name1 || !name2)
  2158. return 0;
  2159. return strcmp(name1, name2);
  2160. }
  2161. IReferencedFile* CWsWorkunitsEx::getReferencedFileByName(const char* name, IReferencedFileList* wufiles)
  2162. {
  2163. Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
  2164. ForEach(*refFileItr)
  2165. {
  2166. IReferencedFile& rf = refFileItr->query();
  2167. const char* lfn = rf.getLogicalName();
  2168. if (lfn && strieq(lfn, name))
  2169. return &rf;
  2170. }
  2171. return NULL;
  2172. }
  2173. void CWsWorkunitsEx::readSuperFiles(IEspContext &context, IReferencedFile* rf, const char* fileName, IReferencedFileList* wufiles, IArrayOf<IEspQuerySuperFile>* files)
  2174. {
  2175. double version = context.getClientVersion();
  2176. StringArray subFiles;
  2177. IArrayOf<IEspQuerySuperFile> superFiles;
  2178. const StringArray& subFileNames = rf->getSubFileNames();
  2179. ForEachItemIn(i, subFileNames)
  2180. {
  2181. const char* name = subFileNames.item(i);
  2182. if (!name || !*name)
  2183. continue;
  2184. IReferencedFile* pRF = getReferencedFileByName(name, wufiles);
  2185. if (!pRF)
  2186. continue;
  2187. if (!(pRF->getFlags() & RefFileSuper))
  2188. {
  2189. subFiles.append(name);
  2190. }
  2191. else if (version >= 1.57)
  2192. {
  2193. readSuperFiles(context, pRF, name, wufiles, &superFiles);
  2194. }
  2195. }
  2196. Owned<IEspQuerySuperFile> newSuperFile = createQuerySuperFile();
  2197. newSuperFile->setName(fileName);
  2198. if (subFiles.length())
  2199. {
  2200. subFiles.sortAscii();
  2201. newSuperFile->setSubFiles(subFiles);
  2202. }
  2203. if ((version >= 1.57) && superFiles.length())
  2204. {
  2205. superFiles.sort(EspQuerySuperFileCompareFunc);
  2206. newSuperFile->setSuperFiles(superFiles);
  2207. }
  2208. files->append(*newSuperFile.getClear());
  2209. }
  2210. bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
  2211. {
  2212. try
  2213. {
  2214. Owned<IConstWUClusterInfo> info = getWUClusterInfoByName(target);
  2215. if (!info || (info->getPlatform()!=RoxieCluster))
  2216. return false;
  2217. SCMStringBuffer process;
  2218. info->getRoxieProcess(process);
  2219. if (!process.length())
  2220. return false;
  2221. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2222. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  2223. if (!cw)
  2224. return false;
  2225. double version = context.getClientVersion();
  2226. StringArray superFileNames;
  2227. Owned<IHpccPackageSet> ps = createPackageSet(process.str());
  2228. Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(),
  2229. context.queryPassword(), true, true);
  2230. wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, query);
  2231. StringArray locations;
  2232. locations.append(process.str());
  2233. wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, true);
  2234. Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
  2235. ForEach(*refFileItr)
  2236. {
  2237. IReferencedFile &rf = refFileItr->query();
  2238. const char *lfn = rf.getLogicalName();
  2239. if (lfn && *lfn)
  2240. {
  2241. bool isSuper = rf.getFlags() & RefFileSuper;
  2242. if (!isSuper || (version < 1.78))
  2243. logicalFiles.append(lfn);
  2244. if (respSuperFiles && isSuper)
  2245. readSuperFiles(context, &rf, lfn, wufiles, respSuperFiles);
  2246. }
  2247. }
  2248. logicalFiles.sortAscii();
  2249. if (respSuperFiles)
  2250. respSuperFiles->sort(EspQuerySuperFileCompareFunc);
  2251. return true;
  2252. }
  2253. catch(IMultiException *me)
  2254. {
  2255. StringBuffer err;
  2256. IERRLOG("ERROR control:getQueryXrefInfo roxie query info %s", me->errorMessage(err.append(me->errorCode()).append(' ')).str());
  2257. me->Release();
  2258. return false;
  2259. }
  2260. catch(IException *e)
  2261. {
  2262. StringBuffer err;
  2263. IERRLOG("ERROR control:getQueryXrefInfo roxie query info %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
  2264. e->Release();
  2265. return false;
  2266. }
  2267. }
  2268. inline void verifyQueryActionAllowsWild(bool &allowWildChecked, CQuerySetQueryActionTypes action)
  2269. {
  2270. if (allowWildChecked)
  2271. return;
  2272. switch (action)
  2273. {
  2274. case CQuerySetQueryActionTypes_ToggleSuspend:
  2275. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for toggling suspended state");
  2276. case CQuerySetQueryActionTypes_Activate:
  2277. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Wildcards not supported for Activating queries");
  2278. }
  2279. allowWildChecked=true;
  2280. }
  2281. inline bool verifyQueryActionAllowsAlias(CQuerySetQueryActionTypes action)
  2282. {
  2283. return (action!=CQuerySetQueryActionTypes_Activate && action!=CQuerySetQueryActionTypes_RemoveAllAliases);
  2284. }
  2285. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, IArrayOf<IConstQuerySetQueryActionItem> &items, CQuerySetQueryActionTypes action)
  2286. {
  2287. bool allowWildChecked=false;
  2288. ForEachItemIn(i, items)
  2289. {
  2290. const char *itemId = items.item(i).getQueryId();
  2291. if (!isWildString(itemId))
  2292. {
  2293. bool suspendedByUser = false;
  2294. const char *itemSuspendState = items.item(i).getClientState().getSuspended();
  2295. if (itemSuspendState && (strieq(itemSuspendState, "By User") || strieq(itemSuspendState, "1")))
  2296. suspendedByUser = true;
  2297. if (!verifyQueryActionAllowsAlias(action))
  2298. queryIds->setProp(itemId, suspendedByUser);
  2299. else
  2300. {
  2301. Owned<IPropertyTree> query = resolveQueryAlias(queryset, itemId);
  2302. if (query)
  2303. {
  2304. const char *id = query->queryProp("@id");
  2305. if (id && *id)
  2306. queryIds->setProp(id, suspendedByUser);
  2307. }
  2308. }
  2309. }
  2310. else
  2311. {
  2312. verifyQueryActionAllowsWild(allowWildChecked, action);
  2313. if (verifyQueryActionAllowsAlias(action))
  2314. {
  2315. Owned<IPropertyTreeIterator> active = queryset->getElements("Alias");
  2316. ForEach(*active)
  2317. {
  2318. const char *name = active->query().queryProp("@name");
  2319. const char *id = active->query().queryProp("@id");
  2320. if (name && id && WildMatch(name, itemId))
  2321. queryIds->setProp(id, 0);
  2322. }
  2323. }
  2324. Owned<IPropertyTreeIterator> queries = queryset->getElements("Query");
  2325. ForEach(*queries)
  2326. {
  2327. const char *id = queries->query().queryProp("@id");
  2328. if (id && WildMatch(id, itemId))
  2329. queryIds->setProp(id, 0);
  2330. }
  2331. }
  2332. }
  2333. }
  2334. void expandQueryActionTargetList(IProperties *queryIds, IPropertyTree *queryset, const char *id, CQuerySetQueryActionTypes action)
  2335. {
  2336. IArrayOf<IConstQuerySetQueryActionItem> items;
  2337. Owned<IEspQuerySetQueryActionItem> item = createQuerySetQueryActionItem();
  2338. item->setQueryId(id);
  2339. items.append(*(IConstQuerySetQueryActionItem*)item.getClear());
  2340. expandQueryActionTargetList(queryIds, queryset, items, action);
  2341. }
  2342. bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequest & req, IEspWUQueryConfigResponse & resp)
  2343. {
  2344. StringAttr target(req.getTarget());
  2345. validateTargetName(target);
  2346. Owned<IPropertyTree> queryset = getQueryRegistry(target.get(), false);
  2347. if (!queryset)
  2348. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target Queryset %s not found", req.getTarget());
  2349. PROGLOG("WUQueryConfig: target %s", target.get());
  2350. Owned<IProperties> queryIds = createProperties();
  2351. expandQueryActionTargetList(queryIds, queryset, req.getQueryId(), QuerySetQueryActionTypes_Undefined);
  2352. IArrayOf<IEspWUQueryConfigResult> results;
  2353. Owned<IPropertyIterator> it = queryIds->getIterator();
  2354. ForEach(*it)
  2355. {
  2356. Owned<IEspWUQueryConfigResult> result = createWUQueryConfigResult();
  2357. result->setQueryId(it->getPropKey());
  2358. VStringBuffer xpath("Query[@id='%s']", it->getPropKey());
  2359. IPropertyTree *queryTree = queryset->queryPropTree(xpath);
  2360. if (queryTree)
  2361. {
  2362. updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
  2363. updateQueryPriority(queryTree, req.getPriority());
  2364. updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
  2365. updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
  2366. if (req.getComment())
  2367. queryTree->setProp("@comment", req.getComment());
  2368. }
  2369. results.append(*result.getClear());
  2370. }
  2371. resp.setResults(results);
  2372. bool reloadFailed = false;
  2373. if (0!=req.getWait() && !req.getNoReload())
  2374. #ifndef _CONTAINERIZED
  2375. reloadFailed = !reloadCluster(target.get(), (unsigned)req.getWait());
  2376. #else
  2377. reloadFailed = !reloadCluster(roxieConnMap, target.get(), (unsigned)req.getWait());
  2378. #endif
  2379. resp.setReloadFailed(reloadFailed);
  2380. return true;
  2381. }
  2382. bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySetQueryActionRequest & req, IEspWUQuerySetQueryActionResponse & resp)
  2383. {
  2384. resp.setQuerySetName(req.getQuerySetName());
  2385. resp.setAction(req.getAction());
  2386. if (isEmpty(req.getQuerySetName()))
  2387. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  2388. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  2389. if (!queryset)
  2390. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  2391. Owned<IProperties> queryIds = createProperties();
  2392. expandQueryActionTargetList(queryIds, queryset, req.getQueries(), req.getAction());
  2393. if (req.getAction() == CQuerySetQueryActionTypes_ResetQueryStats)
  2394. return resetQueryStats(context, req.getQuerySetName(), queryIds, resp);
  2395. IArrayOf<IEspQuerySetQueryActionResult> results;
  2396. Owned<IPropertyIterator> it = queryIds->getIterator();
  2397. ForEach(*it)
  2398. {
  2399. const char *id = it->getPropKey();
  2400. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  2401. result->setQueryId(id);
  2402. try
  2403. {
  2404. Owned<IPropertyTree> query = getQueryById(queryset, id);
  2405. if (!query)
  2406. throw MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query %s/%s not found.", req.getQuerySetName(), id);
  2407. CQuerySetQueryActionTypes action = req.getAction();
  2408. const char* strAction = (action > -1) && (action < NumOfQuerySetQueryActionTypes) ? QuerySetQueryActionTypes[action] : "Undefined";
  2409. PROGLOG("%s: queryset %s, query %s", strAction, req.getQuerySetName(), id);
  2410. switch (action)
  2411. {
  2412. case CQuerySetQueryActionTypes_ToggleSuspend:
  2413. setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
  2414. break;
  2415. case CQuerySetQueryActionTypes_Suspend:
  2416. setQuerySuspendedState(queryset, id, true, context.queryUserId());
  2417. break;
  2418. case CQuerySetQueryActionTypes_Unsuspend:
  2419. setQuerySuspendedState(queryset, id, false, NULL);
  2420. break;
  2421. case CQuerySetQueryActionTypes_Activate:
  2422. setQueryAlias(queryset, query->queryProp("@name"), id);
  2423. break;
  2424. case CQuerySetQueryActionTypes_Delete:
  2425. removeNamedQuery(queryset, id);
  2426. query.clear();
  2427. break;
  2428. case CQuerySetQueryActionTypes_RemoveAllAliases:
  2429. removeAliasesFromNamedQuery(queryset, id);
  2430. break;
  2431. }
  2432. result->setSuccess(true);
  2433. if (query)
  2434. result->setSuspended(query->getPropBool("@suspended"));
  2435. }
  2436. catch(IException *e)
  2437. {
  2438. StringBuffer msg;
  2439. result->setMessage(e->errorMessage(msg).str());
  2440. result->setCode(e->errorCode());
  2441. result->setSuccess(false);
  2442. e->Release();
  2443. }
  2444. results.append(*result.getClear());
  2445. }
  2446. resp.setResults(results);
  2447. return true;
  2448. }
  2449. bool CWsWorkunitsEx::onWUQuerysetAliasAction(IEspContext &context, IEspWUQuerySetAliasActionRequest &req, IEspWUQuerySetAliasActionResponse &resp)
  2450. {
  2451. resp.setQuerySetName(req.getQuerySetName());
  2452. resp.setAction(req.getAction());
  2453. if (isEmpty(req.getQuerySetName()))
  2454. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Queryset name required");
  2455. Owned<IPropertyTree> queryset = getQueryRegistry(req.getQuerySetName(), true);
  2456. if (!queryset)
  2457. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", req.getQuerySetName());
  2458. IArrayOf<IEspQuerySetAliasActionResult> results;
  2459. ForEachItemIn(i, req.getAliases())
  2460. {
  2461. IConstQuerySetAliasActionItem& item=req.getAliases().item(i);
  2462. Owned<IEspQuerySetAliasActionResult> result = createQuerySetAliasActionResult();
  2463. try
  2464. {
  2465. VStringBuffer xpath("Alias[@name='%s']", item.getName());
  2466. IPropertyTree *alias = queryset->queryPropTree(xpath.str());
  2467. if (!alias)
  2468. throw MakeStringException(ECLWATCH_ALIAS_NOT_FOUND, "Alias %s/%s not found.", req.getQuerySetName(), item.getName());
  2469. CQuerySetAliasActionTypes action = req.getAction();
  2470. const char* strAction = (action > -1) && (action < NumOfQuerySetAliasActionTypes) ? QuerySetAliasActionTypes[action] : "Undefined";
  2471. PROGLOG("%s: queryset %s, alias %s", strAction, req.getQuerySetName(), item.getName());
  2472. switch (action)
  2473. {
  2474. case CQuerySetAliasActionTypes_Deactivate:
  2475. removeQuerySetAlias(req.getQuerySetName(), item.getName());
  2476. break;
  2477. }
  2478. result->setSuccess(true);
  2479. }
  2480. catch(IException *e)
  2481. {
  2482. StringBuffer msg;
  2483. result->setMessage(e->errorMessage(msg).str());
  2484. result->setCode(e->errorCode());
  2485. result->setSuccess(false);
  2486. e->Release();
  2487. }
  2488. results.append(*result.getClear());
  2489. }
  2490. resp.setResults(results);
  2491. return true;
  2492. }
  2493. #define QUERYPATH_SEP_CHAR '/'
  2494. bool nextQueryPathNode(const char *&path, StringBuffer &node)
  2495. {
  2496. if (*path==QUERYPATH_SEP_CHAR)
  2497. path++;
  2498. while (*path && *path!=QUERYPATH_SEP_CHAR)
  2499. node.append(*path++);
  2500. return (*path && *++path);
  2501. }
  2502. bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &queryset, StringBuffer *query)
  2503. {
  2504. if (!path || !*path)
  2505. return false;
  2506. if (*path==QUERYPATH_SEP_CHAR && path[1]==QUERYPATH_SEP_CHAR)
  2507. {
  2508. path+=2;
  2509. if (!nextQueryPathNode(path, netAddress))
  2510. return false;
  2511. }
  2512. if (!nextQueryPathNode(path, queryset))
  2513. return (query==NULL);
  2514. if (!query)
  2515. return false;
  2516. if (nextQueryPathNode(path, *query))
  2517. return false; //query path too deep
  2518. return true;
  2519. }
  2520. IPropertyTree *fetchRemoteQuerySetInfo(IEspContext *context, const char *srcAddress, const char *srcTarget, bool useSSL)
  2521. {
  2522. if (!srcAddress || !*srcAddress || !srcTarget || !*srcTarget)
  2523. return NULL;
  2524. VStringBuffer url("%s://%s%s/WsWorkunits/WUQuerysetDetails.xml?ver_=1.51&QuerySetName=%s&FilterType=All", useSSL ? "https" : "http", srcAddress, (!strchr(srcAddress, ':')) ? ":8010" : "", srcTarget);
  2525. Owned<IHttpClientContext> httpCtx = getHttpClientContext();
  2526. Owned<IHttpClient> httpclient = httpCtx->createHttpClient(NULL, url);
  2527. const char *user = context->queryUserId();
  2528. if (user && *user)
  2529. httpclient->setUserID(user);
  2530. const char *pw = context->queryPassword();
  2531. if (pw && *pw)
  2532. httpclient->setPassword(pw);
  2533. StringBuffer request; //empty
  2534. StringBuffer response;
  2535. StringBuffer status;
  2536. if (0 > httpclient->sendRequest("GET", NULL, request, response, status) || !response.length() || strncmp("200", status, 3))
  2537. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Error fetching remote queryset information: %s %s %s", srcAddress, srcTarget, status.str());
  2538. return createPTreeFromXMLString(response);
  2539. }
  2540. class QueryCloner
  2541. {
  2542. public:
  2543. QueryCloner(IEspContext *_context, const char *address, const char *source, const char *_target, bool _useSSL) :
  2544. context(_context), target(_target), srcAddress(address), useSSL(_useSSL)
  2545. {
  2546. if (srcAddress.length())
  2547. srcQuerySet.setown(fetchRemoteQuerySetInfo(context, srcAddress, source, useSSL));
  2548. else
  2549. srcQuerySet.setown(getQueryRegistry(source, true));
  2550. if (!srcQuerySet)
  2551. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s %s not found", srcAddress.str(), source);
  2552. destQuerySet.setown(getQueryRegistry(target, false));
  2553. if (!destQuerySet) // getQueryRegistry should have created if not found
  2554. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
  2555. factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
  2556. }
  2557. QueryCloner(IEspContext *_context, IPropertyTree *srcTree, const char *_target) :
  2558. context(_context), target(_target)
  2559. {
  2560. srcQuerySet.set(srcTree);
  2561. destQuerySet.setown(getQueryRegistry(target, false));
  2562. if (!destQuerySet) // getQueryRegistry should have created if not found
  2563. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Destination Queryset %s could not be created, or found", target.str());
  2564. factory.setown(getWorkUnitFactory(context->querySecManager(), context->queryUser()));
  2565. }
  2566. void setQueryDirectory(const char *dir)
  2567. {
  2568. queryDirectory.set(dir);
  2569. }
  2570. void cloneQueryRemote(IPropertyTree *query, bool makeActive)
  2571. {
  2572. StringBuffer wuid(query->queryProp("Wuid"));
  2573. if (!wuid.length())
  2574. return;
  2575. const char *queryName = query->queryProp("Name");
  2576. if (!queryName || !*queryName)
  2577. return;
  2578. StringBuffer xml;
  2579. MemoryBuffer dll;
  2580. StringBuffer dllname;
  2581. StringBuffer fetchedName;
  2582. StringBuffer remoteDfs;
  2583. fetchRemoteWorkunit(NULL, context, srcAddress.str(), NULL, NULL, wuid, fetchedName, xml, dllname, dll, remoteDfs, useSSL);
  2584. deploySharedObject(*context, wuid, dllname, target, queryName, dll, queryDirectory, xml.str(), false);
  2585. SCMStringBuffer existingQueryId;
  2586. queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
  2587. if (existingQueryId.length())
  2588. {
  2589. existingQueryIds.append(existingQueryId.str());
  2590. if (makeActive)
  2591. activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
  2592. return;
  2593. }
  2594. StringBuffer newQueryId;
  2595. Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
  2596. addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
  2597. copiedQueryIds.append(newQueryId);
  2598. Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
  2599. if (destQuery)
  2600. {
  2601. Owned<IAttributeIterator> aiter = query->getAttributes();
  2602. ForEach(*aiter)
  2603. {
  2604. const char *atname = aiter->queryName();
  2605. if (!destQuery->hasProp(atname))
  2606. destQuery->setProp(atname, aiter->queryValue());
  2607. }
  2608. if (cloneFilesEnabled && wufiles)
  2609. wufiles->addFilesFromQuery(workunit, pm, newQueryId);
  2610. }
  2611. }
  2612. void cloneQueryLocal(IPropertyTree *query, bool makeActive)
  2613. {
  2614. const char *wuid = query->queryProp("@wuid");
  2615. if (!wuid || !*wuid)
  2616. return;
  2617. const char *queryName = query->queryProp("@name");
  2618. if (!queryName || !*queryName)
  2619. return;
  2620. SCMStringBuffer existingQueryId;
  2621. queryIdFromQuerySetWuid(destQuerySet, wuid, queryName, existingQueryId);
  2622. if (existingQueryId.length())
  2623. {
  2624. existingQueryIds.append(existingQueryId.str());
  2625. if (makeActive)
  2626. activateQuery(destQuerySet, ACTIVATE_SUSPEND_PREVIOUS, queryName, existingQueryId.str(), context->queryUserId());
  2627. return;
  2628. }
  2629. StringBuffer newQueryId;
  2630. Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
  2631. if (!workunit)
  2632. {
  2633. StringBuffer msg(wuid);
  2634. msg.append(": ").append(query->queryProp("@id"));
  2635. missingWuids.append(msg);
  2636. return;
  2637. }
  2638. if (!newQueryId.length())
  2639. addQueryToQuerySet(workunit, destQuerySet, queryName, makeActive ? ACTIVATE_SUSPEND_PREVIOUS : DO_NOT_ACTIVATE, newQueryId, context->queryUserId());
  2640. copiedQueryIds.append(newQueryId);
  2641. Owned<IPropertyTree> destQuery = getQueryById(destQuerySet, newQueryId);
  2642. if (destQuery)
  2643. {
  2644. Owned<IAttributeIterator> aiter = query->getAttributes();
  2645. ForEach(*aiter)
  2646. {
  2647. const char *atname = aiter->queryName();
  2648. if (!destQuery->hasProp(atname))
  2649. destQuery->setProp(atname, aiter->queryValue());
  2650. }
  2651. Owned<IPropertyTreeIterator> children = query->getElements("*");
  2652. ForEach(*children)
  2653. {
  2654. IPropertyTree &child = children->query();
  2655. destQuery->addPropTree(child.queryName(), createPTreeFromIPT(&child));
  2656. }
  2657. if (cloneFilesEnabled && wufiles)
  2658. wufiles->addFilesFromQuery(workunit, pm, newQueryId);
  2659. }
  2660. }
  2661. void cloneActiveRemote(bool makeActive)
  2662. {
  2663. Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements("QuerysetAliases/QuerySetAlias");
  2664. ForEach(*activeQueries)
  2665. {
  2666. IPropertyTree &alias = activeQueries->query();
  2667. VStringBuffer xpath("QuerysetQueries/QuerySetQuery[Id='%s'][1]", alias.queryProp("Id"));
  2668. IPropertyTree *query = srcQuerySet->queryPropTree(xpath);
  2669. if (!query)
  2670. continue;
  2671. cloneQueryRemote(query, makeActive);
  2672. }
  2673. }
  2674. void cloneActiveLocal(bool makeActive, const char *mask)
  2675. {
  2676. StringBuffer xpath("Alias");
  2677. if (mask && *mask)
  2678. xpath.appendf("[@id='%s']", mask);
  2679. Owned<IPropertyTreeIterator> activeQueries = srcQuerySet->getElements(xpath);
  2680. ForEach(*activeQueries)
  2681. {
  2682. IPropertyTree &alias = activeQueries->query();
  2683. Owned<IPropertyTree> query = getQueryById(srcQuerySet, alias.queryProp("@id"));
  2684. if (!query)
  2685. return;
  2686. cloneQueryLocal(query, makeActive);
  2687. }
  2688. }
  2689. void cloneActive(bool makeActive)
  2690. {
  2691. if (srcAddress.length())
  2692. cloneActiveRemote(makeActive);
  2693. else
  2694. cloneActiveLocal(makeActive, nullptr);
  2695. }
  2696. void cloneAllRemote(bool cloneActiveState)
  2697. {
  2698. Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements("QuerysetQueries/QuerySetQuery");
  2699. ForEach(*allQueries)
  2700. {
  2701. IPropertyTree &query = allQueries->query();
  2702. bool makeActive = false;
  2703. if (cloneActiveState)
  2704. {
  2705. VStringBuffer xpath("QuerysetAliases/QuerySetAlias[Id='%s']", query.queryProp("Id"));
  2706. makeActive = srcQuerySet->hasProp(xpath);
  2707. }
  2708. cloneQueryRemote(&query, makeActive);
  2709. }
  2710. }
  2711. void cloneAllLocal(bool cloneActiveState, const char *mask)
  2712. {
  2713. StringBuffer xpath("Query");
  2714. if (mask && *mask)
  2715. xpath.appendf("[@id='%s']", mask);
  2716. Owned<IPropertyTreeIterator> allQueries = srcQuerySet->getElements(xpath);
  2717. ForEach(*allQueries)
  2718. {
  2719. IPropertyTree &query = allQueries->query();
  2720. bool makeActive = false;
  2721. if (cloneActiveState)
  2722. {
  2723. VStringBuffer xpath("Alias[@id='%s']", query.queryProp("@id"));
  2724. makeActive = srcQuerySet->hasProp(xpath);
  2725. }
  2726. cloneQueryLocal(&query, makeActive);
  2727. }
  2728. }
  2729. void cloneAll(bool cloneActiveState)
  2730. {
  2731. if (srcAddress.length())
  2732. cloneAllRemote(cloneActiveState);
  2733. else
  2734. cloneAllLocal(cloneActiveState, nullptr);
  2735. }
  2736. void enableFileCloning(unsigned _updateFlags, const char *dfsServer, const char *destProcess, const char *sourceProcess, bool allowForeign)
  2737. {
  2738. cloneFilesEnabled = true;
  2739. updateFlags = _updateFlags;
  2740. splitDerivedDfsLocation(dfsServer, srcCluster, dfsIP, srcPrefix, sourceProcess, sourceProcess, NULL, NULL);
  2741. wufiles.setown(createReferencedFileList(context->queryUserId(), context->queryPassword(), allowForeign, false));
  2742. Owned<IHpccPackageSet> ps = createPackageSet(destProcess);
  2743. pm.set(ps->queryActiveMap(target));
  2744. if (isContainerized())
  2745. {
  2746. StringAttrBuilder builder(process);
  2747. getRoxieDirectAccessPlanes(locations, builder, target, true);
  2748. }
  2749. else
  2750. process.set(destProcess);
  2751. }
  2752. void cloneFiles()
  2753. {
  2754. if (cloneFilesEnabled)
  2755. {
  2756. wufiles->resolveFiles(locations, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
  2757. Owned<IDFUhelper> helper = createIDFUhelper();
  2758. Owned <IConstWUClusterInfo> cl = getWUClusterInfoByName(target);
  2759. if (cl)
  2760. {
  2761. #ifdef _CONTAINERIZED
  2762. wufiles->cloneAllInfo(process.str(), updateFlags, helper, true, true, 0, 1, 0, nullptr);
  2763. #else
  2764. SCMStringBuffer process;
  2765. StringBuffer defReplicateFolder;
  2766. getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
  2767. wufiles->cloneAllInfo(process.str(), updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
  2768. #endif
  2769. }
  2770. }
  2771. }
  2772. void gatherFileErrors(IArrayOf<IConstLogicalFileError> &errors)
  2773. {
  2774. if (wufiles)
  2775. ::gatherFileErrors(wufiles, errors);
  2776. }
  2777. private:
  2778. Linked<IEspContext> context;
  2779. Linked<IWorkUnitFactory> factory;
  2780. Owned<IPropertyTree> destQuerySet;
  2781. Owned<IPropertyTree> srcQuerySet;
  2782. Owned<IReferencedFileList> wufiles;
  2783. Owned<const IHpccPackageMap> pm;
  2784. StringBuffer dfsIP;
  2785. StringBuffer srcAddress;
  2786. StringBuffer srcCluster;
  2787. StringBuffer srcPrefix;
  2788. StringAttr target;
  2789. StringAttr process;
  2790. StringAttr queryDirectory;
  2791. bool cloneFilesEnabled = false;
  2792. bool useSSL = false;
  2793. unsigned updateFlags = 0;
  2794. StringArray locations;
  2795. public:
  2796. StringArray existingQueryIds;
  2797. StringArray copiedQueryIds;
  2798. StringArray missingWuids;
  2799. };
  2800. bool CWsWorkunitsEx::onWUCopyQuerySet(IEspContext &context, IEspWUCopyQuerySetRequest &req, IEspWUCopyQuerySetResponse &resp)
  2801. {
  2802. const char *source = req.getSource();
  2803. if (!source || !*source)
  2804. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source target specified");
  2805. StringBuffer srcAddress;
  2806. StringBuffer srcTarget;
  2807. if (!splitQueryPath(source, srcAddress, srcTarget, NULL))
  2808. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source target");
  2809. //don't validate source target if it's remote (has srcAddress)
  2810. if (srcAddress.isEmpty() && !srcTarget.isEmpty())
  2811. validateTargetName(srcTarget);
  2812. const char *target = req.getTarget();
  2813. validateTargetName(target);
  2814. DBGLOG("%s copying queryset %s from %s target %s", context.queryUserId(), target, srcAddress.str(), srcTarget.str());
  2815. QueryCloner cloner(&context, srcAddress, srcTarget, target, req.getSourceSSL());
  2816. cloner.setQueryDirectory(queryDirectory);
  2817. SCMStringBuffer process;
  2818. if (req.getCopyFiles())
  2819. {
  2820. Owned <IConstWUClusterInfo> clusterInfo = getWUClusterInfoByName(target);
  2821. if (clusterInfo && clusterInfo->getPlatform()==RoxieCluster)
  2822. {
  2823. clusterInfo->getRoxieProcess(process);
  2824. if (!process.length())
  2825. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
  2826. unsigned updateFlags = 0;
  2827. if (req.getOverwriteDfs())
  2828. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  2829. if (req.getUpdateCloneFrom())
  2830. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  2831. if (req.getUpdateSuperFiles())
  2832. updateFlags |= DALI_UPDATEF_SUPERFILES;
  2833. if (req.getAppendCluster())
  2834. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  2835. cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
  2836. }
  2837. }
  2838. if (req.getActiveOnly())
  2839. cloner.cloneActive(req.getCloneActiveState());
  2840. else
  2841. cloner.cloneAll(req.getCloneActiveState());
  2842. cloner.cloneFiles();
  2843. if (req.getIncludeFileErrors())
  2844. cloner.gatherFileErrors(resp.getFileErrors());
  2845. resp.setCopiedQueries(cloner.copiedQueryIds);
  2846. resp.setExistingQueries(cloner.existingQueryIds);
  2847. return true;
  2848. }
  2849. bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
  2850. {
  2851. unsigned start = msTick();
  2852. const char *source = req.getSource();
  2853. if (!source || !*source)
  2854. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No source query specified");
  2855. const char *target = req.getTarget();
  2856. if (!target || !*target)
  2857. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "No destination specified");
  2858. if (strchr(target, '/')) //for future use
  2859. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target queryset name");
  2860. if (req.getCluster() && *req.getCluster() && !strieq(req.getCluster(), target)) //backward compatability check
  2861. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid target cluster and queryset must match");
  2862. validateTargetName(target);
  2863. StringBuffer srcAddress, srcQuerySet, srcQuery;
  2864. if (!splitQueryPath(source, srcAddress, srcQuerySet, &srcQuery))
  2865. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
  2866. StringAttr targetQueryName(req.getDestName());
  2867. Owned<IClientWUQuerySetDetailsResponse> sourceQueryInfoResp;
  2868. IConstQuerySetQuery *srcInfo=NULL;
  2869. DBGLOG("%s copying query %s to target %s from %s target %s", context.queryUserId(), srcQuery.str(), target, srcAddress.str(), srcQuerySet.str());
  2870. StringBuffer remoteIP;
  2871. StringBuffer wuid;
  2872. if (srcAddress.length())
  2873. {
  2874. StringBuffer xml;
  2875. MemoryBuffer dll;
  2876. StringBuffer dllname;
  2877. StringBuffer queryName;
  2878. fetchRemoteWorkunitAndQueryDetails(NULL, &context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP, sourceQueryInfoResp, req.getSourceSSL());
  2879. if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
  2880. srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
  2881. if (srcInfo)
  2882. wuid.set(srcInfo->getWuid());
  2883. if (targetQueryName.isEmpty())
  2884. targetQueryName.set(queryName);
  2885. deploySharedObject(context, wuid, dllname.str(), target, targetQueryName.get(), dll, queryDirectory.str(), xml.str(), false);
  2886. }
  2887. else
  2888. {
  2889. //Could get the atributes without soap call, but this creates a common data structure shared with fetching remote query info
  2890. //Get query attributes before resolveQueryAlias, to avoid deadlock
  2891. sourceQueryInfoResp.setown(fetchQueryDetails(NULL, &context, NULL, srcQuerySet, srcQuery, req.getSourceSSL()));
  2892. if (sourceQueryInfoResp && sourceQueryInfoResp->getQuerysetQueries().ordinality())
  2893. srcInfo = &sourceQueryInfoResp->getQuerysetQueries().item(0);
  2894. Owned<IPropertyTree> queryset = getQueryRegistry(srcQuerySet.str(), true);
  2895. if (!queryset)
  2896. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source Queryset %s not found", srcQuery.str());
  2897. Owned<IPropertyTree> query = resolveQueryAlias(queryset, srcQuery.str());
  2898. if (!query)
  2899. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Source query %s not found", source);
  2900. wuid.set(query->queryProp("@wuid"));
  2901. if (targetQueryName.isEmpty())
  2902. targetQueryName.set(query->queryProp("@name"));
  2903. }
  2904. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2905. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
  2906. if (!req.getDontCopyFiles())
  2907. {
  2908. StringBuffer daliIP;
  2909. StringBuffer srcCluster;
  2910. StringBuffer srcPrefix;
  2911. splitDerivedDfsLocation(req.getDaliServer(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(), req.getSourceProcess(), remoteIP.str(), NULL);
  2912. unsigned updateFlags = 0;
  2913. if (req.getOverwrite())
  2914. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  2915. if (req.getUpdateCloneFrom())
  2916. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  2917. if (req.getUpdateSuperFiles())
  2918. updateFlags |= DALI_UPDATEF_SUPERFILES;
  2919. if (req.getAppendCluster())
  2920. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  2921. QueryFileCopier cpr(target);
  2922. cpr.init(context, req.getAllowForeignFiles());
  2923. cpr.remoteIP.set(daliIP);
  2924. cpr.remotePrefix.set(srcPrefix);
  2925. cpr.srcCluster.set(srcCluster);
  2926. cpr.queryname.set(targetQueryName);
  2927. cpr.copy(cw, updateFlags);
  2928. if (req.getIncludeFileErrors())
  2929. cpr.gatherFileErrors(resp.getFileErrors());
  2930. }
  2931. WorkunitUpdate wu(&cw->lock());
  2932. if (!wu)
  2933. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
  2934. StringBuffer targetQueryId;
  2935. WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
  2936. addQueryToQuerySet(wu, target, targetQueryName.get(), activate, targetQueryId, context.queryUserId());
  2937. Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
  2938. if (queryTree)
  2939. {
  2940. updateMemoryLimitSetting(queryTree, req.getMemoryLimit(), srcInfo);
  2941. updateQueryPriority(queryTree, req.getPriority(), srcInfo);
  2942. updateTimeLimitSetting(queryTree, req.getTimeLimit_isNull(), req.getTimeLimit(), srcInfo);
  2943. updateWarnTimeLimitSetting(queryTree, req.getWarnTimeLimit_isNull(), req.getWarnTimeLimit(), srcInfo);
  2944. if (req.getComment())
  2945. queryTree->setProp("@comment", req.getComment());
  2946. else if (srcInfo && srcInfo->getComment())
  2947. queryTree->setProp("@comment", srcInfo->getComment());
  2948. if (srcInfo && srcInfo->getSnapshot())
  2949. queryTree->setProp("@snapshot", srcInfo->getSnapshot());
  2950. }
  2951. wu.clear();
  2952. resp.setQueryId(targetQueryId.str());
  2953. if (0!=req.getWait() && !req.getNoReload())
  2954. #ifndef _CONTAINERIZED
  2955. reloadCluster(target, remainingMsWait(req.getWait(), start));
  2956. #else
  2957. reloadCluster(roxieConnMap, target, (unsigned)req.getWait());
  2958. #endif
  2959. return true;
  2960. }
  2961. bool CWsWorkunitsEx::onWUQuerysetImport(IEspContext &context, IEspWUQuerysetImportRequest &req, IEspWUQuerysetImportResponse &resp)
  2962. {
  2963. try
  2964. {
  2965. const char* target = req.getTarget();
  2966. if (!target || !*target)
  2967. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  2968. Owned <IConstWUClusterInfo> clusterInfo = getWUClusterInfoByName(target);
  2969. if (!clusterInfo)
  2970. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Target not found");
  2971. if (req.getCopyFiles() && clusterInfo->getPlatform()!=RoxieCluster)
  2972. throw MakeStringException(ECLWATCH_INVALID_ACTION, "Copy files option only supported for Roxie");
  2973. MemoryBuffer &mb = const_cast<MemoryBuffer &>(req.getData()); //for efficiency, content of request shouldn't matter after
  2974. if (req.getCompressed())
  2975. {
  2976. MemoryBuffer decompressed;
  2977. fastLZDecompressToBuffer(decompressed, mb);
  2978. mb.swapWith(decompressed);
  2979. }
  2980. mb.append('\0');
  2981. Owned<IPropertyTree> srcTree = createPTreeFromXMLString(mb.toByteArray());
  2982. const char *archivedTarget = srcTree->queryProp("@target");
  2983. if (archivedTarget && *archivedTarget) //support simple queryset or with archived (exported) root format
  2984. {
  2985. VStringBuffer xpath("QuerySet[@id='%s']", archivedTarget);
  2986. IPropertyTree *qsTree = srcTree->queryPropTree(xpath);
  2987. if (qsTree)
  2988. srcTree.setown(LINK(qsTree));
  2989. }
  2990. if (req.getReplace())
  2991. {
  2992. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, false);
  2993. queryRegistry->removeProp("*");
  2994. resp.setClearedExisting(true);
  2995. }
  2996. const bool activate = CQuerysetImportActivation_ImportedActive == req.getActivation(); //only two options now but may evolve
  2997. QueryCloner cloner(&context, srcTree, target);
  2998. SCMStringBuffer process;
  2999. if (req.getCopyFiles())
  3000. {
  3001. clusterInfo->getRoxieProcess(process); //checked if roxie when copying files above
  3002. if (!process.length())
  3003. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_INFO, "DFS process cluster not found for destination target %s", target);
  3004. unsigned updateFlags = 0;
  3005. if (req.getOverwriteDfs())
  3006. updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES);
  3007. if (req.getUpdateCloneFrom())
  3008. updateFlags |= DALI_UPDATEF_CLONE_FROM;
  3009. if (req.getUpdateSuperFiles())
  3010. updateFlags |= DALI_UPDATEF_SUPERFILES;
  3011. if (req.getAppendCluster())
  3012. updateFlags |= DALI_UPDATEF_APPEND_CLUSTER;
  3013. cloner.enableFileCloning(updateFlags, req.getDfsServer(), process.str(), req.getSourceProcess(), req.getAllowForeignFiles());
  3014. }
  3015. if (req.getActiveOnly())
  3016. cloner.cloneActiveLocal(activate, req.getQueryMask());
  3017. else
  3018. cloner.cloneAllLocal(activate, req.getQueryMask());
  3019. cloner.cloneFiles();
  3020. if (req.getIncludeFileErrors())
  3021. cloner.gatherFileErrors(resp.getFileErrors());
  3022. resp.setImportedQueries(cloner.copiedQueryIds);
  3023. resp.setExistingQueries(cloner.existingQueryIds);
  3024. resp.setMissingWuids(cloner.missingWuids);
  3025. }
  3026. catch(IException* e)
  3027. {
  3028. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3029. }
  3030. return true;
  3031. }
  3032. bool CWsWorkunitsEx::onWUQuerysetExport(IEspContext &context, IEspWUQuerysetExportRequest &req, IEspWUQuerysetExportResponse &resp)
  3033. {
  3034. try
  3035. {
  3036. const char* target = req.getTarget();
  3037. if (!target || !*target)
  3038. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Target not specified");
  3039. Owned<IPropertyTree> queryRegistry = getQueryRegistry(target, true);
  3040. if (req.getActiveOnly())
  3041. {
  3042. Owned<IPropertyTree> activeOnly = createPTree("QuerySet");
  3043. Owned<IAttributeIterator> attrs = queryRegistry->getAttributes();
  3044. ForEach(*attrs)
  3045. activeOnly->setProp(attrs->queryName(), attrs->queryValue());
  3046. Owned<IPropertyTreeIterator> aliases = queryRegistry->getElements("Alias");
  3047. ForEach(*aliases)
  3048. {
  3049. IPropertyTree &alias = aliases->query();
  3050. const char *id = alias.queryProp("@id");
  3051. if (id && *id)
  3052. {
  3053. VStringBuffer xpath("Query[@id='%s']", id);
  3054. IPropertyTree *query = queryRegistry->queryPropTree(xpath);
  3055. if (query)
  3056. {
  3057. activeOnly->addPropTree("Query", LINK(query));
  3058. activeOnly->addPropTree("Alias", LINK(&alias));
  3059. }
  3060. }
  3061. }
  3062. queryRegistry.setown(activeOnly.getClear());
  3063. }
  3064. if (req.getProtect())
  3065. {
  3066. StringArray wuids;
  3067. Owned<IPropertyTreeIterator> queries = queryRegistry->getElements("Query");
  3068. ForEach(*queries)
  3069. {
  3070. IPropertyTree &query = queries->query();
  3071. const char *wuid = query.queryProp("@wuid");
  3072. if (wuid && *wuid)
  3073. wuids.append(wuid);
  3074. }
  3075. if (wuids.length())
  3076. doProtectWorkunits(context, wuids, nullptr);
  3077. }
  3078. CDateTime dt;
  3079. dt.setNow();
  3080. StringBuffer dts;
  3081. VStringBuffer qs("<QuerySetArchive exported='%s' target='%s' activeOnly='%s'>\n", dt.getString(dts, true).str(), target, req.getActiveOnly() ? "true" : "false");
  3082. toXML(queryRegistry, qs);
  3083. qs.append("</QuerySetArchive>");
  3084. MemoryBuffer content;
  3085. if (req.getCompress())
  3086. fastLZCompressToBuffer(content, qs.length()+1, qs);
  3087. else
  3088. content.append(qs.str());
  3089. resp.setTarget(target);
  3090. resp.setCompressed(req.getCompress());
  3091. resp.setData(content);
  3092. }
  3093. catch(IException* e)
  3094. {
  3095. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3096. }
  3097. return true;
  3098. }
  3099. void CWsWorkunitsEx::getGraphsByQueryId(const char *target, const char *queryId, const char *graphId, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs)
  3100. {
  3101. if (!target || !*target)
  3102. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  3103. if (!queryId || !*queryId)
  3104. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query Id required");
  3105. #ifndef _CONTAINERIZED
  3106. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  3107. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  3108. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name");
  3109. PROGLOG("getGraphsByQueryId: target %s, query %s", target, queryId);
  3110. const SocketEndpointArray &eps = info->getRoxieServers();
  3111. if (eps.empty())
  3112. return;
  3113. VStringBuffer control("<control:querystats><Query id='%s'/></control:querystats>", queryId);
  3114. Owned<IPropertyTree> querystats = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
  3115. #else
  3116. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  3117. if (!conn)
  3118. throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid Roxie name %s", target);
  3119. PROGLOG("getGraphsByQueryId: target %s, query %s", target, queryId);
  3120. VStringBuffer control("<control:querystats><Query id='%s'/></control:querystats>", queryId);
  3121. Owned<IPropertyTree> querystats = sendRoxieControlAllNodes(conn, control.str(), false, ROXIELOCKCONNECTIONTIMEOUT, ROXIECONNECTIONTIMEOUT);
  3122. #endif
  3123. if (!querystats)
  3124. return;
  3125. Owned<IPropertyTreeIterator> graphs = querystats->getElements("Endpoint/Query/Graph");
  3126. ForEach(*graphs)
  3127. {
  3128. IPropertyTree &graph = graphs->query();
  3129. const char* aGraphId = graph.queryProp("@id");
  3130. if (graphId && *graphId && !strieq(graphId, aGraphId))
  3131. continue;
  3132. IPropertyTree* xgmml = graph.getBranch("xgmml/graph");
  3133. if (!xgmml)
  3134. continue;
  3135. Owned<IEspECLGraphEx> g = createECLGraphEx("","");
  3136. g->setName(aGraphId);
  3137. StringBuffer xml;
  3138. if (!subGraphId || !*subGraphId)
  3139. toXML(xgmml, xml);
  3140. else
  3141. {
  3142. VStringBuffer xpath("//node[@id='%s']", subGraphId);
  3143. toXML(xgmml->queryPropTree(xpath.str()), xml);
  3144. }
  3145. g->setGraph(xml.str());
  3146. ECLGraphs.append(*g.getClear());
  3147. }
  3148. return;
  3149. }
  3150. bool CWsWorkunitsEx::onWUQueryGetGraph(IEspContext& context, IEspWUQueryGetGraphRequest& req, IEspWUQueryGetGraphResponse& resp)
  3151. {
  3152. try
  3153. {
  3154. IArrayOf<IEspECLGraphEx> graphs;
  3155. getGraphsByQueryId(req.getTarget(), req.getQueryId(), req.getGraphName(), req.getSubGraphId(), graphs);
  3156. resp.setGraphs(graphs);
  3157. }
  3158. catch(IException* e)
  3159. {
  3160. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3161. }
  3162. return true;
  3163. }
  3164. bool CWsWorkunitsEx::resetQueryStats(IEspContext& context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp)
  3165. {
  3166. IArrayOf<IEspQuerySetQueryActionResult> results;
  3167. Owned<IEspQuerySetQueryActionResult> result = createQuerySetQueryActionResult();
  3168. try
  3169. {
  3170. StringBuffer control;
  3171. Owned<IPropertyIterator> it = queryIds->getIterator();
  3172. ForEach(*it)
  3173. {
  3174. const char *queryId = it->getPropKey();
  3175. if (queryId && *queryId)
  3176. {
  3177. appendXMLOpenTag(control, "Query", NULL, false);
  3178. appendXMLAttr(control, "id", queryId);
  3179. if (target && *target)
  3180. appendXMLAttr(control, "target", target);
  3181. control.append("/>");
  3182. }
  3183. }
  3184. if (!control.length())
  3185. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::resetQueryStats: Query ID not specified");
  3186. control.insert(0, "<control:resetquerystats>");
  3187. control.append("</control:resetquerystats>");
  3188. if (!sendControlQuery(context, target, control.str(), ROXIECONNECTIONTIMEOUT))
  3189. throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "CWsWorkunitsEx::resetQueryStats: Failed to send roxie control query");
  3190. result->setMessage("Query stats reset succeeded");
  3191. result->setSuccess(true);;
  3192. }
  3193. catch(IMultiException *me)
  3194. {
  3195. StringBuffer msg;
  3196. result->setMessage(me->errorMessage(msg).str());
  3197. result->setCode(me->errorCode());
  3198. result->setSuccess(false);
  3199. me->Release();
  3200. }
  3201. catch(IException *e)
  3202. {
  3203. StringBuffer msg;
  3204. result->setMessage(e->errorMessage(msg).str());
  3205. result->setCode(e->errorCode());
  3206. result->setSuccess(false);
  3207. e->Release();
  3208. }
  3209. results.append(*result.getClear());
  3210. resp.setResults(results);
  3211. return true;
  3212. }
  3213. IPropertyTree* CWsWorkunitsEx::sendControlQuery(IEspContext& context, const char* target, const char* query, unsigned timeout)
  3214. {
  3215. if (!target || !*target)
  3216. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: target not specified");
  3217. if (!query || !*query)
  3218. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "CWsWorkunitsEx::sendControlQuery: Control query not specified");
  3219. #ifndef _CONTAINERIZED
  3220. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  3221. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  3222. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Invalid target name %s", target);
  3223. const SocketEndpointArray &eps = info->getRoxieServers();
  3224. if (eps.empty())
  3225. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "CWsWorkunitsEx::sendControlQuery: Server not found for %s", target);
  3226. Owned<ISocket> sock = ISocket::connect_timeout(eps.item(0), timeout);
  3227. return sendRoxieControlQuery(sock, query, timeout);
  3228. #else
  3229. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  3230. if (!conn)
  3231. throw makeStringExceptionV(ECLWATCH_CANNOT_GET_ENV_INFO, "roxie target cluster not mapped: %s", target);
  3232. return sendRoxieControlQuery(conn, query, timeout, ROXIECONNECTIONTIMEOUT);
  3233. #endif
  3234. }
  3235. bool CWsWorkunitsEx::onWUUpdateQueryEntry(IEspContext& context, IEspWUUpdateQueryEntryRequest& req, IEspWUUpdateQueryEntryResponse& resp)
  3236. {
  3237. try
  3238. {
  3239. StringBuffer querySetName, query;
  3240. ensureInputString(req.getQuerySet(), true, querySetName, ECLWATCH_QUERYSET_NOT_FOUND, "Query Set not specified");
  3241. ensureInputString(req.getQueryId(), true, query, ECLWATCH_QUERYID_NOT_FOUND, "Query not specified");
  3242. Owned<IPropertyTree> querySet = getQueryRegistry(querySetName.str(), true);
  3243. if (!querySet)
  3244. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Queryset %s not found", querySetName.str());
  3245. VStringBuffer xpath("Query[@id=\"%s\"]", query.str());
  3246. IPropertyTree *tree = querySet->queryPropTree(xpath);
  3247. if (!tree)
  3248. throw MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "Query %s not found", query.str());
  3249. StringBuffer comment(req.getComment());
  3250. if (comment.isEmpty())
  3251. tree->removeProp("@comment");
  3252. else
  3253. tree->setProp("@comment", comment.str());
  3254. }
  3255. catch(IException* e)
  3256. {
  3257. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3258. }
  3259. return true;
  3260. }
  3261. bool CWsWorkunitsEx::onWUGetNumFileToCopy(IEspContext& context, IEspWUGetNumFileToCopyRequest& req, IEspWUGetNumFileToCopyResponse& resp)
  3262. {
  3263. class CWUGetNumFileToCopyPager : public CSimpleInterface, implements IElementsPager
  3264. {
  3265. StringAttr clusterName;
  3266. StringAttr sortOrder;
  3267. MapStringToMyClass<ISmartSocketFactory> *roxieConnMap;
  3268. public:
  3269. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  3270. #ifndef _CONTAINERIZED
  3271. CWUGetNumFileToCopyPager(const char* _clusterName, const char *_sortOrder)
  3272. : clusterName(_clusterName), sortOrder(_sortOrder) { };
  3273. #else
  3274. CWUGetNumFileToCopyPager(const char* _clusterName, const char* _sortOrder, MapStringToMyClass<ISmartSocketFactory>* _roxieConnMap)
  3275. : clusterName(_clusterName), sortOrder(_sortOrder), roxieConnMap(_roxieConnMap) { };
  3276. #endif
  3277. virtual IRemoteConnection* getElements(IArrayOf<IPropertyTree> &elements)
  3278. {
  3279. #ifndef _CONTAINERIZED
  3280. SocketEndpointArray servers;
  3281. getRoxieProcessServers(clusterName.get(), servers);
  3282. if (servers.length() < 1)
  3283. {
  3284. PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
  3285. return NULL;
  3286. }
  3287. Owned<IPropertyTree> result = sendRoxieControlAllNodes(servers.item(0), "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT);
  3288. #else
  3289. ISmartSocketFactory *conn = roxieConnMap->getValue(clusterName);
  3290. if (!conn)
  3291. {
  3292. PROGLOG("WUGetNumFileToCopy: Process Server not found for %s", clusterName.get());
  3293. return nullptr;
  3294. }
  3295. Owned<IPropertyTree> result = sendRoxieControlAllNodes(conn, "<control:numfilestoprocess/>", false, ROXIELOCKCONNECTIONTIMEOUT, ROXIECONNECTIONTIMEOUT);
  3296. #endif
  3297. if (!result)
  3298. {
  3299. PROGLOG("WUGetNumFileToCopy: Empty result received for cluster %s", clusterName.get());
  3300. return NULL;
  3301. }
  3302. Owned<IPropertyTreeIterator> iter = result->getElements("*");
  3303. if (!iter)
  3304. return NULL;
  3305. StringArray unknownAttributes;
  3306. sortElements(iter, sortOrder.get(), NULL, NULL, unknownAttributes, elements);
  3307. return NULL;
  3308. }
  3309. virtual bool allMatchingElementsReceived() { return true; } //For now, roxie always returns all of matched items.
  3310. };
  3311. try
  3312. {
  3313. #ifndef _CONTAINERIZED
  3314. StringBuffer clusterName(req.getClusterName());
  3315. if (clusterName.isEmpty())
  3316. throw MakeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Cluster not specified");
  3317. #else
  3318. StringBuffer targetName(req.getTargetName());
  3319. if (targetName.isEmpty())
  3320. targetName.set(req.getClusterName()); //for backward compatible
  3321. if (targetName.isEmpty())
  3322. throw makeStringException(ECLWATCH_CANNOT_RESOLVE_CLUSTER_NAME, "Target not specified");
  3323. #endif
  3324. StringBuffer so;
  3325. bool descending = req.getDescending();
  3326. if (descending)
  3327. so.set("-");
  3328. const char *sortBy = req.getSortby();
  3329. if (!isEmptyString(sortBy) && strieq(sortBy, "URL"))
  3330. so.append("?@ep");
  3331. else if (!isEmptyString(sortBy) && strieq(sortBy, "Status"))
  3332. so.append("?Status");
  3333. else
  3334. so.append("#FilesToProcess/@value");
  3335. unsigned pageSize = req.getPageSize();
  3336. unsigned pageStartFrom = req.getPageStartFrom();
  3337. if(pageSize < 1)
  3338. pageSize = 100;
  3339. __int64 cacheHint = 0;
  3340. if (!req.getCacheHint_isNull())
  3341. cacheHint = req.getCacheHint();
  3342. unsigned numberOfEndpoints = 0;
  3343. IArrayOf<IPropertyTree> results;
  3344. #ifndef _CONTAINERIZED
  3345. Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(clusterName, so);
  3346. #else
  3347. Owned<IElementsPager> elementsPager = new CWUGetNumFileToCopyPager(targetName, so, &roxieConnMap);
  3348. #endif
  3349. getElementsPaged(elementsPager, pageStartFrom, pageSize, NULL, "", &cacheHint, results, &numberOfEndpoints, NULL, false);
  3350. IArrayOf<IEspClusterEndpoint> endpoints;
  3351. ForEachItemIn(i, results)
  3352. {
  3353. IPropertyTree &item = results.item(i);
  3354. Owned<IEspClusterEndpoint> endpoint = createClusterEndpoint();
  3355. endpoint->setURL(item.queryProp("@ep"));
  3356. endpoint->setStatus(item.queryProp("Status"));
  3357. endpoint->setNumQueryFileToCopy(item.getPropInt("FilesToProcess/@value", 0));
  3358. endpoints.append(*endpoint.getClear());
  3359. }
  3360. resp.setEndpoints(endpoints);
  3361. resp.setCacheHint(cacheHint);
  3362. resp.setTotal(numberOfEndpoints);
  3363. }
  3364. catch(IException* e)
  3365. {
  3366. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3367. }
  3368. return true;
  3369. }
  3370. bool CWsWorkunitsEx::onWUQueryGetSummaryStats(IEspContext& context, IEspWUQueryGetSummaryStatsRequest& req, IEspWUQueryGetSummaryStatsResponse& resp)
  3371. {
  3372. try
  3373. {
  3374. const char *target = req.getTarget();
  3375. if (isEmptyString(target))
  3376. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Target name required");
  3377. #ifndef _CONTAINERIZED
  3378. Owned<IConstWUClusterInfo> info = getTargetClusterInfo(target);
  3379. if (!info || (info->getPlatform()!=RoxieCluster)) //Only support roxie for now
  3380. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Roxie name not found");
  3381. #else
  3382. ISmartSocketFactory *conn = roxieConnMap.getValue(target);
  3383. if (!conn)
  3384. throw makeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Roxie name not found");
  3385. if (!isActiveK8sService(target))
  3386. throw makeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Roxie cluster has no active servers");
  3387. #endif
  3388. double version = context.getClientVersion();
  3389. const char *queryId = req.getQueryId();
  3390. if (!isEmptyString(queryId))
  3391. PROGLOG("WUQueryGetSummaryStats: target %s, query %s", target, queryId);
  3392. else
  3393. PROGLOG("WUQueryGetSummaryStats: target %s", target);
  3394. #ifndef _CONTAINERIZED
  3395. const SocketEndpointArray &eps = info->getRoxieServers();
  3396. if (eps.empty())
  3397. {
  3398. IERRLOG("WUQueryGetSummaryStats: Failed to getRoxieServers for %s", target);
  3399. return true;
  3400. }
  3401. #endif
  3402. bool includeRawStats = req.getIncludeRawStats();
  3403. const char *fromTime = req.getFromTime();
  3404. const char *toTime = req.getToTime();
  3405. VStringBuffer control("<control:queryAggregates");
  3406. if (!isEmpty(fromTime))
  3407. control.appendf(" from='%s'", fromTime);
  3408. if (!isEmpty(toTime))
  3409. control.appendf(" to='%s'", toTime);
  3410. if (includeRawStats)
  3411. {
  3412. if (!isEmptyString(queryId))
  3413. control.appendf(" rawStats='1'><Query id='%s'/></control:queryAggregates>", queryId);
  3414. else
  3415. control.append(" all='1' rawStats='1' />");
  3416. }
  3417. else if (!isEmptyString(queryId))
  3418. {
  3419. control.appendf("><Query id='%s'/></control:queryAggregates>", queryId);
  3420. }
  3421. else
  3422. control.append(" />");
  3423. #ifndef _CONTAINERIZED
  3424. Owned<IPropertyTree> queryAggregates = sendRoxieControlAllNodes(eps.item(0), control.str(), false, ROXIELOCKCONNECTIONTIMEOUT);
  3425. #else
  3426. Owned<IPropertyTree> queryAggregates = sendRoxieControlAllNodes(conn, control, false, ROXIELOCKCONNECTIONTIMEOUT, ROXIECONNECTIONTIMEOUT);
  3427. #endif
  3428. if (!queryAggregates)
  3429. {
  3430. PROGLOG("WUQueryGetSummaryStats: %s returns empty for %s", control.str(), target);
  3431. return true;
  3432. }
  3433. if (getEspLogLevel() >= LogMax)
  3434. {
  3435. StringBuffer sb;
  3436. toXML(queryAggregates, sb);
  3437. DBGLOG("getQueryStats(): '%s' => '%s'", control.str(), sb.str());
  3438. }
  3439. IArrayOf<IEspQuerySummaryStats> querySummaryStatsList;
  3440. IArrayOf<IEspEndpointQueryStats> queryStatsList;
  3441. Owned<IPropertyTreeIterator> queryStatsOnEndpointItr = queryAggregates->getElements("Endpoint");
  3442. ForEach(*queryStatsOnEndpointItr)
  3443. {
  3444. IPropertyTree &queryStatsOnEndpoint = queryStatsOnEndpointItr->query();
  3445. const char *status = queryStatsOnEndpoint.queryProp("Status");
  3446. const char *ep = queryStatsOnEndpoint.queryProp("@ep");
  3447. if (isEmptyString(ep))
  3448. continue;
  3449. if (version >= 1.75)
  3450. {
  3451. if (includeRawStats)
  3452. readQueryStatsList(&queryStatsOnEndpoint, status, ep, includeRawStats, queryStatsList);
  3453. else
  3454. readQueryAggregateStats(queryStatsOnEndpoint.queryPropTree("Query"), status, ep, querySummaryStatsList);
  3455. }
  3456. else if (!isEmptyString(queryId))
  3457. readQueryAggregateStats(queryStatsOnEndpoint.queryPropTree("Query"), status, ep, querySummaryStatsList);
  3458. }
  3459. resp.setStatsList(querySummaryStatsList);
  3460. resp.setQueryStatsList(queryStatsList);
  3461. }
  3462. catch(IException* e)
  3463. {
  3464. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  3465. }
  3466. return true;
  3467. }
  3468. void CWsWorkunitsEx::readQueryAggregateStats(IPropertyTree *queryStats, const char *status, const char *ep,
  3469. IArrayOf<IEspQuerySummaryStats> &querySummaryStatsList)
  3470. {
  3471. if (!queryStats)
  3472. return;
  3473. Owned<IEspQuerySummaryStats> querySummaryStats = createQuerySummaryStats();
  3474. if (!isEmptyString(ep))
  3475. querySummaryStats->setEndpoint(ep);
  3476. if (queryStats->hasProp("countFailed"))
  3477. querySummaryStats->setCountFailed(queryStats->getPropInt("countFailed"));
  3478. if (queryStats->hasProp("countTotal"))
  3479. querySummaryStats->setCountTotal(queryStats->getPropInt("countTotal"));
  3480. if (queryStats->hasProp("averageBytesOut"))
  3481. querySummaryStats->setAverageBytesOut(queryStats->getPropInt64("averageBytesOut"));
  3482. if (queryStats->hasProp("averageMemUsed"))
  3483. querySummaryStats->setSizeAvgPeakMemory(queryStats->getPropInt64("averageMemUsed"));
  3484. if (queryStats->hasProp("averageSlavesReplyLen"))
  3485. querySummaryStats->setAverageSlavesReplyLen(queryStats->getPropInt("averageSlavesReplyLen"));
  3486. if (queryStats->hasProp("averageTimeMs"))
  3487. querySummaryStats->setTimeAvgTotalExecuteMinutes(queryStats->getPropInt64("averageTimeMs"));
  3488. if (queryStats->hasProp("minTimeMs"))
  3489. querySummaryStats->setTimeMinTotalExecuteMinutes(queryStats->getPropInt64("minTimeMs"));
  3490. if (queryStats->hasProp("maxTimeMs"))
  3491. querySummaryStats->setTimeMaxTotalExecuteMinutes(queryStats->getPropInt64("maxTimeMs"));
  3492. if (queryStats->hasProp("percentile97"))
  3493. {
  3494. querySummaryStats->setPercentile97(queryStats->getPropInt("percentile97"));
  3495. if (queryStats->hasProp("percentile97/@estimate"))
  3496. querySummaryStats->setPercentile97Estimate(queryStats->getPropBool("percentile97/@estimate"));
  3497. }
  3498. const char *startTime = queryStats->queryProp("startTime");
  3499. const char *endTime = queryStats->queryProp("endTime");
  3500. if (!isEmptyString(startTime))
  3501. querySummaryStats->setStartTime(startTime);
  3502. if (!isEmptyString(endTime))
  3503. querySummaryStats->setEndTime(endTime);
  3504. if (!isEmptyString(status))
  3505. querySummaryStats->setStatus(status);
  3506. querySummaryStatsList.append(*querySummaryStats.getLink());
  3507. }
  3508. void CWsWorkunitsEx::readQueryStatsList(IPropertyTree *queryStatsTree, const char *status, const char *ep,
  3509. bool includeRawStats, IArrayOf<IEspEndpointQueryStats> &endpointQueryStatsList)
  3510. {
  3511. if (!queryStatsTree)
  3512. return;
  3513. IArrayOf<IEspQueryStats> queryStatsList;
  3514. Owned<IPropertyTreeIterator> queryItr = queryStatsTree->getElements("QueryStats/Query");
  3515. ForEach(*queryItr)
  3516. {
  3517. IPropertyTree &query = queryItr->query();
  3518. readQueryStats(&query, query.queryProp("@id"), includeRawStats, queryStatsList);
  3519. }
  3520. IPropertyTree *globalStats = queryStatsTree->queryPropTree("QueryStats/Global");
  3521. if (globalStats)
  3522. readQueryStats(globalStats, "Global", includeRawStats, queryStatsList);
  3523. if (queryStatsList.ordinality() == 0)
  3524. return;
  3525. Owned<IEspEndpointQueryStats> endpointQueryStats = createEndpointQueryStats();
  3526. endpointQueryStats->setEndpoint(ep);
  3527. if (!isEmptyString(status))
  3528. endpointQueryStats->setStatus(status);
  3529. endpointQueryStats->setQueryStatsList(queryStatsList);
  3530. endpointQueryStatsList.append(*endpointQueryStats.getLink());
  3531. }
  3532. void CWsWorkunitsEx::readQueryStats(IPropertyTree *queryStatsTree, const char *id, bool includeRawStats,
  3533. IArrayOf<IEspQueryStats> &queryStatsList)
  3534. {
  3535. if (!queryStatsTree)
  3536. return;
  3537. Owned<IEspQueryStats> queryStats = createQueryStats();
  3538. if (!isEmptyString(id))
  3539. queryStats->setID(id);
  3540. if (!includeRawStats)
  3541. {
  3542. IArrayOf<IEspQuerySummaryStats> aggregateQueryStatsList;
  3543. readQueryAggregateStats(queryStatsTree, nullptr, nullptr, aggregateQueryStatsList);
  3544. queryStats->setAggregateQueryStatsList(aggregateQueryStatsList);
  3545. queryStatsList.append(*queryStats.getLink());
  3546. return;
  3547. }
  3548. IArrayOf<IEspQuerySummaryStats> aggregateQueryStatsList;
  3549. Owned<IPropertyTreeIterator> aggregateRecordItr = queryStatsTree->getElements("QueryStatsAggregateRecord");
  3550. ForEach(*aggregateRecordItr)
  3551. {
  3552. IPropertyTree &query = aggregateRecordItr->query();
  3553. readQueryAggregateStats(&query, nullptr, nullptr, aggregateQueryStatsList);
  3554. }
  3555. queryStats->setAggregateQueryStatsList(aggregateQueryStatsList);
  3556. IArrayOf<IEspQueryStatsRecord> recordList;
  3557. Owned<IPropertyTreeIterator> recordItr = queryStatsTree->getElements("QueryStatsRecord");
  3558. ForEach(*recordItr)
  3559. {
  3560. IPropertyTree &query = recordItr->query();
  3561. readQueryStatsRecord(&query, recordList);
  3562. }
  3563. queryStats->setQueryStatsRecordList(recordList);
  3564. queryStatsList.append(*queryStats.getLink());
  3565. }
  3566. void CWsWorkunitsEx::readQueryStatsRecord(IPropertyTree *queryRecord, IArrayOf<IEspQueryStatsRecord> &recordList)
  3567. {
  3568. if (!queryRecord)
  3569. return;
  3570. Owned<IEspQueryStatsRecord> record = createQueryStatsRecord();
  3571. const char *startTime = queryRecord->queryProp("@startTime");
  3572. if (!isEmptyString(startTime))
  3573. record->setStartTime(startTime);
  3574. if (queryRecord->hasProp("elapsedTimeMs"))
  3575. record->setElapsedTimeMs(queryRecord->getPropInt64("elapsedTimeMs"));
  3576. if (queryRecord->hasProp("memUsed"))
  3577. record->setMemoryUsed(queryRecord->getPropInt64("memUsed"));
  3578. if (queryRecord->hasProp("bytesOut"))
  3579. record->setBytesOut(queryRecord->getPropInt64("bytesOut"));
  3580. if (queryRecord->hasProp("slavesReplyLen"))
  3581. record->setSlavesReplyLen(queryRecord->getPropInt("slavesReplyLen"));
  3582. recordList.append(*record.getLink());
  3583. }