Browse Source

HPCC-17569 Add roxie support for persistent http connections

Signed-off-by: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Anthony Fishbeck 7 years ago
parent
commit
4ed26462f5

+ 14 - 3
common/thorhelper/roxiehelper.cpp

@@ -1598,7 +1598,15 @@ void CSafeSocket::setHttpMode(const char *queryName, bool arrayMode, HttpHelper
     mlResponseFmt = httphelper.queryResponseMlFormat();
     respCompression = httphelper.getRespCompression();
     heartbeat = false;
-    assertex(contentHead.length()==0 && contentTail.length()==0);
+
+    //reset persistent http connection
+    contentHead.clear();
+    contentTail.clear();
+    ForEachItemIn(idx, queued)
+        free(queued.item(idx));
+    queued.kill();
+    lengths.kill();
+
     if (mlResponseFmt==MarkupFmt_JSON)
     {
         contentHead.set("{");
@@ -1747,10 +1755,11 @@ private:
     StringBuffer content;
     ISocket *sock = nullptr;
     HttpCompression compression = HttpCompression::NONE;
+    bool httpKeepAlive = false;
     unsigned int sent = 0;
 public:
 
-    HttpResponseHandler(ISocket *s, CriticalSection &crit) : sock(s), c(crit)
+    HttpResponseHandler(ISocket *s, CriticalSection &crit, bool keepAlive) : sock(s), c(crit), httpKeepAlive(keepAlive)
     {
     }
     inline bool compressing()
@@ -1771,6 +1780,8 @@ public:
             compression = respCompression;
         header.append("HTTP/1.0 200 OK\r\n");
         header.append("Content-Type: ").append(mlFmt == MarkupFmt_JSON ? "application/json" : "text/xml").append("\r\n");
+        if (httpKeepAlive)
+            header.append("Connection: Keep-Alive\r\n");
         if (!compressing())
         {
             header.append("Content-Length: ").append(length).append("\r\n\r\n");
@@ -1839,7 +1850,7 @@ void CSafeSocket::flush()
         ForEachItemIn(idx, lengths)
             contentLength += lengths.item(idx);
 
-        HttpResponseHandler resp(sock, crit);
+        HttpResponseHandler resp(sock, crit, httpKeepAlive);
 
         resp.init(contentLength, mlResponseFmt, respCompression);
         if (!adaptiveRoot || mlResponseFmt != MarkupFmt_JSON)

+ 18 - 0
common/thorhelper/roxiehelper.hpp

@@ -39,6 +39,7 @@ class THORHELPER_API HttpHelper : public CInterface
 private:
     HttpMethod method;
     bool useEnvelope = false;
+    StringAttr version;
     StringAttr url;
     StringAttr authToken;
     StringAttr contentType;
@@ -65,6 +66,13 @@ public:
     HttpHelper(StringArray *_validTargets) : validTargets(_validTargets), method(HttpMethod::NONE) {parameters.setown(createProperties(true));}
     inline bool isHttp() { return method!=HttpMethod::NONE; }
     inline bool isHttpGet(){ return method==HttpMethod::GET; }
+    inline bool allowKeepAlive()
+    {
+        const char *connection = queryRequestHeader("Connection");
+        if (!connection)
+            return !streq(version, "1.0");
+        return strieq(connection, "Keep-Alive");
+    }
     inline bool isControlUrl()
     {
         const char *control = queryTarget();
@@ -146,6 +154,13 @@ public:
         {
             url.set(v, end - v);
             parseURL();
+            v=end+5;
+            if (*v=='/')
+            {
+                end=strstr(++v, "\r\n");
+                if (end)
+                    version.set(v, end-v);
+            }
         }
     }
     void parseRequestHeaders(const char *headers);
@@ -343,6 +358,7 @@ interface SafeSocket : extends IInterface
     virtual void sendJsonException(IException *E, const char *queryName) = 0;
     virtual void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper) = 0;
     virtual void setHttpMode(bool mode) = 0;
+    virtual void setHttpKeepAlive(bool val) = 0;
     virtual void setHeartBeat() = 0;
     virtual bool sendHeartBeat(const IContextLogger &logctx) = 0;
     virtual void flush() = 0;
@@ -363,6 +379,7 @@ class THORHELPER_API CSafeSocket : implements SafeSocket, public CInterface
 protected:
     Linked<ISocket> sock;
     bool httpMode;
+    bool httpKeepAlive = false;
     bool heartbeat;
     bool adaptiveRoot = false;
     TextMarkupFormat mlResponseFmt = MarkupFmt_Unknown;
@@ -386,6 +403,7 @@ public:
     bool readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &, bool &, unsigned maxBlockSize);
     void setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper);
     void setHttpMode(bool mode) override {httpMode = mode;}
+    virtual void setHttpKeepAlive(bool val) { httpKeepAlive = val; }
     void setAdaptiveRoot(bool adaptive){adaptiveRoot=adaptive;}
     bool getAdaptiveRoot(){return adaptiveRoot;}
     void checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName);

