dfuxreflib.cpp 99 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // DFU XREF Program
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jmisc.hpp"
  17. #include "jptree.hpp"
  18. #include "mpbase.hpp"
  19. #include "mpcomm.hpp"
  20. #include "daclient.hpp"
  21. #include "dadiags.hpp"
  22. #include "danqs.hpp"
  23. #include "dadfs.hpp"
  24. #include "dasds.hpp"
  25. #include "dalienv.hpp"
  26. #include "daft.hpp"
  27. #include "rmtfile.hpp"
  28. #include "dautils.hpp"
  29. #include "jptree.hpp"
  30. #include "XRefNodeManager.hpp"
  31. //#define PARTS_SIZE_NEEDED
  32. //#define CROSSLINK_CHECK_NEEDED
  33. static bool fixSizes = false;
  34. #define TESTXML
  35. #define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite
  36. #define SDS_DFS_ROOT "Files" // followed by scope/name
  37. //#define CONNECT_EACH_PATH
  38. #include "dfuxreflib.hpp"
  39. extern IPropertyTree *getDirectory(const char * directory, INode * node, unsigned short port);
  40. void testGetDir()
  41. {
  42. SocketEndpoint ep("10.173.72.1");
  43. Owned<INode> node = createINode(ep);
  44. Owned<IPropertyTree> results = getDirectory("/c$/thordata;/d$/thordata",node,getDaliServixPort());
  45. if (results) {
  46. PROGLOG("--------------------------------------------------------");
  47. PROGLOG("DIR");
  48. StringBuffer dirs;
  49. toXML(results, dirs, 2);
  50. PROGLOG("\n%s\n----------------------------------------------------------",dirs.str());
  51. }
  52. }
  // Bit flags passed as the 'flags' argument of addFileBranch(); each bit
  // selects an element written into the generated "File" branch.
  53. #define CFBcluster (0x00000001)
  54. #define CFBname (0x00000002)
  55. #define CFBnumparts (0x00000004)
  56. #define CFBpartslost (0x00000008)
  57. #define CFBprimarylost (0x00000010)
  58. #define CFBreplicatedlost (0x00000040)
  59. #define CFBmodified (0x00000080)
  60. #define CFBsize (0x00000100)
  61. #define CFBmismatchedsizes (0x00000400)
  // Per-part flags: request node, part number, replicate copies and primary
  // copies in the "Part" sub-branches.
  62. #define CFBpartnode (0x00010000)
  63. #define CFBpartnum (0x00020000)
  64. #define CFBpartreplicate (0x00040000)
  65. #define CFBpartprimary (0x00080000)
  // Any of the per-part flags above.
  66. #define CFBpartmask (CFBpartnode|CFBpartnum|CFBpartreplicate|CFBpartprimary)
  67. static IPropertyTree *addBranch(IPropertyTree *dst,const char *name)
  68. {
  69. return dst->addPropTree(name,createPTree());
  70. }
  // Forward declarations of the XREF bookkeeping types defined later in this file.
  71. struct CFileEntry;
  72. struct CDfuDirEntry;
  73. class COrphanEntry;
  74. struct CLogicalNameEntry;
  // Raw-pointer aliases; the string-keyed maps below store these pointers.
  75. typedef CFileEntry *CFileEntryPtr;
  76. typedef CDfuDirEntry *CDfuDirEntryPtr;
  77. typedef COrphanEntry *COrphanEntryPtr;
  78. typedef CLogicalNameEntry *CLogicalNameEntryPtr;
  79. typedef MapStringTo<CLogicalNameEntryPtr> CLogicalNameMap;
  // CFileEntryMap is instead a dedicated SuperHashTableOf class defined below.
  80. //typedef MapStringTo<CFileEntryPtr> CFileEntryMap;
  81. typedef MapStringTo<CDfuDirEntryPtr> CDfuDirEntryMap;
  82. typedef MapStringTo<COrphanEntryPtr> COrphanEntryMap;
  // Optional destination for outf(); when unset, outf() prints to stdout.
  83. Owned <IFileIOStream> outfileio;
  84. void outf(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
  85. void outf(const char *fmt, ...)
  86. {
  87. va_list args;
  88. va_start(args, fmt);
  89. StringBuffer buf;
  90. buf.valist_appendf(fmt,args);
  91. va_end(args);
  92. if (outfileio)
  93. outfileio->write(buf.length(),buf.str());
  94. else
  95. printf("%s",buf.str());
  96. }
  // Bits stored in CFileEntry::flags.
  97. #define FEF_RESOLVED 0x01
  // Set by the CFileEntry constructor when the part is a replicate copy.
  98. #define FEF_REPLICATE 0x02
  // One physical part file, keyed by its path (fname) in CFileEntryMap and
  // attributed to the logical file 'owner'.
  99. struct CFileEntry: public CInterface
  100. {
  101. CFileEntry(const char *_fname,CLogicalNameEntry *_owner,unsigned _part,bool _replicate,__int64 _size, bool compresskludge, __int64 _compsize);
  // Appends "<logical name>[<part>]" (with an 'R' before ']' for replicates) to buf.
  102. StringBuffer &getLogicalName(StringBuffer &buf);
  // Cached hashc() of fname, computed by the constructor; used by CFileEntryMap.
  103. unsigned queryHash()
  104. {
  105. return keyhash;
  106. }
  // Case-sensitive comparison of 'toname' with this entry's path.
  107. bool comparePath(const char *toname)
  108. {
  109. return strcmp(toname,fname)==0;
  110. }
  111. inline bool resolved() { return (flags&FEF_RESOLVED)!=0; }
  112. inline bool replicate() { return (flags&FEF_REPLICATE)!=0; }
  113. //IpAddress ip;
  114. //StringAttr dir; // TBD
  115. //StringAttr tail;
  116. StringAttr fname;
  117. CLogicalNameEntry *owner;
  // Only compiled in when PARTS_SIZE_NEEDED is defined (it is commented out above).
  118. #ifdef PARTS_SIZE_NEEDED
  119. __int64 size;
  120. __int64 expsize;
  121. #endif
  122. #ifdef CROSSLINK_CHECK_NEEDED
  123. Owned<CFileEntry> crosslink;
  124. #endif
  125. unsigned keyhash;
  // 1-based part number (CLogicalNameEntry::resolve() subtracts 1 before indexing).
  126. unsigned short part;
  // Combination of FEF_* bits.
  127. byte flags;
  128. //bool replicate;
  129. //bool resolved;
  130. };
  131. class CFileEntryMap : public SuperHashTableOf<CFileEntry, const char>
  132. {
  133. public:
  134. ~CFileEntryMap()
  135. {
  136. _releaseAll();
  137. }
  138. virtual void onAdd(void *e)
  139. {
  140. }
  141. virtual void onRemove(void *e)
  142. {
  143. CFileEntry &elem=*(CFileEntry *)e;
  144. elem.Release();
  145. }
  146. virtual unsigned getHashFromElement(const void *e) const
  147. {
  148. return ((CFileEntry *) e)->queryHash();
  149. }
  150. virtual unsigned getHashFromFindParam(const void *fp) const
  151. {
  152. return hashc((const unsigned char *)fp, strlen((const char *)fp), 0);
  153. }
  154. virtual const void *getFindParam(const void *e) const
  155. {
  156. return ((CFileEntry *) e)->fname;
  157. }
  158. virtual bool matchesFindParam(const void *e, const void *fp, unsigned fphash) const
  159. {
  160. return (0 == strcmp(((CFileEntry *) e)->fname, (const char *)fp));
  161. }
  162. };
  163. StringBuffer &substnum(StringBuffer &str,const char *sub,unsigned n)
  164. {
  165. StringBuffer out;
  166. const char *s=str.str();
  167. const char *p=sub;
  168. while (*s) {
  169. if (*s==*p) {
  170. p++;
  171. if (!*p) {
  172. out.append(n);
  173. p = sub;
  174. }
  175. }
  176. else {
  177. if (p!=sub) {
  178. out.append(p-sub,sub);
  179. p = sub;
  180. }
  181. out.append(*s);
  182. }
  183. s++;
  184. }
  185. str.swapWith(out);
  186. return str;
  187. }
  188. static StringBuffer &makeScopeQuery(const char *scope,StringBuffer &query)
  189. {
  190. const char *s=scope;
  191. for (;;) {
  192. const char *e=strstr(s,"::");
  193. if (s!=scope)
  194. query.append('/');
  195. StringBuffer tail;
  196. if (e)
  197. tail.append(e-s,s);
  198. else
  199. tail.append(s);
  200. query.append("Scope[@name=\"").append(tail.toLowerCase().str()).append("\"]");
  201. if (!e)
  202. break;
  203. s = e+2;
  204. }
  205. return query;
  206. }
  207. static const char *splitScope(const char *name,StringBuffer &scope)
  208. {
  209. const char *s=strstr(name,"::");
  210. if (s) {
  211. for (;;) {
  212. const char *ns = strstr(s+2,"::");
  213. if (!ns)
  214. break;
  215. s = ns;
  216. }
  217. StringBuffer str;
  218. str.append(s-name,name);
  219. scope.append(str.toLowerCase().str());
  220. return s+2;
  221. }
  222. scope.append(".");
  223. return name;
  224. }
  225. static StringBuffer &makeFullnameQuery(const char *lname,StringBuffer &query)
  226. {
  227. StringBuffer scope;
  228. StringBuffer tail(splitScope(lname,scope));
  229. return makeScopeQuery(scope.str(),query).append("/File[@name=\"").append(tail.toLowerCase().str()).append("\"]");
  230. }
  231. static StringBuffer &makeAbsoluteFullnameQuery(const char *lname,StringBuffer &query)
  232. {
  233. query.append(SDS_DFS_ROOT "/");
  234. return makeFullnameQuery(lname,query);
  235. }
  // Redundant: CLogicalNameEntry is already forward-declared earlier in the file.
  236. struct CLogicalNameEntry;
  // Element type of CEndpointTable: an endpoint (the hash key) plus a rank.
  // NOTE(review): 'rank' has no initialiser here; its meaning is not visible in this file section.
  237. class CEndpointItem: public CInterface
  238. {
  239. public:
  240. rank_t rank;
  241. SocketEndpoint ep;
  242. };
  243. struct CDfuDirEntry: public CInterface
  244. {
  245. CDfuDirEntry(const char *_name,unsigned _clustsize)
  246. : name(_name)
  247. {
  248. size = 0;
  249. num = 0;
  250. numdir = 0;
  251. maxsize = 0;
  252. minsize = -1;
  253. clustersize = _clustsize;
  254. }
  255. StringAttr name;
  256. __int64 size;
  257. __int64 minsize;
  258. IpAddress minip;
  259. __int64 maxsize;
  260. IpAddress maxip;
  261. unsigned num;
  262. unsigned numdir;
  263. unsigned clustersize;
  264. StringBuffer &getskew(StringBuffer &skew)
  265. {
  266. if (clustersize&&size&&(minsize<maxsize)) {
  267. __int64 av = size/(__int64)clustersize;
  268. if (av) {
  269. unsigned pcp = (unsigned)(maxsize*100/av);
  270. unsigned pcn = (unsigned)(minsize*100/av);
  271. if ((pcp>100)||(pcn<100))
  272. skew.appendf("+%d%%/-%d%%",pcp-100,100-pcn);
  273. }
  274. }
  275. return skew;
  276. }
  277. };
  278. static unsigned short getDafsPort(const SocketEndpoint &ep,unsigned &numfails,CriticalSection *sect)
  279. {
  280. if (sect) {
  281. CriticalBlock block(*sect);
  282. if (numfails>5)
  283. return 0;
  284. }
  285. else if (numfails>5)
  286. return 0;
  287. unsigned short nPort = getActiveDaliServixPort(ep);
  288. if (nPort)
  289. return nPort;
  290. StringBuffer err("Failed to connect to DaFileSrv on ");
  291. ep.getIpText(err);
  292. #ifdef _WIN32
  293. OERRLOG("%s",err.str());
  294. if (sect) {
  295. CriticalBlock block(*sect);
  296. numfails++;
  297. }
  298. else
  299. numfails++;
  300. #else
  301. throw MakeStringExceptionDirect(-1, err.str());
  302. #endif
  303. return 0;
  304. }
  305. class CEndpointTable: public SuperHashTableOf<CEndpointItem,SocketEndpoint>
  306. {
  307. public:
  308. ~CEndpointTable()
  309. {
  310. _releaseAll();
  311. }
  312. void onAdd(void *)
  313. {
  314. // not used
  315. }
  316. void onRemove(void *e)
  317. {
  318. CEndpointItem &elem=*(CEndpointItem *)e;
  319. elem.Release();
  320. }
  321. unsigned getHashFromElement(const void *e) const
  322. {
  323. const CEndpointItem &elem=*(const CEndpointItem *)e;
  324. return elem.ep.hash(0);
  325. }
  326. unsigned getHashFromFindParam(const void *fp) const
  327. {
  328. return ((const SocketEndpoint *)fp)->hash(0);
  329. }
  330. const void * getFindParam(const void *p) const
  331. {
  332. const CEndpointItem &elem=*(const CEndpointItem *)p;
  333. return &elem.ep;
  334. }
  335. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  336. {
  337. return ((CEndpointItem *)et)->ep.equals(*(SocketEndpoint *)fp);
  338. }
  339. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(CEndpointItem,SocketEndpoint);
  340. };
  341. class CXRefManagerBase
  342. {
  343. protected:
  344. struct cMessage: public CInterface
  345. {
  346. StringAttr lname;
  347. StringAttr msg;
  348. cMessage(const char *_lname,const char *_msg)
  349. : lname(_lname), msg(_msg)
  350. {
  351. }
  352. };
  353. public:
  354. IArrayOf<IGroup> knowngroups;
  355. StringAttrArray knowngroupnames;
  356. Linked<IXRefProgressCallback> msgcallback;
  357. CriticalSection logsect;
  358. unsigned totalCompressed;
  359. __int64 totalUncompressedSize;
  360. __int64 totalCompressedSize;
  361. __int64 totalSizeOrphans;
  362. unsigned totalNumOrphans;
  363. CIArrayOf<CLogicalNameEntry> logicalnamelist;
  364. CLogicalNameMap logicalnamemap;
  365. CLogicalNameMap logicaldirmap;
  366. CFileEntryMap filemap;
  367. CDfuDirEntryMap dirmap;
  368. CIArrayOf<CDfuDirEntry> dirlist;
  369. COrphanEntryMap orphanmap;
  370. CIArrayOf<COrphanEntry> orphanlist;
  371. CEndpointTable EndpointTable;
  372. CIArrayOf<cMessage> errors;
  373. CIArrayOf<cMessage> warnings;
  374. CriticalSection inprogresssect;
  375. SocketEndpointArray inprogress;
  376. CXRefManagerBase()
  377. {
  378. totalCompressed=0;
  379. totalUncompressedSize=0;
  380. totalCompressedSize=0;
  381. }
  382. virtual ~CXRefManagerBase();
  383. IGroup *resolveGroup(IGroup *_grp,StringBuffer &gname) // takes ownership of _grp
  384. {
  385. StringBuffer name;
  386. IGroup *grp = _grp;
  387. ForEachItemIn(i,knowngroups) {
  388. GroupRelation gr = _grp->compare(&knowngroups.item(i));
  389. const char *gn = knowngroupnames.item(i).text.get();
  390. if ((gr==GRidentical)||(gn&&*gn&&((gr==GRbasesubset)||(gr==GRwrappedsuperset)))) {
  391. _grp->Release();
  392. _grp=NULL;
  393. if (gn)
  394. gname.append(gn);
  395. grp = &knowngroups.item(i);
  396. grp->Link();
  397. break;
  398. }
  399. }
  400. if (_grp) {
  401. if (queryNamedGroupStore().find(_grp,gname)) {
  402. if (gname.length()) {
  403. _grp->Release();
  404. grp=queryNamedGroupStore().lookup(gname.str());
  405. }
  406. }
  407. else {
  408. name.clear();
  409. }
  410. knowngroupnames.append(* new StringAttrItem(gname.str()));
  411. knowngroups.append(*LINK(grp));
  412. }
  413. return grp;
  414. }
  415. void log(const char * format, ...) __attribute__((format(printf, 2, 3)))
  416. {
  417. CriticalBlock block(logsect);
  418. va_list args;
  419. va_start(args, format);
  420. StringBuffer line;
  421. line.valist_appendf(format, args);
  422. va_end(args);
  423. if (msgcallback) {
  424. msgcallback->progress(line.str());
  425. }
  426. else {
  427. PROGLOG("%s",line.str());
  428. }
  429. }
  430. void error(const char *lname,const char * format, ...) __attribute__((format(printf, 3, 4)))
  431. {
  432. CriticalBlock block(logsect);
  433. va_list args;
  434. va_start(args, format);
  435. StringBuffer line;
  436. line.valist_appendf(format, args);
  437. va_end(args);
  438. if (errors.ordinality()<1000) {
  439. errors.append(*new cMessage(lname,line.str()));
  440. if (errors.ordinality()==1000)
  441. errors.append(*new cMessage("","error limit exceeded (1000), truncating"));
  442. }
  443. if (msgcallback) {
  444. StringBuffer cbline("ERROR: ");
  445. cbline.append(lname).append(": ").append(line);
  446. msgcallback->progress(cbline.str());
  447. }
  448. else {
  449. OERRLOG("%s: %s",lname,line.str());
  450. }
  451. }
  452. void warn(const char *lname,const char * format, ...) __attribute__((format(printf, 3, 4)))
  453. {
  454. CriticalBlock block(logsect);
  455. va_list args;
  456. va_start(args, format);
  457. StringBuffer line;
  458. line.valist_appendf(format, args);
  459. va_end(args);
  460. if (warnings.ordinality()<1000) {
  461. warnings.append(*new cMessage(lname,line.str()));
  462. if (warnings.ordinality()==1000)
  463. warnings.append(*new cMessage("","warning limit (1000) exceeded, truncating"));
  464. }
  465. if (msgcallback) {
  466. StringBuffer cbline("WARNING: ");
  467. cbline.append(lname).append(": ").append(line);
  468. msgcallback->progress(cbline.str());
  469. }
  470. else {
  471. UWARNLOG("%s: %s",lname,line.str());
  472. }
  473. }
  474. void addNodeInProgress(INode &node)
  475. {
  476. CriticalBlock block(inprogresssect);
  477. SocketEndpoint ep = node.endpoint();
  478. inprogress.append(ep);
  479. }
  480. void removeNodeInProgress(INode &node)
  481. {
  482. CriticalBlock block(inprogresssect);
  483. SocketEndpoint ep = node.endpoint();
  484. inprogress.zap(ep);
  485. if (inprogress.ordinality()==0)
  486. return;
  487. StringBuffer msg("Waiting for ");
  488. ForEachItemIn(i,inprogress) {
  489. if (i)
  490. msg.append(", ");
  491. inprogress.item(i).getIpText(msg);
  492. }
  493. if (msgcallback)
  494. msgcallback->progress(msg.str());
  495. else
  496. PROGLOG("%s",msg.str());
  497. }
  498. void incDirSize(const IpAddress &ip,const char *dir,__int64 sz,unsigned num,unsigned numdir,unsigned clustsize)
  499. {
  500. CDfuDirEntryPtr *entryp= dirmap.getValue(dir);
  501. CDfuDirEntryPtr entry;
  502. if (entryp)
  503. entry = *entryp;
  504. else {
  505. entry = new CDfuDirEntry(dir,clustsize);
  506. dirmap.setValue(dir,entry);
  507. dirlist.append(*entry);
  508. }
  509. entry->num+=num;
  510. entry->numdir+=numdir;
  511. entry->size+=sz;
  512. if ((entry->minsize==-1)||(entry->minsize>sz)) {
  513. entry->minsize=sz;
  514. entry->minip.ipset(ip);
  515. }
  516. if (entry->maxsize<sz) {
  517. entry->maxsize=sz;
  518. entry->maxip.ipset(ip);
  519. }
  520. }
  521. };
  522. struct CLogicalNameEntry: public CInterface
  523. {
  524. CLogicalNameEntry(CXRefManagerBase &_manager, const char *_lname,IPropertyTree &file) // takes ownership of grp
  525. : manager(_manager),lname(_lname)
  526. {
  527. replicated = false;
  528. outsidedir = 0;
  529. primarynum = 0;
  530. replicatenum = 0;
  531. max = file.getPropInt("@numparts");
  532. missinggrp = false;
  533. unknowngrp = false;
  534. const char *s=file.queryProp("@group"); // TBD - Handling for multiple clusters?
  535. if (s&&*s) {
  536. grpname.append(s);
  537. if (isAnonCluster(s))
  538. manager.warn(_lname,"File has anonymous cluster");
  539. if (strchr(s,','))
  540. manager.error(_lname,"XREF can't handle multi-file clusters yet!");
  541. }
  542. s = file.queryProp("@partmask");
  543. if (s&&*s)
  544. pmask.set(s);
  545. s = file.queryProp("@modified");
  546. if (s&&*s)
  547. {
  548. modified.set(s);
  549. }
  550. done = false;
  551. wrongwidth = false;
  552. primaryresolved = NULL;
  553. replicateresolved = NULL;
  554. nummismatchedsizes=0;
  555. mismatchedsizeinfo = NULL;
  556. totalsize=0;
  557. recordsize = file.getPropInt("Attr/@recordSize",-1);
  558. compressed = (file.getPropInt("Attr/@rowCompressed", 0)!=0)||(file.getPropInt("Attr/@blockCompressed", 0)!=0);
  559. grouped = file.getPropInt("Attr/@grouped", 0)!=0;
  560. const char *partmask = file.queryProp("@partmask");
  561. StringBuffer tmp;
  562. if (partmask&&*partmask) {
  563. if (!containsPathSepChar(partmask)) {
  564. const char *dir = file.queryProp("@directory");
  565. if (dir&&*dir)
  566. tmp.append(dir).append(getPathSepChar(dir));
  567. }
  568. tmp.append(partmask);
  569. }
  570. tmp.toLowerCase();
  571. substnum(tmp,"$n$",max);
  572. dirpartmask.set(tmp.str());
  573. }
  574. ~CLogicalNameEntry()
  575. {
  576. free(primaryresolved);
  577. free(replicateresolved);
  578. }
  579. void setGroup(IGroup *_grp,IGroup *lgrp) // note incoming group does *not* override cluster
  580. // takes ownerhip of lgrp if set
  581. {
  582. StringBuffer name;
  583. unsigned n = 0;
  584. if (_grp) {
  585. n = _grp->ordinality();
  586. grp.setown(manager.resolveGroup(_grp,name));
  587. if (!grp) {
  588. SocketListCreator cr;
  589. SocketEndpointArray epa;
  590. ForEachNodeInGroup(i,*grp) {
  591. SocketEndpoint ep = grp->queryNode(i).endpoint();
  592. epa.append(ep);
  593. }
  594. cr.addSockets(epa);
  595. manager.warn(lname.get(),"Cluster group %s did not resolve",cr.getText());
  596. }
  597. }
  598. if (grpname.length()) {
  599. if(!_grp||(stricmp(grpname.str(),name.str())!=0)) {
  600. if (!lgrp)
  601. lgrp = queryNamedGroupStore().lookup(grpname.str());
  602. if (lgrp) {
  603. if (name.length())
  604. mismatchgrp.set(name.str());
  605. grp.setown(lgrp);
  606. }
  607. else {
  608. unknowngrp = true;
  609. grpname.swapWith(name);
  610. manager.warn(lname.get(),"Cluster group %s not found",grpname.str());
  611. }
  612. }
  613. }
  614. else {
  615. grpname.swapWith(name);
  616. if (n>1)
  617. missinggrp = true;
  618. }
  619. primaryresolved=(CFileEntry * *)calloc(max,sizeof(CFileEntry *));
  620. replicateresolved=(CFileEntry * *)calloc(max,sizeof(CFileEntry *));
  621. if (!grp)
  622. manager.warn(lname.get(),"No cluster group set");
  623. }
  624. bool remove(IUserDescriptor *user)
  625. {
  626. IDistributedFileDirectory &fdir = queryDistributedFileDirectory();
  627. Owned<IDistributedFile> file = fdir.lookup(lname.get(),user);
  628. if (!file)
  629. return false;
  630. file->detach();
  631. outf("Removed %s from DDFS\n",lname.get());
  632. return true;
  633. }
  634. IGroup *queryGroup()
  635. {
  636. return grp.get();
  637. }
  638. unsigned queryNumParts()
  639. {
  640. return max;
  641. }
  642. void addCrossLink(CLogicalNameEntry *entry)
  643. {
  644. if (entry) {
  645. ForEachItemIn(i,crosslinked) {
  646. if (&crosslinked.item(i)==entry)
  647. return;
  648. }
  649. crosslinked.append(*LINK(entry));
  650. }
  651. }
  652. void addMismatchedSize(__int64 expected, __int64 actual, __int64 expanded, CFileEntry *entry)
  653. {
  654. unsigned i = nummismatchedsizes++;
  655. mismatchedsizeinfo = (MMSinfo *)(mismatchedsizeinfo?realloc(mismatchedsizeinfo,nummismatchedsizes*sizeof(MMSinfo)):malloc(sizeof(MMSinfo)));
  656. mismatchedsizeinfo[i].expected = expected;
  657. mismatchedsizeinfo[i].actual = actual;
  658. mismatchedsizeinfo[i].expanded = expanded;
  659. mismatchedsizeinfo[i].entry = entry;
  660. }
  661. bool incomplete()
  662. {
  663. for (unsigned j=0;j<max;j++)
  664. if (!primaryresolved[j]&&!replicateresolved[j])
  665. return true;
  666. return false;
  667. }
  668. void resolve(CFileEntry *entry);
  669. IPropertyTree *addFileBranch(IPropertyTree *dst,unsigned flags);
  670. StringAttr lname;
  671. StringAttr pmask;
  672. StringAttr modified;
  673. unsigned primarynum;
  674. unsigned replicatenum;
  675. unsigned max;
  676. unsigned outsidedir;
  677. StringBuffer errdir;
  678. Owned<IGroup> grp;
  679. SocketEndpointArray outsidenodes;
  680. CIArrayOf<CLogicalNameEntry> crosslinked;
  681. bool unknowngrp;
  682. StringAttr mismatchgrp;
  683. bool missinggrp;
  684. bool wrongwidth;
  685. struct MMSinfo {
  686. __int64 expected;
  687. __int64 actual;
  688. __int64 expanded;
  689. CFileEntry *entry;
  690. } *mismatchedsizeinfo;
  691. unsigned nummismatchedsizes;
  692. StringBuffer grpname;
  693. bool replicated;
  694. bool done;
  695. CFileEntry **primaryresolved;
  696. CFileEntry **replicateresolved;
  697. __int64 totalsize;
  698. size32_t recordsize;
  699. bool compressed;
  700. bool grouped;
  701. StringAttr dirpartmask;
  702. CXRefManagerBase &manager;
  703. };
  704. CFileEntry::CFileEntry(const char *_fname,CLogicalNameEntry *_owner,unsigned _part,bool _replicate,__int64 _size, bool compresskludge, __int64 _compsize)
  705. {
  706. fname.set(_fname);
  707. keyhash = hashc((const unsigned char *)fname.get(), fname.length(), 0);
  708. flags = 0;
  709. if (_replicate)
  710. flags |= FEF_REPLICATE;
  711. part = _part;
  712. #ifdef PARTS_SIZE_NEEDED
  713. size = (_compsize>0)?_compsize:_size;
  714. expsize = (_compsize>0)?_size:-1;
  715. #endif
  716. owner = _owner;
  717. if (compresskludge)
  718. owner->compressed = true;
  719. }
  720. StringBuffer &CFileEntry::getLogicalName(StringBuffer &buf)
  721. {
  722. buf.append(owner->lname.get()).append('[').append(part);
  723. if (replicate())
  724. buf.append('R');
  725. return buf.append(']');
  726. }
  727. CXRefManagerBase::~CXRefManagerBase()
  728. {
  729. ForEachItemIn(i,logicalnamelist) {
  730. CLogicalNameEntry &item = logicalnamelist.item(i);
  731. item.crosslinked.kill();
  732. }
  733. }
  734. static bool constructLogicalName(const char *fullname,const char *basedir,StringBuffer &logicalname,unsigned &num,unsigned &max)
  735. {
  736. max = 0;
  737. num = 0;
  738. // mask filename with $P$ extension or normal filename to logical name and num (0 for $P$) and max
  739. while (*fullname&&(toupper(*fullname)==toupper(*basedir))) {
  740. fullname++;
  741. basedir++;
  742. }
  743. if (isPathSepChar(*fullname))
  744. fullname++;
  745. if (!*fullname)
  746. return false;
  747. const char *s=fullname;
  748. for (;;) {
  749. const char *e=s;
  750. while (*e&&!isPathSepChar(*e))
  751. e++;
  752. if (!*e)
  753. break;
  754. if (logicalname.length())
  755. logicalname.append("::");
  756. logicalname.append(e-s,s);
  757. s = e+1;
  758. }
  759. const char *ext = strchr(s,'.');
  760. if (!ext)
  761. return false;
  762. for (;;) {
  763. const char *ne = strchr(ext+1,'.');
  764. if (!ne)
  765. break;
  766. ext = ne;
  767. }
  768. const char *es=ext;
  769. if (memicmp(es,"._$P$_of_",9)==0)
  770. es += 9;
  771. else {
  772. if (memcmp(es,"._",2)!=0)
  773. return false;
  774. es+=2;
  775. StringBuffer n;
  776. while (*es!='_') {
  777. if (!*es)
  778. return false;
  779. n.append(*(es++));
  780. }
  781. num = atoi(n.str());
  782. if (memicmp(es,"_of_",4)!=0)
  783. return false;
  784. es += 4;
  785. }
  786. max = atoi(es);
  787. if (logicalname.length())
  788. logicalname.append("::");
  789. logicalname.append(ext-s,s);
  790. logicalname.toLowerCase();
  791. return true;
  792. }
  793. static void constructPartname(const char *filename,unsigned n, StringBuffer &pn,bool replicate)
  794. {
  795. StringBuffer repdir;
  796. // mask filename with $P$ extension to paticular part filename
  797. if (replicate) {
  798. setReplicateDir(filename,repdir);
  799. filename = repdir.str();
  800. }
  801. while (*filename) {
  802. if (memicmp(filename,"_$P$_",5)==0) {
  803. filename = filename+5;
  804. pn.append('_').append(n).append('_');
  805. }
  806. else {
  807. pn.append(*filename);
  808. filename++;
  809. }
  810. }
  811. }
  812. static bool parseFileName(const char *name,StringBuffer &mname,unsigned &num,unsigned &max,bool &replicate)
  813. {
  814. // takes filename and creates mask filename with $P$ extension
  815. StringBuffer nonrepdir;
  816. replicate = setReplicateDir(name,nonrepdir,false);
  817. if (replicate)
  818. name = nonrepdir.str();
  819. num = 0;
  820. max = 0;
  821. for (;;) {
  822. char c=*name;
  823. if (!c)
  824. break;
  825. if ((c=='.')&&(name[1]=='_')) {
  826. unsigned pn = 0;
  827. const char *s = name+2;
  828. while (*s&&isdigit(*s)) {
  829. pn = pn*10+(*s-'0');
  830. s++;
  831. }
  832. if (pn&&(memicmp(s,"_of_",4)==0)) {
  833. unsigned mn = 0;
  834. s += 4;
  835. while (*s&&isdigit(*s)) {
  836. mn = mn*10+(*s-'0');
  837. s++;
  838. }
  839. if ((mn!=0)&&((*s==0)||(*s=='.'))&&(mn>=pn)) { // NB allow trailing extension
  840. mname.append("._$P$_of_").append(mn);
  841. if (*s)
  842. mname.append(s);
  843. num = pn;
  844. max = mn;
  845. return true;
  846. }
  847. }
  848. }
  849. mname.append(c);
  850. name++;
  851. }
  852. return false;
  853. }
  854. class COrphanEntry: public CInterface
  855. {
  856. public:
  857. StringAttr basedir;
  858. StringAttr fname;
  859. StringAttr modified;
  860. unsigned max;
  861. __int64 size;
  862. SocketEndpointArray primaryepa;
  863. UnsignedArray primarypna;
  864. SocketEndpointArray replicateepa;
  865. UnsignedArray replicatepna;
  866. bool dirfailed;
  867. CLogicalNameEntry *misplaced;
  868. CXRefManagerBase &manager;
  869. byte incompletestate; // 0 unknown, 1 ignore, 2 incomplete
  870. COrphanEntry(CXRefManagerBase &_manager, const char *_fname,const char *_basedir,unsigned _max,const char *_modified,CLogicalNameEntry *_misplaced)
  871. : manager(_manager),fname(_fname), basedir(_basedir), modified(_modified)
  872. {
  873. dirfailed = false;
  874. max = _max;
  875. size = 0;
  876. misplaced = _misplaced;
  877. incompletestate = 0;
  878. }
  879. void add(SocketEndpoint &ep,unsigned pn,bool replicate,__int64 sz)
  880. {
  881. if (max&&(pn>=max)) {
  882. return;
  883. }
  884. size += sz;
  885. if (replicate) {
  886. replicateepa.append(ep);
  887. replicatepna.append(pn);
  888. }
  889. else {
  890. primaryepa.append(ep);
  891. primarypna.append(pn);
  892. }
  893. }
  894. bool complete()
  895. {
  896. if (incompletestate)
  897. return (incompletestate>1);
  898. if (!max) {
  899. // check matches mask
  900. return true;
  901. }
  902. if (max==num()) {
  903. // check for a group here
  904. incompletestate = 2;
  905. return true;
  906. }
  907. incompletestate = 1;
  908. return false;
  909. }
  910. bool isSingleton()
  911. {
  912. if (primaryepa.ordinality()==1) {
  913. if (replicateepa.ordinality()>1)
  914. return false;
  915. if (primarypna.item(0)>0)
  916. return false;
  917. }
  918. if (replicateepa.ordinality()==1) {
  919. if (primaryepa.ordinality()>1)
  920. return false;
  921. if (replicatepna.item(0)>0)
  922. return false;
  923. }
  924. return true;
  925. }
  926. bool singletonName(StringBuffer &name,bool replicate)
  927. {
  928. SocketEndpoint ep;
  929. if (replicate) {
  930. if (replicateepa.ordinality()==0)
  931. return false;
  932. ep = replicateepa.item(0);
  933. }
  934. else {
  935. if (primaryepa.ordinality()==0)
  936. return false;
  937. ep = primaryepa.item(0);
  938. }
  939. StringBuffer partname;
  940. constructPartname(fname.get(),1,partname,replicate);
  941. RemoteFilename rfn;
  942. rfn.setPath(ep,partname.str());
  943. rfn.getRemotePath(name);
  944. return true;
  945. }
  946. unsigned num()
  947. {
  948. if (!max)
  949. return 1;
  950. bool *done = (bool *)calloc(sizeof(bool),max);
  951. ForEachItemIn(i1,primarypna)
  952. done[primarypna.item(i1)] = true;
  953. ForEachItemIn(i2,replicatepna)
  954. done[replicatepna.item(i2)] = true;
  955. unsigned ret = 0;
  956. for (unsigned i=0;i<max;i++)
  957. if (done[i])
  958. ret++;
  959. free(done);
  960. return ret;
  961. }
  962. void getEps(StringBuffer &out,bool replicate)
  963. {
  964. SocketEndpointArray &epa=(replicate?replicateepa:primaryepa);
  965. UnsignedArray &pna=(replicate?replicatepna:primarypna);
  966. unsigned *sorted = new unsigned[epa.ordinality()];
  967. // its expected to be in order so just do insertion sort
  968. ForEachItemIn(i1,pna) {
  969. unsigned i = i1;
  970. unsigned pn = pna.item(i1);
  971. while (i&&(pna.item(sorted[i-1])>pn))
  972. i--;
  973. if (i<i1)
  974. memmove(sorted+i+1,sorted+i,sizeof(unsigned)*(i1-i));
  975. sorted[i] = i1;
  976. }
  977. StringBuffer prefix;
  978. ForEachItemIn(i2,epa) {
  979. if (i2)
  980. out.append(", ");
  981. const SocketEndpoint &item = epa.item(sorted[i2]);
  982. StringBuffer cur;
  983. item.getUrlStr(cur);
  984. const char *s1 = prefix.str();
  985. const char *s2 = cur.str();
  986. if (prefix.length()&&(memcmp(s1,s2,prefix.length())==0))
  987. out.append(s2+prefix.length());
  988. else {
  989. unsigned n = 2;
  990. out.append(s2);
  991. prefix.clear();
  992. while (*s2&&n) {
  993. prefix.append(*s2);
  994. if (*s2=='.')
  995. n--;
  996. s2++;
  997. }
  998. }
  999. out.append('[').append(pna.item(sorted[i2])+1).append(']');
  1000. }
  1001. delete [] sorted;
  1002. }
  1003. IPropertyTree *addFileBranch(IPropertyTree *dst,unsigned flags)
  1004. {
  1005. unsigned i;
  1006. unsigned numparts = max?max:1;
  1007. StringBuffer buf;
  1008. IPropertyTree *out;
  1009. if (flags&CFBpartmask) {
  1010. unsigned np = (flags&CFBpartprimary)?primarypna.ordinality():0;
  1011. unsigned nr = (flags&CFBpartreplicate)?replicatepna.ordinality():0;
  1012. if (np+nr==0)
  1013. return NULL;
  1014. out = addBranch(dst,"File");
  1015. if (fname.get())
  1016. {
  1017. StringBuffer expfn;
  1018. StringBuffer repfn;
  1019. const char *pm = fname.get();
  1020. if (np+nr==1) {
  1021. expandMask(expfn, fname.get(), (np?primarypna.item(0):replicatepna.item(0)), numparts);
  1022. pm = expfn.str();
  1023. if ((np==0)&&setReplicateDir(pm,repfn))
  1024. pm = repfn.str();
  1025. }
  1026. else if (((flags&CFBpartprimary)==0) && setReplicateDir(pm,repfn)) // not sure if this used
  1027. pm = repfn.str();
  1028. out->setProp("Partmask",pm);
  1029. }
  1030. }
  1031. else
  1032. out = addBranch(dst,"File");
  1033. if (flags&CFBnumparts) {
  1034. out->setPropInt("Numparts",numparts);
  1035. }
  1036. if (flags&CFBmodified) {
  1037. if (modified.get())
  1038. out->setProp("Modified",modified.get());
  1039. }
  1040. if (flags&CFBcluster) {
  1041. // don't support cluster resolution on orphans
  1042. flags |= (CFBpartnum|CFBpartnode);
  1043. }
  1044. if (flags&CFBsize) {
  1045. out->setPropInt64("Size",size);
  1046. }
  1047. if (flags&CFBpartmask) {
  1048. for (int copy=0;copy<=1;copy++) {
  1049. if ((!copy&&(flags&CFBpartprimary))||
  1050. (copy&&(flags&CFBpartreplicate))) {
  1051. UnsignedArray *parts = new UnsignedArray[numparts];
  1052. if (copy) {
  1053. ForEachItemIn(i2,replicatepna)
  1054. parts[replicatepna.item(i2)].append(i2);
  1055. }
  1056. else {
  1057. ForEachItemIn(i1,primarypna)
  1058. parts[primarypna.item(i1)].append(i1);
  1059. }
  1060. for (i=0;i<numparts;i++) {
  1061. if (parts[i].ordinality()) {
  1062. StringBuffer xpath;
  1063. xpath.appendf("Part[Num=\"%d\"]",i+1);
  1064. IPropertyTree *b = out->queryPropTree(xpath.str());
  1065. if (!b)
  1066. b = addBranch(out,"Part");
  1067. if (flags&CFBpartnode) {
  1068. ForEachItemIn(i3,parts[i]) {
  1069. unsigned p = parts[i].item(i3);
  1070. if (copy) {
  1071. replicateepa.item(p).getUrlStr(buf.clear());
  1072. b->addProp("RNode",buf.str());
  1073. }
  1074. else {
  1075. primaryepa.item(p).getUrlStr(buf.clear());
  1076. b->addProp("Node",buf.str());
  1077. }
  1078. }
  1079. }
  1080. if (flags&CFBpartnum)
  1081. b->setPropInt("Num",i+1);
  1082. }
  1083. }
  1084. delete [] parts;
  1085. }
  1086. }
  1087. }
  1088. return out;
  1089. }
  1090. };
  1091. void CLogicalNameEntry::resolve(CFileEntry *entry)
  1092. {
  1093. unsigned part=entry->part-1;
  1094. if (part>=max) { // MORE
  1095. manager.error(lname.get(),"Part %d: Invalid entry (greater than max %d)",part+1,max);
  1096. return;
  1097. }
  1098. if (entry->replicate()) {
  1099. if (replicateresolved[part]) {
  1100. manager.error(lname.get(),"Part %d: Multiple replicated part entry",part+1);
  1101. }
  1102. else {
  1103. replicatenum++;
  1104. replicateresolved[part] = entry;
  1105. }
  1106. }
  1107. else {
  1108. if (primaryresolved[part]) {
  1109. manager.error(lname.get(),"Part %d: Multiple primary part entry",part+1);
  1110. }
  1111. else {
  1112. primarynum++;
  1113. primaryresolved[part] = entry;
  1114. #ifdef PARTS_SIZE_NEEDED
  1115. if (entry->size>0) {
  1116. totalsize += entry->size;
  1117. if (((int)recordsize>0)&&
  1118. ((entry->size%recordsize)!=0)&&
  1119. !compressed&&
  1120. (!grouped||(entry->size%(recordsize+1)!=0))) {
  1121. manager.error(lname.get(),"Part %d: Record size %d not multiple of file size %" I64F "d\n",part+1,recordsize,entry->size);
  1122. }
  1123. }
  1124. #endif
  1125. }
  1126. }
  1127. #ifdef PARTS_SIZE_NEEDED
  1128. if (primaryresolved[part]&&replicateresolved[part]&&(primaryresolved[part]->size!=replicateresolved[part]->size)) {
  1129. manager.error(lname.get(),"Part %d: primary size %" I64F "d is different to replicate size %" I64F "d",part+1,primaryresolved[part]->size,replicateresolved[part]->size);
  1130. }
  1131. #endif
  1132. }
  1133. IPropertyTree *CLogicalNameEntry::addFileBranch(IPropertyTree *dst,unsigned flags)
  1134. {
  1135. unsigned i;
  1136. StringBuffer buf;
  1137. IPropertyTree *out = addBranch(dst,"File");
  1138. if (flags&CFBname) {
  1139. out->setProp("Name",lname.get());
  1140. }
  1141. if (flags&CFBpartmask) {
  1142. if (pmask.get()) {
  1143. StringBuffer tmp(pmask.get());
  1144. tmp.toLowerCase();
  1145. out->setProp("Partmask",tmp.str());
  1146. }
  1147. }
  1148. if (flags&CFBnumparts) {
  1149. out->setPropInt("Numparts",max);
  1150. }
  1151. if (flags&CFBpartslost) {
  1152. unsigned n=0;
  1153. for (i=0;i<max;i++)
  1154. if ((primaryresolved[i]==NULL)&&(replicateresolved[i]==NULL))
  1155. n++;
  1156. out->setPropInt("Partslost",n);
  1157. }
  1158. if (flags&CFBprimarylost) {
  1159. if (primarynum!=max)
  1160. out->setPropInt("Primarylost",max-primarynum);
  1161. }
  1162. if (flags&CFBreplicatedlost) {
  1163. if (replicatenum!=max)
  1164. out->setPropInt("Replicatedlost",max-replicatenum);
  1165. }
  1166. if (flags&CFBmodified) {
  1167. if (modified.get())
  1168. out->setProp("Modified",modified.get());
  1169. }
  1170. if (flags&CFBcluster) {
  1171. if (grpname.length())
  1172. out->setProp("Cluster",grpname.str());
  1173. }
  1174. #ifdef PARTS_SIZE_NEEDED
  1175. if (flags&CFBsize) {
  1176. out->setPropInt64("Size",totalsize);
  1177. }
  1178. #endif
  1179. if ((flags&CFBpartmask)&&grp&&grp->ordinality()) {
  1180. unsigned n=0;
  1181. unsigned nc = (flags&CFBpartreplicate)?2:1;
  1182. unsigned rep;
  1183. for (i=0;i<max;i++) {
  1184. for (rep=0;rep<nc;rep++) {
  1185. CFileEntry *e=rep?replicateresolved[i]:primaryresolved[i];
  1186. if ((flags&CFBpartslost)&&e)
  1187. continue;
  1188. IPropertyTree *part = addBranch(out,"Part");
  1189. if (flags&CFBpartnode) {
  1190. grp->queryNode((i+rep)%grp->ordinality()).endpoint().getUrlStr(buf.clear()); // TBD check grp==cluster TBD
  1191. part->setProp("Node",buf.str());
  1192. }
  1193. if (flags&CFBpartnum) {
  1194. part->setPropInt("Num",i+1);
  1195. }
  1196. if (flags&CFBpartreplicate) {
  1197. if (rep)
  1198. part->setPropInt("Replicate",1);
  1199. }
  1200. if (flags&CFBmismatchedsizes) {
  1201. unsigned k = nummismatchedsizes;
  1202. while (k&&(mismatchedsizeinfo[k-1].entry!=e))
  1203. k--;
  1204. if (k) {
  1205. part->setPropInt64("Recordedsize",mismatchedsizeinfo[k-1].expected);
  1206. part->setPropInt64("Actualsize",mismatchedsizeinfo[k-1].actual);
  1207. }
  1208. }
  1209. }
  1210. }
  1211. }
  1212. return out;
  1213. }
  1214. struct TimedBlock
  1215. {
  1216. char *msg;
  1217. unsigned start;
  1218. unsigned limit;
  1219. unsigned ln;
  1220. TimedBlock(const char *_msg,unsigned _limit,unsigned _ln)
  1221. {
  1222. msg = strdup(_msg);
  1223. ln = _ln;
  1224. start = msTick();
  1225. limit = _limit;
  1226. }
  1227. ~TimedBlock()
  1228. {
  1229. unsigned elapsed=msTick()-start;
  1230. if (elapsed>limit)
  1231. DBGLOG("TIME: %s took %dms - line(%d)",msg,elapsed,ln);
  1232. free(msg);
  1233. }
  1234. };
  1235. #define TIMEDBLOCK(name,msg,lim) TimedBlock name(msg,lim,__LINE__);
  1236. void loadFromDFS(CXRefManagerBase &manager,IGroup *grp,unsigned numdirs,const char **dirbaselist,const char* Cluster)
  1237. {
  1238. rank_t r=grp->ordinality();
  1239. while (r--) {
  1240. SocketEndpoint ep=grp->queryNode(r).endpoint();
  1241. CEndpointItem *item=manager.EndpointTable.find(ep);
  1242. if (!item) {
  1243. item = new CEndpointItem;
  1244. item->ep = ep;
  1245. item->rank = r;
  1246. manager.EndpointTable.add(*item);
  1247. }
  1248. }
  1249. class Cscanner: public CSDSFileScanner
  1250. {
  1251. SocketEndpointArray epa;
  1252. CLogicalNameEntry* lnentry; // set during processFile
  1253. StringBuffer localname;
  1254. StringBuffer remotename;
  1255. unsigned pass;
  1256. unsigned numdirs;
  1257. const char **dirbaselist;
  1258. CXRefManagerBase &manager;
  1259. IGroup *testgroup;
  1260. Owned<IGroup> cachedgroup;
  1261. public:
  1262. Cscanner(CXRefManagerBase &_manager, IGroup *_testgroup, unsigned _numdirs,const char **_dirbaselist)
  1263. : manager(_manager)
  1264. {
  1265. numdirs = _numdirs;
  1266. dirbaselist = _dirbaselist;
  1267. testgroup = _testgroup;
  1268. }
  1269. bool checkGroupOk(const char *grpname)
  1270. {
  1271. if (!grpname)
  1272. return true;
  1273. cachedgroup.setown(queryNamedGroupStore().lookup(grpname));
  1274. if (!cachedgroup)
  1275. return true;
  1276. ForEachNodeInGroup(i,*cachedgroup)
  1277. if (testgroup->isMember(&cachedgroup->queryNode(i))) // would be better to have hash here TBD
  1278. return true;
  1279. return false;
  1280. }
  1281. virtual bool checkFileOk(IPropertyTree &file,const char *filename)
  1282. {
  1283. StringArray groups;
  1284. getFileGroups(&file,groups);
  1285. ForEachItemIn(i,groups) {
  1286. if (checkGroupOk(groups.item(i)))
  1287. return true;
  1288. }
  1289. return false;
  1290. }
  1291. void processParts(IPropertyTree &root)
  1292. {
  1293. Owned<IPropertyTreeIterator> iter;
  1294. unsigned numparts = root.getPropInt("@numparts");
  1295. MemoryBuffer mb;
  1296. if (root.getPropBin("Parts",mb)) {
  1297. iter.setown(deserializePartAttrIterator(mb)); // this itterator is in order
  1298. ForEach(*iter) {
  1299. IPropertyTree &part = iter->query();
  1300. unsigned partno = part.getPropInt("@num",0);
  1301. SocketEndpoint ep;
  1302. const char *eps = part.queryProp("@node");
  1303. if (eps&&*eps)
  1304. ep.set(eps);
  1305. processPart(root,part,partno,numparts,ep);
  1306. }
  1307. }
  1308. else { // slow but should be going anyway
  1309. iter.setown(root.getElements("Part")); // use parts
  1310. IArrayOf<IPropertyTree> parts;
  1311. Owned<IPropertyTree> empty = createPTree("Attr");
  1312. unsigned i;
  1313. for (i=0;i<numparts;i++)
  1314. parts.append(*empty.getLink());
  1315. unsigned lastpartno=0;
  1316. ForEach(*iter) {
  1317. IPropertyTree &part = iter->query();
  1318. unsigned partno = part.getPropInt("@num",0);
  1319. if (partno&&(partno!=lastpartno)&&(partno<=numparts)) {
  1320. parts.replace(*createPTreeFromIPT(&part),partno-1);
  1321. lastpartno = partno;
  1322. }
  1323. }
  1324. for (i=0;i<numparts;i++) {
  1325. IPropertyTree &part = parts.item(i);
  1326. SocketEndpoint ep;
  1327. const char *eps = part.queryProp("@node");
  1328. if (eps&&*eps)
  1329. ep.set(eps);
  1330. processPart(root,part,i+1,numparts,ep);
  1331. }
  1332. }
  1333. }
  1334. void processFile(IPropertyTree &file,StringBuffer &name)
  1335. {
  1336. if (!manager.msgcallback.get())
  1337. DBGLOG("processFile %s",name.str());
  1338. Owned<CLogicalNameEntry> lnentrybase = new CLogicalNameEntry(manager,name.str(),file);
  1339. lnentry = lnentrybase;
  1340. epa.kill();
  1341. pass=0;
  1342. processParts(file); // could improve
  1343. manager.logicalnamemap.setValue(name.str(), lnentry);
  1344. lnentry->Link();
  1345. manager.logicalnamelist.append(*lnentry);
  1346. manager.logicaldirmap.setValue(lnentry->dirpartmask.get(), lnentry);
  1347. lnentry->setGroup(epa.ordinality()?createIGroup(epa):NULL,cachedgroup.getLink());
  1348. pass = 2;
  1349. processParts(file);
  1350. }
  1351. void processFile(StringBuffer &name,StringBuffer &fullpath)
  1352. {
  1353. Owned<IRemoteConnection> conn = querySDS().connect(fullpath.str(),myProcessSession(),RTM_LOCK_READ|RTM_SUB, SDS_CONNECT_TIMEOUT);
  1354. if (conn) // there is at least one duff entry that this is needed for
  1355. processFile(*conn->queryRoot(),name);
  1356. }
  1357. void processPart(IPropertyTree &file,IPropertyTree &part,unsigned partno,unsigned numparts,SocketEndpoint &ep)
  1358. {
  1359. if (pass==0) {
  1360. partno--;
  1361. if (!ep.isNull()) {
  1362. SocketEndpoint nullep;
  1363. while (partno>=epa.ordinality())
  1364. epa.append(nullep);
  1365. epa.element(partno) = ep;
  1366. }
  1367. }
  1368. else {
  1369. bool replicate=false;
  1370. const char *partname = part.queryProp("@name");
  1371. const char *partmask = file.queryProp("@partmask");
  1372. const char *partdir = file.queryProp("@directory");
  1373. int replicateoffset = file.getPropInt("@replicateOffset",1);
  1374. for (;;) {
  1375. RemoteFilename rfn;
  1376. IGroup *grp = lnentry->queryGroup();
  1377. if (!grp) {
  1378. manager.warn(lnentry->lname.get(),"No group found, ignoring logical file");
  1379. return;
  1380. }
  1381. constructPartFilename(grp,partno,numparts,partname,partmask,partdir,replicate,replicateoffset,rfn);
  1382. SocketEndpoint rep=rfn.queryEndpoint();
  1383. if (manager.EndpointTable.find(rep)!=NULL) {
  1384. rfn.getLocalPath(localname.clear());
  1385. bool dirmatch=false;
  1386. unsigned k;
  1387. for (k=0;k<numdirs;k++)
  1388. {
  1389. if (memicmp(localname.str(),dirbaselist[k],strlen(dirbaselist[k]))==0) {
  1390. dirmatch = true;
  1391. break;
  1392. }
  1393. }
  1394. if (dirmatch) {
  1395. rfn.getRemotePath(remotename.clear());
  1396. remotename.toLowerCase();
  1397. CFileEntry *entry= new CFileEntry(remotename.str(),lnentry,partno,replicate,part.getPropInt64("@size", -1),part.getPropInt("@rowCompression", 0)!=0,part.getPropInt("@compressedSize", -1));
  1398. CFileEntry *oldentry= manager.filemap.find(remotename.str());
  1399. if (oldentry)
  1400. {
  1401. if (oldentry->owner!=lnentry) {
  1402. #ifdef CROSSLINK_CHECK_NEEDED
  1403. StringBuffer s1;
  1404. StringBuffer s2;
  1405. entry->getLogicalName(s1);
  1406. oldentry->getLogicalName(s2);
  1407. //outf("CROSSLINK: %s contains same file as %s\n",s1.str(),s2.str());
  1408. entry->owner->addCrossLink(oldentry->owner);
  1409. oldentry->owner->addCrossLink(entry->owner);
  1410. oldentry->crosslink.setown(entry);
  1411. #endif
  1412. }
  1413. else
  1414. entry->Release();
  1415. }
  1416. else {
  1417. manager.filemap.add(*entry);
  1418. }
  1419. }
  1420. else {
  1421. lnentry->outsidedir++;
  1422. if (lnentry->errdir.length()==0)
  1423. lnentry->errdir.append(localname.str());
  1424. }
  1425. }
  1426. else {
  1427. if (lnentry->outsidenodes.find(rep)==NotFound)
  1428. lnentry->outsidenodes.append(rep);
  1429. }
  1430. if (replicate)
  1431. break;
  1432. replicate = true;
  1433. }
  1434. }
  1435. }
  1436. } scanner(manager,grp,numdirs,dirbaselist);
  1437. manager.log("Loading Files branch from SDS");
  1438. Owned<IRemoteConnection> conn = querySDS().connect(SDS_DFS_ROOT,myProcessSession(),RTM_LOCK_READ, INFINITE);
  1439. if (!conn) {
  1440. throw MakeStringException(-1,"Could not connect to Files");
  1441. }
  1442. conn->changeMode(RTM_NONE);
  1443. manager.log("Files loaded, scanning");
  1444. scanner.scan(conn);
  1445. manager.log("Scanning done");
  1446. }
  1447. class CPhysicalXREF
  1448. {
  1449. unsigned numdirs;
  1450. const char **dirbaselist;
  1451. CriticalSection xrefsect;
  1452. CXRefManagerBase &manager;
  1453. public:
  1454. CPhysicalXREF(CXRefManagerBase &_manager, unsigned _numdirs,const char **_dirbaselist)
  1455. : manager(_manager)
  1456. {
  1457. numdirs = _numdirs;
  1458. dirbaselist = _dirbaselist;
  1459. setDaliServixSocketCaching(true);
  1460. }
  1461. void getBaseDir(const char *name,StringBuffer &basedir)
  1462. {
  1463. unsigned i;
  1464. for (i=0;i<numdirs;i++) {
  1465. size32_t l = strlen(dirbaselist[i]);
  1466. if ((memicmp(dirbaselist[i],name,l)==0)&&isPathSepChar(name[l])) {
  1467. basedir.append(dirbaselist[i]).toLowerCase();
  1468. return;
  1469. }
  1470. }
  1471. }
  1472. void xreffile(SocketEndpoint &ep,const char *fullname,IPropertyTree *file)
  1473. {
  1474. RemoteFilename rname;
  1475. StringBuffer name;
  1476. rname.setPath(ep,fullname);
  1477. rname.getRemotePath(name);
  1478. name.toLowerCase();
  1479. CFileEntry *entry= manager.filemap.find(name.str());
  1480. __int64 sz = file->getPropInt64("@size", -1);
  1481. if (entry) {
  1482. do {
  1483. entry->flags |= FEF_RESOLVED;
  1484. #ifdef PARTS_SIZE_NEEDED
  1485. if (entry->owner->compressed) {
  1486. if ((entry->expsize>=0)&&((sz!=entry->size)||((entry->size==0)&&(entry->expsize>0)))) {
  1487. manager.error(entry->owner->lname.get(),"Part %d: %s size mismatch: recorded size %" I64F "d, actual size %" I64F "d, expanded size %" I64F "d",entry->part,entry->replicate?"replicate":"primary",entry->size,sz,entry->expsize);
  1488. entry->owner->addMismatchedSize(entry->size,sz,entry->expsize,entry);
  1489. }
  1490. if (entry->size>=0) {
  1491. manager.totalCompressed++;
  1492. manager.totalCompressedSize+=sz;
  1493. manager.totalUncompressedSize+=entry->size;
  1494. }
  1495. }
  1496. else if ((entry->size>=0)&&(entry->size!=sz)) {
  1497. if (!entry->replicate) {
  1498. entry->owner->addMismatchedSize(entry->size,sz,-1,entry);
  1499. }
  1500. manager.error(entry->owner->lname.get(),"Part %d: %s size mismatch: recorded size %" I64F "d, actual size %" I64F "d",entry->part,entry->replicate?"replicate":"primary",entry->size,sz);
  1501. entry->size=sz; // set to actual size for fix
  1502. }
  1503. #endif
  1504. entry->owner->resolve(entry);
  1505. #ifdef CROSSLINK_CHECK_NEEDED
  1506. entry = entry->crosslink;
  1507. #else
  1508. entry = NULL;
  1509. #endif
  1510. } while (entry);
  1511. }
  1512. else {
  1513. manager.totalSizeOrphans += sz;
  1514. manager.totalNumOrphans++;
  1515. StringBuffer orphanname;
  1516. unsigned m;
  1517. unsigned n;
  1518. bool replicate;
  1519. parseFileName(fullname,orphanname,n,m,replicate);
  1520. if (n>m)
  1521. manager.error(fullname, "Part %d: number greater than max %d",n+1,m);
  1522. else {
  1523. orphanname.toLowerCase();
  1524. COrphanEntryPtr *entryp = manager.orphanmap.getValue(orphanname.str());
  1525. COrphanEntryPtr entry;
  1526. if (entryp)
  1527. entry = *entryp;
  1528. else {
  1529. StringBuffer basedir;
  1530. getBaseDir(orphanname.str(),basedir);
  1531. CLogicalNameEntryPtr *parentp = manager.logicaldirmap.getValue(orphanname.str());
  1532. entry = new COrphanEntry(manager,orphanname.str(),basedir.str(),m,file->queryProp("@modified"),parentp?*parentp:NULL);
  1533. manager.orphanmap.setValue(orphanname.str(),entry);
  1534. manager.orphanlist.append(*entry);
  1535. }
  1536. if (n)
  1537. n--;
  1538. entry->add(ep,n,replicate,sz);
  1539. }
  1540. }
  1541. }
  1542. void xrefdir(SocketEndpoint &ep,StringBuffer &path,IPropertyTree *dirtree,unsigned clustsize)
  1543. {
  1544. unsigned fnum=0;
  1545. unsigned dnum=0;
  1546. size32_t pathlen = path.length();
  1547. Owned<IPropertyTreeIterator> dirs= dirtree->getElements("directory");
  1548. if (dirs->first()) {
  1549. do {
  1550. IPropertyTree &dir = dirs->query();
  1551. if (path.length()&&!isPathSepChar(path.charAt(path.length()-1)))
  1552. path.append(getPathSepChar(path.str()));
  1553. path.append(dir.queryProp("@name"));
  1554. xrefdir(ep,path,&dir,clustsize);
  1555. path.setLength(pathlen);
  1556. } while (dirs->next());
  1557. dnum++;
  1558. }
  1559. Owned<IPropertyTreeIterator> files= dirtree->getElements("file");
  1560. if (files->first()) {
  1561. do {
  1562. IPropertyTree &file = files->query();
  1563. const char *fname=file.queryProp("@name");
  1564. size32_t l = strlen(fname);
  1565. if ((l>4)&&(stricmp(fname+l-4,".crc")==0))
  1566. continue;
  1567. if (path.length()&&!isPathSepChar(path.charAt(path.length()-1)))
  1568. path.append(getPathSepChar(path.str()));
  1569. path.append(fname);
  1570. xreffile(ep,path.str(),&file);
  1571. path.setLength(pathlen);
  1572. fnum++;
  1573. } while (files->next());
  1574. }
  1575. manager.incDirSize(ep,path.str(),dirtree->getPropInt64("@size", 0),fnum,dnum,clustsize);
  1576. }
  1577. void xref(IPropertyTree *machine,unsigned clustsize)
  1578. {
  1579. CriticalBlock block(xrefsect);
  1580. StringBuffer path;
  1581. SocketEndpoint ep(machine->queryProp("@ip"));
  1582. ep.port = 0;
  1583. xrefdir(ep,path,machine,clustsize);
  1584. }
  1585. };
  1586. class CXRefManager: public CXRefManagerBase
  1587. {
  1588. #ifdef PARTS_SIZE_NEEDED
  1589. void fixSizeInDFS()
  1590. {
  1591. StringBuffer name;
  1592. __int64 sz;
  1593. ForEachItemIn(i,logicalnamelist) {
  1594. CLogicalNameEntry &item = logicalnamelist.item(i);
  1595. if (!item.done&&item.nummismatchedsizes) {
  1596. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(item.lname.get());
  1597. if (file) {
  1598. outf("checking %s\n",item.lname.get());
  1599. Owned<IDistributedFilePartIterator> partiter = file->getIterator();
  1600. unsigned partno=0;
  1601. __int64 total = 0;
  1602. ForEach(*partiter) {
  1603. partno++;
  1604. Owned<IDistributedFilePart> part = &partiter->get();
  1605. RemoteFilename rname;
  1606. part->getFilename(rname);
  1607. rname.getRemotePath(name.clear());
  1608. name.toLowerCase();
  1609. CFileEntry *entry= filemap.find(name.str());
  1610. if (entry) {
  1611. __int64 sz = part->queryAttributes().getPropInt64("@size", -1);
  1612. if (sz!=entry->size) {
  1613. StringBuffer s1;
  1614. entry->getLogicalName(s1);
  1615. outf("SIZEFIX: Changing size for %s from %" I64F "d to %" I64F "d\n",s1.str(),sz,entry->size);
  1616. part->lockProperties().setPropInt64("@size", entry->size);
  1617. part->unlockProperties();
  1618. }
  1619. if (total!=-1)
  1620. total += entry->size;
  1621. }
  1622. else
  1623. total = -1;
  1624. }
  1625. sz = file->queryAttributes().getPropInt64("@size", -1);
  1626. if (sz!=total) {
  1627. outf("SIZEFIX: Changing total size for %s from %" I64F "d to %" I64F "d\n",item.lname.get(),sz,total);
  1628. if (total!=-1)
  1629. file->lockProperties().setPropInt64("@size", total);
  1630. else
  1631. file->lockProperties().removeProp("@size");
  1632. file->unlockProperties();
  1633. }
  1634. }
  1635. }
  1636. }
  1637. outf("Size fix completed\n");
  1638. }
  1639. #endif
  1640. void xrefRemoteDirectories(IGroup *g,unsigned numdirs,const char **dirbaselist,unsigned numthreads)
  1641. {
  1642. if (numthreads<=1)
  1643. numthreads = 10; // should be safe now
  1644. StringBuffer dirlist;
  1645. unsigned j;
  1646. for (j=0;j<numdirs;j++) {
  1647. const char *basedir = dirbaselist[j];
  1648. if (!basedir||!*basedir)
  1649. continue;
  1650. if (dirlist.length())
  1651. dirlist.append(';');
  1652. dirlist.append(basedir);
  1653. }
  1654. CPhysicalXREF cxref(*this,numdirs,dirbaselist);
  1655. unsigned numfails=0;
  1656. class casyncfor: public CAsyncFor
  1657. {
  1658. CPhysicalXREF &cxref;
  1659. IGroup *g;
  1660. const char *dirlist;
  1661. bool abort;
  1662. CXRefManagerBase &manager;
  1663. CriticalSection crit;
  1664. unsigned numfails;
  1665. public:
  1666. casyncfor(IGroup *_g, const char *_dirlist, CPhysicalXREF &_cxref, CXRefManagerBase *_manager)
  1667. : cxref(_cxref), manager(*_manager)
  1668. {
  1669. g = _g;
  1670. dirlist = _dirlist;
  1671. abort = false;
  1672. numfails = 0;
  1673. }
  1674. void Do(unsigned idx)
  1675. {
  1676. if (abort)
  1677. return;
  1678. CriticalBlock block(manager.logsect); // only the get directory is async
  1679. try {
  1680. StringBuffer msg;
  1681. INode &node = g->queryNode(idx);
  1682. node.endpoint().getUrlStr(msg);
  1683. manager.log("Getting directories for %s",msg.str());
  1684. manager.addNodeInProgress(node);
  1685. Owned<IPropertyTree> results;
  1686. {
  1687. CriticalUnblock unblock(manager.logsect);
  1688. unsigned short port = getDafsPort(node.endpoint(),numfails,&crit);
  1689. results.setown(getDirectory(dirlist,&node,port));
  1690. }
  1691. manager.log("Crossreferencing %s",msg.str());
  1692. cxref.xref(results,g->ordinality());
  1693. manager.removeNodeInProgress(node);
  1694. }
  1695. catch (IException *)
  1696. {
  1697. abort = true;
  1698. throw;
  1699. }
  1700. }
  1701. } afor(g,dirlist.str(),cxref,this);
  1702. afor.For(g->ordinality(), numthreads,false,true);
  1703. ForEachItemIn(i1,logicalnamelist) {
  1704. CLogicalNameEntry &item = logicalnamelist.item(i1);
  1705. if (item.nummismatchedsizes) {
  1706. error(item.lname.get(),"%d part%s physical size differ from recorded size\n",item.nummismatchedsizes,item.nummismatchedsizes>1?"s":"");
  1707. }
  1708. if (item.crosslinked.ordinality()) {
  1709. StringBuffer to;
  1710. ForEachItemIn(cli1,item.crosslinked) {
  1711. if (cli1)
  1712. to.append(", ");
  1713. to.append(item.crosslinked.item(cli1).lname.get());
  1714. }
  1715. error(item.lname.get(),"Crosslinked with %s",to.str());
  1716. }
  1717. }
  1718. }
  1719. void listDir(IGroup *g,unsigned numdirs,const char **dirbaselist)
  1720. {
  1721. class cDirScan
  1722. {
  1723. public:
  1724. char pathsepchar;
  1725. void scanfile(SocketEndpoint &ep,const char *fullname,IPropertyTree *file)
  1726. {
  1727. const char * tail = fullname;
  1728. for (;;) {
  1729. const char *s = strchr(tail,pathsepchar);
  1730. if (!s)
  1731. break;
  1732. tail = s+1;
  1733. }
  1734. if (memicmp(tail+3,".HISTORY.",9)!=0)
  1735. return;
  1736. RemoteFilename rname;
  1737. StringBuffer name;
  1738. rname.setPath(ep,fullname);
  1739. rname.getRemotePath(name);
  1740. Owned<IFile> f= createIFile(name.str());
  1741. outf("%12" I64F "d %s\n",f->size(),name.str());
  1742. }
  1743. void scandir(SocketEndpoint &ep,StringBuffer &path,IPropertyTree *dirtree)
  1744. {
  1745. size32_t pathlen = path.length();
  1746. Owned<IPropertyTreeIterator> dirs= dirtree->getElements("directory");
  1747. if (dirs->first()) {
  1748. do {
  1749. IPropertyTree &dir = dirs->query();
  1750. if (path.length()&&(path.charAt(path.length()-1)!=pathsepchar))
  1751. path.append(pathsepchar);
  1752. path.append(dir.queryProp("@name"));
  1753. scandir(ep,path,&dir);
  1754. path.setLength(pathlen);
  1755. } while (dirs->next());
  1756. }
  1757. Owned<IPropertyTreeIterator> files= dirtree->getElements("file");
  1758. if (files->first()) {
  1759. do {
  1760. IPropertyTree &file = files->query();
  1761. const char *fname=file.queryProp("@name");
  1762. size32_t l = strlen(fname);
  1763. // if ((l>4)&&(stricmp(fname+l-4,".crc")==0))
  1764. // continue;
  1765. if (path.length()&&(path.charAt(path.length()-1)!=pathsepchar))
  1766. path.append(pathsepchar);
  1767. path.append(fname);
  1768. scanfile(ep,path.str(),&file);
  1769. path.setLength(pathlen);
  1770. } while (files->next());
  1771. }
  1772. }
  1773. void scan(IPropertyTree *machine)
  1774. {
  1775. StringBuffer path;
  1776. SocketEndpoint ep(machine->queryProp("@ip"));
  1777. ep.port = 0;
  1778. scandir(ep,path,machine);
  1779. }
  1780. } scandir;
  1781. StringBuffer dirlist;
  1782. unsigned j;
  1783. for (j=0;j<numdirs;j++) {
  1784. const char *basedir = dirbaselist[j];
  1785. if (!basedir||!*basedir)
  1786. continue;
  1787. if (dirlist.length())
  1788. dirlist.append(';');
  1789. dirlist.append(basedir);
  1790. }
  1791. unsigned numfails = 0;
  1792. for (unsigned i=0;i<g->ordinality();i++) {
  1793. INode &node = g->queryNode(i);
  1794. scandir.pathsepchar = getPathSepChar(dirlist.str());
  1795. unsigned short port = getDafsPort(node.endpoint(),numfails,NULL);
  1796. Owned<IPropertyTree> results = getDirectory(dirlist.str(),&node,port);
  1797. scandir.scan(results);
  1798. }
  1799. }
  1800. CriticalSection xrefsect;
  1801. void addn(StringBuffer &b,unsigned p,unsigned &firstp,unsigned &lastp)
  1802. {
  1803. if (p) {
  1804. if (firstp==0) {
  1805. firstp = p;
  1806. lastp = p;
  1807. }
  1808. else if (lastp+1==p)
  1809. lastp = p;
  1810. else {
  1811. addn(b,0,firstp,lastp);
  1812. firstp = p;
  1813. lastp = p;
  1814. }
  1815. }
  1816. else if (firstp!=0) {
  1817. if (b.length())
  1818. b.append(',');
  1819. b.append(firstp);
  1820. if (lastp!=firstp) {
  1821. b.append('-');
  1822. b.append(lastp);
  1823. }
  1824. firstp = 0;
  1825. }
  1826. }
  1827. const char *plural(const char *s)
  1828. {
  1829. if ((strchr(s,'-')!=NULL)||(strchr(s,',')!=NULL))
  1830. return "s";
  1831. else return "";
  1832. }
  1833. void outputTextReport(const char *filename)
  1834. {
  1835. Owned<IFile> file = createIFile(filename);
  1836. Owned<IFileIO> fileio = file->open(IFOcreate);
  1837. if (!fileio) {
  1838. printf("ERROR cannot create %s\n",filename);
  1839. return;
  1840. }
  1841. outfileio.setown(createIOStream(fileio)); SuperHashIteratorOf<CFileEntry> fileiter(filemap);
  1842. if (errors.ordinality()) {
  1843. outf("\n--------------------------------------------------------\nERRORS\n");
  1844. ForEachItemIn(i,errors) {
  1845. cMessage &item = errors.item(i);
  1846. outf("%s: %s\n",item.lname.get(),item.msg.get());
  1847. }
  1848. }
  1849. if (warnings.ordinality()) {
  1850. outf("\n--------------------------------------------------------\nWARNINGS:\n");
  1851. ForEachItemIn(i,warnings) {
  1852. cMessage &item = warnings.item(i);
  1853. outf("%s: %s\n",item.lname.get(),item.msg.get());
  1854. }
  1855. }
  1856. {
  1857. outf("\n--------------------------------------------------------\nUNRESOLVED PRIMARY PARTS:\n");
  1858. ForEach(fileiter) {
  1859. CFileEntry &item = fileiter.query();
  1860. if (!item.resolved()&&!item.replicate()) {
  1861. StringBuffer s1;
  1862. item.getLogicalName(s1);
  1863. outf("%s (%s)\n",s1.str(),item.fname.get());
  1864. }
  1865. }
  1866. }
  1867. {
  1868. outf("\n--------------------------------------------------------\nUNRESOLVED REPLICATE PARTS:\n");
  1869. ForEach(fileiter) {
  1870. CFileEntry &item = fileiter.query();
  1871. if (!item.resolved()&&item.replicate()) {
  1872. StringBuffer s1;
  1873. item.getLogicalName(s1);
  1874. outf("%s (%s)\n",s1.str(),item.fname.get());
  1875. }
  1876. }
  1877. }
  1878. outf("\n--------------------------------------------------------\nMISPLACED PARTS:\n");
  1879. {
  1880. ForEachItemIn(i,orphanlist) {
  1881. COrphanEntry &item = orphanlist.item(i);
  1882. if (!item.complete()&&item.misplaced) {
  1883. if (item.max>0)
  1884. outf("%s %s (found %d, size %" I64F "d, modified %s)\n",item.fname.get(),item.misplaced->lname.get(),item.num(),item.size,item.modified.get());
  1885. else
  1886. outf("%s (size %" I64F "d, modified %s)\n",item.fname.get(),item.size,item.modified.get());
  1887. StringBuffer s1;
  1888. item.getEps(s1,false);
  1889. outf(" primary: %s\n",s1.str());
  1890. item.getEps(s1.clear(),true);
  1891. outf(" replicate: %s\n",s1.str());
  1892. }
  1893. }
  1894. }
  1895. outf("\n--------------------------------------------------------\nORPHANS PARTIAL:\n");
  1896. {
  1897. ForEachItemIn(i,orphanlist) {
  1898. COrphanEntry &item = orphanlist.item(i);
  1899. if (!item.complete()&&!item.misplaced) {
  1900. if (item.max>0)
  1901. outf("%s (found %d, size %" I64F "d, modified %s)\n",item.fname.get(),item.num(),item.size,item.modified.get());
  1902. else
  1903. outf("%s (size %" I64F "d, modified %s)\n",item.fname.get(),item.size,item.modified.get());
  1904. StringBuffer s1;
  1905. item.getEps(s1,false);
  1906. outf(" primary: %s\n",s1.str());
  1907. item.getEps(s1.clear(),true);
  1908. outf(" replicate: %s\n",s1.str());
  1909. }
  1910. }
  1911. }
  1912. outf("\n--------------------------------------------------------\nSINGLETON ORPHANS:\n");
  1913. {
  1914. ForEachItemIn(i,orphanlist) {
  1915. COrphanEntry &item = orphanlist.item(i);
  1916. if (item.complete()) {
  1917. StringBuffer sname;
  1918. if (item.isSingleton()) {
  1919. if (item.singletonName(sname,false))
  1920. outf("%s (size %" I64F "d)\n",sname.str(),item.size);
  1921. if (item.singletonName(sname.clear(),true))
  1922. outf("%s (size %" I64F "d)\n",sname.str(),item.size);
  1923. }
  1924. }
  1925. }
  1926. }
  1927. outf("\n--------------------------------------------------------\nORPHANS COMPLETE:\n");
  1928. {
  1929. ForEachItemIn(i,orphanlist) {
  1930. COrphanEntry &item = orphanlist.item(i);
  1931. if (item.complete()) {
  1932. StringBuffer sname;
  1933. if (!item.isSingleton()) {
  1934. outf("%s %s(size %" I64F "d)\n",item.fname.get(),item.misplaced?item.misplaced->lname.get():"",item.size);
  1935. StringBuffer s1;
  1936. item.getEps(s1,false);
  1937. outf(" primary: %s\n",s1.str());
  1938. item.getEps(s1.clear(),true);
  1939. outf(" replicate: %s\n",s1.str());
  1940. }
  1941. }
  1942. }
  1943. }
  1944. outf("\n--------------------------------------------------------\nOUTSIDE DIRECTORIES:\n");
  1945. {
  1946. ForEachItemIn(i,logicalnamelist) {
  1947. CLogicalNameEntry &item = logicalnamelist.item(i);
  1948. if (!item.done&&item.outsidedir) {
  1949. outf("%s (directory %s, %d of %d)\n",item.lname.get(),item.errdir.str(),item.outsidedir,item.max);
  1950. item.done = true;
  1951. }
  1952. }
  1953. }
  1954. outf("\n--------------------------------------------------------\nOUTSIDE CLUSTERS:\n");
  1955. {
  1956. ForEachItemIn(i,logicalnamelist) {
  1957. CLogicalNameEntry &item = logicalnamelist.item(i);
  1958. if (!item.done&&item.outsidenodes.ordinality()) {
  1959. outf("%s (%d of %d)\n",item.lname.get(),item.outsidenodes.ordinality(),item.max);
  1960. SocketListCreator cr;
  1961. cr.addSockets(item.outsidenodes);
  1962. outf(" %s\n",cr.getText());
  1963. item.done = true;
  1964. }
  1965. }
  1966. }
  1967. outf("\n--------------------------------------------------------\nINCOMPLETE FILES:\n");
  1968. {
  1969. ForEachItemIn(i,logicalnamelist) {
  1970. CLogicalNameEntry &item = logicalnamelist.item(i);
  1971. if (!item.done&&item.incomplete()) {
  1972. outf("%s (found %d of %d",item.lname.get(),item.primarynum,item.max);
  1973. unsigned m=item.max-item.primarynum;
  1974. if (m<=10) {
  1975. StringBuffer unres;
  1976. for (unsigned j=0;j<item.max;j++) {
  1977. if (!item.primaryresolved[j]&&!item.replicateresolved[j]) {
  1978. if (unres.length()!=0)
  1979. unres.append(',');
  1980. unres.append(j+1);
  1981. }
  1982. }
  1983. outf(" missing parts: %s",unres.str());
  1984. }
  1985. outf(")\n");
  1986. item.done = true;
  1987. }
  1988. }
  1989. }
  1990. #ifdef PARTS_SIZE_NEEDED
  1991. outf("\n--------------------------------------------------------\nMISMATCHED SIZES:\n");
  1992. {
  1993. if (fixSizes)
  1994. fixSizeInDFS();
  1995. ForEachItemIn(i,logicalnamelist) {
  1996. CLogicalNameEntry &item = logicalnamelist.item(i);
  1997. if (!item.done&&item.nummismatchedsizes) {
  1998. outf("%s (%d of %d)\n",item.lname.get(),item.nummismatchedsizes,item.max);
  1999. }
  2000. }
  2001. }
  2002. #endif
  2003. outf("\n--------------------------------------------------------\nPRIMARY MISSING:\n");
  2004. {
  2005. ForEachItemIn(i,logicalnamelist) {
  2006. CLogicalNameEntry &item = logicalnamelist.item(i);
  2007. if (!item.done&&(item.primarynum!=item.max)) {
  2008. outf("%s (found %d of %d",item.lname.get(),item.primarynum,item.max);
  2009. unsigned m=item.max-item.primarynum;
  2010. if (m<=10) {
  2011. StringBuffer unres;
  2012. unsigned j0a;
  2013. for (j0a=0;j0a<item.max;j0a++) {
  2014. if (!item.primaryresolved[j0a]&&item.replicateresolved[j0a]) {
  2015. if (unres.length()!=0)
  2016. unres.append(',');
  2017. unres.append(j0a+1);
  2018. }
  2019. }
  2020. outf(" missing parts: %s",unres.str());
  2021. }
  2022. outf(")\n");
  2023. item.done = true;
  2024. }
  2025. }
  2026. }
  2027. outf("\n--------------------------------------------------------\nSECONDARY MISSING:\n");
  2028. {
  2029. ForEachItemIn(i,logicalnamelist) {
  2030. CLogicalNameEntry &item = logicalnamelist.item(i);
  2031. if (!item.done&&(item.replicatenum!=item.max)) {
  2032. outf("%s (found %d of %d",item.lname.get(),item.replicatenum,item.max);
  2033. unsigned m=item.max-item.primarynum;
  2034. if (m<=10) {
  2035. StringBuffer unres;
  2036. unsigned j0a;
  2037. for (j0a=0;j0a<item.max;j0a++) {
  2038. if (item.primaryresolved[j0a]&&!item.replicateresolved[j0a]) {
  2039. if (unres.length()!=0)
  2040. unres.append(',');
  2041. unres.append(j0a+1);
  2042. }
  2043. }
  2044. outf(" missing parts: %s",unres.str());
  2045. }
  2046. outf(")\n");
  2047. item.done = true;
  2048. }
  2049. }
  2050. }
  2051. outf("\n--------------------------------------------------------\nCROSSLINKED:\n");
  2052. {
  2053. ForEachItemIn(i1,logicalnamelist) {
  2054. CLogicalNameEntry &item = logicalnamelist.item(i1);
  2055. if (!item.done&&item.crosslinked.ordinality()) {
  2056. StringBuffer to;
  2057. ForEachItemIn(cli1,item.crosslinked) {
  2058. if (cli1)
  2059. to.append(", ");
  2060. to.append(item.crosslinked.item(cli1).lname.get());
  2061. }
  2062. outf("%s to %s\n",item.lname.get(),to.str());
  2063. item.done = true;
  2064. }
  2065. }
  2066. }
  2067. outf("\n--------------------------------------------------------\nGROUPS:\n");
  2068. {
  2069. ForEachItemIn(i,logicalnamelist) {
  2070. CLogicalNameEntry &item = logicalnamelist.item(i);
  2071. if (item.unknowngrp||item.mismatchgrp.get()||item.missinggrp) {
  2072. Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(item.lname.get(),UNKNOWN_USER);
  2073. if (file) {
  2074. if (item.missinggrp)
  2075. outf("WARNING: Missing group for %s\n",item.lname.get());
  2076. else if (item.mismatchgrp.get()) {
  2077. // ** TBD check queryClusterName correct in following
  2078. StringBuffer tmp;
  2079. outf("ERROR: Group mismatch for %s, Group says %s but nodes match %s\n",item.lname.get(),file->getClusterName(0,tmp).str(),item.mismatchgrp.get());
  2080. }
  2081. else if (item.unknowngrp) {
  2082. // ** TBD check queryClusterName correct in following
  2083. StringBuffer tmp;
  2084. outf("WARNING: Unknown group %s for %s\n",file->getClusterName(0,tmp).str(),item.lname.get());
  2085. }
  2086. }
  2087. }
  2088. }
  2089. }
  2090. {
  2091. outf("\n--------------------------------------------------------\nDIRECTORIES:\n");
  2092. ForEachItemIn(i,dirlist) {
  2093. CDfuDirEntry &item = dirlist.item(i);
  2094. if (item.minsize<0)
  2095. item.minsize = 0;
  2096. StringBuffer s1;
  2097. if (!item.minip.isNull())
  2098. item.minip.getIpText(s1);
  2099. StringBuffer s2;
  2100. if (!item.maxip.isNull())
  2101. item.maxip.getIpText(s2);
  2102. StringBuffer skew;
  2103. item.getskew(skew);
  2104. outf("%s numfiles=%u totalsize=%" CF "d minsize=%" CF "d(%s) maxsize=%" CF "d(%s), skew=%s\n",item.name.get(),item.num,item.size,
  2105. item.minsize,s1.str(),item.maxsize,s2.str(),skew.str());
  2106. }
  2107. }
  2108. outf("\n--------------------------------------------------------\nOK:\n");
  2109. {
  2110. ForEachItemIn(i,logicalnamelist) {
  2111. CLogicalNameEntry &item = logicalnamelist.item(i);
  2112. if (!item.done) {
  2113. outf("%s\n",item.lname.get());
  2114. }
  2115. }
  2116. }
  2117. outf("TOTAL ORPHANS: %d files %" I64F "d bytes\n",totalNumOrphans,totalSizeOrphans);
  2118. outf("Row Compression: %d files %" I64F "d compressed %" I64F "d uncompressed\n",totalCompressed,totalCompressedSize, totalUncompressedSize);
  2119. outfileio.clear();
  2120. }
  2121. void outputCsvReport(const char *filename)
  2122. {
  2123. // more needs doing to make 'proper' csv
  2124. Owned<IFile> file = createIFile(filename);
  2125. Owned<IFileIO> fileio = file->open(IFOcreate);
  2126. if (!fileio) {
  2127. printf("ERROR cannot create %s\n",filename);
  2128. return;
  2129. }
  2130. outfileio.setown(createIOStream(fileio));
  2131. outf("===============================================================\n");
  2132. outf("\n--------------------------------------------------------\nORPHANS PARTIAL:\n");
  2133. {
  2134. ForEachItemIn(i,orphanlist) {
  2135. COrphanEntry &item = orphanlist.item(i);
  2136. if ((!item.complete())&&!item.misplaced) {
  2137. outf("%s,%d,%" I64F "d,%s\n",item.fname.get(),item.num(),item.size,item.modified.get());
  2138. }
  2139. }
  2140. }
  2141. outf("\n--------------------------------------------------------\nORPHANS PARTIAL FILES:\n");
  2142. {
  2143. ForEachItemIn(i,orphanlist) {
  2144. COrphanEntry &item = orphanlist.item(i);
  2145. if (!item.complete()) {
  2146. SocketEndpoint nullep;
  2147. ForEachItemIn(j1,item.primaryepa) {
  2148. SocketEndpoint ep=item.primaryepa.item(j1);
  2149. unsigned pn=item.primarypna.item(j1);
  2150. if (!ep.equals(nullep)) {
  2151. StringBuffer partname;
  2152. constructPartname(item.fname.get(),pn+1,partname,false);
  2153. RemoteFilename rfn;
  2154. rfn.setPath(ep,partname.str());
  2155. rfn.getRemotePath(partname.clear());
  2156. outf("%s,%s\n",partname.str(),item.misplaced?item.misplaced->lname.get():"");
  2157. }
  2158. }
  2159. ForEachItemIn(j2,item.replicateepa) {
  2160. SocketEndpoint ep=item.replicateepa.item(j2);
  2161. unsigned pn=item.replicatepna.item(j2);
  2162. if (!ep.equals(nullep)) {
  2163. StringBuffer partname;
  2164. constructPartname(item.fname.get(),pn+1,partname,true);
  2165. RemoteFilename rfn;
  2166. rfn.setPath(ep,partname.str());
  2167. rfn.getRemotePath(partname.clear());
  2168. outf("%s,%s\n",partname.str(),item.misplaced?item.misplaced->lname.get():"");
  2169. }
  2170. }
  2171. }
  2172. }
  2173. }
  2174. outf("\n--------------------------------------------------------\nORPHANS COMPLETE:\n");
  2175. {
  2176. ForEachItemIn(i,orphanlist) {
  2177. COrphanEntry &item = orphanlist.item(i);
  2178. if (item.complete()&&!item.isSingleton()) {
  2179. outf("%s,%d,%" I64F "d,%s %s\n",item.fname.get(),item.num(),item.size,item.modified.get(),item.misplaced?item.misplaced->lname.get():"");
  2180. }
  2181. }
  2182. }
  2183. outf("\n--------------------------------------------------------\nINCOMPLETE FILES:\n");
  2184. {
  2185. ForEachItemIn(i,logicalnamelist) {
  2186. CLogicalNameEntry &item = logicalnamelist.item(i);
  2187. item.done = false;
  2188. bool incomplete = false;
  2189. unsigned j0a;
  2190. for (j0a=0;j0a<item.max;j0a++)
  2191. if (!item.primaryresolved[j0a]&&!item.replicateresolved[j0a])
  2192. incomplete = true;
  2193. if (incomplete&&(!item.outsidedir)&&(item.outsidenodes.ordinality()==0)&&(item.primarynum!=item.max)) {
  2194. outf("%s,%d,%d,%d,%" I64F "d\n",item.lname.get(),item.primarynum,item.replicatenum,item.max,item.totalsize);
  2195. item.done = true;
  2196. }
  2197. }
  2198. }
  2199. outf("\n--------------------------------------------------------\nPRIMARY MISSING:\n");
  2200. {
  2201. ForEachItemIn(i,logicalnamelist) {
  2202. CLogicalNameEntry &item = logicalnamelist.item(i);
  2203. if (!item.outsidedir&&(item.outsidenodes.ordinality()==0)&&(item.primarynum!=item.max)) {
  2204. outf("%s,%d,%d,%" I64F "d\n",item.lname.get(),item.primarynum,item.max,item.totalsize);
  2205. item.done = true;
  2206. }
  2207. }
  2208. }
  2209. outf("\n--------------------------------------------------------\nSECONDARY MISSING:\n");
  2210. {
  2211. ForEachItemIn(i,logicalnamelist) {
  2212. CLogicalNameEntry &item = logicalnamelist.item(i);
  2213. if (!item.outsidedir&&(item.outsidenodes.ordinality()==0)&&(item.replicatenum!=item.max)) {
  2214. outf("%s,%d,%d,%" I64F "d\n",item.lname.get(),item.replicatenum,item.max,item.totalsize);
  2215. item.done = true;
  2216. }
  2217. }
  2218. }
  2219. outfileio.clear();
  2220. }
  2221. void outputBackupReport()
  2222. {
  2223. ForEachItemIn(i,logicalnamelist) {
  2224. CLogicalNameEntry &item = logicalnamelist.item(i);
  2225. if ((!item.outsidedir)&&(item.outsidenodes.ordinality()==0)) {
  2226. if ((item.primarynum!=item.max)||(item.replicatenum!=item.max)) {
  2227. StringBuffer cdrv;
  2228. StringBuffer ddrv;
  2229. StringBuffer missing;
  2230. unsigned firstc=0;
  2231. unsigned firstd=0;
  2232. unsigned firstm=0;
  2233. unsigned lastc=0;
  2234. unsigned lastd=0;
  2235. unsigned lastm=0;
  2236. unsigned j;
  2237. for (j=0;j<item.max;j++) {
  2238. if (!item.primaryresolved[j]) {
  2239. if (!item.replicateresolved[j]) {
  2240. addn(missing,j+1,firstm,lastm);
  2241. }
  2242. else {
  2243. addn(cdrv,j+1,firstc,lastc);
  2244. }
  2245. }
  2246. else if (!item.replicateresolved[j]) {
  2247. addn(ddrv,j+1,firstd,lastd);
  2248. }
  2249. }
  2250. addn(cdrv,0,firstc,lastc);
  2251. addn(ddrv,0,firstd,lastd);
  2252. addn(missing,0,firstm,lastm);
  2253. printf("%s\n",item.lname.get());
  2254. if (cdrv.length())
  2255. printf(" C: missing part%s %s\n",plural(cdrv.str()),cdrv.str());
  2256. if (ddrv.length())
  2257. printf(" D: missing part%s %s\n",plural(ddrv.str()),ddrv.str());
  2258. if (missing.length())
  2259. printf(" C: and D: missing part%s %s\n",plural(missing.str()),missing.str());
  2260. printf("\n");
  2261. }
  2262. }
  2263. }
  2264. }
  2265. static int compareDirectory(CInterface * const *le, CInterface * const *re)
  2266. {
  2267. const CDfuDirEntry *l = (const CDfuDirEntry *)*le;
  2268. const CDfuDirEntry *r = (const CDfuDirEntry *)*re;
  2269. __int64 dif = l->size-r->size;
  2270. if (dif<0)
  2271. return 1;
  2272. if (dif>0)
  2273. return -1;
  2274. return stricmp(l->name.get(),r->name.get());
  2275. }
  2276. IPropertyTree * outputTree()
  2277. {
  2278. log("Collating output");
  2279. IPropertyTree *out = createPTree("XREF");
  2280. IPropertyTree *orphans = addBranch(out,"Orphans");
  2281. IPropertyTree *found = addBranch(out,"Found");
  2282. IPropertyTree *lost = addBranch(out,"Lost");
  2283. IPropertyTree *message = addBranch(out,"Messages");
  2284. IPropertyTree *directories = addBranch(out,"Directories");
  2285. // Lost
  2286. {
  2287. DBGLOG("// Lost");
  2288. ForEachItemIn(i,logicalnamelist)
  2289. {
  2290. CLogicalNameEntry &item = logicalnamelist.item(i);
  2291. if ((!item.outsidedir)&&(item.outsidenodes.ordinality()==0)) {
  2292. if (item.incomplete()) { // need check for if replicated here
  2293. item.addFileBranch(lost,CFBname|CFBcluster|CFBsize|CFBnumparts|CFBpartslost|CFBprimarylost|CFBreplicatedlost|CFBmodified|CFBpartnode|CFBpartnum|CFBpartprimary|CFBpartreplicate);
  2294. item.done = true;
  2295. }
  2296. }
  2297. }
  2298. }
  2299. // Found
  2300. {
  2301. DBGLOG("// Found");
  2302. ForEachItemIn(i,orphanlist) {
  2303. COrphanEntry &item = orphanlist.item(i);
  2304. if (item.complete()) {
  2305. item.addFileBranch(found,CFBpartnode|CFBpartnum|CFBpartreplicate|CFBpartprimary|CFBnumparts|CFBcluster|CFBmodified|CFBsize);
  2306. }
  2307. }
  2308. }
  2309. // Orphans
  2310. {
  2311. DBGLOG("// Orphans");
  2312. ForEachItemIn(i,orphanlist) {
  2313. COrphanEntry &item = orphanlist.item(i);
  2314. if (!item.complete()) {
  2315. item.addFileBranch(orphans,CFBpartnode|CFBpartnum|CFBpartprimary|CFBnumparts|CFBmodified|CFBsize|CFBpartnode|CFBpartnum);
  2316. item.addFileBranch(orphans,CFBpartnode|CFBpartnum|CFBpartreplicate|CFBnumparts|CFBmodified|CFBsize|CFBpartnode|CFBpartnum);
  2317. }
  2318. }
  2319. }
  2320. // Messages
  2321. {
  2322. DBGLOG("// Messages");
  2323. {
  2324. ForEachItemIn(i,errors) {
  2325. cMessage &item = errors.item(i);
  2326. IPropertyTree *t = addBranch(message,"Error");
  2327. t->addProp("File",item.lname.get());
  2328. t->addProp("Text",item.msg.get());
  2329. }
  2330. }
  2331. {
  2332. ForEachItemIn(i,warnings) {
  2333. cMessage &item = warnings.item(i);
  2334. IPropertyTree *t = addBranch(message,"Warning");
  2335. t->addProp("File",item.lname.get());
  2336. t->addProp("Text",item.msg.get());
  2337. }
  2338. }
  2339. }
  2340. // Directories
  2341. {
  2342. dirlist.sort(compareDirectory);
  2343. DBGLOG("// Directories");
  2344. {
  2345. ForEachItemIn(i,dirlist) {
  2346. CDfuDirEntry &item = dirlist.item(i);
  2347. if (item.minsize<0)
  2348. item.minsize = 0;
  2349. if ((item.num==0)&&(item.size==0)&&(item.numdir>0)) // exclude intermediate empty dirs
  2350. continue;
  2351. IPropertyTree *t = addBranch(directories,"Directory");
  2352. t->addProp("Name",item.name.get());
  2353. t->addPropInt("Num",item.num);
  2354. t->addPropInt64("Size",item.size);
  2355. if (item.size) {
  2356. t->addPropInt64("MaxSize",item.maxsize);
  2357. StringBuffer s1;
  2358. if (!item.maxip.isNull())
  2359. item.maxip.getIpText(s1);
  2360. t->addProp("MaxIP",s1.str());
  2361. t->addPropInt64("MinSize",item.minsize);
  2362. s1.clear();
  2363. if (!item.minip.isNull())
  2364. item.minip.getIpText(s1);
  2365. t->addProp("MinIP",s1.str());
  2366. item.getskew(s1.clear());
  2367. if (s1.length())
  2368. t->addProp("Skew",s1.str());
  2369. }
  2370. }
  2371. }
  2372. }
  2373. return out;
  2374. }
  2375. public:
  2376. IPropertyTree *process(unsigned nclusters,const char **clusters,unsigned numdirs,const char **dirbaselist,unsigned flags,IXRefProgressCallback *_msgcallback,unsigned numthreads)
  2377. {
  2378. CriticalBlock block(xrefsect);
  2379. msgcallback.set(_msgcallback);
  2380. IPropertyTree *out=NULL;
  2381. Owned<IGroup> g;
  2382. unsigned j;
  2383. if (!nclusters) {
  2384. error("XREF","No clusters specified\n");
  2385. return NULL;
  2386. }
  2387. if (!numdirs) {
  2388. error("XREF","No directories specified\n");
  2389. return NULL;
  2390. }
  2391. for (j=0;j<nclusters;j++) {
  2392. Owned<IGroup> gsub = queryNamedGroupStore().lookup(clusters[j]);
  2393. if (!gsub) {
  2394. error(clusters[j],"Could not find cluster group");
  2395. return NULL;
  2396. }
  2397. if (!g)
  2398. g.set(gsub.get());
  2399. else
  2400. g.setown(g->combine(gsub.get()));
  2401. }
  2402. totalSizeOrphans =0;
  2403. totalNumOrphans = 0;
  2404. logicalnamelist.kill();
  2405. dirlist.kill();
  2406. orphanlist.kill();
  2407. const char* cluster = clusters[0];
  2408. loadFromDFS(*this,g,numdirs,dirbaselist,cluster);
  2409. xrefRemoteDirectories(g,numdirs,dirbaselist,numthreads);
  2410. StringBuffer filename;
  2411. filename.clear().append("xrefrpt");
  2412. addFileTimestamp(filename, true);
  2413. filename.append(".txt");
  2414. if (flags&PMtextoutput)
  2415. outputTextReport(filename.str());
  2416. filename.clear().append("xrefrpt");
  2417. addFileTimestamp(filename, true);
  2418. filename.append(".txt");
  2419. if (flags&PMcsvoutput)
  2420. outputCsvReport(filename.str());
  2421. if (flags&PMbackupoutput)
  2422. outputBackupReport();
  2423. if (flags&PMtreeoutput)
  2424. out = outputTree();
  2425. logicalnamemap.kill();
  2426. filemap.kill();
  2427. orphanmap.kill();
  2428. dirmap.kill();
  2429. logicaldirmap.kill();
  2430. log("Complete");
  2431. DBGLOG("Finished...");
  2432. return out;
  2433. }
  2434. };
  2435. IPropertyTree * runXRef(unsigned nclusters,const char **clusters,IXRefProgressCallback *callback,unsigned numthreads)
  2436. {
  2437. if (nclusters==0)
  2438. return NULL;
  2439. CXRefManager xrefmanager;
  2440. const char *dirs[2];
  2441. unsigned numdirs;
  2442. #ifdef _WIN32
  2443. bool islinux = false;
  2444. #else
  2445. bool islinux = true;
  2446. #endif
  2447. // assume all nodes same OS
  2448. Owned<IGroup> group = queryNamedGroupStore().lookup(clusters[0]);
  2449. if (group)
  2450. islinux = queryOS(group->queryNode(0).endpoint())==MachineOsLinux;
  2451. dirs[0] = queryBaseDirectory(grp_unknown, 0,islinux?DFD_OSunix:DFD_OSwindows); // MORE - should use the info from the group store
  2452. dirs[1] = queryBaseDirectory(grp_unknown, 1,islinux?DFD_OSunix:DFD_OSwindows);
  2453. numdirs = 2;
  2454. IPropertyTree *ret=NULL;
  2455. try {
  2456. ret = xrefmanager.process(nclusters,clusters,numdirs,dirs,PMtreeoutput,callback,numthreads);
  2457. }
  2458. catch (IException *e) {
  2459. StringBuffer s;
  2460. e->errorMessage(s);
  2461. if (callback)
  2462. callback->error(s.str());
  2463. else
  2464. IERRLOG("%s",s.str());
  2465. }
  2466. return ret;
  2467. }
  2468. IPropertyTree * runXRefCluster(const char *cluster,IXRefNode *nodeToUpdate)
  2469. {
  2470. DBGLOG("runXRefCluster starting for cluster %s",cluster);
  2471. CXRefManager xrefmanager;
  2472. IPropertyTree *ret=NULL;
  2473. IXRefProgressCallback* callback = dynamic_cast<IXRefProgressCallback*>(nodeToUpdate);
  2474. try {
  2475. const char *clusters[1];
  2476. clusters[0] = cluster;
  2477. ret = runXRef(1, clusters,callback,4); // only single thread for time being
  2478. // xrefmanager.process(1,clusters,4,dirs,PMtreeoutput,callback);
  2479. }
  2480. catch (IException *e) {
  2481. StringBuffer s;
  2482. e->errorMessage(s);
  2483. if (callback)
  2484. callback->error(s.str());
  2485. else
  2486. IERRLOG("%s",s.str());
  2487. }
  2488. if(ret)
  2489. {
  2490. DBGLOG("runXRefCluster building DFU node for cluster %s",cluster);
  2491. nodeToUpdate->BuildXRefData(*ret,cluster);
  2492. nodeToUpdate->commit();
  2493. }
  2494. return ret;
  2495. }
  2496. IPropertyTree * RunProcess(XRefCmd cmd, unsigned nclusters,const char **clusters,unsigned numArgs,const char **args,unsigned flags,IXRefProgressCallback *_msgcallback,unsigned numthreads)
  2497. {
  2498. //Provide a wrapper for the command line
  2499. switch (cmd)
  2500. {
  2501. case xrefUpdate:
  2502. {
  2503. if (flags & PMupdateeclwatch)
  2504. {
  2505. if (nclusters==1)
  2506. {
  2507. const char *cluster = *clusters;
  2508. CXRefNodeManager nodemanager;
  2509. Owned<IPropertyTree> tree = runXRef(nclusters,clusters,NULL,numthreads);
  2510. if (tree)
  2511. {
  2512. Owned<IXRefNode> xRefNode = nodemanager.getXRefNode(cluster);
  2513. if (!xRefNode.get())
  2514. xRefNode.setown( nodemanager.CreateXRefNode(cluster));
  2515. xRefNode->setCluster(cluster);
  2516. xRefNode->BuildXRefData(*tree.get(),cluster);
  2517. xRefNode->commit();
  2518. }
  2519. }
  2520. else
  2521. {
  2522. // do clusters 1 at time
  2523. for (unsigned i = 0; i<nclusters; i++)
  2524. RunProcess(cmd,1,clusters+i,numArgs,args,flags,_msgcallback,numthreads);
  2525. }
  2526. }
  2527. break;
  2528. }
  2529. case xrefScan:
  2530. {
  2531. CXRefManager xrefmanager;
  2532. return xrefmanager.process(nclusters,clusters,numArgs,args,flags,_msgcallback,numthreads);
  2533. }
  2534. case xrefListFound:
  2535. case xrefAttachFound:
  2536. {
  2537. if (1 == nclusters)
  2538. {
  2539. const char *match = "*"; // default
  2540. if (numArgs)
  2541. match = args[0];
  2542. const char *cluster = clusters[0];
  2543. Owned<IXRefNodeManager> XRefNodeManager = CreateXRefNodeFactory();
  2544. Owned<IConstXRefNode> xRefNode = XRefNodeManager->getXRefNode(cluster);
  2545. if (!xRefNode)
  2546. UWARNLOG("Cannot find XREF info for cluster: %s", cluster);
  2547. else
  2548. {
  2549. StringBuffer partMask;
  2550. if (streq("*", match))
  2551. partMask.append(match);
  2552. else
  2553. {
  2554. partMask.append(xRefNode->queryRootDir());
  2555. addPathSepChar(partMask);
  2556. const char *s = match;
  2557. do
  2558. {
  2559. const char *next = strstr(s, "::");
  2560. if (!next)
  2561. {
  2562. partMask.append(s);
  2563. break;
  2564. }
  2565. else
  2566. partMask.append(next-s, s).append('/');
  2567. s = next+2;
  2568. }
  2569. while (true);
  2570. partMask.append(".*");
  2571. }
  2572. Owned<IXRefFilesNode> foundFiles = xRefNode->getFoundFiles();
  2573. Owned<IPropertyTreeIterator> iter = foundFiles->getMatchingFiles(partMask, "Partmask");
  2574. unsigned processed = 0;
  2575. StringArray matched;
  2576. ForEach(*iter)
  2577. {
  2578. IPropertyTree &file = iter->query();
  2579. const char *partMask = file.queryProp("Partmask");
  2580. switch (cmd)
  2581. {
  2582. case xrefListFound:
  2583. {
  2584. CDfsLogicalFileName lfn;
  2585. if (!lfn.setFromMask(partMask, xRefNode->queryRootDir()))
  2586. {
  2587. fprintf(stderr, "Error processing partMask=%s, could not deduce logical filename\n", partMask);
  2588. continue;
  2589. }
  2590. const char *logicalName = lfn.get();
  2591. unsigned __int64 size = file.getPropInt64("Size");
  2592. const char *modified = file.queryProp("Modified");
  2593. unsigned numParts = file.getPropInt("Numparts");
  2594. if (!processed)
  2595. {
  2596. PROGLOG("Name,Size,Modified,NumParts");
  2597. PROGLOG("===========================");
  2598. }
  2599. PROGLOG("%s,%" I64F "u,%s,%u", logicalName, size, modified, numParts);
  2600. ++processed;
  2601. break;
  2602. }
  2603. case xrefAttachFound:
  2604. {
  2605. matched.append(partMask);
  2606. break;
  2607. }
  2608. default:
  2609. throwUnexpected();
  2610. }
  2611. }
  2612. switch (cmd)
  2613. {
  2614. case xrefListFound:
  2615. {
  2616. PROGLOG("%u files found", processed);
  2617. break;
  2618. }
  2619. case xrefAttachFound:
  2620. {
  2621. iter.clear();
  2622. PROGLOG("%u found files matched", matched.ordinality());
  2623. ForEachItemIn(f, matched)
  2624. {
  2625. const char *partMask = matched.item(f);
  2626. CDfsLogicalFileName lfn;
  2627. if (!lfn.setFromMask(partMask, xRefNode->queryRootDir()))
  2628. {
  2629. fprintf(stderr, "Error processing partMask=%s, could not deduce logical filename\n", partMask);
  2630. continue;
  2631. }
  2632. const char *logicalName = lfn.get();
  2633. StringBuffer errStr;
  2634. if (foundFiles->AttachPhysical(partMask, nullptr, cluster, errStr))
  2635. {
  2636. PROGLOG("File '%s' attached", lfn.get());
  2637. processed++;
  2638. }
  2639. else
  2640. fprintf(stderr, "%s\n", errStr.str());
  2641. }
  2642. PROGLOG("%u files re-attached. Committing xref meta data changes..", processed);
  2643. if (processed>0)
  2644. foundFiles->Commit();
  2645. break;
  2646. }
  2647. default:
  2648. break;
  2649. }
  2650. }
  2651. }
  2652. else
  2653. {
  2654. // do clusters 1 at time
  2655. for (unsigned i = 0; i<nclusters; i++)
  2656. RunProcess(cmd,1,clusters+i,numArgs,args,flags,_msgcallback,numthreads);
  2657. }
  2658. break;
  2659. }
  2660. }
  2661. return nullptr;
  2662. }