Browse Source

Merge pull request #421 from stuartort/issue245

Fix #245 Enable running of queries to roxie via eclplus

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
0af93056c9

+ 9 - 1
common/roxiemanager/roxiequerymanager.cpp

@@ -203,12 +203,20 @@ public:
         return true;
     }
 
-    bool deployWorkunit(SCMStringBuffer &wuid,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, bool allowNewRoxieOnDemandQuery, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus)
+    bool deployWorkunit(SCMStringBuffer &wuid,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus)
     {
         if (!wuid.length())
             throw MakeStringException(ROXIEMANAGER_MISSING_ID, "Missing workunit id");
 
         Owned<IConstWorkUnit> workunit = getWorkUnit(wuid.str());
+        return publishWorkunit(workunit, roxieQueryName, processingInfo, userId, activateOption, querySetName, notifyRoxie, status, roxieDeployStatus);
+    }
+
+    bool publishWorkunit(IConstWorkUnit *workunit,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus)
+    {
+        if (!workunit)
+            throw MakeStringException(ROXIEMANAGER_MISSING_ID, "Missing workunit id");
+
         if (!roxieQueryName.length()) {
             SCMStringBuffer jobName;
             roxieQueryName.set(workunit->getJobName(jobName).str());

+ 8 - 4
ecl/eclplus/QueryHelper.cpp

@@ -53,10 +53,9 @@ bool QueryHelper::doit(FILE * fp)
     ureq->setWuid(wu->getWuid());
 
     // Make a workUnit
+    StringBuffer jobname;
     if(globals->hasProp("jobname"))
-    {
-        ureq->setJobname(globals->queryProp("jobname"));
-    }
+        jobname.append(globals->queryProp("jobname"));
 
     StringBuffer ecl;
     if (globals->getProp("ecl", ecl))
@@ -65,6 +64,8 @@ bool QueryHelper::doit(FILE * fp)
         {
             StringBuffer filename(ecl.str()+1);
             ecl.clear().loadFile(filename);
+            if (jobname.length() == 0)
+                splitFilename(filename, NULL, NULL, &jobname, NULL);
         }
         ureq->setQueryText(ecl.str());
     }
@@ -73,6 +74,8 @@ bool QueryHelper::doit(FILE * fp)
 
     if (globals->getPropInt("compileOnly", 0)!=0)
         ureq->setAction(WUActionCompile);
+    if (jobname.length())
+        ureq->setJobname(jobname);
 
     IArrayOf<IEspDebugValue> dvals;
     IArrayOf<IEspApplicationValue> avals;
@@ -194,6 +197,7 @@ bool QueryHelper::doSubmitWorkUnit(FILE * fp, const char * wuid, const char* clu
     Owned<IClientWUSubmitRequest> req = wuclient->createWUSubmitRequest();
     req->setWuid(wuid);
     req->setCluster(cluster);
+    req->setNotifyCluster(true);
     if(globals->hasProp("snapshot"))
         req->setSnapshot(globals->queryProp("snapshot"));
 
@@ -241,7 +245,7 @@ bool QueryHelper::doSubmitWorkUnit(FILE * fp, const char * wuid, const char* clu
         else
         {
             WUState s;
-            
+
             int initial_wait = 30;
             int polling_period = 30;
             int waited = 0;

+ 2 - 1
esp/scm/roxiemanagerscm.ecm

@@ -135,7 +135,8 @@ SCMinterface IRoxieQueryManager(IInterface)
     bool deployQuery(SCMStringBuffer &wuid, SCMStringBuffer &roxieQueryName, IRoxieQueryCompileInfo &compileInfo,
                      IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, bool allowNewRoxieOnDemandQuery, const char *targetClusterName, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus);
 
-    bool deployWorkunit(SCMStringBuffer &wuid,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, bool allowNewRoxieOnDemandQuery, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus);
+    bool deployWorkunit(SCMStringBuffer &wuid,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus);
+    bool publishWorkunit(IConstWorkUnit *workunit,  SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, const char *querySetName, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus);
     
     bool publishFromQuerySet(SCMStringBuffer &name, SCMStringBuffer &roxieQueryName, IRoxieQueryProcessingInfo &processingInfo, const char *userId, WUQueryActivationOptions activateOption, const char *sourceQuerySetName, const char *targetQuerySetName, const char *sourceDaliIP, const char *queryComment, bool notifyRoxie, SCMStringBuffer &status, SCMStringBuffer &roxieDeployStatus);
     

+ 2 - 1
esp/scm/ws_workunits.ecm

@@ -473,6 +473,7 @@ ESPrequest WUSubmitRequest
     int    MaxRunTime;
     [min_ver("1.02")] int BlockTillFinishTimer(0);
     [min_ver("1.19")] bool SyntaxCheck(false);
+    bool NotifyCluster(false);
 };
 
 ESPresponse [exceptions_inline] WUSubmitResponse
@@ -965,7 +966,7 @@ ESPrequest WUDeployWorkunitRequest
     string Wuid;
     string JobName;
     int Activate;
-
+    bool NotifyCluster(false);
 };
 
 ESPresponse [exceptions_inline] WUDeployWorkunitResponse

+ 118 - 50
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -693,44 +693,94 @@ void adjustRowvalues(IPropertyTree* xgmml, const char* popupId)
         }
     }
 }
