Browse Source

HPCC-24834 Ability to free up ESP socket before returning from processRequest

- Add interface ISocketReturner with method returnSocket()
- Thread classes Implement the ISocketReturner interface
- Pass the ISocketReturner object reference to http server, request
  and response

Signed-off-by: Yanrui Ma <yanrui.ma@lexisnexisrisk.com>
Yanrui Ma 4 năm trước cách đây
mục cha
commit
8b5e71e7e1

+ 55 - 15
esp/bindings/http/platform/httpprot.cpp

@@ -475,10 +475,25 @@ bool CHttpThread::onRequest()
     time_t t = time(NULL);  
     initThreadLocal(sizeof(t), &t);
 
+    m_httpserver = httpserver;
+    httpserver->setSocketReturner(this);
     httpserver->setIsSSL(m_is_ssl);
     httpserver->setShouldClose(m_shouldClose);
     httpserver->processRequest();
 
+    returnSocket(false);
+
+    clearThreadLocal();
+
+    return false;
+}
+
+void CHttpThread::returnSocket(bool cascade)
+{
+    if (m_httpSocketReturned)
+        return;
+    m_httpSocketReturned = true;
+    CEspHttpServer* httpserver = dynamic_cast<CEspHttpServer*>(m_httpserver);
     if (m_persistentHandler == nullptr)
     {
         keepAlive = !m_shouldClose && m_apport->queryProtocol()->persistentEnabled() && httpserver->persistentEligible();
@@ -490,11 +505,15 @@ bool CHttpThread::onRequest()
         keepAlive = !m_shouldClose && httpserver->persistentEligible();
         m_persistentHandler->doneUsing(m_socket, keepAlive);
     }
-    clearThreadLocal();
 
-    return false;
+    if (cascade)
+        CEspProtocolThread::returnSocket();
 }
 
+void CHttpThread::returnSocket()
+{
+    returnSocket(true);
+}
 /**************************************************************************
  *  CPooledHttpThread Implementation                                      *
  **************************************************************************/
@@ -507,6 +526,9 @@ void CPooledHttpThread::init(void *param)
     m_ssctx = (ISecureSocketContext*)(((void**)param)[4]);
     m_persistentHandler = (IPersistentHandler*)(((void**)param)[5]);
     m_shouldClose = *(bool*)(((void**)param)[6]);
+    m_httpserver = nullptr;
+    m_processAborted = false;
+    m_socketReturned = false;
 }
 
 CPooledHttpThread::~CPooledHttpThread()
@@ -549,38 +571,57 @@ void CPooledHttpThread::threadmain()
     {
         httpserver.setown(new CEspHttpServer(*m_socket, m_apport, false, getMaxRequestEntityLength()));
     }
+    m_httpserver = httpserver;
     httpserver->setShouldClose(m_shouldClose);
+    httpserver->setSocketReturner(this);
     time_t t = time(NULL);  
     initThreadLocal(sizeof(t), &t);
-    bool keepAlive = false;
     try
     {
         ESP_TIME_SECTION("CPooledHttpThread::threadmain: httpserver->processRequest()");
         httpserver->processRequest();
-        if (m_persistentHandler == nullptr)
-        {
-            keepAlive = !m_shouldClose && m_apport->queryProtocol()->persistentEnabled() && httpserver->persistentEligible();
-            if (keepAlive)
-                m_apport->queryProtocol()->addPersistent(m_socket.get());
-        }
-        else
-        {
-            keepAlive = !m_shouldClose && httpserver->persistentEligible();
-            m_persistentHandler->doneUsing(m_socket, keepAlive);
-        }
     }
     catch (IException *e) 
     {
+        m_processAborted = true;
         StringBuffer estr;
         IERRLOG("Exception(%d, %s) in CPooledHttpThread::threadmain().", e->errorCode(), e->errorMessage(estr).str());
         e->Release();
     }
     catch(...)
     {
+        m_processAborted = true;
         IERRLOG("General Exception - in CPooledHttpThread::threadmain().");
     }
+
+    returnSocket();
+
     clearThreadLocal();
 
