ws_packageprocessService.cpp 19 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. ############################################################################## */
  4. #pragma warning (disable : 4786)
  5. #include "ws_packageprocessService.hpp"
  6. #include "daclient.hpp"
  7. #include "dalienv.hpp"
  8. #include "dadfs.hpp"
  9. #include "dfuutil.hpp"
  10. #include "ws_fs.hpp"
  11. #include "ws_workunits.hpp"
  12. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  13. void CWsPackageProcessEx::init(IPropertyTree *cfg, const char *process, const char *service)
  14. {
  15. }
  16. bool CWsPackageProcessEx::onEcho(IEspContext &context, IEspEchoRequest &req, IEspEchoResponse &resp)
  17. {
  18. StringBuffer respMsg;
  19. ISecUser* user = context.queryUser();
  20. if(user != NULL)
  21. {
  22. const char* name = user->getName();
  23. if (name && *name)
  24. respMsg.appendf("%s: ", name);
  25. }
  26. const char* reqMsg = req.getRequest();
  27. if (reqMsg && *reqMsg)
  28. respMsg.append(reqMsg);
  29. else
  30. respMsg.append("??");
  31. resp.setResponse(respMsg.str());
  32. return true;
  33. }
  34. IPropertyTree *getPkgSetRegistry(const char *setName, bool readonly)
  35. {
  36. Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageSets/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  37. //Only lock the branch for the target we're interested in.
  38. StringBuffer xpath;
  39. xpath.append("/PackageSets/PackageSet[@id=\"").append(setName).append("\"]");
  40. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), readonly ? RTM_LOCK_READ : RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
  41. if (!conn)
  42. {
  43. if (readonly)
  44. return NULL;
  45. Owned<IPropertyTree> querySet = createPTree();
  46. querySet->setProp("@id", setName);
  47. globalLock->queryRoot()->addPropTree("PackageSet", querySet.getClear());
  48. globalLock->commit();
  49. conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT));
  50. if (!conn)
  51. throwUnexpected();
  52. }
  53. return conn->getRoot();
  54. }
  55. ////////////////////////////////////////////////////////////////////////////////////////
  56. const unsigned roxieQueryRoxieTimeOut = 60000;
  57. #define SDS_LOCK_TIMEOUT (5*60*1000) // 5mins, 30s a bit short
  58. bool isRoxieProcess(const char *process)
  59. {
  60. if (!process)
  61. return false;
  62. Owned<IRemoteConnection> conn = querySDS().connect("Environment", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
  63. if (!conn)
  64. return false;
  65. VStringBuffer xpath("Software/RoxieCluster[@name=\"%s\"]", process);
  66. return conn->queryRoot()->hasProp(xpath.str());
  67. }
  68. bool isFileKnownOnCluster(const char *logicalname, const char *lookupDaliIp, const char *process, IUserDescriptor* userdesc)
  69. {
  70. Owned<IDistributedFile> dst = queryDistributedFileDirectory().lookup(logicalname, userdesc, true);
  71. if (dst)
  72. {
  73. if (dst->findCluster(process) != NotFound)
  74. return true; // file already known for this cluster
  75. }
  76. return false;
  77. }
  78. bool addFileInfoToDali(const char *logicalname, const char *lookupDaliIp, const char *process, bool overwrite, IUserDescriptor* userdesc, StringBuffer &host, short port, StringBuffer &msg)
  79. {
  80. bool retval = true;
  81. try
  82. {
  83. if (!overwrite)
  84. {
  85. if (isFileKnownOnCluster(logicalname, lookupDaliIp, process, userdesc))
  86. return true;
  87. }
  88. StringBuffer user;
  89. userdesc->getUserName(user);
  90. StringBuffer password;
  91. userdesc->getPassword(password);
  92. Owned<IClientFileSpray> fs;
  93. fs.setown(createFileSprayClient());
  94. fs->setUsernameToken(user.str(), password.str(), NULL);
  95. VStringBuffer url("http://%s:%d/FileSpray", host.str(), port);
  96. fs->addServiceUrl(url.str());
  97. bool isRoxie = isRoxieProcess(process);
  98. Owned<IClientCopy> req = fs->createCopyRequest();
  99. req->setSourceLogicalName(logicalname);
  100. req->setDestLogicalName(logicalname);
  101. req->setDestGroup(process);
  102. req->setSuperCopy(false);
  103. if (isRoxie)
  104. req->setDestGroupRoxie("Yes");
  105. req->setSourceDali(lookupDaliIp);
  106. req->setSrcusername(user);
  107. req->setSrcpassword(password);
  108. req->setOverwrite(overwrite);
  109. Owned<IClientCopyResponse> resp = fs->Copy(req);
  110. }
  111. catch(IException *e)
  112. {
  113. e->errorMessage(msg);
  114. DBGLOG("ERROR = %s", msg.str());
  115. e->Release(); // report the error later if needed
  116. retval = false;
  117. }
  118. catch(...)
  119. {
  120. retval = false;
  121. }
  122. return retval;
  123. }
  124. //////////////////////////////////////////////////////////
  125. void addPackageMapInfo(IPropertyTree *pkgSetRegistry, const char *setName, const char *packageSetName, IPropertyTree *packageInfo, bool active, bool overWrite)
  126. {
  127. Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  128. StringBuffer lcName(packageSetName);
  129. lcName.toLowerCase();
  130. StringBuffer xpath;
  131. xpath.append("PackageMap[@id='").append(lcName).append("']");
  132. IPropertyTree *pkgRegTree = pkgSetRegistry->queryPropTree(xpath.str());
  133. IPropertyTree *root = globalLock->queryRoot();
  134. IPropertyTree *mapTree = root->queryPropTree(xpath);
  135. if (!overWrite && (pkgRegTree || mapTree))
  136. {
  137. throw MakeStringException(0, "Package name %s already exists, either delete it or specify overwrite", lcName.str());
  138. }
  139. if (mapTree)
  140. root->removeTree(mapTree);
  141. if (pkgRegTree)
  142. pkgSetRegistry->removeTree(pkgRegTree);
  143. mapTree = root->addPropTree("PackageMap", createPTree());
  144. mapTree->addProp("@id", packageSetName);
  145. IPropertyTree *baseInfo = createPTree();
  146. Owned<IPropertyTreeIterator> iter = packageInfo->getElements("Package");
  147. ForEach(*iter)
  148. {
  149. IPropertyTree &item = iter->query();
  150. Owned<IPropertyTreeIterator> super_iter = item.getElements("SuperFile");
  151. if (super_iter->first())
  152. {
  153. ForEach(*super_iter)
  154. {
  155. IPropertyTree &supertree = super_iter->query();
  156. StringAttr id(supertree.queryProp("@id"));
  157. if (id.length() && id[0] == '~')
  158. supertree.setProp("@id", id+1);
  159. Owned<IPropertyTreeIterator> sub_iter = supertree.getElements("SubFile");
  160. ForEach(*sub_iter)
  161. {
  162. IPropertyTree &subtree = sub_iter->query();
  163. StringAttr subid = subtree.queryProp("@value");
  164. if (subid.length())
  165. {
  166. if (subid[0] == '~')
  167. subtree.setProp("@value", subid+1);
  168. }
  169. }
  170. mapTree->addPropTree("Package", LINK(&item));
  171. }
  172. }
  173. else
  174. {
  175. baseInfo->addPropTree("Package", LINK(&item));
  176. }
  177. }
  178. mergePTree(mapTree, baseInfo);
  179. globalLock->commit();
  180. IPropertyTree *pkgSetTree = pkgSetRegistry->addPropTree("PackageMap", createPTree("PackageMap"));
  181. pkgSetTree->setProp("@id", lcName);
  182. pkgSetTree->setProp("@querySet", setName);
  183. pkgSetTree->setPropBool("@active", active);
  184. }
  185. void copyPackageSubFiles(IPropertyTree *packageInfo, const char *process, const char *defaultLookupDaliIp, bool overwrite, IUserDescriptor* userdesc, StringBuffer &host, short port)
  186. {
  187. Owned<IPropertyTreeIterator> iter = packageInfo->getElements("Package");
  188. ForEach(*iter)
  189. {
  190. IPropertyTree &item = iter->query();
  191. StringBuffer lookupDaliIp;
  192. lookupDaliIp.append(item.queryProp("@daliip"));
  193. if (lookupDaliIp.length() == 0)
  194. lookupDaliIp.append(defaultLookupDaliIp);
  195. if (lookupDaliIp.length() == 0)
  196. {
  197. StringAttr superfile(item.queryProp("@id"));
  198. DBGLOG("Could not lookup SubFiles in package %s because no remote dali ip was specified", superfile.get());
  199. return;
  200. }
  201. Owned<IPropertyTreeIterator> super_iter = item.getElements("SuperFile");
  202. ForEach(*super_iter)
  203. {
  204. IPropertyTree &supertree = super_iter->query();
  205. Owned<IPropertyTreeIterator> sub_iter = supertree.getElements("SubFile");
  206. ForEach(*sub_iter)
  207. {
  208. IPropertyTree &subtree = sub_iter->query();
  209. StringAttr subid = subtree.queryProp("@value");
  210. if (subid.length())
  211. {
  212. StringBuffer msg;
  213. addFileInfoToDali(subid.get(), lookupDaliIp, process, overwrite, userdesc, host, port, msg);
  214. }
  215. }
  216. }
  217. }
  218. }
  219. void getPackageListInfo(IPropertyTree *mapTree, IEspPackageListMapData *pkgList)
  220. {
  221. pkgList->setId(mapTree->queryProp("@id"));
  222. Owned<IPropertyTreeIterator> iter = mapTree->getElements("Package");
  223. IArrayOf<IConstPackageListData> results;
  224. ForEach(*iter)
  225. {
  226. IPropertyTree &item = iter->query();
  227. Owned<IEspPackageListData> res = createPackageListData("", "");
  228. res->setId(item.queryProp("@id"));
  229. if (item.hasProp("@queries"))
  230. res->setQueries(item.queryProp("@queries"));
  231. results.append(*res.getClear());
  232. }
  233. pkgList->setPkgListData(results);
  234. }
  235. void getAllPackageListInfo(IPropertyTree *mapTree, StringBuffer &info)
  236. {
  237. info.append("<PackageMap id='").append(mapTree->queryProp("@id")).append("'");
  238. Owned<IPropertyTreeIterator> iter = mapTree->getElements("Package");
  239. ForEach(*iter)
  240. {
  241. IPropertyTree &item = iter->query();
  242. info.append("<Package id='").append(item.queryProp("@id")).append("'");
  243. if (item.hasProp("@queries"))
  244. info.append(" queries='").append(item.queryProp("@queries")).append("'");
  245. info.append("></Package>");
  246. }
  247. info.append("</PackageMap>");
  248. }
  249. void listPkgInfo(const char *cluster, IArrayOf<IConstPackageListMapData>* results)
  250. {
  251. StringBuffer info;
  252. Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  253. if (!globalLock)
  254. return;
  255. IPropertyTree *root = globalLock->queryRoot();
  256. if (!cluster || !*cluster)
  257. {
  258. info.append("<PackageMaps>");
  259. Owned<IPropertyTreeIterator> iter = root->getElements("PackageMap");
  260. ForEach(*iter)
  261. {
  262. Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
  263. IPropertyTree &item = iter->query();
  264. getPackageListInfo(&item, res);
  265. results->append(*res.getClear());
  266. }
  267. info.append("</PackageMaps>");
  268. }
  269. else
  270. {
  271. Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(cluster, true);
  272. Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements("PackageMap");
  273. info.append("<PackageMaps>");
  274. ForEach(*iter)
  275. {
  276. IPropertyTree &item = iter->query();
  277. const char *id = item.queryProp("@id");
  278. if (id)
  279. {
  280. StringBuffer xpath;
  281. xpath.append("PackageMap[@id='").append(id).append("']");
  282. IPropertyTree *mapTree = root->queryPropTree(xpath);
  283. Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
  284. getPackageListInfo(mapTree, res);
  285. results->append(*res.getClear());
  286. }
  287. }
  288. info.append("</PackageMaps>");
  289. }
  290. }
  291. void getPkgInfo(const char *cluster, const char *package, StringBuffer &info)
  292. {
  293. Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  294. if (!globalLock)
  295. return;
  296. IPropertyTree *root = globalLock->queryRoot();
  297. Owned<IPropertyTree> tree = createPTree("PackageMaps");
  298. if (cluster)
  299. {
  300. Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(cluster, true);
  301. Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements("PackageMap[@active='1']");
  302. ForEach(*iter)
  303. {
  304. IPropertyTree &item = iter->query();
  305. const char *id = item.queryProp("@id");
  306. if (id)
  307. {
  308. StringBuffer xpath;
  309. xpath.append("PackageMap[@id='").append(id).append("']");
  310. IPropertyTree *mapTree = root->queryPropTree(xpath);
  311. if (mapTree)
  312. mergePTree(tree, mapTree);
  313. }
  314. }
  315. }
  316. else
  317. {
  318. StringBuffer xpath;
  319. xpath.append("PackageMap[@id='").append(package).append("']");
  320. Owned<IPropertyTreeIterator> iter = root->getElements(xpath.str());
  321. ForEach(*iter)
  322. {
  323. IPropertyTree &item = iter->query();
  324. mergePTree(tree, &item);
  325. }
  326. }
  327. toXML(tree, info);
  328. }
  329. bool deletePkgInfo(const char *packageSetName, const char *queryset)
  330. {
  331. Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(queryset, false);
  332. Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  333. StringBuffer lcName(packageSetName);
  334. lcName.toLowerCase();
  335. StringBuffer xpath;
  336. xpath.append("PackageMap[@id='").append(lcName).append("']");
  337. bool ret = true;
  338. IPropertyTree *root = globalLock->queryRoot();
  339. IPropertyTree *mapTree = root->queryPropTree(xpath);
  340. if (mapTree)
  341. ret = root->removeTree(mapTree);
  342. if (ret)
  343. {
  344. IPropertyTree *pkgTree = pkgSetRegistry->queryPropTree(xpath.str());
  345. if (pkgTree)
  346. ret = pkgSetRegistry->removeTree(pkgTree);
  347. }
  348. return ret;
  349. }
  350. void activatePackageMapInfo(const char *packageSetName, const char *packageMap, bool activate)
  351. {
  352. if (!packageSetName || !*packageSetName)
  353. return;
  354. Owned<IRemoteConnection> globalLock = querySDS().connect("PackageSets", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
  355. if (!globalLock)
  356. return;
  357. StringBuffer lcName(packageSetName);
  358. lcName.toLowerCase();
  359. VStringBuffer xpath("PackageSet[@id=\"%s\"]", lcName.str());
  360. IPropertyTree *root = globalLock->queryRoot();
  361. if (!root)
  362. return;
  363. IPropertyTree *pkgSetTree = root->queryPropTree(xpath);
  364. if (pkgSetTree)
  365. {
  366. if (packageMap && *packageMap)
  367. {
  368. StringBuffer lcMapName(packageMap);
  369. lcMapName.toLowerCase();
  370. VStringBuffer xpath_map("PackageMap[@id=\"%s\"]", lcMapName.str());
  371. IPropertyTree *mapTree = pkgSetTree->queryPropTree(xpath_map);
  372. mapTree->setPropBool("@active", activate);
  373. }
  374. else
  375. {
  376. Owned<IPropertyTreeIterator> iter = pkgSetTree->getElements("PackageMap");
  377. ForEach(*iter)
  378. {
  379. IPropertyTree &item = iter->query();
  380. item.setPropBool("@active", activate);
  381. }
  382. }
  383. }
  384. }
  385. bool CWsPackageProcessEx::onAddPackage(IEspContext &context, IEspAddPackageRequest &req, IEspAddPackageResponse &resp)
  386. {
  387. resp.updateStatus().setCode(0);
  388. StringBuffer info(req.getInfo());
  389. bool activate = req.getActivate();
  390. bool overWrite = req.getOverWrite();
  391. StringAttr querySet(req.getQuerySet());
  392. StringAttr pkgName(req.getPackageName());
  393. Owned<IPropertyTree> packageTree = createPTreeFromXMLString(info.str());
  394. Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(querySet.get(), false);
  395. addPackageMapInfo(pkgSetRegistry, querySet.get(), pkgName.get(), LINK(packageTree), activate, overWrite);
  396. StringBuffer msg;
  397. msg.append("Successfully loaded ").append(pkgName.get());
  398. resp.updateStatus().setDescription(msg.str());
  399. return true;
  400. }
  401. bool CWsPackageProcessEx::onDeletePackage(IEspContext &context, IEspDeletePackageRequest &req, IEspDeletePackageResponse &resp)
  402. {
  403. resp.updateStatus().setCode(0);
  404. StringAttr pkgName(req.getPackageName());
  405. bool ret = deletePkgInfo(pkgName.get(), req.getQuerySet());
  406. StringBuffer msg;
  407. (ret) ? msg.append("Successfully ") : msg.append("Unsuccessfully ");
  408. msg.append("deleted").append(pkgName.get());
  409. resp.updateStatus().setDescription(msg.str());
  410. return true;
  411. }
  412. bool CWsPackageProcessEx::onActivatePackage(IEspContext &context, IEspActivatePackageRequest &req, IEspActivatePackageResponse &resp)
  413. {
  414. resp.updateStatus().setCode(0);
  415. StringBuffer pkgName(req.getPackageName());
  416. StringBuffer pkgMapName(req.getPackageMapName());
  417. activatePackageMapInfo(pkgName.str(), pkgMapName.str(), true);
  418. return true;
  419. }
  420. bool CWsPackageProcessEx::onDeActivatePackage(IEspContext &context, IEspDeActivatePackageRequest &req, IEspDeActivatePackageResponse &resp)
  421. {
  422. resp.updateStatus().setCode(0);
  423. StringBuffer pkgName(req.getPackageName());
  424. StringBuffer pkgMapName(req.getPackageMapName());
  425. activatePackageMapInfo(pkgName.str(), pkgMapName.str(), false);
  426. return true;
  427. }
  428. bool CWsPackageProcessEx::onListPackage(IEspContext &context, IEspListPackageRequest &req, IEspListPackageResponse &resp)
  429. {
  430. resp.updateStatus().setCode(0);
  431. IArrayOf<IConstPackageListMapData> results;
  432. listPkgInfo(req.getCluster(), &results);
  433. resp.setPkgListMapData(results);
  434. return true;
  435. }
  436. bool CWsPackageProcessEx::onGetPackage(IEspContext &context, IEspGetPackageRequest &req, IEspGetPackageResponse &resp)
  437. {
  438. resp.updateStatus().setCode(0);
  439. StringAttr cluster(req.getCluster());
  440. StringAttr pkgName(req.getPackageName());
  441. StringBuffer info;
  442. getPkgInfo(cluster.length() ? cluster.get() : NULL, pkgName.length() ? pkgName.get() : NULL, info);
  443. resp.setInfo(info);
  444. return true;
  445. }
  446. bool CWsPackageProcessEx::onCopyFiles(IEspContext &context, IEspCopyFilesRequest &req, IEspCopyFilesResponse &resp)
  447. {
  448. resp.updateStatus().setCode(0);
  449. StringBuffer info(req.getInfo());
  450. StringAttr process(req.getProcess());
  451. StringAttr pkgName(req.getPackageName());
  452. StringAttr lookupDaliIp(req.getDaliIp());
  453. if (process.length() == 0)
  454. throw MakeStringException(0, "CWsPackageProcessEx::onCopyFiles process parameter not set.");
  455. Owned<IUserDescriptor> userdesc;
  456. const char *user = context.queryUserId();
  457. const char *password = context.queryPassword();
  458. if (user && *user && *password && *password)
  459. {
  460. userdesc.setown(createUserDescriptor());
  461. userdesc->set(user, password);
  462. }
  463. StringBuffer host;
  464. short port;
  465. context.getServAddress(host, port);
  466. Owned<IPropertyTree> packageTree = createPTreeFromXMLString(info.str());
  467. copyPackageSubFiles(LINK(packageTree), process, lookupDaliIp, req.getOverWrite(), userdesc, host, port);
  468. StringBuffer msg;
  469. msg.append("Successfully loaded ").append(pkgName.get());
  470. resp.updateStatus().setDescription(msg.str());
  471. return true;
  472. }