Browse Source

Merge pull request #1843 from afishbeck/execute_existing

Fix gh-1686 missing WsWorkunit support for WUActionExecuteExisting

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
cd78a49ecb
2 changed files with 168 additions and 74 deletions
  1. 167 74
      esp/services/ws_workunits/ws_workunitsService.cpp
  2. 1 0
      esp/smc/SMCLib/exception_util.hpp

+ 167 - 74
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -89,7 +89,27 @@ public:
     }
 };
 
-void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow)
+void setWsWuXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname=false)
+{
+    if (!xml || !*xml)
+        return;
+    Owned<IPropertyTree> tree = createPTreeFromXMLString(xml, ipt_none, (XmlReaderOptions)(xr_ignoreWhiteSpace | xr_ignoreNameSpaces));
+    IPropertyTree *root = tree.get();
+    if (strieq(root->queryName(), "Envelope"))
+        root = root->queryPropTree("Body/*[1]");
+    if (!root)
+        return;
+    if (setJobname)
+    {
+        SCMStringBuffer name;
+        wu->getJobName(name);
+        if (!name.length())
+            wu->setJobName(root->queryName());
+    }
+    wu->setXmlParams(LINK(root));
+}
+
+void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL)
 {
     ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
     switch(cw->getState())
@@ -129,6 +149,8 @@ void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* clus
             wu->schedule();
     }
 
+    setWsWuXmlParameters(wu, paramXml, (wu->getAction()==WUActionExecuteExisting));
+
     wu->commit();
     wu.clear();
 
@@ -142,13 +164,125 @@ void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* clus
     AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
 }
 
-void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow)
+void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL)
 {
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
     Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
-    return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow);
+    return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, paramXml);
+}
+
+
+void copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid)
+{
+    Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+    Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid, false));
+
+    SCMStringBuffer wuid;
+    wu.getWuid(wuid);
+
+    queryExtendedWU(&wu)->copyWorkUnit(src);
+
+    SCMStringBuffer token;
+    wu.setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
+    wu.commit();
+}
+
+void runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml=NULL)
+{
+    StringBufferAdaptor isvWuid(wuid);
+
+    NewWsWorkunit wu(context);
+    wu->getWuid(isvWuid);
+    copyWsWorkunit(context, *wu, srcWuid);
+    wu.clear();
+
+    submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml);
+}
+
+void runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml=NULL)
+{
+    WorkunitUpdate wu(&cw->lock());
+    copyWsWorkunit(context, *wu, srcWuid);
+    wu.clear();
+
+    submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml);
+}
+
+IException *noteException(IWorkUnit *wu, IException *e, WUExceptionSeverity level=ExceptionSeverityError)
+{
+    if (wu)
+    {
+        Owned<IWUException> we = wu->createException();
+        StringBuffer s;
+        we->setExceptionMessage(e->errorMessage(s).str());
+        we->setExceptionSource("WsWorkunits");
+        we->setSeverity(level);
+        if (level==ExceptionSeverityError)
+            wu->setState(WUStateFailed);
+    }
+    return e;
+}
+
+StringBuffer &resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended=true, IWorkUnit *wu=NULL)
+{
+    Owned<IPropertyTree> qs = getQueryRegistry(queryset, true);
+    if (!qs)
+        throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset));
+    Owned<IPropertyTree> q = resolveQueryAlias(qs, query);
+    if (!q)
+        throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query));
+    if (notSuspended && q->getPropBool("@suspended"))
+        throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query));
+    return wuid.append(q->queryProp("@wuid"));
+}
+
+void runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
+{
+    StringBuffer srcWuid;
+
+    WorkunitUpdate wu(&cw->lock());
+    resolveQueryWuid(srcWuid, queryset, query, true, wu);
+    copyWsWorkunit(context, *wu, srcWuid);
+    wu.clear();
+
+    submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml);
+}
+
+void runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
+{
+    StringBuffer srcWuid;
+    StringBufferAdaptor isvWuid(wuid);
+
+    NewWsWorkunit wu(context);
+    wu->getWuid(isvWuid);
+    resolveQueryWuid(srcWuid, queryset, query, true, wu);
+    copyWsWorkunit(context, *wu, srcWuid);
+    wu.clear();
+
+    submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml);
 }
 
+class ExecuteExistingQueryInfo
+{
+public:
+    ExecuteExistingQueryInfo(IConstWorkUnit *cw)
+    {
+        SCMStringBuffer isv;
+        cw->getJobName(isv);
+        const char *name = isv.str();
+        const char *div = strchr(name, '.');
+        if (div)
+        {
+            queryset.set(name, div-name);
+            query.set(div+1);
+        }
+    }
+
+public:
+    StringAttr queryset;
+    StringAttr query;
+};
+
 typedef enum _WuActionType
 {
     ActionDelete=0,
@@ -529,9 +663,8 @@ bool CWsWorkunitsEx::onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req,
                     wu->setClusterName(s.str());
             }
         }
