浏览代码

Merge pull request #15283 from afishbeck/roxieLocalEnoughPlanes

HPCC-25930 Allow roxie to specify a list of acceptable storage planes

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父节点
当前提交
8f3c848bee

+ 72 - 25
common/workunit/referencedfilelist.cpp

@@ -150,11 +150,17 @@ public:
     IPropertyTree *getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
     IPropertyTree *getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
 
+    void processLocalFileInfo(IDistributedFile *df, const StringArray &locations, const char *srcCluster, StringArray *subfiles);
     void processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles);
     void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles);
 
+    void resolveLocal(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles);
     void resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles);
+
+    void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
     void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
+
+    void resolve(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveForeign=false);
     void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveForeign=false);
 
     virtual const char *getLogicalName() const {return logicalName.str();}
@@ -234,29 +240,42 @@ public:
     void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
 
     virtual IReferencedFileIterator *getFiles();
-    virtual void cloneFileInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
+    virtual void cloneFileInfo(const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
 
     virtual void cloneRelationships();
-    virtual void cloneAllInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
+    virtual void cloneAllInfo(const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
     {
-        cloneFileInfo(updateFlags, helper, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
+        cloneFileInfo(dstCluster, updateFlags, helper, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
         cloneRelationships();
     }
-    virtual void resolveFiles(const char *process, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false);
-    void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign);
+    virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false) override;
+
+    void resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign);
 
 public:
     Owned<IUserDescriptor> user;
     Owned<INode> remote;
     MapStringToMyClass<ReferencedFile> map;
-    StringAttr process;
     StringAttr srcCluster;
     StringAttr remotePrefix;
     bool allowForeign;
     bool allowSizeCalc;
 };
 
-void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles)
+bool fileExistsWithinLocations(IDistributedFile *df, const StringArray &locations)
+{
+    if (!df)
+        return false;
+    ForEachItemIn(i, locations)
+    {
+        const char *name = locations.item(i);
+        if (df->findCluster(name)!=NotFound)
+            return true;
+    }
+    return false;
+}
+
+void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const StringArray &locations, const char *srcCluster, StringArray *subfiles)
 {
     IDistributedSuperFile *super = df->querySuperFile();
     if (super)
@@ -279,9 +298,9 @@ void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstC
     else
     {
         flags |= RefSubFile;
-        if (!dstCluster || !*dstCluster)
+        if (!locations.length())
             return;
-        if (df->findCluster(dstCluster)==NotFound)
+        if (!fileExistsWithinLocations(df, locations))
             flags |= RefFileNotOnCluster;
         if (fileSrcCluster.length())
             srcCluster=fileSrcCluster;
@@ -293,6 +312,13 @@ void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstC
     }
 }
 
+void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles)
+{
+    StringArray locations;
+    locations.append(dstCluster);
+    processLocalFileInfo(df, locations, srcCluster, subfiles);
+}
+
 void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles)
 {
     flags |= RefFileRemote;
@@ -328,7 +354,7 @@ void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcC
 
 }
 
-void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
+void ReferencedFile::resolveLocal(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
 {
     if (flags & RefFileInPackage)
         return;
@@ -340,7 +366,7 @@ void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster
     reset();
     Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user, false, false, false, nullptr, defaultPrivilegedUser);
     if(df)
-        processLocalFileInfo(df, dstCluster, srcCluster, subfiles);
+        processLocalFileInfo(df, locations, srcCluster, subfiles);
     else
     {
         flags |= RefFileNotFound;
@@ -348,6 +374,14 @@ void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster
     }
 }
 
