|
@@ -56,7 +56,7 @@ public:
|
|
|
defaultXmlReadFlags = ctx.ctxGetPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
|
|
|
trapTooManyActiveQueries = ctx.ctxGetPropBool("@trapTooManyActiveQueries", true);
|
|
|
numRequestArrayThreads = ctx.ctxGetPropInt("@requestArrayThreads", 5);
|
|
|
- maxHttpConnectionRequests = ctx.ctxGetPropInt("@maxHttpConnectionRequests", 10);
|
|
|
+ maxHttpConnectionRequests = ctx.ctxGetPropInt("@maxHttpConnectionRequests", 0);
|
|
|
maxHttpKeepAliveWait = ctx.ctxGetPropInt("@maxHttpKeepAliveWait", 5000); // In milliseconds
|
|
|
}
|
|
|
IHpccProtocolListener *createListener(const char *protocol, IHpccProtocolMsgSink *sink, unsigned port, unsigned listenQueue, const char *config, const char *certFile=nullptr, const char *keyFile=nullptr, const char *passPhrase=nullptr)
|
|
@@ -69,7 +69,7 @@ public:
|
|
|
PTreeReaderOptions defaultXmlReadFlags;
|
|
|
unsigned maxBlockSize;
|
|
|
unsigned numRequestArrayThreads;
|
|
|
- unsigned maxHttpConnectionRequests = 10;
|
|
|
+ unsigned maxHttpConnectionRequests = 0;
|
|
|
unsigned maxHttpKeepAliveWait = 5000;
|
|
|
bool trapTooManyActiveQueries;
|
|
|
};
|
|
@@ -1732,12 +1732,12 @@ private:
|
|
|
unsigned memused = 0;
|
|
|
IpAddress peer;
|
|
|
bool continuationNeeded = false;
|
|
|
+ bool resetQstart = false;
|
|
|
bool isStatus = false;
|
|
|
unsigned remainingHttpConnectionRequests = global->maxHttpConnectionRequests ? global->maxHttpConnectionRequests : 1;
|
|
|
unsigned readWait = WAIT_FOREVER;
|
|
|
|
|
|
Owned<IHpccProtocolMsgContext> msgctx = sink->createMsgContext(startTime);
|
|
|
- IContextLogger &logctx = *msgctx->queryLogContext();
|
|
|
|
|
|
readAnother:
|
|
|
unsigned agentsReplyLen = 0;
|
|
@@ -1762,15 +1762,32 @@ readAnother:
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
- if (continuationNeeded)
|
|
|
+ if (resetQstart)
|
|
|
{
|
|
|
+ resetQstart = false;
|
|
|
qstart = msTick();
|
|
|
time(&startTime);
|
|
|
+ msgctx.setown(sink->createMsgContext(startTime));
|
|
|
}
|
|
|
}
|
|
|
catch (IException * E)
|
|
|
{
|
|
|
- if (traceLevel > 0)
|
|
|
+ bool expectedError = false;
|
|
|
+ if (resetQstart) //persistent connection - initial request has already been processed
|
|
|
+ {
|
|
|
+ switch (E->errorCode())
|
|
|
+ {
|
|
|
+ //closing of persistent socket is not an error
|
|
|
+ case JSOCKERR_not_opened:
|
|
|
+ case JSOCKERR_broken_pipe:
|
|
|
+ case JSOCKERR_timeout_expired:
|
|
|
+ case JSOCKERR_graceful_close:
|
|
|
+ expectedError = true;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (traceLevel > 0 && !expectedError)
|
|
|
{
|
|
|
StringBuffer b;
|
|
|
IERRLOG("Error reading query from socket: %s", E->errorMessage(b).str());
|
|
@@ -1780,6 +1797,7 @@ readAnother:
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ IContextLogger &logctx = *msgctx->queryLogContext();
|
|
|
bool isHTTP = httpHelper.isHttp();
|
|
|
if (isHTTP)
|
|
|
{
|
|
@@ -1817,6 +1835,7 @@ readAnother:
|
|
|
bool isBlind = false;
|
|
|
bool isDebug = false;
|
|
|
unsigned protocolFlags = isHTTP ? 0 : HPCC_PROTOCOL_NATIVE;
|
|
|
+ unsigned requestArraySize = 0; //for logging, considering all the ways requests can be counted this name seems least confusing
|
|
|
|
|
|
Owned<IPropertyTree> queryPT;
|
|
|
StringBuffer sanitizedText;
|
|
@@ -1866,7 +1885,7 @@ readAnother:
|
|
|
Owned<IHpccProtocolResponse> protocol = createProtocolResponse(queryPT->queryName(), client, httpHelper, logctx, protocolFlags | HPCC_PROTOCOL_CONTROL, global->defaultXmlReadFlags);
|
|
|
sink->onControlMsg(msgctx, queryPT, protocol);
|
|
|
protocol->finalize(0);
|
|
|
- if (streq(queryName, "lock") || streq(queryName, "childlock"))
|
|
|
+ if (streq(queryName, "lock") || streq(queryName, "childlock")) //don't reset qstart, lock time should be included
|
|
|
goto readAnother;
|
|
|
}
|
|
|
else if (isStatus)
|
|
@@ -1980,6 +1999,7 @@ readAnother:
|
|
|
fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
|
|
|
}
|
|
|
requestArray.append(*fixedreq);
|
|
|
+ requestArraySize++;
|
|
|
}
|
|
|
}
|
|
|
else
|
|
@@ -1991,6 +2011,7 @@ readAnother:
|
|
|
fixedreq->addPropTree(iter->query().queryName(), LINK(&iter->query()));
|
|
|
}
|
|
|
requestArray.append(*fixedreq);
|
|
|
+ requestArraySize = 1;
|
|
|
|
|
|
msgctx->setIntercept(queryPT->getPropBool("@log", false));
|
|
|
msgctx->setTraceLevel(queryPT->getPropInt("@traceLevel", logctx.queryTraceLevel()));
|
|
@@ -2108,10 +2129,11 @@ readAnother:
|
|
|
}
|
|
|
unsigned bytesOut = client? client->bytesOut() : 0;
|
|
|
unsigned elapsed = msTick() - qstart;
|
|
|
- sink->noteQuery(msgctx.get(), peerStr, failed, bytesOut, elapsed, memused, agentsReplyLen, agentsDuplicates, agentsResends, continuationNeeded);
|
|
|
+ sink->noteQuery(msgctx.get(), peerStr, failed, bytesOut, elapsed, memused, agentsReplyLen, agentsDuplicates, agentsResends, continuationNeeded, requestArraySize);
|
|
|
if (continuationNeeded)
|
|
|
{
|
|
|
rawText.clear();
|
|
|
+ resetQstart = true;
|
|
|
goto readAnother;
|
|
|
}
|
|
|
else
|
|
@@ -2132,6 +2154,7 @@ readAnother:
|
|
|
if (isHTTP && --remainingHttpConnectionRequests > 0)
|
|
|
{
|
|
|
readWait = global->maxHttpKeepAliveWait;
|
|
|
+ resetQstart = true;
|
|
|
goto readAnother;
|
|
|
}
|
|
|
|