Browse Source

Merge pull request #9682 from afishbeck/eclRunPolling

HPCC-17037 Add polling option to ecl command line "run"

Reviewed-By: Kevin Wang <kevin.wang@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 years ago
parent
commit
b5a2250455

+ 25 - 2
common/workunit/workunit.cpp

@@ -3017,12 +3017,35 @@ public:
 
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
     {
-        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
-        LocalIAbortHandler abortHandler(*waiter);
         WUState ret = WUStateUnknown;
         StringBuffer wuRoot;
         getXPath(wuRoot, wuid);
         Owned<IRemoteConnection> conn = sdsManager->connect(wuRoot.str(), session, 0, SDS_LOCK_TIMEOUT);
+        if (timeout == 0) //no need to subscribe
+        {
+            ret = (WUState) getEnum(conn->queryRoot(), "@state", states);
+            switch (ret)
+            {
+            case WUStateCompiled:
+            case WUStateUploadingFiles:
+                if (!compiled)
+                    break;
+            //fall through
+            case WUStateCompleted:
+            case WUStateFailed:
+            case WUStateAborted:
+                return ret;
+            case WUStateWait:
+                if(returnOnWaitState)
+                    return ret;
+            default:
+                break;
+            }
+            return WUStateUnknown;
+        }
+
+        Owned<WorkUnitWaiter> waiter = new WorkUnitWaiter(wuid, SubscribeOptionState);
+        LocalIAbortHandler abortHandler(*waiter);
         if (conn)
         {
             SessionId agent = -1;

+ 1 - 0
ecl/eclcmd/eclcmd_common.hpp

@@ -129,6 +129,7 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname);
 #define ECLOPT_INPUT_S "-in"
 
 #define ECLOPT_NOROOT "--noroot"
+#define ECLOPT_POLL "--poll"
 
 #define ECLOPT_WUID "--wuid"
 #define ECLOPT_WUID_S "-wu"

+ 259 - 4
ecl/eclcmd/eclcmd_core.cpp

@@ -66,14 +66,16 @@ void expandDefintionsAsDebugValues(const IArrayOf<IEspNamedValue> & definitions,
 
 }
 
-bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *cluster, const char *name, StringBuffer *wuid, StringBuffer *wucluster, bool noarchive, bool displayWuid=true, bool compress=true)
+void checkFeatures(IClientWsWorkunits *client, bool &useCompression, int &major, int &minor, int &point)
 {
-    bool useCompression = false;
     try
     {
         Owned<IClientWUCheckFeaturesRequest> req = client->createWUCheckFeaturesRequest();
         Owned<IClientWUCheckFeaturesResponse> resp = client->WUCheckFeatures(req);
         useCompression = resp->getDeployment().getUseCompression();
+        major = resp->getBuildVersionMajor();
+        minor = resp->getBuildVersionMinor();
+        point = resp->getBuildVersionPoint();
     }
     catch(IException *E) //most likely an older ESP
     {
@@ -82,6 +84,15 @@ bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *
     catch(...)
     {
     }
+}
+
+bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *cluster, const char *name, StringBuffer *wuid, StringBuffer *wucluster, bool noarchive, bool displayWuid=true, bool compress=true)
+{
+    int major = 0;
+    int minor = 0;
+    int point = 0;
+    bool useCompression = false;
+    checkFeatures(client, useCompression, major, minor, point);
 
     bool compressed = false;
     if (useCompression)
@@ -516,6 +527,19 @@ private:
     bool optDontAppendCluster; //Undesirable but here temporarily because DALI may have locking issues
 };
 
