persistent.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "persistent.hpp"
  14. #include "jthread.hpp"
  15. #include "jdebug.hpp"
  16. #include "jlog.hpp"
  17. #include <memory>
  // Emit a DBGLOG line only when `loglevel` is at or below the enclosing object's m_loglevel.
  // The body is wrapped in do { } while(0) so the macro behaves as a single statement:
  // a bare `if (...) DBGLOG(...)` would capture a following `else` written by the caller
  // (dangling-else), silently changing the caller's control flow.
  // Usage must be terminated with ';' as with any statement.
  #define PERSILOG(loglevel, ...) do { if(static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) DBGLOG(__VA_ARGS__); } while(0)
  19. class CPersistentInfo : implements IInterface, public CInterface
  20. {
  21. friend class CPersistentHandler;
  22. public:
  23. IMPLEMENT_IINTERFACE;
  24. CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep)
  25. : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
  26. {
  27. if(_ep)
  28. _ep->getUrlStr(epstr);
  29. }
  30. virtual ~CPersistentInfo() { } //TODO remove trace
  31. protected:
  32. bool inUse;
  33. unsigned timeUsed;
  34. unsigned useCount;
  35. std::unique_ptr<SocketEndpoint> ep;
  36. StringBuffer epstr;
  37. };
  38. using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
  39. using StringIntMap = MapStringTo<int, int>;
  40. class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
  41. {
  42. private:
  43. static const int MAX_INFLIGHT_TIME = 1800;
  44. int m_maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME;
  45. int m_maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS;
  46. Owned<ISocketSelectHandler> m_selectHandler;
  47. IPersistentSelectNotify* m_notify;
  48. Semaphore m_waitsem;
  49. bool m_stop = false;
  50. SockInfoMap m_infomap;
  51. Mutex m_mutex;
  52. PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
  53. static int CurID;
  54. int m_id = 0;
  55. bool m_enableDoNotReuseList = false;
  56. StringIntMap m_instantCloseCounts;
  57. StringIntMap m_doNotReuseList;
  58. public:
  59. IMPLEMENT_IINTERFACE;
  60. CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  61. : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel), m_enableDoNotReuseList(enableDoNotReuseList)
  62. {
  63. m_id = ++CurID;
  64. m_selectHandler.setown(createSocketSelectHandler());
  65. }
  66. virtual ~CPersistentHandler()
  67. {
  68. }
  69. virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) override
  70. {
  71. if (!sock)
  72. return;
  73. synchronized block(m_mutex);
  74. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
  75. if (m_enableDoNotReuseList && ep != nullptr)
  76. {
  77. StringBuffer epstr;
  78. ep->getUrlStr(epstr);
  79. if(m_doNotReuseList.getValue(epstr.str()) != nullptr)
  80. {
  81. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: socket %d's target endpoint %s is in DoNotReuseList, will not add it.", sock->OShandle(), epstr.str());
  82. sock->shutdown();
  83. sock->close();
  84. return;
  85. }
  86. }
  87. m_selectHandler->add(sock, SELECTMODE_READ, this);
  88. m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
  89. }
  90. virtual void remove(ISocket* sock) override
  91. {
  92. if (!sock)
  93. return;
  94. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
  95. synchronized block(m_mutex);
  96. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  97. CPersistentInfo* info = nullptr;
  98. if (val)
  99. info = *val;
  100. bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
  101. if (info)
  102. m_infomap.remove(sock);
  103. if (!removedFromSelectHandler)
  104. m_selectHandler->remove(sock);
  105. }
  106. virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
  107. {
  108. synchronized block(m_mutex);
  109. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  110. CPersistentInfo* info = nullptr;
  111. if (val)
  112. info = *val;
  113. if (info)
  114. {
  115. info->useCount += usesOverOne;
  116. bool quotaReached = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  117. if (keep && !quotaReached)
  118. {
  119. info->inUse = false;
  120. info->timeUsed = usTick()/1000;
  121. m_selectHandler->add(sock, SELECTMODE_READ, this);
  122. }
  123. else
  124. {
  125. if (quotaReached)
  126. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d reached quota", sock->OShandle());
  127. if(!keep)
  128. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
  129. remove(sock);
  130. }
  131. }
  132. }
  133. virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr) override
  134. {
  135. synchronized block(m_mutex);
  136. for (auto si:m_infomap)
  137. {
  138. CPersistentInfo* info = si.getValue();
  139. if (info && !info->inUse && (ep == nullptr || (info->ep != nullptr && *(info->ep) == *ep)))
  140. {
  141. ISocket* sock = *(ISocket**)(si.getKey());
  142. if (sock)
  143. {
  144. info->inUse = true;
  145. info->timeUsed = usTick()/1000;
  146. info->useCount++;
  147. m_selectHandler->remove(sock);
  148. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", sock->OShandle(), m_id);
  149. return sock;
  150. }
  151. }
  152. }
  153. return nullptr;
  154. }
  155. //ISocketSelectNotify
  156. bool notifySelected(ISocket *sock,unsigned selected) override
  157. {
  158. size32_t x = sock->avail_read();
  159. if (x == 0)
  160. {
  161. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
  162. if (m_enableDoNotReuseList)
  163. {
  164. synchronized block(m_mutex);
  165. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  166. CPersistentInfo* info = nullptr;
  167. if (val)
  168. info = *val;
  169. if (info && info->epstr.length() > 0)
  170. {
  171. int* countptr = m_instantCloseCounts.getValue(info->epstr.str());
  172. if (info->useCount == 0)
  173. {
  174. const static int MAX_INSTANT_CLOSES = 5;
  175. int count = 1;
  176. if (countptr)
  177. count = (*countptr)+1;
  178. if (count < MAX_INSTANT_CLOSES)
  179. m_instantCloseCounts.setValue(info->epstr.str(), count);
  180. else if (m_doNotReuseList.getValue(info->epstr.str()) == nullptr)
  181. {
  182. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Endpoint %s has instantly closed connection for %d times in a row, adding it to DoNotReuseList", info->epstr.str(), MAX_INSTANT_CLOSES);
  183. m_doNotReuseList.setValue(info->epstr.str(), 1);
  184. }
  185. }
  186. else if (countptr)
  187. m_instantCloseCounts.remove(info->epstr.str());
  188. }
  189. }
  190. remove(sock);
  191. }
  192. else if (m_notify != nullptr)
  193. {
  194. bool ignore = false;
  195. Owned<ISocket> mysock(LINK(sock));
  196. PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
  197. {
  198. synchronized block(m_mutex);
  199. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  200. CPersistentInfo* info = nullptr;
  201. if (val)
  202. info = *val;
  203. if (info)
  204. {
  205. info->inUse = true;
  206. info->timeUsed = usTick()/1000;
  207. info->useCount++;
  208. }
  209. else
  210. {
  211. ignore = true;
  212. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
  213. }
  214. m_selectHandler->remove(sock);
  215. }
  216. if (!ignore)
  217. m_notify->notifySelected(sock, selected, this);
  218. }
  219. return false;
  220. }
  221. //Thread
  222. virtual void start() override
  223. {
  224. m_selectHandler->start();
  225. Thread::start();
  226. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
  227. }
  228. virtual int run() override
  229. {
  230. while (true)
  231. {
  232. m_waitsem.wait(1000);
  233. if (m_stop)
  234. break;
  235. unsigned now = usTick()/1000;
  236. synchronized block(m_mutex);
  237. std::vector<ISocket*> socks1;
  238. std::vector<ISocket*> socks2;
  239. for (auto& si:m_infomap)
  240. {
  241. CPersistentInfo* info = si.getValue();
  242. if (!info)
  243. continue;
  244. if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now)
  245. socks1.push_back(*(ISocket**)(si.getKey()));
  246. if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now)
  247. socks2.push_back(*(ISocket**)(si.getKey()));
  248. }
  249. for (auto s:socks1)
  250. {
  251. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
  252. remove(s);
  253. }
  254. for (auto s:socks2)
  255. {
  256. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
  257. remove(s);
  258. }
  259. }
  260. return 0;
  261. }
  262. virtual void stop(bool wait) override
  263. {
  264. m_selectHandler->stop(wait);
  265. m_stop = true;
  266. m_waitsem.signal();
  267. if(wait)
  268. join(1000);
  269. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
  270. }
  271. virtual bool inDoNotReuseList(SocketEndpoint* ep)
  272. {
  273. if(!ep)
  274. return false;
  275. StringBuffer epstr;
  276. ep->getUrlStr(epstr);
  277. if(epstr.length()> 0 && m_doNotReuseList.getValue(epstr.str()) != nullptr)
  278. return true;
  279. return false;
  280. }
  281. };
  282. int CPersistentHandler::CurID = 0;
  283. IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
  284. {
  285. Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
  286. handler->start();
  287. return handler.getClear();
  288. }