roxiecommunicationclient.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning (disable : 4786)
  14. #include "jmisc.hpp"
  15. #include "portlist.h"
  16. #include "roxiecommlib.hpp"
  17. #define GET_LOCK_FAILURE 1100
  18. //////////////////////////////////////////
  19. class CRoxieCommunicationClient: public CInterface, implements IRoxieCommunicationClient
  20. {
  21. private:
  22. void processRoxieQueryList(IPropertyTree *info)
  23. {
  24. StringAttrMapping aliasOriginalNames(false);
  25. Owned<IPropertyTreeIterator> alaises = info->getElements("Endpoint/Alias");
  26. ForEach (*alaises)
  27. {
  28. IPropertyTree &alias = alaises->query();
  29. const char *id = alias.queryProp("@id");
  30. const char *original = alias.queryProp("@original");
  31. aliasOriginalNames.setValue(original, original);
  32. }
  33. Owned<IPropertyTreeIterator> queries = info->getElements("Endpoint/Query");
  34. ForEach (*queries)
  35. {
  36. IPropertyTree &query = queries->query();
  37. const char *id = query.queryProp("@id");
  38. // look for id in the stringArray
  39. if (aliasOriginalNames.find(id) != 0)
  40. query.setProp("@hasAlias", "1");
  41. else
  42. query.setProp("@hasAlias", "0");
  43. }
  44. }
  45. protected:
  46. SocketEndpoint ep;
  47. unsigned roxieTimeout;
  48. IPropertyTree *sendRoxieControlQuery(const StringBuffer &xml, bool deployAll, const char *remoteRoxieIP = NULL)
  49. {
  50. unsigned len = xml.length();
  51. size32_t sendlen = len;
  52. _WINREV(sendlen);
  53. Owned<ISocket> sock = getRoxieSocket(remoteRoxieIP);
  54. if (deployAll)
  55. if (!getLock(sock, deployAll)) // if we want to deploy to all roxie servers, make sure we can lock all servers
  56. throw MakeStringException(GET_LOCK_FAILURE, "Request Failed! All roxie nodes unable to process this request at this time. Roxie is busy - possibly in the middle of another deployment. Try again later, if problem persists, make sure all nodes are running");
  57. sock->write(&sendlen, sizeof(sendlen));
  58. sock->write(xml.str(), len);
  59. StringBuffer response;
  60. loop
  61. {
  62. sock->read(&sendlen, sizeof(sendlen));
  63. if (!sendlen)
  64. break;
  65. _WINREV(sendlen);
  66. sock->read(response.reserveTruncate(sendlen), sendlen);
  67. }
  68. Owned<IPropertyTree> ret = createPTreeFromXMLString(response.str());
  69. Owned<IMultiException> me = MakeMultiException();
  70. Owned<IPropertyTreeIterator> endpoints = ret->getElements("Endpoint");
  71. ForEach(*endpoints)
  72. {
  73. Owned<IPropertyTreeIterator> exceptions = endpoints->query().getElements("Exception");
  74. ForEach (*exceptions)
  75. {
  76. me->append(*MakeStringException(exceptions->query().getPropInt("Code"), "Endpoint %s: %s", endpoints->query().queryProp("@ep"), exceptions->query().queryProp("Message")));
  77. }
  78. }
  79. if (me->ordinality())
  80. throw me.getClear();
  81. return ret.getClear();
  82. }
  83. const char *sendRoxieOnDemandQuery(const StringBuffer &xml, SCMStringBuffer &response, bool deployAll, const char *remoteRoxieIP = NULL)
  84. {
  85. unsigned len = xml.length();
  86. size32_t sendlen = len;
  87. _WINREV(sendlen);
  88. Owned<ISocket> sock = getRoxieSocket(remoteRoxieIP);
  89. if (deployAll)
  90. if (!getLock(sock, deployAll)) // if we want to deploy to all roxie servers, make sure we can lock all servers
  91. throw MakeStringException(GET_LOCK_FAILURE, "Request Failed! All roxie nodes unable to process this request at this time. Roxie is busy - possibly in the middle of another deployment. Try again later, if problem persists, make sure all nodes are running");
  92. sock->write(&sendlen, sizeof(sendlen));
  93. sock->write(xml.str(), len);
  94. Owned<IException> exception;
  95. loop
  96. {
  97. sock->read(&sendlen, sizeof(sendlen));
  98. if (!sendlen)
  99. break;
  100. _WINREV(sendlen);
  101. char *block = response.s.reserveTruncate(sendlen);
  102. sock->read(block, sendlen);
  103. if (!exception && sendlen > 11 && memicmp(block, "<Exception>", 11) == 0)
  104. {
  105. Owned<IPropertyTree> eTree = createPTreeFromXMLString(sendlen, block, ipt_caseInsensitive);
  106. exception.setown(MakeStringException(eTree->getPropInt("Code", 0), "%s", eTree->queryProp("Message")));
  107. }
  108. }
  109. if (exception)
  110. throw exception.getClear();
  111. return response.str();
  112. }
  113. IPropertyTree *sendRoxieControlQuery(IPropertyTree *pt)
  114. {
  115. StringBuffer xml;
  116. toXML(pt, xml);
  117. return sendRoxieControlQuery(xml, true);
  118. }
  119. const char* buildRoxieName(const char* orig_name, StringBuffer& roxiename)
  120. {
  121. const char *last_dot = strrchr(orig_name, '.');
  122. if (last_dot)
  123. roxiename.append((last_dot - orig_name), orig_name);
  124. else
  125. roxiename.append(orig_name);
  126. return roxiename.str();
  127. }
  128. ISocket *getRoxieSocket(const char *remoteIP)
  129. {
  130. ISocket* s = NULL;
  131. try
  132. {
  133. if (remoteIP)
  134. {
  135. SocketEndpoint roxie_ep;
  136. roxie_ep.set(remoteIP);
  137. return ISocket::connect_timeout(roxie_ep, 20000);
  138. }
  139. s = ISocket::connect_timeout(ep, 20000);
  140. }
  141. catch(IException* e)
  142. {
  143. e->Release();
  144. }
  145. catch(...)
  146. {
  147. }
  148. if (s==NULL)
  149. {
  150. StringBuffer buf;
  151. buf.append("Failed to connect to Roxie cluster at ");
  152. if (remoteIP)
  153. buf.append(remoteIP);
  154. else
  155. ep.getUrlStr(buf);
  156. throw MakeStringException(ROXIECOMM_SOCKET_ERROR, "%s", buf.str());
  157. }
  158. return s;
  159. }
  160. bool getLock(ISocket *sock, bool lockAll)
  161. {
  162. const char *lock = "<control:lock/>";
  163. unsigned locklen = strlen(lock);
  164. _WINREV(locklen);
  165. sock->write(&locklen, sizeof(locklen));
  166. sock->write(lock, strlen(lock));
  167. StringBuffer lockResponse;
  168. loop
  169. {
  170. unsigned sendlen;
  171. sock->read(&sendlen, sizeof(sendlen));
  172. if (!sendlen)
  173. break;
  174. _WINREV(sendlen);
  175. sock->read(lockResponse.reserveTruncate(sendlen), sendlen);
  176. }
  177. Owned<IPropertyTree> lockRet = createPTreeFromXMLString(lockResponse.str());
  178. if (lockAll)
  179. {
  180. int lockCount = lockRet->getPropInt("Lock", 0);
  181. int serverCount = lockRet->getPropInt("NumServers", 0);
  182. return (lockCount && (lockCount == serverCount));
  183. }
  184. return lockRet->getPropInt("Lock", 0) != 0;
  185. }
  186. void buildPackageFileInfo(IPropertyTree *packageTree, const char *filename)
  187. {
  188. StringBuffer packageInfo;
  189. IPropertyTree *pkgInfo = createPTreeFromXMLFile(filename, ipt_caseInsensitive);
  190. StringBuffer baseFileName;
  191. splitFilename(filename, NULL, NULL, &baseFileName, &baseFileName);
  192. pkgInfo->setProp("@pkgFileName", baseFileName.str());
  193. packageTree->addPropTree(pkgInfo->queryName(), pkgInfo);
  194. }
  195. public:
  196. IMPLEMENT_IINTERFACE;
  197. CRoxieCommunicationClient(SocketEndpoint& _ep, unsigned _roxieTimeout)
  198. :ep(_ep), roxieTimeout(_roxieTimeout)
  199. {
  200. }
  201. IPropertyTree * sendRoxieControlRequest(const char *xml, bool lockAll)
  202. {
  203. return sendRoxieControlQuery(xml, lockAll);
  204. }
  205. const char * sendRoxieOnDemandRequest(const char *xml, SCMStringBuffer &response, bool lockAll)
  206. {
  207. return sendRoxieOnDemandQuery(xml, response, lockAll);
  208. }
  209. void sendRoxieReloadControlRequest()
  210. {
  211. StringBuffer xpath;
  212. xpath.appendf("<control:reload/>");
  213. try
  214. {
  215. Owned<IPropertyTree> result = sendRoxieControlQuery(xpath, true);
  216. }
  217. catch(IException *e) // not a fatal error - just log the error
  218. {
  219. int errCode = e->errorCode();
  220. StringBuffer err;
  221. err.appendf("%d ", errCode);
  222. e->errorMessage(err);
  223. DBGLOG("ERROR loading query info directly to roxie %s", err.str());
  224. e->Release();
  225. }
  226. }
  227. IPropertyTree * retrieveQuery(const char *id)
  228. {
  229. StringBuffer xpath;
  230. xpath.appendf("<control:getQuery id='%s'/>", id);
  231. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  232. }
  233. virtual IPropertyTree * retrieveAllQueryInfo(const char *id)
  234. {
  235. StringBuffer xpath;
  236. xpath.appendf("<control:getAllQueryInfo id='%s'/>", id);
  237. return sendRoxieControlQuery(xpath.str(), false);
  238. }
  239. IPropertyTree * retrieveState()
  240. {
  241. return sendRoxieControlQuery("<control:state/>", false);
  242. }
  243. IPropertyTree * retrieveQueryStats(const char *queryName, const char *action, const char *graphName, bool lockAll)
  244. {
  245. StringBuffer xpath;
  246. xpath.appendf("<control:querystats><Query id='%s'", queryName);
  247. if (action)
  248. xpath.appendf(" action='%s'", action);
  249. if (graphName)
  250. xpath.appendf(" name='%s'", graphName);
  251. xpath.appendf("/></control:querystats>");
  252. return sendRoxieControlQuery(xpath.str(), lockAll);
  253. }
  254. IPropertyTree * retrieveTopology()
  255. {
  256. return sendRoxieControlQuery("<control:topology/>", false);
  257. }
  258. virtual bool updateQueryComment(const char *id, const char *comment, bool append)
  259. {
  260. StringBuffer xpath;
  261. xpath.appendf("<control:updateQueryComment><Query mode='update' id='%s' comment='%s' append='%d'/></control:updateQueryComment>", id, comment, append);
  262. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath, true);
  263. return true;
  264. }
  265. virtual const char * queryRoxieClusterName(SCMStringBuffer &clusterName)
  266. {
  267. Owned<IPropertyTree> tree = sendRoxieControlQuery("<control:getClusterName/>", false);
  268. clusterName.set(tree->queryProp("Endpoint/clusterName/@id"));
  269. return clusterName.str();
  270. }
  271. virtual IPropertyTree * retrieveQueryWuInfo(const char *queryName)
  272. {
  273. StringBuffer xpath;
  274. xpath.appendf("<control:querywuid><Query id='%s'/></control:querywuid>", queryName);
  275. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  276. }
  277. virtual bool updateACLInfo(bool allow, const char *ip, const char *mask, const char *query, const char *errorMsg, int errorCode, const char *roxieAddress, SCMStringBuffer &status)
  278. {
  279. StringBuffer xpath;
  280. StringBuffer errMsg;
  281. if (errorMsg && *errorMsg)
  282. errMsg.appendf("error='%s'", errorMsg);
  283. xpath.appendf("<control:aclupdate><ACL><Access allow='%d' ip='%s' mask='%s' query='%s' errorCode='%d' %s/></ACL></control:aclupdate>", allow, ip, mask, query, errorCode, (errMsg.length() ? errMsg.str() : ""));
  284. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath.str(), true, roxieAddress);
  285. status.set("update ACL info");
  286. return true;
  287. }
  288. virtual IPropertyTree* queryACLInfo()
  289. {
  290. StringBuffer xpath;
  291. xpath.appendf("<control:queryaclinfo/>");
  292. return sendRoxieControlQuery(xpath.str(), true);
  293. }
  294. virtual IPropertyTree * retrieveActiveMetaInformation(bool addClusterName, bool groupByDataPackage, unsigned version)
  295. {
  296. StringBuffer xpath;
  297. xpath.appendf("<control:queryActiveMetaInfo addClusterName='%d' groupByDataPackage='%d' version='%d'", addClusterName, groupByDataPackage, version);
  298. xpath.appendf("/>");
  299. return sendRoxieControlQuery(xpath.str(), false); // assume we only need info from one server - they all must be the same or roxie is in trouble
  300. }
  301. virtual unsigned retrieveRoxieStateRevision()
  302. {
  303. StringBuffer xpath;
  304. xpath.appendf("<control:revision/>");
  305. Owned<IPropertyTree> tree = sendRoxieControlQuery(xpath.str(), false);
  306. return tree->getPropInt("Endpoint/Revision", 0);
  307. }
  308. virtual IPropertyTree * retrieveRoxieQueryMetrics(SCMStringBuffer &queryName, SCMStringBuffer &startDateTime, SCMStringBuffer &endDateTime)
  309. {
  310. StringBuffer xpath;
  311. xpath.append("<control:queryAggregates"); // leave off > for now - put it on later
  312. if (startDateTime.length())
  313. xpath.appendf(" from='%s'", startDateTime.str());
  314. if (endDateTime.length())
  315. xpath.appendf(" to='%s'", endDateTime.str());
  316. if (queryName.length())
  317. {
  318. xpath.append(">");
  319. xpath.appendf("<Query id='%s'/></control:queryAggregates>", queryName.str());
  320. }
  321. else
  322. xpath.append("/>");
  323. return sendRoxieControlQuery(xpath.str(), true);
  324. }
  325. virtual IPropertyTree * retrieveRoxieMetrics(StringArray &ipList)
  326. {
  327. StringBuffer xpath;
  328. xpath.append("<control:metrics/>");
  329. if (ipList.ordinality() == 0) // no list of ips, so assume all
  330. return sendRoxieControlQuery(xpath.str(), true);
  331. else
  332. {
  333. IPropertyTree *tree = createPTree("Control");
  334. ForEachItemIn(idx, ipList)
  335. {
  336. StringBuffer ip(ipList.item(idx));
  337. if (strchr(ip.str(), ':') == 0)
  338. ip.appendf(":%d", ROXIE_SERVER_PORT);
  339. Owned<IPropertyTree> t = sendRoxieControlQuery(xpath.str(), false, ip.str());
  340. if (t)
  341. tree->addPropTree("Endpoint", t->getBranch("Endpoint"));
  342. }
  343. return tree;
  344. }
  345. }
  346. virtual IPropertyTree * listLibrariesUsedByQuery(const char *id, bool useAliasNames)
  347. {
  348. StringBuffer xpath;
  349. xpath.appendf("<control:getLibrariesUsedByQuery id='%s' useAliasNamesOnly='%s'/>", id, (useAliasNames) ? "1" : "0");
  350. return sendRoxieControlQuery(xpath, false);
  351. }
  352. virtual IPropertyTree * listQueriesUsingLibrary(const char *id)
  353. {
  354. StringBuffer xpath;
  355. xpath.appendf("<control:getQueriesUsingLibrary id='%s'/>", id);
  356. return sendRoxieControlQuery(xpath, false);
  357. }
  358. virtual IPropertyTree *retrieveQueryActivityInfo(const char *id, int activityId)
  359. {
  360. StringBuffer xpath;
  361. xpath.appendf("<control:retrieveActivityDetails id='%s' activityId='%d'/>", id, activityId);
  362. return sendRoxieControlQuery(xpath, false);
  363. }
  364. virtual IPropertyTree *getRoxieBuildVersion()
  365. {
  366. StringBuffer xpath;
  367. xpath.appendf("<control:getBuildVersion/>");
  368. return sendRoxieControlQuery(xpath.str(), false);
  369. }
  370. };
  371. IRoxieCommunicationClient* createRoxieCommunicationClient(SocketEndpoint &ep, unsigned roxieTimeout)
  372. {
  373. return new CRoxieCommunicationClient(ep, roxieTimeout);
  374. }