+inline unsigned nextWait(unsigned wait, unsigned waited)
+{
+    if (waited < 5000)
+        return 1000;
+    if (waited < 30000)
+        return 5000;
+    if (waited < 60000)
+        return 10000;
+    if (waited < 120000)
+        return 30000;
+    return 60000;
+}
+
 class EclCmdRun : public EclCmdWithEclTarget
 {
 public:
@@ -542,6 +566,10 @@ public:
                 continue;
             if (iter.matchFlag(optNoRoot, ECLOPT_NOROOT))
                 continue;
+            if (iter.matchFlag(optPoll, ECLOPT_POLL))
+                continue;
+            if (iter.matchFlag(optPre64, "--pre64")) //only for troubleshooting, do not document
+                continue;
             if (iter.matchOption(optExceptionSeverity, ECLOPT_EXCEPTION_LEVEL))
                 continue;
             if (EclCmdWithEclTarget::matchCommandLineOption(iter, true)!=EclCmdOptionMatch)
@@ -566,6 +594,214 @@ public:
         }
         return true;
     }
+
+    int checkComplete(IClientWsWorkunits* client, IClientWUWaitRequest* req)
+    {
+        Owned<IClientWUWaitResponse> resp = client->WUWaitComplete(req);
+        if (resp->getExceptions().ordinality())
+            throw LINK(&resp->getExceptions());
+        return resp->getStateID();
+    }
+
+    int checkComplete(IClientWsWorkunits* client, IClientWUInfoRequest* req)
+    {
+        Owned<IClientWUInfoResponse> resp = client->WUInfo(req);
+        if (resp->getExceptions().ordinality())
+            throw LINK(&resp->getExceptions());
+        int state = resp->getWorkunit().getStateID();
+        switch (state)
+        {
+        case WUStateCompleted:
+        case WUStateFailed:
+        case WUStateAborted:
+            return state;
+        }
+
+        return WUStateUnknown; //emulate result from waitForWorkUnit which will be called for non-legacy builds
+    }
+
+    void initPollRequest(Owned<IClientWUInfoRequest> &req, IClientWsWorkunits* client, const char *wuid)
+    {
+        req.setown(client->createWUInfoRequest());
+        req->setWuid(wuid);
+        req->setTruncateEclTo64k(true);
+        req->setIncludeExceptions(false);
+        req->setIncludeGraphs(false);
+        req->setIncludeSourceFiles(false);
+        req->setIncludeResults(false);
+        req->setIncludeResultsViewNames(false);
+        req->setIncludeVariables(false);
+        req->setIncludeTimers(false);
+        req->setIncludeDebugValues(false);
+        req->setIncludeApplicationValues(false);
+        req->setIncludeWorkflows(false);
+        req->setIncludeXmlSchemas(false);
+        req->setIncludeResourceURLs(false);
+        req->setSuppressResultSchemas(true);
+    }
+
+    void initPollRequest(Owned<IClientWUWaitRequest> &req, IClientWsWorkunits* client, const char *wuid)
+    {
+        req.setown(client->createWUWaitCompleteRequest());
+        req->setWuid(wuid);
+        req->setWait(0); //Just return the current state
+    }
+
+    int pollForCompletion(IClientWsWorkunits* client, const char *wuid, bool optimized)
+    {
+        Owned<IClientWUInfoRequest> reqInfo;
+        Owned<IClientWUWaitRequest> reqQuick;
+
+        if (optimized)
+            initPollRequest(reqQuick, client, wuid);
+        else
+            initPollRequest(reqInfo, client, wuid);
+
+        int state = WUStateUnknown;
+        for(;;)
+        {
+            if (optimized)
+                state = checkComplete(client, reqQuick);
+            else
+                state = checkComplete(client, reqInfo);
+            if (state != WUStateUnknown)
+                break;
+            unsigned waited = msTick() - startTimeMs;
+            if (optWaitTime!=(unsigned)-1 && waited>=optWaitTime)
+                return WUStateUnknown;
+            Sleep(nextWait(optWaitTime, waited));
+        }
+        return state;
+    }
+
+    void gatherLegacyServerResults(IClientWsWorkunits* client, const char *wuid)
+    {
+        Owned<IClientWUInfoRequest> req = client->createWUInfoRequest();
+        req->setWuid(wuid);
+        req->setIncludeExceptions(true);
+        req->setIncludeGraphs(false);
+        req->setIncludeSourceFiles(false);
+        req->setIncludeResults(false); //ECL layout results, not xml
+        req->setIncludeResultsViewNames(false);
+        req->setIncludeVariables(false);
+        req->setIncludeTimers(false);
+        req->setIncludeDebugValues(false);
+        req->setIncludeApplicationValues(false);
+        req->setIncludeWorkflows(false);
+        req->setIncludeXmlSchemas(false);
+        req->setIncludeResourceURLs(false);
+        req->setSuppressResultSchemas(true);
+        Owned<IClientWUInfoResponse> resp = client->WUInfo(req);
+
+        IConstECLWorkunit &wu = resp->getWorkunit();
+        IArrayOf<IConstECLException> &exceptions = wu.getExceptions();
+        unsigned count = wu.getResultCount();
+        if (count<=0 && exceptions.ordinality()<=0)
+            return;
+        fputs("<Result>\n", stdout);
+        ForEachItemIn(pos, exceptions)
+        {
+            IConstECLException &e = exceptions.item(pos);
+            fprintf(stdout, " <Exception><Code>%d</Code><Source>%s</Source><Message>%s</Message></Exception>\n", e.getCode(), e.getSource(), e.getMessage());
+        }
+        Owned<IClientWUResultRequest> resReq = client->createWUResultRequest();
+        resReq->setWuid(wuid);
+        resReq->setSuppressXmlSchema(true);
+        for (unsigned seq=0; seq<count; seq++)
+        {
+            resReq->setSequence(seq);
+            Owned<IClientWUResultResponse> resp = client->WUResult(resReq);
+
+            if (resp->getExceptions().ordinality())
+                throw LINK(&resp->getExceptions());
+            fwrite(resp->getResult(), 8, 1, stdout);
+            //insert name attribute into <Dataset> tag
+            fprintf(stdout, " name='%s'", resp->getName());
+            fputs(resp->getResult()+8, stdout);
+        }
+        fputs("</Result>\n", stdout);
+    }
+
+    void getAndOutputResults(IClientWsWorkunits* client, const char *wuid)
+    {
+        Owned<IClientWUFullResultRequest> req = client->createWUFullResultRequest();
+        req->setWuid(wuid);
+        req->setNoRootTag(optNoRoot);
+        req->setExceptionSeverity(optExceptionSeverity);
+
+        Owned<IClientWUFullResultResponse> resp = client->WUFullResult(req);
+        if (resp->getResults())
+            fprintf(stdout, "%s\n", resp->getResults());
+    }
+
+    void processResults(IClientWsWorkunits* client, const char *wuid, bool optimized)
+    {
+        if (optimized)
+            getAndOutputResults(client, wuid);
+        else
+            gatherLegacyServerResults(client, wuid);
+    }
+
+    int pollForResults(IClientWsWorkunits* client, const char *wuid)
+    {
+        int major = 0;
+        int minor = 0;
+        int point = 0;
+        bool useCompression = false;
+        checkFeatures(client, useCompression, major, minor, point);
+        bool optimized = !optPre64 && (major>=6 && minor>=3);
+
+        try
+        {
+            int state = pollForCompletion(client, wuid, optimized);
+            switch (state)
+            {
+            case WUStateCompleted:
+                processResults(client, wuid, optimized);
+                return 0;
+            case WUStateUnknown:
+                fprintf(stderr, "Timed out waiting for %s to complete, workunit is still running.\n", wuid);
+                break;
+            case WUStateFailed:
+            case WUStateAborted:
+                fprintf(stderr, "%s %s.\n", wuid, getWorkunitStateStr((WUState)state));
+                break;
+            default:
+                fprintf(stderr, "%s in unrecognized state.\n", wuid);
+                break;
+            }
+        }
+        catch (IMultiException *ME)
+        {
+            outputMultiExceptionsEx(*ME);
+            ME->Release();
+        }
+        catch (IException *E)
+        {
+            StringBuffer msg;
+            fprintf(stderr, "Exception polling for results: %d: %s\n", E->errorCode(), E->errorMessage(msg).str());
+            E->Release();
+        }
+        return 1;
+    }
+    int getInitialRunWait()
+    {
+        if (!optPoll)
+            return optWaitTime;
+        return (optWaitTime < 10000) ? optWaitTime : 10000; //stay connected for the first 10 seconds even if polling
+    }
+    bool isFinalState(WUState state)
+    {
+        switch (state)
+        {
+        case WUStateCompleted:
+        case WUStateFailed:
+        case WUStateAborted:
+            return true;
+        }
+        return false;
+    }
+
     virtual int processCMD()
     {
         Owned<IClientWsWorkunits> client = createCmdClientExt(WsWorkunits, *this, "?upload_"); //upload_ disables maxRequestEntityLength
@@ -604,7 +840,8 @@ public:
             req->setCluster(wuCluster.str());
         else if (optTargetCluster.length())
             req->setCluster(optTargetCluster.get());
-        req->setWait((int)optWaitTime);
+
+        req->setWait(getInitialRunWait());
         if (optInput.length())
             req->setInput(optInput.get());
         req->setExceptionSeverity(optExceptionSeverity); //throws exception if invalid value
@@ -615,6 +852,7 @@ public:
         if (variables.length())
             req->setVariables(variables);
 
+        startTimeMs = msTick();
         Owned<IClientWURunResponse> resp = client->WURun(req);
 
         if (checkMultiExceptionsQueryNotFound(resp->getExceptions()))
@@ -629,8 +867,21 @@ public:
         StringBuffer respwuid(resp->getWuid());
         if (optVerbose && respwuid.length() && !streq(wuid.str(), respwuid.str()))
             fprintf(stdout, "As %s\n", respwuid.str());
-        if (!streq(resp->getState(), "completed"))
+        WUState state = getWorkUnitState(resp->getState());
+        if (optPoll && !isFinalState(state))
+            return pollForResults(client, wuid);
+
+        switch (state)
+        {
+        case WUStateRunning:
+            fprintf(stderr, "Timed out waiting for %s to complete, workunit is still running.\n", wuid.str()); //server side waiting timed out
+            break;
+        case WUStateCompleted:
+            break;
+        default:
             fprintf(stderr, "%s %s\n", respwuid.str(), resp->getState());
+        }
+
         if (resp->getResults())
             fprintf(stdout, "%s\n", resp->getResults());
 
@@ -662,6 +913,7 @@ public:
             "   -in,--input=<file|xml>    file or xml content to use as query input\n"
             "   -X<name>=<value>          sets the stored input value (stored('name'))\n"
             "   --wait=<ms>               time to wait for completion\n"
+            "   --poll                    poll for results, rather than remain connected\n"
             "   --exception-level=<level> minimum severity level for exceptions\n"
             "                             values: 'info', 'warning', 'error'\n",
             stdout);
