jsmartsock.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jsmartsock.ipp"
  14. #include "jdebug.hpp"
  15. ISmartSocketException *createSmartSocketException(int errorCode, const char *msg)
  16. {
  17. return new SmartSocketException(errorCode, msg);
  18. }
  19. class SmartSocketListParser
  20. {
  21. public:
  22. SmartSocketListParser(const char * text)
  23. {
  24. fullText = strdup(text);
  25. }
  26. ~SmartSocketListParser()
  27. {
  28. free(fullText);
  29. }
  30. unsigned getSockets(SmartSocketEndpointArray &array, unsigned defport=0)
  31. {
  32. // IPV6TBD
  33. char *copyFullText = strdup(fullText);
  34. unsigned port = defport;
  35. char *saveptr;
  36. char *hostportstr = strtok_r(copyFullText, "|", &saveptr);
  37. while (hostportstr != nullptr)
  38. {
  39. // strip off http[s]://
  40. // is strcasestr/stristr available ?
  41. char *ip = strstr(hostportstr, "http://");
  42. if (ip == nullptr)
  43. ip = strstr(hostportstr, "HTTP://");
  44. if (ip != nullptr)
  45. ip += 7;
  46. else
  47. {
  48. ip = strstr(hostportstr, "https://");
  49. if (ip == nullptr)
  50. ip = strstr(hostportstr, "HTTPS://");
  51. if (ip != nullptr)
  52. ip += 8;
  53. }
  54. if (ip == nullptr)
  55. ip = hostportstr;
  56. char *p = strchr(ip, ':');
  57. if (p)
  58. {
  59. *p = 0;
  60. p++;
  61. port = atoi(p);
  62. }
  63. if (isdigit(*ip))
  64. {
  65. char *dash = strrchr(ip, '-');
  66. if (dash)
  67. {
  68. *dash = 0;
  69. int last = atoi(dash+1);
  70. char *dot = strrchr(ip, '.');
  71. *dot = 0;
  72. int first = atoi(dot+1);
  73. for (int i = first; i <= last; i++)
  74. {
  75. StringBuffer t;
  76. t.append(ip).append('.').append(i);
  77. array.append(new SmartSocketEndpoint(t.str(), port));
  78. }
  79. }
  80. else
  81. {
  82. array.append(new SmartSocketEndpoint(ip, port));
  83. }
  84. }
  85. else
  86. {
  87. array.append(new SmartSocketEndpoint(ip, port));
  88. }
  89. hostportstr = strtok_r(NULL, "|", &saveptr);
  90. }
  91. free(copyFullText);
  92. return array.ordinality();
  93. }
  94. private:
  95. char *fullText;
  96. };
  97. CSmartSocket::CSmartSocket(ISocket *_sock, SocketEndpoint &_ep, ISmartSocketFactory *_factory) : sock(_sock), ep(_ep), factory(_factory)
  98. {
  99. };
  100. CSmartSocket::~CSmartSocket()
  101. {
  102. if (sock)
  103. sock->Release();
  104. };
  105. void CSmartSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
  106. {
  107. try
  108. {
  109. sock->read(buf, min_size, max_size, size_read, timeout);
  110. }
  111. catch (IException *)
  112. {
  113. factory->setStatus(ep, false);
  114. if (sock != NULL)
  115. {
  116. sock->Release();
  117. sock = NULL;
  118. }
  119. throw;
  120. }
  121. }
  122. void CSmartSocket::read(void* buf, size32_t size)
  123. {
  124. try
  125. {
  126. sock->read(buf, size);
  127. }
  128. catch (IException *)
  129. {
  130. factory->setStatus(ep, false);
  131. if (sock != NULL)
  132. {
  133. sock->Release();
  134. sock = NULL;
  135. }
  136. throw;
  137. }
  138. }
  139. size32_t CSmartSocket::write(void const* buf, size32_t size)
  140. {
  141. try
  142. {
  143. return sock->write(buf, size);
  144. }
  145. catch (IException *)
  146. {
  147. factory->setStatus(ep, false);
  148. if (sock != NULL)
  149. {
  150. sock->Release();
  151. sock = NULL;
  152. }
  153. throw;
  154. }
  155. }
  156. void CSmartSocket::close()
  157. {
  158. try
  159. {
  160. sock->close();
  161. sock->Release();
  162. sock = NULL;
  163. }
  164. catch (IException *)
  165. {
  166. factory->setStatus(ep, false);
  167. if (sock != NULL)
  168. {
  169. sock->Release();
  170. sock = NULL;
  171. }
  172. throw;
  173. }
  174. }
  175. CSmartSocketFactory::CSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval)
  176. {
  177. PROGLOG("CSmartSocketFactory::CSmartSocketFactory(%s)",_socklist?_socklist:"NULL");
  178. SmartSocketListParser slp(_socklist);
  179. if (slp.getSockets(sockArray) == 0)
  180. throw createSmartSocketException(0, "no endpoints defined");
  181. shuffleEndpoints();
  182. nextEndpointIndex = 0;
  183. dnsInterval=_dnsInterval;
  184. retry = _retry;
  185. if (retry)
  186. {
  187. retryInterval = _retryInterval;
  188. this->start();
  189. }
  190. }
  191. CSmartSocketFactory::~CSmartSocketFactory()
  192. {
  193. stop();
  194. }
  195. void CSmartSocketFactory::stop()
  196. {
  197. retry = false;
  198. this->join();
  199. }
  200. void CSmartSocketFactory::resolveHostnames() {
  201. for(unsigned i=0; i < sockArray.ordinality(); i++) {
  202. SmartSocketEndpoint *ep=sockArray.item(i);
  203. SmartSocketEndpoint resolveEP=*ep;
  204. resolveEP.ep.set(resolveEP.name.str(), resolveEP.ep.port);
  205. {
  206. synchronized block(lock);
  207. *ep=resolveEP;
  208. }
  209. }
  210. }
  211. void CSmartSocketFactory::shuffleEndpoints()
  212. {
  213. Owned<IRandomNumberGenerator> random = createRandomNumberGenerator();
  214. random->seed((unsigned)get_cycles_now());
  215. unsigned i = sockArray.ordinality();
  216. while (i > 1)
  217. {
  218. unsigned j = random->next() % i;
  219. i--;
  220. sockArray.swap(i, j);
  221. }
  222. }
  223. SmartSocketEndpoint *CSmartSocketFactory::nextSmartEndpoint()
  224. {
  225. SmartSocketEndpoint *ss=sockArray.item(nextEndpointIndex);
  226. if (retry)
  227. {
  228. unsigned startEndpoint = nextEndpointIndex;
  229. while (!ss || !ss->status)
  230. {
  231. ++nextEndpointIndex %= sockArray.ordinality();
  232. if (startEndpoint == nextEndpointIndex)
  233. throw createSmartSocketException(0, "no endpoints are alive");
  234. ss = sockArray.item(nextEndpointIndex);
  235. }
  236. }
  237. ++nextEndpointIndex %= sockArray.ordinality();
  238. return ss;
  239. }
  240. SocketEndpoint& CSmartSocketFactory::nextEndpoint()
  241. {
  242. SmartSocketEndpoint *ss=nextSmartEndpoint();
  243. if (!ss)
  244. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  245. return (ss->ep);
  246. }
  247. ISocket *CSmartSocketFactory::connect_sock(unsigned timeoutms, SmartSocketEndpoint *&ss, SocketEndpoint &ep)
  248. {
  249. ss = nextSmartEndpoint();
  250. if (!ss)
  251. throw createSmartSocketException(0, "smartsocket failed to get nextEndpoint");
  252. ISocket *sock = nullptr;
  253. try
  254. {
  255. {
  256. synchronized block(lock);
  257. ss->checkHost(dnsInterval);
  258. ep = ss->ep;
  259. }
  260. if (timeoutms)
  261. sock = ISocket::connect_timeout(ep, timeoutms);
  262. else
  263. sock = ISocket::connect(ep);
  264. }
  265. catch (IException *e)
  266. {
  267. StringBuffer s("CSmartSocketFactory::connect_sock ");
  268. ep.getUrlStr(s);
  269. EXCLOG(e,s.str());
  270. ss->status=false;
  271. if (sock)
  272. sock->Release();
  273. throw;
  274. }
  275. return sock;
  276. }
  277. ISmartSocket *CSmartSocketFactory::connect_timeout(unsigned timeoutms)
  278. {
  279. SocketEndpoint ep;
  280. SmartSocketEndpoint *ss = nullptr;
  281. Owned<ISocket> sock = connect_sock(timeoutms, ss, ep);
  282. return new CSmartSocket(sock.getClear(), ep, this);
  283. }
  284. ISmartSocket *CSmartSocketFactory::connect()
  285. {
  286. return connect_timeout(0);
  287. }
  288. ISmartSocket *CSmartSocketFactory::connectNextAvailableSocket()
  289. {
  290. while(1)
  291. {
  292. try
  293. {
  294. return connect_timeout(1000); // 1 sec
  295. }
  296. catch (ISmartSocketException *e)
  297. {
  298. throw e;
  299. }
  300. catch (IException *e)
  301. {
  302. e->Release(); //keep trying
  303. }
  304. }
  305. return NULL; // should never get here, but make the compiler happy
  306. }
  307. int CSmartSocketFactory::run()
  308. {
  309. unsigned idx;
  310. while (retry)
  311. {
  312. for(unsigned secs = 0; (secs < retryInterval) && retry; secs++)
  313. Sleep(1000);
  314. if(!retry)
  315. break;
  316. for (idx = 0; idx < sockArray.ordinality(); idx++)
  317. {
  318. SmartSocketEndpoint *ss=sockArray.item(idx);
  319. if (ss && !ss->status)
  320. {
  321. try
  322. {
  323. synchronized block(lock);
  324. ss->checkHost(dnsInterval);
  325. Owned <ISocket> testSock = ISocket::connect_timeout(ss->ep, 1000); // 1 sec
  326. testSock->close();
  327. ss->status = true;
  328. }
  329. catch (IException *e)
  330. {
  331. // still bad - keep set to false
  332. e->Release();
  333. }
  334. }
  335. }
  336. }
  337. return 0;
  338. }
  339. SmartSocketEndpoint *CSmartSocketFactory::findEndpoint(SocketEndpoint &ep)
  340. {
  341. for (unsigned idx = 0; idx < sockArray.ordinality(); idx++)
  342. {
  343. SmartSocketEndpoint *ss=sockArray.item(idx);
  344. if (ss && ss->ep.equals(ep))
  345. return ss;
  346. }
  347. return NULL;
  348. }
  349. bool CSmartSocketFactory::getStatus(SocketEndpoint &ep)
  350. {
  351. SmartSocketEndpoint *ss=findEndpoint(ep);
  352. return (ss && ss->status);
  353. }
  354. void CSmartSocketFactory::setStatus(SocketEndpoint &ep, bool status)
  355. {
  356. SmartSocketEndpoint *ss=findEndpoint(ep);
  357. if (ss)
  358. ss->status=status;
  359. }
  360. StringBuffer & CSmartSocketFactory::getUrlStr(StringBuffer &url, bool useHostName)
  361. {
  362. SmartSocketEndpoint * sep = nextSmartEndpoint();
  363. if (sep)
  364. {
  365. SocketEndpoint ep;
  366. if(useHostName && sep->name.length())
  367. {
  368. url.append(sep->name.str());
  369. ep = sep->ep;
  370. if (ep.port)
  371. url.append(':').append((unsigned)ep.port);
  372. }
  373. else
  374. {
  375. sep->checkHost(dnsInterval);
  376. SocketEndpoint ep = sep->ep;
  377. ep.getUrlStr(url);
  378. }
  379. }
  380. return url;
  381. }
  382. ISmartSocketFactory *createSmartSocketFactory(const char *_socklist, bool _retry, unsigned _retryInterval, unsigned _dnsInterval) {
  383. return new CSmartSocketFactory(_socklist, _retry, _retryInterval, _dnsInterval);
  384. }