Procházet zdrojové kódy

HPCC-8902 #workunit ('cluster', 'xxx') not supported by eclccserver

Make sure that the cluster info is cloned from the generated workunit.

Add code to eclccserver to reload the cluster after compilation, and
check it is valid.

Add code to ecl run to reload the cluster after compilation.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 11 roky
rodič
revize
f3b81897ad

+ 13 - 3
common/workunit/workunit.cpp

@@ -4898,6 +4898,11 @@ static void updateProp(IPropertyTree * to, const IPropertyTree * from, const cha
         to->setProp(xpath, from->queryProp(xpath));
 }
 
+static void setProp(IPropertyTree * to, const IPropertyTree * from, const char * xpath)
+{
+    if (from->hasProp(xpath))
+        to->setProp(xpath, from->queryProp(xpath));
+}
 
 static void copyTree(IPropertyTree * to, const IPropertyTree * from, const char * xpath)
 {
@@ -4959,6 +4964,8 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool all)
     copyTree(p, fromP, "Workflow");
     if (all)
     {
+        // 'all' mode is used when setting up a dali WU from the embedded wu in a workunit dll
+
         // Merge timing info from both branches
         pt = fromP->getBranch("Timings");
         if (pt)
@@ -4976,10 +4983,13 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool all)
     updateProp(p, fromP, "SNAPSHOT");
 
     //MORE: This is very adhoc.  All options that should be cloned should really be in a common branch
-    if (all && (fromP->hasProp("PriorityFlag") || fromP->hasProp("@priorityClass")))
+    if (all)
     {
-        updateProp(p, fromP, "PriorityFlag");
-        updateProp(p, fromP, "@priorityClass");
+        setProp(p, fromP, "PriorityFlag");
+        setProp(p, fromP, "@priorityClass");
+        setProp(p, fromP, "@protected");
+        setProp(p, fromP, "@clusterName");
+        updateProp(p, fromP, "@scope");
     }
 
     //Variables may have been set up as parameters to the query - so need to preserve any values that were supplied.

+ 29 - 6
ecl/eclccserver/eclccserver.cpp

@@ -358,6 +358,14 @@ class EclccCompileThread : public CInterface, implements IPooledThread, implemen
         return false;
     }
 
+    void failCompilation(const char *error)
+    {
+        reportError(error, 2);
+        workunit->setState(WUStateFailed);
+        workunit->commit();
+        workunit.clear();
+    }
+
 public:
     IMPLEMENT_IINTERFACE;
     EclccCompileThread(unsigned _idx)
@@ -397,12 +405,8 @@ public:
         Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
         if (!clusterInfo)
         {
-            StringBuffer errStr;
-            errStr.appendf("Cluster %s not recognized", clusterName.str());
-            reportError(errStr, 2);
-            workunit->setState(WUStateFailed);
-            workunit->commit();
-            workunit.clear();
+            VStringBuffer errStr("Cluster %s not recognized", clusterName.str());
+            failCompilation(errStr);
             return;
         }
         ClusterType platform = clusterInfo->getPlatform();
@@ -413,6 +417,25 @@ public:
         if (ok)
         {
             workunit->setState(WUStateCompiled);
+            SCMStringBuffer newClusterName;
+            workunit->getClusterName(newClusterName);   // Workunit can change the cluster name via #workunit, so reload it
+            if (strcmp(newClusterName.str(), clusterName.str()) != 0)
+            {
+                clusterInfo.setown(getTargetClusterInfo(clusterName.str()));
+                if (!clusterInfo)
+                {
+                    VStringBuffer errStr("Cluster %s by #workunit not recognized", clusterName.str());
+                    failCompilation(errStr);
+                    return;
+                }
+                if (platform != clusterInfo->getPlatform())
+                {
+                    VStringBuffer errStr("Cluster %s specified by #workunit is wrong type for this queue", clusterName.str());
+                    failCompilation(errStr);
+                    return;
+                }
+                clusterInfo.clear();
+            }
             if (workunit->getAction()==WUActionRun || workunit->getAction()==WUActionUnknown)  // Assume they meant run....
             {
                 if (isLibrary(workunit))

+ 11 - 6
ecl/eclcmd/eclcmd_core.cpp

@@ -27,7 +27,7 @@
 #include "eclcmd_core.hpp"
 
 
-bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *cluster, const char *name, StringBuffer *wuid, bool noarchive, bool displayWuid=true)
+bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *cluster, const char *name, StringBuffer *wuid, StringBuffer *wucluster, bool noarchive, bool displayWuid=true)
 {
     StringBuffer s;
     if (cmd.optVerbose)
@@ -83,7 +83,9 @@ bool doDeploy(EclCmdWithEclTarget &cmd, IClientWsWorkunits *client, const char *
     if (w && *w)
     {
         if (wuid)
-            wuid->append(w);
+            wuid->clear().append(w);
+        if (wucluster)
+            wucluster->clear().append(resp->getWorkunit().getCluster());
         fprintf(stdout, "\n");
         if (cmd.optVerbose)
             fprintf(stdout, "Deployed\n   wuid: ");
@@ -162,7 +164,7 @@ public:
     virtual int processCMD()
     {
         Owned<IClientWsWorkunits> client = createCmdClient(WsWorkunits, *this);
-        return doDeploy(*this, client, optTargetCluster.get(), optName.get(), NULL, optNoArchive) ? 0 : 1;
+        return doDeploy(*this, client, optTargetCluster.get(), optName.get(), NULL, NULL, optNoArchive) ? 0 : 1;
     }
     virtual void usage()
     {
@@ -298,7 +300,7 @@ public:
         StringBuffer wuid;
         if (optObj.type==eclObjWuid)
             wuid.set(optObj.value.get());
-        else if (!doDeploy(*this, client, optTargetCluster.get(), optName.get(), &wuid, optNoArchive))
+        else if (!doDeploy(*this, client, optTargetCluster.get(), optName.get(), &wuid, NULL, optNoArchive))
             return 1;
 
         StringBuffer descr;
@@ -469,6 +471,7 @@ public:
         req->setNoRootTag(optNoRoot);
 
         StringBuffer wuid;
+        StringBuffer wuCluster;
         StringBuffer queryset;
         StringBuffer query;
 
@@ -488,14 +491,16 @@ public:
         else
         {
             req->setCloneWorkunit(false);
-            if (!doDeploy(*this, client, optTargetCluster.get(), optName.get(), &wuid, optNoArchive, optVerbose))
+            if (!doDeploy(*this, client, optTargetCluster.get(), optName.get(), &wuid, &wuCluster, optNoArchive, optVerbose))
                 return 1;
             req->setWuid(wuid.str());
             if (optVerbose)
                 fprintf(stdout, "Running deployed workunit %s\n", wuid.str());
         }
 
-        if (optTargetCluster.length())
+        if (wuCluster.length())
+            req->setCluster(wuCluster.str());
+        else if (optTargetCluster.length())
             req->setCluster(optTargetCluster.get());
         req->setWait((int)optWaitTime);
         if (optInput.length())