persistent.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "persistent.hpp"
  14. #include "jthread.hpp"
  15. #include "jdebug.hpp"
  16. #include "jlog.hpp"
  17. #include <memory>
  18. #ifdef _WIN32
  19. #include <windows.h>
  20. #include <winsock2.h>
  21. #include <ws2tcpip.h>
  22. #endif
  // Emit a DBGLOG trace when `loglevel` is at or below the enclosing object's m_loglevel.
  // Must be used where a member/variable named m_loglevel is in scope.
  // Wrapped in do { } while (0) so the expansion is a single statement: a bare
  // `if (...) DBGLOG(...)` would capture an `else` that follows the macro call.
  #define PERSILOG(loglevel, ...) \
      do \
      { \
          if (static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) \
              DBGLOG(__VA_ARGS__); \
      } while (0)
  24. class CPersistentInfo : implements IInterface, public CInterface
  25. {
  26. friend class CPersistentHandler;
  27. public:
  28. IMPLEMENT_IINTERFACE;
  29. CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep)
  30. : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
  31. {
  32. if(_ep)
  33. _ep->getUrlStr(epstr);
  34. }
  35. virtual ~CPersistentInfo() { } //TODO remove trace
  36. protected:
  37. bool inUse;
  38. unsigned timeUsed;
  39. unsigned useCount;
  40. std::unique_ptr<SocketEndpoint> ep;
  41. StringBuffer epstr;
  42. };
  43. using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
  44. using StringIntMap = MapStringTo<int, int>;
  45. class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
  46. {
  47. private:
  48. static const int MAX_INFLIGHT_TIME = 1800;
  49. int m_maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME;
  50. int m_maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS;
  51. Owned<ISocketSelectHandler> m_selectHandler;
  52. IPersistentSelectNotify* m_notify;
  53. Semaphore m_waitsem;
  54. bool m_stop = false;
  55. SockInfoMap m_infomap;
  56. Mutex m_mutex;
  57. PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
  58. static int CurID;
  59. int m_id = 0;
  60. bool m_enableDoNotReuseList = false;
  61. StringIntMap m_instantCloseCounts;
  62. StringIntMap m_doNotReuseList;
  63. public:
  64. IMPLEMENT_IINTERFACE;
  65. CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  66. : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel), m_enableDoNotReuseList(enableDoNotReuseList)
  67. {
  68. m_id = ++CurID;
  69. m_selectHandler.setown(createSocketSelectHandler());
  70. }
  71. virtual ~CPersistentHandler()
  72. {
  73. }
  74. virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) override
  75. {
  76. if (!sock || sock->OShandle() == INVALID_SOCKET)
  77. return;
  78. synchronized block(m_mutex);
  79. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
  80. if (m_enableDoNotReuseList && ep != nullptr)
  81. {
  82. StringBuffer epstr;
  83. ep->getUrlStr(epstr);
  84. if(m_doNotReuseList.getValue(epstr.str()) != nullptr)
  85. {
  86. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: socket %d's target endpoint %s is in DoNotReuseList, will not add it.", sock->OShandle(), epstr.str());
  87. sock->shutdown();
  88. sock->close();
  89. return;
  90. }
  91. }
  92. m_selectHandler->add(sock, SELECTMODE_READ, this);
  93. m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
  94. }
  95. virtual void remove(ISocket* sock) override
  96. {
  97. if (!sock)
  98. return;
  99. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
  100. synchronized block(m_mutex);
  101. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  102. CPersistentInfo* info = nullptr;
  103. if (val)
  104. info = *val;
  105. bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
  106. if (info)
  107. m_infomap.remove(sock);
  108. if (!removedFromSelectHandler)
  109. m_selectHandler->remove(sock);
  110. }
  111. virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
  112. {
  113. synchronized block(m_mutex);
  114. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  115. CPersistentInfo* info = nullptr;
  116. if (val)
  117. info = *val;
  118. if (info)
  119. {
  120. info->useCount += usesOverOne;
  121. bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  122. if(sock->OShandle() == INVALID_SOCKET)
  123. keep = false;
  124. if (keep && !reachedQuota)
  125. {
  126. info->inUse = false;
  127. info->timeUsed = usTick()/1000;
  128. m_selectHandler->add(sock, SELECTMODE_READ, this);
  129. }
  130. else
  131. {
  132. if (reachedQuota)
  133. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d reached quota", sock->OShandle());
  134. if(!keep)
  135. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
  136. remove(sock);
  137. }
  138. }
  139. }
  140. virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr) override
  141. {
  142. synchronized block(m_mutex);
  143. for (auto& si:m_infomap)
  144. {
  145. CPersistentInfo* info = si.getValue();
  146. if (info && !info->inUse && (ep == nullptr || (info->ep != nullptr && *(info->ep) == *ep)))
  147. {
  148. ISocket* sock = *(ISocket**)(si.getKey());
  149. if (sock)
  150. {
  151. info->inUse = true;
  152. info->timeUsed = usTick()/1000;
  153. info->useCount++;
  154. if (pShouldClose != nullptr)
  155. *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  156. m_selectHandler->remove(sock);
  157. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", sock->OShandle(), m_id);
  158. return sock;
  159. }
  160. }
  161. }
  162. return nullptr;
  163. }
  164. //ISocketSelectNotify
  165. bool notifySelected(ISocket *sock,unsigned selected) override
  166. {
  167. size32_t x = sock->avail_read();
  168. if (x == 0)
  169. {
  170. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
  171. if (m_enableDoNotReuseList)
  172. {
  173. synchronized block(m_mutex);
  174. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  175. CPersistentInfo* info = nullptr;
  176. if (val)
  177. info = *val;
  178. if (info && info->epstr.length() > 0)
  179. {
  180. int* countptr = m_instantCloseCounts.getValue(info->epstr.str());
  181. if (info->useCount == 0)
  182. {
  183. const static int MAX_INSTANT_CLOSES = 5;
  184. int count = 1;
  185. if (countptr)
  186. count = (*countptr)+1;
  187. if (count < MAX_INSTANT_CLOSES)
  188. m_instantCloseCounts.setValue(info->epstr.str(), count);
  189. else if (m_doNotReuseList.getValue(info->epstr.str()) == nullptr)
  190. {
  191. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Endpoint %s has instantly closed connection for %d times in a row, adding it to DoNotReuseList", info->epstr.str(), MAX_INSTANT_CLOSES);
  192. m_doNotReuseList.setValue(info->epstr.str(), 1);
  193. }
  194. }
  195. else if (countptr)
  196. m_instantCloseCounts.remove(info->epstr.str());
  197. }
  198. }
  199. remove(sock);
  200. }
  201. else if (m_notify != nullptr)
  202. {
  203. bool reachedQuota = false;
  204. bool ignore = false;
  205. Owned<ISocket> mysock(LINK(sock));
  206. PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
  207. {
  208. synchronized block(m_mutex);
  209. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  210. CPersistentInfo* info = nullptr;
  211. if (val)
  212. info = *val;
  213. if (info)
  214. {
  215. info->inUse = true;
  216. info->timeUsed = usTick()/1000;
  217. info->useCount++;
  218. reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  219. }
  220. else
  221. {
  222. ignore = true;
  223. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
  224. }
  225. m_selectHandler->remove(sock);
  226. }
  227. if (!ignore)
  228. m_notify->notifySelected(sock, selected, this, reachedQuota);
  229. }
  230. return false;
  231. }
  232. //Thread
  233. virtual void start() override
  234. {
  235. m_selectHandler->start();
  236. Thread::start();
  237. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
  238. }
  239. virtual int run() override
  240. {
  241. while (true)
  242. {
  243. m_waitsem.wait(1000);
  244. if (m_stop)
  245. break;
  246. unsigned now = usTick()/1000;
  247. synchronized block(m_mutex);
  248. std::vector<ISocket*> socks1;
  249. std::vector<ISocket*> socks2;
  250. for (auto& si:m_infomap)
  251. {
  252. CPersistentInfo* info = si.getValue();
  253. if (!info)
  254. continue;
  255. if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now)
  256. socks1.push_back(*(ISocket**)(si.getKey()));
  257. if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now)
  258. socks2.push_back(*(ISocket**)(si.getKey()));
  259. }
  260. for (auto& s:socks1)
  261. {
  262. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
  263. remove(s);
  264. }
  265. for (auto& s:socks2)
  266. {
  267. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
  268. remove(s);
  269. }
  270. }
  271. return 0;
  272. }
  273. virtual void stop(bool wait) override
  274. {
  275. m_selectHandler->stop(wait);
  276. m_stop = true;
  277. m_waitsem.signal();
  278. if(wait)
  279. join(1000);
  280. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
  281. }
  282. virtual bool inDoNotReuseList(SocketEndpoint* ep)
  283. {
  284. if(!ep)
  285. return false;
  286. StringBuffer epstr;
  287. ep->getUrlStr(epstr);
  288. if(epstr.length()> 0 && m_doNotReuseList.getValue(epstr.str()) != nullptr)
  289. return true;
  290. return false;
  291. }
  292. };
  293. int CPersistentHandler::CurID = 0;
  294. IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  295. {
  296. Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
  297. handler->start();
  298. return handler.getClear();
  299. }