|
@@ -20,6 +20,7 @@
|
|
|
#include "jdebug.hpp"
|
|
|
#include "jlog.hpp"
|
|
|
#include <memory>
|
|
|
+#include <unordered_set>
|
|
|
#ifdef _WIN32
|
|
|
#include <windows.h>
|
|
|
#include <winsock2.h>
|
|
@@ -27,30 +28,151 @@
|
|
|
#endif
|
|
|
|
|
|
#define PERSILOG(loglevel, ...) if(static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) DBGLOG(__VA_ARGS__)
|
|
|
+#define MAX_NO_CLEANUP_SOCKETSETS 1000
|
|
|
+
|
|
|
+static inline StringBuffer& addKeySuffix(PersistentProtocol proto, StringBuffer& keystr)
|
|
|
+{
|
|
|
+ switch (proto)
|
|
|
+ {
|
|
|
+ case PersistentProtocol::ProtoTCP:
|
|
|
+ break;
|
|
|
+ case PersistentProtocol::ProtoTLS:
|
|
|
+ keystr.append('~');
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw makeStringException(-1, "New suffix should be defined");
|
|
|
+ }
|
|
|
+ return keystr;
|
|
|
+}
|
|
|
|
|
|
class CPersistentInfo : implements IInterface, public CInterface
|
|
|
{
|
|
|
friend class CPersistentHandler;
|
|
|
+ friend class CAvailKeeper;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep)
|
|
|
- : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
|
|
|
+ CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep, PersistentProtocol _proto, ISocket* _sock)
|
|
|
+ : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr), proto(_proto), sock(_sock)
|
|
|
{
|
|
|
if(_ep)
|
|
|
+ {
|
|
|
_ep->getUrlStr(epstr);
|
|
|
+ keystr.set(epstr);
|
|
|
+ addKeySuffix(proto, keystr);
|
|
|
+ }
|
|
|
}
|
|
|
- virtual ~CPersistentInfo() { } //TODO remove trace
|
|
|
protected:
|
|
|
bool inUse;
|
|
|
unsigned timeUsed;
|
|
|
unsigned useCount;
|
|
|
std::unique_ptr<SocketEndpoint> ep;
|
|
|
StringBuffer epstr;
|
|
|
+ StringBuffer keystr;
|
|
|
+ PersistentProtocol proto;
|
|
|
+ Linked<ISocket> sock;
|
|
|
+};
|
|
|
+
|
|
|
+struct LinkedPersistentInfoHash
|
|
|
+{
|
|
|
+ size_t operator()(const Linked<CPersistentInfo>& linkedinfo) const
|
|
|
+ {
|
|
|
+ return std::hash<CPersistentInfo*>()(linkedinfo);
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+using SocketSet = std::unordered_set<Linked<CPersistentInfo>, LinkedPersistentInfoHash>;
|
|
|
+using EpSocketSetMap = MapStringTo<OwnedPtr<SocketSet>, SocketSet*>;
|
|
|
+using EpSocketSetMapping = MappingStringTo<OwnedPtr<SocketSet>, SocketSet*>;
|
|
|
+
|
|
|
+class CAvailKeeper
|
|
|
+{
|
|
|
+private:
|
|
|
+ SocketSet m_avail;
|
|
|
+ EpSocketSetMap m_avail4ep;
|
|
|
+public:
|
|
|
+ void add(CPersistentInfo* sockinfo)
|
|
|
+ {
|
|
|
+ findSet(sockinfo, true)->insert(sockinfo);
|
|
|
+ if (m_avail4ep.count() > MAX_NO_CLEANUP_SOCKETSETS)
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
+ void remove(CPersistentInfo* sockinfo)
|
|
|
+ {
|
|
|
+ SocketSet* sset = findSet(sockinfo);
|
|
|
+ if (sset)
|
|
|
+ sset->erase(sockinfo);
|
|
|
+ }
|
|
|
+ CPersistentInfo* get(SocketEndpoint* ep, PersistentProtocol proto)
|
|
|
+ {
|
|
|
+ SocketSet* sset = findSet(ep, proto);
|
|
|
+ if (sset)
|
|
|
+ {
|
|
|
+ //The first available socket will suffice
|
|
|
+ auto iter = sset->begin();
|
|
|
+ if (iter != sset->end())
|
|
|
+ {
|
|
|
+ Linked<CPersistentInfo> info = *iter;
|
|
|
+ sset->erase(iter);
|
|
|
+ return info.getClear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
+private:
|
|
|
+ inline StringBuffer& calcKey(SocketEndpoint& ep, PersistentProtocol proto, StringBuffer& keystr)
|
|
|
+ {
|
|
|
+ ep.getUrlStr(keystr);
|
|
|
+ return addKeySuffix(proto, keystr);
|
|
|
+ }
|
|
|
+ SocketSet* findSet(CPersistentInfo* info, bool create = false)
|
|
|
+ {
|
|
|
+ if (!info->ep.get())
|
|
|
+ return &m_avail;
|
|
|
+ return findSet(info->keystr.str(), create);
|
|
|
+ }
|
|
|
+ SocketSet* findSet(SocketEndpoint* ep, PersistentProtocol proto, bool create = false)
|
|
|
+ {
|
|
|
+ if (!ep)
|
|
|
+ return &m_avail;
|
|
|
+ StringBuffer keystr;
|
|
|
+ calcKey(*ep, proto, keystr);
|
|
|
+ return findSet(keystr.str(), create);
|
|
|
+ }
|
|
|
+ SocketSet* findSet(const char* key, bool create = false)
|
|
|
+ {
|
|
|
+ auto ptrptr = m_avail4ep.getValue(key);
|
|
|
+ if (ptrptr)
|
|
|
+ return *ptrptr;
|
|
|
+ else if (create)
|
|
|
+ {
|
|
|
+ SocketSet* sset = new SocketSet();
|
|
|
+ m_avail4ep.setValue(key, sset);
|
|
|
+ return sset;
|
|
|
+ }
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
+ void cleanup()
|
|
|
+ {
|
|
|
+ std::vector<EpSocketSetMapping*> elems;
|
|
|
+ for (auto& e : m_avail4ep)
|
|
|
+ {
|
|
|
+ if (e.getValue()->empty())
|
|
|
+ elems.push_back(&e);
|
|
|
+ }
|
|
|
+ for (auto& e : elems)
|
|
|
+ m_avail4ep.removeExact(e);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
|
|
|
using StringIntMap = MapStringTo<int, int>;
|
|
|
|
|
|
+// Important data structures for the implementation:
|
|
|
+// m_selecHandler: used to detect incoming data on a socket or socket closure from the other end
|
|
|
+// m_infomap: keep track of the status of a socket, there's one entry for each reusable socket, no matter if the socket is being used or idle. The main purpose is to
|
|
|
+// keep track of the status and life span of the socket so that it can be recycled properly.
|
|
|
+// m_availkeeper: keep track of available sockets that can be assigned for reusing. It's a map between <endpoint, protocal> and the set of available sockets. The
|
|
|
+// main purpose is to speed up finding an available socket
|
|
|
class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
|
|
|
{
|
|
|
private:
|
|
@@ -62,7 +184,8 @@ private:
|
|
|
Semaphore m_waitsem;
|
|
|
bool m_stop = false;
|
|
|
SockInfoMap m_infomap;
|
|
|
- Mutex m_mutex;
|
|
|
+ CAvailKeeper m_availkeeper;
|
|
|
+ CriticalSection m_critsect;
|
|
|
PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
|
|
|
static int CurID;
|
|
|
int m_id = 0;
|
|
@@ -82,12 +205,12 @@ public:
|
|
|
{
|
|
|
}
|
|
|
|
|
|
- virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) override
|
|
|
+ virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
|
|
|
{
|
|
|
if (!sock || sock->OShandle() == INVALID_SOCKET)
|
|
|
return;
|
|
|
- synchronized block(m_mutex);
|
|
|
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
|
|
|
+ PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
if (m_enableDoNotReuseList && ep != nullptr)
|
|
|
{
|
|
|
StringBuffer epstr;
|
|
@@ -101,29 +224,36 @@ public:
|
|
|
}
|
|
|
}
|
|
|
m_selectHandler->add(sock, SELECTMODE_READ, this);
|
|
|
- m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
|
|
|
+ Owned<CPersistentInfo> info = new CPersistentInfo(false, usTick()/1000, 0, ep, proto, sock);
|
|
|
+ m_infomap.setValue(sock, info.getLink());
|
|
|
+ m_availkeeper.add(info);
|
|
|
}
|
|
|
|
|
|
virtual void remove(ISocket* sock) override
|
|
|
{
|
|
|
if (!sock)
|
|
|
return;
|
|
|
- PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
|
|
|
- synchronized block(m_mutex);
|
|
|
+ PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
|
|
|
CPersistentInfo* info = nullptr;
|
|
|
if (val)
|
|
|
info = *val;
|
|
|
bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
|
|
|
if (info)
|
|
|
+ {
|
|
|
+ if (!info->inUse)
|
|
|
+ m_availkeeper.remove(info);
|
|
|
m_infomap.remove(sock);
|
|
|
+ }
|
|
|
if (!removedFromSelectHandler)
|
|
|
m_selectHandler->remove(sock);
|
|
|
}
|
|
|
|
|
|
virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
|
|
|
{
|
|
|
- synchronized block(m_mutex);
|
|
|
+ PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Done using socket %d, keep=%s", sock->OShandle(), boolToStr(keep));
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
|
|
|
CPersistentInfo* info = nullptr;
|
|
|
if (val)
|
|
@@ -139,11 +269,12 @@ public:
|
|
|
info->inUse = false;
|
|
|
info->timeUsed = usTick()/1000;
|
|
|
m_selectHandler->add(sock, SELECTMODE_READ, this);
|
|
|
+ m_availkeeper.add(info);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
if (reachedQuota)
|
|
|
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d reached quota", sock->OShandle());
|
|
|
+ PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d reached quota", sock->OShandle());
|
|
|
if(!keep)
|
|
|
PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
|
|
|
remove(sock);
|
|
@@ -151,27 +282,21 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr) override
|
|
|
+ virtual ISocket* getAvailable(SocketEndpoint* ep = nullptr, bool* pShouldClose = nullptr, PersistentProtocol proto = PersistentProtocol::ProtoTCP) override
|
|
|
{
|
|
|
- synchronized block(m_mutex);
|
|
|
- for (auto& si:m_infomap)
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
+ Owned<CPersistentInfo> info = m_availkeeper.get(ep, proto);
|
|
|
+ if (info)
|
|
|
{
|
|
|
- CPersistentInfo* info = si.getValue();
|
|
|
- if (info && !info->inUse && (ep == nullptr || (info->ep != nullptr && *(info->ep) == *ep)))
|
|
|
- {
|
|
|
- ISocket* sock = *(ISocket**)(si.getKey());
|
|
|
- if (sock)
|
|
|
- {
|
|
|
- info->inUse = true;
|
|
|
- info->timeUsed = usTick()/1000;
|
|
|
- info->useCount++;
|
|
|
- if (pShouldClose != nullptr)
|
|
|
- *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
|
|
|
- m_selectHandler->remove(sock);
|
|
|
- PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", sock->OShandle(), m_id);
|
|
|
- return sock;
|
|
|
- }
|
|
|
- }
|
|
|
+ Linked<ISocket> sock = info->sock;
|
|
|
+ info->inUse = true;
|
|
|
+ info->timeUsed = usTick()/1000;
|
|
|
+ info->useCount++;
|
|
|
+ if (pShouldClose != nullptr)
|
|
|
+ *pShouldClose = m_maxReqs > 0 && m_maxReqs <= info->useCount;
|
|
|
+ m_selectHandler->remove(sock);
|
|
|
+ PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", info->sock->OShandle(), m_id);
|
|
|
+ return sock.getClear();
|
|
|
}
|
|
|
return nullptr;
|
|
|
}
|
|
@@ -182,10 +307,10 @@ public:
|
|
|
size32_t x = sock->avail_read();
|
|
|
if (x == 0)
|
|
|
{
|
|
|
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
|
|
|
+ PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
|
|
|
if (m_enableDoNotReuseList)
|
|
|
{
|
|
|
- synchronized block(m_mutex);
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
|
|
|
CPersistentInfo* info = nullptr;
|
|
|
if (val)
|
|
@@ -220,13 +345,14 @@ public:
|
|
|
Owned<ISocket> mysock(LINK(sock));
|
|
|
PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
|
|
|
{
|
|
|
- synchronized block(m_mutex);
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
|
|
|
CPersistentInfo* info = nullptr;
|
|
|
if (val)
|
|
|
info = *val;
|
|
|
if (info)
|
|
|
{
|
|
|
+ m_availkeeper.remove(info);
|
|
|
info->inUse = true;
|
|
|
info->timeUsed = usTick()/1000;
|
|
|
info->useCount++;
|
|
@@ -261,7 +387,7 @@ public:
|
|
|
if (m_stop)
|
|
|
break;
|
|
|
unsigned now = usTick()/1000;
|
|
|
- synchronized block(m_mutex);
|
|
|
+ CriticalBlock block(m_critsect);
|
|
|
std::vector<ISocket*> socks1;
|
|
|
std::vector<ISocket*> socks2;
|
|
|
for (auto& si:m_infomap)
|
|
@@ -276,7 +402,7 @@ public:
|
|
|
}
|
|
|
for (auto& s:socks1)
|
|
|
{
|
|
|
- PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
|
|
|
+ PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
|
|
|
remove(s);
|
|
|
}
|
|
|
for (auto& s:socks2)
|
|
@@ -311,6 +437,20 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
+bool isHttpPersistable(const char* httpVer, const char* conHeader)
|
|
|
+{
|
|
|
+ if (isEmptyString(httpVer))
|
|
|
+ return false;
|
|
|
+ if (!isEmptyString(conHeader))
|
|
|
+ {
|
|
|
+ if (strieq(conHeader, "close"))
|
|
|
+ return false;
|
|
|
+ else if (strieq(conHeader, "Keep-Alive"))
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return !streq(httpVer, "1.0");
|
|
|
+}
|
|
|
+
|
|
|
int CPersistentHandler::CurID = 0;
|
|
|
|
|
|
IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
|