+void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
+{
+    StringArray locations;
+    if (!isEmptyString(dstCluster))
+        locations.append(dstCluster);
+    resolveLocal(locations, srcCluster, user, subfiles);
+}
+
 IPropertyTree *ReferencedFile::getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix)
 {
     if (!remote)
@@ -380,6 +414,13 @@ IPropertyTree *ReferencedFile::getSpecifiedOrRemoteFileTree(IUserDescriptor *use
 
 void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
 {
+    StringArray locations;
+    if (!isEmptyString(dstCluster))
+        locations.append(dstCluster);
+    resolveRemote(user, remote, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveForeign);
+}
+void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
+{
     if ((flags & RefFileForeign) && !resolveForeign && !trackSubFiles)
         return;
     if (flags & RefFileInPackage)
@@ -395,7 +436,7 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c
         Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user, false, false, false, nullptr, defaultPrivilegedUser);
         if(df)
         {
-            processLocalFileInfo(df, dstCluster, NULL, subfiles);
+            processLocalFileInfo(df, locations, NULL, subfiles);
             return;
         }
     }
@@ -407,7 +448,7 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c
     }
     else if (!checkLocalFirst && (!srcCluster || !*srcCluster)) //haven't already checked and not told to use a specific copy
     {
-        resolveLocal(dstCluster, srcCluster, user, subfiles);
+        resolveLocal(locations, srcCluster, user, subfiles);
         return;
     }
 
@@ -417,13 +458,20 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c
     DBGLOG("Remote ReferencedFile not found %s [dali=%s, remote=%s, prefix=%s]", logicalName.str(), daliip.get(), remote ? remote->endpoint().getUrlStr(dest).str() : nullptr, remotePrefix);
 }
 
-void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign)
+void ReferencedFile::resolve(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign)
 {
     trackSubFiles = _trackSubFiles;
     if (daliip.length() || remote)
-        resolveRemote(user, remote, remotePrefix, dstCluster, srcCluster, checkLocalFirst, subfiles, resolveForeign);
+        resolveRemote(user, remote, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveForeign);
     else
-        resolveLocal(dstCluster, srcCluster, user, subfiles);
+        resolveLocal(locations, srcCluster, user, subfiles);
+}
+
+void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign)
+{
+    StringArray locations;
+    if (!isEmptyString(dstCluster))
+        locations.append(dstCluster);
 }
 
 void ReferencedFile::cloneInfo(unsigned updateFlags, IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
@@ -701,7 +749,7 @@ void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw)
     addFilesFromQuery(cw, NULL, NULL);
 }
 
-void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign)
+void ReferencedFileList::resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign)
 {
     StringArray childSubFiles;
     ForEachItemIn(i, subfiles)
@@ -713,19 +761,18 @@ void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalF
         Owned<ReferencedFile> file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL, false, allowSizeCalc);
         if (file->logicalName.length() && !map.getValue(file->getLogicalName()))
         {
-            file->resolve(process.get(), srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveForeign);
+            file->resolve(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveForeign);
             const char *ln = file->getLogicalName();
             // NOTE: setValue links its parameter
             map.setValue(ln, file);
         }
     }
     if (childSubFiles.length())
-        resolveSubFiles(childSubFiles, checkLocalFirst, trackSubFiles, resolveForeign);
+        resolveSubFiles(childSubFiles, locations, checkLocalFirst, trackSubFiles, resolveForeign);
 }
 
-void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool trackSubFiles, bool resolveForeign)
+void ReferencedFileList::resolveFiles(const StringArray &locations, const char *remoteIP, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool trackSubFiles, bool resolveForeign)
 {
-    process.set(_process);
     remote.setown((remoteIP && *remoteIP) ? createINode(remoteIP, 7070) : NULL);
     srcCluster.set(_srcCluster);
     remotePrefix.set(_remotePrefix);
@@ -734,17 +781,17 @@ void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP
     {
         ReferencedFileIterator files(this);
         ForEach(files)
-            files.queryObject().resolve(process, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveForeign);
+            files.queryObject().resolve(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveForeign);
     }
     if (expandSuperFiles)
-        resolveSubFiles(subfiles, checkLocalFirst, trackSubFiles, resolveForeign);
+        resolveSubFiles(subfiles, locations, checkLocalFirst, trackSubFiles, resolveForeign);
 }
 
