Browse Source

HPCC-9224 WsEcl support for starting workunits asynchronously

Asynchrounous workunits can now be created by changing the URL used to
submit queries.  Supported by HTTP-GET, SOAP, JSON, and FORM-POST.

Replacing "submit" in the url with "async" will cause workunit based
queries to immediately return the WUID of the result workunit rather
than waiting for the workunit to complete.

Synchronous URL:

http://ip:port/WsEcl/submit/query/hthor/myquery?parm1=abc

Aysnchrounous URL:

http://ip:port/WsEcl/async/query/hthor/myquery?parm1=abc

Signed-off-by: Anthony Fishbeck <Anthony.Fishbeck@lexisnexis.com>
Anthony Fishbeck 12 years ago
parent
commit
7f8f0a35cc

+ 14 - 0
common/wuwebview/wuwebview.cpp

@@ -288,6 +288,8 @@ public:
     virtual void renderSingleResult(const char *viewName, const char *resultname, StringBuffer &html);
     virtual void expandResults(const char *xml, StringBuffer &out, unsigned flags);
     virtual void expandResults(StringBuffer &out, unsigned flags);
+    virtual void createWuidResponse(StringBuffer &out, unsigned flags);
+
     virtual void applyResultsXSLT(const char *filename, const char *xml, StringBuffer &out);
     virtual void applyResultsXSLT(const char *filename, StringBuffer &out);
     virtual StringBuffer &aggregateResources(const char *type, StringBuffer &content);
@@ -572,6 +574,18 @@ void WuWebView::expandResults(const char *xml, StringBuffer &out, unsigned flags
     out.append(expander.buffer);
 }
 
+void WuWebView::createWuidResponse(StringBuffer &out, unsigned flags)
+{
+    flags &= ~WWV_ADD_RESULTS_TAG;
+    flags |= WWV_OMIT_RESULT_TAG;
+
+    WuExpandedResultBuffer expander(name.str(), flags);
+    SCMStringBuffer wuid;
+    appendXMLTag(expander.buffer, "Wuid", cw->getWuid(wuid).str());
+    expander.finalize();
+    out.append(expander.buffer);
+}
+
 void WuWebView::expandResults(StringBuffer &out, unsigned flags)
 {
     SCMStringBuffer xml;

+ 1 - 0
common/wuwebview/wuwebview.hpp

@@ -52,6 +52,7 @@ interface IWuWebView : extends IInterface
     virtual void expandResults(StringBuffer &out, unsigned flags)=0;
     virtual void addInputsFromPTree(IPropertyTree *pt)=0;
     virtual void addInputsFromXml(const char *xml)=0;
+    virtual void createWuidResponse(StringBuffer &out, unsigned flags)=0;
 };
 
 extern WUWEBVIEW_API IWuWebView *createWuWebView(IConstWorkUnit &wu, const char *queryname, const char*dir, bool mapEspDir);

+ 30 - 7
esp/services/ws_ecl/ws_ecl_service.cpp

