浏览代码

Merge pull request #11877 from mayx/HPCC-20552-DoNotPersistList-702

HPCC-20552 Detect endpoints that don't support persistent connection

Reviewed-By: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 年之前
父节点
当前提交
718c99f376
共有 3 个文件被更改,包括 74 次插入9 次删除
  1. 5 1
      esp/bindings/http/client/httpclient.cpp
  2. 67 7
      esp/platform/persistent.cpp
  3. 2 1
      esp/platform/persistent.hpp

+ 5 - 1
esp/bindings/http/client/httpclient.cpp

@@ -50,7 +50,7 @@ CHttpClientContext::CHttpClientContext(IPropertyTree* config) : m_config(config)
 
 void CHttpClientContext::initPersistentHandler()
 {
-    m_persistentHandler.setown(createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, DEFAULT_MAX_PERSISTENT_REQUESTS, static_cast<PersistentLogLevel>(getEspLogLevel())));
+    m_persistentHandler.setown(createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, DEFAULT_MAX_PERSISTENT_REQUESTS, static_cast<PersistentLogLevel>(getEspLogLevel()), true));
 }
 
 CHttpClientContext::~CHttpClientContext()
@@ -249,6 +249,10 @@ int CHttpClient::connect(StringBuffer& errmsg, bool forceNewConnection)
             ep.port=80;
     }
     m_ep = ep;
+
+    if(m_persistentHandler->inDoNotReuseList(&ep))
+        m_disableKeepAlive = true;
+
     Linked<ISocket> pSock = (m_disableKeepAlive || forceNewConnection)?nullptr:m_persistentHandler->getAvailable(&ep);
     if(pSock)
     {

+ 67 - 7
esp/platform/persistent.cpp

@@ -31,6 +31,8 @@ public:
     CPersistentInfo(bool _inUse, unsigned _timeUsed, unsigned _useCount, SocketEndpoint* _ep)
         : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
     {
+        if(_ep)
+            _ep->getUrlStr(epstr);
     }
     virtual ~CPersistentInfo() { } //TODO remove trace
 protected:
@@ -38,9 +40,11 @@ protected:
     unsigned timeUsed;
     unsigned useCount;
     std::unique_ptr<SocketEndpoint> ep;
+    StringBuffer epstr;
 };
 
 using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
+using StringIntMap = MapStringTo<int, int>;
 
 class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
 {
@@ -57,10 +61,13 @@ private:
     PersistentLogLevel m_loglevel = PersistentLogLevel::PLogNormal;
     static int CurID;
     int m_id = 0;
+    bool m_enableDoNotReuseList = false;
+    StringIntMap m_instantCloseCounts;
+    StringIntMap m_doNotReuseList;
 public:
     IMPLEMENT_IINTERFACE;
-    CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
-                        : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel)
+    CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
+                        : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel), m_enableDoNotReuseList(enableDoNotReuseList)
     {
         m_id = ++CurID;
         m_selectHandler.setown(createSocketSelectHandler());
@@ -74,8 +81,20 @@ public:
     {
         if (!sock)
             return;
-        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
         synchronized block(m_mutex);
+        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
+        if (m_enableDoNotReuseList && ep != nullptr)
+        {
+            StringBuffer epstr;
+            ep->getUrlStr(epstr);
+            if(m_doNotReuseList.getValue(epstr.str()) != nullptr)
+            {
+                PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: socket %d's target endpoint %s is in DoNotReuseList, will not add it.", sock->OShandle(), epstr.str());
+                sock->shutdown();
+                sock->close();
+                return;
+            }
+        }
         m_selectHandler->add(sock, SELECTMODE_READ, this);
         m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
     }
@@ -90,10 +109,11 @@ public:
         CPersistentInfo* info = nullptr;
         if (val)
             info = *val;
-        if (!info || !info->inUse) //If inUse sock was already removed from select handler
-            m_selectHandler->remove(sock);
+        bool removedFromSelectHandler = info && info->inUse; //If inUse sock was already removed from select handler
         if (info)
             m_infomap.remove(sock);
+        if (!removedFromSelectHandler)
+            m_selectHandler->remove(sock);
     }
 
     virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
@@ -154,6 +174,34 @@ public:
         if (x == 0)
         {
             PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
+            if (m_enableDoNotReuseList)
+            {
+                synchronized block(m_mutex);
+                Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
+                CPersistentInfo* info = nullptr;
+                if (val)
+                    info = *val;
+                if (info && info->epstr.length() > 0)
+                {
+                    int* countptr = m_instantCloseCounts.getValue(info->epstr.str());
+                    if (info->useCount == 0)
+                    {
+                        const static int MAX_INSTANT_CLOSES = 5;
+                        int count = 1;
+                        if (countptr)
+                            count = (*countptr)+1;
+                        if (count < MAX_INSTANT_CLOSES)
+                            m_instantCloseCounts.setValue(info->epstr.str(), count);
+                        else if (m_doNotReuseList.getValue(info->epstr.str()) == nullptr)
+                        {
+                            PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Endpoint %s has instantly closed connection for %d times in a row, adding it to DoNotReuseList", info->epstr.str(), MAX_INSTANT_CLOSES);
+                            m_doNotReuseList.setValue(info->epstr.str(), 1);
+                        }
+                    }
+                    else if (countptr)
+                        m_instantCloseCounts.remove(info->epstr.str());
+                }
+            }
             remove(sock);
         }
         else if (m_notify != nullptr)
@@ -238,13 +286,25 @@ public:
             join(1000);
         PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
     }
+
+    virtual bool inDoNotReuseList(SocketEndpoint* ep)
+    {
+        if(!ep)
+            return false;
+        StringBuffer epstr;
+        ep->getUrlStr(epstr);
+        if(epstr.length()> 0 && m_doNotReuseList.getValue(epstr.str()) != nullptr)
+            return true;
+        return false;
+    }
+
 };
 
 int CPersistentHandler::CurID = 0;
 
-IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
+IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel, bool enableDoNotReuseList)
 {
-    Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel);
+    Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel, enableDoNotReuseList);
     handler->start();
     return handler.getClear();
 }

+ 2 - 1
esp/platform/persistent.hpp

@@ -33,6 +33,7 @@ interface IPersistentHandler : implements IInterface
     virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0;
     virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr) = 0;
     virtual void stop(bool wait) = 0;
+    virtual bool inDoNotReuseList(SocketEndpoint* ep) = 0;
 };
 
 interface IPersistentSelectNotify
@@ -40,6 +41,6 @@ interface IPersistentSelectNotify
     virtual bool notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* handler) = 0;
 };
 
-IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME, int maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS, PersistentLogLevel loglevel=PersistentLogLevel::PLogMin);
+IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME, int maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS, PersistentLogLevel loglevel=PersistentLogLevel::PLogMin, bool enableDoNotReuseList=false);
 
 #endif //__PERSISTENT_HPP__