|
@@ -322,19 +322,12 @@ struct cDirDesc
|
|
|
const char *decodeName(unsigned drv,const char *name,unsigned node, unsigned numnodes,
|
|
|
StringAttr &mask, // decoded mask
|
|
|
unsigned &pf, // part node
|
|
|
- unsigned &nf, // num parts
|
|
|
- unsigned &xn) // part in wrong place (xn!=(unsigned)-1)
|
|
|
+ unsigned &nf) // num parts
|
|
|
{
|
|
|
- xn = (unsigned)-1;
|
|
|
const char *fn = name;
|
|
|
// first see if tail fits a mask
|
|
|
- if (deduceMask(fn, true, mask, pf, nf)) {
|
|
|
- if ((pf+drv)%numnodes!=node) { // alternative replicate offsets TBD
|
|
|
- //PROGLOG("**misplaced(%d,%d,%d,%d) %s",pf,nf,drv,node,name);
|
|
|
- xn = node; // in the wrong place
|
|
|
- }
|
|
|
+ if (deduceMask(fn, true, mask, pf, nf))
|
|
|
fn = mask.get();
|
|
|
- }
|
|
|
else { // didn't match mask so use straight name
|
|
|
//PROGLOG("**unmatched(%d,%d,%d) %s",drv,node,numnodes,name);
|
|
|
pf = (node+numnodes-drv)%numnodes;
|
|
@@ -344,14 +337,15 @@ struct cDirDesc
|
|
|
}
|
|
|
|
|
|
|
|
|
- cFileDesc *addFile(unsigned drv,const char *name,__int64 sz,CDateTime &dt,unsigned node, unsigned numnodes, CLargeMemoryAllocator *mem)
|
|
|
+ cFileDesc *addFile(unsigned drv,const char *name,__int64 sz,CDateTime &dt,unsigned node, const SocketEndpoint &ep, IGroup &grp, unsigned numnodes, CLargeMemoryAllocator *mem)
|
|
|
{
|
|
|
|
|
|
unsigned nf;
|
|
|
unsigned pf;
|
|
|
- unsigned xn;
|
|
|
StringAttr mask;
|
|
|
- const char *fn = decodeName(drv,name,node,numnodes,mask,pf,nf,xn);
|
|
|
+ const char *fn = decodeName(drv,name,node,numnodes,mask,pf,nf);
|
|
|
+ bool misplaced = !grp.queryNode(pf).endpoint().equals(ep);
|
|
|
+
|
|
|
cFileDesc *file = files.find(fn,false);
|
|
|
if (!file) {
|
|
|
if (!mem)
|
|
@@ -359,11 +353,11 @@ struct cDirDesc
|
|
|
file = cFileDesc::create(*mem,fn,nf);
|
|
|
files.add(file);
|
|
|
}
|
|
|
- if (xn!=(unsigned)-1) {
|
|
|
+ if (misplaced) {
|
|
|
cMisplacedRec *mp = file->misplaced;
|
|
|
while (mp) {
|
|
|
- if (mp->eq(drv,pf,xn,numnodes)) {
|
|
|
- ERRLOG(LOGPFX "Duplicate file with mismatched tail (%d,%d) %s",pf,xn,name);
|
|
|
+ if (mp->eq(drv,pf,node,numnodes)) {
|
|
|
+ ERRLOG(LOGPFX "Duplicate file with mismatched tail (%d,%d) %s",pf,node,name);
|
|
|
return NULL;
|
|
|
}
|
|
|
mp = mp->next;
|
|
@@ -371,7 +365,7 @@ struct cDirDesc
|
|
|
if (!mem)
|
|
|
return NULL;
|
|
|
mp = (cMisplacedRec *)mem->alloc(sizeof(cMisplacedRec));
|
|
|
- mp->init(drv,pf,xn,numnodes);
|
|
|
+ mp->init(drv,pf,node,numnodes);
|
|
|
mp->next = file->misplaced;
|
|
|
file->misplaced = mp;
|
|
|
|
|
@@ -383,19 +377,20 @@ struct cDirDesc
|
|
|
return file;
|
|
|
}
|
|
|
|
|
|
- bool markFile(unsigned drv,const char *name, unsigned node, unsigned numnodes)
|
|
|
+ bool markFile(unsigned drv,const char *name, unsigned node, const SocketEndpoint &ep, IGroup &grp, unsigned numnodes)
|
|
|
{
|
|
|
unsigned nf;
|
|
|
unsigned pf;
|
|
|
unsigned xn;
|
|
|
StringAttr mask;
|
|
|
- const char *fn = decodeName(drv,name,node,numnodes,mask,pf,nf,xn);
|
|
|
+ const char *fn = decodeName(drv,name,node,numnodes,mask,pf,nf);
|
|
|
+ bool misplaced = !grp.queryNode(pf).endpoint().equals(ep);
|
|
|
cFileDesc *file = files.find(fn,false);
|
|
|
if (file) {
|
|
|
- if (xn!=(unsigned)-1) {
|
|
|
+ if (misplaced) {
|
|
|
cMisplacedRec *mp = file->misplaced;
|
|
|
while (mp) {
|
|
|
- if (mp->eq(drv,pf,xn,numnodes)) {
|
|
|
+ if (mp->eq(drv,pf,node,numnodes)) {
|
|
|
mp->marked = true;
|
|
|
return true;
|
|
|
}
|
|
@@ -671,13 +666,14 @@ class CNewXRefManager: public CNewXRefManagerBase
|
|
|
IArrayOf<IPropertyTree> sorteddirs;
|
|
|
|
|
|
public:
|
|
|
- Owned<IGroup> grp;
|
|
|
+ Owned<IGroup> grp, rawgrp;
|
|
|
StringArray clusters; // list of matching cluster (used in xref)
|
|
|
StringBuffer clusterscsl; // comma separated list of cluster (used in xref)
|
|
|
unsigned numnodes;
|
|
|
StringArray lostfiles;
|
|
|
CLargeMemoryAllocator mem;
|
|
|
bool verbose;
|
|
|
+ unsigned numuniqnodes = 0;
|
|
|
|
|
|
|
|
|
CNewXRefManager(unsigned maxMb=DEFAULT_MAXMEMORY)
|
|
@@ -775,17 +771,23 @@ public:
|
|
|
iphashsz = numnodes*2;
|
|
|
iphash = new IpAddress[iphashsz];
|
|
|
ipnum = new unsigned[iphashsz];
|
|
|
+ SocketEndpointArray deduppedEps;
|
|
|
ForEachNodeInGroup(i,*grp) {
|
|
|
const SocketEndpoint &ep = grp->queryNode(i).endpoint();
|
|
|
if (ep.port!=0)
|
|
|
WARNLOG(LOGPFX "Group has ports!");
|
|
|
// check port 0 TBD
|
|
|
- addIpHash(ep,i);
|
|
|
+ if (NotFound == checkIpHash(ep)) {
|
|
|
+ addIpHash(ep,i);
|
|
|
+ deduppedEps.append(ep);
|
|
|
+ }
|
|
|
}
|
|
|
+ rawgrp.setown(createIGroup(deduppedEps));
|
|
|
+ numuniqnodes = rawgrp->ordinality();
|
|
|
clusters.kill();
|
|
|
clusterscsl.clear().append(grpstr);
|
|
|
clusters.append(grpstr.str());
|
|
|
- Owned<INamedGroupIterator> giter = queryNamedGroupStore().getIterator(grp,false);
|
|
|
+ Owned<INamedGroupIterator> giter = queryNamedGroupStore().getIterator(rawgrp,false);
|
|
|
StringBuffer gname;
|
|
|
ForEach(*giter) {
|
|
|
giter->get(gname.clear());
|
|
@@ -795,7 +797,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
// add the first IP also
|
|
|
- grp->queryNode(0).endpoint().getIpText(gname.clear());
|
|
|
+ rawgrp->queryNode(0).endpoint().getIpText(gname.clear());
|
|
|
clusters.append(gname.str());
|
|
|
clusterscsl.append(',').append(gname.str());
|
|
|
if (basedir.length()==0) {
|
|
@@ -916,7 +918,7 @@ public:
|
|
|
nsz += fsz;
|
|
|
iter->getModifiedTime(dt);
|
|
|
if (!fileFiltered(path.str(),dt)) {
|
|
|
- pdir->addFile(drv,fname.str(),fsz,dt,node,numnodes,&mem);
|
|
|
+ pdir->addFile(drv,fname.str(),fsz,dt,node,ep,*grp,numnodes,&mem);
|
|
|
}
|
|
|
}
|
|
|
path.setLength(dsz);
|
|
@@ -951,7 +953,7 @@ public:
|
|
|
: parent(_parent), crit(_crit), abort(_abort)
|
|
|
{
|
|
|
rootdir = _rootdir;
|
|
|
- n = parent.numnodes;
|
|
|
+ n = parent.numuniqnodes;
|
|
|
r = (n+1)/2;
|
|
|
ok = true;
|
|
|
}
|
|
@@ -963,7 +965,7 @@ public:
|
|
|
if (!ok||abort)
|
|
|
return;
|
|
|
StringBuffer path(rootdir);
|
|
|
- SocketEndpoint ep = parent.grp->queryNode(i).endpoint();
|
|
|
+ SocketEndpoint ep = parent.rawgrp->queryNode(i).endpoint();
|
|
|
StringBuffer tmp;
|
|
|
parent.log("Scanning %s directory %s",ep.getUrlStr(tmp).str(),path.str());
|
|
|
if (!parent.scanDirectory(i,ep,path,0,NULL,NULL)) {
|
|
@@ -972,7 +974,7 @@ public:
|
|
|
}
|
|
|
i = (i+r)%n;
|
|
|
setReplicateFilename(path,1);
|
|
|
- ep = parent.grp->queryNode(i).endpoint();
|
|
|
+ ep = parent.rawgrp->queryNode(i).endpoint();
|
|
|
parent.log("Scanning %s directory %s",ep.getUrlStr(tmp.clear()).str(),path.str());
|
|
|
if (!parent.scanDirectory(i,ep,path,1,NULL,NULL)) {
|
|
|
ok = false;
|
|
@@ -980,9 +982,9 @@ public:
|
|
|
// PROGLOG("Done %i - %d used",i,parent.mem.maxallocated());
|
|
|
}
|
|
|
} afor(*this,rootdir,crit,abort);
|
|
|
- if (numThreads > numnodes)
|
|
|
- numThreads = numnodes;
|
|
|
- afor.For(numnodes,numThreads,true,numThreads>1);
|
|
|
+ if (numThreads > numuniqnodes)
|
|
|
+ numThreads = numuniqnodes;
|
|
|
+ afor.For(numuniqnodes,numThreads,true,numThreads>1);
|
|
|
if (afor.ok)
|
|
|
log("Directory scan complete");
|
|
|
else
|
|
@@ -1084,7 +1086,7 @@ public:
|
|
|
pdir = parent.findDirectory(dir.str());
|
|
|
lastdir.clear().append(dir);
|
|
|
}
|
|
|
- if (pdir&&pdir->markFile(drv,tail,nn,parent.numnodes)) {
|
|
|
+ if (pdir&&pdir->markFile(drv,tail,nn,ep,*parent.grp,parent.numnodes)) {
|
|
|
matched++;
|
|
|
}
|
|
|
|