@@ -1462,6 +1462,7 @@ int CWsEclBinding::getXsdDefinition(IEspContext &context, CHttpRequest *request,
             content.append("<xsd:complexType>");
             content.append("<xsd:all>");
             content.append("<xsd:element name=\"Exceptions\" type=\"tns:ArrayOfEspException\" minOccurs=\"0\"/>");
+            content.append("<xsd:element name=\"Wuid\" type=\"xsd:string\" minOccurs=\"0\"/>");
 
             Owned<IPropertyTreeIterator> result_xsds =xsdtree->getElements("Result");
             if (!result_xsds->first())
@@ -1829,14 +1830,24 @@ void CWsEclBinding::getWsEclJsonResponse(StringBuffer& jsonmsg, IEspContext &con
             node = node->queryPropTree("Body");
         if (node->hasProp(element))
             node = node->queryPropTree(element);
+        const char *wuid = node->queryProp("Wuid");
         if (node->hasProp("Results"))
             node = node->queryPropTree("Results");
         if (node->hasProp("Result"))
             node = node->queryPropTree("Result");
 
-        jsonmsg.appendf("{\n  \"%s\": {\n    \"Results\": {\n", element.str());
-
+        jsonmsg.appendf("{\n  \"%s\": {\n", element.str());
         Owned<IPropertyTreeIterator> exceptions = node->getElements("Exception");
+        Owned<IPropertyTreeIterator> datasets = node->getElements("Dataset");
+        if (wuid && *wuid)
+            appendJSONValue(jsonmsg.pad(3), "Wuid", wuid);
+        if ((!exceptions || !exceptions->first()) && (!datasets || !datasets->first()))
+        {
+            jsonmsg.append("  }\n}");
+            return;
+        }
+
+        appendJSONName(jsonmsg.pad(3), "Results").append("{\n");
         if (exceptions && exceptions->first())
         {
             appendJSONName(jsonmsg.pad(3), "Exceptions").append("{\n");
@@ -1846,7 +1857,6 @@ void CWsEclBinding::getWsEclJsonResponse(StringBuffer& jsonmsg, IEspContext &con
             jsonmsg.append("\n   ]\n    }\n");
         }
 
-        Owned<IPropertyTreeIterator> datasets = node->getElements("Dataset");
         ForEach(*datasets)
         {
             IPropertyTree &ds = datasets->query();
@@ -2159,7 +2169,6 @@ void CWsEclBinding::addParameterToWorkunit(IWorkUnit * workunit, IConstWUResult
     }
 }
 
-
 int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinfo, const char *xml, StringBuffer &out, unsigned flags, const char *viewname, const char *xsltname)
 {
     Owned <IWorkUnitFactory> factory = getSecWorkUnitFactory(*context.querySecManager(), *context.queryUser());
@@ -2210,9 +2219,11 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
 
     runWorkUnit(wuid.str(), wsinfo.qsetname.sget());
 
+    bool async = context.queryRequestParameters()->hasProp("_async");
+
     //don't wait indefinately, in case submitted to an inactive queue wait max + 5 mins
     int wutimeout = 300000;
-    if (waitForWorkUnitToComplete(wuid.str(), wutimeout))
+    if (!async && waitForWorkUnitToComplete(wuid.str(), wutimeout))
     {
         Owned<IWuWebView> web = createWuWebView(wuid.str(), wsinfo.queryname.get(), getCFD(), true);
         if (!web)
@@ -2229,10 +2240,13 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
     }
     else
     {
-        DBGLOG("WS-ECL request timed out, WorkUnit %s", wuid.str());
+        if (!async)
+            DBGLOG("WS-ECL request timed out, WorkUnit %s", wuid.str());
+        Owned<IWuWebView> web = createWuWebView(wuid.str(), wsinfo.queryname.get(), getCFD(), true);
+        web->createWuidResponse(out, flags);
     }
 
-    DBGLOG("WS-ECL Request processed [using Doxie]");
+    DBGLOG("WS-ECL Request processed");
     return true;
 }
 
@@ -2710,6 +2724,11 @@ int CWsEclBinding::onGet(CHttpRequest* request, CHttpResponse* response)
 
         StringBuffer methodName;
         nextPathNode(thepath, methodName);
+        if (strieq(methodName, "async"))
+        {
+            parms->setProp("_async", 1);
+            methodName.set("submit");
+        }
 
         if (!stricmp(methodName.str(), "tabview"))
         {
@@ -2843,6 +2862,8 @@ void CWsEclBinding::handleJSONPost(CHttpRequest *request, CHttpResponse *respons
 
         StringBuffer action;
         nextPathNode(thepath, action);
+        if (strieq(action, "async"))
+            parms->setProp("_async", 1);
 
         StringBuffer lookup;
         nextPathNode(thepath, lookup);
@@ -2954,6 +2975,8 @@ int CWsEclBinding::HandleSoapRequest(CHttpRequest* request, CHttpResponse* respo
 
     StringBuffer action;
     nextPathNode(thepath, action);
+    if (strieq(action, "async"))
+        parms->setProp("_async", 1);
 
     StringBuffer lookup;
     nextPathNode(thepath, lookup);