|
@@ -139,13 +139,13 @@ public:
|
|
|
StringAttr cluster2;
|
|
|
Owned<IGroup> grp2;
|
|
|
ClusterPartDiskMapSpec spec2;
|
|
|
- bool overwrite;
|
|
|
+ unsigned overwriteFlags;
|
|
|
IDistributedFileDirectory *fdir;
|
|
|
bool repeattlk;
|
|
|
bool copyphysical;
|
|
|
unsigned level;
|
|
|
|
|
|
-
|
|
|
+ CFileCloner() : overwriteFlags(0) {}
|
|
|
|
|
|
void physicalCopyFile(IFileDescriptor *srcdesc,IFileDescriptor *dstdesc)
|
|
|
{
|
|
@@ -405,21 +405,22 @@ public:
|
|
|
throw afor2.exc.getClear();
|
|
|
}
|
|
|
|
|
|
- void updateCloneFrom(IFileDescriptor *srcfdesc, IFileDescriptor *dstfdesc, INode *srcdali, const char *srcCluster)
|
|
|
+ void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
|
|
|
{
|
|
|
+ DBGLOG("updateCloneFrom %s", lfn);
|
|
|
if (!srcdali || srcdali->endpoint().isNull())
|
|
|
- dstfdesc->queryProperties().setProp("@cloneFromPeerCluster", srcCluster);
|
|
|
+ attrs.setProp("@cloneFromPeerCluster", srcCluster);
|
|
|
else
|
|
|
{
|
|
|
StringBuffer s;
|
|
|
- dstfdesc->queryProperties().setProp("@cloneFrom", srcdali->endpoint().getUrlStr(s).str());
|
|
|
- dstfdesc->queryProperties().setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
|
|
|
+ attrs.setProp("@cloneFrom", srcdali->endpoint().getUrlStr(s).str());
|
|
|
+ attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
|
|
|
if (srcCluster && *srcCluster) //where to copy from has been explicity set to a remote location, don't copy from local sources
|
|
|
- dstfdesc->queryProperties().setProp("@cloneFromPeerCluster", "-");
|
|
|
+ attrs.setProp("@cloneFromPeerCluster", "-");
|
|
|
if (prefix.length())
|
|
|
- dstfdesc->queryProperties().setProp("@cloneFromPrefix", prefix.get());
|
|
|
+ attrs.setProp("@cloneFromPrefix", prefix.get());
|
|
|
|
|
|
- while(dstfdesc->queryProperties().removeProp("cloneFromGroup"));
|
|
|
+ while(attrs.removeProp("cloneFromGroup"));
|
|
|
|
|
|
unsigned numClusters = srcfdesc->numClusters();
|
|
|
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
|
|
@@ -432,13 +433,25 @@ public:
|
|
|
groupInfo->setProp("@groupName", sourceGroup);
|
|
|
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
|
|
|
spec.toProp(groupInfo);
|
|
|
- dstfdesc->queryProperties().addPropTree("cloneFromGroup", groupInfo.getClear());
|
|
|
+ attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
|
|
|
+ {
|
|
|
+ DistributedFilePropertyLock lock(dfile);
|
|
|
+ IPropertyTree &attrs = lock.queryAttributes();
|
|
|
+ updateCloneFrom(dfile->queryLogicalName(), attrs, srcfdesc, srcdali, srcCluster);
|
|
|
+ }
|
|
|
+ void updateCloneFrom(const char *lfn, IFileDescriptor *dstfdesc, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
|
|
|
+ {
|
|
|
+ updateCloneFrom(lfn, dstfdesc->queryProperties(), srcfdesc, srcdali, srcCluster);
|
|
|
+ }
|
|
|
|
|
|
void cloneSubFile(IPropertyTree *ftree,const char *destfilename, INode *srcdali, const char *srcCluster) // name already has prefix added
|
|
|
{
|
|
|
+ DBGLOG("cloneSubFile %s", destfilename);
|
|
|
+
|
|
|
Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
|
|
|
const char * kind = srcfdesc->queryProperties().queryProp("@kind");
|
|
|
bool iskey = kind&&(strcmp(kind,"key")==0);
|
|
@@ -481,16 +494,39 @@ public:
|
|
|
}
|
|
|
|
|
|
if (copyphysical) {
|
|
|
+ DBGLOG("copyphysical dst=%s", destfilename);
|
|
|
physicalCopyFile(srcfdesc,dstfdesc);
|
|
|
physicalReplicateFile(dstfdesc,destfilename);
|
|
|
}
|
|
|
|
|
|
- updateCloneFrom(srcfdesc, dstfdesc, srcdali, srcCluster);
|
|
|
+ updateCloneFrom(destfilename, dstfdesc, srcfdesc, srcdali, srcCluster);
|
|
|
|
|
|
Owned<IDistributedFile> dstfile = fdir->createNew(dstfdesc);
|
|
|
dstfile->attach(destfilename,userdesc);
|
|
|
}
|
|
|
|
|
|
+ void addCluster(IPropertyTree *ftree,const char *destfilename)
|
|
|
+ {
|
|
|
+ CDfsLogicalFileName dstlfn;
|
|
|
+ if (!dstlfn.setValidate(destfilename,true))
|
|
|
+ throw MakeStringException(-1,"Logical name %s invalid",destfilename);
|
|
|
+ Owned<IDistributedFile> dfile = fdir->lookup(dstlfn,userdesc,true);
|
|
|
+ if (dfile) {
|
|
|
+ ClusterPartDiskMapSpec spec = spec1;
|
|
|
+ const char * kind = ftree->queryProp("Attr/@kind");
|
|
|
+ bool iskey = kind&&(strcmp(kind,"key")==0);
|
|
|
+ if (iskey&&repeattlk)
|
|
|
+ spec.setRepeatedCopies(CPDMSRP_lastRepeated,false);
|
|
|
+ dfile->addCluster(cluster1,spec);
|
|
|
+ if (iskey&&!cluster2.isEmpty())
|
|
|
+ dfile->addCluster(cluster2,spec2);
|
|
|
+ if (copyphysical) {
|
|
|
+ Owned<IFileDescriptor> fdesc=dfile->getFileDescriptor();
|
|
|
+ physicalReplicateFile(fdesc,destfilename);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void extendSubFile(IPropertyTree *ftree,const char *destfilename)
|
|
|
{
|
|
|
CDfsLogicalFileName dstlfn;
|
|
@@ -532,7 +568,7 @@ public:
|
|
|
foreignuserdesc.set(_foreignuserdesc);
|
|
|
else if (_userdesc)
|
|
|
foreignuserdesc.set(_userdesc);
|
|
|
- overwrite = _overwrite;
|
|
|
+ overwriteFlags = _overwrite ? DALI_UPDATEF_REPLACE_FILE : 0;
|
|
|
copyphysical = _copyphysical;
|
|
|
nameprefix.set(_nameprefix);
|
|
|
repeattlk = _repeattlk;
|
|
@@ -576,7 +612,7 @@ public:
|
|
|
spec2.setDefaultBaseDir(defdir2.str());
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ inline bool checkOverwrite(unsigned flag) { return (overwriteFlags & flag)!=0; }
|
|
|
void cloneSuperFile(const char *filename, CDfsLogicalFileName &dlfn)
|
|
|
{
|
|
|
level++;
|
|
@@ -633,7 +669,7 @@ public:
|
|
|
// first see if target exists (and remove if does and overwrite specified)
|
|
|
Owned<IDistributedFile> dfile = fdir->lookup(dlfn,userdesc,true);
|
|
|
if (dfile) {
|
|
|
- if (!overwrite)
|
|
|
+ if (!checkOverwrite(DALI_UPDATEF_REPLACE_FILE))
|
|
|
throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
|
|
|
dfile->detach();
|
|
|
dfile.clear();
|
|
@@ -709,7 +745,7 @@ public:
|
|
|
// first see if target exists (and remove if does and overwrite specified)
|
|
|
Owned<IDistributedFile> dfile = fdir->lookup(dlfn,userdesc,true);
|
|
|
if (dfile) {
|
|
|
- if (!overwrite)
|
|
|
+ if (!checkOverwrite(DALI_UPDATEF_REPLACE_FILE))
|
|
|
throw MakeStringException(-1,"Destination file %s already exists",dlfn.get());
|
|
|
|
|
|
IPropertyTree &attloc = dfile->queryAttributes();
|
|
@@ -719,7 +755,7 @@ public:
|
|
|
{
|
|
|
Owned<IFileDescriptor> dstfdesc=dfile->getFileDescriptor();
|
|
|
Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
|
|
|
- updateCloneFrom(srcfdesc, dstfdesc, srcdali, srcCluster);
|
|
|
+ updateCloneFrom(filename, dstfdesc, srcfdesc, srcdali, srcCluster);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -730,6 +766,148 @@ public:
|
|
|
level--;
|
|
|
}
|
|
|
|
|
|
+ inline const char *getDaliEndPointStr(INode *daliNode, StringBuffer &s)
|
|
|
+ {
|
|
|
+ if (!daliNode)
|
|
|
+ return "(local)";
|
|
|
+ return daliNode->endpoint().getUrlStr(s).str();
|
|
|
+ }
|
|
|
+ inline bool checkHasCluster(IDistributedFile *dfile)
|
|
|
+ {
|
|
|
+ StringArray clusterNames;
|
|
|
+ dfile->getClusterNames(clusterNames);
|
|
|
+ return clusterNames.find(cluster1)!=NotFound;
|
|
|
+ }
|
|
|
+ inline bool checkIsKey(IPropertyTree *ftree)
|
|
|
+ {
|
|
|
+ const char * kind = ftree->queryProp("Attr/@kind");
|
|
|
+ return kind && streq(kind, "key");
|
|
|
+ }
|
|
|
+ void addCluster(IDistributedFile *dfile, IPropertyTree *srcFtree)
|
|
|
+ {
|
|
|
+ if (dfile)
|
|
|
+ {
|
|
|
+ ClusterPartDiskMapSpec spec = spec1;
|
|
|
+ if (checkIsKey(srcFtree) && repeattlk)
|
|
|
+ spec.setRepeatedCopies(CPDMSRP_lastRepeated, false);
|
|
|
+ DBGLOG("addCluster %s to file %s", cluster1.str(), dfile->queryLogicalName());
|
|
|
+ dfile->addCluster(cluster1, spec);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ bool checkFileChanged(IDistributedFile *dfile, IPropertyTree *srcftree, IPropertyTree *srcAttrs)
|
|
|
+ {
|
|
|
+ IPropertyTree &dstAttrs = dfile->queryAttributes();
|
|
|
+ return (dfile->numParts() != (unsigned) srcftree->getPropInt("@numparts") ||
|
|
|
+ srcAttrs->getPropInt("@eclCRC") != dstAttrs.getPropInt("@eclCRC") ||
|
|
|
+ srcAttrs->getPropInt("@totalCRC") != dstAttrs.getPropInt64("@totalCRC"));
|
|
|
+ }
|
|
|
+ inline bool checkValueChanged(const char *s1, const char *s2)
|
|
|
+ {
|
|
|
+ if (s1 && s2)
|
|
|
+ return !streq(s1, s2);
|
|
|
+ return ((s1 && *s1) || (s2 && *s2));
|
|
|
+ }
|
|
|
+ bool checkCloneFromChanged(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
|
|
|
+ {
|
|
|
+ if (!srcdali || srcdali->endpoint().isNull())
|
|
|
+ {
|
|
|
+ return checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPeerCluster"), srcCluster);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getUrlStr(s).str()))
|
|
|
+ return true;
|
|
|
+ if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir()))
|
|
|
+ return true;
|
|
|
+ if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPeerCluster"), (srcCluster && *srcCluster) ? "-" : NULL))
|
|
|
+ return true;
|
|
|
+ if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromPrefix"), prefix.get()))
|
|
|
+ return true;
|
|
|
+
|
|
|
+ unsigned groupCount = dfile->queryAttributes().getCount("cloneFromGroup");
|
|
|
+ if (srcCluster && *srcCluster && groupCount!=1)
|
|
|
+ return true;
|
|
|
+
|
|
|
+ unsigned numSrcClusters = srcfdesc->numClusters();
|
|
|
+ if (numSrcClusters != groupCount)
|
|
|
+ return true;
|
|
|
+ StringArray clusterNames;
|
|
|
+ dfile->getClusterNames(clusterNames);
|
|
|
+ for (unsigned clusterNum = 0; clusterNum < numSrcClusters; clusterNum++)
|
|
|
+ {
|
|
|
+ StringBuffer sourceGroup;
|
|
|
+ srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
|
|
|
+ if (!clusterNames.contains(sourceGroup.str()))
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ void cloneRoxieFile(const char *filename, const char *destfilename)
|
|
|
+ {
|
|
|
+ Linked<INode> srcdali = foreigndalinode;
|
|
|
+ CDfsLogicalFileName srcLFN;
|
|
|
+ srcLFN.set(filename);
|
|
|
+ if (srcLFN.isForeign())
|
|
|
+ {
|
|
|
+ SocketEndpoint ep;
|
|
|
+ srcLFN.getEp(ep);
|
|
|
+ srcLFN.clearForeign();
|
|
|
+ srcdali.setown(createINode(ep));
|
|
|
+ }
|
|
|
+ StringBuffer s;
|
|
|
+ Owned<IPropertyTree> ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, false);
|
|
|
+ if (!ftree.get())
|
|
|
+ throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
|
|
|
+ IPropertyTree *attsrc = ftree->queryPropTree("Attr");
|
|
|
+ if (!attsrc)
|
|
|
+ throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
|
|
|
+
|
|
|
+ CDfsLogicalFileName dlfn;
|
|
|
+ dlfn.set(destfilename);
|
|
|
+ if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File)))
|
|
|
+ throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s));
|
|
|
+
|
|
|
+ if (!srcdali.get()||queryCoven().inCoven(srcdali))
|
|
|
+ {
|
|
|
+ // if dali is local and filenames same
|
|
|
+ if (streq(srcLFN.get(), dlfn.get()))
|
|
|
+ {
|
|
|
+ extendSubFile(ftree,dlfn.get());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //see if target already exists
|
|
|
+ Owned<IDistributedFile> dfile = fdir->lookup(dlfn, userdesc, true);
|
|
|
+ if (dfile)
|
|
|
+ {
|
|
|
+ if (!checkOverwrite(DALI_UPDATEF_SUBFILE_MASK))
|
|
|
+ throw MakeStringException(-1, "Destination file %s already exists", dlfn.get());
|
|
|
+
|
|
|
+ if (checkOverwrite(DALI_UPDATEF_REPLACE_FILE) && checkFileChanged(dfile, ftree, attsrc)) //complete overwrite
|
|
|
+ {
|
|
|
+ DBGLOG("replacing file %s", destfilename);
|
|
|
+ dfile->detach();
|
|
|
+ dfile.clear();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (checkOverwrite(DALI_UPDATEF_APPEND_CLUSTER) && !checkHasCluster(dfile))
|
|
|
+ addCluster(dfile, ftree);
|
|
|
+ if (checkOverwrite(DALI_UPDATEF_CLONE_FROM))
|
|
|
+ {
|
|
|
+ Owned<IFileDescriptor> srcfdesc = deserializeFileDescriptorTree(ftree, NULL, 0);
|
|
|
+ if (checkCloneFromChanged(dfile, srcfdesc, srcdali, srcCluster))
|
|
|
+ updateCloneFrom(dfile, srcfdesc, srcdali, srcCluster);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cloneSubFile(ftree,dlfn.get(),srcdali, srcCluster);
|
|
|
+ }
|
|
|
|
|
|
};
|
|
|
|
|
@@ -1052,18 +1230,19 @@ public:
|
|
|
const char *defReplicateFolder,
|
|
|
IUserDescriptor *userdesc, // user desc for local dali
|
|
|
const char *foreigndali, // can be omitted if srcname foreign or local
|
|
|
- bool overwrite // overwrite destination if exists
|
|
|
+ unsigned overwriteFlags // overwrite destination if exists
|
|
|
)
|
|
|
{
|
|
|
- DBGLOG("cloneRoxieSubFile src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=false", srcLFN, srcCluster, dstLFN, dstCluster, prefix, overwrite);
|
|
|
+ DBGLOG("cloneRoxieSubFile src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=false", srcLFN, srcCluster, dstLFN, dstCluster, prefix, overwriteFlags);
|
|
|
CFileCloner cloner;
|
|
|
- cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, overwrite, false);
|
|
|
+ cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, false);
|
|
|
+ cloner.overwriteFlags = overwriteFlags;
|
|
|
cloner.spec1.setRoxie(redundancy, channelsPerNode, replicateOffset);
|
|
|
if (defReplicateFolder)
|
|
|
cloner.spec1.setDefaultReplicateDir(defReplicateFolder);
|
|
|
cloner.srcCluster.set(srcCluster);
|
|
|
cloner.prefix.set(prefix);
|
|
|
- cloner.cloneFile(srcLFN, dstLFN);
|
|
|
+ cloner.cloneRoxieFile(srcLFN, dstLFN);
|
|
|
}
|
|
|
|
|
|
|