|
@@ -89,7 +89,27 @@ public:
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
-void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow)
|
|
|
|
|
|
+void setWsWuXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname=false)
|
|
|
|
+{
|
|
|
|
+ if (!xml || !*xml)
|
|
|
|
+ return;
|
|
|
|
+ Owned<IPropertyTree> tree = createPTreeFromXMLString(xml, ipt_none, (XmlReaderOptions)(xr_ignoreWhiteSpace | xr_ignoreNameSpaces));
|
|
|
|
+ IPropertyTree *root = tree.get();
|
|
|
|
+ if (strieq(root->queryName(), "Envelope"))
|
|
|
|
+ root = root->queryPropTree("Body/*[1]");
|
|
|
|
+ if (!root)
|
|
|
|
+ return;
|
|
|
|
+ if (setJobname)
|
|
|
|
+ {
|
|
|
|
+ SCMStringBuffer name;
|
|
|
|
+ wu->getJobName(name);
|
|
|
|
+ if (!name.length())
|
|
|
|
+ wu->setJobName(root->queryName());
|
|
|
|
+ }
|
|
|
|
+ wu->setXmlParams(LINK(root));
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL)
|
|
{
|
|
{
|
|
ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
|
|
ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
|
|
switch(cw->getState())
|
|
switch(cw->getState())
|
|
@@ -129,6 +149,8 @@ void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* clus
|
|
wu->schedule();
|
|
wu->schedule();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ setWsWuXmlParameters(wu, paramXml, (wu->getAction()==WUActionExecuteExisting));
|
|
|
|
+
|
|
wu->commit();
|
|
wu->commit();
|
|
wu.clear();
|
|
wu.clear();
|
|
|
|
|
|
@@ -142,13 +164,125 @@ void submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* clus
|
|
AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
|
|
AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
|
|
}
|
|
}
|
|
|
|
|
|
-void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow)
|
|
|
|
|
|
+void submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, const char *paramXml=NULL)
|
|
{
|
|
{
|
|
Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
|
|
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
|
|
- return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow);
|
|
|
|
|
|
+ return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, paramXml);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+void copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid)
|
|
|
|
+{
|
|
|
|
+ Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
|
|
+ Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid, false));
|
|
|
|
+
|
|
|
|
+ SCMStringBuffer wuid;
|
|
|
|
+ wu.getWuid(wuid);
|
|
|
|
+
|
|
|
|
+ queryExtendedWU(&wu)->copyWorkUnit(src);
|
|
|
|
+
|
|
|
|
+ SCMStringBuffer token;
|
|
|
|
+ wu.setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
|
|
|
|
+ wu.commit();
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml=NULL)
|
|
|
|
+{
|
|
|
|
+ StringBufferAdaptor isvWuid(wuid);
|
|
|
|
+
|
|
|
|
+ NewWsWorkunit wu(context);
|
|
|
|
+ wu->getWuid(isvWuid);
|
|
|
|
+ copyWsWorkunit(context, *wu, srcWuid);
|
|
|
|
+ wu.clear();
|
|
|
|
+
|
|
|
|
+ submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml=NULL)
|
|
|
|
+{
|
|
|
|
+ WorkunitUpdate wu(&cw->lock());
|
|
|
|
+ copyWsWorkunit(context, *wu, srcWuid);
|
|
|
|
+ wu.clear();
|
|
|
|
+
|
|
|
|
+ submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+IException *noteException(IWorkUnit *wu, IException *e, WUExceptionSeverity level=ExceptionSeverityError)
|
|
|
|
+{
|
|
|
|
+ if (wu)
|
|
|
|
+ {
|
|
|
|
+ Owned<IWUException> we = wu->createException();
|
|
|
|
+ StringBuffer s;
|
|
|
|
+ we->setExceptionMessage(e->errorMessage(s).str());
|
|
|
|
+ we->setExceptionSource("WsWorkunits");
|
|
|
|
+ we->setSeverity(level);
|
|
|
|
+ if (level==ExceptionSeverityError)
|
|
|
|
+ wu->setState(WUStateFailed);
|
|
|
|
+ }
|
|
|
|
+ return e;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+StringBuffer &resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended=true, IWorkUnit *wu=NULL)
|
|
|
|
+{
|
|
|
|
+ Owned<IPropertyTree> qs = getQueryRegistry(queryset, true);
|
|
|
|
+ if (!qs)
|
|
|
|
+ throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset));
|
|
|
|
+ Owned<IPropertyTree> q = resolveQueryAlias(qs, query);
|
|
|
|
+ if (!q)
|
|
|
|
+ throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query));
|
|
|
|
+ if (notSuspended && q->getPropBool("@suspended"))
|
|
|
|
+ throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query));
|
|
|
|
+ return wuid.append(q->queryProp("@wuid"));
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
|
|
|
|
+{
|
|
|
|
+ StringBuffer srcWuid;
|
|
|
|
+
|
|
|
|
+ WorkunitUpdate wu(&cw->lock());
|
|
|
|
+ resolveQueryWuid(srcWuid, queryset, query, true, wu);
|
|
|
|
+ copyWsWorkunit(context, *wu, srcWuid);
|
|
|
|
+ wu.clear();
|
|
|
|
+
|
|
|
|
+ submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, paramXml);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query, const char *cluster, const char *paramXml=NULL)
|
|
|
|
+{
|
|
|
|
+ StringBuffer srcWuid;
|
|
|
|
+ StringBufferAdaptor isvWuid(wuid);
|
|
|
|
+
|
|
|
|
+ NewWsWorkunit wu(context);
|
|
|
|
+ wu->getWuid(isvWuid);
|
|
|
|
+ resolveQueryWuid(srcWuid, queryset, query, true, wu);
|
|
|
|
+ copyWsWorkunit(context, *wu, srcWuid);
|
|
|
|
+ wu.clear();
|
|
|
|
+
|
|
|
|
+ submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, paramXml);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+class ExecuteExistingQueryInfo
|
|
|
|
+{
|
|
|
|
+public:
|
|
|
|
+ ExecuteExistingQueryInfo(IConstWorkUnit *cw)
|
|
|
|
+ {
|
|
|
|
+ SCMStringBuffer isv;
|
|
|
|
+ cw->getJobName(isv);
|
|
|
|
+ const char *name = isv.str();
|
|
|
|
+ const char *div = strchr(name, '.');
|
|
|
|
+ if (div)
|
|
|
|
+ {
|
|
|
|
+ queryset.set(name, div-name);
|
|
|
|
+ query.set(div+1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+public:
|
|
|
|
+ StringAttr queryset;
|
|
|
|
+ StringAttr query;
|
|
|
|
+};
|
|
|
|
+
|
|
typedef enum _WuActionType
|
|
typedef enum _WuActionType
|
|
{
|
|
{
|
|
ActionDelete=0,
|
|
ActionDelete=0,
|
|
@@ -529,9 +663,8 @@ bool CWsWorkunitsEx::onWUUpdate(IEspContext &context, IEspWUUpdateRequest &req,
|
|
wu->setClusterName(s.str());
|
|
wu->setClusterName(s.str());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- const char *xmlParams = req.getXmlParams();
|
|
|
|
- if (notEmpty(xmlParams))
|
|
|
|
- wu->setXmlParams(xmlParams);
|
|
|
|
|
|
+
|
|
|
|
+ setWsWuXmlParameters(wu, req.getXmlParams(), (req.getAction()==WUActionExecuteExisting));
|
|
|
|
|
|
if (notEmpty(req.getQueryText()))
|
|
if (notEmpty(req.getQueryText()))
|
|
{
|
|
{
|
|
@@ -909,7 +1042,21 @@ bool CWsWorkunitsEx::onWUSubmit(IEspContext &context, IEspWUSubmitRequest &req,
|
|
if (isEmpty(req.getCluster()))
|
|
if (isEmpty(req.getCluster()))
|
|
throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
|
|
throw MakeStringException(ECLWATCH_INVALID_INPUT,"No Cluster defined.");
|
|
|
|
|
|
- submitWsWorkunit(context, req.getWuid(), req.getCluster(), req.getSnapshot(), req.getMaxRunTime(), true, false);
|
|
|
|
|
|
+ Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
|
|
+ Owned<IConstWorkUnit> cw = factory->openWorkUnit(req.getWuid(), false);
|
|
|
|
+ if (cw->getAction()==WUActionExecuteExisting)
|
|
|
|
+ {
|
|
|
|
+ ExecuteExistingQueryInfo info(cw);
|
|
|
|
+ if (info.queryset.isEmpty() || info.query.isEmpty())
|
|
|
|
+ {
|
|
|
|
+ WorkunitUpdate wu(&cw->lock());
|
|
|
|
+ throw noteException(wu, MakeStringException(ECLWATCH_INVALID_INPUT,"Queryset and/or query not specified"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ runWsWuQuery(context, cw, info.queryset.sget(), info.query.sget(), req.getCluster(), NULL);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ submitWsWorkunit(context, cw, req.getCluster(), req.getSnapshot(), req.getMaxRunTime(), true, false);
|
|
|
|
|
|
if (req.getBlockTillFinishTimer() != 0)
|
|
if (req.getBlockTillFinishTimer() != 0)
|
|
waitForWorkUnitToComplete(req.getWuid(), req.getBlockTillFinishTimer());
|
|
waitForWorkUnitToComplete(req.getWuid(), req.getBlockTillFinishTimer());
|
|
@@ -927,84 +1074,30 @@ bool CWsWorkunitsEx::onWURun(IEspContext &context, IEspWURunRequest &req, IEspWU
|
|
{
|
|
{
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- SCMStringBuffer wuid;
|
|
|
|
- wuid.set(req.getWuid());
|
|
|
|
|
|
+ const char *runWuid = req.getWuid();
|
|
|
|
+ StringBuffer wuid;
|
|
|
|
|
|
- bool cloneWorkunit=req.getCloneWorkunit();
|
|
|
|
- if (!wuid.length() && notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
|
|
|
|
|
|
+ if (runWuid && *runWuid)
|
|
{
|
|
{
|
|
- cloneWorkunit=true;
|
|
|
|
- Owned<IPropertyTree> qstree = getQueryRegistry(req.getQuerySet(), true);
|
|
|
|
- if (qstree)
|
|
|
|
|
|
+ if (req.getCloneWorkunit())
|
|
|
|
+ runWsWorkunit(context, wuid, runWuid, req.getCluster(), req.getInput());
|
|
|
|
+ else
|
|
{
|
|
{
|
|
- IPropertyTree *query = NULL;
|
|
|
|
- VStringBuffer xpath("Alias[@name=\"%s\"]", req.getQuery());
|
|
|
|
- IPropertyTree *alias = qstree->queryPropTree(xpath.str());
|
|
|
|
- if (alias)
|
|
|
|
- {
|
|
|
|
- const char *quid = alias->queryProp("@id");
|
|
|
|
- if (!quid)
|
|
|
|
- throw MakeStringException(-1, "Alias %s/%s has no Query defined", req.getQuerySet(), req.getQuery());
|
|
|
|
- xpath.clear().appendf("Query[@id='%s']", quid);
|
|
|
|
- query = qstree->queryPropTree(xpath.str());
|
|
|
|
- if (!query)
|
|
|
|
- throw MakeStringException(-1, "Alias %s/%s refers to a non existing query %s", req.getQuerySet(), req.getQuery(), quid);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- xpath.clear().appendf("Query[@id=\"%s\"]", req.getQuery());
|
|
|
|
- query = qstree->queryPropTree(xpath.str());
|
|
|
|
- }
|
|
|
|
- if (query)
|
|
|
|
- {
|
|
|
|
- if (query->getPropBool("@suspended"))
|
|
|
|
- throw MakeStringException(-1, "Query %s/%s is currently suspended", req.getQuerySet(), req.getQuery());
|
|
|
|
-
|
|
|
|
- wuid.set(query->queryProp("@wuid"));
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- throw MakeStringException(-1, "Query %s/%s not found", req.getQuerySet(), req.getQuery());
|
|
|
|
|
|
+ submitWsWorkunit(context, runWuid, req.getCluster(), NULL, 0, false, true, req.getInput());
|
|
|
|
+ wuid.set(runWuid);
|
|
}
|
|
}
|
|
- else
|
|
|
|
- throw MakeStringException(-1, "QuerySet %s not found", req.getQuerySet());
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (!wuid.length())
|
|
|
|
|
|
+ else if (notEmpty(req.getQuerySet()) && notEmpty(req.getQuery()))
|
|
|
|
+ runWsWuQuery(context, wuid, req.getQuerySet(), req.getQuery(), req.getCluster(), req.getInput());
|
|
|
|
+ else
|
|
throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
|
|
throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Workunit or Query required");
|
|
|
|
|
|
- ensureWsWorkunitAccess(context, wuid.str(), SecAccess_Write);
|
|
|
|
-
|
|
|
|
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
|
|
- if(cloneWorkunit)
|
|
|
|
- {
|
|
|
|
- Owned<IConstWorkUnit> src(factory->openWorkUnit(wuid.str(), false));
|
|
|
|
- NewWsWorkunit wu(factory, context);
|
|
|
|
- wu->getWuid(wuid);
|
|
|
|
- queryExtendedWU(wu)->copyWorkUnit(src);
|
|
|
|
-
|
|
|
|
- SCMStringBuffer token;
|
|
|
|
- wu->setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Owned<IConstWorkUnit> cw(factory->openWorkUnit(wuid.str(), false));
|
|
|
|
- if (!cw)
|
|
|
|
- throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
|
|
|
|
-
|
|
|
|
- if (notEmpty(req.getInput()))
|
|
|
|
- {
|
|
|
|
- Owned<IWuWebView> web = createWuWebView(*cw, NULL, getCFD(), true);
|
|
|
|
- web->addInputsFromXml(req.getInput());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- submitWsWorkunit(context, cw, req.getCluster(), NULL, 0, false, true);
|
|
|
|
- cw.clear();
|
|
|
|
-
|
|
|
|
int timeToWait = req.getWait();
|
|
int timeToWait = req.getWait();
|
|
if (timeToWait != 0)
|
|
if (timeToWait != 0)
|
|
waitForWorkUnitToComplete(wuid.str(), timeToWait);
|
|
waitForWorkUnitToComplete(wuid.str(), timeToWait);
|
|
|
|
|
|
-
|
|
|
|
- cw.set(factory->openWorkUnit(wuid.str(), false));
|
|
|
|
|
|
+ Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
|
|
|
|
+ Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
|
|
if (!cw)
|
|
if (!cw)
|
|
throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
|
|
throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid.str());
|
|
|
|
|