123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "persistent.hpp"
- #include "jthread.hpp"
- #include "jdebug.hpp"
- #include "jlog.hpp"
- #include <memory>
- #include <unordered_set>
- #ifdef _WIN32
- #include <windows.h>
- #include <winsock2.h>
- #include <ws2tcpip.h>
- #endif
- #define PERSILOG(loglevel, ...) if(static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) DBGLOG(__VA_ARGS__)
- #define MAX_NO_CLEANUP_SOCKETSETS 1000
- static inline StringBuffer& addKeySuffix(PersistentProtocol proto, StringBuffer& keystr)
- {
- switch (proto)
- {
- case PersistentProtocol::ProtoTCP:
- break;
- case PersistentProtocol::ProtoTLS:
- keystr.append('~');
- break;
- default:
- throw makeStringException(-1, "New suffix should be defined");
- }
- return keystr;
- }
- class CPersistentInfo : implements IInterface, public CInterface
- {
- friend class CPersistentHandler;
- friend class CAvailKeeper;
- public:
- IMPLEMENT_IINTERFACE;
- CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep, PersistentProtocol _proto, ISocket* _sock)
- : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr), proto(_proto), sock(_sock)
- {
- if(_ep)
- {
- _ep->getUrlStr(epstr);
- keystr.set(epstr);
- addKeySuffix(proto, keystr);
- }
- }
- protected:
- bool inUse;
- unsigned timeUsed;
- unsigned useCount;
- std::unique_ptr<SocketEndpoint> ep;
- StringBuffer epstr;
- StringBuffer keystr;
- PersistentProtocol proto;
- Linked<ISocket> sock;
- };
- struct LinkedPersistentInfoHash
- {
- size_t operator()(const Linked<CPersistentInfo>& linkedinfo) const
- {
- return std::hash<CPersistentInfo*>()(linkedinfo);
- }
- };
- using SocketSet = std::unordered_set<Linked<CPersistentInfo>, LinkedPersistentInfoHash>;
- using EpSocketSetMap = MapStringTo<OwnedPtr<SocketSet>, SocketSet*>;
- using EpSocketSetMapping = MappingStringTo<OwnedPtr<SocketSet>, SocketSet*>;
- class CAvailKeeper
- {
- private:
- SocketSet m_avail;
- EpSocketSetMap m_avail4ep;
- public:
- void add(CPersistentInfo* sockinfo)
- {
- findSet(sockinfo, true)->insert(sockinfo);
- if (m_avail4ep.count() > MAX_NO_CLEANUP_SOCKETSETS)
- cleanup();
- }
- void remove(CPersistentInfo* sockinfo)
- {
- SocketSet* sset = findSet(sockinfo);
- if (sset)
- sset->erase(sockinfo);
- }
- CPersistentInfo* get(SocketEndpoint* ep, PersistentProtocol proto)
- {
- SocketSet* sset = findSet(ep, proto);
- if (sset)
- {
- //The first available socket will suffice
- auto iter = sset->begin();
- if (iter != sset->end())
- {
- Linked<CPersistentInfo> info = *iter;
- sset->erase(iter);
- return info.getClear();
- }
- }
- return nullptr;
- }
- private:
- inline StringBuffer& calcKey(SocketEndpoint& ep, PersistentProtocol proto, StringBuffer& keystr)
- {
- ep.getUrlStr(keystr);
- return addKeySuffix(proto, keystr);
- }
- SocketSet* findSet(CPersistentInfo* info, bool create = false)
- {
- if (!info->ep.get())
- return &m_avail;
- return findSet(info->keystr.str(), create);
- }
- SocketSet* findSet(SocketEndpoint* ep, PersistentProtocol proto, bool create = false)
- {
- if (!ep)
- return &m_avail;
- StringBuffer keystr;
- calcKey(*ep, proto, keystr);
- return findSet(keystr.str(), create);
- }
- SocketSet* findSet(const char* key, bool create = false)
- {
- auto ptrptr = m_avail4ep.getValue(key);
- if (ptrptr)
- return *ptrptr;
- else if (create)
- {
- SocketSet* sset = new SocketSet();
- m_avail4ep.setValue(key, sset);
- return sset;
- }
- return nullptr;
- }
- void cleanup()
- {
- std::vector<EpSocketSetMapping*> elems;
- for (auto& e : m_avail4ep)
- {
- if (e.getValue()->empty())
- elems.push_back(&e);
- }
- for (auto& e : elems)
- m_avail4ep.removeExact(e);
- }
- };
- using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
- using StringIntMap = MapStringTo<int, int>;
- // Important data structures for the implementation:
- // m_selecHandler: used to detect incoming data on a socket or socket closure from the other end
- // m_infomap: keep track of the status of a socket, there's one entry for each reusable socket, no matter if the socket is being used or idle. The main purpose is to
- // keep track of the status and life span of the socket so that it can be recycled properly.
- // m_availkeeper: keep track of available sockets that can be assigned for reusing. It's a map between <endpoint, protocal> and the set of available sockets. The
- // main purpose is to speed up finding an available socket
- class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
- {
- private:
- static const int MAX_INFLIGHT_TIME = 1800;
- int m_maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME;
- int m_maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS;
- Owned<ISocketSelectHandler> m_selectHandler;
- IPersistentSelectNotify* m_notify;
- Semaphore m_waitsem;
- bool m_stop = false;
- SockInfoMap m_infomap;
- CAvailKeeper m_availkeeper;
- CriticalSection m_critsect;
- PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
- static int CurID;
- int m_id = 0;
- bool m_enableDoNotReuseList = false;
- StringIntMap m_instantCloseCounts;
- StringIntMap m_doNotReuseList;
- public:
- IMPLEMENT_IINTERFACE;
- CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
- : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel), m_enableDoNotReuseList(enableDoNotReuseList)
- {
- m_id = ++CurID;
- m_selectHandler.setown(createSocketSelectHandler());
- }
- virtual ~CPersistentHandler()
- {
- }
- virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
- {
- if (!sock || !sock->isValid())
- return;
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
- CriticalBlock block(m_critsect);
- if (m_enableDoNotReuseList && ep != nullptr)
- {
- StringBuffer epstr;
- ep->getUrlStr(epstr);
- if(m_doNotReuseList.getValue(epstr.str()) != nullptr)
- {
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: socket %d's target endpoint %s is in DoNotReuseList, will not add it.", sock->OShandle(), epstr.str());
- sock->shutdown();
- sock->close();
- return;
- }
- }
- m_selectHandler->add(sock, SELECTMODE_READ, this);
- Owned<CPersistentInfo> info = new CPersistentInfo(false, usTick()/1000, 0, ep, proto, sock);
- m_infomap.setValue(sock, info.getLink());
- m_availkeeper.add(info);
- }
- virtual void remove(ISocket* sock) override
- {
- if (!sock)
- return;
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
- CriticalBlock block(m_critsect);
- Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
- CPersistentInfo* info = nullptr;
- if (val)
- info = *val;
- bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
- if (info)
- {
- if (!info->inUse)
- m_availkeeper.remove(info);
- m_infomap.remove(sock);
- }
- if (!removedFromSelectHandler)
- m_selectHandler->remove(sock);
- }
- virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
- {
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep));
- CriticalBlock block(m_critsect);
- Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
- CPersistentInfo* info = nullptr;
- if (val)
- info = *val;
- if (info)
- {
- info->useCount += usesOverOne;
- bool reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
- if(!sock->isValid())
- keep = false;
- if (keep && !reachedQuota)
- {
- info->inUse = false;
- info->timeUsed = usTick()/1000;
- m_selectHandler->add(sock, SELECTMODE_READ, this);
- m_availkeeper.add(info);
- }
- else
- {
- if (reachedQuota)
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d reached quota", sock->OShandle());
- if(!keep)
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
- remove(sock);
- }
- }
- }
- virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
- {
- CriticalBlock block(m_critsect);
- Owned<CPersistentInfo> info = m_availkeeper.get(ep, proto);
- if (info)
- {
- Linked<ISocket> sock = info->sock;
- info->inUse = true;
- info->timeUsed = usTick()/1000;
- info->useCount++;
- if (pShouldClose != nullptr)
- *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
- m_selectHandler->remove(sock);
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", info->sock->OShandle(), m_id);
- return sock.getClear();
- }
- return nullptr;
- }
- //ISocketSelectNotify
- bool notifySelected(ISocket *sock,unsigned selected) override
- {
- size32_t x = sock->avail_read();
- if (x == 0)
- {
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
- if (m_enableDoNotReuseList)
- {
- CriticalBlock block(m_critsect);
- Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
- CPersistentInfo* info = nullptr;
- if (val)
- info = *val;
- if (info && info->epstr.length() > 0)
- {
- int* countptr = m_instantCloseCounts.getValue(info->epstr.str());
- if (info->useCount == 0)
- {
- const static int MAX_INSTANT_CLOSES = 5;
- int count = 1;
- if (countptr)
- count = (*countptr)+1;
- if (count < MAX_INSTANT_CLOSES)
- m_instantCloseCounts.setValue(info->epstr.str(), count);
- else if (m_doNotReuseList.getValue(info->epstr.str()) == nullptr)
- {
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Endpoint %s has instantly closed connection for %d times in a row, adding it to DoNotReuseList", info->epstr.str(), MAX_INSTANT_CLOSES);
- m_doNotReuseList.setValue(info->epstr.str(), 1);
- }
- }
- else if (countptr)
- m_instantCloseCounts.remove(info->epstr.str());
- }
- }
- remove(sock);
- }
- else if (m_notify != nullptr)
- {
- bool reachedQuota = false;
- bool ignore = false;
- Owned<ISocket> mysock(LINK(sock));
- PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
- {
- CriticalBlock block(m_critsect);
- Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
- CPersistentInfo* info = nullptr;
- if (val)
- info = *val;
- if (info)
- {
- m_availkeeper.remove(info);
- info->inUse = true;
- info->timeUsed = usTick()/1000;
- info->useCount++;
- reachedQuota = m_maxReqs > 0 && m_maxReqs <= info->useCount;
- }
- else
- {
- ignore = true;
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
- }
- m_selectHandler->remove(sock);
- }
- if (!ignore)
- m_notify->notifySelected(sock, selected, this, reachedQuota);
- }
- else
- {
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Unexpected data received on connection %d, so discard the connection.", sock->OShandle());
- remove(sock);
- }
- return false;
- }
- //Thread
- virtual void start() override
- {
- m_selectHandler->start();
- Thread::start();
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
- }
- virtual int run() override
- {
- while (true)
- {
- m_waitsem.wait(1000);
- if (m_stop)
- break;
- unsigned now = usTick()/1000;
- CriticalBlock block(m_critsect);
- std::vector<ISocket*> socks1;
- std::vector<ISocket*> socks2;
- for (auto& si:m_infomap)
- {
- CPersistentInfo* info = si.getValue();
- if (!info)
- continue;
- if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed + m_maxIdleTime*1000 < now)
- socks1.push_back(*(ISocket**)(si.getKey()));
- if(info->inUse && info->timeUsed + MAX_INFLIGHT_TIME*1000 < now)
- socks2.push_back(*(ISocket**)(si.getKey()));
- }
- for (auto& s:socks1)
- {
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
- remove(s);
- }
- for (auto& s:socks2)
- {
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
- remove(s);
- }
- }
- return 0;
- }
- virtual void stop(bool wait) override
- {
- m_selectHandler->stop(wait);
- m_stop = true;
- m_waitsem.signal();
- if(wait)
- join(1000);
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
- }
- virtual bool inDoNotReuseList(SocketEndpoint* ep)
- {
- if(!ep)
- return false;
- StringBuffer epstr;
- ep->getUrlStr(epstr);
- if(epstr.length()> 0 && m_doNotReuseList.getValue(epstr.str()) != nullptr)
- return true;
- return false;
- }
- };
- bool isHttpPersistable(const char* httpVer, const char* conHeader)
- {
- if (isEmptyString(httpVer))
- return false;
- if (!isEmptyString(conHeader))
- {
- if (strieq(conHeader, "close"))
- return false;
- else if (strieq(conHeader, "Keep-Alive"))
- return true;
- }
- return !streq(httpVer, "1.0");
- }
- int CPersistentHandler::CurID = 0;
- IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
- {
- Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
- handler->start();
- return handler.getClear();
- }
|