-void submitQueryWU(IConstWorkUnit *workunit, IEspContext& context, CRoxieQuery& roxieQuery, bool allowNewRoxieOnDemandQuery)
+
+int waitCompileWU(const char *wuid, IEspContext &context, bool wait)
 {
+    try
+    {
+        secWaitForWorkUnitToCompile(wuid, *context.querySecManager(), *context.queryUser(), -1);
+        return CWUWrapper(wuid, context)->getState();
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+
+    return WUStateFailed;
+}
+
+void submitQueryWU(const char *wuid, IEspContext& context, CRoxieQuery& roxieQuery, bool allowNewRoxieOnDemandQuery)
+{
+    Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(*context.querySecManager(), *context.queryUser());
+    Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid, false);
     SocketEndpoint ep;
     ep.set(roxieQuery.ip);
 
-    const SocketEndpoint & ep1 = queryCoven().queryComm().queryGroup().queryNode(0).endpoint();
     StringBuffer daliIp;
-    ep1.getUrlStr(daliIp);
+    if (workunit->hasDebugValue("lookupDaliIp"))
+    {
+        SCMStringBuffer ip;
+        workunit->getDebugValue("lookupDaliIp", ip);
+        daliIp.append(ip.str());
+    }
+    else
+    {
+        const SocketEndpoint & ep1 = queryCoven().queryComm().queryGroup().queryNode(0).endpoint();
+        ep1.getUrlStr(daliIp);
+    }
 
     StringBuffer user;
     StringBuffer password;
     context.getUserID(user);
     context.getPassword(password);
 
+    SCMStringBuffer jobName;
+    workunit->getJobName(jobName);
+    if (jobName.length() == 0)
+    {
+        jobName.set(roxieQuery.jobName.str());
+        if (!jobName.length())
+            jobName.set(wuid);
+    }
+
     Owned<IRoxieQueryManager> manager = createRoxieQueryManager(ep, roxieQuery.roxieClusterName.str(), daliIp, roxieQuery.roxieTimeOut, user.str(), password.str(), 1);  // MORE - use 1 as default traceLevel
     SCMStringBuffer result; // not used since we request result to be placed in WU - MORE - clean this up
     if (roxieQuery.action == WUActionExecuteExisting)
     {
-        manager->runQuery(workunit, roxieQuery.jobName.str(), false, allowNewRoxieOnDemandQuery, result);
+        manager->runQuery(workunit, jobName.str(), false, allowNewRoxieOnDemandQuery, result);
     }
-    else
+    else if (roxieQuery.action != WUActionCompile)
     {
-        Owned<IRoxieQueryCompileInfo> compileInfo = createRoxieQueryCompileInfo(roxieQuery.ecl.str(),
-                                                                                roxieQuery.jobName.str(),
-                                                                                roxieQuery.clusterName.str(),
-                                                                                "WS_WORKUNITS");
-        compileInfo->setWuTimeOut(roxieQuery.wuTimeOut);
-
         Owned<IRoxieQueryProcessingInfo> processingInfo = createRoxieQueryProcessingInfo();
         processingInfo->setResolveFileInfo(true);
-        SCMStringBuffer generatedQueryName;
-        generatedQueryName.set(roxieQuery.jobName.str());
-        if (!generatedQueryName.length())
-            generatedQueryName.set(roxieQuery.wuid.str());
+        processingInfo->setLoadDataOnly(false);
+        processingInfo->setResolveFileInfo(true);
+        processingInfo->setNoForms(false);
+        processingInfo->setDfsDaliIp(daliIp.str());
+        processingInfo->setResolveKeyDiffInfo(false);
+        processingInfo->setCopyKeyDiffLocationInfo(false);
+        processingInfo->setLayoutTranslationEnabled(false);
 
-        manager->compileQuery(roxieQuery.wuid, generatedQueryName, *compileInfo.get(), *processingInfo.get(), roxieQuery.clusterName.str(), result); 
-    }
+        SCMStringBuffer wuCluster;
+        workunit->getClusterName(wuCluster);
+        Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(wuCluster.str());
+        SCMStringBuffer querySetName;
+        clusterInfo->getQuerySetName(querySetName);
+
+        SCMStringBuffer deployStatus;
+
+        manager->publishWorkunit(workunit, jobName, *processingInfo.get(), user.str(), DO_NOT_ACTIVATE, querySetName.str(), true, result, deployStatus);
+        SCMStringBuffer currentQueryName;
+        workunit->getDebugValue("queryid", currentQueryName);
+        manager->runQuery(workunit, currentQueryName.str(), false, allowNewRoxieOnDemandQuery, result);
+        if (!workunit->getDebugValueBool("@leaveWuInQuerySet", 0))
+            manager->deleteQuery(currentQueryName.str(), querySetName.str(), true, deployStatus);
 
+        Owned<IWorkUnit> wu0(factory->updateWorkUnit(wuid));
+        wu0->setState(WUStateCompleted);
+        wu0->commit();
+        wu0.clear();
+    }
 }
 
 void compileScheduledQueryWU(IEspContext& context, CRoxieQuery& roxieQuery)