+}
+
+void CPooledHttpThread::returnSocket()
+{
+    if (m_socketReturned)
+        return;
+    m_socketReturned = true;
+    CEspHttpServer* httpserver = dynamic_cast<CEspHttpServer*>(m_httpserver);
+    bool keepAlive = false;
+    if (!m_processAborted)
+    {
+        if (m_persistentHandler == nullptr)
+        {
+            keepAlive = !m_shouldClose && m_apport->queryProtocol()->persistentEnabled() && httpserver->persistentEligible();
+            if (keepAlive)
+                m_apport->queryProtocol()->addPersistent(m_socket.get());
+        }
+        else
+        {
+            keepAlive = !m_shouldClose && httpserver->persistentEligible();
+            m_persistentHandler->doneUsing(m_socket, keepAlive);
+        }
+    }
+
     try
     {
         if (m_socket != nullptr)
@@ -600,5 +641,4 @@ void CPooledHttpThread::threadmain()
     {
         IERRLOG("General Exception - CPooledHttpThread::threadmain(), closing socket.");
     }
-
 }

+ 10 - 1
esp/bindings/http/platform/httpprot.hpp

@@ -38,7 +38,7 @@
 #include <map>
 using namespace std;
 
-class CPooledHttpThread : public CInterface, implements IPooledThread
+class CPooledHttpThread : public CInterface, implements IPooledThread, implements ISocketReturner
 {
 private:
     Owned<ISocket> m_socket;
@@ -49,6 +49,9 @@ private:
     ISecureSocketContext* m_ssctx;
     IPersistentHandler* m_persistentHandler = nullptr;
     bool m_shouldClose = false;
+    IHttpServerService* m_httpserver = nullptr;
+    bool m_socketReturned = false;
+    bool m_processAborted = false;
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -63,6 +66,7 @@ public:
 
     virtual void setMaxRequestEntityLength(int len) {m_MaxRequestEntityLength = len;}
     virtual int getMaxRequestEntityLength() { return m_MaxRequestEntityLength; }
+    virtual void returnSocket();
 };
 
 class CHttpThreadPoolFactory : public CInterface, implements IThreadFactory
@@ -90,6 +94,10 @@ private:
     ISecureSocketContext* m_ssctx;
     IPersistentHandler* m_persistentHandler = nullptr;
     bool m_shouldClose = false;
+    IHttpServerService* m_httpserver = nullptr;
+    bool m_httpSocketReturned = false;
+    void returnSocket(bool cascade);
+
 public:
     CHttpThread(ISocket *sock, CEspApplicationPort* apport, bool viewConfig, bool isSSL = false, ISecureSocketContext* ssctx = NULL, IPersistentHandler* persistentHandler = NULL);
     
@@ -100,6 +108,7 @@ public:
     virtual int getMaxRequestEntityLength() { return m_MaxRequestEntityLength; }
 
     void setShouldClose(bool should) {m_shouldClose = should;}
+    virtual void returnSocket();
 };
 
 class esp_http_decl CHttpProtocol : public CEspProtocol

+ 7 - 0
esp/bindings/http/platform/httpservice.hpp

@@ -70,6 +70,7 @@ class CEspHttpServer : implements IHttpServerService, public CInterface
     bool isSSL = false;
     bool shouldClose = false;
     CriticalSection critDaliSession;
+    ISocketReturner* m_socketReturner = nullptr;
 protected:
     ISocket&                m_socket;
     Owned<CHttpRequest>     m_request;
@@ -160,6 +161,12 @@ public:
     bool persistentEligible();
     void setIsSSL(bool _isSSL) { isSSL = _isSSL; };
     void setShouldClose(bool should) { shouldClose = should; }
+    void setSocketReturner(ISocketReturner* returner)
+    {
+        m_socketReturner = returner;
+        m_request->setSocketReturner(returner);
+        m_response->setSocketReturner(returner);
+    }
 };
 
 

+ 4 - 0
esp/bindings/http/platform/httptransport.ipp

@@ -19,6 +19,7 @@
 #define _HTTPTRANSPORT_IPP__
 
 #include "esphttp.hpp"
+#include "espthread.hpp"
 
 //Jlib
 #include "jsocket.hpp"
@@ -87,6 +88,7 @@ protected:
     IArrayOf<CEspCookie> m_cookies;
 
     Owned<CMimeMultiPart> m_multipart;
+    ISocketReturner* m_socketReturner = nullptr;
 
     int parseOneHeader(char* oneline);
     virtual void parseCookieHeader(char* cookiestr);
@@ -275,6 +277,8 @@ public:
     virtual bool compressContent(StringBuffer* originalContent, int compressType) { return false; }
     virtual bool shouldDecompress(int& compressType) { return false; }
     virtual bool decompressContent(StringBuffer* originalContent, int compressType) { return false; }
+    virtual void setSocketReturner(ISocketReturner* returner) { m_socketReturner = returner; }
+    virtual ISocketReturner* querySocketReturner() { return m_socketReturner; }
 };
 
 

+ 9 - 3
esp/platform/espthread.cpp

@@ -190,7 +190,16 @@ int CEspProtocolThread::run()
     {
         IERRLOG("Unknown Exception in CEspProtocolThread::run while processing request.");
     }
+    returnSocket();
+    Release();
+    return 0;
+}
 
+void CEspProtocolThread::returnSocket()
+{
+    if (m_socketReturned)
+        return;
+    m_socketReturned = true;
     if(!keepAlive)
     {
         try
@@ -209,7 +218,4 @@ int CEspProtocolThread::run()
             DBGLOG("General Exception - in CEspProtocolThread::run while closing socket.");
         }
     }
-    Release();
-    return 0;
 }
-

+ 7 - 1
esp/platform/espthread.hpp

@@ -20,8 +20,12 @@
 
 #define TERMINATE_EXITCODE -99 // the working process use it to tell the monitor to exit too.
 
+interface ISocketReturner
+{
+    virtual void returnSocket() = 0;
+};
 
-class CEspProtocolThread: public Thread
+class CEspProtocolThread: public Thread, implements ISocketReturner
 {
 protected:
    bool terminating;
@@ -36,6 +40,7 @@ protected:
 
     int run();
     bool keepAlive = false;
+    bool m_socketReturned = false;
 public:
     IMPLEMENT_IINTERFACE;
     
@@ -49,6 +54,7 @@ public:
    
    virtual const char *getServiceName();
    virtual bool onRequest();
+   virtual void returnSocket();
 };