+ 2 - 0
configuration/xsds_xmls/experimental.xml

@@ -887,6 +887,8 @@
                 logQueueLen="512"
                 lowTimeout="10000"
                 maxBlockSize="10000000"
+                maxHttpConnectionRequests="10"
+                maxHttpKeepAliveWait="5000"
                 maxLocalFilesOpen="4000"
                 maxLockAttempts="5"
                 maxRemoteFilesOpen="1000"

+ 14 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -718,6 +718,20 @@
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
+    <xs:attribute name="maxHttpConnectionRequests" type="xs:nonNegativeInteger" use="optional" default="10">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Max number of query requests per persistent http connection</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute name="maxHttpKeepAliveWait" type="xs:nonNegativeInteger" use="optional" default="5000">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Max number of miliseconds to wait for addtional requests on a persistent http connection</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
     <xs:attribute name="maxLocalFilesOpen" type="xs:nonNegativeInteger" use="optional" default="4000">
       <xs:annotation>
         <xs:appinfo>

+ 2 - 0
initfiles/etc/DIR_NAME/environment.xml.in

@@ -87,6 +87,8 @@
                 logQueueLen="512"
                 lowTimeout="10000"
                 maxBlockSize="10000000"
+                maxHttpConnectionRequests="10"
+                maxHttpKeepAliveWait="5000"
                 maxLocalFilesOpen="4000"
                 maxLockAttempts="5"
                 maxRemoteFilesOpen="1000"

+ 20 - 1
roxie/ccd/ccdprotocol.cpp

@@ -56,6 +56,8 @@ public:
         defaultXmlReadFlags = ctx.ctxGetPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
         trapTooManyActiveQueries = ctx.ctxGetPropBool("@trapTooManyActiveQueries", true);
         numRequestArrayThreads = ctx.ctxGetPropInt("@requestArrayThreads", 5);
+        maxHttpConnectionRequests = ctx.ctxGetPropInt("@maxHttpConnectionRequests", 10);
+        maxHttpKeepAliveWait = ctx.ctxGetPropInt("@maxHttpKeepAliveWait", 5000);
     }
     IHpccProtocolListener *createListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *config, const char *certFile=nullptr, const char *keyFile=nullptr, const char *passPhrase=nullptr)
     {
@@ -67,6 +69,8 @@ public:
     PTreeReaderOptions defaultXmlReadFlags;
     unsigned maxBlockSize;
     unsigned numRequestArrayThreads;
+    unsigned maxHttpConnectionRequests = 10;
+    unsigned maxHttpKeepAliveWait = 5000;
     bool trapTooManyActiveQueries;
 };
 
@@ -1701,6 +1705,8 @@ private:
         IpAddress peer;
         bool continuationNeeded = false;
         bool isStatus = false;
+        unsigned remainingHttpConnectionRequests = global->maxHttpConnectionRequests ? global->maxHttpConnectionRequests : 1;
+        unsigned readWait = WAIT_FOREVER;
 
         Owned<IHpccProtocolMsgContext> msgctx = sink->createMsgContext(startTime);
         IContextLogger &logctx = *msgctx->queryLogContext();
@@ -1715,7 +1721,7 @@ readAnother:
             if (client)
             {
                 client->querySocket()->getPeerAddress(peer);
-                if (!client->readBlock(rawText.clear(), WAIT_FOREVER, &httpHelper, continuationNeeded, isStatus, global->maxBlockSize))
+                if (!client->readBlock(rawText.clear(), readWait, &httpHelper, continuationNeeded, isStatus, global->maxBlockSize))
                 {
                     if (traceLevel > 8)
                     {
@@ -1745,6 +1751,13 @@ readAnother:
         }
 
         bool isHTTP = httpHelper.isHttp();
+        if (isHTTP)
+        {
+            if (httpHelper.allowKeepAlive())
+                client->setHttpKeepAlive(remainingHttpConnectionRequests > 1);
+            else
+                remainingHttpConnectionRequests = 1;
+        }
 
         TextMarkupFormat mlResponseFmt = MarkupFmt_Unknown;
         TextMarkupFormat mlRequestFmt = MarkupFmt_Unknown;
@@ -2083,6 +2096,12 @@ readAnother:
                     unsigned replyLen = 0;
                     client->write(&replyLen, sizeof(replyLen));
                 }
+                if (--remainingHttpConnectionRequests > 0)
+                {
+                    readWait = global->maxHttpKeepAliveWait;
+                    goto readAnother;
+                }
+
                 client.clear();
             }
             catch (IException * E)

+ 2 - 0
testing/regress/environment.xml.in

@@ -86,6 +86,8 @@
                 logQueueLen="512"
                 lowTimeout="10000"
                 maxBlockSize="10000000"
+                maxHttpConnectionRequests="10"
+                maxHttpKeepAliveWait="5000"
                 maxLocalFilesOpen="4000"
                 maxLockAttempts="5"
                 maxRemoteFilesOpen="1000"