jsmartsock.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jsmartsock.ipp"
  14. #include "jdebug.hpp"
  15. ISmartSocketException *createSmartSocketException(int errorCode, const char *msg)
  16. {
  17. return new SmartSocketException(errorCode, msg);
  18. }
  19. class SmartSocketListParser
  20. {
  21. public:
  22. SmartSocketListParser(const char * text)
  23. {
  24. fullText = strdup(text);
  25. }
  26. ~SmartSocketListParser()
  27. {
  28. free(fullText);
  29. }
  30. unsigned getSockets(SmartSocketEndpointArray &array, unsigned defport=0)
  31. {
  32. // IPV6TBD
  33. char *copyFullText = strdup(fullText);
  34. unsigned port = defport;
  35. char *saveptr;
  36. char *ip = strtok_r(copyFullText, "|", &saveptr);
  37. while (ip != NULL)
  38. {
  39. char *p = strchr(ip, ':');
  40. if (p)
  41. {
  42. *p = 0;
  43. p++;
  44. port = atoi(p);
  45. }
  46. if (isdigit(*ip))
  47. {
  48. char *dash = strrchr(ip, '-');
  49. if (dash)
  50. {
  51. *dash = 0;
  52. int last = atoi(dash+1);
  53. char *dot = strrchr(ip, '.');
  54. *dot = 0;
  55. int first = atoi(dot+1);
  56. for (int i = first; i <= last; i++)
  57. {
  58. StringBuffer t;
  59. t.append(ip).append('.').append(i);
  60. array.append(new SmartSocketEndpoint(t.str(), port));
  61. }
  62. }
  63. else
  64. {
  65. array.append(new SmartSocketEndpoint(ip, port));
  66. }
  67. }
  68. else
  69. {
  70. array.append(new SmartSocketEndpoint(ip, port));
  71. }
  72. ip = strtok_r(NULL, "|", &saveptr);
  73. }
  74. free(copyFullText);
  75. return array.ordinality();
  76. }
  77. private:
  78. char *fullText;
  79. };
  80. CSmartSocket::CSmartSocket(ISocket *_sock, SocketEndpoint &_ep, ISmartSocketFactory *_factory) : sock(_sock), ep(_ep), factory(_factory)
  81. {
  82. };
  83. CSmartSocket::~CSmartSocket()
  84. {
  85. if (sock)
  86. sock->Release();
  87. };
  88. void CSmartSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
  89. {
  90. try
  91. {
  92. sock->read(buf, min_size, max_size, size_read, timeout);
  93. }
  94. catch (IException *)
  95. {
  96. factory->setStatus(ep, false);
  97. if (sock != NULL)
  98. {
  99. sock->Release();
  100. sock = NULL;
  101. }
  102. throw;
  103. }
  104. }
  105. void CSmartSocket::read(void* buf, size32_t size)
  106. {
  107. try
  108. {
  109. sock->read(buf, size);
  110. }
  111. catch (IException *)
  112. {
  113. factory->setStatus(ep, false);
  114. if (sock != NULL)
  115. {
  116. sock->Release();
  117. sock = NULL;
  118. }
  119. throw;
  120. }
  121. }
  122. size32_t CSmartSocket::write(void const* buf, size32_t size)
  123. {
  124. try
  125. {
  126. return sock->write(buf, size);
  127. }
  128. catch (IException *)
  129. {
  130. factory->setStatus(ep, false);
  131. if (sock != NULL)
  132. {
  133. sock->Release();
  134. sock = NULL;
  135. }
  136. throw;
  137. }
  138. }
  139. void CSmartSocket::close()
  140. {
  141. try
  142. {
  143. sock->close();
  144. sock->Release();
  145. sock = NULL;
  146. }
  147. catch (IException *)
  148. {
  149. factory->setStatus(ep, false);
  150. if (sock != NULL)
  151. {
  152. sock->Release();
  153. sock = NULL;
  154. }
  155. throw;
  156. }
  157. }
  158. CSmartSocketFactory::CSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval)
  159. {
  160. PROGLOG("CSmartSocketFactory::CSmartSocketFactory(%s)",_socklist?_socklist:"NULL");
  161. SmartSocketListParser slp(_socklist);
  162. if (slp.getSockets(sockArray) == 0)
  163. throw createSmartSocketException(0, "no endpoints defined");
  164. shuffleEndpoints();
  165. nextEndpointIndex = 0;
  166. dnsInterval=_dnsInterval;
  167. retry = _retry;
  168. if (retry)
  169. {
  170. retryInterval = _retryInterval;
  171. this->start();
  172. }
  173. }
  174. CSmartSocketFactory::~CSmartSocketFactory()
  175. {
  176. stop();
  177. }
  178. void CSmartSocketFactory::stop()
  179. {
  180. retry = false;
  181. this->join();
  182. }
  183. void CSmartSocketFactory::resolveHostnames() {
  184. for(unsigned i=0; i < sockArray.ordinality(); i++) {
  185. SmartSocketEndpoint *ep=sockArray.item(i);
  186. SmartSocketEndpoint resolveEP=*ep;
  187. resolveEP.ep.set(resolveEP.name.str(), resolveEP.ep.port);
  188. {
  189. synchronized block(lock);
  190. *ep=resolveEP;
  191. }
  192. }
  193. }
  194. void CSmartSocketFactory::shuffleEndpoints()
  195. {
  196. Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
  197. random->seed((unsigned)get_cycles_now());
  198. unsigned i = sockArray.ordinality();
  199. while (i > 1)
  200. {
  201. unsigned j = random->next() % i;
  202. i--;
  203. sockArray.swap(i, j);
  204. }
  205. }
  206. SmartSocketEndpoint *CSmartSocketFactory::nextSmartEndpoint()
  207. {
  208. SmartSocketEndpoint *ss=sockArray.item(nextEndpointIndex);
  209. if (retry)
  210. {
  211. unsigned startEndpoint = nextEndpointIndex;
  212. while (!ss || !ss->status)
  213. {
  214. ++nextEndpointIndex %= sockArray.ordinality();
  215. if (startEndpoint == nextEndpointIndex)
  216. throw createSmartSocketException(0, "no endpoints are alive");
  217. ss = sockArray.item(nextEndpointIndex);
  218. }
  219. }
  220. ++nextEndpointIndex %= sockArray.ordinality();
  221. return ss;
  222. }
  223. SocketEndpoint& CSmartSocketFactory::nextEndpoint()
  224. {
  225. SmartSocketEndpoint *ss=nextSmartEndpoint();
  226. if (!ss)
  227. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  228. return (ss->ep);
  229. }
  230. ISocket *CSmartSocketFactory::connect_sock(unsigned timeoutms, SmartSocketEndpoint *&ss, SocketEndpoint &ep)
  231. {
  232. ss = nextSmartEndpoint();
  233. if (!ss)
  234. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  235. ISocket *sock = nullptr;
  236. try
  237. {
  238. {
  239. synchronized block(lock);
  240. ss->checkHost(dnsInterval);
  241. ep = ss->ep;
  242. }
  243. if (timeoutms)
  244. sock = ISocket::connect_timeout(ep, timeoutms);
  245. else
  246. sock = ISocket::connect(ep);
  247. }
  248. catch (IException *e)
  249. {
  250. StringBuffer s("CSmartSocketFactory::connect_sock ");
  251. ep.getUrlStr(s);
  252. EXCLOG(e,s.str());
  253. ss->status=false;
  254. if (sock)
  255. sock->Release();
  256. throw;
  257. }
  258. return sock;
  259. }
  260. ISmartSocket *CSmartSocketFactory::connect_timeout(unsigned timeoutms)
  261. {
  262. SocketEndpoint ep;
  263. SmartSocketEndpoint *ss = nullptr;
  264. Owned<ISocket> sock = connect_sock(timeoutms, ss, ep);
  265. return new CSmartSocket(sock.getClear(), ep, this);
  266. }
  267. ISmartSocket *CSmartSocketFactory::connect()
  268. {
  269. return connect_timeout(0);
  270. }
  271. ISmartSocket *CSmartSocketFactory::connectNextAvailableSocket()
  272. {
  273. while(1)
  274. {
  275. try
  276. {
  277. return connect_timeout(1000); // 1 sec
  278. }
  279. catch (ISmartSocketException *e)
  280. {
  281. throw e;
  282. }
  283. catch (IException *e)
  284. {
  285. e->Release(); //keep trying
  286. }
  287. }
  288. return NULL; // should never get here, but make the compiler happy
  289. }
  290. int CSmartSocketFactory::run()
  291. {
  292. unsigned idx;
  293. while (retry)
  294. {
  295. for(unsigned secs = 0; (secs < retryInterval) && retry; secs++)
  296. Sleep(1000);
  297. if(!retry)
  298. break;
  299. for (idx = 0; idx < sockArray.ordinality(); idx++)
  300. {
  301. SmartSocketEndpoint *ss=sockArray.item(idx);
  302. if (ss && !ss->status)
  303. {
  304. try
  305. {
  306. synchronized block(lock);
  307. ss->checkHost(dnsInterval);
  308. Owned <ISocket> testSock = ISocket::connect_timeout(ss->ep, 1000); // 1 sec
  309. testSock->close();
  310. ss->status = true;
  311. }
  312. catch (IException *e)
  313. {
  314. // still bad - keep set to false
  315. e->Release();
  316. }
  317. }
  318. }
  319. }
  320. return 0;
  321. }
  322. SmartSocketEndpoint *CSmartSocketFactory::findEndpoint(SocketEndpoint &ep)
  323. {
  324. for (unsigned idx = 0; idx < sockArray.ordinality(); idx++)
  325. {
  326. SmartSocketEndpoint *ss=sockArray.item(idx);
  327. if (ss && ss->ep.equals(ep))
  328. return ss;
  329. }
  330. return NULL;
  331. }
  332. bool CSmartSocketFactory::getStatus(SocketEndpoint &ep)
  333. {
  334. SmartSocketEndpoint *ss=findEndpoint(ep);
  335. return (ss && ss->status);
  336. }
  337. void CSmartSocketFactory::setStatus(SocketEndpoint &ep, bool status)
  338. {
  339. SmartSocketEndpoint *ss=findEndpoint(ep);
  340. if (ss)
  341. ss->status=status;
  342. }
  343. StringBuffer & CSmartSocketFactory::getUrlStr(StringBuffer &url, bool useHostName)
  344. {
  345. SmartSocketEndpoint * sep = nextSmartEndpoint();
  346. if (sep)
  347. {
  348. SocketEndpoint ep;
  349. if(useHostName && sep->name.length())
  350. {
  351. url.append(sep->name.str());
  352. ep = sep->ep;
  353. if (ep.port)
  354. url.append(':').append((unsigned)ep.port);
  355. }
  356. else
  357. {
  358. sep->checkHost(dnsInterval);
  359. SocketEndpoint ep = sep->ep;
  360. ep.getUrlStr(url);
  361. }
  362. }
  363. return url;
  364. }
  365. ISmartSocketFactory *createSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval) {
  366. return new CSmartSocketFactory(_socklist, _retry, _retryInterval, _dnsInterval);
  367. }