Pārlūkot izejas kodu

HPCC-23671 Remove dependency on environment from workunit

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 5 gadi atpakaļ
vecāks
revīzija
46af33eb9c

+ 59 - 79
common/environment/environment.cpp

@@ -30,6 +30,7 @@
 #include "dafdesc.hpp"
 #include "dasds.hpp"
 #include "dalienv.hpp"
+#include "daqueue.hpp"
 
 #include <string>
 #include <unordered_map>
@@ -1024,6 +1025,54 @@ public:
 
 //==========================================================================================
 
+extern ENVIRONMENT_API unsigned __int64 readSizeSetting(const char * sizeStr, const unsigned long defaultSize)
+{
+    StringBuffer buf(sizeStr);
+    buf.trim();
+
+    if (buf.isEmpty())
+        return defaultSize;
+
+    const char* ptrStart = buf;
+    const char* ptrAfterDigit = ptrStart;
+    while (*ptrAfterDigit && isdigit(*ptrAfterDigit))
+        ptrAfterDigit++;
+
+    if (!*ptrAfterDigit)
+        return atol(buf);
+
+    const char* ptr = ptrAfterDigit;
+    while (*ptr && (ptr[0] == ' '))
+        ptr++;
+
+    char c = ptr[0];
+    buf.setLength(ptrAfterDigit - ptrStart);
+    unsigned __int64 size = atoll(buf);
+    switch (c)
+    {
+    case 'k':
+    case 'K':
+        size *= 1000;
+        break;
+    case 'm':
+    case 'M':
+        size *= 1000000;
+        break;
+    case 'g':
+    case 'G':
+        size *= 1000000000;
+        break;
+    case 't':
+    case 'T':
+        size *= 1000000000000;
+        break;
+    default:
+        break;
+    }
+    return size;
+}
+
+
 class CConstInstanceInfo : public CConstEnvBase, implements IConstInstanceInfo
 {
 public:
@@ -2481,53 +2530,6 @@ extern ENVIRONMENT_API void closeEnvironment()
     }
 }
 
-extern ENVIRONMENT_API unsigned __int64 readSizeSetting(const char * sizeStr, const unsigned long defaultSize)
-{
-    StringBuffer buf(sizeStr);
-    buf.trim();
-
-    if (buf.isEmpty())
-        return defaultSize;
-
-    const char* ptrStart = buf;
-    const char* ptrAfterDigit = ptrStart;
-    while (*ptrAfterDigit && isdigit(*ptrAfterDigit))
-        ptrAfterDigit++;
-
-    if (!*ptrAfterDigit)
-        return atol(buf);
-
-    const char* ptr = ptrAfterDigit;
-    while (*ptr && (ptr[0] == ' '))
-        ptr++;
-
-    char c = ptr[0];
-    buf.setLength(ptrAfterDigit - ptrStart);
-    unsigned __int64 size = atoll(buf);
-    switch (c)
-    {
-    case 'k':
-    case 'K':
-        size *= 1000;
-        break;
-    case 'm':
-    case 'M':
-        size *= 1000000;
-        break;
-    case 'g':
-    case 'G':
-        size *= 1000000000;
-        break;
-    case 't':
-    case 'T':
-        size *= 1000000000000;
-        break;
-    default:
-        break;
-    }
-    return size;
-}
-
 unsigned getAccessibleServiceURLList(const char *serviceType, std::vector<std::string> &list)
 {
     unsigned added = 0;
@@ -2706,6 +2708,14 @@ public:
             platform = HThorCluster;
         }
 
+#ifdef _CONTAINERIZED
+        if (agent || roxie)
+        {
+            agentQueue.set(getClusterEclAgentQueueName(queue.clear(), name));
+            if (agent)
+                agentName.set(agent->queryProp("@name"));
+        }
+#else
         if (agent)
         {
             assertex(!roxie);
@@ -2714,6 +2724,8 @@ public:
         }
         else if (roxie)
             agentQueue.set(getClusterRoxieQueueName(queue.clear(), name));
+#endif
+
         // MORE - does this need to be conditional?
         serverQueue.set(getClusterEclCCServerQueueName(queue.clear(), name));
     }
@@ -2830,13 +2842,6 @@ IStringVal &getProcessQueueNames(IStringVal &ret, const char *process, const cha
     return ret;
 }
 
