// roxiecommunicationclient.cpp (19 KB)

  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. #include "jmisc.hpp"
  15. #include "portlist.h"
  16. #include "roxiecommlib.hpp"
  17. #define GET_LOCK_FAILURE 1100
  18. //////////////////////////////////////////
  19. class CRoxieCommunicationClient: implements IRoxieCommunicationClient, public CInterface
  20. {
  21. private:
  22. void processRoxieQueryList(IPropertyTree *info)
  23. {
  24. StringAttrMapping aliasOriginalNames(false);
  25. Owned<IPropertyTreeIterator> alaises = info->getElements("Endpoint/Alias");
  26. ForEach (*alaises)
  27. {
  28. IPropertyTree &alias = alaises->query();
  29. const char *id = alias.queryProp("@id");
  30. const char *original = alias.queryProp("@original");
  31. aliasOriginalNames.setValue(original, original);
  32. }
  33. Owned<IPropertyTreeIterator> queries = info->getElements("Endpoint/Query");
  34. ForEach (*queries)
  35. {
  36. IPropertyTree &query = queries->query();
  37. const char *id = query.queryProp("@id");
  38. // look for id in the stringArray
  39. if (aliasOriginalNames.find(id) != 0)
  40. query.setProp("@hasAlias", "1");
  41. else
  42. query.setProp("@hasAlias", "0");
  43. }
  44. }
  45. protected:
  46. SocketEndpoint ep;
  47. unsigned roxieTimeout;
  48. IPropertyTree *sendRoxieControlQuery(const StringBuffer &xml, bool deployAll, const char *remoteRoxieIP = NULL)
  49. {
  50. unsigned len = xml.length();
  51. size32_t sendlen = len;
  52. _WINREV(sendlen);
  53. Owned<ISocket> sock = getRoxieSocket(remoteRoxieIP);
  54. if (deployAll)
  55. if (!getLock(sock, deployAll)) // if we want to deploy to all roxie servers, make sure we can lock all servers
  56. throw MakeStringException(GET_LOCK_FAILURE, "Request Failed! All roxie nodes unable to process this request at this time. Roxie is busy - possibly in the middle of another deployment. Try again later, if problem persists, make sure all nodes are running");
  57. sock->write(&sendlen, sizeof(sendlen));
  58. sock->write(xml.str(), len);
  59. StringBuffer response;
  60. for (;;)
  61. {
  62. sock->read(&sendlen, sizeof(sendlen));
  63. if (!sendlen)
  64. break;
  65. _WINREV(sendlen);
  66. sock->read(response.reserveTruncate(sendlen), sendlen);
  67. }
  68. Owned<IPropertyTree> ret = createPTreeFromXMLString(response.str());
  69. Owned<IMultiException> me = MakeMultiException();
  70. Owned<IPropertyTreeIterator> endpoints = ret->getElements("Endpoint");
  71. ForEach(*endpoints)
  72. {
  73. Owned<IPropertyTreeIterator> exceptions = endpoints->query().getElements("Exception");
  74. ForEach (*exceptions)
  75. {
  76. me->append(*MakeStringException(exceptions->query().getPropInt("Code"), "Endpoint %s: %s", endpoints->query().queryProp("@ep"), exceptions->query().queryProp("Message")));
  77. }
  78. }
  79. if (me->ordinality())
  80. throw me.getClear();
  81. return ret.getClear();
  82. }
  83. const char *sendRoxieOnDemandQuery(const StringBuffer &xml, SCMStringBuffer &response, bool deployAll, const char *remoteRoxieIP = NULL)
  84. {
  85. unsigned len = xml.length();
  86. size32_t sendlen = len;
  87. _WINREV(sendlen);
  88. Owned<ISocket> sock = getRoxieSocket(remoteRoxieIP);
  89. if (deployAll)
  90. if (!getLock(sock, deployAll)) // if we want to deploy to all roxie servers, make sure we can lock all servers
  91. throw MakeStringException(GET_LOCK_FAILURE, "Request Failed! All roxie nodes unable to process this request at this time. Roxie is busy - possibly in the middle of another deployment. Try again later, if problem persists, make sure all nodes are running");
  92. sock->write(&sendlen, sizeof(sendlen));
  93. sock->write(xml.str(), len);
  94. Owned<IException> exception;
  95. for (;;)
  96. {
  97. sock->read(&sendlen, sizeof(sendlen));
  98. if (!sendlen)
  99. break;
  100. _WINREV(sendlen);
  101. char *block = response.s.reserveTruncate(sendlen);
  102. sock->read(block, sendlen);
  103. if (!exception && sendlen > 11 && memicmp(block, "<Exception>", 11) == 0)
  104. {
  105. Owned<IPropertyTree> eTree = createPTreeFromXMLString(sendlen, block, ipt_caseInsensitive);
  106. exception.setown(MakeStringException(eTree->getPropInt("Code", 0), "%s", eTree->queryProp("Message")));
  107. }
  108. }
  109. if (exception)
  110. throw exception.getClear();
  111. return response.str();
  112. }
  113. IPropertyTree *sendRoxieControlQuery(IPropertyTree *pt)
  114. {
  115. StringBuffer xml;
  116. toXML(pt, xml);
  117. return sendRoxieControlQuery(xml, true);
  118. }
  119. bool sendRoxieControlLock(ISocket *sock, bool allOrNothing, unsigned wait)
  120. {
  121. Owned<IPropertyTree> resp = sendRoxieControlQuery(sock, "<control:lock/>", wait);
  122. if (allOrNothing)
  123. {
  124. int lockCount = resp->getPropInt("Lock", 0);
  125. int serverCount = resp->getPropInt("NumServers", 0);
  126. return (lockCount && (lockCount == serverCount));
  127. }
  128. return resp->getPropInt("Lock", 0) != 0;
  129. }
  130. void checkRoxieControlExceptions(IPropertyTree *msg)
  131. {
  132. Owned<IMultiException> me = MakeMultiException();
  133. Owned<IPropertyTreeIterator> endpoints = msg->getElements("Endpoint");
  134. ForEach(*endpoints)
  135. {
  136. IPropertyTree &endp = endpoints->query();
  137. Owned<IPropertyTreeIterator> exceptions = endp.getElements("Exception");
  138. ForEach (*exceptions)
  139. {
  140. IPropertyTree &ex = exceptions->query();
  141. me->append(*MakeStringException(ex.getPropInt("Code"), "Endpoint %s: %s", endp.queryProp("@ep"), ex.queryProp("Message")));
  142. }
  143. }
  144. if (me->ordinality())
  145. throw me.getClear();
  146. }
  147. unsigned waitMsToSeconds(unsigned wait)
  148. {
  149. if (wait==0 || wait==(unsigned)-1)
  150. return wait;
  151. return wait/1000;
  152. }
  153. unsigned remainingMsWait(unsigned wait, unsigned start)
  154. {
  155. if (wait==0 || wait==(unsigned)-1)
  156. return wait;
  157. unsigned waited = msTick()-start;
  158. return (wait>waited) ? wait-waited : 0;
  159. }
  160. const char* buildRoxieName(const char* orig_name, StringBuffer& roxiename)
  161. {
  162. const char *last_dot = strrchr(orig_name, '.');
  163. if (last_dot)
  164. roxiename.append((last_dot - orig_name), orig_name);
  165. else
  166. roxiename.append(orig_name);
  167. return roxiename.str();
  168. }
  169. ISocket *getRoxieSocket(const char *remoteIP)
  170. {
  171. ISocket* s = NULL;
  172. try
  173. {
  174. if (remoteIP)
  175. {
  176. SocketEndpoint roxie_ep;
  177. roxie_ep.set(remoteIP);
  178. return ISocket::connect_timeout(roxie_ep, 20000);
  179. }
  180. s = ISocket::connect_timeout(ep, 20000);
  181. }
  182. catch(IException* e)
  183. {
  184. e->Release();
  185. }
  186. catch(...)
  187. {
  188. }
  189. if (s==NULL)
  190. {
  191. StringBuffer buf;
  192. buf.append("Failed to connect to Roxie cluster at ");
  193. if (remoteIP)
  194. buf.append(remoteIP);
  195. else
  196. ep.getUrlStr(buf);
  197. throw MakeStringException(ROXIECOMM_SOCKET_ERROR, "%s", buf.str());
  198. }
  199. return s;
  200. }
  201. bool getLock(ISocket *sock, bool lockAll)
  202. {
  203. const char *lock = "<control:lock/>";
  204. unsigned locklen = strlen(lock);
  205. _WINREV(locklen);
  206. sock->write(&locklen, sizeof(locklen));
  207. sock->write(lock, strlen(lock));
  208. StringBuffer lockResponse;
  209. for (;;)
  210. {
  211. unsigned sendlen;
  212. sock->read(&sendlen, sizeof(sendlen));
  213. if (!sendlen)
  214. break;
  215. _WINREV(sendlen);
  216. sock->read(lockResponse.reserveTruncate(sendlen), sendlen);
  217. }
  218. Owned<IPropertyTree> lockRet = createPTreeFromXMLString(lockResponse.str());
  219. if (lockAll)
  220. {
  221. int lockCount = lockRet->getPropInt("Lock", 0);
  222. int serverCount = lockRet->getPropInt("NumServers", 0);
  223. return (lockCount && (lockCount == serverCount));
  224. }
  225. return lockRet->getPropInt("Lock", 0) != 0;
  226. }
  227. void buildPackageFileInfo(IPropertyTree *packageTree, const char *filename)
  228. {
  229. StringBuffer packageInfo;
  230. IPropertyTree *pkgInfo = createPTreeFromXMLFile(filename, ipt_caseInsensitive);
  231. StringBuffer baseFileName;
  232. splitFilename(filename, NULL, NULL, &baseFileName, &baseFileName);
  233. pkgInfo->setProp("@pkgFileName", baseFileName.str());
  234. packageTree->addPropTree(pkgInfo->queryName(), pkgInfo);
  235. }
  236. public:
  237. IMPLEMENT_IINTERFACE;
  238. CRoxieCommunicationClient(const SocketEndpoint& _ep, unsigned _roxieTimeout)
  239. :ep(_ep), roxieTimeout(_roxieTimeout)
  240. {
  241. }
  242. IPropertyTree * sendRoxieControlRequest(const char *xml, bool lockAll)
  243. {
  244. return sendRoxieControlQuery(xml, lockAll);
  245. }
  246. const char * sendRoxieOnDemandRequest(const char *xml, SCMStringBuffer &response, bool lockAll)
  247. {
  248. return sendRoxieOnDemandQuery(xml, response, lockAll);
  249. }
  250. void sendRoxieReloadControlRequest()
  251. {
  252. StringBuffer xpath;
  253. xpath.appendf("<control:reload/>");
  254. try
  255. {
  256. Owned<IPropertyTree> result = sendRoxieControlQuery(xpath, true);
  257. }
  258. catch(IException *e) // not a fatal error - just log the error
  259. {
  260. int errCode = e->errorCode();
  261. StringBuffer err;
  262. err.appendf("%d ", errCode);
  263. e->errorMessage(err);
  264. DBGLOG("ERROR loading query info directly to roxie %s", err.str());
  265. e->Release();
  266. }
  267. }
  268. IPropertyTree *sendRoxieControlAllNodes(const char *msg, bool allOrNothing)
  269. {
  270. Owned<ISocket> sock = ISocket::connect_timeout(ep, roxieTimeout);
  271. return sendRoxieControlAllNodes(sock, msg, allOrNothing, roxieTimeout);
  272. }
  273. IPropertyTree *sendRoxieControlAllNodes(ISocket *sock, const char *msg, bool allOrNothing, unsigned wait)
  274. {
  275. unsigned start = msTick();
  276. if (!sendRoxieControlLock(sock, allOrNothing, wait))
  277. throw MakeStringException(-1, "Roxie control:lock failed");
  278. return sendRoxieControlQuery(sock, msg, remainingMsWait(wait, start));
  279. }
  280. IPropertyTree *sendRoxieControlQuery(ISocket *sock, const char *msg, unsigned wait)
  281. {
  282. size32_t msglen = strlen(msg);
  283. size32_t len = msglen;
  284. _WINREV(len);
  285. sock->write(&len, sizeof(len));
  286. sock->write(msg, msglen);
  287. StringBuffer resp;
  288. for (;;)
  289. {
  290. sock->read(&len, sizeof(len));
  291. if (!len)
  292. break;
  293. _WINREV(len);
  294. size32_t size_read;
  295. sock->read(resp.reserveTruncate(len), len, len, size_read, waitMsToSeconds(wait));
  296. if (size_read<len)
  297. throw MakeStringException(-1, "Error reading roxie control message response");
  298. }
  299. Owned<IPropertyTree> ret = createPTreeFromXMLString(resp.str());
  300. checkRoxieControlExceptions(ret);
  301. return ret.getClear();
  302. }
  303. IPropertyTree * retrieveQuery(const char *id)
  304. {
  305. StringBuffer xpath;
  306. xpath.appendf("<control:getQuery id='%s'/>", id);
  307. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  308. }
  309. virtual IPropertyTree * retrieveAllQueryInfo(const char *id)
  310. {
  311. StringBuffer xpath;
  312. xpath.appendf("<control:getAllQueryInfo id='%s'/>", id);
  313. return sendRoxieControlQuery(xpath.str(), false);
  314. }
  315. IPropertyTree * retrieveState()
  316. {
  317. return sendRoxieControlQuery("<control:state/>", false);
  318. }
  319. IPropertyTree * retrieveQueryStats(const char *queryName, const char *action, const char *graphName, bool lockAll)
  320. {
  321. StringBuffer xpath;
  322. xpath.appendf("<control:querystats><Query id='%s'", queryName);
  323. if (action)
  324. xpath.appendf(" action='%s'", action);
  325. if (graphName)
  326. xpath.appendf(" name='%s'", graphName);
  327. xpath.appendf("/></control:querystats>");
  328. return sendRoxieControlQuery(xpath.str(), lockAll);
  329. }
  330. IPropertyTree * retrieveTopology()
  331. {
  332. return sendRoxieControlQuery("<control:topology/>", false);
  333. }
  334. virtual bool updateQueryComment(const char *id, const char *comment, bool append)
  335. {
  336. StringBuffer xpath;
  337. xpath.appendf("<control:updateQueryComment><Query mode='update' id='%s' comment='%s' append='%d'/></control:updateQueryComment>", id, comment, append);
  338. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath, true);
  339. return true;
  340. }
  341. virtual const char * queryRoxieClusterName(SCMStringBuffer &clusterName)
  342. {
  343. Owned<IPropertyTree> tree = sendRoxieControlQuery("<control:getClusterName/>", false);
  344. clusterName.set(tree->queryProp("Endpoint/clusterName/@id"));
  345. return clusterName.str();
  346. }
  347. virtual IPropertyTree * retrieveQueryWuInfo(const char *queryName)
  348. {
  349. StringBuffer xpath;
  350. xpath.appendf("<control:querywuid><Query id='%s'/></control:querywuid>", queryName);
  351. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  352. }
  353. virtual bool updateACLInfo(bool allow, const char *ip, const char *mask, const char *query, const char *errorMsg, int errorCode, const char *roxieAddress, SCMStringBuffer &status)
  354. {
  355. StringBuffer xpath;
  356. StringBuffer errMsg;
  357. if (errorMsg && *errorMsg)
  358. errMsg.appendf("error='%s'", errorMsg);
  359. xpath.appendf("<control:aclupdate><ACL><Access allow='%d' ip='%s' mask='%s' query='%s' errorCode='%d' %s/></ACL></control:aclupdate>", allow, ip, mask, query, errorCode, (errMsg.length() ? errMsg.str() : ""));
  360. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath.str(), true, roxieAddress);
  361. status.set("update ACL info");
  362. return true;
  363. }
  364. virtual IPropertyTree* queryACLInfo()
  365. {
  366. StringBuffer xpath;
  367. xpath.appendf("<control:queryaclinfo/>");
  368. return sendRoxieControlQuery(xpath.str(), true);
  369. }
  370. virtual IPropertyTree * retrieveActiveMetaInformation(bool addClusterName, bool groupByDataPackage, unsigned version)
  371. {
  372. StringBuffer xpath;
  373. xpath.appendf("<control:queryActiveMetaInfo addClusterName='%d' groupByDataPackage='%d' version='%d'", addClusterName, groupByDataPackage, version);
  374. xpath.appendf("/>");
  375. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  376. }
  377. virtual unsigned retrieveRoxieStateRevision()
  378. {
  379. StringBuffer xpath;
  380. xpath.appendf("<control:revision/>");
  381. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath.str(), false);
  382. return tree->getPropInt("Endpoint/Revision", 0);
  383. }
  384. virtual IPropertyTree * retrieveRoxieQueryMetrics(SCMStringBuffer &queryName, SCMStringBuffer &startDateTime, SCMStringBuffer &endDateTime)
  385. {
  386. StringBuffer xpath;
  387. xpath.append("<control:queryAggregates"); // leave off > for now - put it on later
  388. if (startDateTime.length())
  389. xpath.appendf(" from='%s'", startDateTime.str());
  390. if (endDateTime.length())
  391. xpath.appendf(" to='%s'", endDateTime.str());
  392. if (queryName.length())
  393. {
  394. xpath.append(">");
  395. xpath.appendf("<Query id='%s'/></control:queryAggregates>", queryName.str());
  396. }
  397. else
  398. xpath.append("/>");
  399. return sendRoxieControlQuery(xpath.str(), true);
  400. }
  401. virtual IPropertyTree * retrieveRoxieMetrics(StringArray &ipList)
  402. {
  403. StringBuffer xpath;
  404. xpath.append("<control:metrics/>");
  405. if (ipList.ordinality() == 0) // no list of ips, so assume all
  406. return sendRoxieControlQuery(xpath.str(), true);
  407. else
  408. {
  409. IPropertyTree *tree = createPTree("Control");
  410. ForEachItemIn(idx, ipList)
  411. {
  412. StringBuffer ip(ipList.item(idx));
  413. if (strchr(ip.str(), ':') == 0)
  414. ip.appendf(":%d", ROXIE_SERVER_PORT);
  415. Owned<IPropertyTree> t = sendRoxieControlQuery(xpath.str(), false, ip.str());
  416. if (t)
  417. tree->addPropTree("Endpoint", t->getBranch("Endpoint"));
  418. }
  419. return tree;
  420. }
  421. }
  422. virtual IPropertyTree * listLibrariesUsedByQuery(const char *id, bool useAliasNames)
  423. {
  424. StringBuffer xpath;
  425. xpath.appendf("<control:getLibrariesUsedByQuery id='%s' useAliasNamesOnly='%s'/>", id, (useAliasNames) ? "1" : "0");
  426. return sendRoxieControlQuery(xpath, false);
  427. }
  428. virtual IPropertyTree * listQueriesUsingLibrary(const char *id)
  429. {
  430. StringBuffer xpath;
  431. xpath.appendf("<control:getQueriesUsingLibrary id='%s'/>", id);
  432. return sendRoxieControlQuery(xpath, false);
  433. }
  434. virtual IPropertyTree *retrieveQueryActivityInfo(const char *id, int activityId)
  435. {
  436. StringBuffer xpath;
  437. xpath.appendf("<control:retrieveActivityDetails id='%s' activityId='%d'/>", id, activityId);
  438. return sendRoxieControlQuery(xpath, false);
  439. }
  440. virtual IPropertyTree *getRoxieBuildVersion()
  441. {
  442. StringBuffer xpath;
  443. xpath.appendf("<control:getBuildVersion/>");
  444. return sendRoxieControlQuery(xpath.str(), false);
  445. }
  446. };
  447. IRoxieCommunicationClient* createRoxieCommunicationClient(const SocketEndpoint &ep, unsigned roxieTimeout)
  448. {
  449. return new CRoxieCommunicationClient(ep, roxieTimeout);
  450. }