|
@@ -232,6 +232,13 @@ bool CWsEclService::init(const char * name, const char * type, IPropertyTree * c
|
|
|
else
|
|
|
workunitTimeout = WAIT_FOREVER;
|
|
|
|
|
|
+ const char *headerName = serviceTree->queryProp("HttpGlobalIdHeader");
|
|
|
+ if (headerName && *headerName && !streq(headerName, "HPCC-Global-Id")) //default will be checked anyway
|
|
|
+ globalIdHttpHeader.set(headerName);
|
|
|
+ headerName = serviceTree->queryProp("HttpCallerIdHeader");
|
|
|
+ if (headerName && *headerName && !streq(headerName, "HPCC-Caller-Id")) //default will be checked anyway
|
|
|
+ callerIdHttpHeader.set(headerName);
|
|
|
+
|
|
|
Owned<IPropertyTreeIterator> cfgTargets = serviceTree->getElements("Targets/Target");
|
|
|
ForEach(*cfgTargets)
|
|
|
targets.append(cfgTargets->query().queryProp(NULL));
|
|
@@ -1850,7 +1857,7 @@ int CWsEclBinding::getWsEcl2Form(CHttpRequest* request, CHttpResponse* response,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinfo, IPropertyTree *reqTree, StringBuffer &out, unsigned flags, TextMarkupFormat fmt, const char *viewname, const char *xsltname)
|
|
|
+int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinfo, IPropertyTree *reqTree, StringBuffer &out, unsigned flags, CHttpRequest *httpreq, TextMarkupFormat fmt, const char *viewname, const char *xsltname)
|
|
|
{
|
|
|
IConstWorkUnit *sourceWorkUnit = wsinfo.ensureWorkUnit();
|
|
|
|
|
@@ -1871,6 +1878,23 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
|
|
|
|
|
|
StringAttr wuid(workunit->queryWuid()); // NB queryWuid() not valid after workunit,clear()
|
|
|
|
|
|
+ if (httpreq)
|
|
|
+ {
|
|
|
+ StringBuffer globalId, callerId;
|
|
|
+ wsecl->getHttpGlobalIdHeader(httpreq, globalId);
|
|
|
+ wsecl->getHttpCallerIdHeader(httpreq, callerId);
|
|
|
+ if (globalId.length())
|
|
|
+ {
|
|
|
+ workunit->setDebugValue("GlobalId", globalId.str(), true);
|
|
|
+
|
|
|
+ SocketEndpoint ep;
|
|
|
+ StringBuffer localId;
|
|
|
+ appendLocalId(localId, httpreq->getSocket()->getEndpoint(ep), 0);
|
|
|
+ workunit->setDebugValue("CallerId", localId.str(), true); //our localId becomes caller id for the next hop
|
|
|
+ DBGLOG("GlobalId: %s, CallerId: %s, LocalId: %s, Wuid: %s", globalId.str(), callerId.str(), localId.str(), wuid.str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
SCMStringBuffer token;
|
|
|
createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token);
|
|
|
workunit->setSecurityToken(token.str());
|
|
@@ -1923,13 +1947,13 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinfo, const char *xml, StringBuffer &out, unsigned flags, TextMarkupFormat fmt, const char *viewname, const char *xsltname)
|
|
|
+int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinfo, const char *xml, StringBuffer &out, unsigned flags, CHttpRequest *httpreq, TextMarkupFormat fmt, const char *viewname, const char *xsltname)
|
|
|
{
|
|
|
Owned<IPropertyTree> reqTree = createPTreeFromXMLString(xml, ipt_ordered, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_ignoreNameSpaces));
|
|
|
- return submitWsEclWorkunit(context, wsinfo, reqTree, out, flags, fmt, viewname, xsltname);
|
|
|
+ return submitWsEclWorkunit(context, wsinfo, reqTree, out, flags, httpreq, fmt, viewname, xsltname);
|
|
|
}
|
|
|
|
|
|
-void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, StringBuffer &resp, StringBuffer &status, const char *query, bool trim, const char *contentType)
|
|
|
+void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, StringBuffer &resp, StringBuffer &status, const char *query, bool trim, const char *contentType, CHttpRequest *httpreq)
|
|
|
{
|
|
|
ISmartSocketFactory *conn = NULL;
|
|
|
SocketEndpoint ep;
|
|
@@ -1949,8 +1973,28 @@ void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, Stri
|
|
|
if (!trim)
|
|
|
url.append("?.trim=0");
|
|
|
|
|
|
+ Owned<IProperties> headers;
|
|
|
Owned<IHttpClient> httpclient = httpctx->createHttpClient(NULL, url);
|
|
|
httpclient->setTimeOut(wsecl->roxieTimeout);
|
|
|
+ if (httpreq)
|
|
|
+ {
|
|
|
+ StringBuffer globalId, callerId;
|
|
|
+ wsecl->getHttpGlobalIdHeader(httpreq, globalId);
|
|
|
+ wsecl->getHttpCallerIdHeader(httpreq, callerId);
|
|
|
+
|
|
|
+ if (globalId.length())
|
|
|
+ {
|
|
|
+ headers.setown(createProperties());
|
|
|
+ headers->setProp(wsecl->queryGlobalIdHeaderName(), globalId);
|
|
|
+
|
|
|
+ SocketEndpoint ep;
|
|
|
+ StringBuffer localId;
|
|
|
+ appendLocalId(localId, httpreq->getSocket()->getEndpoint(ep), 0);
|
|
|
+ if (localId.length())
|
|
|
+ headers->setProp(wsecl->queryCallerIdHeaderName(), localId);
|
|
|
+ DBGLOG("GlobalId: %s, CallerId: %s, LocalId: %s", globalId.str(), callerId.str(), localId.str());
|
|
|
+ }
|
|
|
+ }
|
|
|
if (0 > httpclient->sendRequest("POST", contentType, req, resp, status))
|
|
|
throw MakeStringException(-1, "Roxie cluster communication error: %s", target);
|
|
|
}
|
|
@@ -2000,7 +2044,7 @@ int CWsEclBinding::onSubmitQueryOutput(IEspContext &context, CHttpRequest* reque
|
|
|
getWsEclJsonRequest(jsonmsg, context, request, wsinfo, "json", NULL, REQSF_TRIM, false);
|
|
|
if (jsonp && *jsonp)
|
|
|
output.append(jsonp).append('(');
|
|
|
- sendRoxieRequest(wsinfo.qsetname.get(), jsonmsg, output, status, wsinfo.queryname, trim, "application/json");
|
|
|
+ sendRoxieRequest(wsinfo.qsetname.get(), jsonmsg, output, status, wsinfo.queryname, trim, "application/json", request);
|
|
|
if (jsonp && *jsonp)
|
|
|
output.append(");");
|
|
|
}
|
|
@@ -2017,11 +2061,11 @@ int CWsEclBinding::onSubmitQueryOutput(IEspContext &context, CHttpRequest* reque
|
|
|
if (!format || !streq(format, "expanded"))
|
|
|
xmlflags |= WWV_OMIT_SCHEMAS;
|
|
|
if (!isRoxieReq)
|
|
|
- submitWsEclWorkunit(context, wsinfo, soapmsg.str(), output, xmlflags, outputJSON ? MarkupFmt_JSON : MarkupFmt_XML);
|
|
|
+ submitWsEclWorkunit(context, wsinfo, soapmsg.str(), output, xmlflags, request, outputJSON ? MarkupFmt_JSON : MarkupFmt_XML);
|
|
|
else
|
|
|
{
|
|
|
StringBuffer roxieresp;
|
|
|
- sendRoxieRequest(wsinfo.qsetname, soapmsg, roxieresp, status, wsinfo.queryname, trim, "text/xml");
|
|
|
+ sendRoxieRequest(wsinfo.qsetname, soapmsg, roxieresp, status, wsinfo.queryname, trim, "text/xml", request);
|
|
|
if (xmlflags & WWV_OMIT_SCHEMAS)
|
|
|
expandWuXmlResults(output, wsinfo.queryname, roxieresp.str(), xmlflags);
|
|
|
else
|
|
@@ -2069,7 +2113,7 @@ int CWsEclBinding::onSubmitQueryOutputView(IEspContext &context, CHttpRequest* r
|
|
|
const char *view = context.queryRequestParameters()->queryProp("view");
|
|
|
if (strieq(clustertype.str(), "roxie"))
|
|
|
{
|
|
|
- sendRoxieRequest(wsinfo.qsetname.get(), soapmsg, output, status, wsinfo.queryname, false, "text/xml");
|
|
|
+ sendRoxieRequest(wsinfo.qsetname.get(), soapmsg, output, status, wsinfo.queryname, false, "text/xml", request);
|
|
|
Owned<IWuWebView> web = createWuWebView(*wu, wsinfo.qsetname.get(), wsinfo.queryname.get(), getCFD(), true, queryXsltConfig());
|
|
|
if (!view)
|
|
|
web->applyResultsXSLT(xsltfile.str(), output.str(), html);
|
|
@@ -2078,7 +2122,7 @@ int CWsEclBinding::onSubmitQueryOutputView(IEspContext &context, CHttpRequest* r
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- submitWsEclWorkunit(context, wsinfo, soapmsg.str(), html, 0, MarkupFmt_XML, view, xsltfile.str());
|
|
|
+ submitWsEclWorkunit(context, wsinfo, soapmsg.str(), html, 0, request, MarkupFmt_XML, view, xsltfile.str());
|
|
|
}
|
|
|
|
|
|
response->setContent(html.str());
|
|
@@ -2484,7 +2528,7 @@ int CWsEclBinding::onGet(CHttpRequest* request, CHttpResponse* response)
|
|
|
StringBuffer status;
|
|
|
if (getEspLogLevel()>LogNormal)
|
|
|
DBGLOG("roxie req: %s", soapreq.str());
|
|
|
- sendRoxieRequest(target, soapreq, output, status, qid, parms->getPropBool(".trim", true), "text/xml");
|
|
|
+ sendRoxieRequest(target, soapreq, output, status, qid, parms->getPropBool(".trim", true), "text/xml", request);
|
|
|
if (getEspLogLevel()>LogNormal)
|
|
|
DBGLOG("roxie resp: %s", output.str());
|
|
|
|
|
@@ -2672,7 +2716,7 @@ void CWsEclBinding::handleJSONPost(CHttpRequest *request, CHttpResponse *respons
|
|
|
|
|
|
StringBuffer status;
|
|
|
if (wsecl->connMap.getValue(queryset.str()))
|
|
|
- sendRoxieRequest(queryset.str(), content, jsonresp, status, queryname.str(), trim, "application/json");
|
|
|
+ sendRoxieRequest(queryset.str(), content, jsonresp, status, queryname.str(), trim, "application/json", request);
|
|
|
else
|
|
|
{
|
|
|
WsEclWuInfo wsinfo(wuid.str(), queryset.str(), queryname.str(), ctx->queryUserId(), ctx->queryPassword());
|
|
@@ -2691,7 +2735,7 @@ void CWsEclBinding::handleJSONPost(CHttpRequest *request, CHttpResponse *respons
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- submitWsEclWorkunit(*ctx, wsinfo, reqTree, jsonresp, 0, MarkupFmt_JSON);
|
|
|
+ submitWsEclWorkunit(*ctx, wsinfo, reqTree, jsonresp, 0, request, MarkupFmt_JSON);
|
|
|
}
|
|
|
if (jsonp && *jsonp)
|
|
|
jsonresp.append(");");
|
|
@@ -2789,7 +2833,7 @@ int CWsEclBinding::HandleSoapRequest(CHttpRequest* request, CHttpResponse* respo
|
|
|
bool trim = ctx->queryRequestParameters()->getPropBool(".trim", true);
|
|
|
StringBuffer content(request->queryContent());
|
|
|
StringBuffer output;
|
|
|
- sendRoxieRequest(target, content, output, status, queryname, trim, "text/xml");
|
|
|
+ sendRoxieRequest(target, content, output, status, queryname, trim, "text/xml", request);
|
|
|
if (!(xmlflags & WWV_CDATA_SCHEMAS))
|
|
|
soapresp.swapWith(output);
|
|
|
else
|
|
@@ -2808,7 +2852,7 @@ int CWsEclBinding::HandleSoapRequest(CHttpRequest* request, CHttpResponse* respo
|
|
|
else
|
|
|
{
|
|
|
WsEclWuInfo wsinfo(wuid.str(), target.str(), queryname.str(), ctx->queryUserId(), ctx->queryPassword());
|
|
|
- submitWsEclWorkunit(*ctx, wsinfo, content.str(), soapresp, xmlflags);
|
|
|
+ submitWsEclWorkunit(*ctx, wsinfo, content.str(), soapresp, xmlflags, request);
|
|
|
}
|
|
|
|
|
|
if (getEspLogLevel()>LogNormal)
|