Jelajahi Sumber

Merge pull request #3296 from afishbeck/roxie_clone_files3

HPCC-2704 Setup and copy files to roxie when copying/publishing queries

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 tahun lalu
induk
melakukan
97145603c6

+ 473 - 0
common/roxiemanager/referencedfilelist.cpp

@@ -0,0 +1,473 @@
+/*##############################################################################
+
+    Copyright (C) 2012 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+#include "referencedfilelist.hpp"
+
+#include "jptree.hpp"
+#include "workunit.hpp"
+#include "eclhelper.hpp"
+#include "dautils.hpp"
+#include "dadfs.hpp"
+#include "dasess.hpp"
+
+#define WF_LOOKUP_TIMEOUT (1000*15)  // 15 seconds
+
+// Reports whether a graph node is flagged as optional. The '_isOpt' attribute is used when
+// the node has one, otherwise '_isIndexOpt' is consulted; a missing value counts as false.
+bool getIsOpt(const IPropertyTree &graphNode)
+{
+    const char *valuePath = graphNode.hasProp("att[@name='_isOpt']")
+        ? "att[@name='_isOpt']/@value"
+        : "att[@name='_isIndexOpt']/@value";
+    return graphNode.getPropBool(valuePath, false);
+}
+
+// Strips an optional leading '~' and an optional "foreign::<ip[:port]>::" prefix from a
+// logical file name.
+//   name - logical file name; a NULL name is returned unchanged
+//   ip   - optional; when supplied and a complete foreign prefix is found, the text between
+//          the first and second "::" is appended to it and the buffer is trimmed
+// Returns a pointer into 'name' at the start of the remaining name. A "foreign" scope that
+// is not followed by a second "::" is left in place.
+const char *skipForeign(const char *name, StringBuffer *ip=NULL)
+{
+    if (!name)
+        return name; // nothing to parse - avoid dereferencing a NULL name
+    if (*name=='~')
+        name++;
+    const char *d1 = strstr(name, "::");
+    if (!d1)
+        return name;
+
+    StringBuffer scope;
+    if (!strieq("foreign", scope.append(d1-name, name).trim().str()))
+        return name;
+
+    // foreign scope - need to strip off the ip and port
+    d1 += 2;  // skip ::
+    const char *d2 = strstr(d1,"::");
+    if (!d2)
+        return name;
+
+    if (ip)
+        ip->append(d2-d1, d1).trim();
+    d2 += 2;
+    while (*d2 == ' ')
+        d2++;
+    return d2;
+}
+
+// A single logical file referenced by a query, together with the RefFile* flags discovered
+// for it by resolve() and the foreign dali named in its logical name, if any.
+class ReferencedFile : public CInterface, implements IReferencedFile
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    // 'name' may carry a leading '~' and/or a "foreign::<ip>::" prefix; both are stripped from
+    // the stored logical name, and the foreign address (when present) is kept in foreignNode.
+    ReferencedFile(const char *name, bool isSubFile=false) : flags(0)
+    {
+        StringBuffer ip;
+        logicalName.set(skipForeign(name, &ip));
+        if (ip.length())
+            foreignNode.setown(createINode(ip.str()));
+        if (isSubFile)
+            flags |= RefSubFile;
+    }
+
+    // Clears everything learnt by a previous resolve, keeping only the RefSubFile marker.
+    void reset()
+    {
+        flags &= RefSubFile;
+    }
+    // Looks the file up on the foreign dali (if any) and then on 'remote'; returns NULL if neither has it.
+    IPropertyTree *getForeignOrRemoteFileTree(IUserDescriptor *user, INode *remote);
+
+    void processLocalFileInfo(IDistributedFile *df, const char *cluster, StringArray *subfiles);
+    void processRemoteFileTree(IPropertyTree *tree, bool foreign, StringArray *subfiles);
+
+    void resolveLocal(const char *cluster, IUserDescriptor *user, StringArray *subfiles);
+    void resolveRemote(IUserDescriptor *user, INode *remote, const char *cluster, bool checkLocalFirst, StringArray *subfiles);
+    void resolve(const char *cluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles);
+
+    virtual const char *getLogicalName() const {return logicalName.get();}
+    virtual unsigned getFlags() const {return flags;}
+    virtual const SocketEndpoint &getForeignIP() const
+    {
+        // foreignNode is only created for names with a foreign:: prefix; hand back a
+        // default-constructed endpoint rather than dereferencing a NULL node.
+        if (foreignNode.get())
+            return foreignNode->endpoint();
+        static const SocketEndpoint noForeignEndpoint;
+        return noForeignEndpoint;
+    }
+    virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *cluster, bool overwrite=false);
+    void cloneSuperInfo(IUserDescriptor *user, INode *remote, bool overwrite);
+
+public:
+    StringAttr logicalName;     // name with any '~' / foreign prefix removed
+    Owned<INode> foreignNode;   // NULL unless the original name had a foreign:: prefix
+
+    unsigned flags;             // combination of the RefFile* / RefSubFile bits
+};
+
+// Records what the local dali knows about this file.
+// Superfiles are flagged RefFileSuper and, when 'subfiles' is supplied, their sub-file names
+// are appended to it. Ordinary files are flagged RefFileNotOnCluster when a non-empty
+// 'cluster' is given and the file has no copy on it.
+void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *cluster, StringArray *subfiles)
+{
+    IDistributedSuperFile *super = df->querySuperFile();
+    if (!super)
+    {
+        if (cluster && *cluster && df->findCluster(cluster)==NotFound)
+            flags |= RefFileNotOnCluster;
+        return;
+    }
+
+    flags |= RefFileSuper;
+    if (!subfiles)
+        return;
+
+    Owned<IDistributedFileIterator> subIter = super->getSubFileIterator();
+    ForEach(*subIter)
+    {
+        StringBuffer subName;
+        subIter->query().getLogicalName(subName);
+        subfiles->append(subName.str());
+    }
+}
+
+// Records what a file tree fetched from another dali says about this file.
+//   tree     - file or superfile tree returned by the remote/foreign lookup
+//   foreign  - true when the tree came from the foreign dali named in the logical name
+//   subfiles - optional; receives the names of the SubFile entries of a superfile
+void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, bool foreign, StringArray *subfiles)
+{
+    flags |= RefFileRemote;
+    if (foreign)
+        flags |= RefFileForeign;
+    if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile)))
+    {
+        flags |= RefFileSuper;
+        if (subfiles)
+        {
+            Owned<IPropertyTreeIterator> it = tree->getElements("SubFile");
+            ForEach(*it)
+            {
+                // A SubFile entry without a usable @name cannot be resolved; skip it rather
+                // than append a NULL string (same check cloneSuperInfo applies).
+                const char *name = it->query().queryProp("@name");
+                if (name && *name)
+                    subfiles->append(name);
+            }
+        }
+    }
+}
+
+// Resolves the file against the local dali only: flags it RefFileNotFound when the lookup
+// fails, otherwise records the local file details.
+void ReferencedFile::resolveLocal(const char *cluster, IUserDescriptor *user, StringArray *subfiles)
+{
+    reset();
+    Owned<IDistributedFile> localFile = queryDistributedFileDirectory().lookup(logicalName.get(), user);
+    if (!localFile)
+    {
+        flags |= RefFileNotFound;
+        return;
+    }
+    processLocalFileInfo(localFile, cluster, subfiles);
+}
+
+// Fetches the file's tree from another dali: the foreign dali named in the logical name is
+// tried first, then 'remote'. Returns NULL when neither lookup yields a tree; the caller owns
+// the returned tree.
+IPropertyTree *ReferencedFile::getForeignOrRemoteFileTree(IUserDescriptor *user, INode *remote)
+{
+    IDistributedFileDirectory &dfsDir = queryDistributedFileDirectory();
+    if (foreignNode)
+    {
+        Owned<IPropertyTree> foreignTree = dfsDir.getFileTree(logicalName.get(), foreignNode, user, WF_LOOKUP_TIMEOUT, false, false);
+        if (foreignTree)
+            return foreignTree.getClear();
+    }
+    if (remote)
+        return dfsDir.getFileTree(logicalName.get(), remote, user, WF_LOOKUP_TIMEOUT, false, false);
+    return NULL;
+}
+
+// Resolves the file against the foreign dali named in its logical name and/or 'remote'.
+//   checkLocalFirst - when true, a file already known to the local dali is processed as a
+//                     local file and no other dali is consulted
+//   subfiles        - optional; receives sub-file names when the file is a superfile
+// Flags the file RefFileNotFound when no lookup succeeds.
+void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *cluster, bool checkLocalFirst, StringArray *subfiles)
+{
+    reset();
+    IDistributedFileDirectory &dir = queryDistributedFileDirectory();
+    if (checkLocalFirst)
+    {
+        Owned<IDistributedFile> df = dir.lookup(logicalName.get(), user);
+        if(df)
+        {
+            processLocalFileInfo(df, cluster, subfiles);
+            return;
+        }
+    }
+
+    // Foreign dali first, then the remote dali. The lookups are made here (rather than through
+    // a helper that hides the source) so that RefFileForeign is set when the foreign dali
+    // supplied the tree - cloneInfo() chooses the source address from that flag.
+    Owned<IPropertyTree> tree;
+    bool fromForeign = false;
+    if (foreignNode)
+    {
+        tree.setown(dir.getFileTree(logicalName.get(), foreignNode, user, WF_LOOKUP_TIMEOUT, false, false));
+        fromForeign = (tree.get() != NULL);
+    }
+    if (!tree && remote)
+        tree.setown(dir.getFileTree(logicalName.get(), remote, user, WF_LOOKUP_TIMEOUT, false, false));
+
+    if (tree)
+        processRemoteFileTree(tree, fromForeign, subfiles);
+    else
+        flags |= RefFileNotFound;
+}
+
+// Entry point for resolution: a purely local lookup is used when there is neither a foreign
+// dali in the file name nor a remote dali supplied; otherwise the remote path is taken.
+void ReferencedFile::resolve(const char *cluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles)
+{
+    if (!foreignNode && !remote)
+    {
+        resolveLocal(cluster, user, subfiles);
+        return;
+    }
+    resolveRemote(user, remote, cluster, checkLocalFirst, subfiles);
+}
+
+// Clones the file's metadata onto 'cluster' via the DFU helper.
+// Superfiles are skipped, as are files that are neither remote, foreign nor missing from the
+// cluster. The source dali address is the foreign node for RefFileForeign files, otherwise
+// 'remote' when supplied, otherwise empty. Any failure is logged and recorded as
+// RefFileCopyInfoFailed rather than propagated.
+void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *cluster, bool overwrite)
+{
+    const unsigned needsCloneMask = RefFileRemote | RefFileForeign | RefFileNotOnCluster;
+    if ((flags & RefFileSuper) || !(flags & needsCloneMask))
+        return;
+
+    StringBuffer srcDali;
+    if (flags & RefFileForeign)
+        foreignNode->endpoint().getUrlStr(srcDali);
+    else if (remote)
+        remote->endpoint().getUrlStr(srcDali);
+
+    try
+    {
+        const char *lfn = logicalName.get();
+        helper->createSingleFileClone(lfn, lfn, cluster,
+            DFUcpdm_c_replicated_by_d, true, NULL, user, srcDali.str(), NULL, overwrite, false);
+    }
+    catch (IException *e)
+    {
+        flags |= RefFileCopyInfoFailed;
+        DBGLOG(e);
+        e->Release();
+    }
+    catch (...)
+    {
+        flags |= RefFileCopyInfoFailed;
+        DBGLOG("Unknown error copying file info for %s, from %s", logicalName.sget(), srcDali.str());
+    }
+}
+
+// Recreates a remote superfile locally from the SubFile entries of its remote tree.
+// Only acts on files flagged both RefFileSuper and RefFileRemote. An existing local file of
+// the same name is left alone unless 'overwrite' is set, in which case it is detached first.
+// Any failure is logged and recorded as RefFileCopyInfoFailed rather than propagated.
+void ReferencedFile::cloneSuperInfo(IUserDescriptor *user, INode *remote, bool overwrite)
+{
+    const unsigned remoteSuperMask = RefFileSuper | RefFileRemote;
+    if ((flags & remoteSuperMask) != remoteSuperMask)
+        return;
+
+    try
+    {
+        Owned<IPropertyTree> srcTree = getForeignOrRemoteFileTree(user, remote);
+        if (!srcTree)
+            return;
+
+        IDistributedFileDirectory &dfsDir = queryDistributedFileDirectory();
+        {
+            // scoped so the handle to any existing file is released before the superfile is created
+            Owned<IDistributedFile> existing = dfsDir.lookup(logicalName.get(), user);
+            if (existing)
+            {
+                if (!overwrite)
+                    return;
+                existing->detach();
+            }
+        }
+
+        Owned<IDistributedSuperFile> created = dfsDir.createSuperFile(logicalName.get(), true, false, user);
+        Owned<IPropertyTreeIterator> subIter = srcTree->getElements("SubFile");
+        ForEach(*subIter)
+        {
+            const char *subName = subIter->query().queryProp("@name");
+            if (!subName || !*subName)
+                continue;
+            created->addSubFile(subName, false, NULL, false);
+        }
+    }
+    catch (IException *e)
+    {
+        flags |= RefFileCopyInfoFailed;
+        DBGLOG(e);
+        e->Release();
+    }
+    catch (...)
+    {
+        flags |= RefFileCopyInfoFailed;
+        DBGLOG("Unknown error copying superfile info for %s", logicalName.get());
+    }
+}
+
+// Collection of the files referenced by a query, keyed by logical name, with the operations
+// to resolve where each file lives and to clone its metadata.
+class ReferencedFileList : public CInterface, implements IReferencedFileList
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    // A user descriptor is only created when both a user name and a password are supplied.
+    ReferencedFileList(const char *username, const char *pw)
+    {
+        if (!username || !pw)
+            return;
+        user.setown(createUserDescriptor());
+        user->set(username, pw);
+    }
+
+    // Populating the list
+    virtual void addFile(const char *ln);
+    virtual void addFiles(StringArray &files);
+    virtual void addFilesFromWorkUnit(IConstWorkUnit *cw);
+
+    virtual IReferencedFileIterator *getFiles();
+
+    // Cloning
+    virtual void cloneFileInfo(bool overwrite, bool cloneSuperInfo);
+    virtual void cloneRelationships();
+    virtual void cloneAllInfo(bool overwrite, bool cloneSuperInfo)
+    {
+        // file (and optionally superfile) info first, then the relationships between files
+        cloneFileInfo(overwrite, cloneSuperInfo);
+        cloneRelationships();
+    }
+
+    // Resolution
+    virtual void resolveFiles(const char *process, const char *remoteIP, bool checkLocalFirst, bool addSubFiles);
+    void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst);
+
+public:
+    Owned<IUserDescriptor> user;            // NULL when no credentials were given
+    Owned<INode> remote;                    // set by resolveFiles(); NULL when no remote IP was given
+    MapStringToMyClass<ReferencedFile> map; // logical name -> file
+    StringAttr process;                     // set by resolveFiles()
+};
+
+// Iterator over the files held by a ReferencedFileList. It keeps a link to the list for as
+// long as it exists and walks the list's map with a HashIterator.
+class ReferencedFileIterator : public CInterface, implements IReferencedFileIterator
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    ReferencedFileIterator(ReferencedFileList *_list)
+    {
+        list.set(_list);
+        iter.setown(new HashIterator(list->map));
+    }
+
+    virtual bool first() { return iter->first(); }
+    virtual bool next() { return iter->next(); }
+    virtual bool isValid() { return iter->isValid(); }
+
+    // Current entry through the public interface.
+    virtual IReferencedFile  & query()
+    {
+        return queryObject();
+    }
+
+    // Current entry as the concrete type, for callers inside this file.
+    virtual ReferencedFile  & queryObject()
+    {
+        ReferencedFile *current = list->map.mapToValue(&iter->query());
+        return *current;
+    }
+
+public:
+    Owned<ReferencedFileList> list;
+    Owned<HashIterator> iter;
+};
+
+// Adds a file to the list unless its (prefix-stripped) logical name is empty or is already
+// present.
+void ReferencedFileList::addFile(const char *ln)
+{
+    Owned<ReferencedFile> candidate = new ReferencedFile(ln);
+    if (!candidate->logicalName.length())
+        return;
+    const char *key = candidate->getLogicalName();
+    if (map.getValue(key))
+        return;
+    // key is taken before getClear() hands the object over to the map
+    map.setValue(key, candidate.getClear());
+}
+
+// Adds every name in 'files' via addFile().
+void ReferencedFileList::addFiles(StringArray &files)
+{
+    ForEachItemIn(idx, files)
+    {
+        const char *logicalName = files.item(idx);
+        addFile(logicalName);
+    }
+}
+
+// Adds every file read by the workunit's activity graphs: each graph node that has a
+// '_fileName' attribute contributes its file, except write activities and spill files.
+void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw)
+{
+    Owned<IConstWUGraphIterator> graphIter = &cw->getGraphs(GraphTypeActivities);
+    ForEach(*graphIter)
+    {
+        Owned <IPropertyTree> graphTree = graphIter->query().getXGMMLTree(false);
+        Owned<IPropertyTreeIterator> fileNodes = graphTree->getElements("//node[att/@name='_fileName']");
+        ForEach(*fileNodes)
+        {
+            IPropertyTree &activity = fileNodes->query();
+            const char *fileName = activity.queryProp("att[@name='_fileName']/@value");
+            if (!fileName)
+                continue;
+
+            //not likely to be part of roxie queries, but for forward compatibility:
+            switch ((ThorActivityKind) activity.getPropInt("att[@name='_kind']/@value", TAKnone))
+            {
+            case TAKdiskwrite:
+            case TAKindexwrite:
+            case TAKcsvwrite:
+            case TAKxmlwrite:
+                continue; // output files are not inputs to copy
+            default:
+                break;
+            }
+
+            const bool isSpill = activity.getPropBool("att[@name='_isSpill']/@value") ||
+                                 activity.getPropBool("att[@name='_isTransformSpill']/@value");
+            if (!isSpill)
+                addFile(fileName);
+        }
+    }
+}
+
+// Resolves and adds the given sub-files (marked RefSubFile) that are not already in the list,
+// then recurses on any sub-files they report in turn.
+void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst)
+{
+    StringArray nested;
+    ForEachItemIn(idx, subfiles)
+    {
+        Owned<ReferencedFile> sub = new ReferencedFile(subfiles.item(idx), true);
+        if (!sub->logicalName.length())
+            continue;
+        const char *key = sub->getLogicalName();
+        if (map.getValue(key))
+            continue;
+        sub->resolve(process.get(), user, remote, checkLocalFirst, &nested);
+        map.setValue(key, sub.getClear());
+    }
+    if (nested.length())
+        resolveSubFiles(nested, checkLocalFirst);
+}
+
+// Resolves every file in the list for the given process (cluster).
+//   remoteIP         - optional address of a remote dali (port 7070 is used); empty/NULL means none
+//   checkLocalFirst  - passed through to each file's resolve()
+//   expandSuperFiles - when true, sub-files of any superfiles are collected, resolved and
+//                      added to the list as well
+void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, bool checkLocalFirst, bool expandSuperFiles)
+{
+    process.set(_process);
+    if (remoteIP && *remoteIP)
+        remote.setown(createINode(remoteIP, 7070));
+    else
+        remote.clear();
+
+    StringArray collected;
+    StringArray *subfileSink = expandSuperFiles ? &collected : NULL;
+    {
+        // scoped so the iterator is gone before resolveSubFiles() adds to the map
+        ReferencedFileIterator it(this);
+        ForEach(it)
+            it.queryObject().resolve(process, user, remote, checkLocalFirst, subfileSink);
+    }
+    if (expandSuperFiles)
+        resolveSubFiles(collected, checkLocalFirst);
+}
+
+// Clones the metadata of every file in the list, then (optionally, in a second pass)
+// recreates the superfiles.
+void ReferencedFileList::cloneFileInfo(bool overwrite, bool cloneSuperInfo)
+{
+    Owned<IDFUhelper> dfuHelper = createIDFUhelper();
+    ReferencedFileIterator it(this);
+    ForEach(it)
+    {
+        it.queryObject().cloneInfo(dfuHelper, user, remote, process, overwrite);
+    }
+    if (!cloneSuperInfo)
+        return;
+    ForEach(it)
+    {
+        it.queryObject().cloneSuperInfo(user, remote, overwrite);
+    }
+}
+
+// Copies file relationships from the remote dali for every file flagged RefFileRemote.
+// A relationship is only added when its secondary file is also in this list and that file's
+// info was not flagged as failed to copy. Does nothing when no remote dali is set.
+void ReferencedFileList::cloneRelationships()
+{
+    if (!remote)
+        return;
+    if (remote->endpoint().isNull())
+        return;
+
+    StringBuffer remoteAddr;
+    remote->endpoint().getUrlStr(remoteAddr);
+    IDistributedFileDirectory &dfsDir = queryDistributedFileDirectory();
+    ReferencedFileIterator it(this);
+    ForEach(it)
+    {
+        ReferencedFile &primary = it.queryObject();
+        if (!(primary.getFlags() & RefFileRemote))
+            continue;
+        Owned<IFileRelationshipIterator> relIter = dfsDir.lookupFileRelationships(primary.getLogicalName(), NULL,
+            NULL, NULL, NULL, NULL, NULL, remoteAddr.str(), WF_LOOKUP_TIMEOUT);
+
+        ForEach(*relIter)
+        {
+            IFileRelationship &rel = relIter->query();
+            const char *secondary = rel.querySecondaryFilename();
+            if (!secondary)
+                continue;
+            if (*secondary == '~')
+                secondary++;
+            IReferencedFile *known = map.getValue(secondary);
+            if (!known)
+                continue;
+            if (known->getFlags() & RefFileCopyInfoFailed)
+                continue;
+            dfsDir.addFileRelationship(primary.getLogicalName(), secondary, rel.queryPrimaryFields(), rel.querySecondaryFields(),
+                rel.queryKind(), rel.queryCardinality(), rel.isPayload(), rel.queryDescription());
+        }
+    }
+}
+
+// Returns a new iterator over the files in this list; the caller owns the returned object.
+IReferencedFileIterator *ReferencedFileList::getFiles()
+{
+    IReferencedFileIterator *it = new ReferencedFileIterator(this);
+    return it;
+}
+
+// Factory for an empty referenced-file list; the caller owns the returned object.
+IReferencedFileList *createReferencedFileList(const char *user, const char *pw)
+{
+    IReferencedFileList *created = new ReferencedFileList(user, pw);
+    return created;
+}

+ 61 - 0
common/roxiemanager/referencedfilelist.hpp

@@ -0,0 +1,61 @@
+/*##############################################################################
+
+    Copyright (C) 2012 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+#ifndef REFFILE_LIST_HPP
+#define REFFILE_LIST_HPP
+
+#include "jlib.hpp"
+#include "workunit.hpp"
+#include "dadfs.hpp"
+#include "dfuutil.hpp"
+
+// Bit flags combined into the value returned by IReferencedFile::getFlags():
+//   RefFileNone           - no flags set
+//   RefFileIndex          - NOTE(review): not set by the resolving code; presumably marks index files - confirm
+//   RefFileNotOnCluster   - file exists locally but has no copy on the target cluster
+//   RefFileNotFound       - no lookup found the file
+//   RefFileRemote         - file details came from another dali rather than the local one
+//   RefFileForeign        - file details came from the foreign dali named in the logical name
+//   RefFileSuper          - file is a superfile
+//   RefSubFile            - file was added as a sub-file of a superfile
+//   RefFileCopyInfoFailed - cloning the file's info raised an error
+#define RefFileNone           0x000
+#define RefFileIndex          0x001
+#define RefFileNotOnCluster   0x002
+#define RefFileNotFound       0x004
+#define RefFileRemote         0x008
+#define RefFileForeign        0x010
+#define RefFileSuper          0x020
+#define RefSubFile            0x040
+#define RefFileCopyInfoFailed 0x080
+
+// Read-only view of one file referenced by a query.
+interface IReferencedFile : extends IInterface
+{
+    // Logical name of the file.
+    virtual const char *getLogicalName() const =0;
+    // Combination of the RefFile* / RefSubFile bits.
+    virtual unsigned getFlags() const =0;
+    // Endpoint of the foreign dali associated with the file.
+    // NOTE(review): behaviour for files with no foreign dali depends on the implementation - confirm before use.
+    virtual const SocketEndpoint &getForeignIP() const =0;
+};
+
+interface IReferencedFileIterator : extends IIteratorOf<IReferencedFile> { };
+
+// A list of the files referenced by one or more queries, with operations to locate each file
+// and to clone its metadata.
+interface IReferencedFileList : extends IInterface
+{
+    // Add the files referenced by a workunit, a single logical name, or a list of names.
+    virtual void addFilesFromWorkUnit(IConstWorkUnit *cw)=0;
+    virtual void addFile(const char *ln)=0;
+    virtual void addFiles(StringArray &files)=0;
+
+    // Returns an iterator over the files currently in the list.
+    virtual IReferencedFileIterator *getFiles()=0;
+    // Works out where each file lives; 'process' names the target cluster process and
+    // 'remoteIP' optionally names a remote dali to consult.
+    virtual void resolveFiles(const char *process, const char *remoteIP, bool checkLocalFirst, bool addSubFiles)=0;
+    // Clones file info (and optionally superfile info) followed by file relationships.
+    virtual void cloneAllInfo(bool overwrite, bool cloneSuperInfo)=0;
+    // Clones file info and, when cloneSuperInfo is set, superfile info.
+    virtual void cloneFileInfo(bool overwrite, bool cloneSuperInfo)=0;
+    // Clones the relationships between files in the list.
+    virtual void cloneRelationships()=0;
+};
+
+// Creates an empty referenced-file list; 'user' and 'pw' are the credentials used for file lookups.
+IReferencedFileList *createReferencedFileList(const char *user, const char *pw);
+
+#endif //REFFILE_LIST_HPP

+ 3 - 3
dali/base/dadfs.cpp

@@ -897,7 +897,7 @@ public:
     IDFScopeIterator *getScopeIterator(const char *subscope,bool recursive,bool includeempty,IUserDescriptor *user);
     bool loadScopeContents(const char *scopelfn,StringArray *scopes,    StringArray *supers,StringArray *files, bool includeemptyscopes);
 
-    IPropertyTree *getFileTree(const char *lname,const INode *foreigndali,IUserDescriptor *user,unsigned foreigndalitimeout,bool expandnodes=false); 
+    IPropertyTree *getFileTree(const char *lname,const INode *foreigndali,IUserDescriptor *user,unsigned foreigndalitimeout,bool expandnodes=false, bool appendForeign=true);
     void setFileAccessed(CDfsLogicalFileName &dlfn, const CDateTime &dt,const INode *foreigndali=NULL,IUserDescriptor *user=NULL,unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT);
     IFileDescriptor *getFileDescriptor(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL,unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT);
     IDistributedFile *getFile(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL,unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT);
@@ -8501,7 +8501,7 @@ void CDistributedFileDirectory::setFileProtect(CDfsLogicalFileName &dlfn, const
     checkDfsReplyException(mb);
 }
 
-IPropertyTree *CDistributedFileDirectory::getFileTree(const char *lname, const INode *foreigndali,IUserDescriptor *user,unsigned foreigndalitimeout, bool expandnodes)
+IPropertyTree *CDistributedFileDirectory::getFileTree(const char *lname, const INode *foreigndali,IUserDescriptor *user,unsigned foreigndalitimeout, bool expandnodes, bool appendForeign)
 {
     // this accepts either a foreign dali node or a foreign lfn
     Owned<INode> fnode;
@@ -8565,7 +8565,7 @@ IPropertyTree *CDistributedFileDirectory::getFileTree(const char *lname, const I
             dlfn2.setForeign(foreigndali->endpoint(),false);
         ret->setProp("OrigName",dlfn.get());
     }
-    if (foreigndali) 
+    if (foreigndali && appendForeign)
         resolveForeignFiles(ret,foreigndali);
     return ret.getClear();
 }

+ 1 - 1
dali/base/dadfs.hpp

@@ -466,7 +466,7 @@ interface IDistributedFileDirectory: extends IInterface
     virtual bool exists(const char *logicalname,bool notsuper=false,bool superonly=false,IUserDescriptor *user=NULL) = 0;                           // logical name exists
     virtual bool existsPhysical(const char *logicalname,IUserDescriptor *user=NULL) = 0;                                                    // physical parts exists
 
-    virtual IPropertyTree *getFileTree(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL, unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT, bool expandnodes=true) =0;
+    virtual IPropertyTree *getFileTree(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL, unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT, bool expandnodes=true, bool appendForeign=true) =0;
     virtual IFileDescriptor *getFileDescriptor(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL, unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT) =0;
 
     virtual IDistributedSuperFile *createSuperFile(const char *logicalname,bool interleaved,bool ifdoesnotexist=false,IUserDescriptor *user=NULL,IDistributedFileTransaction *transaction=NULL) = 0;

+ 10 - 7
dali/dfu/dfuutil.cpp

@@ -404,7 +404,7 @@ class CFileCloner
 
 
 
-    void cloneSubFile(IPropertyTree *ftree,const char *destfilename)                // name already has prefix added
+    void cloneSubFile(IPropertyTree *ftree,const char *destfilename, INode *srcdali)   // name already has prefix added
     {
         Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree,&queryNamedGroupStore(),0);
         const char * kind = srcfdesc->queryProperties().queryProp("@kind");
@@ -457,11 +457,14 @@ class CFileCloner
             physicalReplicateFile(dstfdesc,destfilename);
         }
 
+        if (srcdali && !srcdali->endpoint().isNull())
+        {
+            StringBuffer s;
+            dstfdesc->queryProperties().setProp("@cloneFrom", srcdali->endpoint().getUrlStr(s).str());
+        }
+
         Owned<IDistributedFile> dstfile = fdir->createNew(dstfdesc);
         dstfile->attach(destfilename,userdesc);
-
-
-
     }
 
     void extendSubFile(IPropertyTree *ftree,const char *destfilename)
@@ -617,7 +620,7 @@ public:
             dfile.clear();
         }
         if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_File))==0) {
-            cloneSubFile(ftree,dlfn.get());
+            cloneSubFile(ftree,dlfn.get(), srcdali);
         }
         else if (strcmp(ftree->queryName(),queryDfsXmlBranchName(DXB_SuperFile))==0) {
             StringArray subfiles;
@@ -692,7 +695,7 @@ public:
             }
             dfile.clear();
         }
-        cloneSubFile(ftree,dlfn.get());
+        cloneSubFile(ftree,dlfn.get(),srcdali);
         level--;
     }
 
@@ -959,7 +962,7 @@ public:
         cloner.cloneSuperFile(srcname,dlfn);
     }
 
-    void createSingleFileClone(const char *srcname,             // src LFN (can't be super)
+    void createSingleFileClone(const char *srcname,         // src LFN (can't be super)
                          const char *dstname,               // dst LFN
                          const char *cluster1,              // group name of roxie cluster
                          DFUclusterPartDiskMapping clustmap, // how the nodes are mapped

+ 9 - 0
ecl/eclcmd/eclcmd_common.hpp

@@ -54,6 +54,12 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname);
 #define ECLOPT_PASSWORD_ENV "ECL_PASSWORD"
 
 #define ECLOPT_NORELOAD "--no-reload"
+#define ECLOPT_OVERWRITE "--overwrite"
+#define ECLOPT_OVERWRITE_S "-O"
+#define ECLOPT_OVERWRITE_INI "overwriteDefault"
+#define ECLOPT_OVERWRITE_ENV NULL
+
+#define ECLOPT_DONT_COPY_FILES "--no-files"
 
 #define ECLOPT_NO_ACTIVATE "--no-activate"
 #define ECLOPT_ACTIVATE "--activate"
@@ -78,6 +84,9 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname);
 
 #define ECLOPT_NOROOT "--noroot"
 
+#define ECLOPT_DALIIP "--daliip"
+#define ECLOPT_PROCESS "--process"
+
 #define ECLOPT_WUID "--wuid"
 #define ECLOPT_WUID_S "-wu"
 #define ECLOPT_CLUSTER_DEPRECATED "--cluster"

+ 20 - 1
ecl/eclcmd/queries/ecl-queries.cpp

@@ -93,6 +93,12 @@ public:
     }
     virtual bool parseCommandLineOptions(ArgvIterator &iter)
     {
+        if (iter.done())
+        {
+            usage();
+            return false;
+        }
+
         for (; !iter.done(); iter.next())
         {
             const char *arg = iter.query();
@@ -261,7 +267,7 @@ private:
 class EclCmdQueriesCopy : public EclCmdCommon
 {
 public:
-    EclCmdQueriesCopy() : optActivate(false), optNoReload(false), optMsToWait(10000)
+    EclCmdQueriesCopy() : optActivate(false), optNoReload(false), optMsToWait(10000), optDontCopyFiles(false), optOverwrite(false)
     {
     }
     virtual bool parseCommandLineOptions(ArgvIterator &iter)
@@ -288,6 +294,8 @@ public:
                 }
                 continue;
             }
+            if (iter.matchOption(optDaliIP, ECLOPT_DALIIP))
+                continue;
             if (iter.matchFlag(optActivate, ECLOPT_ACTIVATE)||iter.matchFlag(optActivate, ECLOPT_ACTIVATE_S))
                 continue;
             if (iter.matchFlag(optNoReload, ECLOPT_NORELOAD))
@@ -296,6 +304,8 @@ public:
                 continue;
             if (iter.matchOption(optTargetCluster, ECLOPT_TARGET)||iter.matchOption(optTargetCluster, ECLOPT_TARGET_S))
                 continue;
+            if (iter.matchFlag(optDontCopyFiles, ECLOPT_DONT_COPY_FILES))
+                continue;
             if (iter.matchOption(optMsToWait, ECLOPT_WAIT))
                 continue;
             if (EclCmdCommon::matchCommandLineOption(iter, true)!=EclCmdOptionMatch)
@@ -332,7 +342,10 @@ public:
         req->setSource(optSourceQueryPath.get());
         req->setTarget(optTargetQuerySet.get());
         req->setCluster(optTargetCluster.get());
+        req->setDaliServer(optDaliIP.get());
         req->setActivate(optActivate);
+        req->setOverwrite(optOverwrite);
+        req->setDontCopyFiles(optDontCopyFiles);
         req->setWait(optMsToWait);
         req->setNoReload(optNoReload);
 
@@ -364,8 +377,11 @@ public:
             "                          or: queryset/query\n"
             "   <target_queryset>      name of queryset to copy the query into\n"
             "   -t, --target=<val>     Local target cluster to associate with remote workunit\n"
+            "   --no-files             Do not copy files referenced by query\n"
+            "   --daliip=<ip>          For file copying if remote version < 3.8\n"
             "   -A, --activate         Activate the new query\n"
             "   --no-reload            Do not request a reload of the (roxie) cluster\n"
+            "   -O, --overwrite        Overwrite existing files\n"
             "   --wait=<ms>            Max time to wait in milliseconds\n"
             " Common Options:\n",
             stdout);
@@ -375,9 +391,12 @@ private:
     StringAttr optSourceQueryPath;
     StringAttr optTargetQuerySet;
     StringAttr optTargetCluster;
+    StringAttr optDaliIP;
     unsigned optMsToWait;
     bool optActivate;
     bool optNoReload;
+    bool optOverwrite;
+    bool optDontCopyFiles;
 };
 
 IEclCommand *createEclQueriesCommand(const char *cmdname)

+ 4 - 2
esp/scm/ws_workunits.ecm

@@ -630,6 +630,7 @@ ESPresponse [exceptions_inline] WULogFileResponse
     [min_ver("1.36")] string QueryName;
     [min_ver("1.36")] string QueryId;
     [min_ver("1.36")] string FileName;
+    [min_ver("1.38")] string DaliServer;
     [http_content("application/octet-stream")] binary thefile;
 };
 
@@ -1059,8 +1060,6 @@ ESPrequest WUPublishWorkunitRequest
     string JobName;
     int Activate;
     bool NotifyCluster(false);
-    bool showFiles(0);
-    bool CopyLocal(0);
     int Wait(10000);
     bool NoReload(0);
 };
@@ -1246,7 +1245,10 @@ ESPrequest WUQuerySetCopyQueryRequest
     string Source;
     string Target;
     string Cluster;
+    string DaliServer;
     int Activate;
+    bool Overwrite(false);
+    bool DontCopyFiles(false);
     int Wait(10000);
     bool NoReload(0);
 };

+ 5 - 0
esp/services/ws_workunits/CMakeLists.txt

@@ -28,11 +28,14 @@ include(${HPCC_SOURCE_DIR}/esp/scm/smcscm.cmake)
 
 set (    SRCS
          ../../../dali/sasha/sacmd.cpp
+         ../../../dali/dfu/dfuutil.cpp
          ${ESPSCM_GENERATED_DIR}/common_esp.cpp
          ${ESPSCM_GENERATED_DIR}/ws_workunits_esp.cpp
          ${ESPSCM_GENERATED_DIR}/ws_fs_esp.cpp
          ${HPCC_SOURCE_DIR}/esp/scm/ws_workunits.ecm
          ${HPCC_SOURCE_DIR}/esp/clients/roxiecontrol.cpp
+         ${HPCC_SOURCE_DIR}/common/roxiemanager/referencedfilelist.cpp
+         ${HPCC_SOURCE_DIR}/common/roxiemanager/referencedfilelist.hpp
          ws_workunitsPlugin.cpp
          ws_workunitsService.cpp
          ws_workunitsService.hpp
@@ -49,8 +52,10 @@ include_directories (
          ./../../../dali/dfu
          ./../../../dali/sasha
          ./../../../common/roxiemanager
+         ./../../../common/remote
          ./../../../system/jlib
          ./../../../common/environment
+         ./../../../roxie/roxie
          ./../../services
          ./../common
          ./../../../system/xmllib

+ 67 - 12
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -24,6 +24,11 @@
 #include "dfuwu.hpp"
 #include "eclhelper.hpp"
 #include "roxiecontrol.hpp"
+#include "dfuutil.hpp"
+#include "dautils.hpp"
+#include "referencedfilelist.hpp"
+
+#define DALI_FILE_LOOKUP_TIMEOUT (1000*15*1)  // 15 seconds
 
 const unsigned roxieQueryRoxieTimeOut = 60000;
 
@@ -40,7 +45,16 @@ bool isRoxieProcess(const char *process)
     return conn->queryRoot()->hasProp(xpath.str());
 }
 
-void fetchRemoteWorkunit(IEspContext &context, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll)
+// Sets 'ep' to the dali address 'ip' on port 7070. When that address is a loopback address,
+// starts with '.', or starts with "0.", the IP part is replaced with the ESP address 'esp'.
+// 'ep' is left untouched when 'ip' is NULL or empty.
+void checkUseEspOrDaliIP(SocketEndpoint &ep, const char *ip, const char *esp)
+{
+    if (ip && *ip)
+    {
+        ep.set(ip, 7070);
+        const bool notUsableRemotely = ep.isLoopBack() || *ip=='.' || (ip[0]=='0' && ip[1]=='.');
+        if (notUsableRemotely)
+            ep.ipset(esp);
+    }
+}
+
+void fetchRemoteWorkunit(IEspContext &context, const char *netAddress, const char *queryset, const char *query, const char *wuid, StringBuffer &name, StringBuffer &xml, StringBuffer &dllname, MemoryBuffer &dll, StringBuffer &daliServer)
 {
     Owned<IClientWsWorkunits> ws;
     ws.setown(createWsWorkunitsClient());
@@ -70,6 +84,10 @@ void fetchRemoteWorkunit(IEspContext &context, const char *netAddress, const cha
     dll.append(resp->getThefile());
     dllname.append(resp->getFileName());
     name.append(resp->getQueryName());
+    SocketEndpoint ep;
+    checkUseEspOrDaliIP(ep, resp->getDaliServer(), netAddress);
+    if (!ep.isNull())
+        ep.getUrlStr(daliServer);
 }
 
 void doWuFileCopy(IClientFileSpray &fs, IEspWULogicalFileCopyInfo &info, const char *logicalname, const char *cluster, bool isRoxie, bool supercopy)
@@ -280,6 +298,7 @@ bool reloadCluster(const char *cluster, unsigned wait)
     return (clusterInfo) ? reloadCluster(clusterInfo, wait) : true;
 }
 
+
 bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp)
 {
     StringBuffer wuid = req.getWuid();
@@ -310,6 +329,15 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork
     if (!isValidCluster(target.str()))
         throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
 
+    Owned <IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(target.str());
+    if (clusterInfo->getPlatform()==RoxieCluster)
+    {
+        SCMStringBuffer process;
+        Owned<IReferencedFileList> wufiles = createReferencedFileList(context.queryUserId(), context.queryPassword());
+        wufiles->resolveFiles(clusterInfo->getRoxieProcess(process).str(), NULL, true, true);
+        wufiles->cloneAllInfo(false, true);
+    }
+
     WorkunitUpdate wu(&cw->lock());
     if (notEmpty(req.getJobName()))
         wu->setJobName(req.getJobName());
@@ -324,17 +352,9 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork
     resp.setQueryName(queryName.str());
     resp.setQuerySet(target.str());
 
-    Owned <IConstWUClusterInfo> targetInfo = getTargetClusterInfo(target.str());
-    if (req.getCopyLocal() || req.getShowFiles())
-    {
-        IArrayOf<IConstWUCopyLogicalClusterFileSections> clusterfiles;
-        copyWULogicalFilesToTarget(context, *targetInfo, *cw, clusterfiles, req.getCopyLocal());
-        resp.setClusterFiles(clusterfiles);
-    }
-
     bool reloadFailed = false;
     if (0!=req.getWait() && !req.getNoReload())
-        reloadFailed = !reloadCluster(targetInfo, (unsigned)req.getWait());
+        reloadFailed = !reloadCluster(clusterInfo, (unsigned)req.getWait());
     resp.setReloadFailed(reloadFailed);
 
     return true;
@@ -794,6 +814,32 @@ bool splitQueryPath(const char *path, StringBuffer &netAddress, StringBuffer &qu
     return true;
 }
 
+/**
+ * Resolve and clone the files referenced by a workunit for each named cluster.
+ *
+ * A referenced-file list is built from cw using the caller's credentials. Each entry in
+ * clusters that has target cluster info and a non-empty roxie process name gets the list
+ * resolved against that process (and remoteIP) and then cloned; other entries are skipped.
+ *
+ * @param context    request context supplying the user id and password
+ * @param cw         workunit whose referenced files are collected
+ * @param remoteIP   remote address passed through to resolveFiles
+ * @param clusters   target cluster names to process
+ * @param overwrite  forwarded to cloneAllInfo; its negation is forwarded to resolveFiles
+ */
+void copyQueryFilesToClusters(IEspContext &context, IConstWorkUnit *cw, const char *remoteIP, StringArray &clusters, bool overwrite)
+{
+    Owned<IReferencedFileList> fileList = createReferencedFileList(context.queryUserId(), context.queryPassword());
+    fileList->addFilesFromWorkUnit(cw);
+    ForEachItemIn(idx, clusters)
+    {
+        SCMStringBuffer roxieProcess;
+        Owned <IConstWUClusterInfo> info = getTargetClusterInfo(clusters.item(idx));
+        if (!info)
+            continue;
+        if (!info->getRoxieProcess(roxieProcess).length())
+            continue; // no roxie process name for this target, nothing to copy to
+        fileList->resolveFiles(roxieProcess.str(), remoteIP, !overwrite, true);
+        fileList->cloneAllInfo(overwrite, true);
+    }
+}
+
+/**
+ * Single-cluster convenience wrapper around copyQueryFilesToClusters.
+ *
+ * Does nothing when cluster is NULL or empty; otherwise wraps the name in a one-element
+ * StringArray and forwards all other arguments unchanged.
+ */
+void copyQueryFilesToCluster(IEspContext &context, IConstWorkUnit *cw, const char *remoteIP, const char *cluster, bool overwrite)
+{
+    if (cluster && *cluster)
+    {
+        StringArray targets;
+        targets.append(cluster);
+        copyQueryFilesToClusters(context, cw, remoteIP, targets, overwrite);
+    }
+}
+
 bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetCopyQueryRequest &req, IEspWUQuerySetCopyQueryResponse &resp)
 {
     unsigned start = msTick();
@@ -814,6 +860,7 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
     if (!splitQueryPath(source, srcAddress, srcQuerySet, srcQuery))
         throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid source query path");
 
+    StringBuffer remoteIP;
     StringBuffer queryName;
     StringBuffer wuid;
     if (srcAddress.length())
@@ -821,7 +868,7 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
         StringBuffer xml;
         MemoryBuffer dll;
         StringBuffer dllname;
-        fetchRemoteWorkunit(context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll);
+        fetchRemoteWorkunit(context, srcAddress.str(), srcQuerySet.str(), srcQuery.str(), NULL, queryName, xml, dllname, dll, remoteIP);
         deploySharedObject(context, wuid, dllname.str(), target, queryName.str(), dll, queryDirectory.str(), xml.str());
     }
     else
