|
@@ -71,13 +71,14 @@ class ReferencedFile : public CInterface, implements IReferencedFile
|
|
|
{
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- ReferencedFile(const char *name, bool isSubFile=false, unsigned _flags=0) : flags(_flags)
|
|
|
+ ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, bool isSubFile=false, unsigned _flags=0) : flags(_flags)
|
|
|
{
|
|
|
- StringBuffer ip;
|
|
|
- StringBuffer lc(skipForeign(name, &ip));
|
|
|
- logicalName.set(lc.toLowerCase());
|
|
|
- if (ip.length())
|
|
|
- foreignNode.setown(createINode(ip.str()));
|
|
|
+ logicalName.set(skipForeign(lfn, &daliip)).toLowerCase();
|
|
|
+ if (daliip.length())
|
|
|
+ flags |= RefFileForeign;
|
|
|
+ else
|
|
|
+ daliip.set(sourceIP);
|
|
|
+ fileSrcCluster.set(srcCluster);
|
|
|
if (isSubFile)
|
|
|
flags |= RefSubFile;
|
|
|
}
|
|
@@ -89,22 +90,29 @@ public:
|
|
|
IPropertyTree *getForeignOrRemoteFileTree(IUserDescriptor *user, INode *remote);
|
|
|
|
|
|
void processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles);
|
|
|
- void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, bool foreign, StringArray *subfiles);
|
|
|
+ void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles);
|
|
|
|
|
|
void resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles);
|
|
|
- void resolveRemote(IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles);
|
|
|
- void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles);
|
|
|
+ void resolveRemote(IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
|
|
|
+ void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
|
|
|
|
|
|
- virtual const char *getLogicalName() const {return logicalName.get();}
|
|
|
+ virtual const char *getLogicalName() const {return logicalName.str();}
|
|
|
virtual unsigned getFlags() const {return flags;}
|
|
|
- virtual const SocketEndpoint &getForeignIP() const {return foreignNode->endpoint();}
|
|
|
- virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool overwrite=false);
|
|
|
+ virtual const SocketEndpoint &getForeignIP(SocketEndpoint &ep) const
|
|
|
+ {
|
|
|
+ if (flags & RefFileForeign && daliip.length())
|
|
|
+ ep.set(daliip.str());
|
|
|
+ else
|
|
|
+ ep.set(NULL);
|
|
|
+ return ep;
|
|
|
+ }
|
|
|
+ virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool overwrite=false, bool cloneForeign=false);
|
|
|
void cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *user, INode *remote, bool overwrite);
|
|
|
|
|
|
public:
|
|
|
- StringAttr logicalName;
|
|
|
- Owned<INode> foreignNode;
|
|
|
-
|
|
|
+ StringBuffer logicalName;
|
|
|
+ StringBuffer daliip;
|
|
|
+ StringAttr fileSrcCluster;
|
|
|
unsigned flags;
|
|
|
};
|
|
|
|
|
@@ -121,25 +129,29 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void ensureFile(const char *ln, unsigned flags);
|
|
|
+ void ensureFile(const char *ln, unsigned flags, const char *daliip=NULL, const char *srcCluster=NULL);
|
|
|
|
|
|
- virtual void addFile(const char *ln);
|
|
|
+ virtual void addFile(const char *ln, const char *daliip=NULL, const char *srcCluster=NULL);
|
|
|
virtual void addFiles(StringArray &files);
|
|
|
virtual void addFilesFromWorkUnit(IConstWorkUnit *cw);
|
|
|
virtual void addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid);
|
|
|
virtual void addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg);
|
|
|
virtual void addFilesFromPackageMap(IPropertyTree *pm);
|
|
|
|
|
|
+ void addFileFromSubFile(IPropertyTree &subFile, const char *_daliip, const char *srcCluster);
|
|
|
+ void addFilesFromSuperFile(IPropertyTree &superFile, const char *_daliip, const char *srcCluster);
|
|
|
+ void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster);
|
|
|
+
|
|
|
virtual IReferencedFileIterator *getFiles();
|
|
|
- virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo);
|
|
|
+ virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false);
|
|
|
virtual void cloneRelationships();
|
|
|
- virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo)
|
|
|
+ virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign=false)
|
|
|
{
|
|
|
- cloneFileInfo(helper, overwrite, cloneSuperInfo);
|
|
|
+ cloneFileInfo(helper, overwrite, cloneSuperInfo, cloneForeign);
|
|
|
cloneRelationships();
|
|
|
}
|
|
|
- virtual void resolveFiles(const char *process, const char *remoteIP, const char *srcCluster, bool checkLocalFirst, bool addSubFiles);
|
|
|
- void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst);
|
|
|
+ virtual void resolveFiles(const char *process, const char *remoteIP, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool resolveForeign=false);
|
|
|
+ void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool resolveForeign);
|
|
|
|
|
|
public:
|
|
|
Owned<IUserDescriptor> user;
|
|
@@ -173,17 +185,19 @@ void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstC
|
|
|
return;
|
|
|
if (df->findCluster(dstCluster)==NotFound)
|
|
|
flags |= RefFileNotOnCluster;
|
|
|
+ if (fileSrcCluster.length())
|
|
|
+ srcCluster=fileSrcCluster;
|
|
|
if (srcCluster && *srcCluster)
|
|
|
if (NotFound == df->findCluster(srcCluster))
|
|
|
flags |= RefFileNotOnSource;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, bool foreign, StringArray *subfiles)
|
|
|
+void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles)
|
|
|
{
|
|
|
flags |= RefFileRemote;
|
|
|
- if (foreign)
|
|
|
- flags |= RefFileForeign;
|
|
|
+ if (fileSrcCluster.length())
|
|
|
+ srcCluster = fileSrcCluster;
|
|
|
if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile)))
|
|
|
{
|
|
|
flags |= RefFileSuper;
|
|
@@ -209,7 +223,7 @@ void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster
|
|
|
if (flags & RefFileInPackage)
|
|
|
return;
|
|
|
reset();
|
|
|
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.get(), user);
|
|
|
+ Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
|
|
|
if(df)
|
|
|
processLocalFileInfo(df, dstCluster, srcCluster, subfiles);
|
|
|
else
|
|
@@ -218,23 +232,27 @@ void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster
|
|
|
|
|
|
IPropertyTree *ReferencedFile::getForeignOrRemoteFileTree(IUserDescriptor *user, INode *remote)
|
|
|
{
|
|
|
- IDistributedFileDirectory &dir = queryDistributedFileDirectory();
|
|
|
- Owned<IPropertyTree> tree;
|
|
|
- if (foreignNode)
|
|
|
- tree.setown(dir.getFileTree(logicalName.get(), user, foreignNode, WF_LOOKUP_TIMEOUT, false, false));
|
|
|
- if (!tree && remote)
|
|
|
- tree.setown(dir.getFileTree(logicalName.get(), user, remote, WF_LOOKUP_TIMEOUT, false, false));
|
|
|
- return tree.getClear();
|
|
|
+ Owned<INode> daliNode;
|
|
|
+ if (daliip.length())
|
|
|
+ {
|
|
|
+ daliNode.setown(createINode(daliip));
|
|
|
+ remote = daliNode;
|
|
|
+ }
|
|
|
+ if (!remote)
|
|
|
+ return NULL;
|
|
|
+ return queryDistributedFileDirectory().getFileTree(logicalName.str(), user, remote, WF_LOOKUP_TIMEOUT, false, false);
|
|
|
}
|
|
|
|
|
|
-void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles)
|
|
|
+void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
|
|
|
{
|
|
|
+ if ((flags & RefFileForeign) && !resolveForeign)
|
|
|
+ return;
|
|
|
if (flags & RefFileInPackage)
|
|
|
return;
|
|
|
reset();
|
|
|
if (checkLocalFirst)
|
|
|
{
|
|
|
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.get(), user);
|
|
|
+ Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
|
|
|
if(df)
|
|
|
{
|
|
|
processLocalFileInfo(df, dstCluster, NULL, subfiles);
|
|
@@ -243,35 +261,40 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c
|
|
|
}
|
|
|
Owned<IPropertyTree> tree = getForeignOrRemoteFileTree(user, remote);
|
|
|
if (tree)
|
|
|
- processRemoteFileTree(tree, srcCluster, false, subfiles);
|
|
|
+ processRemoteFileTree(tree, srcCluster, subfiles);
|
|
|
else
|
|
|
flags |= RefFileNotFound;
|
|
|
}
|
|
|
|
|
|
-void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles)
|
|
|
+void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
|
|
|
{
|
|
|
- if (foreignNode || remote)
|
|
|
- resolveRemote(user, remote, dstCluster, srcCluster, checkLocalFirst, subfiles);
|
|
|
+ if (daliip.length() || remote)
|
|
|
+ resolveRemote(user, remote, dstCluster, srcCluster, checkLocalFirst, subfiles, resolveForeign);
|
|
|
else
|
|
|
resolveLocal(dstCluster, srcCluster, user, subfiles);
|
|
|
}
|
|
|
|
|
|
-void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool overwrite)
|
|
|
+void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode *remote, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign)
|
|
|
{
|
|
|
if ((flags & RefFileCloned) || (flags & RefFileSuper) || (flags & RefFileInPackage))
|
|
|
return;
|
|
|
+ if ((flags & RefFileForeign) && !cloneForeign)
|
|
|
+ return;
|
|
|
if (!(flags & (RefFileRemote | RefFileForeign | RefFileNotOnCluster)))
|
|
|
return;
|
|
|
|
|
|
StringBuffer addr;
|
|
|
if (flags & RefFileForeign)
|
|
|
- foreignNode->endpoint().getUrlStr(addr);
|
|
|
+ addr.set(daliip);
|
|
|
else if (remote)
|
|
|
remote->endpoint().getUrlStr(addr);
|
|
|
|
|
|
+ if (fileSrcCluster.length())
|
|
|
+ srcCluster = fileSrcCluster;
|
|
|
+
|
|
|
try
|
|
|
{
|
|
|
- helper->createSingleFileClone(logicalName.get(), srcCluster, logicalName.get(), dstCluster,
|
|
|
+ helper->createSingleFileClone(logicalName.str(), srcCluster, logicalName.str(), dstCluster,
|
|
|
DFUcpdm_c_replicated_by_d, true, NULL, user, addr.str(), NULL, overwrite, false);
|
|
|
flags |= RefFileCloned;
|
|
|
}
|
|
@@ -284,7 +307,7 @@ void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, INode
|
|
|
catch (...)
|
|
|
{
|
|
|
flags |= RefFileCopyInfoFailed;
|
|
|
- DBGLOG("Unknown error copying file info for %s, from %s", logicalName.sget(), addr.str());
|
|
|
+ DBGLOG("Unknown error copying file info for %s, from %s", logicalName.str(), addr.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -300,7 +323,7 @@ void ReferencedFile::cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *u
|
|
|
return;
|
|
|
|
|
|
IDistributedFileDirectory &dir = queryDistributedFileDirectory();
|
|
|
- Owned<IDistributedFile> df = dir.lookup(logicalName.get(), user);
|
|
|
+ Owned<IDistributedFile> df = dir.lookup(logicalName.str(), user);
|
|
|
if(df)
|
|
|
{
|
|
|
if (!overwrite)
|
|
@@ -309,7 +332,7 @@ void ReferencedFile::cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *u
|
|
|
df.clear();
|
|
|
}
|
|
|
|
|
|
- Owned<IDistributedSuperFile> superfile = dir.createSuperFile(logicalName.get(),user, true, false);
|
|
|
+ Owned<IDistributedSuperFile> superfile = dir.createSuperFile(logicalName.str(),user, true, false);
|
|
|
flags |= RefFileCloned;
|
|
|
Owned<IPropertyTreeIterator> subfiles = tree->getElements("SubFile");
|
|
|
ForEach(*subfiles)
|
|
@@ -335,7 +358,7 @@ void ReferencedFile::cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *u
|
|
|
catch (...)
|
|
|
{
|
|
|
flags |= RefFileCopyInfoFailed;
|
|
|
- DBGLOG("Unknown error copying superfile info for %s", logicalName.get());
|
|
|
+ DBGLOG("Unknown error copying superfile info for %s", logicalName.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -377,9 +400,9 @@ public:
|
|
|
Owned<HashIterator> iter;
|
|
|
};
|
|
|
|
|
|
-void ReferencedFileList::ensureFile(const char *ln, unsigned flags)
|
|
|
+void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *daliip, const char *srcCluster)
|
|
|
{
|
|
|
- Owned<ReferencedFile> file = new ReferencedFile(ln, false, flags);
|
|
|
+ Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, false, flags);
|
|
|
if (!file->logicalName.length())
|
|
|
return;
|
|
|
ReferencedFile *existing = map.getValue(file->getLogicalName());
|
|
@@ -392,9 +415,9 @@ void ReferencedFileList::ensureFile(const char *ln, unsigned flags)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void ReferencedFileList::addFile(const char *ln)
|
|
|
+void ReferencedFileList::addFile(const char *ln, const char *daliip, const char *srcCluster)
|
|
|
{
|
|
|
- ensureFile(ln, 0);
|
|
|
+ ensureFile(ln, 0, daliip, srcCluster);
|
|
|
}
|
|
|
|
|
|
void ReferencedFileList::addFiles(StringArray &files)
|
|
@@ -403,11 +426,42 @@ void ReferencedFileList::addFiles(StringArray &files)
|
|
|
addFile(files.item(i));
|
|
|
}
|
|
|
|
|
|
+void ReferencedFileList::addFileFromSubFile(IPropertyTree &subFile, const char *daliip, const char *srcCluster)
|
|
|
+{
|
|
|
+ addFile(subFile.queryProp("@value"), daliip, srcCluster);
|
|
|
+}
|
|
|
+
|
|
|
+void ReferencedFileList::addFilesFromSuperFile(IPropertyTree &superFile, const char *daliip, const char *srcCluster)
|
|
|
+{
|
|
|
+ if (superFile.hasProp("@daliip"))
|
|
|
+ daliip = superFile.queryProp("@daliip");
|
|
|
+ if (superFile.hasProp("@sourceCluster"))
|
|
|
+ srcCluster = superFile.queryProp("@sourceCluster");
|
|
|
+
|
|
|
+ Owned<IPropertyTreeIterator> subFiles = superFile.getElements("SubFile[@value]");
|
|
|
+ ForEach(*subFiles)
|
|
|
+ addFileFromSubFile(subFiles->query(), daliip, srcCluster);
|
|
|
+}
|
|
|
+
|
|
|
+void ReferencedFileList::addFilesFromPackage(IPropertyTree &package, const char *daliip, const char *srcCluster)
|
|
|
+{
|
|
|
+ if (package.hasProp("@daliip"))
|
|
|
+ daliip = package.queryProp("@daliip");
|
|
|
+ if (package.hasProp("@sourceCluster"))
|
|
|
+ srcCluster = package.queryProp("@sourceCluster");
|
|
|
+
|
|
|
+ Owned<IPropertyTreeIterator> supers = package.getElements("SuperFile");
|
|
|
+ ForEach(*supers)
|
|
|
+ addFilesFromSuperFile(supers->query(), daliip, srcCluster);
|
|
|
+}
|
|
|
+
|
|
|
void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)
|
|
|
{
|
|
|
- Owned<IPropertyTreeIterator> files = pm->getElements("Package/SuperFile/SubFile[@value]");
|
|
|
- ForEach(*files)
|
|
|
- addFile(files->query().queryProp("@value"));
|
|
|
+ const char *daliip = pm->queryProp("@daliip");
|
|
|
+ const char *srcCluster = pm->queryProp("@sourceCluster");
|
|
|
+ Owned<IPropertyTreeIterator> packages = pm->getElements("Package");
|
|
|
+ ForEach(*packages)
|
|
|
+ addFilesFromPackage(packages->query(), daliip, srcCluster);
|
|
|
}
|
|
|
|
|
|
void ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
|
|
@@ -466,24 +520,24 @@ void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw)
|
|
|
addFilesFromQuery(cw, NULL, NULL);
|
|
|
}
|
|
|
|
|
|
-void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst)
|
|
|
+void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool resolveForeign)
|
|
|
{
|
|
|
StringArray childSubFiles;
|
|
|
ForEachItemIn(i, subfiles)
|
|
|
{
|
|
|
- Owned<ReferencedFile> file = new ReferencedFile(subfiles.item(i), true);
|
|
|
+ Owned<ReferencedFile> file = new ReferencedFile(subfiles.item(i), NULL, NULL, true);
|
|
|
if (file->logicalName.length() && !map.getValue(file->getLogicalName()))
|
|
|
{
|
|
|
- file->resolve(process.get(), srcCluster, user, remote, checkLocalFirst, &childSubFiles);
|
|
|
+ file->resolve(process.get(), srcCluster, user, remote, checkLocalFirst, &childSubFiles, resolveForeign);
|
|
|
const char *ln = file->getLogicalName();
|
|
|
map.setValue(ln, file.getClear());
|
|
|
}
|
|
|
}
|
|
|
if (childSubFiles.length())
|
|
|
- resolveSubFiles(childSubFiles, checkLocalFirst);
|
|
|
+ resolveSubFiles(childSubFiles, checkLocalFirst, resolveForeign);
|
|
|
}
|
|
|
|
|
|
-void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles)
|
|
|
+void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool resolveForeign)
|
|
|
{
|
|
|
process.set(_process);
|
|
|
remote.setown((remoteIP && *remoteIP) ? createINode(remoteIP, 7070) : NULL);
|
|
@@ -493,17 +547,17 @@ void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP
|
|
|
{
|
|
|
ReferencedFileIterator files(this);
|
|
|
ForEach(files)
|
|
|
- files.queryObject().resolve(process, srcCluster, user, remote, checkLocalFirst, expandSuperFiles ? &subfiles : NULL);
|
|
|
+ files.queryObject().resolve(process, srcCluster, user, remote, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, resolveForeign);
|
|
|
}
|
|
|
if (expandSuperFiles)
|
|
|
- resolveSubFiles(subfiles, checkLocalFirst);
|
|
|
+ resolveSubFiles(subfiles, checkLocalFirst, resolveForeign);
|
|
|
}
|
|
|
|
|
|
-void ReferencedFileList::cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo)
|
|
|
+void ReferencedFileList::cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign)
|
|
|
{
|
|
|
ReferencedFileIterator files(this);
|
|
|
ForEach(files)
|
|
|
- files.queryObject().cloneInfo(helper, user, remote, process, srcCluster, overwrite);
|
|
|
+ files.queryObject().cloneInfo(helper, user, remote, process, srcCluster, overwrite, cloneForeign);
|
|
|
if (cloneSuperInfo)
|
|
|
ForEach(files)
|
|
|
files.queryObject().cloneSuperInfo(this, user, remote, overwrite);
|