Просмотр исходного кода

HPCC-17581 Adding ESP Server side persistent connection support

- Add IPersistentHandler interface and class to keep and maintain
  persistent connections
- Add persistent connection to server side ESP. Add maxPersistentIdleTime
  and maxPersistentRequest configuration parameters.
- Add persistent connection to httpclient that is used by ws_ecl and
  esdl to connect to Roxie and other services.
- Add -persist option to soapplus client and server commands

Signed-off-by: mayx <yanrui.ma@lexisnexisrisk.com>
mayx 7 лет назад
Родитель
Сommit
3afed4c8d5

+ 77 - 41
esp/bindings/http/client/httpclient.cpp

@@ -40,10 +40,12 @@
 
 CHttpClientContext::CHttpClientContext()
 {
+    m_persistentHandler.setown(createPersistentHandler(nullptr));
 }
 
 CHttpClientContext::CHttpClientContext(IPropertyTree* config) : m_config(config)
 {
+    m_persistentHandler.setown(createPersistentHandler(nullptr));
 }
 
 CHttpClientContext::~CHttpClientContext()
@@ -90,6 +92,8 @@ IHttpClient* CHttpClientContext::createHttpClient(const char* proxy, const char*
         client->setSsCtx(m_ssctx.get());
     }
 
+    client->setPersistentHandler(m_persistentHandler);
+
 #ifdef COOKIE_HANDLING
     client->m_context = this;
     if(url && *url)