@@ -838,7 +885,15 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
     }
 
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
-    WorkunitUpdate wu(factory->updateWorkUnit(wuid.str()));
+    Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
+
+    if (!req.getDontCopyFiles())
+    {
+        const char *reqDali = req.getDaliServer();
+        copyQueryFilesToCluster(context, cw, (reqDali && *reqDali) ? reqDali : remoteIP.str(), target, req.getOverwrite());
+    }
+
+    WorkunitUpdate wu(&cw->lock());
     if (!wu)
         throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Error opening wuid %s for query %s", wuid.str(), source);
 

+ 3 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -654,6 +654,8 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 
     refreshValidClusters();
 
+    daliServers.set(cfg->queryProp("Software/EspProcess/@daliServers"));
+
     wuActionTable.setValue("delete", ActionDelete);
     wuActionTable.setValue("abort", ActionAbort);
     wuActionTable.setValue("pausenow", ActionPauseNow);
@@ -2599,6 +2601,7 @@ bool CWsWorkunitsEx::onWUFile(IEspContext &context,IEspWULogFileRequest &req, IE
                 StringBuffer name;
                 winfo.getWorkunitDll(name, mb);
                 resp.setFileName(name.str());
+                resp.setDaliServer(daliServers.get());
                 openSaveFile(context, opt, req.getName(), HTTP_TYPE_OCTET_STREAM, mb, resp);
             }
             else if (strieq(File_Res,req.getType()))

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -107,6 +107,7 @@ public:
 private:
     unsigned awusCacheMinutes;
     StringBuffer queryDirectory;