-#define ROXIE_QUEUE_EXT ".roxie"
-#define THOR_QUEUE_EXT ".thor"
-#define ECLCCSERVER_QUEUE_EXT ".eclserver"
-#define ECLSERVER_QUEUE_EXT ECLCCSERVER_QUEUE_EXT
-#define ECLSCHEDULER_QUEUE_EXT ".eclscheduler"
-#define ECLAGENT_QUEUE_EXT ".agent"
-
 extern void getDFUServerQueueNames(StringArray &ret, const char *process)
 {
     Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
@@ -2887,11 +2892,6 @@ extern IStringVal &getThorQueueNames(IStringVal &ret, const char *process)
     return getProcessQueueNames(ret, process, "ThorCluster", THOR_QUEUE_EXT);
 }
 
-extern StringBuffer &getClusterThorQueueName(StringBuffer &ret, const char *cluster)
-{
-    return ret.append(cluster).append(THOR_QUEUE_EXT);
-}
-
 extern StringBuffer &getClusterThorGroupName(StringBuffer &ret, const char *cluster)
 {
     Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
@@ -2955,26 +2955,6 @@ extern ClusterType getClusterTypeByClusterName(const char *cluster)
     return NoCluster;
 }
 
-extern StringBuffer &getClusterRoxieQueueName(StringBuffer &ret, const char *cluster)
-{
-    return ret.append(cluster).append(ROXIE_QUEUE_EXT);
-}
-
-extern StringBuffer &getClusterEclCCServerQueueName(StringBuffer &ret, const char *cluster)
-{
-    return ret.append(cluster).append(ECLCCSERVER_QUEUE_EXT);
-}
-
-extern StringBuffer &getClusterEclServerQueueName(StringBuffer &ret, const char *cluster)
-{
-    return ret.append(cluster).append(ECLSERVER_QUEUE_EXT);
-}
-
-extern StringBuffer &getClusterEclAgentQueueName(StringBuffer &ret, const char *cluster)
-{
-    return ret.append(cluster).append(ECLAGENT_QUEUE_EXT);
-}
-
 extern IStringIterator *getTargetClusters(const char *processType, const char *processName)
 {
     Owned<CStringArrayIterator> ret = new CStringArrayIterator;

+ 0 - 7
common/environment/environment.hpp

@@ -227,8 +227,6 @@ class StringBuffer;
 extern "C" ENVIRONMENT_API IEnvironmentFactory * getEnvironmentFactory(bool update);
 extern "C" ENVIRONMENT_API void closeEnvironment();
 
-extern ENVIRONMENT_API unsigned __int64 readSizeSetting(const char * sizeStr, const unsigned long defaultSize);
-
 extern ENVIRONMENT_API unsigned getAccessibleServiceURLList(const char *serviceType, std::vector<std::string> &list);
 
 //------------------- Moved from workunit.hpp -------------
@@ -274,12 +272,7 @@ extern ENVIRONMENT_API IStringVal &getRoxieQueueNames(IStringVal &ret, const cha
 extern ENVIRONMENT_API IStringVal &getThorQueueNames(IStringVal &ret, const char *process);
 extern ENVIRONMENT_API ClusterType getClusterTypeByClusterName(const char *cluster);
 extern ENVIRONMENT_API StringBuffer &getClusterGroupName(StringBuffer &ret, const char *cluster);
-extern ENVIRONMENT_API StringBuffer &getClusterThorQueueName(StringBuffer &ret, const char *cluster);
 extern ENVIRONMENT_API StringBuffer &getClusterThorGroupName(StringBuffer &ret, const char *cluster);
-extern ENVIRONMENT_API StringBuffer &getClusterRoxieQueueName(StringBuffer &ret, const char *cluster);
-extern ENVIRONMENT_API StringBuffer &getClusterEclCCServerQueueName(StringBuffer &ret, const char *cluster);
-extern ENVIRONMENT_API StringBuffer &getClusterEclServerQueueName(StringBuffer &ret, const char *cluster);
-extern ENVIRONMENT_API StringBuffer &getClusterEclAgentQueueName(StringBuffer &ret, const char *cluster);
 extern ENVIRONMENT_API IStringIterator *getTargetClusters(const char *processType, const char *processName);
 extern ENVIRONMENT_API bool validateTargetClusterName(const char *clustname);
 extern ENVIRONMENT_API IConstWUClusterInfo* getTargetClusterInfo(const char *clustname);

+ 8 - 2
common/workunit/CMakeLists.txt

@@ -43,7 +43,6 @@ set (    SRCS
     )
 
 include_directories ( 
-         ./../../common/environment
          ./../../common/workunit
          ./../../system/mp 
          ./../../dali/ft 
@@ -61,6 +60,10 @@ include_directories (
          ./../../rtl/nbcd
     )
 
+if (NOT CONTAINERIZED)
+    include_directories ( ./../../common/environment )
+endif()
+
 HPCC_ADD_LIBRARY( workunit SHARED ${SRCS} )
 set_target_properties(workunit PROPERTIES 
     COMPILE_FLAGS -D_USRDLL
@@ -78,5 +81,8 @@ if(NOT PLUGIN)
         nbcd 
         eclrtl 
         deftype
-        environment)
+        )
+    if (NOT CONTAINERIZED)
+        target_link_libraries(workunit environment)
+    endif()
 endif()

+ 60 - 31
common/workunit/workunit.cpp

@@ -48,7 +48,10 @@
 
 #include "wuerror.hpp"
 #include "wujobq.hpp"
+#ifndef _CONTAINERIZED
 #include "environment.hpp"
+#endif
+#include "daqueue.hpp"
 #include "workunit.ipp"
 #include "digisign.hpp"
 
@@ -7002,20 +7005,31 @@ const char *CLocalWorkUnit::queryActionDesc() const
     return p->queryProp("Action");
 }
 
-IStringVal& CLocalWorkUnit::getApplicationValue(const char *app, const char *propname, IStringVal &str) const
+bool CLocalWorkUnit::hasApplicationValue(const char *app, const char *propname) const
 {
+    StringBuffer prop("Application/");
+    prop.append(app).append('/').append(propname);
+
     CriticalBlock block(crit);
+    return p->hasProp(prop);
+}
+
+IStringVal& CLocalWorkUnit::getApplicationValue(const char *app, const char *propname, IStringVal &str) const
+{
     StringBuffer prop("Application/");
     prop.append(app).append('/').append(propname);
+
+    CriticalBlock block(crit);
     str.set(p->queryProp(prop.str())); 
     return str;
 }
 
 int CLocalWorkUnit::getApplicationValueInt(const char *app, const char *propname, int defVal) const
 {
-    CriticalBlock block(crit);
     StringBuffer prop("Application/");
     prop.append(app).append('/').append(propname);
+
+    CriticalBlock block(crit);
     return p->getPropInt(prop.str(), defVal); 
 }
 
@@ -7276,22 +7290,38 @@ wuTokenStates verifyWorkunitDAToken(const char * ctxUser, const char * daToken)
     return wuActive ? wuTokenValid : wuTokenWorkunitInactive;
 }
 