@@ -673,7 +925,10 @@ private:
     StringAttr optExceptionSeverity;
     IArrayOf<IEspNamedValue> variables;
     unsigned optWaitTime;
+    unsigned startTimeMs = 0;
     bool optNoRoot;
+    bool optPoll;
+    bool optPre64;  //only for troubleshooting, do not document
 };
 
 void outputQueryActionResults(const IArrayOf<IConstQuerySetQueryActionResult> &results, const char *act, const char *qs)

+ 14 - 0
esp/scm/ws_workunits.ecm

@@ -813,6 +813,19 @@ ESPresponse [exceptions_inline,http_encode(0)] WUResultResponse
     [json_inline(1)] string Result;
 };
 
+ESPrequest WUFullResultRequest
+{
+    string Wuid;
+    bool NoRootTag(0);
+    ESPenum WUExceptionSeverity ExceptionSeverity("info");
+};
+
+ESPresponse [exceptions_inline,http_encode(0)] WUFullResultResponse
+{
+    string Wuid;
+    [json_inline(1)] string Results;
+};
+
 ESPrequest WUResultViewRequest
 {
     string Wuid;
@@ -1811,6 +1824,7 @@ ESPservice [
     ESPmethod [resp_xsl_default("/esp/xslt/graph_gvc.xslt")]     WUGVCGraphInfo(WUGVCGraphInfoRequest, WUGVCGraphInfoResponse);
     ESPmethod [description("Stub for Ajax GVC Graph."), help(""), resp_xsl_default("/esp/xslt/GvcGraph.xslt")] GVCAjaxGraph(GVCAjaxGraphRequest, GVCAjaxGraphResponse);
     ESPmethod [resp_xsl_default("/esp/xslt/result.xslt")]        WUResult(WUResultRequest, WUResultResponse);
+    ESPmethod WUFullResult(WUFullResultRequest, WUFullResultResponse);
     ESPmethod WUResultView(WUResultViewRequest, WUResultViewResponse);
     ESPmethod [resp_xsl_default("/esp/xslt/wuid_jobs.xslt")]     WUJobList(WUJobListRequest, WUJobListResponse);
     ESPmethod [resp_xsl_default("/esp/xslt/wuaction_results.xslt")] WUAction(WUActionRequest, WUActionResponse); 

+ 52 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1052,6 +1052,58 @@ bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWU
     return true;
 }
 
+bool CWsWorkunitsEx::onWUFullResult(IEspContext &context, IEspWUFullResultRequest &req, IEspWUFullResultResponse &resp)
+{
+    try
+    {
+        StringBuffer wuid = req.getWuid();
+        WsWuHelpers::checkAndTrimWorkunit("WUFullResult", wuid);
+
+        ErrorSeverity severity = checkGetExceptionSeverity(req.getExceptionSeverity());
+
+        if (!wuid.length())
+            throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
+        if (!looksLikeAWuid(wuid, 'W'))
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", wuid.str());
+        PROGLOG("WUFullResults: %s", wuid.str());
+
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+        Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str());
+        if (!cw)
+            throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", wuid.str());
+
+        ensureWsWorkunitAccess(context, *cw, SecAccess_Read);
+
+        resp.setWuid(wuid.str());
+
+        switch (cw->getState())
+        {
+            case WUStateCompleted:
+            case WUStateFailed:
+            case WUStateUnknown:
+            {
+                SCMStringBuffer result;
+                unsigned flags = WorkUnitXML_SeverityTags;
+                if (req.getNoRootTag())
+                    flags |= WorkUnitXML_NoRoot;
+                if (context.getResponseFormat()==ESPSerializationJSON)
+                    getFullWorkUnitResultsJSON(context.queryUserId(), context.queryPassword(), cw.get(), result, flags, severity);
+                else
+                    getFullWorkUnitResultsXML(context.queryUserId(), context.queryPassword(), cw.get(), result, flags, severity);
+                resp.setResults(result.str());
+                break;
+            }
+            default:
+                throw MakeStringException(ECLWATCH_CANNOT_GET_WU_RESULT, "Cannot get results Workunit %s %s.", wuid.str(), getWorkunitStateStr(cw->getState()));
+        }
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
 
 bool CWsWorkunitsEx::onWUWaitCompiled(IEspContext &context, IEspWUWaitRequest &req, IEspWUWaitResponse &resp)
 {

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -207,6 +207,7 @@ public:
     bool onWUInfoDetails(IEspContext &context, IEspWUInfoRequest &req, IEspWUInfoResponse &resp);
     bool onWUFile(IEspContext &context,IEspWULogFileRequest &req, IEspWULogFileResponse &resp);
     bool onWUResult(IEspContext &context,IEspWUResultRequest &req, IEspWUResultResponse &resp);
+    bool onWUFullResult(IEspContext &context, IEspWUFullResultRequest &req, IEspWUFullResultResponse &resp);
     bool onWUResultView(IEspContext &context, IEspWUResultViewRequest &req, IEspWUResultViewResponse &resp);
     bool onWUResultSummary(IEspContext &context, IEspWUResultSummaryRequest &req, IEspWUResultSummaryResponse &resp);
     bool onWUResultBin(IEspContext &context, IEspWUResultBinRequest &req, IEspWUResultBinResponse &resp);