+    StringAttr daliServers;
     Owned<DataCache> dataCache;
     Owned<ArchivedWuCache> archivedWuCache;
     BoolHash validClusters;

+ 82 - 16
roxie/ccd/ccdfile.cpp

@@ -1525,7 +1525,74 @@ public:
     }
 };
 
-ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, RoxieFileType fileType, int numParts)
+/**
+ * Look up the file descriptor of the file that fdesc was cloned from, if any.
+ *
+ * Returns NULL when id starts with "foreign" (case-insensitive), when fdesc is NULL or
+ * has no @cloneFrom property, when that property yields a null endpoint, when the
+ * foreign lookup finds no file, or when the found file's part count differs from
+ * fdesc's (this last case is also logged). Otherwise the caller receives ownership of
+ * the clone source's file descriptor.
+ */
+IFileDescriptor *checkCloneFrom(const char *id, IFileDescriptor *fdesc)
+{
+    //if need to support dali hopping should add each remote location
+    const bool isForeignId = id && 0==strnicmp(id, "foreign", 7);
+    if (isForeignId)
+        return NULL;
+    if (!fdesc)
+        return NULL;
+    if (!fdesc->queryProperties().hasProp("@cloneFrom"))
+        return NULL;
+    SocketEndpoint sourceEp;
+    sourceEp.set(fdesc->queryProperties().queryProp("@cloneFrom"));
+    if (sourceEp.isNull())
+        return NULL;
+    CDfsLogicalFileName logicalName;
+    logicalName.set(id);
+    logicalName.setForeign(sourceEp, false);
+    Owned<IDistributedFile> sourceFile = queryDistributedFileDirectory().lookup(logicalName);
+    if (!sourceFile)
+        return NULL;
+    Owned<IFileDescriptor> sourceFDesc = sourceFile->getFileDescriptor();
+    if (sourceFDesc->numParts()==fdesc->numParts())
+        return sourceFDesc.getClear();
+    // part counts differ, so the clone source cannot be matched part-for-part
+    StringBuffer ipText;
+    DBGLOG(ROXIE_MISMATCH, "File %s cloneFrom(%s) mismatch", id, sourceEp.getIpText(ipText).str());
+    return NULL;
+}
+
+/**
+ * Pick the part of remoteFDesc that can stand in for the local part pdesc.
+ *
+ * Returns NULL when remoteFDesc is NULL or has no part at partNum. When pdesc is NULL or
+ * has no crc available, the remote part is returned without comparison. Otherwise the
+ * remote part is returned only if it has a crc equal to the local one.
+ * The returned pointer is not linked (it comes straight from queryPart).
+ */
+IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
+{
+    IPartDescriptor *candidate = remoteFDesc ? remoteFDesc->queryPart(partNum) : NULL;
+    if (!candidate)
+        return NULL;
+    unsigned int localCrc;
+    if (!pdesc || !pdesc->getCrc(localCrc)) //local crc not available, never DFS copied?
+        return candidate;
+    unsigned int candidateCrc;
+    const bool sameCrc = candidate->getCrc(candidateCrc) && candidateCrc==localCrc;
+    return sameCrc ? candidate : NULL;
+}
+
+/**
+ * Case-insensitively compare process with the cluster group name of one copy of a part.
+ *
+ * The group name is obtained from the part's owner, using the cluster number that
+ * pdesc reports for the given copy. pdesc must not be NULL.
+ */
+inline bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned copy, const char *process)
+{
+    const unsigned clusterIdx = pdesc->copyClusterNum(copy);
+    StringBuffer groupName;
+    const bool sameCluster = strieq(process, pdesc->queryOwner().getClusterGroupName(clusterIdx, groupName));
+    return sameCluster;
+}
+
+/**
+ * Append the remote paths of up to maxAppend copies of a part to locations.
+ *
+ * @param pdesc      part whose copies are listed; NULL is accepted and appends nothing
+ * @param locations  receives one remote path string per appended copy
+ * @param checkSelf  when true, copies whose cluster group name matches roxieName are skipped
+ * @param maxAppend  maximum number of paths appended by this call
+ */
+inline void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, bool checkSelf, unsigned maxAppend=2)
+{
+    // createDynamicFile passes the result of queryMatchingRemotePart here, which is NULL
+    // whenever there is no clone source or the remote part does not match.
+    if (!pdesc)
+        return;
+    unsigned numCopies = pdesc->numCopies();
+    unsigned appended = 0;
+    for (unsigned copy = 0; appended < maxAppend && copy < numCopies; copy++)
+    {
+        if (checkSelf && isCopyFromCluster(pdesc, copy, roxieName.str())) //don't add ourself
+            continue;
+        RemoteFilename r;
+        pdesc->getFilename(copy,r);
+        StringBuffer path;
+        locations.append(r.getRemotePath(path).str());
+        appended++;
+    }
+}
+
+ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts)
 {
     IPropertyTree &partProps = pdesc->queryProperties();
     offset_t dfsSize = partProps.getPropInt64("@size");
@@ -1554,17 +1621,9 @@ ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, RoxieFile
 
     makePhysicalPartName(logicalname, partNo, numParts, localFileName, false, DFD_OSdefault, baseDataDirectory);  // MORE - if we get the dataDirectory we can pass it in and possibly` reuse an existing file
 
-    unsigned numCopies = pdesc->numCopies();
-    if (numCopies > 2)
-        numCopies = 2;   // only care about maximum of 2 locations at this time
-    for (unsigned copy = 0; copy < numCopies; copy++)
-    {
-        RemoteFilename r;
-        pdesc->getFilename(copy,r);
-        StringBuffer origName;
-        r.getRemotePath(origName);
-        remoteLocations.append(origName.str());
-    }
+    appendRemoteLocations(pdesc, remoteLocations, true);
+    appendRemoteLocations(remotePDesc, remoteLocations, false);
+
     return queryFileCache().lookupFile(id, partNo, fileType, localFileName, NULL, NULL, localLocations, remoteLocations, dfsSize, fileDate, false, true, false, false, crcResources ? crc : 0, pdesc->queryOwner().isCompressed(), NULL);
 }
 