+bool CLocalWorkUnit::resolveFilePrefix(StringBuffer & prefix, const char * queue) const
+{
+    if (hasApplicationValue("prefix", queue))
+    {
+        getApplicationValue("prefix", queue, StringBufferAdaptor(prefix));
+        return true;
+    }
+
+#ifndef _CONTAINERIZED
+    Owned<IConstWUClusterInfo> ci = getTargetClusterInfo(queue);
+    if (ci)
+    {
+        ci->getScope(StringBufferAdaptor(prefix));
+        return true;
+    }
+#endif
+    return false;
+}
+
 IStringVal& CLocalWorkUnit::getScope(IStringVal &str) const 
 {
+    StringBuffer prefix;
     CriticalBlock block(crit);
     if (p->hasProp("Debug/ForceScope"))
     {
-        StringBuffer prefix(p->queryProp("Debug/ForceScope"));
-        str.set(prefix.toLowerCase().str()); 
+        prefix.append(p->queryProp("Debug/ForceScope")).toLowerCase();
     }
     else
     {
-        Owned <IConstWUClusterInfo> ci = getTargetClusterInfo(p->queryProp("@clusterName"));
-        if (ci)
-            ci->getScope(str); 
-        else
-            str.clear();
+        resolveFilePrefix(prefix, p->queryProp("@clusterName"));
     }
+    str.set(prefix.str());
     return str;
 }
 
@@ -7537,6 +7567,12 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool a
         ensurePTree(p, "Application");
         p->setPropTree("Application/LibraryModule", pt);
     }
+    pt = fromP->getBranch("Application/prefix");
+    if (pt)
+    {
+        ensurePTree(p, "Application");
+        p->setPropTree("Application/prefix", pt);
+    }
 
     pt = fromP->queryBranch("Debug"); 
     if (pt)
