jsmartsock.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jsmartsock.ipp"
  14. #include "jdebug.hpp"
  15. ISmartSocketException *createSmartSocketException(int errorCode, const char *msg)
  16. {
  17. return new SmartSocketException(errorCode, msg);
  18. }
  19. class SmartSocketListParser
  20. {
  21. public:
  22. SmartSocketListParser(const char * text)
  23. {
  24. fullText = strdup(text);
  25. }
  26. ~SmartSocketListParser()
  27. {
  28. free(fullText);
  29. }
  30. unsigned getSockets(SmartSocketEndpointArray &array, unsigned defport=0)
  31. {
  32. // IPV6TBD
  33. char *copyFullText = strdup(fullText);
  34. unsigned port = defport;
  35. char *saveptr;
  36. char *ip = strtok_r(copyFullText, "|", &saveptr);
  37. while (ip != NULL)
  38. {
  39. char *p = strchr(ip, ':');
  40. if (p)
  41. {
  42. *p = 0;
  43. p++;
  44. port = atoi(p);
  45. }
  46. if (isdigit(*ip))
  47. {
  48. char *dash = strrchr(ip, '-');
  49. if (dash)
  50. {
  51. *dash = 0;
  52. int last = atoi(dash+1);
  53. char *dot = strrchr(ip, '.');
  54. *dot = 0;
  55. int first = atoi(dot+1);
  56. for (int i = first; i <= last; i++)
  57. {
  58. StringBuffer t;
  59. t.append(ip).append('.').append(i);
  60. array.append(new SmartSocketEndpoint(t.str(), port));
  61. }
  62. }
  63. else
  64. {
  65. array.append(new SmartSocketEndpoint(ip, port));
  66. }
  67. }
  68. else
  69. {
  70. array.append(new SmartSocketEndpoint(ip, port));
  71. }
  72. ip = strtok_r(NULL, "|", &saveptr);
  73. }
  74. free(copyFullText);
  75. return array.ordinality();
  76. }
  77. private:
  78. char *fullText;
  79. };
  80. class jlib_decl CSmartSocket: implements ISmartSocket, public CInterface
  81. {
  82. ISocket *sock;
  83. SocketEndpoint ep;
  84. CSmartSocketFactory *factory;
  85. public:
  86. IMPLEMENT_IINTERFACE;
  87. CSmartSocket(ISocket *_sock, SocketEndpoint &_ep, CSmartSocketFactory *_factory);
  88. ~CSmartSocket();
  89. // ISmartSocket
  90. ISocket *querySocket() { return (sock); }
  91. // subset of ISocket
  92. void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  93. unsigned timeout = WAIT_FOREVER);
  94. void read(void* buf, size32_t size);
  95. size32_t write(void const* buf, size32_t size);
  96. void close();
  97. };
  98. CSmartSocket::CSmartSocket(ISocket *_sock, SocketEndpoint &_ep, CSmartSocketFactory *_factory) : sock(_sock), ep(_ep), factory(_factory)
  99. {
  100. };
  101. CSmartSocket::~CSmartSocket()
  102. {
  103. if (sock)
  104. sock->Release();
  105. };
  106. void CSmartSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
  107. {
  108. try
  109. {
  110. sock->read(buf, min_size, max_size, size_read, timeout);
  111. }
  112. catch (IException *)
  113. {
  114. factory->setStatus(ep, false);
  115. if (sock != NULL)
  116. {
  117. sock->Release();
  118. sock = NULL;
  119. }
  120. throw;
  121. }
  122. }
  123. void CSmartSocket::read(void* buf, size32_t size)
  124. {
  125. try
  126. {
  127. sock->read(buf, size);
  128. }
  129. catch (IException *)
  130. {
  131. factory->setStatus(ep, false);
  132. if (sock != NULL)
  133. {
  134. sock->Release();
  135. sock = NULL;
  136. }
  137. throw;
  138. }
  139. }
  140. size32_t CSmartSocket::write(void const* buf, size32_t size)
  141. {
  142. try
  143. {
  144. return sock->write(buf, size);
  145. }
  146. catch (IException *)
  147. {
  148. factory->setStatus(ep, false);
  149. if (sock != NULL)
  150. {
  151. sock->Release();
  152. sock = NULL;
  153. }
  154. throw;
  155. }
  156. }
  157. void CSmartSocket::close()
  158. {
  159. try
  160. {
  161. sock->close();
  162. sock->Release();
  163. sock = NULL;
  164. }
  165. catch (IException *)
  166. {
  167. factory->setStatus(ep, false);
  168. if (sock != NULL)
  169. {
  170. sock->Release();
  171. sock = NULL;
  172. }
  173. throw;
  174. }
  175. }
  176. CSmartSocketFactory::CSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval)
  177. {
  178. PROGLOG("CSmartSocketFactory::CSmartSocketFactory(%s)",_socklist?_socklist:"NULL");
  179. SmartSocketListParser slp(_socklist);
  180. if (slp.getSockets(sockArray) == 0)
  181. throw createSmartSocketException(0, "no endpoints defined");
  182. shuffleEndpoints();
  183. nextEndpointIndex = 0;
  184. dnsInterval=_dnsInterval;
  185. retry = _retry;
  186. if (retry)
  187. {
  188. retryInterval = _retryInterval;
  189. this->start();
  190. }
  191. }
  192. CSmartSocketFactory::~CSmartSocketFactory()
  193. {
  194. stop();
  195. }
  196. void CSmartSocketFactory::stop()
  197. {
  198. retry = false;
  199. this->join();
  200. }
  201. void CSmartSocketFactory::resolveHostnames() {
  202. for(unsigned i=0; i < sockArray.ordinality(); i++) {
  203. SmartSocketEndpoint *ep=sockArray.item(i);
  204. SmartSocketEndpoint resolveEP=*ep;
  205. resolveEP.ep.set(resolveEP.name.str(), resolveEP.ep.port);
  206. {
  207. synchronized block(lock);
  208. *ep=resolveEP;
  209. }
  210. }
  211. }
  212. void CSmartSocketFactory::shuffleEndpoints()
  213. {
  214. Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
  215. random->seed((unsigned)get_cycles_now());
  216. unsigned i = sockArray.ordinality();
  217. while (i > 1)
  218. {
  219. unsigned j = random->next() % i;
  220. i--;
  221. sockArray.swap(i, j);
  222. }
  223. }
  224. SmartSocketEndpoint *CSmartSocketFactory::nextSmartEndpoint()
  225. {
  226. SmartSocketEndpoint *ss=sockArray.item(nextEndpointIndex);
  227. if (retry)
  228. {
  229. unsigned startEndpoint = nextEndpointIndex;
  230. while (!ss || !ss->status)
  231. {
  232. ++nextEndpointIndex %= sockArray.ordinality();
  233. if (startEndpoint == nextEndpointIndex)
  234. throw createSmartSocketException(0, "no endpoints are alive");
  235. ss = sockArray.item(nextEndpointIndex);
  236. }
  237. }
  238. ++nextEndpointIndex %= sockArray.ordinality();
  239. return ss;
  240. }
  241. SocketEndpoint& CSmartSocketFactory::nextEndpoint()
  242. {
  243. SmartSocketEndpoint *ss=nextSmartEndpoint();
  244. if (!ss)
  245. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  246. return (ss->ep);
  247. }
  248. ISmartSocket *CSmartSocketFactory::connect_timeout( unsigned timeoutms)
  249. {
  250. SmartSocketEndpoint *ss = nextSmartEndpoint();
  251. if (!ss)
  252. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  253. ISocket *sock = NULL;
  254. SocketEndpoint ep;
  255. try
  256. {
  257. {
  258. synchronized block(lock);
  259. ss->checkHost(dnsInterval);
  260. ep = ss->ep;
  261. }
  262. if (timeoutms)
  263. sock = ISocket::connect_timeout(ep, timeoutms);
  264. else
  265. sock = ISocket::connect(ep);
  266. return new CSmartSocket(sock, ep, this);
  267. }
  268. catch (IException *e)
  269. {
  270. StringBuffer s("CSmartSocketFactory::connect ");
  271. ep.getUrlStr(s);
  272. EXCLOG(e,s.str());
  273. ss->status=false;
  274. if (sock)
  275. sock->Release();
  276. throw;
  277. }
  278. }
  279. ISmartSocket *CSmartSocketFactory::connect()
  280. {
  281. return connect_timeout(0);
  282. }
  283. ISmartSocket *CSmartSocketFactory::connectNextAvailableSocket()
  284. {
  285. while(1)
  286. {
  287. try
  288. {
  289. return connect_timeout(1000); // 1 sec
  290. }
  291. catch (ISmartSocketException *e)
  292. {
  293. throw e;
  294. }
  295. catch (IException *e)
  296. {
  297. e->Release(); //keep trying
  298. }
  299. }
  300. return NULL; // should never get here, but make the compiler happy
  301. }
  302. int CSmartSocketFactory::run()
  303. {
  304. unsigned idx;
  305. while (retry)
  306. {
  307. for(unsigned secs = 0; (secs < retryInterval) && retry; secs++)
  308. Sleep(1000);
  309. if(!retry)
  310. break;
  311. for (idx = 0; idx < sockArray.ordinality(); idx++)
  312. {
  313. SmartSocketEndpoint *ss=sockArray.item(idx);
  314. if (ss && !ss->status)
  315. {
  316. try
  317. {
  318. synchronized block(lock);
  319. ss->checkHost(dnsInterval);
  320. Owned <ISocket> testSock = ISocket::connect_timeout(ss->ep, 1000); // 1 sec
  321. testSock->close();
  322. ss->status = true;
  323. }
  324. catch (IException *e)
  325. {
  326. // still bad - keep set to false
  327. e->Release();
  328. }
  329. }
  330. }
  331. }
  332. return 0;
  333. }
  334. SmartSocketEndpoint *CSmartSocketFactory::findEndpoint(SocketEndpoint &ep)
  335. {
  336. for (unsigned idx = 0; idx < sockArray.ordinality(); idx++)
  337. {
  338. SmartSocketEndpoint *ss=sockArray.item(idx);
  339. if (ss && ss->ep.equals(ep))
  340. return ss;
  341. }
  342. return NULL;
  343. }
  344. bool CSmartSocketFactory::getStatus(SocketEndpoint &ep)
  345. {
  346. SmartSocketEndpoint *ss=findEndpoint(ep);
  347. return (ss && ss->status);
  348. }
  349. void CSmartSocketFactory::setStatus(SocketEndpoint &ep, bool status)
  350. {
  351. SmartSocketEndpoint *ss=findEndpoint(ep);
  352. if (ss)
  353. ss->status=status;
  354. }
  355. StringBuffer & CSmartSocketFactory::getUrlStr(StringBuffer &url, bool useHostName)
  356. {
  357. SmartSocketEndpoint * sep = nextSmartEndpoint();
  358. if (sep)
  359. {
  360. SocketEndpoint ep;
  361. if(useHostName && sep->name.length())
  362. {
  363. url.append(sep->name.str());
  364. ep = sep->ep;
  365. if (ep.port)
  366. url.append(':').append((unsigned)ep.port);
  367. }
  368. else
  369. {
  370. sep->checkHost(dnsInterval);
  371. SocketEndpoint ep = sep->ep;
  372. ep.getUrlStr(url);
  373. }
  374. }
  375. return url;
  376. }
  377. ISmartSocketFactory *createSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval) {
  378. return new CSmartSocketFactory(_socklist, _retry, _retryInterval, _dnsInterval);
  379. }