persistent.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
#include "persistent.hpp"
#include "jthread.hpp"
#include "jdebug.hpp"
#include "jlog.hpp"
#include <atomic>
#include <memory>
#include <unordered_set>
#include <vector>
#ifdef _WIN32
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
  24. #define PERSILOG(loglevel, ...) if(static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) DBGLOG(__VA_ARGS__)
  25. #define MAX_NO_CLEANUP_SOCKETSETS 1000
  26. static inline StringBuffer& addKeySuffix(PersistentProtocol proto, StringBuffer& keystr)
  27. {
  28. switch (proto)
  29. {
  30. case PersistentProtocol::ProtoTCP:
  31. break;
  32. case PersistentProtocol::ProtoTLS:
  33. keystr.append('~');
  34. break;
  35. default:
  36. throw makeStringException(-1, "New suffix should be defined");
  37. }
  38. return keystr;
  39. }
  40. class CPersistentInfo : implements IInterface, public CInterface
  41. {
  42. friend class CPersistentHandler;
  43. friend class CAvailKeeper;
  44. public:
  45. IMPLEMENT_IINTERFACE;
  46. CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep, PersistentProtocol _proto, ISocket* _sock)
  47. : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr), proto(_proto), sock(_sock)
  48. {
  49. if(_ep)
  50. {
  51. _ep->getUrlStr(epstr);
  52. keystr.set(epstr);
  53. addKeySuffix(proto, keystr);
  54. }
  55. }
  56. protected:
  57. bool inUse;
  58. unsigned timeUsed;
  59. unsigned useCount;
  60. std::unique_ptr<SocketEndpoint> ep;
  61. StringBuffer epstr;
  62. StringBuffer keystr;
  63. PersistentProtocol proto;
  64. Linked<ISocket> sock;
  65. };
  66. struct LinkedPersistentInfoHash
  67. {
  68. size_t operator()(const Linked<CPersistentInfo>& linkedinfo) const
  69. {
  70. return std::hash<CPersistentInfo*>()(linkedinfo);
  71. }
  72. };
// Set of socket info records, hashed with LinkedPersistentInfoHash.
using SocketSet = std::unordered_set<Linked<CPersistentInfo>, LinkedPersistentInfoHash>;
// String-keyed map whose values are SocketSet objects held through OwnedPtr.
using EpSocketSetMap = MapStringTo<OwnedPtr<SocketSet>, SocketSet*>;
// Mapping (entry) type matching EpSocketSetMap; used when collecting entries for removal.
using EpSocketSetMapping = MappingStringTo<OwnedPtr<SocketSet>, SocketSet*>;
  76. class CAvailKeeper
  77. {
  78. private:
  79. SocketSet m_avail;
  80. EpSocketSetMap m_avail4ep;
  81. public:
  82. void add(CPersistentInfo* sockinfo)
  83. {
  84. findSet(sockinfo, true)->insert(sockinfo);
  85. if (m_avail4ep.count() > MAX_NO_CLEANUP_SOCKETSETS)
  86. cleanup();
  87. }
  88. void remove(CPersistentInfo* sockinfo)
  89. {
  90. SocketSet* sset = findSet(sockinfo);
  91. if (sset)
  92. sset->erase(sockinfo);
  93. }
  94. CPersistentInfo* get(SocketEndpoint* ep, PersistentProtocol proto)
  95. {
  96. SocketSet* sset = findSet(ep, proto);
  97. if (sset)
  98. {
  99. //The first available socket will suffice
  100. auto iter = sset->begin();
  101. if (iter != sset->end())
  102. {
  103. Linked<CPersistentInfo> info = *iter;
  104. sset->erase(iter);
  105. return info.getClear();
  106. }
  107. }
  108. return nullptr;
  109. }
  110. private:
  111. inline StringBuffer& calcKey(SocketEndpoint& ep, PersistentProtocol proto, StringBuffer& keystr)
  112. {
  113. ep.getUrlStr(keystr);
  114. return addKeySuffix(proto, keystr);
  115. }
  116. SocketSet* findSet(CPersistentInfo* info, bool create = false)
  117. {
  118. if (!info->ep.get())
  119. return &m_avail;
  120. return findSet(info->keystr.str(), create);
  121. }
  122. SocketSet* findSet(SocketEndpoint* ep, PersistentProtocol proto, bool create = false)
  123. {
  124. if (!ep)
  125. return &m_avail;
  126. StringBuffer keystr;
  127. calcKey(*ep, proto, keystr);
  128. return findSet(keystr.str(), create);
  129. }
  130. SocketSet* findSet(const char* key, bool create = false)
  131. {
  132. auto ptrptr = m_avail4ep.getValue(key);
  133. if (ptrptr)
  134. return *ptrptr;
  135. else if (create)
  136. {
  137. SocketSet* sset = new SocketSet();
  138. m_avail4ep.setValue(key, sset);
  139. return sset;
  140. }
  141. return nullptr;
  142. }
  143. void cleanup()
  144. {
  145. std::vector<EpSocketSetMapping*> elems;
  146. for (auto& e : m_avail4ep)
  147. {
  148. if (e.getValue()->empty())
  149. elems.push_back(&e);
  150. }
  151. for (auto& e : elems)
  152. m_avail4ep.removeExact(e);
  153. }
  154. };