@@ -121,7 +125,7 @@ IHttpClient* CHttpClientContext::createHttpClient(const char* proxy, const char*
 
 
 
-CHttpClient::CHttpClient(const char *proxy, const char* url) : m_proxy(proxy), m_url(url), m_disableKeepAlive(false)
+CHttpClient::CHttpClient(const char *proxy, const char* url) : m_proxy(proxy), m_url(url), m_disableKeepAlive(false), m_isPersistentSocket(false), m_numRequests(0)
 {
     StringBuffer protocol,username,password, host, port, path;
     Utils::SplitURL(url, protocol,username,password, host, port, path);
@@ -146,14 +150,25 @@ CHttpClient::~CHttpClient()
 {
     if(m_socket)
     {
-        try
+        Owned<ISocket> forRelease(m_socket);
+        if(m_isPersistentSocket)
         {
-            m_socket->shutdown();
-            m_socket->close();
-            m_socket->Release();
+            m_persistentHandler->doneUsing(m_socket, !m_disableKeepAlive, m_numRequests>1?(m_numRequests-1):0);
         }
-        catch(...)
+        else if(!m_disableKeepAlive)
+        {
+            m_persistentHandler->add(m_socket, &m_ep);
+        }
+        else
         {
+            try
+            {
+                m_socket->shutdown();
+                m_socket->close();
+            }
+            catch(...)
+            {
+            }
         }
     }
 }
@@ -196,7 +211,7 @@ void CHttpClient::setTimeOut(unsigned int timeout)
 int CHttpClient::connect(StringBuffer& errmsg)
 {
     SocketEndpoint ep;
-    
+
     if(m_proxy.length() == 0)
     {
         if(m_host.length() <= 0)
@@ -223,45 +238,55 @@ int CHttpClient::connect(StringBuffer& errmsg)
         if (ep.port==0)
             ep.port=80;
     }
-
-    try
+    m_ep = ep;
+    Linked<ISocket> pSock = m_disableKeepAlive?nullptr:m_persistentHandler->getAvailable(&ep);
+    if(pSock)
     {
-        m_socket = ISocket::connect_timeout(ep, m_connectTimeoutMs);
-
-        if(strcmp(m_protocol.get(), "HTTPS") == 0)
+        m_isPersistentSocket = true;
+        DBGLOG("Reuse persistent connection %d", pSock->OShandle());
+        m_socket = pSock.getLink();
+    }
+    else
+    {
+        m_isPersistentSocket = false;
+        try
         {
-            ISecureSocket* securesocket = m_ssctx->createSecureSocket(m_socket);
-            int res = securesocket->secure_connect();
-            if(res < 0)
-            {
-                m_socket->shutdown();
-                m_socket->close();
-                m_socket->Release();
-                m_socket = NULL;
-            }
-            else
+            m_socket = ISocket::connect_timeout(ep, m_connectTimeoutMs);
+
+            if(strcmp(m_protocol.get(), "HTTPS") == 0)
             {
-                m_socket = securesocket;
+                ISecureSocket* securesocket = m_ssctx->createSecureSocket(m_socket);
+                int res = securesocket->secure_connect();
+                if(res < 0)
+                {
+                    m_socket->shutdown();
+                    m_socket->close();
+                    m_socket->Release();
+                    m_socket = NULL;
+                }
+                else
+                {
+                    m_socket = securesocket;
+                }
             }
         }
+        catch(IException *e)
+        {
+            StringBuffer url;
+            ERRLOG("Error connecting to %s", ep.getUrlStr(url).str());
+            DBGLOG(e);
+            e->Release();
+            m_socket = NULL;
+            return -1;
+        }
+        catch(...)
+        {
+            StringBuffer url;
+            ERRLOG("Unknown exception connecting to %s", ep.getUrlStr(url).str());
+            m_socket = NULL;
+            return -1;
+        }
     }
-    catch(IException *e)
-    {
-        StringBuffer url;
-        ERRLOG("Error connecting to %s", ep.getUrlStr(url).str());
-        DBGLOG(e);
-        e->Release();
-        m_socket = NULL;
-        return -1;
-    }
-    catch(...)
-    {
-        StringBuffer url;
-        ERRLOG("Unknown exception connecting to %s", ep.getUrlStr(url).str());
-        m_socket = NULL;
-        return -1;
-    }
-
     if(m_socket == NULL)
     {
         StringBuffer urlstr;
@@ -366,6 +391,9 @@ int CHttpClient::sendRequest(const char* method, const char* contenttype, String
 
     httpresponse->getContent(response);
 
+    if(!httpresponse->getPersistentEligible())
+        m_disableKeepAlive = true;
+    m_numRequests++;
 
     if (getEspLogLevel()>LogNormal)
         DBGLOG("Response content: %s", response.str());
@@ -494,6 +522,10 @@ int CHttpClient::sendRequest(IProperties *headers, const char* method, const cha
 //write(ofile, response.str(), response.length());
 //close(ofile);
 //}
+    if(!httpresponse->getPersistentEligible())
+        m_disableKeepAlive = true;
+    m_numRequests++;
+
     if (getEspLogLevel()>LogNormal)
         DBGLOG("Response content: %s", response.str());
 
@@ -730,6 +762,10 @@ int CHttpClient::postRequest(ISoapMessage &req, ISoapMessage& resp)
         return -1;
     }
 
+    if(!httpresponse->getPersistentEligible())
+        m_disableKeepAlive = true;
+    m_numRequests++;
+
     StringBuffer contenttype;
     httpresponse->getContentType(contenttype);
     response.set_content_type(contenttype.str());
@@ -741,7 +777,7 @@ int CHttpClient::postRequest(ISoapMessage &req, ISoapMessage& resp)
         if(httpresponse->isTextMessage())
             DBGLOG("http response content = %s", content.str());
     }
-   
+
     response.set_text(content.str());
             
     // parse soap fault

+ 7 - 0
esp/bindings/http/client/httpclient.ipp

@@ -21,6 +21,7 @@
 #include "httpclient.hpp"
 #include "securesocket.hpp"
 #include "espsession.ipp"
+#include "persistent.hpp"
 
 //#define COOKIE_HANDLING
 
@@ -31,6 +32,7 @@ private:
     Owned<ISecureSocketContext> m_ssctx;
     Owned<IPropertyTree> m_config;
     CriticalSection m_sscrit;
+    Owned<IPersistentHandler> m_persistentHandler;
 
 #ifdef COOKIE_HANDLING
     ReadWriteLock m_rwlock;
@@ -100,6 +102,10 @@ private:
     StringAttr m_password;
     StringAttr m_realm;
     ISecureSocketContext *m_ssctx;
+    IPersistentHandler* m_persistentHandler;
+    bool m_isPersistentSocket;
+    int m_numRequests;
+    SocketEndpoint m_ep;
 
     virtual int connect(StringBuffer& errmsg);
 
@@ -124,6 +130,7 @@ public:
     virtual void setRealm(const char* realm);
     virtual void setConnectTimeOutMs(unsigned timeout) override;
     virtual void setTimeOut(unsigned int timeout);
+    virtual void setPersistentHandler(IPersistentHandler* handler) { m_persistentHandler = handler; }
 };
 
 #endif

+ 64 - 24
esp/bindings/http/platform/httpprot.cpp

@@ -113,6 +113,8 @@ void CHttpProtocol::init(IPropertyTree * cfg, const char * process, const char *
         }
     }
 
+    initPersistentHandler(proc_cfg);
+
     Owned<IPropertyTree> proto_cfg = getProtocolConfig(cfg, protocol, process);
     if(proto_cfg)
     {
@@ -124,8 +126,7 @@ void CHttpProtocol::init(IPropertyTree * cfg, const char * process, const char *
     }
 }
 
-
-bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
+bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* persistentHandler)
 {
     try
     {
@@ -139,20 +140,24 @@ bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
         
         if(apport != NULL)
         {
-            Owned<ISocket> accepted = sock->accept();
+            Owned<ISocket> accepted;
+            if (persistentHandler == nullptr)
+                accepted.setown(sock->accept());
+            else
+                accepted.set(sock);
             if (accepted.get() != NULL)
             {
                 char peername[256];
                 int port = accepted->peer_name(peername, 256);
 
     #if defined(_DEBUG)
-                DBGLOG("HTTP connection from %s:%d", peername, port);
+                DBGLOG("HTTP connection from %s:%d on %s socket", peername, port, persistentHandler?"persistent":"new");
     #endif          
 
                 if(m_maxConcurrentThreads > 0)
                 {
                     // Using Threading pool instead of generating one thread per request.
-                    void ** holder = new void*[5];
+                    void ** holder = new void*[6];
                     holder[0] = (void*)(accepted.getLink());
                     holder[1] = (void*)apport;
                     int maxEntityLength = getMaxRequestEntityLength();
@@ -161,6 +166,7 @@ bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
                     holder[3] = (void*)&useSSL;
                     ISecureSocketContext* ctx = NULL;
                     holder[4] = (void*)ctx;
+                    holder[5] = (void*)persistentHandler;
                     try
                     {
                         http_thread_pool->start((void*)holder, "", m_threadCreateTimeout > 0?m_threadCreateTimeout*1000:0);
@@ -185,7 +191,7 @@ bool CHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
                 else
                 {
                     /* create one thread per request */
-                    CHttpThread *workthread = new CHttpThread(accepted.getLink(), apport, CEspProtocol::getViewConfig());
+                    CHttpThread *workthread = new CHttpThread(accepted.getLink(), apport, CEspProtocol::getViewConfig(), false, nullptr, persistentHandler);
                     workthread->setMaxRequestEntityLength(getMaxRequestEntityLength());
                     workthread->start();
                     workthread->Release();
@@ -297,6 +303,8 @@ void CSecureHttpProtocol::init(IPropertyTree * cfg, const char * process, const
         }
     }
 
+    initPersistentHandler(proc_cfg);
+
     Owned<IPropertyTree> proto_cfg = getProtocolConfig(cfg, protocol, process);
     if(proto_cfg)
     {
@@ -308,7 +316,7 @@ void CSecureHttpProtocol::init(IPropertyTree * cfg, const char * process, const
     }
 }
 
-bool CSecureHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
+bool CSecureHttpProtocol::notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* persistentHandler)
 {
     try
     {
@@ -321,32 +329,37 @@ bool CSecureHttpProtocol::notifySelected(ISocket *sock,unsigned selected)
         
         if(apport != NULL)
         {
-            ISocket *accepted = sock->accept();
-            if (accepted!=NULL)
+            Owned<ISocket>accepted;
+            if(persistentHandler == nullptr)
+                accepted.setown(sock->accept());
+            else
+                accepted.set(sock);
+            if (accepted.get() != NULL)
             {
                 char peername[256];
                 int port = accepted->peer_name(peername, 256);
-                DBGLOG("HTTPS connection from %s:%d", peername, port);
+                DBGLOG("HTTPS connection from %s:%d on %s socket", peername, port, persistentHandler?"persistent":"new");
                 if(m_ssctx != NULL)
                 {
                     if(m_maxConcurrentThreads > 0)
                     {
                         // Using Threading pool instead of generating one thread per request.
-                        void ** holder = new void*[5];
-                        holder[0] = (void*)accepted;
+                        void ** holder = new void*[6];
+                        holder[0] = (void*)accepted.getLink();
                         holder[1] = (void*)apport;
                         int maxEntityLength = getMaxRequestEntityLength();
                         holder[2] = (void*)&maxEntityLength;
                         bool useSSL = true;
                         holder[3] = (void*)&useSSL;
                         holder[4] = (void*)m_ssctx.get();
+                        holder[5] = (void*)persistentHandler;
                         http_thread_pool->start((void*)holder);
                         delete [] holder;
                     }
                     else
                     {
                         /* create one thread per request */
-                        CHttpThread *workthread = new CHttpThread(accepted, apport, CEspProtocol::getViewConfig(), true, m_ssctx.get());
+                        CHttpThread *workthread = new CHttpThread(accepted.getLink(), apport, CEspProtocol::getViewConfig(), true, m_ssctx.get(), persistentHandler);
                         workthread->setMaxRequestEntityLength(getMaxRequestEntityLength());
                         workthread->start();
                         DBGLOG("Request processing thread started.");
@@ -389,7 +402,7 @@ const char * CSecureHttpProtocol::getProtocolName()
  *  CHttpThread Implementation                                            *
  **************************************************************************/
 CHttpThread::CHttpThread(bool viewConfig) : 
-   CEspProtocolThread("Http Thread") 
+   CEspProtocolThread("Http Thread"), m_persistentHandler(nullptr)
 {
     m_viewConfig = viewConfig;
     m_is_ssl = false;
@@ -397,15 +410,15 @@ CHttpThread::CHttpThread(bool viewConfig) :
 }
 
 CHttpThread::CHttpThread(ISocket *sock, bool viewConfig) : 
-   CEspProtocolThread(sock, "HTTP Thread") 
+   CEspProtocolThread(sock, "HTTP Thread"), m_persistentHandler(nullptr)
 {
     m_viewConfig = viewConfig;
     m_is_ssl = false;
     m_ssctx = NULL;
 }
 
-CHttpThread::CHttpThread(ISocket *sock, CEspApplicationPort* apport, bool viewConfig, bool isSSL, ISecureSocketContext* ssctx) : 
-   CEspProtocolThread(sock, "HTTP Thread") 
+CHttpThread::CHttpThread(ISocket *sock, CEspApplicationPort* apport, bool viewConfig, bool isSSL, ISecureSocketContext* ssctx,  IPersistentHandler* persistentHandler) :
+   CEspProtocolThread(sock, "HTTP Thread"), m_persistentHandler(persistentHandler)
 {
     m_viewConfig = viewConfig;
     m_apport = apport;
@@ -419,12 +432,13 @@ CHttpThread::~CHttpThread()
 
 bool CHttpThread::onRequest()
 {
+    keepAlive = false;
     ActiveRequests recording;
 
     Owned<CEspHttpServer> httpserver;
     
     Owned<ISecureSocket> secure_sock;
-    if(m_is_ssl && m_ssctx)
+    if(m_is_ssl && m_ssctx && m_persistentHandler == nullptr)
     {
         DBGLOG("Creating secure socket");
         secure_sock.setown(m_ssctx->createSecureSocket(m_socket.getLink(), getEspLogLevel()));
@@ -451,7 +465,8 @@ bool CHttpThread::onRequest()
             DBGLOG("Unknown exception accepting from secure socket");
             return false;
         }
-        DBGLOG("Accepted from secure socket");
+        DBGLOG("Request from secure socket");
+        m_socket.set(secure_sock);
         httpserver.setown(new CEspHttpServer(*secure_sock.get(), m_apport, m_viewConfig, getMaxRequestEntityLength()));
     }
     else
@@ -463,8 +478,20 @@ bool CHttpThread::onRequest()
     initThreadLocal(sizeof(t), &t);
 
     httpserver->processRequest();
+
+    if (m_persistentHandler == nullptr)
+    {
+        keepAlive = m_apport->queryProtocol()->persistentEnabled() && httpserver->persistentEligible();
+        if (keepAlive)
+            m_apport->queryProtocol()->addPersistent(m_socket.get());
+    }
+    else
+    {
+        keepAlive = httpserver->persistentEligible();
+        m_persistentHandler->doneUsing(m_socket, keepAlive);
+    }
     clearThreadLocal();
-    
+
     return false;
 }
 
@@ -478,6 +505,7 @@ void CPooledHttpThread::init(void *param)
     m_MaxRequestEntityLength = *(int*)(((void**)param)[2]);
     m_is_ssl = *(bool*)(((void**)param)[3]);
     m_ssctx = (ISecureSocketContext*)(((void**)param)[4]);
+    m_persistentHandler = (IPersistentHandler*)(((void**)param)[5]);
 }
 
 CPooledHttpThread::~CPooledHttpThread()
@@ -490,7 +518,7 @@ void CPooledHttpThread::threadmain()
     Owned<CEspHttpServer> httpserver;
     
     Owned<ISecureSocket> secure_sock;
-    if(m_is_ssl && m_ssctx)
+    if(m_is_ssl && m_ssctx && m_persistentHandler == nullptr)
     {
         secure_sock.setown(m_ssctx->createSecureSocket(m_socket.getLink(), getEspLogLevel()));
         int res = 0;
@@ -513,7 +541,8 @@ void CPooledHttpThread::threadmain()
         {
             return;
         }
-        httpserver.setown(new CEspHttpServer(*secure_sock.get(), m_apport, false, getMaxRequestEntityLength()));
+        m_socket.set(secure_sock);
+        httpserver.setown(new CEspHttpServer(*m_socket, m_apport, false, getMaxRequestEntityLength()));
     }
     else
     {
@@ -522,10 +551,22 @@ void CPooledHttpThread::threadmain()
 
     time_t t = time(NULL);  
     initThreadLocal(sizeof(t), &t);
+    bool keepAlive = false;
     try
     {
         ESP_TIME_SECTION("CPooledHttpThread::threadmain: httpserver->processRequest()");
         httpserver->processRequest();
+        if (m_persistentHandler == nullptr)
+        {
+            keepAlive = m_apport->queryProtocol()->persistentEnabled() && httpserver->persistentEligible();
+            if (keepAlive)
+                m_apport->queryProtocol()->addPersistent(m_socket.get());
+        }
+        else
+        {
+            keepAlive = httpserver->persistentEligible();
+            m_persistentHandler->doneUsing(m_socket, keepAlive);
+        }
     }
     catch (IException *e) 
     {
@@ -541,7 +582,7 @@ void CPooledHttpThread::threadmain()
 
     try
     {
-        if(m_socket != NULL)
+        if (!keepAlive && m_socket != nullptr)
         {
             m_socket->shutdown(SHUTDOWN_WRITE);
             m_socket.clear();
@@ -559,4 +600,3 @@ void CPooledHttpThread::threadmain()
     }
 
 }
-

+ 5 - 6
esp/bindings/http/platform/httpprot.hpp

@@ -47,7 +47,7 @@ private:
     int m_MaxRequestEntityLength;
     bool m_is_ssl;
     ISecureSocketContext* m_ssctx;
-
+    IPersistentHandler* m_persistentHandler;
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -87,9 +87,9 @@ private:
     CHttpThread(ISocket *sock, bool viewConfig);
     bool m_is_ssl;
     ISecureSocketContext* m_ssctx;
-
+    IPersistentHandler* m_persistentHandler;
 public:
-    CHttpThread(ISocket *sock, CEspApplicationPort* apport, bool viewConfig, bool isSSL = false, ISecureSocketContext* ssctx = NULL);
+    CHttpThread(ISocket *sock, CEspApplicationPort* apport, bool viewConfig, bool isSSL = false, ISecureSocketContext* ssctx = NULL, IPersistentHandler* persistentHandler = NULL);
     
     virtual ~CHttpThread();
     virtual bool onRequest();
@@ -98,7 +98,6 @@ public:
     virtual int getMaxRequestEntityLength() { return m_MaxRequestEntityLength; }
 };
 
-
 class esp_http_decl CHttpProtocol : public CEspProtocol
 {
 private:
@@ -120,7 +119,7 @@ public:
 
     virtual void init(IPropertyTree * cfg, const char * process, const char * protocol);
 
-    virtual bool notifySelected(ISocket *sock,unsigned selected);
+    virtual bool notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* persistentHandler) override;
 //IEspProtocol
     virtual const char * getProtocolName();
 };
@@ -142,7 +141,7 @@ public:
 
     virtual void init(IPropertyTree * cfg, const char * process, const char * protocol);
 
-    virtual bool notifySelected(ISocket *sock,unsigned selected);
+    virtual bool notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* persistentHandler) override;
 //IEspProtocol
     virtual const char * getProtocolName();
 };

+ 13 - 0
esp/bindings/http/platform/httpservice.cpp

@@ -32,6 +32,7 @@
 #include "http/platform/httpsecurecontext.hpp"
 #include "http/platform/httpservice.hpp"
 #include "http/platform/httptransport.hpp"
+#include "http/platform/httpprot.hpp"
 
 #include "htmlpage.hpp"
 #include "dasds.hpp"
@@ -212,6 +213,8 @@ void checkSetCORSAllowOrigin(CHttpRequest *req, CHttpResponse *resp)
 
 int CEspHttpServer::processRequest()
 {
+    m_request->setPersistentEnabled(m_apport->queryProtocol()->persistentEnabled());
+    m_response->setPersistentEnabled(m_apport->queryProtocol()->persistentEnabled());
     try
     {
         if (m_request->receive(NULL)==-1) // MORE - pass in IMultiException if we want to see exceptions (which are not fatal)
@@ -235,6 +238,8 @@ int CEspHttpServer::processRequest()
         return 0;
     }
 
+    m_response->setPersistentEligible(m_request->getPersistentEligible());
+
     try
     {
         
@@ -1749,3 +1754,11 @@ const char* CEspHttpServer::readCookie(const char* cookieName, StringBuffer& coo
         cookieValue.append(sessionIDCookie->getValue());
     return cookieValue.str();
 }
+
+bool CEspHttpServer::persistentEligible()
+{
+    if(m_request.get() != nullptr)
+        return m_request->getPersistentEligible();
+    else
+        return false;
+}

+ 1 - 1
esp/bindings/http/platform/httpservice.hpp

@@ -134,8 +134,8 @@ public:
     virtual int onUpdatePassword(CHttpRequest* request, CHttpResponse* response);
 #endif
 
-
     virtual const char * getServiceType() {return "HttpServer";};
+    bool persistentEligible();
 };
 
 

+ 29 - 3
esp/bindings/http/platform/httptransport.cpp

@@ -322,7 +322,7 @@ and CHttpResponse
 ****************************************************************************/
 
 
-CHttpMessage::CHttpMessage(ISocket& socket) : m_socket(socket)
+CHttpMessage::CHttpMessage(ISocket& socket) : m_socket(socket), m_persistentEligible(false)
 {
     m_bufferedsocket.setown(createBufferedSocket(&socket));
     m_content_length = -1;
@@ -1284,6 +1284,11 @@ int CHttpRequest::parseFirstLine(char* oneline)
         parseQueryString(querystr);
     }
 
+    StringBuffer verbuf;
+    curptr = Utils::getWord(curptr, verbuf);
+    if(verbuf.length() > 0)
+        m_version.set(verbuf);
+
     delete[] buff;
 
     return 0;
@@ -1842,6 +1847,18 @@ int CHttpRequest::processHeaders(IMultiException *me)
     if(m_content_length > 0 && m_MaxRequestEntityLength > 0 && m_content_length > m_MaxRequestEntityLength && (!isUpload(false)))
         throw createEspHttpException(HTTP_STATUS_BAD_REQUEST_CODE, "The request length was too long.", HTTP_STATUS_BAD_REQUEST);
 
+    StringBuffer conheader;
+    getHeader("Connection", conheader);
+    float httpver = 1.0;
+    if(m_version.length() > 5)
+        httpver = atof(m_version.str()+5);
+    if(((httpver + 0.001 > 1.1 && (conheader.length() == 0 || stricmp(conheader.str(), "Close") != 0))
+                || (conheader.length() > 0 && stricmp(conheader.str(), "Keep-Alive") == 0))
+            && (m_content_length != -1 || (m_httpMethod.length()>0 && stricmp(m_httpMethod.get(), "GET") == 0)))
+        setPersistentEligible(true);
+    else
+        setPersistentEligible(false);
+
     return 0;
 }
 
@@ -2073,8 +2090,9 @@ StringBuffer& CHttpResponse::constructHeaderBuffer(StringBuffer& headerbuf, bool
     if(inclLen && m_content_length > 0)
         headerbuf.append("Content-Length: ").append(m_content_length).append("\r\n");
 
-    headerbuf.append("Connection: close\r\n");
-    
+    if(!(m_persistentEnabled && getPersistentEligible()))
+        headerbuf.append("Connection: close\r\n");
+
     ForEachItemIn(x, m_cookies)
     {
         CEspCookie* cookie = &m_cookies.item(x);
@@ -2270,6 +2288,14 @@ int CHttpResponse::processHeaders(IMultiException *me)
         parseOneHeader(oneline);
         lenread = m_bufferedsocket->readline(oneline, MAX_HTTP_HEADER_LEN, me);
     }
+
+    StringBuffer conheader;
+    getHeader("Connection", conheader);
+    if(conheader.length() == 0 || stricmp(conheader.str(), "Close") != 0)
+        setPersistentEligible(true);
+    else
+        setPersistentEligible(false);
+
     return 0;
 }
 

+ 6 - 1
esp/bindings/http/platform/httptransport.ipp

@@ -71,6 +71,8 @@ protected:
     StringAttr   m_paramstr;
     int m_supportClientXslt;
     bool         m_isForm;
+    bool         m_persistentEligible;
+    bool         m_persistentEnabled;
 
     int m_paramCount;
     int m_attachCount;
@@ -254,6 +256,10 @@ public:
         return false;
     }
     const char* queryAllParameterString() { return allParameterString.str(); }
+
+    virtual void setPersistentEligible(bool eligible) { m_persistentEligible = eligible; }
+    virtual bool getPersistentEligible() { return m_persistentEligible; }
+    virtual void setPersistentEnabled(bool enabled) { m_persistentEnabled = enabled; }
 };
 
 
@@ -305,7 +311,6 @@ private:
     bool            m_authrequired;
     int             m_MaxRequestEntityLength;
     ESPSerializationFormat respSerializationFormat;
-
     virtual int parseFirstLine(char* oneline);
     virtual StringBuffer& constructHeaderBuffer(StringBuffer& headerbuf, bool inclLen);
     virtual int processHeaders(IMultiException *me);

+ 36 - 3
esp/platform/espprotocol.cpp

@@ -49,7 +49,7 @@ void ActiveRequests::dec()
     atomic_dec(&gActiveRequests);
 }
 
-CEspApplicationPort::CEspApplicationPort(bool viewcfg) : bindingCount(0), defBinding(-1), viewConfig(viewcfg), rootAuth(false), navWidth(165), navResize(false), navScroll(false)
+CEspApplicationPort::CEspApplicationPort(bool viewcfg, CEspProtocol* prot) : bindingCount(0), defBinding(-1), viewConfig(viewcfg), rootAuth(false), navWidth(165), navResize(false), navScroll(false), protocol(prot)
 {
     build_ver = getBuildVersion();
 
@@ -632,11 +632,13 @@ CEspProtocol::CEspProtocol()
 CEspProtocol::~CEspProtocol()
 {
     clear();
+    if(m_persistentHandler)
+        m_persistentHandler->stop(true);
 }
 
 bool CEspProtocol::notifySelected(ISocket *sock,unsigned selected)
 {
-    return true;
+    return notifySelected(sock, selected, nullptr);
 }
 
 const char * CEspProtocol::getProtocolName()
@@ -663,7 +665,7 @@ void CEspProtocol::addBindingMap(ISocket *sock, IEspRpcBinding* binding, bool is
     }
     else
     {
-        apport = new CEspApplicationPort(m_viewConfig);
+        apport = new CEspApplicationPort(m_viewConfig, this);
         apport->appendBinding(entry, isdefault);
 
         CApplicationPortMap::value_type vt(port, apport);
@@ -676,3 +678,34 @@ CEspApplicationPort* CEspProtocol::queryApplicationPort(int port)
     CApplicationPortMap::iterator apport_it = m_portmap.find(port);
     return (apport_it != m_portmap.end()) ? (*apport_it).second : NULL;
 }
+
+void CEspProtocol::addPersistent(ISocket* sock)
+{
+    if (m_persistentHandler != nullptr)
+        m_persistentHandler->add(sock);
+}
+
+void CEspProtocol::initPersistentHandler(IPropertyTree * proc_cfg)
+{
+    const char* idleTimeStr = nullptr;
+    const char* maxReqsStr = nullptr;
+    if (proc_cfg != nullptr)
+    {
+        idleTimeStr = proc_cfg->queryProp("@maxPersistentIdleTime");
+        maxReqsStr = proc_cfg->queryProp("@maxPersistentRequests");
+    }
+    //To disable persistent connections, set maxPersistentIdleTime or maxPersistentRequests to 0
+    int maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME;
+    if (idleTimeStr != nullptr && *idleTimeStr != '\0')
+        maxIdleTime = atoi(idleTimeStr);
+    int maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS;
+    if (maxReqsStr != nullptr && *maxReqsStr != '\0')
+        maxReqs = atoi(maxReqsStr);
+
+    if (maxIdleTime == 0 || maxReqs == 0)
+    {
+        DBGLOG("Persistent connection won't be enabled because maxPersistentIdleTime or maxPersistentRequests is set to 0");
+        return;
+    }
+    m_persistentHandler.setown(createPersistentHandler(this, maxIdleTime, maxReqs, PersistentLogLevel::PLogMax));
+}

+ 13 - 4
esp/platform/espprotocol.hpp

@@ -24,6 +24,7 @@
 //SCM Interfaces
 #include "esp.hpp"
 #include "xslprocessor.hpp"
+#include "persistent.hpp"
 
 //STL
 #include <algorithm>
@@ -79,9 +80,9 @@ public:
     }
 };
 
+class CEspProtocol;
 
 //MAKEPointerArray(CEspBindingEntry*, CEspBindingArray);
-
 class CEspApplicationPort
 {
     CEspBindingEntry* bindings[100];
@@ -99,8 +100,9 @@ class CEspApplicationPort
 
     HINSTANCE hxsl;
     Owned<IXslProcessor> xslp;
+    CEspProtocol* protocol;
 public:
-    CEspApplicationPort(bool viewcfg);
+    CEspApplicationPort(bool viewcfg, CEspProtocol* prot);
 
     ~CEspApplicationPort()
     {
@@ -127,6 +129,7 @@ public:
 
     CEspBindingEntry* queryBindingItem(int item){return (item<bindingCount) ? bindings[item] : NULL;}
     CEspBindingEntry* getDefaultBinding(){return bindings[(defBinding>=0) ? defBinding : 0];}
+    CEspProtocol* queryProtocol() { return protocol; }
 #ifdef _USE_OPENLDAP
     unsigned updatePassword(IEspContext &context, IHttpMessage* request, StringBuffer& message);
     void onUpdatePasswordInput(IEspContext &context, StringBuffer &html);
@@ -140,7 +143,8 @@ typedef map<int, CEspApplicationPort*> CApplicationPortMap;
 
 class CEspProtocol : public CInterface,
     implements IEspProtocol,
-    implements ISocketSelectNotify
+    implements ISocketSelectNotify,
+    implements IPersistentSelectNotify
 {
 private:
     //map between socket port and one or more bindings
@@ -148,7 +152,8 @@ private:
     bool m_viewConfig;
     int m_MaxRequestEntityLength;
     IEspContainer *m_container;
-
+    unsigned m_nextSeq;
+    Owned<IPersistentHandler> m_persistentHandler;
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -177,6 +182,7 @@ public:
     bool getViewConfig(){return m_viewConfig;}
 
     virtual bool notifySelected(ISocket *sock,unsigned selected);
+    virtual bool notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* persistentHandler) override { return false; };
 
     //IEspProtocol
     virtual const char * getProtocolName();
@@ -187,6 +193,9 @@ public:
     virtual void setMaxRequestEntityLength(int len) {m_MaxRequestEntityLength = len;};
     virtual int getMaxRequestEntityLength() { return m_MaxRequestEntityLength; }
     virtual void setContainer(IEspContainer* container) { m_container = container; }
+    virtual void initPersistentHandler(IPropertyTree * proc_cfg);
+    virtual bool persistentEnabled() { return m_persistentHandler != nullptr; }
+    virtual void addPersistent(ISocket* sock);
 };
 
 #endif

+ 18 - 16
esp/platform/espthread.cpp

@@ -53,14 +53,14 @@
 
 
 CEspProtocolThread::CEspProtocolThread(const char *name)
-: Thread("CEspProtocolThread"), m_name(name)
+: Thread("CEspProtocolThread"), m_name(name), keepAlive(false)
 {
     terminating = false;
 }
 
 
 CEspProtocolThread::CEspProtocolThread(ISocket *sock, const char *name)
-: Thread("CEspProtocolThread"), m_name(name)
+: Thread("CEspProtocolThread"), m_name(name), keepAlive(false)
 {
     terminating = false;
     setSocket(sock);
@@ -195,22 +195,24 @@ int CEspProtocolThread::run()
         ERRLOG("Unknown Exception in CEspProtocolThread::run while processing request.");
     }
 
-    try
+    if(!keepAlive)
     {
-        m_socket->shutdown();
-        m_socket->close();
-    }
-    catch (IException *e) 
-    {
-        StringBuffer estr;
-        DBGLOG("Exception(%d, %s) - in CEspProtocolThread::run while closing socket.", e->errorCode(), e->errorMessage(estr).str());
-        e->Release();
-    }
-    catch(...)
-    {
-        DBGLOG("General Exception - in CEspProtocolThread::run while closing socket.");
+        try
+        {
+            m_socket->shutdown();
+            m_socket->close();
+        }
+        catch (IException *e)
+        {
+            StringBuffer estr;
+            DBGLOG("Exception(%d, %s) - in CEspProtocolThread::run while closing socket.", e->errorCode(), e->errorMessage(estr).str());
+            e->Release();
+        }
+        catch(...)
+        {
+            DBGLOG("General Exception - in CEspProtocolThread::run while closing socket.");
+        }
     }
-
     Release();
     return 0;
 }

+ 1 - 1
esp/platform/espthread.hpp

@@ -35,7 +35,7 @@ protected:
    StringAttr m_name;
 
     int run();
-
+    bool keepAlive;
 public:
     IMPLEMENT_IINTERFACE;
     

+ 252 - 0
esp/platform/persistent.cpp

@@ -0,0 +1,252 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "persistent.hpp"
+#include "jthread.hpp"
+#include "jdebug.hpp"
+#include "jlog.hpp"
+#include <memory>
+
+#define PERSILOG(loglevel, ...) if(static_cast<int>(loglevel) <= static_cast<int>(m_loglevel)) DBGLOG(__VA_ARGS__)
+
+class CPersistentInfo : implements IInterface, public CInterface
+{
+    friend class CPersistentHandler;
+public:
+    IMPLEMENT_IINTERFACE;
+    CPersistentInfo(bool _inUse, int _timeUsed, int _useCount, SocketEndpoint* _ep)
+        : inUse(_inUse), timeUsed(_timeUsed), useCount(_useCount), ep(_ep?(new SocketEndpoint(*_ep)):nullptr)
+    {
+    }
+    virtual ~CPersistentInfo() { } //TODO remove trace
+protected:
+    bool inUse;
+    int timeUsed;
+    int useCount;
+    std::unique_ptr<SocketEndpoint> ep;
+};
+
+using SockInfoMap = MapBetween<Linked<ISocket>, ISocket*, Owned<CPersistentInfo>, CPersistentInfo*>;
+
+class CPersistentHandler : implements IPersistentHandler, implements ISocketSelectNotify, public Thread
+{
+private:
+    static const int MAX_INFLIGHT_TIME = 600;
+    int m_maxIdleTime;
+    int m_maxReqs;
+    Owned<ISocketSelectHandler> m_selectHandler;
+    IPersistentSelectNotify* m_notify;
+    Semaphore m_waitsem;
+    bool m_stop;
+    SockInfoMap m_infomap;
+    Mutex m_mutex;
+    PersistentLogLevel m_loglevel;
+    static int CurID;
+    int m_id = 0;
+public:
+    IMPLEMENT_IINTERFACE;
+    CPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
+                        : m_stop(false), m_notify(notify), m_maxIdleTime(maxIdleTime), m_maxReqs(maxReqs), m_loglevel(loglevel)
+    {
+        m_id = ++CurID;
+        m_selectHandler.setown(createSocketSelectHandler());
+    }
+
+    virtual ~CPersistentHandler()
+    {
+    }
+
+    virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) override
+    {
+        if (!sock)
+            return;
+        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: adding socket %d to handler %d", sock->OShandle(), m_id);
+        synchronized block(m_mutex);
+        m_selectHandler->add(sock, SELECTMODE_READ, this);
+        m_infomap.setValue(sock, new CPersistentInfo(false, usTick()/1000, 0, ep));
+    }
+
+    virtual void remove(ISocket* sock) override
+    {
+        if (!sock)
+            return;
+        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Removing socket %d from handler %d", sock->OShandle(), m_id);
+        synchronized block(m_mutex);
+        Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
+        CPersistentInfo* info = nullptr;
+        if (val)
+            info = *val;
+        if (!info || !info->inUse) //If inUse sock was already removed from select handler
+            m_selectHandler->remove(sock);
+        if (info)
+            m_infomap.remove(sock);
+    }
+
+    virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne) override
+    {
+        synchronized block(m_mutex);
+        Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
+        CPersistentInfo* info = nullptr;
+        if (val)
+            info = *val;
+        if (info)
+        {
+            info->useCount += usesOverOne;
+            bool quotaReached = m_maxReqs > 0 && m_maxReqs <= info->useCount;
+            if (keep && !quotaReached)
+            {
+                info->inUse = false;
+                info->timeUsed = usTick()/1000;
+                m_selectHandler->add(sock, SELECTMODE_READ, this);
+            }
+            else
+            {
+                if (quotaReached)
+                    PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d reached quota", sock->OShandle());
+                if(!keep)
+                    PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Indicated not to keep socket %d", sock->OShandle());
+                remove(sock);
+            }
+        }
+    }
+
+    virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr) override
+    {
+        synchronized block(m_mutex);
+        for (auto si:m_infomap)
+        {
+            CPersistentInfo* info = si.getValue();
+            if (info && !info->inUse && (ep == nullptr || (info->ep != nullptr && *(info->ep) == *ep)))
+            {
+                ISocket* sock = *(ISocket**)(si.getKey());
+                if (sock)
+                {
+                    info->inUse = true;
+                    info->timeUsed = usTick()/1000;
+                    info->useCount++;
+                    m_selectHandler->remove(sock);
+                    PERSILOG(PersistentLogLevel::PLogMax, "PERSISTENT: Obtained persistent socket %d from handler %d", sock->OShandle(), m_id);
+                    return sock;
+                }
+            }
+        }
+        return nullptr;
+    }
+
+    //ISocketSelectNotify
+    bool notifySelected(ISocket *sock,unsigned selected) override
+    {
+        size32_t x = sock->avail_read();
+        if (x == 0)
+        {
+            PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Detected closing of connection %d from the other end", sock->OShandle());
+            remove(sock);
+        }
+        else if (m_notify != nullptr)
+        {
+            bool ignore = false;
+            Owned<ISocket> mysock(LINK(sock));
+            PERSILOG(PersistentLogLevel::PLogMax, "Data arrived on persistent connection %d", sock->OShandle());
+            {
+                synchronized block(m_mutex);
+                Owned<CPersistentInfo>* val = m_infomap.getValue(sock);
+                CPersistentInfo* info = nullptr;
+                if (val)
+                    info = *val;
+                if (info)
+                {
+                    info->inUse = true;
+                    info->timeUsed = usTick()/1000;
+                    info->useCount++;
+                }
+                else
+                {
+                    ignore = true;
+                    PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: No info found for socket %d, ignore data", sock->OShandle());
+                }
+                m_selectHandler->remove(sock);
+            }
+            if (!ignore)
+                m_notify->notifySelected(sock, selected, this);
+        }
+        return false;
+    }
+
+    //Thread
+    virtual void start() override
+    {
+        m_selectHandler->start();
+        Thread::start();
+        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d started with max idle time %d and max requests %d", m_id, m_maxIdleTime, m_maxReqs);
+    }
+
+    virtual int run() override
+    {
+        while (true)
+        {
+            m_waitsem.wait(1000);
+            if (m_stop)
+                break;
+            unsigned now = usTick()/1000;
+            unsigned oldest = now - m_maxIdleTime*1000;
+            unsigned oldest_inflight = now - MAX_INFLIGHT_TIME*1000;
+            synchronized block(m_mutex);
+            std::vector<ISocket*> socks1;
+            std::vector<ISocket*> socks2;
+            for (auto& si:m_infomap)
+            {
+                CPersistentInfo* info = si.getValue();
+                if (!info)
+                    continue;
+                if(m_maxIdleTime > 0 && !info->inUse && info->timeUsed < oldest)
+                    socks1.push_back(*(ISocket**)(si.getKey()));
+                if(info->inUse && info->timeUsed < oldest_inflight)
+                    socks2.push_back(*(ISocket**)(si.getKey()));
+            }
+            for (auto s:socks1)
+            {
+                PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been idle for %d seconds so remove it", s->OShandle(), m_maxIdleTime);
+                remove(s);
+            }
+            for (auto s:socks2)
+            {
+                PERSILOG(PersistentLogLevel::PLogMin, "PERSISTENT: Socket %d has been in flight for %d seconds, remove it", s->OShandle(), MAX_INFLIGHT_TIME);
+                remove(s);
+            }
+        }
+        return 0;
+    }
+
+    virtual void stop(bool wait) override
+    {
+        m_selectHandler->stop(wait);
+        m_stop = true;
+        m_waitsem.signal();
+        if(wait)
+            join(1000);
+        PERSILOG(PersistentLogLevel::PLogNormal, "PERSISTENT: Handler %d stopped", m_id);
+    }
+};
+
+int CPersistentHandler::CurID = 0;
+
+IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime, int maxReqs, PersistentLogLevel loglevel)
+{
+    Owned<CPersistentHandler> handler = new CPersistentHandler(notify, maxIdleTime, maxReqs, loglevel);
+    handler->start();
+    return handler.getClear();
+}

+ 45 - 0
esp/platform/persistent.hpp

@@ -0,0 +1,45 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef __PERSISTENT_HPP__
+#define __PERSISTENT_HPP__
+
+#include "jlib.hpp"
+#include "jsocket.hpp"
+
+#define DEFAULT_MAX_PERSISTENT_IDLE_TIME 60
+#define DEFAULT_MAX_PERSISTENT_REQUESTS  100
+
+enum class PersistentLogLevel { PLogNone=0, PLogMin=1, PLogNormal=5, PLogMax=10};
+
+interface IPersistentHandler : implements IInterface
+{
+    virtual void add(ISocket* sock, SocketEndpoint* ep = nullptr) = 0;
+    virtual void remove(ISocket* sock) = 0;
+    virtual void doneUsing(ISocket* sock, bool keep, unsigned usesOverOne = 0) = 0;
+    virtual Linked<ISocket> getAvailable(SocketEndpoint* ep = nullptr) = 0;
+    virtual void stop(bool wait) = 0;
+};
+
+interface IPersistentSelectNotify
+{
+    virtual bool notifySelected(ISocket *sock,unsigned selected, IPersistentHandler* handler) = 0;
+};
+
+IPersistentHandler* createPersistentHandler(IPersistentSelectNotify* notify, int maxIdleTime = DEFAULT_MAX_PERSISTENT_IDLE_TIME, int maxReqs = DEFAULT_MAX_PERSISTENT_REQUESTS, PersistentLogLevel loglevel=PersistentLogLevel::PLogNormal);
+
+#endif //__PERSISTENT_HPP__

+ 1 - 0
esp/protocols/http/CMakeLists.txt

@@ -46,6 +46,7 @@ set(SRCS
     ../../platform/espcache.cpp
     ../../platform/sechandler.cpp
     ../../platform/txsummary.cpp
+    ../../platform/persistent.cpp
     mapinfo.cpp
     plugin.cpp
     )

+ 147 - 57
esp/tools/soapplus/http.cpp

@@ -44,6 +44,36 @@ void createDirectories(const char* outdir, const char* url, bool bClient, bool b
 
 const char* sepstr = "\n---------------\n";
 
+class CSocketChecker : implements IInterface, public CInterface
+{
+private:
+    Owned<ISocket> m_socket;
+
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CSocketChecker(ISocket* sock)
+    {
+        m_socket.set(sock);
+    }
+
+    //0: normal (timeout or data available)
+    //1: socket closed
+    //<0: error
+    int waitAndCheck(int waitmillisecs)
+    {
+        if(!m_socket || !m_socket->check_connection())
+            return -1;
+        int ret = m_socket->wait_read(waitmillisecs);
+        if(ret <= 0)
+            return ret;
+        if(m_socket->avail_read() == 0)
+            return 1;
+        else
+            return 0;
+    }
+};
+
 HttpStat::HttpStat()
 {
     threads = 1;
@@ -1318,7 +1348,6 @@ StringBuffer& HttpClient::generateGetRequest(StringBuffer& request)
     request.append("Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*\r\n");
     request.append("Accept-Language: en-us\r\n");
     request.append("User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)\r\n");
-    request.append("Connection: close\r\n");
     request.append("Host: ").append(m_host.str());
     if(m_port != 80)
         request.appendf(":%d", m_port);
@@ -1346,7 +1375,6 @@ StringBuffer& HttpClient::insertSoapHeaders(StringBuffer& request)
     headers.append("Content-Type: text/xml\r\n");
     headers.append("User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)\r\n");
     headers.appendf("Content-Length: %d\r\n", request.length());
-    headers.append("Connection: close\r\n");
     headers.append("Host: ").append(m_host.str());
     if(m_port != 80)
         headers.appendf(":%d", m_port);
@@ -1536,7 +1564,7 @@ int HttpClient::sendRequest(StringBuffer& req, IFileIO* request_output, IFileIO*
             request.append(c);
         seq++;
     }
-    
+
     if(seq < req.length())
         request.append(req.length() - seq, req.str() + seq);
 
@@ -1547,8 +1575,6 @@ int HttpClient::sendRequest(StringBuffer& req, IFileIO* request_output, IFileIO*
     if(request_output)
         request_output->write(0, request.length(), request.str());
 
-    unsigned start1 = msTick();
-
     SocketEndpoint ep;
     ep.set(m_host.str(), m_port);
     Owned<ISocket> socket;
@@ -1584,74 +1610,109 @@ int HttpClient::sendRequest(StringBuffer& req, IFileIO* request_output, IFileIO*
         return -1;
     }
 
-    if(http_tracelevel >= 5)
-        fprintf(m_logfile, ">>sending out request. Request length=%d\n", request.length());
+    bool isPersist = m_globals->getPropBool("isPersist", false);
+    int numReq = 1;
+    int pausemillisecs = 0;
+    if(isPersist)
+    {
+        if(m_globals->hasProp("persistrequests"))
+            numReq = atoi(m_globals->queryProp("persistrequests"));
+        if(m_globals->hasProp("persistpause"))
+            pausemillisecs = atof(m_globals->queryProp("persistpause"))*1000;
+        DBGLOG("Is persist, %d %d", numReq, pausemillisecs);
+    }
+    else
+        DBGLOG("Is not persist");
 
-    if(http_tracelevel >= 10)
-        fprintf(m_logfile, "%s%s%s\n", sepstr, request.str(), sepstr);
+    for(int iter=0; iter<numReq; iter++)
+    {
+        unsigned start1 = msTick();
 
-    socket->write(request.str(), request.length());
-                
-    StringBuffer buf;
-    StringBuffer* bufptr;
-    if(outputbuf)
-        bufptr = outputbuf;
-    else
-        bufptr = &buf;
-    Owned<IByteOutputStream> ostream = createOutputStream(*bufptr);
-    bool isRoxie;
-    __int64 resplen = Http::receiveData(socket.get(), ostream.get(), true, isRoxie, NULL, full_output, content_output);
-    if(http_tracelevel >= 5)
-        fprintf(m_logfile, ">>received response. Response length: %" I64F "d.\n", resplen);
-    if(http_tracelevel >= 10)
-        fprintf(m_logfile, "%s\n", bufptr->str());
+        if(http_tracelevel >= 5)
+            fprintf(m_logfile, ">>sending out request. Request length=%d\n", request.length());
 
-    socket->shutdown();
-    socket->close();
+        if(http_tracelevel >= 10)
+            fprintf(m_logfile, "%s%s%s\n", sepstr, request.str(), sepstr);
 
-    unsigned end1 = msTick();
+        socket->write(request.str(), request.length());
 
-    int duration = end1 - start1;
+        StringBuffer buf;
+        StringBuffer* bufptr;
+        if(outputbuf)
+            bufptr = outputbuf;
+        else
+            bufptr = &buf;
+        Owned<IByteOutputStream> ostream = createOutputStream(*bufptr);
+        bool isRoxie;
+        __int64 resplen = Http::receiveData(socket.get(), ostream.get(), true, isRoxie, NULL, full_output, content_output);
+        if(http_tracelevel >= 5)
+            fprintf(m_logfile, ">>received response. Response length: %" I64F "d.\n", resplen);
+        if(http_tracelevel >= 10)
+            fprintf(m_logfile, "%s\n", bufptr->str());
 
-    if(http_tracelevel >= 5)
-        fprintf(m_logfile, "Time taken to send request and receive response: %d milli-seconds\n", duration);
 
-    if(stat)
-    {
-        stat->duration += duration;
-        stat->totaltime += duration;
-        stat->numrequests += 1;
-        if(duration > stat->slowest)
-            stat->slowest = duration;
-        if(duration < stat->fastest)
-            stat->fastest = duration;
-        stat->totalreqlen += request.length();
-        stat->totalresplen += resplen;
-    }
+        if(!isPersist || iter >= numReq - 1)
+        {
+            socket->shutdown();
+            socket->close();
+        }
 
-    if(m_doValidation)
-    {
-        int ret = validate(*bufptr);
-        if(http_tracelevel > 0 && m_doValidation == 1)
+        unsigned end1 = msTick();
+
+        int duration = end1 - start1;
+
+        if(http_tracelevel >= 5)
+            fprintf(m_logfile, "Time taken to send request and receive response: %d milli-seconds\n", duration);
+
+        if(stat)
         {
-            if(ret == 0)
+            stat->duration += duration;
+            stat->totaltime += duration;
+            stat->numrequests += 1;
+            if(duration > stat->slowest)
+                stat->slowest = duration;
+            if(duration < stat->fastest)
+                stat->fastest = duration;
+            stat->totalreqlen += request.length();
+            stat->totalresplen += resplen;
+        }
+
+        if(m_doValidation)
+        {
+            int ret = validate(*bufptr);
+            if(http_tracelevel > 0 && m_doValidation == 1)
             {
-                fprintf(m_logfile, "\n%sSuccessfully validated the response against the xsd%s\n", sepstr, sepstr);
+                if(ret == 0)
+                {
+                    fprintf(m_logfile, "\n%sSuccessfully validated the response against the xsd%s\n", sepstr, sepstr);
+                }
+                else
+                {
+                    fprintf(m_logfile, "Error: Validation against the xsd failed.\n");
+                }
             }
-            else
+        }
+
+        if(http_tracelevel >= 5)
+            fprintf(m_logfile, "%s", sepstr);
+        if(isPersist && iter < numReq - 1)
+        {
+            Owned<CSocketChecker> checker = new CSocketChecker(socket.get());
+            int ret = checker->waitAndCheck(pausemillisecs);
+            if(ret != 0)
             {
-                fprintf(m_logfile, "Error: Validation against the xsd failed.\n");
+                if(ret > 0)
+                    fprintf(m_logfile, "\n>>Persistent connection closed by the other end.\n");
+                else
+                    fprintf(m_logfile, "\n>>Persistent connection got error.\n");
+                break;
             }
         }
     }
 
-    if(http_tracelevel >= 5)
-        fprintf(m_logfile, "%s", sepstr);
-
     return 0;
 }
 
-
 SimpleServer::SimpleServer(IProperties* globals, int port, const char* inputpath, const char* outputdir, bool writeToFiles, int iterations)
 {
     m_globals = globals;
@@ -1666,6 +1727,7 @@ SimpleServer::SimpleServer(IProperties* globals, int port, const char* inputpath
     m_logfile = stdout;
     m_iterations = iterations;
     m_headerlen = 0;
+    m_isPersist = m_globals->getPropBool("isPersist", false);
 }
 
 int SimpleServer::start()
@@ -1727,13 +1789,37 @@ int SimpleServer::start()
         if(m_iterations != -1 && seq >= m_iterations)
             break;
 
-        Owned<ISocket> client = socket->accept();
+        Owned<ISocket> client;
+        if(!m_isPersist || m_persistentSocket.get() == nullptr)
+        {
+            client.setown(socket->accept());
+            if(m_isPersist)
+                m_persistentSocket.set(client.get());
+        }
+        else
+        {
+            Owned<CSocketChecker> checker = new CSocketChecker(m_persistentSocket.get());
+            int ret = checker->waitAndCheck(WAIT_FOREVER);
+            if(ret == 0)
+                client.set(m_persistentSocket.get());
+            else
+            {
+                if(ret > 0)
+                    fprintf(m_logfile, "\n>>Persistent connection closed by the other end, accepting new connection...\n");
+                else
+                    fprintf(m_logfile, "\n>>Persistent connection got error, accepting new connection...\n");
+                m_persistentSocket->shutdown();
+                m_persistentSocket->close();
+                client.setown(socket->accept());
+                m_persistentSocket.set(client.get());
+            }
+        }
 
         char peername[256];
         int port = client->peer_name(peername, 256);
 
         if(http_tracelevel >= 5)
-            fprintf(m_logfile, "\n>>receivd request from %s:%d\n", peername, port);
+            fprintf(m_logfile, "\n>>received request from %s:%d\n", peername, port);
 
         StringBuffer requestbuf;
         StringBuffer reqFileName;
@@ -1947,7 +2033,11 @@ int SimpleServer::start()
         if(server_infile.get() != NULL && server_infile->isDirectory())
             m_response.clear();
 
-        client->close();
+        if(!m_isPersist)
+        {
+            client->shutdown();
+            client->close();
+        }
         seq++;
     }
 

+ 2 - 1
esp/tools/soapplus/http.hpp

@@ -198,7 +198,6 @@ public:
     void addEspRequest(const char* requestId, const char* service, const char* method, StringBuffer& request, HttpStat& httpStat);
 };
 
-
 class SimpleServer
 {
     int m_port;
@@ -212,6 +211,8 @@ class SimpleServer
     IProperties* m_globals;
 
     bool         m_writeToFiles;
+    bool         m_isPersist;
+    Owned<ISocket> m_persistentSocket;
 
 public:
     SimpleServer(IProperties* globals, int port, const char* inputpath = NULL, const char* outputdir = NULL, bool writeToFiles = false, int iterations = -1);

+ 24 - 0
esp/tools/soapplus/main.cpp

@@ -39,6 +39,7 @@ void usage()
     puts("   -t : test schema parser");
     puts("   -diff <left> <right> : compare 2 files or directories. You can ignore certain xpaths, as specified in the config file (with -cfg option). An example of config file is: <whatever><Ignore><XPath>/soap:Body/AAC_AuthorizeRequest/ClientIP</XPath><XPath>/soap:Header/Security/UsernameToken/Username</XPath></Ignore></whatever>");
     puts("   -stress <threads> <seconds> : run stress test, with the specified number of threads, for the specified number of seconds.");
+    puts("   -persist [<number-of-requests>] [<pause-seconds>] : use persistent connection. For client mode you can optionally specify the number of requests to send or receive, and the number of seconds to pause between sending requests.");
     puts("");
     puts("Options: ");
     puts("   -url <[http|https]://[user:passwd@]host:port/path>: the url to send requests to. For esp, the path is normally the service name, not the method name. For examle, it could be WsADL, WsAccurint, WsIdentity etc.");
@@ -631,6 +632,29 @@ int main(int argc, char** argv)
                 return 0;
             }
         }
+        else if(stricmp(argv[i], "-persist") == 0)
+        {
+            i++;
+            globals->setProp("isPersist", 1);
+            if(argc > i)
+            {
+                const char* numstr = argv[i];
+                if(numstr[0] >= '0' && numstr[0] <= '9')
+                {
+                    i++;
+                    globals->setProp("persistrequests", numstr);
+                    if(argc > i)
+                    {
+                        const char* secstr = argv[i];
+                        if((secstr[0] >= '0' && secstr[0] <= '9') || secstr[0] == '.')
+                        {
+                            i++;
+                            globals->setProp("persistpause", secstr);
+                        }
+                    }
+                }
+            }
+        }
         else if(stricmp(argv[i], "-delay") == 0)
         {
             i++;