-void ReferencedFileList::cloneFileInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
+void ReferencedFileList::cloneFileInfo(const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
 {
     ReferencedFileIterator files(this);
     ForEach(files)
-        files.queryObject().cloneInfo(updateFlags, helper, user, process, srcCluster, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
+        files.queryObject().cloneInfo(updateFlags, helper, user, dstCluster, srcCluster, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
     if (cloneSuperInfo)
         ForEach(files)
             files.queryObject().cloneSuperInfo(updateFlags, this, user, remote);

+ 3 - 3
common/workunit/referencedfilelist.hpp

@@ -63,9 +63,9 @@ interface IReferencedFileList : extends IInterface
     virtual void addFiles(StringArray &files)=0;
 
     virtual IReferencedFileIterator *getFiles()=0;
-    virtual void resolveFiles(const char *process, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0;
-    virtual void cloneAllInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
-    virtual void cloneFileInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
+    virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0;
+    virtual void cloneAllInfo(const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
+    virtual void cloneFileInfo(const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
     virtual void cloneRelationships()=0;
 };
 

+ 13 - 7
esp/services/ws_packageprocess/ws_packageprocessService.cpp

@@ -142,19 +142,23 @@ void cloneFileInfoToDali(unsigned updateFlags, StringArray &notFound, IPropertyT
 #ifdef _CONTAINERIZED
     SCMStringBuffer clusterName;
     dstInfo->getName(clusterName);
-    StringBuffer targetPlane;
-    getRoxieDefaultPlane(targetPlane, clusterName.str());
-    wufiles->resolveFiles(targetPlane, lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false);
-    wufiles->cloneAllInfo(updateFlags, helper, true, false, 0, 1, 0, nullptr);
+    StringArray locations; //roxie won't make local copies of files on these planes
+    StringBuffer targetPlane; //roxies default plane, where files will be copied if not found in locations
+    getRoxieDirectAccessPlanes(locations, targetPlane, clusterName.str(), true);
+
+    wufiles->resolveFiles(locations, lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false);
+    wufiles->cloneAllInfo(targetPlane, updateFlags, helper, true, false, 0, 1, 0, nullptr);
 #else
+    StringArray locations;
     SCMStringBuffer processName;
     dstInfo->getRoxieProcess(processName);
-    wufiles->resolveFiles(processName.str(), lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false);
+    locations.append(processName.str());
+    wufiles->resolveFiles(locations, lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false);
 
     StringBuffer defReplicateFolder;
     getConfigurationDirectory(NULL, "data2", "roxie", processName.str(), defReplicateFolder);
 
-    wufiles->cloneAllInfo(updateFlags, helper, true, false, dstInfo->getRoxieRedundancy(), dstInfo->getChannelsPerNode(), dstInfo->getRoxieReplicateOffset(), defReplicateFolder);
+    wufiles->cloneAllInfo(processName.str(), updateFlags, helper, true, false, dstInfo->getRoxieRedundancy(), dstInfo->getChannelsPerNode(), dstInfo->getRoxieReplicateOffset(), defReplicateFolder);
 #endif
 
     Owned<IReferencedFileIterator> iter = wufiles->getFiles();
@@ -1205,7 +1209,9 @@ void CWsPackageProcessEx::validatePackage(IEspContext &context, IEspValidatePack
     {
         Owned<IReferencedFileList> pmfiles = createReferencedFileList(context.queryUserId(), context.queryPassword(), true, false);
         pmfiles->addFilesFromPackageMap(mapTree);
-        pmfiles->resolveFiles(process.str(), nullptr, nullptr, nullptr, true, false, false);
+        StringArray locations;
+        locations.append(process.str());
+        pmfiles->resolveFiles(locations, nullptr, nullptr, nullptr, true, false, false);
         Owned<IReferencedFileIterator> files = pmfiles->getFiles();
         ForEach(*files)
         {

+ 17 - 10
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -366,7 +366,9 @@ void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned
         wufiles->addFilesFromQuery(cw, pm, queryid);
         if (aborting)
             return;
-        wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, false, false);
+        StringArray locations;
+        locations.append(process.str());
+        wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, false, false);
 
         Owned<IReferencedFileIterator> files = wufiles->getFiles();
         ForEach(*files)
@@ -753,21 +755,23 @@ public:
             queryname = queryid.append(queryname).append(".0").str(); //prepublish dummy version number to support fuzzy match like queries="myquery.*" in package
         files->addFilesFromQuery(cw, pm, queryname);
 
+        StringArray locations;
 #ifdef _CONTAINERIZED
         StringBuffer targetPlane;
-        getRoxieDefaultPlane(targetPlane, target);
+        getRoxieDirectAccessPlanes(locations, targetPlane, target, true);
         const char * targetPlaneOrGroup = targetPlane;
 #else
         const char * targetPlaneOrGroup = process;
+        locations.append(targetPlaneOrGroup);
 #endif
-        files->resolveFiles(targetPlaneOrGroup, remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
+        files->resolveFiles(locations, remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true);
         Owned<IDFUhelper> helper = createIDFUhelper();
 #ifdef _CONTAINERIZED
-        files->cloneAllInfo(updateFlags, helper, true, true, 0, 1, 0, nullptr);
+        files->cloneAllInfo(targetPlaneOrGroup, updateFlags, helper, true, true, 0, 1, 0, nullptr);
 #else
         StringBuffer defReplicateFolder;
         getConfigurationDirectory(NULL, "data2", "roxie", process.str(), defReplicateFolder);
-        files->cloneAllInfo(updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
+        files->cloneAllInfo(targetPlaneOrGroup, updateFlags, helper, true, true, clusterInfo->getRoxieRedundancy(), clusterInfo->getChannelsPerNode(), clusterInfo->getRoxieReplicateOffset(), defReplicateFolder);
 #endif
     }
 
@@ -2405,7 +2409,9 @@ bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const
         Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(),
             context.queryPassword(), true, true);
         wufiles->addFilesFromQuery(cw, (ps) ? ps->queryActiveMap(target) : NULL, query);
-        wufiles->resolveFiles(process.str(), NULL, NULL, NULL, true, true, true, true);
+        StringArray locations;
+        locations.append(process.str());
+        wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, true);
         Owned<IReferencedFileIterator> refFileItr = wufiles->getFiles();
         ForEach(*refFileItr)
         {
@@ -2962,7 +2968,7 @@ public:
         if (isContainerized())
         {
             StringAttrBuilder builder(process);
-            getRoxieDefaultPlane(builder, target);
+            getRoxieDirectAccessPlanes(locations, builder, target, true);
         }
         else
             process.set(destProcess);
@@ -2972,18 +2978,18 @@ public:
     {
         if (cloneFilesEnabled)
         {
-            wufiles->resolveFiles(process, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
+            wufiles->resolveFiles(locations, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true);
             Owned<IDFUhelper> helper = createIDFUhelper();
             Owned <IConstWUClusterInfo> cl = getWUClusterInfoByName(target);
             if (cl)
             {
 #ifdef _CONTAINERIZED
-                wufiles->cloneAllInfo(updateFlags, helper, true, true, 0, 1, 0, nullptr);
+                wufiles->cloneAllInfo(process.str(), updateFlags, helper, true, true, 0, 1, 0, nullptr);
 #else
                 SCMStringBuffer process;
                 StringBuffer defReplicateFolder;
                 getConfigurationDirectory(NULL, "data2", "roxie", cl->getRoxieProcess(process).str(), defReplicateFolder);
-                wufiles->cloneAllInfo(updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
+                wufiles->cloneAllInfo(process.str(), updateFlags, helper, true, true, cl->getRoxieRedundancy(), cl->getChannelsPerNode(), cl->getRoxieReplicateOffset(), defReplicateFolder);
 #endif
             }
         }
@@ -3010,6 +3016,7 @@ private:
     StringAttr queryDirectory;
     bool cloneFilesEnabled = false;
     unsigned updateFlags = 0;
+    StringArray locations;
 
 public:
     StringArray existingQueryIds;

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -458,6 +458,7 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 
     const char *name = cfg->queryProp("Software/EspProcess/@name");
     getConfigurationDirectory(directories, "query", "esp", name ? name : "esp", queryDirectory);
+
     recursiveCreateDirectory(queryDirectory.str());
 
     dataCache.setown(new DataCache(DATA_SIZE));

+ 33 - 5
esp/smc/SMCLib/TpContainer.cpp

@@ -791,12 +791,8 @@ bool getSashaServiceEP(SocketEndpoint &serviceEndpoint, const char *service, boo
     return true;
 }
 
-StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName)
+static StringBuffer & getRoxieDefaultPlane(IPropertyTree * queue, StringBuffer & plane, const char * roxieName)
 {
-    Owned<IPropertyTree> queue = getContainerClusterConfig(roxieName);
-    if (!queue)
-        throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Unknown queue name %s", roxieName);
-
     if (queue->getProp("@dataPlane", plane))
         return plane;
 
@@ -806,3 +802,35 @@ StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName
         throwUnexpectedX("No default data plane defined");
     return plane.append(dataPlanes->query().queryProp("@name"));
 }
+
+StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName)
+{
+    Owned<IPropertyTree> queue = getContainerClusterConfig(roxieName);
+    if (!queue)
+        throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Unknown queue name %s", roxieName);
+
+    return getRoxieDefaultPlane(queue, plane, roxieName);
+}
+
+//By default roxie will copy files from planes other than it's default, if a plane is added to directAccessPlanes
+//  roxie will continue to read the file directly without making a copy
+StringArray & getRoxieDirectAccessPlanes(StringArray & planes, StringBuffer &defaultPlane, const char * roxieName, bool includeDefaultPlane)
+{
+    Owned<IPropertyTree> queue = getContainerClusterConfig(roxieName);
+    if (!queue)
+        throw makeStringExceptionV(ECLWATCH_INVALID_CLUSTER_NAME, "Unknown queue name %s", roxieName);
+
+    getRoxieDefaultPlane(queue, defaultPlane, roxieName);
+    if (defaultPlane.length() && includeDefaultPlane)
+        planes.appendUniq(defaultPlane);
+
+    Owned<IPropertyTreeIterator> iter = queue->getElements("directAccessPlanes");
+    ForEach(*iter)
+    {
+        const char *plane = iter->query().queryProp("");
+        if (!isEmptyString(plane))
+            planes.appendUniq(plane);
+    }
+
+    return planes;
+}

+ 7 - 0
esp/smc/SMCLib/TpWrapper.cpp

@@ -2176,3 +2176,10 @@ StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName
     return plane;
 }
 
+StringArray & getRoxieDirectAccessPlanes(StringArray & planes, StringBuffer &defaultPlane, const char * roxieName, bool includeDefaultPlane)
+{
+    getRoxieDefaultPlane(defaultPlane, roxieName);
+    if (defaultPlane.length() && includeDefaultPlane)
+        planes.append(defaultPlane);
+    return planes;
+}

+ 2 - 0
esp/smc/SMCLib/TpWrapper.hpp

@@ -223,6 +223,8 @@ extern TPWRAPPER_API bool getSashaService(StringBuffer &serviceAddress, const ch
 extern TPWRAPPER_API bool getSashaServiceEP(SocketEndpoint &serviceEndpoint, const char *service, bool failIfNotFound);
 
 extern TPWRAPPER_API StringBuffer & getRoxieDefaultPlane(StringBuffer & plane, const char * roxieName);
+extern TPWRAPPER_API StringArray & getRoxieDirectAccessPlanes(StringArray & planes, StringBuffer &defaultPlane, const char * roxieName, bool includeDefaultPlane);
+
 extern TPWRAPPER_API bool validateDataPlaneName(const char *remoteDali, const char * name);
 extern TPWRAPPER_API bool matchNetAddressRequest(const char* netAddressReg, bool ipReq, IConstTpMachine& tpMachine);
 

+ 3 - 0
helm/hpcc/templates/_helpers.tpl

@@ -761,6 +761,9 @@ Generate instance queue names
   prefix: {{ .prefix | default "null" }}
   queriesOnly: true
   dataPlane: {{ .dataPlane | default (include "hpcc.getDefaultDataPlane" $) }}
+  {{- if hasKey . "directAccessPlanes" }}
+  directAccessPlanes: {{ .directAccessPlanes }}
+  {{- end }}
  {{- end }}
 {{ end -}}
 {{- range $.Values.thor -}}

+ 2 - 2
helm/hpcc/values.schema.json

@@ -1014,8 +1014,8 @@
           "description": "The default storage plane to write data files to",
           "type": "string"
         },
-        "storagePlanes": {
-          "description": "A list of storage planes suitable for storing roxie data",
+        "directAccessPlanes": {
+          "description": "A list of storage planes suitable for roxie to read from directly and not have roxie copy the data to roxie's default plane",
           "type": "array",
           "items": { "type": "string" }
         },

+ 1 - 0
helm/hpcc/values.yaml

@@ -513,6 +513,7 @@ roxie:
   #maxStartupTime: 600     # Maximum time to wait for startup to complete before failing
   topoServer:
     replicas: 1
+  #directAccessPlanes: []  #add direct access planes that roxie will read from without copying the data to its default data plane
 
 thor:
 - name: thor

+ 83 - 8
roxie/ccd/ccdfile.cpp

@@ -552,10 +552,20 @@ static int getClusterPriority(const char *clusterName)
     return priority ? *priority : 100;
 }
 
-static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
+static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const StringArray &fromClusters, bool includeFromCluster)
 {
     if (traceRemoteFiles)
-        DBGLOG("appendRemoteLocations lfn=%s fromCluster=%s, includeFromCluster=%s", nullText(localFileName), nullText(fromCluster), boolToStr(includeFromCluster));
+    {
+        StringBuffer s;
+        ForEachItemIn(ifc, fromClusters)
+        {
+            const char *fromCluster = fromClusters.item(ifc);
+            if (s.length())
+                s.append('/');
+            s.append(fromCluster);
+        }
+        DBGLOG("appendRemoteLocations lfn=%s fromCluster=%s, includeFromCluster=%s", nullText(localFileName), s.str(), boolToStr(includeFromCluster));
+    }
     IFileDescriptor &fdesc = pdesc->queryOwner();
     unsigned numCopies = pdesc->numCopies();
     unsigned lastClusterNo = (unsigned) -1;
@@ -570,9 +580,9 @@ static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
         fdesc.getClusterGroupName(clusterNo, clusterName);
         if (traceRemoteFiles)
            DBGLOG("appendRemoteLocations found entry in cluster %s", clusterName.str());
-        if (fromCluster && *fromCluster)
+        if (fromClusters.length())
         {
-            bool matches = strieq(clusterName.str(), fromCluster);
+            bool matches = fromClusters.contains(clusterName);
             if (matches!=includeFromCluster)
                 continue;
         }
@@ -619,6 +629,14 @@ static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
     }
 }
 
+static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
+{
+    StringArray fromClusters;
+    if (!isEmptyString(fromCluster))
+        fromClusters.append(fromCluster);
+    appendRemoteLocations(pdesc, locations, localFileName, fromClusters, includeFromCluster);
+}
+
 //----------------------------------------------------------------------------------------------
 
 typedef StringArray *StringArrayPtr;
@@ -987,6 +1005,7 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
 
     ILazyFileIO *openFile(const char *lfn, unsigned partNo, unsigned channel, const char *localLocation,
                            IPartDescriptor *pdesc,
+                           const StringArray &localEnoughLocationInfo,
                            const StringArray &remoteLocationInfo,
                            offset_t size, const CDateTime &modified)
     {
@@ -1010,7 +1029,35 @@ class CRoxieFileCache : implements IRoxieFileCache, implements ICopyFileProgress
         {
             bool addedOne = false;
 
-#ifndef _CONTAINERIZED
+#ifdef _CONTAINERIZED
+            // put the localEnoughLocations next in the list
+            ForEachItemIn(plane_idx, localEnoughLocationInfo)
+            {
+                try
+                {
+                    const char *localEnoughName = localEnoughLocationInfo.item(plane_idx);
+                    Owned<IFile> localEnoughFile = createIFile(localEnoughName);
+                    RoxieFileStatus status = fileUpToDate(localEnoughFile, size, modified, isCompressed);
+                    if (status==FileIsValid)
+                    {
+                        if (miscDebugTraceLevel > 5)
+                            DBGLOG("adding local enough location %s", localEnoughName);
+                        ret->addSource(localEnoughFile.getClear());
+                        addedOne = true;
+                        //do not set ret->setRemote(true) these locations are treated as if found locally, and not copied to the default plane
+                    }
+                    else if (localEnoughFile->exists() && !ignoreOrphans)  // Implies local dali and local enough file out of sync
+                        throw MakeStringException(ROXIE_FILE_ERROR, "Direct access (local enough) file %s does not match DFS information", localEnoughName);
+                    else if (miscDebugTraceLevel > 10)
+                        DBGLOG("Checked local enough data plane location %s, status=%d", localEnoughName, (int) status);
+                }
+                catch (IException *E)
+                {
+                    EXCLOG(MCoperatorError, E, "While creating local enough file reference");
+                    E->Release();
+                }
+            }
+#else
             // put the peerRoxieLocations next in the list
             StringArray localLocations;
             if (selfTestMode)
@@ -1628,6 +1675,7 @@ public:
 
     virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
                                      IPartDescriptor *pdesc, unsigned numParts, unsigned channel,
+                                     const StringArray &localEnoughLocationInfo,
                                      const StringArray &deployedLocationInfo, bool startFileCopy)
     {
         unsigned replicationLevel = getReplicationLevel(channel);
@@ -1706,7 +1754,7 @@ public:
                     return f.getClear();
             }
 
-            ret.setown(openFile(lfn, partNo, channel, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate));
+            ret.setown(openFile(lfn, partNo, channel, localLocation, pdesc, localEnoughLocationInfo, deployedLocationInfo, dfsSize, dfsDate));
 
             if (startFileCopy)
             {
@@ -2064,6 +2112,21 @@ public:
     }
 };
 
+#ifdef _CONTAINERIZED
+static bool getDirectAccessStoragePlanes(StringArray &planes)
+{
+    Owned<IPropertyTreeIterator> iter = getComponentConfigSP()->getElements("directAccessPlanes");
+    ForEach(*iter)
+    {
+        const char *plane = iter->query().queryProp("");
+        if (!isEmptyString(plane))
+            planes.appendUniq(plane);
+    }
+
+    return !planes.empty();
+}
+#endif
+
 ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
 {
 #ifdef _CONTAINERIZED
@@ -2071,6 +2134,7 @@ ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDes
 #else
     const char *myCluster = roxieName.str();
 #endif
+    StringArray localEnoughLocations; //files from these locations won't be copied to the default plane
     StringArray remoteLocations;
     const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
     if (peerCluster)
@@ -2079,11 +2143,21 @@ ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDes
             appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true);  // Add only from specified cluster
     }
     else
+    {
+#ifdef _CONTAINERIZED
+        StringArray localEnoughPlanes;
+        if (getDirectAccessStoragePlanes(localEnoughPlanes))
+            appendRemoteLocations(pdesc, localEnoughLocations, NULL, localEnoughPlanes, true);
+        localEnoughPlanes.append(myCluster);
+        appendRemoteLocations(pdesc, remoteLocations, NULL, localEnoughPlanes, false);      // Add from any plane on same dali, other than default or loacal enough
+#else
         appendRemoteLocations(pdesc, remoteLocations, NULL, myCluster, false);      // Add from any cluster on same dali, other than mine
+#endif
+    }
     if (remotePDesc)
         appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false);    // Then any remote on remote dali
 
-    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, channel, remoteLocations, startCopy);
+    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, channel, localEnoughLocations, remoteLocations, startCopy);
 }
 
 //====================================================================================================
@@ -3712,6 +3786,7 @@ protected:
         remove("test.local");
         remove("test.remote");
         remove("test.buddy");
+        StringArray localEnough;
         StringArray remotes;
         DummyPartDescriptor pdesc;
         CDateTime dummy;
@@ -3725,7 +3800,7 @@ protected:
         close(f);
         CRoxieFileCache &cache = static_cast<CRoxieFileCache &>(queryFileCache());
 
-        Owned<ILazyFileIO> io = cache.openFile("test.local", 0, 0, "test.local", NULL, remotes, sizeof(int), dummy);
+        Owned<ILazyFileIO> io = cache.openFile("test.local", 0, 0, "test.local", NULL, localEnough, remotes, sizeof(int), dummy);
         CPPUNIT_ASSERT(io != NULL);
 
         // Reading it should read 1

+ 1 - 1
roxie/ccd/ccdfile.hpp

@@ -61,7 +61,7 @@ interface ILazyFileIO : extends IFileIO
 interface IRoxieFileCache : extends IInterface
 {
     virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType, IPartDescriptor *pdesc, unsigned numParts,
-                                      unsigned channel, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
+                                      unsigned channel, const StringArray &localEnoughLocationInfo, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
     virtual RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, bool isCompressed, bool autoDisconnect=true) = 0;
     virtual int numFilesToCopy() = 0;
     virtual void closeExpired(bool remote) = 0;