// Map from a socket to its CPersistentInfo record.
using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
// Map from a string key to an int value.
using StringIntMap = MapStringTo<int, int>;
// Important data structures for the implementation:
// m_selectHandler: used to detect incoming data on a socket or socket closure from the other end
// m_infomap: keeps track of the status of a socket; there is one entry for each reusable socket, whether the socket is in use or idle. The main purpose is to
// keep track of the status and life span of the socket so that it can be recycled properly.
// m_availkeeper: keeps track of available sockets that can be assigned for reuse. It is a map between <endpoint, protocol> and the set of available sockets. The
// main purpose is to speed up finding an available socket.
  163. class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
  164. {
  165. private:
  166. static const int MAX_INFLIGHT_TIME = 1800;
  167. int m_maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME;
  168. int m_maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS;
  169. Owned<ISocketSelectHandler> m_selectHandler;
  170. IPersistentSelectNotify* m_notify;
  171. Semaphore m_waitsem;
  172. bool m_stop = false;
  173. SockInfoMap m_infomap;
  174. CAvailKeeper m_availkeeper;
  175. CriticalSection m_critsect;
  176. PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
  177. static int CurID;
  178. int m_id = 0;
  179. bool m_enableDoNotReuseList = false;
  180. StringIntMap m_instantCloseCounts;
  181. StringIntMap m_doNotReuseList;
  182. public:
  183. IMPLEMENT_IINTERFACE;
  184. CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  185. : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel), m_enableDoNotReuseList(enableDoNotReuseList)
  186. {
  187. m_id = ++CurID;
  188. m_selectHandler.setown(createSocketSelectHandler());
  189. }
  190. virtual ~CPersistentHandler()
  191. {
  192. }
  193. virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
  194. {
  195. if (!sock || !sock->isValid())
  196. return;
  197. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
  198. CriticalBlock block(m_critsect);
  199. if (m_enableDoNotReuseList && ep != nullptr)
  200. {
  201. StringBuffer epstr;
  202. ep->getUrlStr(epstr);
  203. if(m_doNotReuseList.getValue(epstr.str()) != nullptr)
  204. {
  205. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: socket %d's target endpoint %s is in DoNotReuseList, will not add it.", sock->OShandle(), epstr.str());
  206. sock->shutdown();
  207. sock->close();
  208. return;
  209. }
  210. }
  211. m_selectHandler->add(sock, SELECTMODE_READ, this);
  212. Owned<CPersistentInfo> info = new CPersistentInfo(false, usTick()/1000, 0, ep, proto, sock);
  213. m_infomap.setValue(sock, info.getLink());
  214. m_availkeeper.add(info);
  215. }
  216. virtual void remove(ISocket* sock) override
  217. {
  218. if (!sock)
  219. return;
  220. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
  221. CriticalBlock block(m_critsect);
  222. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  223. CPersistentInfo* info = nullptr;
  224. if (val)
  225. info = *val;
  226. bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
  227. if (info)
  228. {
  229. if (!info->inUse)
  230. m_availkeeper.remove(info);
  231. m_infomap.remove(sock);
  232. }
  233. if (!removedFromSelectHandler)
  234. m_selectHandler->remove(sock);
  235. }
  236. virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
  237. {
  238. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep));
  239. CriticalBlock block(m_critsect);
  240. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  241. CPersistentInfo* info = nullptr;
  242. if (val)
  243. info = *val;
  244. if (info)
  245. {
  246. info->useCount += usesOverOne;
  247. bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  248. if(!sock->isValid())
  249. keep = false;
  250. if (keep && !reachedQuota)
  251. {
  252. info->inUse = false;
  253. info->timeUsed = usTick()/1000;
  254. m_selectHandler->add(sock, SELECTMODE_READ, this);
  255. m_availkeeper.add(info);
  256. }
  257. else
  258. {
  259. if (reachedQuota)
  260. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d reached quota", sock->OShandle());
  261. if(!keep)
  262. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
  263. remove(sock);
  264. }
  265. }
  266. }
  267. virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
  268. {
  269. CriticalBlock block(m_critsect);
  270. Owned<CPersistentInfo> info = m_availkeeper.get(ep, proto);
  271. if (info)
  272. {
  273. Linked<ISocket> sock = info->sock;
  274. info->inUse = true;
  275. info->timeUsed = usTick()/1000;
  276. info->useCount++;
  277. if (pShouldClose != nullptr)
  278. *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  279. m_selectHandler->remove(sock);
  280. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", info->sock->OShandle(), m_id);
  281. return sock.getClear();
  282. }
  283. return nullptr;
  284. }
  285. //ISocketSelectNotify
  286. bool notifySelected(ISocket *sock,unsigned selected) override
  287. {
  288. size32_t x = sock->avail_read();
  289. if (x == 0)
  290. {
  291. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
  292. if (m_enableDoNotReuseList)
  293. {
  294. CriticalBlock block(m_critsect);
  295. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  296. CPersistentInfo* info = nullptr;
  297. if (val)
  298. info = *val;
  299. if (info && info->epstr.length() > 0)
  300. {
  301. int* countptr = m_instantCloseCounts.getValue(info->epstr.str());
  302. if (info->useCount == 0)
  303. {
  304. const static int MAX_INSTANT_CLOSES = 5;
  305. int count = 1;
  306. if (countptr)
  307. count = (*countptr)+1;
  308. if (count < MAX_INSTANT_CLOSES)
  309. m_instantCloseCounts.setValue(info->epstr.str(), count);
  310. else if (m_doNotReuseList.getValue(info->epstr.str()) == nullptr)
  311. {
  312. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Endpoint %s has instantly closed connection for %d times in a row, adding it to DoNotReuseList", info->epstr.str(), MAX_INSTANT_CLOSES);
  313. m_doNotReuseList.setValue(info->epstr.str(), 1);
  314. }
  315. }
  316. else if (countptr)
  317. m_instantCloseCounts.remove(info->epstr.str());
  318. }
  319. }
  320. remove(sock);
  321. }
  322. else if (m_notify != nullptr)
  323. {
  324. bool reachedQuota = false;
  325. bool ignore = false;
  326. Owned<ISocket> mysock(LINK(sock));
  327. PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
  328. {
  329. CriticalBlock block(m_critsect);
  330. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  331. CPersistentInfo* info = nullptr;
  332. if (val)
  333. info = *val;
  334. if (info)
  335. {
  336. m_availkeeper.remove(info);
  337. info->inUse = true;
  338. info->timeUsed = usTick()/1000;
  339. info->useCount++;
  340. reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  341. }
  342. else
  343. {
  344. ignore = true;
  345. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
  346. }
  347. m_selectHandler->remove(sock);
  348. }
  349. if (!ignore)
  350. m_notify->notifySelected(sock, selected, this, reachedQuota);
  351. }
  352. else
  353. {
  354. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Unexpected data received on connection %d, so discard the connection.", sock->OShandle());
  355. remove(sock);
  356. }
  357. return false;
  358. }
  359. //Thread
  360. virtual void start() override
  361. {
  362. m_selectHandler->start();
  363. Thread::start();
  364. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
  365. }
  366. virtual int run() override
  367. {
  368. while (true)
  369. {
  370. m_waitsem.wait(1000);
  371. if (m_stop)
  372. break;
  373. unsigned now = usTick()/1000;
  374. CriticalBlock block(m_critsect);
  375. std::vector<ISocket*> socks1;
  376. std::vector<ISocket*> socks2;
  377. for (auto& si:m_infomap)
  378. {
  379. CPersistentInfo* info = si.getValue();
  380. if (!info)
  381. continue;
  382. if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now)
  383. socks1.push_back(*(ISocket**)(si.getKey()));
  384. if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now)
  385. socks2.push_back(*(ISocket**)(si.getKey()));
  386. }
  387. for (auto& s:socks1)
  388. {
  389. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
  390. remove(s);
  391. }
  392. for (auto& s:socks2)
  393. {
  394. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
  395. remove(s);
  396. }
  397. }
  398. return 0;
  399. }
  400. virtual void stop(bool wait) override
  401. {
  402. m_selectHandler->stop(wait);
  403. m_stop = true;
  404. m_waitsem.signal();
  405. if(wait)
  406. join(1000);
  407. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
  408. }
  409. virtual bool inDoNotReuseList(SocketEndpoint* ep)
  410. {
  411. if(!ep)
  412. return false;
  413. StringBuffer epstr;
  414. ep->getUrlStr(epstr);
  415. if(epstr.length()> 0 && m_doNotReuseList.getValue(epstr.str()) != nullptr)
  416. return true;
  417. return false;
  418. }
  419. };
  420. bool isHttpPersistable(const char* httpVer, const char* conHeader)
  421. {
  422. if (isEmptyString(httpVer))
  423. return false;
  424. if (!isEmptyString(conHeader))
  425. {
  426. if (strieq(conHeader, "close"))
  427. return false;
  428. else if (strieq(conHeader, "Keep-Alive"))
  429. return true;
  430. }
  431. return !streq(httpVer, "1.0");
  432. }
// Counter shared by all handlers: each CPersistentHandler constructor pre-increments it
// to obtain its own m_id. It is a plain int and is incremented without any locking.
int CPersistentHandler::CurID = 0;
  434. IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  435. {
  436. Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
  437. handler->start();
  438. return handler.getClear();
  439. }