@@ -8839,7 +8875,7 @@ bool isFilenameResolved(StringBuffer& filename)
 bool CLocalWorkUnit::getFieldUsageArray(StringArray & filenames, StringArray & columnnames, const char * clusterName) const
 {
     bool scopeLoaded = false;
-    SCMStringBuffer defaultScope;
+    StringBuffer defaultScope;
 
     Owned<IConstWUFileUsageIterator> files = getFieldUsage();
 
@@ -8878,10 +8914,9 @@ bool CLocalWorkUnit::getFieldUsageArray(StringArray & filenames, StringArray & c
                 // loading a default scope from config is expensive, and should be only done once and be reused later.
                 if (!scopeLoaded)
                 {
-                    Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName);
-                    if (!clusterInfo)
+                    //MORE: This should actually depend on the cluster that was active when the file was read!
+                    if (!resolveFilePrefix(defaultScope, clusterName))
                         throw MakeStringException(WUERR_InvalidCluster, "Unknown cluster %s", clusterName);
-                    clusterInfo->getScope(defaultScope);
                     scopeLoaded = true;
                 }
 
@@ -8969,24 +9004,20 @@ bool CLocalWorkUnit::switchThorQueue(const char *cluster, IQueueSwitcher *qs)
     CriticalBlock block(crit);
     if (qs->isAuto()&&!getAllowAutoQueueSwitch())
         return false;
-    Owned<IConstWUClusterInfo> newci = getTargetClusterInfo(cluster);
-    if (!newci) 
-        return false;
-    StringBuffer currentcluster;
-    if (!p->getProp("@clusterName",currentcluster))
-        return false;
-    Owned<IConstWUClusterInfo> curci = getTargetClusterInfo(currentcluster.str());
-    if (!curci)
-        return false;
-    SCMStringBuffer curqname;
-    curci->getThorQueue(curqname);
+
+    const char * currentcluster = queryClusterName();
     const char *wuid = p->queryName();
+    StringBuffer curqname;
+    getClusterThorQueueName(curqname, currentcluster);
+
     void *qi = qs->getQ(curqname.str(),wuid);
     if (!qi)
         return false;
+
     setClusterName(cluster);
-    SCMStringBuffer newqname;
-    newci->getThorQueue(newqname);
+
+    StringBuffer newqname;
+    getClusterThorQueueName(newqname, cluster);
     qs->putQ(newqname.str(),wuid,qi);
     return true;
 }
@@ -11364,11 +11395,9 @@ extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username,
         throw MakeStringException(WUERR_InvalidCluster, "No target cluster specified");
     workunit->commit();
     workunit.clear();
-    Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
-    if (!clusterInfo) 
-        throw MakeStringException(WUERR_InvalidCluster, "Unknown cluster %s", clusterName.str());
-    SCMStringBuffer serverQueue;
-    clusterInfo->getServerQueue(serverQueue);
+
+    StringBuffer serverQueue;
+    getClusterEclCCServerQueueName(serverQueue, clusterName);
     assertex(serverQueue.length());
     Owned<IJobQueue> queue = createJobQueue(serverQueue.str());
     if (!queue.get()) 

+ 2 - 0
common/workunit/workunit.ipp

@@ -576,6 +576,8 @@ protected:
     void loadLibraries() const;
     void loadClusters() const;
     void checkAgentRunning(WUState & state);
+    bool hasApplicationValue(const char * application, const char * propname) const;
+    bool resolveFilePrefix(StringBuffer & prefix, const char * queue) const;
 
     // Implemented by derived classes
     virtual IPropertyTree *getGraphProgressTree() const { return NULL; };

+ 10 - 2
common/workunit/wujobq.cpp

@@ -29,10 +29,15 @@
 #include "daclient.hpp"
 #include "dasds.hpp"
 #include "dasess.hpp"
+#include "daqueue.hpp"
 
-#include "environment.hpp"
+#include "workunit.hpp"
 #include "wujobq.hpp"
 
+#ifndef _CONTAINERIZED
+#include "environment.hpp"
+#endif
+
 #ifdef _MSC_VER
 #pragma warning (disable : 4355)
 #endif