-        const char *xmlParams = req.getXmlParams();
-        if (notEmpty(xmlParams))
-            wu->setXmlParams(xmlParams);
+
+        setWsWuXmlParameters(wu, req.getXmlParams(), (req.getAction()==WUActionExecuteExisting));
 
         if (notEmpty(req.getQueryText()))
         {
@@ -909,7 +1042,21 @@ bool CWsWorkunitsEx::onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req,
         if (isEmpty(req.getCluster()))
             throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
 
-        submitWsWorkunit(context, req.getWuid(), req.getCluster(), req.getSnapshot(), req.getMaxRunTime(), true, false);
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+        Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
+        if (cw->getAction()==WUActionExecuteExisting)
+        {
+            ExecuteExistingQueryInfo info(cw);
+            if (info.queryset.isEmpty() || info.query.isEmpty())
+            {
+                WorkunitUpdate wu(&cw->lock());
+                throw noteException(wu, MakeStringException(ECLWATCH_INVALID_INPUT,"Queryset and/or query not specified"));
+            }
+
+            runWsWuQuery(context, cw, info.queryset.sget(), info.query.sget(), req.getCluster(), NULL);
+        }
+        else
+            submitWsWorkunit(context, cw, req.getCluster(), req.getSnapshot(), req.getMaxRunTime(), true, false);
 
         if (req.getBlockTillFinishTimer() != 0)
             waitForWorkUnitToComplete(req.getWuid(), req.getBlockTillFinishTimer());
@@ -927,84 +1074,30 @@ bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWU
 {
     try
     {
-        SCMStringBuffer wuid;
-        wuid.set(req.getWuid());
+        const char *runWuid = req.getWuid();
+        StringBuffer wuid;
 
-        bool cloneWorkunit=req.getCloneWorkunit();
-        if (!wuid.length() && notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
+        if (runWuid && *runWuid)
         {
-            cloneWorkunit=true;
-            Owned<IPropertyTree> qstree = getQueryRegistry(req.getQuerySet(), true);
-            if (qstree)
+            if (req.getCloneWorkunit())
+                runWsWorkunit(context, wuid, runWuid, req.getCluster(), req.getInput());
+            else
             {
-                IPropertyTree *query = NULL;
-                VStringBuffer xpath("Alias[@name=\"%s\"]", req.getQuery());
-                IPropertyTree *alias = qstree->queryPropTree(xpath.str());
-                if (alias)
-                {
-                    const char *quid = alias->queryProp("@id");
-                    if (!quid)
-                        throw MakeStringException(-1, "Alias %s/%s has no Query defined", req.getQuerySet(), req.getQuery());
-                    xpath.clear().appendf("Query[@id='%s']", quid);
-                    query = qstree->queryPropTree(xpath.str());
-                    if (!query)
-                        throw MakeStringException(-1, "Alias %s/%s refers to a non existing query %s", req.getQuerySet(), req.getQuery(), quid);
-                }
-                else
-                {
-                    xpath.clear().appendf("Query[@id=\"%s\"]", req.getQuery());
-                    query = qstree->queryPropTree(xpath.str());
-                }
-                if (query)
-                {
-                    if (query->getPropBool("@suspended"))
-                        throw MakeStringException(-1, "Query %s/%s is currently suspended", req.getQuerySet(), req.getQuery());
-
-                    wuid.set(query->queryProp("@wuid"));
-                }
-                else
-                    throw MakeStringException(-1, "Query %s/%s not found", req.getQuerySet(), req.getQuery());
+                submitWsWorkunit(context, runWuid, req.getCluster(), NULL, 0, false, true, req.getInput());
+                wuid.set(runWuid);
             }
-            else
-                throw MakeStringException(-1, "QuerySet %s not found", req.getQuerySet());
         }
-
-        if (!wuid.length())
+        else if (notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
+            runWsWuQuery(context, wuid, req.getQuerySet(), req.getQuery(), req.getCluster(), req.getInput());
+        else
             throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
 
-        ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
-
-        Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
-        if(cloneWorkunit)
-        {
-            Owned<IConstWorkUnit> src(factory->openWorkUnit(wuid.str(), false));
-            NewWsWorkunit wu(factory, context);
-            wu->getWuid(wuid);
-            queryExtendedWU(wu)->copyWorkUnit(src);
-
-            SCMStringBuffer token;
-            wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
-        }
-
-        Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str(), false));
-        if (!cw)
-            throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
-
-        if (notEmpty(req.getInput()))
-        {
-            Owned<IWuWebView> web = createWuWebView(*cw, NULL, getCFD(), true);
-            web->addInputsFromXml(req.getInput());
-        }
-
-        submitWsWorkunit(context, cw, req.getCluster(), NULL, 0, false, true);
-        cw.clear();
-
         int timeToWait = req.getWait();
         if (timeToWait != 0)
             waitForWorkUnitToComplete(wuid.str(), timeToWait);
 
-
-        cw.set(factory->openWorkUnit(wuid.str(), false));
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+        Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
         if (!cw)
             throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
 

+ 1 - 0
esp/smc/SMCLib/exception_util.hpp

@@ -119,6 +119,7 @@
 #define ECLWATCH_ALIAS_NOT_FOUND            ECLWATCH_ERROR_START+98
 #define ECLWATCH_QUERYSET_NOT_ON_CLUSTER    ECLWATCH_ERROR_START+99
 #define ECLWATCH_CONTROL_QUERY_FAILED       ECLWATCH_ERROR_START+100
+#define ECLWATCH_QUERY_SUSPENDED            ECLWATCH_ERROR_START+101
 
 inline void FORWARDEXCEPTION(IEspContext &context, IException *e, unsigned codeNew)
 {