@@ -2025,6 +2084,7 @@ public:
         Owned<CFileIOArray> f = new CFileIOArray();
         f->addFile(NULL, 0);
         IFileDescriptor *fdesc = subFiles.item(0);
+        Owned<IFileDescriptor> remoteFDesc = checkCloneFrom(subNames.item(0), fdesc);
         if (fdesc)
         {
             unsigned numParts = fdesc->numParts();
@@ -2036,7 +2096,8 @@ public:
                     {
                         IPartDescriptor *pdesc = fdesc->queryPart(i-1);
                         assertex(pdesc);
-                        Owned<ILazyFileIO> file = createDynamicFile(subNames.item(0), pdesc, ROXIE_FILE, numParts);
+                        IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
+                        Owned<ILazyFileIO> file = createDynamicFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts);
                         IPropertyTree &partProps = pdesc->queryProperties();
                         f->addFile(LINK(file), partProps.getPropInt64("@offset"));
                     }
@@ -2102,15 +2163,18 @@ public:
                     Owned<IKeyIndexSet> keyset = createKeyIndexSet();
                     ForEachItemIn(idx, subFiles)
                     {
-                        Owned <ILazyFileIO> part;
                         IFileDescriptor *fdesc = subFiles.item(idx);
+                        Owned<IFileDescriptor> remoteFDesc = checkCloneFrom(subNames.item(idx), fdesc);
+
+                        Owned <ILazyFileIO> part;
                         unsigned crc = 0;
                         if (fdesc) // NB there may be no parts for this channel 
                         {
                             IPartDescriptor *pdesc = fdesc->queryPart(partNo-1);
+                            IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
                             if (pdesc)
                             {
-                                part.setown(createDynamicFile(subNames.item(idx), pdesc, ROXIE_KEY, fdesc->numParts()));
+                                part.setown(createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts()));
                                 pdesc->getCrc(crc);
                             }
                         }
@@ -2133,13 +2197,15 @@ public:
             ForEachItemIn(idx, subFiles)
             {
                 IFileDescriptor *fdesc = subFiles.item(idx);
+                Owned<IFileDescriptor> remoteFDesc = checkCloneFrom(subNames.item(idx), fdesc);
                 Owned<IKeyIndexBase> key;
                 if (fdesc)
                 {
                     unsigned numParts = fdesc->numParts();
                     assertex(numParts > 0);
                     IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
-                    Owned<ILazyFileIO> keyFile = createDynamicFile(subNames.item(idx), pdesc, ROXIE_KEY, numParts);
+                    IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
+                    Owned<ILazyFileIO> keyFile = createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts);
                     unsigned crc = 0;
                     pdesc->getCrc(crc);
                     StringBuffer pname;

+ 1 - 1
roxie/ccd/ccdfile.hpp

@@ -64,7 +64,7 @@ interface ILazyFileIO : extends IFileIO
     virtual void removeCache(const IRoxieFileCache *) = 0;
 };
 
-extern ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, RoxieFileType fileType, int numParts);
+// Parameter list must stay in step with the definition in ccdfile.cpp.
+extern ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts);
 
 interface IRoxieFileCache : extends IInterface
 {