@@ -2098,8 +2103,10 @@ IJobQueue *createJobQueue(const char *name)
 extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName)
 {
 #ifdef _CONTAINERIZED
-    VStringBuffer agentQueue("%s.agent", queueName);
+    StringBuffer agentQueue;
+    getClusterEclAgentQueueName(agentQueue, queueName);
 #else
+    //NB: In the non-container system the name of the roxie agent queue does not follow the convention for the containerized system
     Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(queueName);
     if (!clusterInfo.get())
         return false;
@@ -2108,6 +2115,7 @@ extern bool WORKUNIT_API runWorkUnit(const char *wuid, const char *queueName)
     if (!agentQueue.length())
         return false;
 #endif
+
     Owned<IJobQueue> queue = createJobQueue(agentQueue.str());
     if (!queue.get()) 
         throw MakeStringException(-1, "Could not create workunit queue");

+ 6 - 1
common/workunit/wujobq.hpp

@@ -21,7 +21,6 @@
 
 #include "jsocket.hpp"
 #include "dasess.hpp"
-#include "workunit.hpp"
 
 interface IJobQueueItem: extends serializable
 {
@@ -48,6 +47,12 @@ interface IJobQueueItem: extends serializable
 
 typedef IIteratorOf<IJobQueueItem> IJobQueueIterator;
 
+#ifdef WORKUNIT_EXPORTS
+    #define WORKUNIT_API DECL_EXPORT
+#else
+    #define WORKUNIT_API DECL_IMPORT
+#endif
+
 class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
 {  // used as a 'snapshot' of queue items
 public:

+ 1 - 0
dali/base/CMakeLists.txt

@@ -49,6 +49,7 @@ set (    INCLUDES
          dadiags.hpp
          dafdesc.hpp
          danqs.hpp
+         daqueue.hpp
          dasds.hpp
          dasess.hpp
          dasubs.hpp

+ 63 - 0
dali/base/daqueue.hpp

@@ -0,0 +1,63 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2020 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef DAQUEUE_HPP
+#define DAQUEUE_HPP
+
+#ifdef DALI_EXPORTS
+    #define da_decl DECL_EXPORT
+#else
+    #define da_decl DECL_IMPORT
+#endif
+
+#include "jstring.hpp"
+
+#define ROXIE_QUEUE_EXT ".roxie"
+#define THOR_QUEUE_EXT ".thor"
+#define ECLCCSERVER_QUEUE_EXT ".eclserver"
+#define ECLSERVER_QUEUE_EXT ECLCCSERVER_QUEUE_EXT
+#define ECLSCHEDULER_QUEUE_EXT ".eclscheduler"
+#define ECLAGENT_QUEUE_EXT ".agent"
+
+#ifndef _CONTAINERIZED
+inline StringBuffer &getClusterRoxieQueueName(StringBuffer &ret, const char *cluster)
+{
+    return ret.append(cluster).append(ROXIE_QUEUE_EXT);
+}
+#endif
+
+inline StringBuffer &getClusterEclCCServerQueueName(StringBuffer &ret, const char *cluster)
+{
+    return ret.append(cluster).append(ECLCCSERVER_QUEUE_EXT);
+}
+
+inline StringBuffer &getClusterEclServerQueueName(StringBuffer &ret, const char *cluster)
+{
+    return ret.append(cluster).append(ECLSERVER_QUEUE_EXT);
+}
+
+inline StringBuffer &getClusterEclAgentQueueName(StringBuffer &ret, const char *cluster)
+{
+    return ret.append(cluster).append(ECLAGENT_QUEUE_EXT);
+}
+
+inline StringBuffer &getClusterThorQueueName(StringBuffer &ret, const char *cluster)
+{
+    return ret.append(cluster).append(THOR_QUEUE_EXT);
+}
+
+#endif

+ 12 - 0
dockerfiles/hpcc/values.schema.json

@@ -31,6 +31,10 @@
               "type": "string",
               "description": "The name of the eclagent process"
             },
+            "prefix": {
+              "type": "string",
+              "description": "The (optional) file prefix to add to relative filenames"
+            },
             "required": [ "name" ] 
           }
         ]
@@ -187,6 +191,10 @@
           "type": "string",
           "description": "The name of the roxie process"
         },
+        "prefix": {
+          "type": "string",
+          "description": "The (optional) file prefix to add to relative filenames"
+        },
         "ports": {
           "type": "array",
           "description": "The ports to listen on",
@@ -206,6 +214,10 @@
           "type": "string",
           "description": "The name of the thor process"
         },
+        "prefix": {
+          "type": "string",
+          "description": "The (optional) file prefix to add to relative filenames"
+        },
         "eclagent": {
           "$ref": "#/definitions/eclagent"
         },

+ 4 - 1
dockerfiles/hpcc/values.yaml

@@ -54,9 +54,11 @@ dali:
 eclagent:
 - name: hthor
   replicas: 1
+  prefix: hthor
   containerPerAgent: false
 - name: roxie
   replicas: 1