@@ -4408,24 +4458,35 @@ bool CWsWorkunitsEx::onWUDeployWorkunit(IEspContext &context, IEspWUDeployWorkun
         SCMStringBuffer status;
         SCMStringBuffer roxieDeployStatus;
 
-        const SocketEndpoint & ep1 = queryCoven().queryComm().queryGroup().queryNode(0).endpoint();
         StringBuffer daliIp;
+        const SocketEndpoint &ep1 = queryCoven().queryComm().queryGroup().queryNode(0).endpoint();
         ep1.getUrlStr(daliIp);
     
         SocketEndpoint ep;
+        StringBuffer netAddress;
+        getClusterConfig("RoxieCluster", wuCluster.str(), "RoxieServerProcess[1]", netAddress);
+        ep.getUrlStr(netAddress);
+
         Owned<IRoxieQueryManager> manager = createRoxieQueryManager(ep, querySetName.str(), daliIp, roxieQueryRoxieTimeOut, user.str(), password.str(), 1);
     
         Owned<IRoxieQueryProcessingInfo> processingInfo = createRoxieQueryProcessingInfo();
         processingInfo->setLoadDataOnly(false);
         processingInfo->setResolveFileInfo(true);
         processingInfo->setNoForms(false);
-        processingInfo->setDfsDaliIp(daliIp.str());
+        if (wu->hasDebugValue("lookupDaliIp"))
+        {
+            SCMStringBuffer ip;
+            wu->getDebugValue("lookupDaliIp", ip);
+            processingInfo->setDfsDaliIp(ip.str());
+        }
+        else
+            processingInfo->setDfsDaliIp(daliIp.str());
         processingInfo->setResolveKeyDiffInfo(false);
         processingInfo->setCopyKeyDiffLocationInfo(false);
         processingInfo->setLayoutTranslationEnabled(false);
 
 
-        manager->deployWorkunit(wuid, queryName, *processingInfo.get(), user.str(), MAKE_ACTIVATE, false, querySetName.str(), false, status, roxieDeployStatus);
+        manager->deployWorkunit(wuid, queryName, *processingInfo.get(), user.str(), MAKE_ACTIVATE, querySetName.str(), req.getNotifyCluster(), status, roxieDeployStatus);
     }
     else
         processWorkunit(wu, wuid.str(), queryName, wuCluster, querySetName, activateOption);
