persistent.cpp 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "persistent.hpp"
  14. #include "jthread.hpp"
  15. #include "jdebug.hpp"
  16. #include "jlog.hpp"
  17. #include <memory>
  // Conditional trace helper: forwards the variadic arguments to DBGLOG only when
  // `loglevel` is numerically at or below the `m_loglevel` visible at the call site
  // (both are compared after conversion to int).
  //
  // The expansion is wrapped in do { ... } while (0) so that it always behaves as a
  // single statement. With a bare `if (...) DBGLOG(...)` expansion, code such as
  //     if (cond) PERSILOG(level, "..."); else somethingElse();
  // would silently attach the `else` to the macro's internal `if`.
  #define PERSILOG(loglevel, ...) \
      do \
      { \
          if (static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) \
              DBGLOG(__VA_ARGS__); \
      } while (0)
  19. class CPersistentInfo : implements IInterface, public CInterface
  20. {
  21. friend class CPersistentHandler;
  22. public:
  23. IMPLEMENT_IINTERFACE;
  24. CPersistentInfo(bool _inUse, int _timeUsed, int _useCount, SocketEndpoint* _ep)
  25. : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
  26. {
  27. }
  28. virtual ~CPersistentInfo() { } //TODO remove trace
  29. protected:
  30. bool inUse;
  31. int timeUsed;
  32. int useCount;
  33. std::unique_ptr<SocketEndpoint> ep;
  34. };
  35. using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
  36. class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
  37. {
  38. private:
  39. static const int MAX_INFLIGHT_TIME = 600;
  40. int m_maxIdleTime;
  41. int m_maxReqs;
  42. Owned<ISocketSelectHandler> m_selectHandler;
  43. IPersistentSelectNotify* m_notify;
  44. Semaphore m_waitsem;
  45. bool m_stop;
  46. SockInfoMap m_infomap;
  47. Mutex m_mutex;
  48. PersistentLogLevel m_loglevel;
  49. static int CurID;
  50. int m_id = 0;
  51. public:
  52. IMPLEMENT_IINTERFACE;
  53. CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
  54. : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel)
  55. {
  56. m_id = ++CurID;
  57. m_selectHandler.setown(createSocketSelectHandler());
  58. }
  59. virtual ~CPersistentHandler()
  60. {
  61. }
  62. virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) override
  63. {
  64. if (!sock)
  65. return;
  66. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
  67. synchronized block(m_mutex);
  68. m_selectHandler->add(sock, SELECTMODE_READ, this);
  69. m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
  70. }
  71. virtual void remove(ISocket* sock) override
  72. {
  73. if (!sock)
  74. return;
  75. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
  76. synchronized block(m_mutex);
  77. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  78. CPersistentInfo* info = nullptr;
  79. if (val)
  80. info = *val;
  81. if (!info || !info->inUse) //If inUse sock was already removed from select handler
  82. m_selectHandler->remove(sock);
  83. if (info)
  84. m_infomap.remove(sock);
  85. }
  86. virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
  87. {
  88. synchronized block(m_mutex);
  89. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  90. CPersistentInfo* info = nullptr;
  91. if (val)
  92. info = *val;
  93. if (info)
  94. {
  95. info->useCount += usesOverOne;
  96. bool quotaReached = m_maxReqs > 0 && m_maxReqs <= info->useCount;
  97. if (keep && !quotaReached)
  98. {
  99. info->inUse = false;
  100. info->timeUsed = usTick()/1000;
  101. m_selectHandler->add(sock, SELECTMODE_READ, this);
  102. }
  103. else
  104. {
  105. if (quotaReached)
  106. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d reached quota", sock->OShandle());
  107. if(!keep)
  108. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
  109. remove(sock);
  110. }
  111. }
  112. }
  113. virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr) override
  114. {
  115. synchronized block(m_mutex);
  116. for (auto si:m_infomap)
  117. {
  118. CPersistentInfo* info = si.getValue();
  119. if (info && !info->inUse && (ep == nullptr || (info->ep != nullptr && *(info->ep) == *ep)))
  120. {
  121. ISocket* sock = *(ISocket**)(si.getKey());
  122. if (sock)
  123. {
  124. info->inUse = true;
  125. info->timeUsed = usTick()/1000;
  126. info->useCount++;
  127. m_selectHandler->remove(sock);
  128. PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", sock->OShandle(), m_id);
  129. return sock;
  130. }
  131. }
  132. }
  133. return nullptr;
  134. }
  135. //ISocketSelectNotify
  136. bool notifySelected(ISocket *sock,unsigned selected) override
  137. {
  138. size32_t x = sock->avail_read();
  139. if (x == 0)
  140. {
  141. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
  142. remove(sock);
  143. }
  144. else if (m_notify != nullptr)
  145. {
  146. bool ignore = false;
  147. Owned<ISocket> mysock(LINK(sock));
  148. PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
  149. {
  150. synchronized block(m_mutex);
  151. Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
  152. CPersistentInfo* info = nullptr;
  153. if (val)
  154. info = *val;
  155. if (info)
  156. {
  157. info->inUse = true;
  158. info->timeUsed = usTick()/1000;
  159. info->useCount++;
  160. }
  161. else
  162. {
  163. ignore = true;
  164. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
  165. }
  166. m_selectHandler->remove(sock);
  167. }
  168. if (!ignore)
  169. m_notify->notifySelected(sock, selected, this);
  170. }
  171. return false;
  172. }
  173. //Thread
  174. virtual void start() override
  175. {
  176. m_selectHandler->start();
  177. Thread::start();
  178. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
  179. }
  180. virtual int run() override
  181. {
  182. while (true)
  183. {
  184. m_waitsem.wait(1000);
  185. if (m_stop)
  186. break;
  187. unsigned now = usTick()/1000;
  188. unsigned oldest = now - m_maxIdleTime*1000;
  189. unsigned oldest_inflight = now - MAX_INFLIGHT_TIME*1000;
  190. synchronized block(m_mutex);
  191. std::vector<ISocket*> socks1;
  192. std::vector<ISocket*> socks2;
  193. for (auto& si:m_infomap)
  194. {
  195. CPersistentInfo* info = si.getValue();
  196. if (!info)
  197. continue;
  198. if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed < oldest)
  199. socks1.push_back(*(ISocket**)(si.getKey()));
  200. if(info->inUse && info->timeUsed < oldest_inflight)
  201. socks2.push_back(*(ISocket**)(si.getKey()));
  202. }
  203. for (auto s:socks1)
  204. {
  205. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
  206. remove(s);
  207. }
  208. for (auto s:socks2)
  209. {
  210. PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
  211. remove(s);
  212. }
  213. }
  214. return 0;
  215. }
  216. virtual void stop(bool wait) override
  217. {
  218. m_selectHandler->stop(wait);
  219. m_stop = true;
  220. m_waitsem.signal();
  221. if(wait)
  222. join(1000);
  223. PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
  224. }
  225. };
  226. int CPersistentHandler::CurID = 0;
  227. IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
  228. {
  229. Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel);
  230. handler->start();
  231. return handler.getClear();
  232. }