+  prefix: roxie
   containerPerAgent: false
   type: roxie
     
@@ -72,7 +74,7 @@ esp:
 
 roxie:
 - name: roxie-cluster
-  disabled: false
+  prefix: roxiecluster
   ports: [9876,0]
   numChannels: 2
   serverReplicas: 1
@@ -85,6 +87,7 @@ thor:
   numSlaves: 2
   globalMemorySize: 4096
   maxActive: 2
+  prefix: thor
   eclagent:
     replicas: 1
     containerPerAgent: false

+ 3 - 1
ecl/agentexec/agentexec.cpp

@@ -21,6 +21,8 @@
 #include "jlog.hpp"
 #include "jfile.hpp"
 #include "jutil.hpp"
+#include "daqueue.hpp"
+#include "workunit.hpp"
 #include "environment.hpp"
 
 class CEclAgentExecutionServer : public CInterfaceOf<IThreadFactory>
@@ -97,7 +99,7 @@ int CEclAgentExecutionServer::run()
         Owned<IGroup> serverGroup = createIGroup(daliServers, DALI_SERVER_PORT);
         initClientProcess(serverGroup, DCR_AgentExec);
 #ifdef _CONTAINERIZED
-        queueNames.append(agentName).append(".agent");
+        getClusterEclAgentQueueName(queueNames, agentName);
 #else
         getAgentQueueNames(queueNames, agentName);
 #endif

+ 1 - 0
ecl/eclcc/eclcc.cpp

@@ -1153,6 +1153,7 @@ void EclCC::processSingleQuery(EclCompileInstance & instance,
     //The only exception would be a dll created for a one-time query.  (Currently handled by eclserver.)
     instance.wu->setCloneable(true);
 
+    recordQueueFilePrefixes(instance.wu, configuration);
     applyDebugOptions(instance.wu);
     applyApplicationOptions(instance.wu);
 

+ 2 - 1
ecl/eclccserver/eclccserver.cpp

@@ -29,6 +29,7 @@
 #include "wujobq.hpp"
 #include "dllserver.hpp"
 #include "thorplugin.hpp"
+#include "daqueue.hpp"
 #ifndef _CONTAINERIZED
 #include "dalienv.hpp"
 #endif
@@ -860,7 +861,7 @@ int main(int argc, const char *argv[])
                 IPTree &queue = queues->query();
                 if (queueNames.length())
                     queueNames.append(",");
-                queueNames.append(queue.queryProp("@name")).append(".eclserver");
+                getClusterEclCCServerQueueName(queueNames, queue.queryProp("@name"));
             }
 #else
             SCMStringBuffer queueNames;

+ 13 - 0
ecl/hqlcpp/hqlecl.cpp

@@ -868,3 +868,16 @@ void setWorkunitHash(IWorkUnit * wu, IHqlExpression * expr)
     wu->setHash(cacheCRC);
 }
 
+void recordQueueFilePrefixes(IWorkUnit * wu, IPropertyTree * configuration)
+{
+    Owned<IPropertyTreeIterator> iter = configuration->getElements("queues");
+    ForEach(*iter)
+    {
+        IPropertyTree & cur = iter->query();
+        const char * name = cur.queryProp("@name");
+        const char * prefix = cur.queryProp("@prefix");
+        if (!prefix)
+            prefix = "";
+        wu->setApplicationValue("prefix", name, prefix, true);
+    }
+}

+ 1 - 0
ecl/hqlcpp/hqlecl.hpp

@@ -59,5 +59,6 @@ extern HQLCPP_API ClusterType queryClusterType(IConstWorkUnit * wu, ClusterType
 extern HQLCPP_API IHqlExpression * extractExternalLibraries(HqlExprArray & libraries, IHqlExpression * query);
 extern HQLCPP_API unsigned getLibraryCRC(IHqlExpression * library);
 extern HQLCPP_API void setWorkunitHash(IWorkUnit * wu, IHqlExpression * expr);
+extern HQLCPP_API void recordQueueFilePrefixes(IWorkUnit * wu, IPropertyTree * configuration);
 
 #endif

+ 1 - 1
esp/services/ws_smc/ws_smcService.cpp

@@ -25,8 +25,8 @@
 #include "ws_smcService.hpp"
 #include "wshelpers.hpp"
 
-#include "dalienv.hpp"
 #include "dasds.hpp"
+#include "daqueue.hpp"
 #include "WUWrapper.hpp"
 #include "dfuwu.hpp"
 #include "exception_util.hpp"