@@ -6819,44 +6880,49 @@ void CWsWorkunitsEx::submitWU(IEspContext& context, const char* wuid, const char
     
     if (isRoxie)
     {
-        Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(*context.querySecManager(), *context.queryUser());
-        Owned<IConstWorkUnit> lw = factory->openWorkUnit(wuid, false);
-        if(!lw.get())
-            throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
+        try
+        {
+            CRoxieQuery roxieQuery;
+            wu.setown(factory->updateWorkUnit(wuid));
+            roxieQuery.action = wu->getAction();
+            wu->setAction(WUActionCompile);
+            wu->commit();
+            wu.clear();
 
-        CRoxieQuery roxieQuery;
-        roxieQuery.wuid.set(wuid);
-        StringBuffer component_cluster;
+            if (context.querySecManager())
+            {
+                secSubmitWorkUnit(wuid, *context.querySecManager(), *context.queryUser());
+            }
 
-        getTargetClusterComponentName(cluster, eqRoxieCluster, component_cluster);
-        roxieQuery.roxieClusterName.append(component_cluster);
+            roxieQuery.wuid.set(wuid);
+            StringBuffer component_cluster;
 
-        roxieQuery.clusterName.append(cluster);
+            getTargetClusterComponentName(cluster, eqRoxieCluster, component_cluster);
+            roxieQuery.roxieClusterName.append(component_cluster);
 
-        roxieQuery.roxieTimeOut = roxieQueryRoxieTimeOut; //Hardcoded for now
-        roxieQuery.wuTimeOut = roxieQueryWUTimeOut;
+            roxieQuery.clusterName.append(cluster);
 
-        SCMStringBuffer qname;
-        Owned<IConstWUQuery> query=lw->getQuery();
-        if (query)
-        {
-            query->getQueryText(qname);
-            if(qname.length())
-            {
-                roxieQuery.ecl.append(qname.str());
-            }
+            roxieQuery.roxieTimeOut = roxieQueryRoxieTimeOut; //Hardcoded for now
+            roxieQuery.wuTimeOut = roxieQueryWUTimeOut;
+
+            StringBuffer netAddress;
+            getClusterConfig("RoxieCluster", component_cluster, "RoxieServerProcess[1]", netAddress);
+            roxieQuery.ip = netAddress.str();
+
+            submitQueryWU(wuid, context, roxieQuery, m_allowNewRoxieOnDemandQuery);
         }
+        catch(IException *e)
+        {
+            Owned<IWorkUnit> wu0(factory->updateWorkUnit(wuid));
+            if(!wu0.get())
+                throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.",wuid);
 
-        SCMStringBuffer jn;
-        lw->getJobName(jn);
-        if (jn.length())
-            roxieQuery.jobName.set(lw->getJobName(jn).str());
-        roxieQuery.action = lw->getAction();
+            wu0->setState(WUStateFailed);
+            wu0->commit();
+            wu0.clear();
 
-        StringBuffer netAddress;
-        getClusterConfig("RoxieCluster", component_cluster, "RoxieServerProcess[1]", netAddress);
-        roxieQuery.ip = netAddress.str();
-        submitQueryWU(lw, context, roxieQuery, m_allowNewRoxieOnDemandQuery);
+            throw e;
+        }
     }
     else if (!compile)
     {
@@ -8059,6 +8125,8 @@ bool CWsWorkunitsEx::onWUWaitComplete(IEspContext &context, IEspWUWaitRequest &r
 {
     try
     {
+        SCMStringBuffer wuid;
+        wuid.set(req.getWuid());
         resp.setStateID(secWaitForWorkUnitToComplete(req.getWuid(), *context.querySecManager(), *context.queryUser(),req.getWait(), req.getReturnOnWait()));
     }
     catch(IException* e)