dafdesc.cpp 96 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define da_decl DECL_EXPORT
  14. #include "platform.h"
  15. #include "portlist.h"
  16. #include "jlib.hpp"
  17. #include "jfile.hpp"
  18. #include "jiter.ipp"
  19. #include "jmisc.hpp"
  20. #include "jexcept.hpp"
  21. #include "jptree.hpp"
  22. #include "jlzw.hpp"
  23. #include "dafdesc.hpp"
  24. #include "rmtfile.hpp"
  25. #include "dautils.hpp"
  26. #include "dasds.hpp"
  27. #include "dafdesc.hpp"
  28. #include "dadfs.hpp"
  29. #define INCLUDE_1_OF_1 // whether to use 1_of_1 for single part files
  30. #define SDS_CONNECT_TIMEOUT (1000*60*60*2) // better than infinite
  31. // These are legacy and cannot be changed.
  32. #define SERIALIZATION_VERSION ((byte)0xd4)
  33. #define SERIALIZATION_VERSION2 ((byte)0xd5) // with trailing superfile info
  34. bool isMulti(const char *str)
  35. {
  36. if (str&&!isSpecialPath(str))
  37. loop {
  38. switch (*str) {
  39. case ',':
  40. case '*':
  41. case '?':
  42. return true;
  43. case 0:
  44. return false;
  45. }
  46. str++;
  47. }
  48. return false;
  49. }
  50. bool isCompressed(IPropertyTree &props, bool *blocked)
  51. {
  52. if (props.getPropBool("@blockCompressed"))
  53. {
  54. if (blocked) *blocked = true;
  55. return true;
  56. }
  57. else
  58. {
  59. if (blocked) *blocked = false;
  60. return props.getPropBool("@rowCompressed");
  61. }
  62. }
  63. bool getCrcFromPartProps(IPropertyTree &fileattr,IPropertyTree &props, unsigned &crc)
  64. {
  65. if (props.hasProp("@fileCrc"))
  66. {
  67. crc = (unsigned)props.getPropInt64("@fileCrc");
  68. return true;
  69. }
  70. // NB: old @crc keys and compressed were not crc of file but of data within.
  71. const char *kind = props.queryProp("@kind");
  72. if (kind&&strcmp(kind,"key")) // key part
  73. return false;
  74. bool blocked;
  75. if (isCompressed(fileattr,&blocked)) {
  76. if (!blocked)
  77. return false;
  78. crc = COMPRESSEDFILECRC;
  79. return true;
  80. }
  81. if (!props.hasProp("@crc"))
  82. return false;
  83. crc = (unsigned)props.getPropInt64("@crc");
  84. return true;
  85. }
  86. void ClusterPartDiskMapSpec::setRoxie (unsigned redundancy, unsigned channelsPerNode, int _replicateOffset)
  87. {
  88. flags = 0;
  89. replicateOffset = _replicateOffset?_replicateOffset:1;
  90. defaultCopies = redundancy+1;
  91. if ((channelsPerNode>1)&&(redundancy==0)) {
  92. flags |= CPDMSF_wrapToNextDrv;
  93. flags |= CPDMSF_overloadedConfig;
  94. maxDrvs = channelsPerNode;
  95. }
  96. else
  97. maxDrvs = (redundancy>1)?(redundancy+1):2;
  98. if (_replicateOffset==0)
  99. flags |= CPDMSF_fillWidth;
  100. startDrv = 0;
  101. }
  102. bool ClusterPartDiskMapSpec::calcPartLocation (unsigned part, unsigned maxparts, unsigned copy, unsigned clusterwidth, unsigned &node, unsigned &drv)
  103. {
  104. // this is more cryptic than it could be (e.g. by special casing)
  105. // because it handles the cases that aren't going to ever happen, in a general way
  106. node = 0;
  107. drv = 0;
  108. if (!clusterwidth||!maxparts)
  109. return false;
  110. if (part>=maxparts)
  111. return false;
  112. unsigned nc = numCopies(part,clusterwidth,maxparts);
  113. if (copy>=nc)
  114. return false;
  115. unsigned dc=defaultCopies?defaultCopies:DFD_DefaultCopies;
  116. drv = startDrv;
  117. bool fw = (flags&CPDMSF_fillWidth)!=0;
  118. if (fw&&(maxparts>clusterwidth/2))
  119. fw = false;
  120. // calc primary
  121. node = part%clusterwidth;
  122. unsigned repdrv = startDrv+1;
  123. if (flags&CPDMSF_wrapToNextDrv) {
  124. drv += startDrv+(part/clusterwidth)%maxDrvs;
  125. repdrv = (1+(maxparts-1)/clusterwidth)%maxDrvs;
  126. }
  127. if (copy) {
  128. if (fw) {
  129. if (interleave>1)
  130. ERRLOG("ClusterPartDiskMapSpec interleave not allowed if fill width set");
  131. if (flags&CPDMSF_repeatedPart)
  132. ERRLOG("ClusterPartDiskMapSpec repeated part not allowed if fill width set");
  133. unsigned m = clusterwidth/maxparts;
  134. drv = startDrv+(repdrv+(copy/m-1))%maxDrvs;
  135. node += (copy%m)*maxparts;
  136. }
  137. else if ((flags&CPDMSF_repeatedPart)) {
  138. if (flags&CPDMSF_wrapToNextDrv)
  139. ERRLOG("ClusterPartDiskMapSpec repeated part not allowed if wrap to next drive set");
  140. unsigned repnum = copy%dc;
  141. unsigned nodenum = copy/dc;
  142. drv = startDrv+repnum%maxDrvs;
  143. if (interleave>1)
  144. node = (node+nodenum+(replicateOffset*repnum*interleave))%clusterwidth;
  145. else
  146. node = (node+nodenum+(replicateOffset*repnum))%clusterwidth;
  147. }
  148. else {
  149. drv = startDrv+(repdrv+copy-1)%maxDrvs;
  150. if (interleave>1)
  151. node = (node+(replicateOffset*copy*interleave))%clusterwidth;
  152. else
  153. node = (node+(replicateOffset*copy))%clusterwidth;
  154. }
  155. }
  156. return true;
  157. }
  158. inline void setPropDef(IPropertyTree *tree,const char *prop,int val,int def)
  159. {
  160. if (val!=def)
  161. tree->setPropInt(prop,val);
  162. else
  163. tree->removeProp(prop);
  164. }
  165. inline int getPropDef(IPropertyTree *tree,const char *prop,int def)
  166. {
  167. if (tree)
  168. return tree->getPropInt(prop,def);
  169. return def;
  170. }
  171. void ClusterPartDiskMapSpec::toProp(IPropertyTree *tree)
  172. {
  173. if (!tree)
  174. return;
  175. setPropDef(tree,"@replicateOffset",replicateOffset,1);
  176. setPropDef(tree,"@redundancy",defaultCopies?(defaultCopies-1):1,1);
  177. setPropDef(tree,"@maxDrvs",maxDrvs?maxDrvs:2,2);
  178. setPropDef(tree,"@startDrv",startDrv,0);
  179. setPropDef(tree,"@interleave",interleave,0);
  180. setPropDef(tree,"@mapFlags",flags,0);
  181. setPropDef(tree,"@repeatedPart",repeatedPart,(int)CPDMSRP_notRepeated);
  182. if (defaultBaseDir.isEmpty())
  183. tree->removeProp("@defaultBaseDir");
  184. else
  185. tree->setProp("@defaultBaseDir",defaultBaseDir);
  186. if (defaultReplicateDir.isEmpty())
  187. tree->removeProp("@defaultReplicateDir");
  188. else
  189. tree->setProp("@defaultReplicateDir",defaultReplicateDir);
  190. }
  191. void ClusterPartDiskMapSpec::fromProp(IPropertyTree *tree)
  192. {
  193. unsigned defrep = 1;
  194. // if directory is specified then must match default base to be default replicated
  195. StringBuffer dir;
  196. if (tree&&tree->getProp("@directory",dir)) {
  197. const char * base = queryBaseDirectory(grp_unknown, 0, SepCharBaseOs(getPathSepChar(dir.str())));
  198. size32_t l = strlen(base);
  199. if ((memcmp(base,dir.str(),l)!=0)||((l!=dir.length())&&!isPathSepChar(dir.charAt(l))))
  200. defrep = 0;
  201. }
  202. replicateOffset = getPropDef(tree,"@replicateOffset",1);
  203. defaultCopies = getPropDef(tree,"@redundancy",defrep)+1;
  204. maxDrvs = (byte)getPropDef(tree,"@maxDrvs",2);
  205. startDrv = (byte)getPropDef(tree,"@startDrv",defrep?0:getPathDrive(dir.str()));
  206. interleave = getPropDef(tree,"@interleave",0);
  207. flags = (byte)getPropDef(tree,"@mapFlags",0);
  208. repeatedPart = (unsigned)getPropDef(tree,"@repeatedPart",(int)CPDMSRP_notRepeated);
  209. setDefaultBaseDir(tree->queryProp("@defaultBaseDir"));
  210. setDefaultReplicateDir(tree->queryProp("@defaultReplicateDir"));
  211. }
  212. void ClusterPartDiskMapSpec::serialize(MemoryBuffer &mb)
  213. {
  214. mb.append(flags);
  215. mb.append(replicateOffset);
  216. mb.append(defaultCopies);
  217. mb.append(startDrv);
  218. mb.append(maxDrvs);
  219. mb.append(interleave);
  220. if (flags&CPDMSF_repeatedPart)
  221. mb.append(repeatedPart);
  222. if (flags&CPDMSF_defaultBaseDir)
  223. mb.append(defaultBaseDir);
  224. if (flags&CPDMSF_defaultReplicateDir)
  225. mb.append(defaultReplicateDir);
  226. }
  227. void ClusterPartDiskMapSpec::deserialize(MemoryBuffer &mb)
  228. {
  229. mb.read(flags);
  230. mb.read(replicateOffset);
  231. mb.read(defaultCopies);
  232. mb.read(startDrv);
  233. mb.read(maxDrvs);
  234. mb.read(interleave);
  235. if (flags&CPDMSF_repeatedPart)
  236. mb.read(repeatedPart);
  237. else
  238. repeatedPart = CPDMSRP_notRepeated;
  239. if (flags&CPDMSF_defaultBaseDir)
  240. mb.read(defaultBaseDir);
  241. else
  242. defaultBaseDir.clear();
  243. if (flags&CPDMSF_defaultReplicateDir)
  244. mb.read(defaultReplicateDir);
  245. else
  246. defaultReplicateDir.clear();
  247. }
  248. void ClusterPartDiskMapSpec::ensureReplicate()
  249. {
  250. if (defaultCopies <= DFD_NoCopies)
  251. defaultCopies = DFD_DefaultCopies;
  252. }
  253. bool ClusterPartDiskMapSpec::isReplicated() const
  254. {
  255. // If defaultCopies is zero (deprecated/legacy), the default value for replicated is true
  256. // Else, if it has any copy (>= 2), than it is replicated
  257. return defaultCopies != DFD_NoCopies;
  258. }
  259. unsigned ClusterPartDiskMapSpec::numCopies(unsigned part,unsigned clusterwidth, unsigned filewidth)
  260. {
  261. if (flags&CPDMSF_repeatedPart) {
  262. if (repeatedPart&CPDMSRP_lastRepeated) {
  263. if (part+1==filewidth)
  264. return clusterwidth*defaultCopies;
  265. }
  266. else if ((part==(repeatedPart&CPDMSRP_partMask))||(repeatedPart&CPDMSRP_allRepeated))
  267. return clusterwidth*defaultCopies;
  268. if (repeatedPart&CPDMSRP_onlyRepeated)
  269. return 0;
  270. }
  271. return defaultCopies;
  272. }
  273. void ClusterPartDiskMapSpec::setRepeatedCopies(unsigned partnum,bool onlyrepeats)
  274. {
  275. repeatedPart = partnum;
  276. if (partnum!=CPDMSRP_notRepeated) {
  277. flags |= CPDMSF_repeatedPart;
  278. if (onlyrepeats)
  279. repeatedPart |= CPDMSRP_onlyRepeated;
  280. }
  281. else
  282. flags &= ~CPDMSF_repeatedPart;
  283. }
  284. void ClusterPartDiskMapSpec::setDefaultBaseDir(const char *dir)
  285. {
  286. defaultBaseDir.set(dir);
  287. if (defaultBaseDir.isEmpty())
  288. flags &= ~CPDMSF_defaultBaseDir;
  289. else
  290. flags |= CPDMSF_defaultBaseDir;
  291. }
  292. void ClusterPartDiskMapSpec::setDefaultReplicateDir(const char *dir)
  293. {
  294. defaultReplicateDir.set(dir);
  295. if (defaultReplicateDir.isEmpty())
  296. flags &= ~CPDMSF_defaultReplicateDir;
  297. else
  298. flags |= CPDMSF_defaultReplicateDir;
  299. }
  300. ClusterPartDiskMapSpec & ClusterPartDiskMapSpec::operator=(const ClusterPartDiskMapSpec &other)
  301. {
  302. replicateOffset = other.replicateOffset;
  303. defaultCopies = other.defaultCopies;
  304. maxDrvs = other.maxDrvs;
  305. startDrv = other.startDrv;
  306. flags = other.flags;
  307. interleave = other.interleave;
  308. repeatedPart = other.repeatedPart;
  309. setDefaultBaseDir(other.defaultBaseDir);
  310. setDefaultReplicateDir(other.defaultReplicateDir);
  311. return *this;
  312. }
  313. // --------------------------------------------------------
  314. static void removeDir(const char *name,const char *dir,StringBuffer &out)
  315. {
  316. const char *s=name;
  317. const char *d=dir;
  318. if (d&&*d) {
  319. while (*s&&(toupper(*s)==toupper(*d))) {
  320. s++;
  321. d++;
  322. }
  323. if ((*d==0)&&isPathSepChar(*s)) // support cross OS
  324. name = s+1;
  325. }
  326. out.append(name);
  327. }
  328. #define RO_SINGLE_PART (0x40000000) // used for singletons
  329. struct CClusterInfo: implements IClusterInfo, public CInterface
  330. {
  331. Linked<IGroup> group;
  332. StringAttr name; // group name
  333. ClusterPartDiskMapSpec mspec;
  334. void checkClusterName(INamedGroupStore *resolver)
  335. {
  336. // check name matches group
  337. if (resolver&&group) {
  338. if (!name.isEmpty()) {
  339. StringBuffer defaultDir;
  340. GroupType groupType;
  341. Owned<IGroup> lgrp = resolver->lookup(name, defaultDir, groupType);
  342. if (lgrp&&lgrp->equals(group))
  343. {
  344. if (mspec.defaultBaseDir.isEmpty())
  345. {
  346. mspec.setDefaultBaseDir(defaultDir); // MORE - should possibly set up the rest of the mspec info from the group info here
  347. }
  348. if (mspec.defaultCopies>1 && mspec.defaultReplicateDir.isEmpty())
  349. {
  350. mspec.setDefaultReplicateDir(queryBaseDirectory(groupType, 1)); // MORE - not sure this is strictly correct
  351. }
  352. return; // ok
  353. }
  354. name.clear();
  355. }
  356. StringBuffer gname;
  357. if (resolver->find(group,gname,true)||(group->ordinality()>1))
  358. name.set(gname);
  359. }
  360. }
  361. public:
  362. IMPLEMENT_IINTERFACE;
  363. CClusterInfo(MemoryBuffer &mb,INamedGroupStore *resolver)
  364. {
  365. StringAttr grptext;
  366. mb.read(grptext);
  367. if (!grptext.isEmpty())
  368. group.setown(createIGroup(grptext));
  369. mspec.deserialize(mb);
  370. mb.read(name);
  371. checkClusterName(resolver);
  372. }
  373. CClusterInfo(const char *_name,IGroup *_group,const ClusterPartDiskMapSpec &_mspec,INamedGroupStore *resolver)
  374. : name(_name),group(_group)
  375. {
  376. name.toLowerCase();
  377. mspec =_mspec;
  378. checkClusterName(resolver);
  379. }
  380. CClusterInfo(IPropertyTree *pt,INamedGroupStore *resolver,unsigned flags)
  381. {
  382. if (!pt)
  383. return;
  384. name.set(pt->queryProp("@name"));
  385. mspec.fromProp(pt);
  386. if ((((flags&IFDSF_EXCLUDE_GROUPS)==0)||name.isEmpty())&&pt->hasProp("Group"))
  387. group.setown(createIGroup(pt->queryProp("Group")));
  388. if (!name.isEmpty()&&!group.get()&&resolver)
  389. {
  390. StringBuffer defaultDir;
  391. GroupType groupType;
  392. group.setown(resolver->lookup(name.get(), defaultDir, groupType));
  393. // MORE - common some of this with checkClusterName?
  394. if (mspec.defaultBaseDir.isEmpty())
  395. {
  396. mspec.setDefaultBaseDir(defaultDir); // MORE - should possibly set up the rest of the mspec info from the group info here
  397. }
  398. if (mspec.defaultCopies>1 && mspec.defaultReplicateDir.isEmpty())
  399. {
  400. mspec.setDefaultReplicateDir(queryBaseDirectory(groupType, 1)); // MORE - not sure this is strictly correct
  401. }
  402. }
  403. else
  404. checkClusterName(resolver);
  405. }
  406. const char *queryGroupName()
  407. {
  408. return name.isEmpty()?NULL:name.get();
  409. }
  410. IGroup *queryGroup(IGroupResolver *resolver)
  411. {
  412. if (!group&&!name.isEmpty()&&resolver)
  413. group.setown(resolver->lookup(name));
  414. return group.get();
  415. }
  416. StringBuffer &getGroupName(StringBuffer &ret,IGroupResolver *resolver)
  417. {
  418. if (name.isEmpty()) {
  419. if (group)
  420. {
  421. if (resolver)
  422. resolver->find(group,ret,true); // this will set single node as well
  423. else if (group->ordinality()==1)
  424. group->getText(ret);
  425. }
  426. }
  427. else
  428. ret.append(name);
  429. return ret;
  430. }
  431. void serialize(MemoryBuffer &mb)
  432. {
  433. StringBuffer grptext;
  434. if (group)
  435. group->getText(grptext);
  436. mb.append(grptext);
  437. mspec.serialize(mb);
  438. mb.append(name);
  439. }
  440. INode *queryNode(unsigned idx,unsigned maxparts,unsigned copy)
  441. {
  442. if (!group.get())
  443. return queryNullNode();
  444. unsigned nn;
  445. unsigned dn;
  446. if (!mspec.calcPartLocation (idx,maxparts,copy, group->ordinality(), nn, dn))
  447. return queryNullNode();
  448. return &group->queryNode(nn);
  449. }
  450. unsigned queryDrive(unsigned idx,unsigned maxparts,unsigned copy)
  451. {
  452. if (!group.get())
  453. return 0;
  454. unsigned nn;
  455. unsigned dn;
  456. mspec.calcPartLocation (idx,maxparts,copy, group->ordinality(), nn, dn);
  457. return dn;
  458. }
  459. void serializeTree(IPropertyTree *pt,unsigned flags)
  460. {
  461. mspec.toProp(pt);
  462. if (group&&(((flags&IFDSF_EXCLUDE_GROUPS)==0)||name.isEmpty())) {
  463. StringBuffer gs;
  464. group->getText(gs);
  465. pt->setProp("Group",gs.str());
  466. }
  467. if (!name.isEmpty()&&((flags&IFDSF_EXCLUDE_CLUSTERNAMES)==0))
  468. pt->setProp("@name",name);
  469. }
  470. ClusterPartDiskMapSpec &queryPartDiskMapping()
  471. {
  472. return mspec;
  473. }
  474. void setGroupName(const char *_name)
  475. {
  476. name.set(_name);
  477. name.toLowerCase();
  478. }
  479. void setGroup(IGroup *_group)
  480. {
  481. group.set(_group);
  482. }
  483. IGroup *queryGroup()
  484. {
  485. return group;
  486. }
  487. void getBaseDir(StringBuffer &basedir,DFD_OS os)
  488. {
  489. if (mspec.defaultBaseDir.isEmpty()) // assume current platform's default
  490. basedir.append(queryBaseDirectory(grp_unknown, 0, os));
  491. else
  492. basedir.append(mspec.defaultBaseDir);
  493. }
  494. void getReplicateDir(StringBuffer &basedir,DFD_OS os)
  495. {
  496. if (mspec.defaultReplicateDir.isEmpty()) // assume current platform's default
  497. basedir.append(queryBaseDirectory(grp_unknown, 1, os));
  498. else
  499. basedir.append(mspec.defaultReplicateDir);
  500. }
  501. StringBuffer &getClusterLabel(StringBuffer &ret)
  502. {
  503. return getGroupName(ret, NULL);
  504. }
  505. };
  506. IClusterInfo *createClusterInfo(const char *name,
  507. IGroup *grp,
  508. const ClusterPartDiskMapSpec &mspec,
  509. INamedGroupStore *resolver)
  510. {
  511. return new CClusterInfo(name,grp,mspec,resolver);
  512. }
  513. IClusterInfo *deserializeClusterInfo(MemoryBuffer &mb,
  514. INamedGroupStore *resolver)
  515. {
  516. return new CClusterInfo(mb,resolver);
  517. }
  518. IClusterInfo *deserializeClusterInfo(IPropertyTree *pt,
  519. INamedGroupStore *resolver,
  520. unsigned flags)
  521. {
  522. return new CClusterInfo(pt,resolver,flags);
  523. }
  524. class CFileDescriptorBase: public CInterface
  525. {
  526. protected:
  527. PointerArray parts; // of CPartDescriptor
  528. public:
  529. StringAttr tracename;
  530. IArrayOf<IClusterInfo> clusters;
  531. Owned<IPropertyTree> attr;
  532. StringAttr directory;
  533. StringAttr partmask;
  534. virtual unsigned numParts() = 0; // number of parts
  535. virtual unsigned numCopies(unsigned partnum) = 0; // number of copies
  536. virtual INode *doQueryNode(unsigned partidx, unsigned copy, unsigned rn) = 0; // query machine node
  537. virtual unsigned queryDrive(unsigned partidx, unsigned copy) = 0; // query drive
  538. virtual StringBuffer &getPartTail(StringBuffer &name,unsigned idx) = 0;
  539. virtual StringBuffer &getPartDirectory(StringBuffer &name,unsigned idx,unsigned copy = 0) = 0; // get filename dir
  540. virtual void serializePart(MemoryBuffer &mb,unsigned idx) = 0;
  541. virtual const char *queryDefaultDir() = 0;
  542. virtual IFileDescriptor &querySelf() = 0;
  543. virtual unsigned copyClusterNum(unsigned partidx, unsigned copy,unsigned *replicate=NULL) = 0;
  544. };
  545. class CPartDescriptor : implements IPartDescriptor
  546. {
  547. protected: friend class CFileDescriptor;
  548. StringAttr overridename; // this may be a multi path - may or not be relative to directory
  549. // if not set use parent mask (and is *not* multi in this case)
  550. bool ismulti; // only set if overridename set (otherwise false)
  551. CFileDescriptorBase &parent; // this is for the cluster *not* for the entire file
  552. unsigned partIndex;
  553. Owned<IPropertyTree> props;
  554. public:
  555. virtual void Link(void) const
  556. {
  557. parent.Link();
  558. }
  559. virtual bool Release(void) const
  560. {
  561. return parent.Release();
  562. }
  563. CPartDescriptor(CFileDescriptorBase &_parent,unsigned idx,IPropertyTree *pt)
  564. : parent(_parent)
  565. {
  566. partIndex = idx;
  567. ismulti = false;
  568. if (!isEmptyPTree(pt)) {
  569. if (pt->getPropInt("@num",idx+1)-1!=idx)
  570. WARNLOG("CPartDescriptor part index mismatch");
  571. overridename.set(pt->queryProp("@name"));
  572. if (overridename.isEmpty())
  573. overridename.clear();
  574. else
  575. ismulti = ::isMulti(overridename);
  576. props.setown(createPTreeFromIPT(pt));
  577. //props->removeProp("@num"); // keep these for legacy
  578. //props->removeProp("@name");
  579. props->removeProp("@node");
  580. }
  581. else
  582. props.setown(createPTree("Part"));
  583. }
  584. void set(unsigned idx, const char *_tail, IPropertyTree *pt)
  585. {
  586. partIndex = idx;
  587. setOverrideName(_tail);
  588. props.setown(pt?createPTreeFromIPT(pt):createPTree("Part"));
  589. }
  590. CPartDescriptor(CFileDescriptorBase &_parent, unsigned idx, MemoryBuffer &mb)
  591. : parent(_parent)
  592. {
  593. partIndex = idx;
  594. mb.read(overridename);
  595. if (overridename.isEmpty()) // shouldn't really need this
  596. overridename.clear();
  597. ismulti = ::isMulti(overridename);
  598. props.setown(createPTree(mb));
  599. }
  600. unsigned queryPartIndex()
  601. {
  602. return partIndex;
  603. }
  604. unsigned numCopies()
  605. {
  606. return parent.numCopies(partIndex);
  607. }
  608. virtual INode *queryNode(unsigned copy)
  609. {
  610. return parent.doQueryNode(partIndex,copy,(props&&props->hasProp("@rn"))?props->getPropInt("@rn"):(unsigned)-1);
  611. }
  612. virtual unsigned queryDrive(unsigned copy)
  613. {
  614. return parent.queryDrive(partIndex,copy);
  615. }
  616. INode *getNode(unsigned copy=0)
  617. {
  618. return LINK(queryNode(copy));
  619. }
  620. IPropertyTree &queryProperties()
  621. {
  622. return *props;
  623. }
  624. IPropertyTree *getProperties()
  625. {
  626. return props.get();
  627. }
  628. bool getCrc(unsigned &crc)
  629. {
  630. return getCrcFromPartProps(*parent.attr,*props,crc);
  631. }
  632. IFileDescriptor &queryOwner()
  633. {
  634. return parent.querySelf();
  635. }
  636. RemoteFilename &getFilename(unsigned copy, RemoteFilename &rfn)
  637. {
  638. if (ismulti) {
  639. RemoteMultiFilename rmfn;
  640. getMultiFilename(copy, rmfn);
  641. if (rmfn.ordinality()==1) {
  642. rfn.set(rmfn.item(0));
  643. return rfn;
  644. }
  645. throw MakeStringException(-1,"Remote Filename: Cannot resolve single part from wild/multi filename");
  646. }
  647. StringBuffer fullpath;
  648. getPath(fullpath,copy);
  649. rfn.setPath(queryNode(copy)->endpoint(),fullpath.str());
  650. return rfn;
  651. }
  652. StringBuffer &getPath(StringBuffer &path,unsigned copy)
  653. {
  654. StringBuffer tail;
  655. getTail(tail);
  656. if (!tail.length()||!isPathSepChar(tail.charAt(0))) {
  657. getDirectory(path,copy);
  658. addPathSepChar(path);
  659. }
  660. path.append(tail);
  661. return path;
  662. }
  663. StringBuffer &getTail(StringBuffer &name)
  664. {
  665. return parent.getPartTail(name,partIndex);
  666. }
  667. StringBuffer &getDirectory(StringBuffer &dir,unsigned copy)
  668. {
  669. return parent.getPartDirectory(dir,partIndex,copy);
  670. }
  671. bool isMulti()
  672. {
  673. return ismulti;
  674. }
  675. RemoteMultiFilename &getMultiFilename(unsigned copy, RemoteMultiFilename &rmfn)
  676. {
  677. if (ismulti) {
  678. rmfn.setEp(queryNode(copy)->endpoint());
  679. StringBuffer dir;
  680. parent.getPartDirectory(dir,partIndex,copy);
  681. StringBuffer tmp1;
  682. StringBuffer tmp2;
  683. splitDirMultiTail(overridename,tmp1,tmp2);
  684. rmfn.append(tmp2, dir);
  685. }
  686. else {
  687. RemoteFilename rfn;
  688. getFilename(copy,rfn);
  689. rmfn.append(rfn);
  690. }
  691. return rmfn;
  692. }
  693. void subserialize(MemoryBuffer &mb)
  694. {
  695. mb.append(overridename);
  696. props->serialize(mb);
  697. }
  698. bool subserializeTree(IPropertyTree *pt)
  699. {
  700. bool ret = false;
  701. if (props) {
  702. Owned<IAttributeIterator> attriter = props->getAttributes();
  703. ForEach(*attriter) {
  704. const char *an = attriter->queryName();
  705. if ((stricmp(an,"@num")!=0)&&(stricmp(an,"@name")!=0)) {
  706. pt->setProp(an,attriter->queryValue());
  707. ret = true;
  708. }
  709. }
  710. Owned<IPropertyTreeIterator> iter = props->getElements("*");
  711. ForEach(*iter) {
  712. ret = true;
  713. pt->addPropTree(iter->query().queryName(),createPTreeFromIPT(&iter->query()));
  714. }
  715. }
  716. if (!overridename.isEmpty()) {
  717. pt->setProp("@name",overridename);
  718. ret = true;
  719. }
  720. if (ret)
  721. pt->setPropInt("@num",partIndex+1);
  722. if ((partIndex==0)&&(parent.numParts()==1)) { // more legacy
  723. SocketEndpoint ep = queryNode(0)->endpoint();
  724. StringBuffer tmp;
  725. if (!ep.isNull())
  726. pt->setProp("@node",ep.getUrlStr(tmp).str());
  727. if (overridename.isEmpty()&&!parent.partmask.isEmpty()) {
  728. expandMask(tmp.clear(), parent.partmask, 0, 1);
  729. pt->setProp("@name",tmp.str());
  730. }
  731. }
  732. return ret;
  733. }
  734. void setOverrideName(const char *_tail)
  735. {
  736. if (!_tail||!*_tail)
  737. overridename.clear();
  738. else
  739. overridename.set(_tail);
  740. ismulti = ::isMulti(_tail);
  741. }
  742. const char *queryOverrideName()
  743. {
  744. if (overridename.isEmpty())
  745. return NULL;
  746. return overridename;
  747. }
  748. void serialize(MemoryBuffer &mb)
  749. {
  750. parent.serializePart(mb,partIndex);
  751. }
  752. unsigned copyClusterNum(unsigned copy,unsigned *replicate=NULL)
  753. {
  754. return parent.copyClusterNum(partIndex,copy,replicate);
  755. }
  756. IReplicatedFile *getReplicatedFile()
  757. {
  758. IReplicatedFile *ret = createReplicatedFile();
  759. RemoteFilenameArray &copies = ret->queryCopies();
  760. unsigned nc = numCopies();
  761. for (unsigned copy=0;copy<nc;copy++) {
  762. RemoteFilename rfn;
  763. copies.append(getFilename(copy,rfn));
  764. }
  765. return ret;
  766. }
  767. };
  768. // --------------------------------------------------------
  769. class CPartDescriptorArrayIterator : public CArrayIteratorOf<IPartDescriptor, IPartDescriptorIterator>
  770. {
  771. public:
  772. CPartDescriptorArrayIterator() : CArrayIteratorOf<IPartDescriptor, IPartDescriptorIterator>(array) { }
  773. CPartDescriptorArray array;
  774. };
  775. void getClusterInfo(IPropertyTree &pt, INamedGroupStore *resolver, unsigned flags, IArrayOf<IClusterInfo> &clusters)
  776. {
  777. unsigned nc = pt.getPropInt("@numclusters");
  778. if (!nc) { // legacy format
  779. unsigned np = pt.getPropInt("@numparts");
  780. StringArray groups;
  781. getFileGroups(&pt,groups);
  782. unsigned gi = 0;
  783. do {
  784. Owned<IGroup> cgroup;
  785. const char *grp = (gi<groups.ordinality())?groups.item(gi):NULL;
  786. if (grp&&resolver)
  787. cgroup.setown(resolver->lookup(grp));
  788. // get nodes from parts if complete (and group 0)
  789. if (gi==0) { // don't assume lookup name correct!
  790. SocketEndpoint *eps = (SocketEndpoint *)calloc(np?np:1,sizeof(SocketEndpoint));
  791. MemoryBuffer mb;
  792. Owned<IPropertyTreeIterator> piter;
  793. if (pt.getPropBin("Parts",mb))
  794. piter.setown(deserializePartAttrIterator(mb));
  795. else
  796. piter.setown(pt.getElements("Part"));
  797. ForEach(*piter) {
  798. IPropertyTree &cpt = piter->query();
  799. unsigned num = cpt.getPropInt("@num");
  800. if (num>np) {
  801. eps = (SocketEndpoint *)checked_realloc(eps,num*sizeof(SocketEndpoint),np*sizeof(SocketEndpoint),-21);
  802. memset(eps+np,0,(num-np)*sizeof(SocketEndpoint));
  803. np = num;
  804. }
  805. const char *node = cpt.queryProp("@node");
  806. if (node&&*node)
  807. eps[num-1].set(node);
  808. }
  809. unsigned i=0;
  810. for (i=0;i<np;i++)
  811. if (eps[i].isNull())
  812. break;
  813. if (i==np) {
  814. Owned<IGroup> ngrp = createIGroup(np,eps);
  815. if (!cgroup.get()||(ngrp->compare(cgroup)!=GRbasesubset))
  816. cgroup.setown(ngrp.getClear());
  817. }
  818. free(eps);
  819. }
  820. ClusterPartDiskMapSpec mspec;
  821. IClusterInfo *cluster = createClusterInfo(grp,cgroup,mspec,resolver);
  822. clusters.append(*cluster);
  823. gi++;
  824. } while (gi<groups.ordinality());
  825. }
  826. else {
  827. Owned<IPropertyTreeIterator> iter = pt.getElements("Cluster");
  828. ForEach(*iter)
  829. clusters.append(*deserializeClusterInfo(&iter->query(),resolver,flags));
  830. }
  831. }
  832. class CFileDescriptor: public CFileDescriptorBase, implements ISuperFileDescriptor
  833. {
  834. SocketEndpointArray *pending; // for constructing cluster group
  835. bool setupdone;
  836. byte version;
  837. IFileDescriptor &querySelf()
  838. {
  839. return *this;
  840. }
  841. void openPending()
  842. {
  843. if (!pending) {
  844. pending = new SocketEndpointArray;
  845. if (setupdone)
  846. throw MakeStringException(-1,"IFileDescriptor - setup already done");
  847. setupdone = true;
  848. ClusterPartDiskMapSpec mspec;
  849. clusters.append(*createClusterInfo(NULL,NULL,mspec));
  850. }
  851. }
  852. void doClosePending()
  853. {
  854. // first sort out cluster
  855. unsigned np = parts.ordinality();
  856. unsigned n = pending->ordinality();
  857. assertex(clusters.ordinality());
  858. assertex(np>=n);
  859. if (n==0) {
  860. clusters.remove(clusters.ordinality()-1);
  861. WARNLOG("CFileDescriptor: removing empty cluster");
  862. }
  863. else {
  864. unsigned w;
  865. for (w=1;w<n;w++) {
  866. unsigned i;
  867. for (i=w;i<n;i++)
  868. if (!pending->item(i).equals(pending->item(i%w)))
  869. break;
  870. if (i==n)
  871. break;
  872. }
  873. for (unsigned i=n;i>w;)
  874. pending->remove(--i);
  875. Owned<IGroup> newgrp = createIGroup(*pending);
  876. clusters.item(clusters.ordinality()-1).setGroup(newgrp);
  877. }
  878. delete pending;
  879. pending = NULL;
  880. if ((n==1)&&(isSpecialPath(part(0)->overridename)))
  881. return;
  882. // now look for a directory
  883. // this is a bit longwinded!
  884. // expand all tails
  885. StringBuffer tmp;
  886. if (!directory.isEmpty()) {
  887. StringBuffer fp;
  888. ForEachItemIn(i,parts) {
  889. CPartDescriptor *pt = part(i);
  890. if (!pt)
  891. WARNLOG("Null part in pending file descriptor");
  892. else if (pt->isMulti()) {
  893. assertex(!pt->overridename.isEmpty());
  894. if (!isAbsolutePath(pt->overridename)) { // assumes all multi are same
  895. mergeDirMultiTail(directory,pt->overridename,fp.clear()); // assumes all multi are same
  896. pt->setOverrideName(fp.str());
  897. }
  898. }
  899. else {
  900. RemoteFilename rfn;
  901. pt->getFilename(0,rfn);
  902. rfn.getLocalPath(fp.clear());
  903. pt->setOverrideName(fp.str());
  904. }
  905. }
  906. }
  907. directory.clear();
  908. StringBuffer dir;
  909. // now find longest common dir (multi complicates this somewhat)
  910. CPartDescriptor &part0 = *part(0);
  911. bool multi = part0.isMulti();
  912. if (multi)
  913. splitDirMultiTail(part0.overridename,dir,tmp);
  914. else
  915. splitDirTail(part0.overridename,dir);
  916. if (dir.length()==0) {
  917. WARNLOG("CFileDescriptor cannot determine directory for file %s in '%s'",tracename.str(),part0.overridename.str());
  918. }
  919. else {
  920. const char *s = dir.str();
  921. for (unsigned i=1;i<np;i++) {
  922. CPartDescriptor &pt = *part(i);
  923. multi = pt.isMulti();
  924. StringBuffer tdir; // would be easier without multi
  925. assertex(!pt.overridename.isEmpty()); // should have been set above
  926. if (multi) {
  927. StringBuffer tmp;
  928. splitDirMultiTail(pt.overridename,dir,tmp);
  929. }
  930. else
  931. splitDirTail(part0.overridename,tdir);
  932. const char *t = tdir.str();
  933. const char *d = s;
  934. while (*d&&(*t==*d)) {
  935. d++;
  936. t++;
  937. }
  938. if (*t) { // not full match
  939. while ((d!=s)&&!isPathSepChar(*(d-1)))
  940. d--;
  941. dir.setLength(d-s);
  942. s = dir.str(); // paranoid
  943. if (dir.length()<=2) // must be at least "/x/" or "d:\"
  944. break; // no common dir
  945. }
  946. }
  947. if (dir.length()>2) {
  948. // now change all tails to relative
  949. StringBuffer relpath;
  950. for (unsigned i2=0;i2<np;i2++) {
  951. CPartDescriptor &pt = *part(i2);
  952. multi = pt.isMulti();
  953. relpath.clear();
  954. if (multi) {
  955. removeRelativeMultiPath(pt.overridename,dir.str(),relpath);
  956. }
  957. else
  958. relpath.append(splitRelativePath(pt.overridename,dir.str(),relpath));
  959. pt.setOverrideName(relpath.str());
  960. }
  961. if ((dir.length()>1)&&(strcmp(dir.str()+1,":\\")!=0))
  962. dir.setLength(dir.length()-1); // take off sep char
  963. directory.set(dir);
  964. }
  965. }
  966. // see if can create a partmask
  967. for (unsigned i=0;i<np;i++) {
  968. CPartDescriptor &pt = *part(i);
  969. if (pt.isMulti()) {
  970. partmask.clear();
  971. break;
  972. }
  973. if (!partmask.isEmpty()) {
  974. if (!matchesMask(pt.overridename,partmask,i,np)) {
  975. partmask.clear();
  976. if (i!=0)
  977. break;
  978. }
  979. }
  980. if (partmask.isEmpty()&&!constructMask(partmask,pt.overridename,i,np))
  981. break;
  982. }
  983. if (partmask)
  984. for (unsigned i2=0;i2<np;i2++)
  985. part(i2)->setOverrideName(NULL); // no longer need
  986. }
  987. inline void closePending() // bit of a pain, but must be called at start of interrogation functions
  988. {
  989. if (pending)
  990. doClosePending();
  991. }
  992. StringBuffer &getPartTail(StringBuffer &name,unsigned idx)
  993. {
  994. unsigned n = numParts();
  995. if (idx<n) {
  996. CPartDescriptor &pt = *part(idx);
  997. if (!pt.overridename.isEmpty()) {
  998. if (isSpecialPath(pt.overridename))
  999. return name.append(pt.overridename);
  1000. if (pt.isMulti()) {
  1001. StringBuffer tmp;
  1002. splitDirMultiTail(pt.overridename,tmp,name);
  1003. }
  1004. else
  1005. name.append(pathTail(pt.overridename));
  1006. }
  1007. else if (!partmask.isEmpty())
  1008. expandMask(name, pathTail(partmask), idx, n);
  1009. }
  1010. return name;
  1011. }
  1012. StringBuffer &getPartDirectory(StringBuffer &buf,unsigned idx,unsigned copy)
  1013. {
  1014. unsigned n = numParts();
  1015. if (idx<n) {
  1016. StringBuffer fullpath;
  1017. StringBuffer tmp1;
  1018. CPartDescriptor &pt = *part(idx);
  1019. if (!pt.overridename.isEmpty()) {
  1020. if (isSpecialPath(pt.overridename))
  1021. return buf;
  1022. if (pt.isMulti()) {
  1023. StringBuffer tmpon; // bit messy but need to ensure dir put back on before removing!
  1024. const char *on = pt.overridename.get();
  1025. if (on&&*on&&!isAbsolutePath(on)&&!directory.isEmpty())
  1026. on = addPathSepChar(tmpon.append(directory)).append(on).str();
  1027. StringBuffer tmp2;
  1028. splitDirMultiTail(on,tmp1,tmp2);
  1029. }
  1030. else
  1031. splitDirTail(pt.overridename,tmp1);
  1032. if (directory.isEmpty()||(isAbsolutePath(tmp1.str())||(stdIoHandle(tmp1.str())>=0)))
  1033. fullpath.swapWith(tmp1);
  1034. else {
  1035. fullpath.append(directory);
  1036. if (fullpath.length())
  1037. addPathSepChar(fullpath);
  1038. fullpath.append(tmp1);
  1039. }
  1040. }
  1041. else if (!partmask.isEmpty()) {
  1042. fullpath.append(directory);
  1043. if (containsPathSepChar(partmask)) {
  1044. if (fullpath.length())
  1045. addPathSepChar(fullpath);
  1046. splitDirTail(partmask,fullpath);
  1047. }
  1048. }
  1049. else
  1050. fullpath.append(directory);
  1051. replaceClusterDir(idx,copy, fullpath);
  1052. StringBuffer baseDir, repDir;
  1053. unsigned lcopy;
  1054. IClusterInfo * cluster = queryCluster(idx,copy,lcopy);
  1055. if (cluster)
  1056. {
  1057. DFD_OS os = SepCharBaseOs(getPathSepChar(fullpath));
  1058. cluster->getBaseDir(baseDir, os);
  1059. cluster->getReplicateDir(repDir, os);
  1060. }
  1061. setReplicateFilename(fullpath,queryDrive(idx,copy),baseDir.str(),repDir.str());
  1062. if ((fullpath.length()>3)&&isPathSepChar(fullpath.charAt(fullpath.length()-1)))
  1063. fullpath.setLength(fullpath.length()-1);
  1064. if (buf.length())
  1065. buf.append(fullpath);
  1066. else
  1067. buf.swapWith(fullpath);
  1068. }
  1069. return buf;
  1070. }
  1071. IClusterInfo *queryCluster(unsigned partno,unsigned copy, unsigned &lcopy)
  1072. {
  1073. closePending();
  1074. unsigned n=clusters.ordinality();
  1075. unsigned i=0;
  1076. bool c = 0;
  1077. while (i<n) {
  1078. unsigned mc = numClusterCopies(i,partno);
  1079. if (copy<mc) {
  1080. lcopy = copy;
  1081. return &clusters.item(i);
  1082. }
  1083. copy -= mc;
  1084. i++;
  1085. }
  1086. return NULL;
  1087. }
  1088. IClusterInfo *queryCluster(const char *_clusterName)
  1089. {
  1090. StringAttr clusterName = _clusterName;
  1091. clusterName.toLowerCase();
  1092. StringBuffer name;
  1093. ForEachItemIn(c, clusters)
  1094. {
  1095. if (0 == strcmp(clusters.item(c).getClusterLabel(name.clear()).str(), clusterName))
  1096. return &clusters.item(c);
  1097. }
  1098. return NULL;
  1099. }
  1100. void replaceClusterDir(unsigned partno,unsigned copy, StringBuffer &path)
  1101. {
  1102. // assumes default dir matches one of clusters
  1103. closePending();
  1104. if (path.length()<3)
  1105. return;
  1106. const char *ds = path.str();
  1107. unsigned nc = clusters.ordinality();
  1108. if (nc<=1)
  1109. return; // not much can do
  1110. StringAttr matched;
  1111. StringAttr toadd;
  1112. unsigned i=0;
  1113. bool c = 0;
  1114. int cp = (int)copy;
  1115. while (i<nc) {
  1116. StringBuffer dcmp;
  1117. clusters.item(i).getBaseDir(dcmp,SepCharBaseOs(getPathSepChar(ds))); // no trailing sep
  1118. const char *t = dcmp.str();
  1119. const char *d = ds;
  1120. while (*d&&(*t==*d)) {
  1121. d++;
  1122. t++;
  1123. }
  1124. if (!*t&&(!*d||isPathSepChar(*d))&&(dcmp.length()>matched.length()))
  1125. matched.set(dcmp);
  1126. unsigned mc = numClusterCopies(i,partno);
  1127. if ((cp>=0)&&(copy<mc))
  1128. toadd.set(dcmp);
  1129. copy -= mc;
  1130. i++;
  1131. }
  1132. if (!matched.isEmpty()&&!toadd.isEmpty()&&(strcmp(matched,toadd)!=0)) {
  1133. StringBuffer tmp(toadd);
  1134. tmp.append(ds+matched.length());
  1135. path.swapWith(tmp);
  1136. }
  1137. }
  1138. public:
  1139. IMPLEMENT_IINTERFACE;
  1140. CFileDescriptor(MemoryBuffer &mb, IArrayOf<IPartDescriptor> *partsret, UnsignedArray **subcounts=NULL, bool *_interleaved=NULL)
  1141. {
  1142. // bit fiddly
  1143. if (subcounts)
  1144. *subcounts = NULL;
  1145. pending = NULL;
  1146. setupdone = true;
  1147. mb.read(version);
  1148. if ((version != SERIALIZATION_VERSION) && (version != SERIALIZATION_VERSION2)) // check serialization matched
  1149. throw MakeStringException(-1,"FileDescriptor serialization version mismatch %d/%d",(int)SERIALIZATION_VERSION,(int)version);
  1150. mb.read(tracename);
  1151. mb.read(directory);
  1152. mb.read(partmask);
  1153. unsigned n;
  1154. mb.read(n);
  1155. for (unsigned i1 = 0; i1 < n; i1++)
  1156. clusters.append(*deserializeClusterInfo(mb));
  1157. unsigned partidx;
  1158. mb.read(partidx); // -1 if all parts, -2 if multiple parts
  1159. mb.read(n); // numparts
  1160. CPartDescriptor *part;
  1161. if (partidx == (unsigned)-2)
  1162. {
  1163. UnsignedArray pia;
  1164. unsigned pi;
  1165. loop
  1166. {
  1167. mb.read(pi);
  1168. if (pi == (unsigned)-1)
  1169. break;
  1170. pia.append(pi);
  1171. }
  1172. for (unsigned i3 = 0; i3 < n; i3++)
  1173. parts.append(NULL);
  1174. ForEachItemIn(i4, pia)
  1175. {
  1176. unsigned p = pia.item(i4);
  1177. if (p < n) {
  1178. part = new CPartDescriptor(*this, p, mb);
  1179. parts.replace(part, p);
  1180. }
  1181. }
  1182. if (partsret)
  1183. {
  1184. ForEachItemIn(i5, pia)
  1185. {
  1186. unsigned p = pia.item(i5);
  1187. if (p < parts.ordinality())
  1188. {
  1189. CPartDescriptor *pt = (CPartDescriptor *)parts.item(p);
  1190. partsret->append(*LINK(pt));
  1191. }
  1192. }
  1193. }
  1194. }
  1195. else
  1196. {
  1197. for (unsigned i2=0; i2 < n; i2++)
  1198. {
  1199. if ((partidx == (unsigned)-1) || (partidx == i2))
  1200. {
  1201. part = new CPartDescriptor(*this, i2, mb);
  1202. if (partsret)
  1203. partsret->append(*LINK(part));
  1204. }
  1205. else
  1206. part = NULL; // new CPartDescriptor(*this, i2, NULL);
  1207. parts.append(part);
  1208. }
  1209. }
  1210. attr.setown(createPTree(mb));
  1211. if (!attr)
  1212. attr.setown(createPTree("Attr")); // doubt can happen
  1213. if (version == SERIALIZATION_VERSION2)
  1214. {
  1215. if (subcounts)
  1216. *subcounts = new UnsignedArray;
  1217. unsigned n;
  1218. mb.read(n);
  1219. while (n)
  1220. {
  1221. unsigned np;
  1222. mb.read(np);
  1223. if (subcounts)
  1224. (*subcounts)->append(np);
  1225. n--;
  1226. }
  1227. bool interleaved;
  1228. mb.read(interleaved);
  1229. if (_interleaved)
  1230. *_interleaved = interleaved;
  1231. }
  1232. }
  1233. void ensureRequiredStructuresExist()
  1234. {
  1235. if (!attr) attr.setown(createPTree("Attr"));
  1236. }
  1237. CFileDescriptor(IPropertyTree *tree, INamedGroupStore *resolver, unsigned flags)
  1238. {
  1239. pending = NULL;
  1240. if ((flags&IFDSF_ATTR_ONLY)||!tree) {
  1241. if (tree)
  1242. attr.setown(tree);
  1243. ensureRequiredStructuresExist();
  1244. setupdone = false;
  1245. return;
  1246. }
  1247. else
  1248. setupdone = true;
  1249. IPropertyTree &pt = *tree;
  1250. tracename.set(pt.queryProp("@trace"));
  1251. directory.set(pt.queryProp("@directory"));
  1252. partmask.set(pt.queryProp("@partmask"));
  1253. unsigned np = pt.getPropInt("@numparts");
  1254. StringBuffer query;
  1255. IPropertyTree **trees = NULL;
  1256. Owned<IPropertyTreeIterator> piter;
  1257. MemoryBuffer mb;
  1258. IPropertyTree *at = pt.queryPropTree("Attr");
  1259. getClusterInfo(pt,resolver,flags,clusters);
  1260. offset_t totalsize = (offset_t)-1;
  1261. if (flags&IFDSF_EXCLUDE_PARTS) {
  1262. for (unsigned i2=0;i2<np;i2++)
  1263. parts.append(new CPartDescriptor(*this,i2,NULL));
  1264. }
  1265. else {
  1266. if (!at||(at->getPropInt64("@size",-1)==-1))
  1267. totalsize = 0;
  1268. if ((piter.get()&&mb.length())||pt.getPropBin("Parts",mb)) {
  1269. if (!piter.get())
  1270. piter.setown(deserializePartAttrIterator(mb));
  1271. unsigned i2=0;
  1272. ForEach(*piter) {
  1273. if (totalsize!=(offset_t)-1) {
  1274. offset_t sz = piter->query().getPropInt64("@size",-1);
  1275. if (sz!=(offset_t)-1)
  1276. totalsize += sz;
  1277. else
  1278. totalsize = (offset_t)-1;
  1279. }
  1280. parts.append(new CPartDescriptor(*this,i2++,&piter->query()));
  1281. }
  1282. }
  1283. else { // parts may not be in order
  1284. IArrayOf<IPropertyTree> trees;
  1285. if (!piter.get())
  1286. piter.setown(pt.getElements("Part"));
  1287. ForEach(*piter) {
  1288. IPropertyTree &cpt = piter->query();
  1289. unsigned num = cpt.getPropInt("@num");
  1290. if (!num)
  1291. continue;
  1292. while (num>trees.ordinality()+1)
  1293. trees.append(*createPTree("Part"));
  1294. cpt.Link();
  1295. if (num>trees.ordinality())
  1296. trees.append(cpt);
  1297. else
  1298. trees.replace(cpt,num-1);
  1299. }
  1300. for (unsigned i2=0;i2<np;i2++) {
  1301. if (totalsize!=(offset_t)-1) {
  1302. offset_t sz = (i2<trees.ordinality())?(offset_t)trees.item(i2).getPropInt64("@size",-1):(offset_t)-1;
  1303. if (sz!=(offset_t)-1)
  1304. totalsize += sz;
  1305. else
  1306. totalsize = (offset_t)-1;
  1307. }
  1308. parts.append(new CPartDescriptor(*this,i2,(i2<trees.ordinality())?&trees.item(i2):NULL));
  1309. }
  1310. }
  1311. }
  1312. piter.clear();
  1313. if (at)
  1314. attr.setown(createPTreeFromIPT(at));
  1315. else
  1316. attr.setown(createPTree("Attr"));
  1317. if (totalsize!=(offset_t)-1)
  1318. attr->setPropInt64("@size",totalsize);
  1319. }
  1320. void serializePart(MemoryBuffer &mb,unsigned partidx)
  1321. {
  1322. serializeParts(mb,&partidx,1);
  1323. }
  1324. void serializeParts(MemoryBuffer &mb,unsigned *partlist, unsigned nparts);
  1325. void serializeParts(MemoryBuffer &mb,UnsignedArray &partlist)
  1326. {
  1327. serializeParts(mb,partlist.getArray(),partlist.ordinality());
  1328. }
  1329. void serialize(MemoryBuffer &mb)
  1330. {
  1331. serializePart(mb,(unsigned)-1);
  1332. }
  1333. void serializeTree(IPropertyTree &pt,unsigned flags)
  1334. {
  1335. closePending();
  1336. // if (!tracename.isEmpty())
  1337. // pt.setProp("@trace",tracename); // don't include trace name in tree (may revisit later)
  1338. if (!directory.isEmpty())
  1339. pt.setProp("@directory",directory);
  1340. if (!partmask.isEmpty())
  1341. pt.setProp("@partmask",partmask);
  1342. unsigned n = clusters.ordinality();
  1343. pt.setPropInt("@numclusters",n);
  1344. unsigned cn = 0;
  1345. // JCSMORE - afaics, IFileDescriptor @group is no longer used
  1346. StringBuffer grplist;
  1347. ForEachItemIn(i1,clusters) {
  1348. Owned<IPropertyTree> ct = createPTree("Cluster");
  1349. clusters.item(i1).serializeTree(ct,flags);
  1350. if (!isEmptyPTree(ct)) {
  1351. const char *cname = ct->queryProp("@name");
  1352. if (cname&&*cname) {
  1353. if (grplist.length())
  1354. grplist.append(',');
  1355. grplist.append(cname);
  1356. }
  1357. pt.addPropTree("Cluster",ct.getClear());
  1358. }
  1359. else
  1360. WARNLOG("CFileDescriptor::serializeTree - empty cluster");
  1361. }
  1362. if (grplist.length())
  1363. pt.setProp("@group",grplist.str());
  1364. else
  1365. pt.removeProp("@group");
  1366. /// ^^
  1367. n = numParts();
  1368. pt.setPropInt("@numparts",n);
  1369. if ((flags&IFDSF_EXCLUDE_PARTS)==0) {
  1370. if ((n==1)||((flags&CPDMSF_packParts)==0)) {
  1371. for (unsigned i2=0;i2<n;i2++) {
  1372. Owned<IPropertyTree> p = createPTree("Part");
  1373. if (part(i2)->subserializeTree(p))
  1374. pt.addPropTree("Part",p.getClear());
  1375. }
  1376. }
  1377. else {
  1378. MemoryBuffer mb;
  1379. for (unsigned i2=0;i2<n;i2++) {
  1380. // this seems a bit excessive in conversions
  1381. Owned<IPropertyTree> p = createPTree("Part");
  1382. part(i2)->subserializeTree(p);
  1383. serializePartAttr(mb,p);
  1384. }
  1385. pt.setPropBin("Parts",mb.length(),mb.toByteArray());
  1386. }
  1387. }
  1388. IPropertyTree *t = &queryProperties();
  1389. if (!isEmptyPTree(t))
  1390. pt.addPropTree("Attr",createPTreeFromIPT(t));
  1391. }
  1392. IPropertyTree *getFileTree(unsigned flags)
  1393. {
  1394. Owned<IPropertyTree> ret = createPTree(queryDfsXmlBranchName(DXB_File));
  1395. serializeTree(*ret,flags);
  1396. return ret.getClear();
  1397. }
  1398. virtual ~CFileDescriptor()
  1399. {
  1400. closePending(); // not sure strictly needed
  1401. ForEachItemInRev(p, parts)
  1402. delpart(p);
  1403. }
  1404. void setDefaultDir(const char *dirname)
  1405. {
  1406. const char *s=dirname;
  1407. size32_t l = strlen(s);
  1408. char sc = 0;
  1409. if ((l>1)&&(isPathSepChar(dirname[l-1]))&&(strcmp(dirname+1,":\\")!=0)) {
  1410. l--;
  1411. sc = dirname[l];
  1412. }
  1413. if (l&&!isAbsolutePath(dirname)) { // support cross-OS
  1414. // assume relative path on same OS
  1415. if (!sc)
  1416. sc = getPathSepChar(dirname);
  1417. StringBuffer tmp;
  1418. tmp.append(queryBaseDirectory(grp_unknown, 0, SepCharBaseOs(sc))).append(sc).append(s);
  1419. directory.set(tmp.str());
  1420. }
  1421. else
  1422. directory.set(s,l);
  1423. }
  1424. int getReplicateOffset(unsigned clusternum)
  1425. {
  1426. closePending();
  1427. if (clusternum>=clusters.ordinality())
  1428. return 1;
  1429. return clusters.item(clusternum).queryPartDiskMapping().replicateOffset;
  1430. }
  1431. CPartDescriptor *part(unsigned idx)
  1432. {
  1433. CPartDescriptor *ret = (CPartDescriptor *)parts.item(idx);
  1434. if (!ret) { // this is not normally expected!
  1435. ret = new CPartDescriptor(*this,idx,NULL);
  1436. parts.replace(ret,idx);
  1437. }
  1438. return ret;
  1439. }
  1440. void delpart(unsigned idx)
  1441. {
  1442. CPartDescriptor *p = (CPartDescriptor *)parts.item(idx);
  1443. delete p;
  1444. parts.remove(idx);
  1445. }
  1446. void doSetPart(unsigned idx, const SocketEndpoint &ep, const char *filename, IPropertyTree *pt)
  1447. {
  1448. // setPart from ep/node ignores port in ep
  1449. openPending();
  1450. while (parts.ordinality()<=idx) {
  1451. SocketEndpoint nullep;
  1452. parts.append(new CPartDescriptor(*this,idx,NULL));
  1453. pending->append(nullep);
  1454. }
  1455. CPartDescriptor &p = *part(idx);
  1456. p.set(idx,filename,pt);
  1457. if (idx>=pending->ordinality())
  1458. ERRLOG("IFileDescriptor setPart called after cluster finished");
  1459. else {
  1460. SocketEndpoint &pep = pending->element(idx);
  1461. if (pep.isNull())
  1462. pep=ep;
  1463. else
  1464. ERRLOG("IFileDescriptor setPart called twice for same part");
  1465. }
  1466. }
  1467. void setPart(unsigned idx, INode *node, const char *filename, IPropertyTree *pt)
  1468. {
  1469. if (node)
  1470. setPart(idx,node->endpoint(),filename,pt); // ignore port
  1471. }
  1472. void setPart(unsigned idx, const IpAddress &ip, const char *filename, IPropertyTree *pt)
  1473. {
  1474. SocketEndpoint ep(0,ip);
  1475. doSetPart(idx,ep,filename,pt);
  1476. }
  1477. void setPart(unsigned idx, const RemoteFilename &name, IPropertyTree *pt)
  1478. {
  1479. StringBuffer localname;
  1480. name.getLocalPath(localname);
  1481. SocketEndpoint ep = name.queryEndpoint();
  1482. doSetPart(idx,ep,localname.str(),pt);
  1483. }
  1484. void setTraceName(const char *trc)
  1485. {
  1486. tracename.set(trc);
  1487. }
  1488. unsigned numClusterCopies(unsigned cnum,unsigned partnum)
  1489. {
  1490. IClusterInfo &cluster = clusters.item(cnum);
  1491. IGroup *grp = cluster.queryGroup();
  1492. return cluster.queryPartDiskMapping().numCopies(partnum,grp?grp->ordinality():1,numParts());
  1493. }
  1494. unsigned numCopies(unsigned partnum)
  1495. {
  1496. closePending();
  1497. unsigned ret = 0;
  1498. ForEachItemIn(i,clusters)
  1499. ret += numClusterCopies(i,partnum);
  1500. return ret;
  1501. }
  1502. INode *getNode(unsigned partidx,unsigned copy)
  1503. {
  1504. INode *ret = queryNode(partidx,copy);
  1505. return LINK(ret);
  1506. }
  1507. INode *doQueryNode(unsigned idx,unsigned copy,unsigned rn)
  1508. {
  1509. closePending();
  1510. unsigned lcopy;
  1511. IClusterInfo * cluster = queryCluster(idx,copy,lcopy);
  1512. if (!cluster)
  1513. return queryNullNode();
  1514. if ((copy==1)&&(rn!=(unsigned)-1)) {
  1515. IGroup *group = cluster->queryGroup();
  1516. if (group&&rn<group->ordinality())
  1517. return &group->queryNode((rank_t)rn);
  1518. }
  1519. return cluster->queryNode(idx,numParts(),lcopy);
  1520. }
  1521. unsigned queryDrive(unsigned idx,unsigned copy)
  1522. {
  1523. closePending();
  1524. unsigned lcopy;
  1525. IClusterInfo * cluster = queryCluster(idx,copy,lcopy);
  1526. if (!cluster)
  1527. return 0;
  1528. return cluster->queryDrive(idx,numParts(),lcopy);
  1529. }
  1530. INode *queryNode(unsigned idx,unsigned copy)
  1531. {
  1532. closePending();
  1533. if (idx<numParts())
  1534. return part(idx)->queryNode(copy);
  1535. return NULL;
  1536. }
  1537. RemoteFilename &getFilename(unsigned idx, unsigned copy, RemoteFilename &rfn)
  1538. {
  1539. closePending();
  1540. return part(idx)->getFilename(copy, rfn);
  1541. }
  1542. StringBuffer &getTraceName(StringBuffer &str)
  1543. {
  1544. closePending();
  1545. return str.append(tracename);
  1546. }
  1547. virtual IPropertyTree *getProperties()
  1548. {
  1549. closePending();
  1550. return attr.getLink();
  1551. }
  1552. IPropertyTree &queryProperties()
  1553. {
  1554. closePending();
  1555. return *attr.get();
  1556. }
  1557. IPropertyTree *queryHistory()
  1558. {
  1559. closePending();
  1560. return attr->queryPropTree("History");
  1561. }
  1562. bool isMulti(unsigned partidx=(unsigned)-1)
  1563. {
  1564. closePending();
  1565. if (partidx==(unsigned)-1) {
  1566. for(partidx=0; partidx<numParts(); partidx++)
  1567. if (part(partidx)->isMulti())
  1568. return true;
  1569. return false;
  1570. }
  1571. return ((partidx<numParts()) && part(partidx)->isMulti());
  1572. }
  1573. RemoteMultiFilename &getMultiFilename(unsigned partidx,unsigned cpy, RemoteMultiFilename &rfn)
  1574. {
  1575. closePending();
  1576. return part(partidx)->getMultiFilename(cpy, rfn);
  1577. }
  1578. IPartDescriptor *getPart(unsigned idx)
  1579. {
  1580. IPartDescriptor *ret = queryPart(idx);
  1581. return LINK(ret);
  1582. }
  1583. IPartDescriptor *queryPart(unsigned idx)
  1584. {
  1585. closePending();
  1586. if (idx<numParts())
  1587. return part(idx);
  1588. return NULL;
  1589. }
  1590. IPartDescriptorIterator *getIterator()
  1591. {
  1592. closePending();
  1593. CPartDescriptorArrayIterator *iter = new CPartDescriptorArrayIterator();
  1594. unsigned n=0;
  1595. for (; n<numParts(); n++) iter->array.append(*getPart(n));
  1596. return iter;
  1597. }
  1598. const char *queryKind()
  1599. {
  1600. return queryProperties().queryProp("@kind");
  1601. }
  1602. bool isGrouped()
  1603. {
  1604. return queryProperties().getPropBool("@grouped");
  1605. }
  1606. bool isCompressed(bool *blocked=NULL)
  1607. {
  1608. return ::isCompressed(queryProperties(), blocked);
  1609. }
  1610. const char *queryDefaultDir()
  1611. {
  1612. closePending();
  1613. return directory;
  1614. }
  1615. void setPartMask(const char *mask)
  1616. {
  1617. partmask.set(mask);
  1618. }
  1619. unsigned addCluster(const char *name,IGroup *grp,const ClusterPartDiskMapSpec &map)
  1620. {
  1621. closePending();
  1622. IClusterInfo * cluster = createClusterInfo(name,grp,map);
  1623. clusters.append(*cluster);
  1624. return clusters.ordinality()-1;
  1625. }
  1626. unsigned addCluster(IGroup *grp,const ClusterPartDiskMapSpec &map)
  1627. {
  1628. return addCluster(NULL,grp,map);
  1629. }
  1630. void endCluster(ClusterPartDiskMapSpec &map)
  1631. {
  1632. closePending();
  1633. if (clusters.ordinality())
  1634. clusters.item(clusters.ordinality()-1).queryPartDiskMapping() = map;
  1635. }
  1636. const char *queryPartMask()
  1637. {
  1638. closePending();
  1639. return partmask;
  1640. }
  1641. IGroup *getGroup()
  1642. {
  1643. IGroup *g = queryClusterGroup(0);
  1644. return LINK(g);
  1645. }
  1646. unsigned numParts()
  1647. {
  1648. closePending();
  1649. return parts.ordinality();
  1650. }
  1651. void setNumParts(unsigned numparts)
  1652. {
  1653. closePending();
  1654. while (parts.ordinality()<numparts)
  1655. parts.append(new CPartDescriptor(*this,parts.ordinality(),NULL));
  1656. while (parts.ordinality()>numparts)
  1657. delpart(parts.ordinality()-1);
  1658. }
  1659. unsigned numClusters()
  1660. {
  1661. closePending();
  1662. return clusters.ordinality();
  1663. }
  1664. unsigned copyClusterNum(unsigned partidx, unsigned copy,unsigned *replicate=NULL)
  1665. {
  1666. unsigned lcopy=0;
  1667. IClusterInfo * cluster = queryCluster(partidx,copy,lcopy);
  1668. if (replicate)
  1669. *replicate = lcopy;
  1670. if (!cluster)
  1671. return NotFound;
  1672. // bit silly finding again
  1673. return clusters.find(*cluster);
  1674. }
  1675. ClusterPartDiskMapSpec &queryPartDiskMapping(unsigned clusternum)
  1676. {
  1677. closePending();
  1678. assertex(clusternum<numClusters());
  1679. return clusters.item(clusternum).queryPartDiskMapping();
  1680. }
  1681. IGroup *queryClusterGroup(unsigned clusternum)
  1682. {
  1683. closePending();
  1684. assertex(clusternum<numClusters());
  1685. return clusters.item(clusternum).queryGroup();
  1686. }
  1687. void setClusterGroup(unsigned clusternum,IGroup *grp)
  1688. {
  1689. closePending();
  1690. assertex(clusternum<numClusters());
  1691. clusters.item(clusternum).setGroup(grp);
  1692. }
  1693. StringBuffer &getClusterGroupName(unsigned clusternum,StringBuffer &ret,IGroupResolver *resolver)
  1694. {
  1695. closePending();
  1696. assertex(clusternum<numClusters());
  1697. return clusters.item(clusternum).getGroupName(ret,resolver);
  1698. }
  1699. void setClusterGroupName(unsigned clusternum,const char *name)
  1700. {
  1701. closePending();
  1702. assertex(clusternum<numClusters());
  1703. clusters.item(clusternum).setGroupName(name);
  1704. }
  1705. StringBuffer &getClusterLabel(unsigned clusternum,StringBuffer &ret)
  1706. // either roxie label or node group name
  1707. {
  1708. closePending();
  1709. assertex(clusternum<numClusters());
  1710. return clusters.item(clusternum).getClusterLabel(ret);
  1711. }
  1712. void setClusterOrder(StringArray &names,bool exclusive)
  1713. {
  1714. closePending();
  1715. unsigned done = 0;
  1716. StringBuffer cname;
  1717. ForEachItemIn(i,names)
  1718. {
  1719. StringAttr name = names.item(i);
  1720. name.toLowerCase();
  1721. for (unsigned j=done;j<clusters.ordinality();j++)
  1722. {
  1723. clusters.item(j).getClusterLabel(cname.clear());
  1724. if (strcmp(cname.str(),name)==0)
  1725. {
  1726. if (done!=j)
  1727. clusters.swap(done,j);
  1728. done++;
  1729. break;
  1730. }
  1731. }
  1732. }
  1733. if (exclusive)
  1734. {
  1735. if (!done)
  1736. done = 1;
  1737. StringAttr oldDefaultDir;
  1738. StringBuffer baseDir1;
  1739. while (clusters.ordinality()>done)
  1740. {
  1741. clusters.item(clusters.ordinality()-1).getBaseDir(baseDir1.clear(),SepCharBaseOs(getPathSepChar(directory)));
  1742. // if baseDir is leading component this file's default directory..
  1743. if (!oldDefaultDir.length() && directory.length()>=baseDir1.length() && 0==strncmp(directory, baseDir1, baseDir1.length()) &&
  1744. (directory.length()==baseDir1.length() || isPathSepChar(directory[baseDir1.length()])))
  1745. oldDefaultDir.set(baseDir1.str());
  1746. clusters.remove(clusters.ordinality()-1);
  1747. }
  1748. if (oldDefaultDir.length() && clusters.ordinality())
  1749. {
  1750. StringBuffer baseDir2;
  1751. clusters.item(0).getBaseDir(baseDir2.clear(), SepCharBaseOs(getPathSepChar(directory)));
  1752. StringBuffer newDir(baseDir2.str());
  1753. if (directory.length()>oldDefaultDir.length())
  1754. newDir.append(directory.get()+oldDefaultDir.length());
  1755. directory.set(newDir.str());
  1756. }
  1757. }
  1758. }
  1759. virtual void ensureReplicate()
  1760. {
  1761. for (unsigned clusterIdx = 0; clusterIdx<numClusters(); clusterIdx++)
  1762. queryPartDiskMapping(clusterIdx).ensureReplicate();
  1763. }
  1764. ISuperFileDescriptor *querySuperFileDescriptor()
  1765. {
  1766. return NULL;
  1767. }
  // Identity mapping: sub-file 0, same part number; always returns true.
  bool mapSubPart(unsigned superpartnum, unsigned &subfile, unsigned &subpartnum)
  {
      // shouldn't get called ever
      subfile = 0;
      subpartnum = superpartnum;
      return true;
  }
  1775. void setSubMapping(UnsignedArray &_subcounts, bool _interleaved)
  1776. {
  1777. UNIMPLEMENTED_X("setSubMapping called from CFileDescriptor!");
  1778. }
  1779. unsigned querySubFiles()
  1780. {
  1781. UNIMPLEMENTED_X("querySubFiles called from CFileDescriptor!");
  1782. }
  1783. };
  1784. class CSuperFileDescriptor: public CFileDescriptor
  1785. {
  1786. UnsignedArray *subfilecounts;
  1787. bool interleaved;
  1788. public:
  1789. CSuperFileDescriptor(MemoryBuffer &mb, IArrayOf<IPartDescriptor> *partsret)
  1790. : CFileDescriptor(mb,partsret,&subfilecounts,&interleaved)
  1791. {
  1792. }
  1793. CSuperFileDescriptor(IPropertyTree *attr)
  1794. : CFileDescriptor(attr,NULL,IFDSF_ATTR_ONLY) // only support attr here
  1795. {
  1796. subfilecounts = NULL;
  1797. }
  1798. virtual ~CSuperFileDescriptor()
  1799. {
  1800. delete subfilecounts;
  1801. }
  1802. ISuperFileDescriptor *querySuperFileDescriptor()
  1803. {
  1804. return this;
  1805. }
  1806. bool mapSubPart(unsigned superpartnum, unsigned &subfile, unsigned &subpartnum)
  1807. {
  1808. subpartnum = superpartnum;
  1809. subfile = 0;
  1810. if (!subfilecounts) // its a file!
  1811. return true;
  1812. if (interleaved) {
  1813. unsigned p = 0;
  1814. unsigned f = 0;
  1815. bool found = false;
  1816. loop {
  1817. if (f==subfilecounts->ordinality()) {
  1818. if (!found)
  1819. break; // no more
  1820. found = false;
  1821. p++;
  1822. f = 0;
  1823. }
  1824. if (p<subfilecounts->item(f)) {
  1825. if (!superpartnum) {
  1826. subfile = f;
  1827. subpartnum = p;
  1828. return true;
  1829. }
  1830. superpartnum--;
  1831. found = true;
  1832. }
  1833. f++;
  1834. }
  1835. }
  1836. else { // sequential
  1837. while (subfile<subfilecounts->ordinality()) {
  1838. if (subpartnum<subfilecounts->item(subfile))
  1839. return true;
  1840. subpartnum -= subfilecounts->item(subfile);
  1841. subfile++;
  1842. }
  1843. }
  1844. return false;
  1845. }
  1846. void setSubMapping(UnsignedArray &_subcounts, bool _interleaved)
  1847. {
  1848. interleaved = _interleaved;
  1849. if (_subcounts.ordinality()) {
  1850. if (subfilecounts)
  1851. subfilecounts->kill();
  1852. else
  1853. subfilecounts = new UnsignedArray;
  1854. ForEachItemIn(i,_subcounts)
  1855. subfilecounts->append(_subcounts.item(i));
  1856. }
  1857. else {
  1858. delete subfilecounts;
  1859. subfilecounts = NULL;
  1860. }
  1861. }
  1862. unsigned querySubFiles()
  1863. {
  1864. if (!subfilecounts) // its a file!
  1865. return 1;
  1866. return subfilecounts->ordinality();
  1867. }
  1868. void serializeSub(MemoryBuffer &mb)
  1869. {
  1870. if (subfilecounts) {
  1871. unsigned count = subfilecounts->ordinality();
  1872. mb.append(count);
  1873. ForEachItemIn(i,*subfilecounts)
  1874. mb.append(subfilecounts->item(i));
  1875. }
  1876. else
  1877. mb.append((unsigned)0);
  1878. mb.append(interleaved);
  1879. }
  1880. };
  1881. void CFileDescriptor::serializeParts(MemoryBuffer &mb,unsigned *partlist, unsigned nparts)
  1882. {
  1883. closePending();
  1884. ISuperFileDescriptor *isdesc = querySuperFileDescriptor();
  1885. CSuperFileDescriptor *sdesc = isdesc?(QUERYINTERFACE(isdesc,CSuperFileDescriptor)):NULL;
  1886. mb.append(sdesc?SERIALIZATION_VERSION2:SERIALIZATION_VERSION);
  1887. mb.append(tracename);
  1888. mb.append(directory);
  1889. mb.append(partmask);
  1890. // first clusters
  1891. unsigned n = clusters.ordinality();
  1892. mb.append(n);
  1893. ForEachItemIn(i1,clusters)
  1894. clusters.item(i1).serialize(mb);
  1895. n = numParts();
  1896. if (nparts==1) {
  1897. unsigned pi = *partlist;
  1898. mb.append(pi).append(n);
  1899. if (pi==(unsigned)-1) {
  1900. for (unsigned i2=0;i2<n;i2++)
  1901. part(i2)->subserialize(mb);
  1902. }
  1903. else if (pi<n)
  1904. part(pi)->subserialize(mb);
  1905. }
  1906. else {
  1907. mb.append((unsigned)-2).append(n); // -2 is for multiple
  1908. for (unsigned i3=0;i3<nparts;i3++)
  1909. mb.append(partlist[i3]);
  1910. mb.append((unsigned)-1); // end of list
  1911. for (unsigned i4=0;i4<nparts;i4++)
  1912. part(partlist[i4])->subserialize(mb);
  1913. }
  1914. queryProperties().serialize(mb);
  1915. if (sdesc)
  1916. sdesc->serializeSub(mb);
  1917. }
  1918. IFileDescriptor *createFileDescriptor(IPropertyTree *tree)
  1919. {
  1920. return new CFileDescriptor(tree,NULL,IFDSF_ATTR_ONLY);
  1921. }
  1922. ISuperFileDescriptor *createSuperFileDescriptor(IPropertyTree *tree)
  1923. {
  1924. return new CSuperFileDescriptor(tree);
  1925. }
  1926. IFileDescriptor *createFileDescriptor()
  1927. {
  1928. return new CFileDescriptor(NULL,NULL,0);
  1929. }
  1930. static IFileDescriptor *_createExternalFileDescriptor(const char *_logicalname, bool lookup)
  1931. {
  1932. CDfsLogicalFileName logicalname;
  1933. logicalname.set(_logicalname);
  1934. //authentication already done
  1935. SocketEndpoint ep;
  1936. Owned<IGroup> group;
  1937. if (!logicalname.getEp(ep))
  1938. {
  1939. StringBuffer grp;
  1940. if (logicalname.getGroupName(grp).length()==0)
  1941. throw MakeStringException(-1,"missing node in external file name (%s)",logicalname.get());
  1942. group.setown(queryNamedGroupStore().lookup(grp.str()));
  1943. if (!group)
  1944. throw MakeStringException(-1,"cannot resolve node %s in external file name (%s)",grp.str(),logicalname.get());
  1945. ep = group->queryNode(0).endpoint();
  1946. }
  1947. bool iswin=false;
  1948. bool usedafs;
  1949. switch (getDaliServixOs(ep))
  1950. {
  1951. case DAFS_OSwindows:
  1952. iswin = true;
  1953. // fall through
  1954. case DAFS_OSlinux:
  1955. case DAFS_OSsolaris:
  1956. usedafs = ep.port||!ep.isLocal();
  1957. break;
  1958. default:
  1959. #ifdef _WIN32
  1960. iswin = true;
  1961. #else
  1962. iswin = false;
  1963. #endif
  1964. usedafs = false;
  1965. break;
  1966. }
  1967. //rest is local path
  1968. Owned<IFileDescriptor> fileDesc = createFileDescriptor();
  1969. StringBuffer dir;
  1970. StringBuffer tail;
  1971. IException *e=NULL;
  1972. if (!logicalname.getExternalPath(dir,tail,iswin,&e))
  1973. {
  1974. if (e)
  1975. throw e;
  1976. return NULL;
  1977. }
  1978. fileDesc->setDefaultDir(dir.str());
  1979. unsigned n = group.get()?group->ordinality():1;
  1980. StringBuffer partname;
  1981. CDateTime modTime;
  1982. StringBuffer modTimeStr;
  1983. for (unsigned i=0;i<n;i++)
  1984. {
  1985. if (group.get())
  1986. ep = group->queryNode(i).endpoint();
  1987. partname.clear();
  1988. partname.append(dir);
  1989. const char *s = tail.str();
  1990. bool isspecial = (*s=='>');
  1991. if (isspecial)
  1992. partname.append(s);
  1993. else
  1994. {
  1995. while (*s)
  1996. {
  1997. if (memicmp(s,"$P$",3)==0)
  1998. {
  1999. partname.append(i+1);
  2000. s += 3;
  2001. }
  2002. else if (memicmp(s,"$N$",3)==0)
  2003. {
  2004. partname.append(n);
  2005. s += 3;
  2006. }
  2007. else
  2008. partname.append(*(s++));
  2009. }
  2010. }
  2011. if (!ep.port&&usedafs)
  2012. ep.port = getDaliServixPort();
  2013. RemoteFilename rfn;
  2014. rfn.setPath(ep,partname.str());
  2015. if (!isspecial&&(memcmp(partname.str(),"/$/",3)!=0)&&(memcmp(partname.str(),"\\$\\",3)!=0)) // don't get date on external data
  2016. {
  2017. try
  2018. {
  2019. Owned<IFile> file = createIFile(rfn);
  2020. CDateTime dt;
  2021. if (file&&file->getTime(NULL,&dt,NULL))
  2022. {
  2023. if ((0 == modTimeStr.length())||(dt.compareDate(modTime)>0))
  2024. {
  2025. modTime.set(dt);
  2026. modTime.getString(modTimeStr);
  2027. }
  2028. }
  2029. }
  2030. catch (IException *e)
  2031. {
  2032. EXCLOG(e,"CDistributedFileDirectory::createExternal");
  2033. e->Release();
  2034. }
  2035. }
  2036. if (lookup)
  2037. {
  2038. OwnedIFile iFile = createIFile(rfn);
  2039. if (!iFile->exists())
  2040. return NULL; // >=1 part does not exist.
  2041. }
  2042. if (modTimeStr.length())
  2043. {
  2044. Owned<IPropertyTree> part = createPTree("Part");
  2045. part->setProp("@modified", modTimeStr.str());
  2046. fileDesc->setPart(i, rfn, part);
  2047. }
  2048. else
  2049. fileDesc->setPart(i, rfn);
  2050. }
  2051. fileDesc->queryPartDiskMapping(0).defaultCopies = DFD_NoCopies;
  2052. return fileDesc.getClear();
  2053. }
  2054. IFileDescriptor *createExternalFileDescriptor(const char *logicalname)
  2055. {
  2056. return _createExternalFileDescriptor(logicalname, false);
  2057. }
  2058. IFileDescriptor *getExternalFileDescriptor(const char *logicalname)
  2059. {
  2060. return _createExternalFileDescriptor(logicalname, true);
  2061. }
  2062. inline void moveProp(IPropertyTree *to,IPropertyTree *from,const char *name)
  2063. {
  2064. const char *p = from->queryProp(name);
  2065. if (p&&*p) {
  2066. to->setProp(name,p);
  2067. from->removeProp(name);
  2068. }
  2069. }
  2070. static CFileDescriptor * doDeserializePartFileDescriptors(MemoryBuffer &mb,IArrayOf<IPartDescriptor> *parts)
  2071. {
  2072. size32_t savepos = mb.getPos();
  2073. byte version;
  2074. mb.read(version);
  2075. mb.reset(savepos);
  2076. if (version==SERIALIZATION_VERSION2) // its super
  2077. return new CSuperFileDescriptor(mb,parts);
  2078. return new CFileDescriptor(mb,parts);
  2079. }
  2080. extern da_decl void deserializePartFileDescriptors(MemoryBuffer &mb,IArrayOf<IPartDescriptor> &parts)
  2081. {
  2082. Owned<CFileDescriptor> parent = doDeserializePartFileDescriptors(mb,&parts);
  2083. }
  2084. IPartDescriptor *deserializePartFileDescriptor(MemoryBuffer &mb)
  2085. {
  2086. IArrayOf<IPartDescriptor> parts;
  2087. Owned<CFileDescriptor> parent = doDeserializePartFileDescriptors(mb,&parts);
  2088. if (parts.ordinality()!=1)
  2089. ERRLOG("deserializePartFileDescriptor deserializing multiple parts not single part");
  2090. if (parts.ordinality()==0)
  2091. return NULL;
  2092. return LINK(&parts.item(0));
  2093. }
  2094. IFileDescriptor *createFileDescriptor(const char *lname,IGroup *grp,IPropertyTree *tree,DFD_OS os,unsigned width)
  2095. {
  2096. // only handles 1 copy
  2097. IFileDescriptor *res = createFileDescriptor(tree);
  2098. res->setTraceName(lname);
  2099. StringBuffer dir;
  2100. makePhysicalPartName(lname, 0, 0, dir,false,os);
  2101. res->setDefaultDir(dir.str());
  2102. if (width==0)
  2103. width = grp->ordinality();
  2104. StringBuffer s;
  2105. for (unsigned i=0;i<width;i++) {
  2106. makePhysicalPartName(lname, i+1, width, s.clear(),false,os);
  2107. RemoteFilename rfn;
  2108. rfn.setPath(grp->queryNode(i%grp->ordinality()).endpoint(),s.str());
  2109. res->setPart(i,rfn,NULL);
  2110. }
  2111. ClusterPartDiskMapSpec map; // use defaults
  2112. map.defaultCopies = DFD_DefaultCopies;
  2113. res->endCluster(map);
  2114. return res;
  2115. }
  2116. IFileDescriptor *deserializeFileDescriptor(MemoryBuffer &mb)
  2117. {
  2118. return doDeserializePartFileDescriptors(mb,NULL);
  2119. }
  2120. IFileDescriptor *deserializeFileDescriptorTree(IPropertyTree *tree, INamedGroupStore *resolver, unsigned flags)
  2121. {
  2122. return new CFileDescriptor(tree, resolver, flags);
  2123. }
  2124. inline bool validFNameChar(char c)
  2125. {
  2126. static const char *invalids = "*\"/:<>?\\|";
  2127. return (c>=32 && c<127 && !strchr(invalids, c));
  2128. }
  2129. static const char * defaultWindowsBaseDirectories[__grp_size][MAX_REPLICATION_LEVELS] =
  2130. {
  2131. { "c:\\thordata", "d:\\thordata" },
  2132. { "c:\\thordata", "d:\\thordata" },
  2133. { "c:\\roxiedata", "d:\\roxiedata" },
  2134. { "c:\\hthordata", "d:\\hthordata" },
  2135. { "c:\\hthordata", "d:\\hthordata" },
  2136. };
  2137. static const char * defaultUnixBaseDirectories[__grp_size][MAX_REPLICATION_LEVELS] =
  2138. {
  2139. { "/var/lib/HPCCSystems/hpcc-data/thor", "/var/lib/HPCCSystems/hpcc-mirror/thor" },
  2140. { "/var/lib/HPCCSystems/hpcc-data/thor", "/var/lib/HPCCSystems/hpcc-mirror/thor" },
  2141. { "/var/lib/HPCCSystems/hpcc-data/roxie", "/var/lib/HPCCSystems/hpcc-data2/roxie", "/var/lib/HPCCSystems/hpcc-data3/roxie", "/var/lib/HPCCSystems/hpcc-data4/roxie" },
  2142. { "/var/lib/HPCCSystems/hpcc-data/eclagent", "/var/lib/HPCCSystems/hpcc-mirror/eclagent" },
  2143. { "/var/lib/HPCCSystems/hpcc-data/unknown", "/var/lib/HPCCSystems/hpcc-mirror/unknown" },
  2144. };
  2145. static const char *componentNames[__grp_size] =
  2146. {
  2147. "thor", "thor", "roxie", "eclagent", "unknown"
  2148. };
  2149. static const char *dirTypeNames[MAX_REPLICATION_LEVELS] =
  2150. {
  2151. "data", "data2", "data3", "data4"
  2152. };
  2153. static StringAttr windowsBaseDirectories[__grp_size][MAX_REPLICATION_LEVELS];
  2154. static StringAttr unixBaseDirectories[__grp_size][MAX_REPLICATION_LEVELS];
  2155. static StringAttr defaultpartmask("$L$._$P$_of_$N$");
  2156. static SpinLock ldbSpin;
  2157. static bool ldbDone = false;
  2158. void loadDefaultBases()
  2159. {
  2160. SpinBlock b(ldbSpin);
  2161. if (ldbDone)
  2162. return;
  2163. ldbDone = true;
  2164. SessionId mysessid = myProcessSession();
  2165. if (mysessid)
  2166. {
  2167. Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software/Directories", mysessid, RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
  2168. if (conn) {
  2169. IPropertyTree* dirs = conn->queryRoot();
  2170. for (unsigned groupType = 0; groupType < __grp_size; groupType++)
  2171. {
  2172. const char *component = componentNames[groupType];
  2173. for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
  2174. {
  2175. StringBuffer dirout;
  2176. const char *dirType = dirTypeNames[replicationLevel];
  2177. if (replicationLevel==1 && groupType!=grp_roxie)
  2178. dirType = "mirror";
  2179. if (getConfigurationDirectory(dirs, dirType, component,
  2180. "dummy", // NB this is dummy value (but actually hopefully not used anyway)
  2181. dirout))
  2182. unixBaseDirectories[groupType][replicationLevel].set(dirout.str());
  2183. }
  2184. }
  2185. }
  2186. }
  2187. for (unsigned groupType = 0; groupType < __grp_size; groupType++)
  2188. for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
  2189. {
  2190. if (unixBaseDirectories[groupType][replicationLevel].isEmpty())
  2191. unixBaseDirectories[groupType][replicationLevel].set(defaultUnixBaseDirectories[groupType][replicationLevel]);
  2192. if (windowsBaseDirectories[groupType][replicationLevel].isEmpty())
  2193. windowsBaseDirectories[groupType][replicationLevel].set(defaultWindowsBaseDirectories[groupType][replicationLevel]);
  2194. }
  2195. }
  2196. const char *queryBaseDirectory(GroupType groupType, unsigned replicateLevel, DFD_OS os)
  2197. {
  2198. if (os==DFD_OSdefault)
  2199. #ifdef _WIN32
  2200. os = DFD_OSwindows;
  2201. #else
  2202. os = DFD_OSunix;
  2203. #endif
  2204. assertex(replicateLevel < MAX_REPLICATION_LEVELS);
  2205. loadDefaultBases();
  2206. switch (os)
  2207. {
  2208. case DFD_OSwindows:
  2209. return windowsBaseDirectories[groupType][replicateLevel];
  2210. case DFD_OSunix:
  2211. return unixBaseDirectories[groupType][replicateLevel];
  2212. }
  2213. return NULL;
  2214. }
  2215. void setBaseDirectory(const char * dir, unsigned replicateLevel, DFD_OS os)
  2216. {
  2217. // 2 possibilities
  2218. // either its an absolute path
  2219. // or use /c$/thordata and /d$/thordata
  2220. if (os==DFD_OSdefault)
  2221. #ifdef _WIN32
  2222. os = DFD_OSwindows;
  2223. #else
  2224. os = DFD_OSunix;
  2225. #endif
  2226. assertex(replicateLevel < MAX_REPLICATION_LEVELS);
  2227. loadDefaultBases();
  2228. StringBuffer out;
  2229. if (!dir||!*dir||!isAbsolutePath(dir))
  2230. throw MakeStringException(-1,"setBaseDirectory(%s) requires an absolute path",dir ? dir : "null");
  2231. size32_t l = strlen(dir);
  2232. if ((l>3)&&(isPathSepChar(dir[l-1])))
  2233. l--;
  2234. switch (os) {
  2235. case DFD_OSwindows:
  2236. windowsBaseDirectories[grp_unknown][replicateLevel].set(dir,l);
  2237. break;
  2238. case DFD_OSunix:
  2239. unixBaseDirectories[grp_unknown][replicateLevel].set(dir,l);
  2240. break;
  2241. }
  2242. }
  2243. const char *queryPartMask()
  2244. {
  2245. return defaultpartmask.get();
  2246. }
  2247. void setPartMask(const char * mask)
  2248. {
  2249. defaultpartmask.set(mask);
  2250. }
  2251. StringBuffer &getPartMask(StringBuffer &ret,const char *lname,unsigned partmax)
  2252. {
  2253. // ret is in *and* out
  2254. StringAttr tmp;
  2255. const char *m;
  2256. if (!ret.length())
  2257. m = queryPartMask();
  2258. else {
  2259. tmp.set(ret.str());
  2260. m = tmp.get();
  2261. ret.clear();
  2262. }
  2263. StringBuffer lns;
  2264. if (lname) {
  2265. bool maybequery = false;
  2266. const char *lnamebase = lname;
  2267. loop {
  2268. const char *e = strstr(lname,"::");
  2269. if (!e)
  2270. break;
  2271. lname = e+2;
  2272. if (*lname=='>')
  2273. maybequery = true;
  2274. }
  2275. if (maybequery) {
  2276. CDfsLogicalFileName lfn;
  2277. lfn.set(lnamebase);
  2278. if (lfn.isQuery()) {
  2279. RemoteFilename rfn;
  2280. lfn.getExternalFilename(rfn);
  2281. StringBuffer path;
  2282. rfn.getPath(path);
  2283. // start at third separator
  2284. const char *s = path.str();
  2285. unsigned si = 0;
  2286. while (*s&&(si!=3)) {
  2287. if (isPathSepChar(*s))
  2288. si++;
  2289. s++;
  2290. }
  2291. return ret.append(s);
  2292. }
  2293. }
  2294. char c;
  2295. const char *l = lname;
  2296. while ((c=*(l++))!=0) {
  2297. if (validFNameChar(c))
  2298. lns.append(c);
  2299. else
  2300. lns.appendf("%%%.2X", (int) c);
  2301. }
  2302. lns.trim().toLowerCase();
  2303. }
  2304. else if (!partmax)
  2305. return ret.append(m);
  2306. char c;
  2307. while ((c=*(m++))!=0) {
  2308. if (c=='$') {
  2309. char pc = toupper(m[0]);
  2310. if (pc&&(m[1]=='$')) {
  2311. switch (pc) {
  2312. case 'L':
  2313. if (lname) {
  2314. ret.append(lns.str());
  2315. m+=2;
  2316. continue;
  2317. }
  2318. case 'N':
  2319. if (partmax) {
  2320. ret.append(partmax);
  2321. m+=2;
  2322. continue;
  2323. }
  2324. }
  2325. }
  2326. }
  2327. ret.append(c);
  2328. }
  2329. return ret;
  2330. }
  2331. inline const char *skipRoot(const char *lname)
  2332. {
  2333. loop {
  2334. while (*lname==' ')
  2335. lname++;
  2336. if (*lname!='.')
  2337. break;
  2338. const char *s = lname+1;
  2339. while (*s==' ')
  2340. s++;
  2341. if (!*s)
  2342. lname = s;
  2343. else if ((s[0]==':')&&(s[1]==':'))
  2344. lname = s+2;
  2345. else
  2346. break;
  2347. }
  2348. return lname;
  2349. }
  2350. StringBuffer &makePhysicalPartName(const char *lname, unsigned partno, unsigned partmax, StringBuffer &result, unsigned replicateLevel, DFD_OS os,const char *diroverride)
  2351. {
  2352. assertex(lname);
  2353. if (strstr(lname,"::>")) { // probably query
  2354. CDfsLogicalFileName lfn;
  2355. lfn.set(lname);
  2356. if (lfn.isQuery()) {
  2357. RemoteFilename rfn;
  2358. lfn.getExternalFilename(rfn);
  2359. StringBuffer path;
  2360. rfn.getPath(path);
  2361. // query start at third separator
  2362. const char *s = path.str();
  2363. const char *sb = s;
  2364. unsigned si = 0;
  2365. while (*s&&(si!=3)) {
  2366. if (isPathSepChar(*s)) {
  2367. if (os!=DFD_OSdefault)
  2368. path.setCharAt(s-sb,OsSepChar(os));
  2369. si++;
  2370. }
  2371. s++;
  2372. }
  2373. if (partno==0)
  2374. return result.append(s-sb,sb);
  2375. return result.append(sb);
  2376. }
  2377. }
  2378. if (diroverride&&*diroverride) {
  2379. if (os==DFD_OSdefault)
  2380. os = SepCharBaseOs(getPathSepChar(diroverride));
  2381. result.append(diroverride);
  2382. }
  2383. else
  2384. result.append(queryBaseDirectory(grp_unknown, replicateLevel, os));
  2385. size32_t l = result.length();
  2386. if ((l>3)&&(result.charAt(l-1)!=OsSepChar(os))) {
  2387. result.append(OsSepChar(os));
  2388. l++;
  2389. }
  2390. lname = skipRoot(lname);
  2391. char c;
  2392. while ((c=*(lname++))!=0) {
  2393. if ((c==':')&&(*lname==':')) {
  2394. lname++;
  2395. result.clip().append(OsSepChar(os));
  2396. l = result.length();
  2397. lname = skipRoot(lname);
  2398. }
  2399. else if (validFNameChar(c))
  2400. result.append((char)tolower(c));
  2401. else
  2402. result.appendf("%%%.2X", (int) c);
  2403. }
  2404. if (partno==0) { // just return directory (with trailing PATHSEP)
  2405. result.setLength(l);
  2406. }
  2407. else {
  2408. #ifndef INCLUDE_1_OF_1
  2409. if (partmax>1) // avoid 1_of_1
  2410. #endif
  2411. {
  2412. StringBuffer tail(result.str()+l);
  2413. tail.trim();
  2414. result.setLength(l);
  2415. const char *m = queryPartMask();
  2416. while ((c=*(m++))!=0) {
  2417. if (c=='$') {
  2418. char pc = toupper(m[0]);
  2419. if (pc&&(m[1]=='$')) {
  2420. switch (pc) {
  2421. case 'P':
  2422. result.append(partno);
  2423. m+=2;
  2424. continue;
  2425. case 'N':
  2426. result.append(partmax);
  2427. m+=2;
  2428. continue;
  2429. case 'L':
  2430. result.append(tail);
  2431. m+=2;
  2432. continue;
  2433. }
  2434. }
  2435. }
  2436. result.append(c);
  2437. }
  2438. }
  2439. }
  2440. return result.clip();
  2441. }
  2442. StringBuffer &makeSinglePhysicalPartName(const char *lname, StringBuffer &result, bool allowospath, bool &wasdfs,const char *diroverride)
  2443. {
  2444. wasdfs = !(allowospath&&(isAbsolutePath(lname)||(stdIoHandle(lname)>=0)));
  2445. if (wasdfs)
  2446. return makePhysicalPartName(lname, 1, 1, result, false, DFD_OSdefault,diroverride);
  2447. return result.append(lname);
  2448. }
  2449. bool setReplicateDir(const char *dir,StringBuffer &out,bool isrep,const char *baseDir,const char *repDir)
  2450. {
  2451. // assumes that dir contains a separator (like base)
  2452. if (!dir)
  2453. return false;
  2454. const char *sep=findPathSepChar(dir);
  2455. if (!sep)
  2456. return false;
  2457. DFD_OS os = SepCharBaseOs(*sep);
  2458. const char *d = baseDir?baseDir:queryBaseDirectory(grp_unknown, isrep ? 0 : 1,os);
  2459. if (!d)
  2460. return false;
  2461. unsigned match = 0;
  2462. unsigned count = 0;
  2463. unsigned i;
  2464. for (i=0;d[i]&&dir[i]&&(d[i]==dir[i]);i++)
  2465. if (isPathSepChar(dir[i])) {
  2466. match = i;
  2467. count++;
  2468. }
  2469. const char *r = repDir?repDir:queryBaseDirectory(grp_unknown, isrep ? 1 : 0,os);
  2470. if (d[i]==0) {
  2471. if ((dir[i]==0)||isPathSepChar(dir[i])) {
  2472. out.append(r).append(dir+i);
  2473. return true;
  2474. }
  2475. }
  2476. else if (count) { // this is a bit of a kludge to handle roxie backup
  2477. const char *s = r;
  2478. const char *b = s;
  2479. while (s&&*s) {
  2480. if (isPathSepChar(*s)) {
  2481. if (--count==0) {
  2482. out.append(s-b,b).append(dir+match);
  2483. return true;
  2484. }
  2485. }
  2486. s++;
  2487. }
  2488. }
  2489. return false;
  2490. }
  2491. IFileDescriptor *createMultiCopyFileDescriptor(IFileDescriptor *in,unsigned num)
  2492. {
  2493. Owned<IFileDescriptor> out = createFileDescriptor(createPTreeFromIPT(&in->queryProperties()));
  2494. IPropertyTree &t = out->queryProperties();
  2495. __int64 rc = t.getPropInt64("@recordCount",-1);
  2496. if (rc>0)
  2497. t.setPropInt64("@recordCount",rc*num);
  2498. __int64 sz = t.getPropInt64("@size",-1);
  2499. if (sz>0)
  2500. t.setPropInt64("@size",sz*num);
  2501. Owned<IPartDescriptorIterator> iter=in->getIterator();
  2502. unsigned n = 0;
  2503. while (num--) {
  2504. if (iter->first()) {
  2505. do {
  2506. IPartDescriptor &part = iter->query();
  2507. RemoteFilename rfn;
  2508. part.getFilename(0,rfn);
  2509. out->setPart(n,rfn,&part.queryProperties());
  2510. n++;
  2511. } while (iter->next());
  2512. }
  2513. }
  2514. return out.getClear();
  2515. }
  2516. void removePartFiles(IFileDescriptor *desc,IMultiException *mexcept)
  2517. {
  2518. if (!desc)
  2519. return;
  2520. CriticalSection crit;
  2521. class casyncfor: public CAsyncFor
  2522. {
  2523. CriticalSection &crit;
  2524. IMultiException *mexcept;
  2525. IFileDescriptor *parent;
  2526. public:
  2527. casyncfor(IFileDescriptor *_parent,IMultiException *_mexcept,CriticalSection &_crit)
  2528. : crit(_crit)
  2529. {
  2530. parent = _parent;
  2531. mexcept = _mexcept;
  2532. }
  2533. void Do(unsigned i)
  2534. {
  2535. CriticalBlock block(crit);
  2536. unsigned nc = parent->numCopies(i);
  2537. for (unsigned copy = 0; copy < nc; copy++) {
  2538. RemoteFilename rfn;
  2539. parent->getFilename(i,copy,rfn);
  2540. Owned<IFile> partfile = createIFile(rfn);
  2541. StringBuffer eps;
  2542. try
  2543. {
  2544. unsigned start = msTick();
  2545. CriticalUnblock unblock(crit);
  2546. if (partfile->remove()) {
  2547. // PROGLOG("Removed '%s'",partfile->queryFilename());
  2548. unsigned t = msTick()-start;
  2549. if (t>60*1000)
  2550. LOG(MCwarning, unknownJob, "Removing %s from %s took %ds", partfile->queryFilename(), rfn.queryEndpoint().getUrlStr(eps).str(), t/1000);
  2551. }
  2552. // else
  2553. // LOG(MCwarning, unknownJob, "Failed to remove file part %s from %s", partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
  2554. }
  2555. catch (IException *e)
  2556. {
  2557. if (mexcept)
  2558. mexcept->append(*e);
  2559. else {
  2560. StringBuffer s("Failed to remove file part ");
  2561. s.append(partfile->queryFilename()).append(" from ");
  2562. rfn.queryEndpoint().getUrlStr(s);
  2563. EXCLOG(e, s.str());
  2564. e->Release();
  2565. }
  2566. }
  2567. }
  2568. }
  2569. } afor(desc,mexcept,crit);
  2570. afor.For(desc->numParts(),10,false,true);
  2571. }
  2572. StringBuffer &setReplicateFilename(StringBuffer &filename,unsigned drvnum,const char *baseDir,const char *repDir)
  2573. {
  2574. if (!drvnum)
  2575. return filename; //do nothing!
  2576. StringBuffer tmp(filename); // bit klunky
  2577. if (strcmp(swapPathDrive(tmp,0,drvnum).str(),filename.str())!=0)
  2578. tmp.swapWith(filename);
  2579. else if (drvnum==1) { // OSS
  2580. if(setReplicateDir(filename.str(),tmp.clear(),true,baseDir,repDir))
  2581. tmp.swapWith(filename);
  2582. }
  2583. return filename;
  2584. }
  2585. IGroup *shrinkRepeatedGroup(IGroup *grp)
  2586. {
  2587. if (!grp)
  2588. return NULL;
  2589. unsigned w = grp->ordinality();
  2590. for (unsigned i=1;i<w;i++) {
  2591. unsigned j;
  2592. for (j=i;j<w;j++)
  2593. if (!grp->queryNode(j).equals(&grp->queryNode(j%i)))
  2594. break;
  2595. if (j==w)
  2596. return grp->subset(0U,i);
  2597. }
  2598. return LINK(grp);
  2599. }
  2600. IFileDescriptor *createFileDescriptorFromRoxieXML(IPropertyTree *tree,const char *clustername)
  2601. {
  2602. if (!tree)
  2603. return NULL;
  2604. bool iskey = (strcmp(tree->queryName(),"Key")==0);
  2605. Owned<IPropertyTree> attr = createPTree("Attr");
  2606. Owned<IFileDescriptor> res = createFileDescriptor(attr.getLink());
  2607. const char *id = tree->queryProp("@id");
  2608. if (id) {
  2609. if (*id=='~')
  2610. id++;
  2611. res->setTraceName(id);
  2612. }
  2613. else
  2614. id = "";
  2615. const char *dir = tree->queryProp("@directory");
  2616. if (!dir||!*dir)
  2617. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing directory",id);
  2618. const char *mask = tree->queryProp("@partmask");
  2619. if (!mask||!*mask)
  2620. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part mask",id);
  2621. unsigned np = tree->getPropInt("@numparts");
  2622. IPropertyTree *part1 = tree->queryPropTree("Part_1");
  2623. if (!part1)
  2624. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part 1",id);
  2625. // assume same number of copies for all parts
  2626. unsigned nc = 0;
  2627. StringBuffer xpath;
  2628. StringBuffer locpath;
  2629. StringArray locdirs;
  2630. loop {
  2631. IPropertyTree *loc = part1->queryPropTree(xpath.clear().appendf("Loc[%d]",nc+1));
  2632. if (!loc)
  2633. break;
  2634. const char *path = loc->queryProp("@path");
  2635. if (!path)
  2636. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part 1 loc path",id);
  2637. RemoteFilename rfn;
  2638. rfn.setRemotePath(path);
  2639. if (rfn.queryEndpoint().isNull())
  2640. break;
  2641. locdirs.append(rfn.getLocalPath(locpath.clear()).str());
  2642. nc++;
  2643. }
  2644. if (!nc)
  2645. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part 1 Loc",id);
  2646. StringBuffer fulldir(locdirs.item(0));
  2647. addPathSepChar(fulldir).append(tree->queryProp("@directory"));
  2648. res->setDefaultDir(fulldir.str());
  2649. // create a group
  2650. SocketEndpointArray *epa = new SocketEndpointArray[nc];
  2651. for (unsigned p=1;p<=np;p++) {
  2652. IPropertyTree *part = tree->queryPropTree(xpath.clear().appendf("Part_%d",p));
  2653. if (!part)
  2654. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part %d",id,p);
  2655. if (iskey&&(p==np)&&(np>1)) // leave off tlk
  2656. continue;
  2657. unsigned c;
  2658. for(c = 0;c<nc;c++) {
  2659. IPropertyTree *loc = part->queryPropTree(xpath.clear().appendf("Loc[%d]",c+1));
  2660. if (loc) {
  2661. const char *path = loc->queryProp("@path");
  2662. if (!path)
  2663. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part %d loc path",id,p);
  2664. RemoteFilename rfn;
  2665. rfn.setRemotePath(path);
  2666. bool found = false;
  2667. ForEachItemIn(d,locdirs) {
  2668. if (strcmp(rfn.getLocalPath(locpath.clear()).str(),locdirs.item(d))==0) {
  2669. SocketEndpoint ep = rfn.queryEndpoint();
  2670. if (ep.port==DAFILESRV_PORT || ep.port==SECURE_DAFILESRV_PORT)
  2671. ep.port = 0;
  2672. epa[d].append(ep);
  2673. found = true;
  2674. break;
  2675. }
  2676. }
  2677. }
  2678. else
  2679. ERRLOG("createFileDescriptorFromRoxie: %s missing part %s",id,xpath.str());
  2680. }
  2681. }
  2682. res->setPartMask(mask);
  2683. res->setNumParts(np);
  2684. SocketEndpointArray merged; // this is a bit odd but needed for when num parts smaller than cluster width
  2685. ForEachItemIn(ei1,epa[0])
  2686. merged.append(epa[0].item(ei1));
  2687. for (unsigned enc=1;enc<nc;enc++) { // not quick! (n^2)
  2688. ForEachItemIn(ei2,epa[enc]) {
  2689. SocketEndpoint ep = epa[enc].item(ei2);
  2690. ForEachItemIn(ei3,merged) {
  2691. if (!merged.item(ei3).equals(ep)) {
  2692. merged.append(ep);
  2693. break;
  2694. }
  2695. }
  2696. }
  2697. }
  2698. Owned<IGroup> epgrp = createIGroup(merged);
  2699. Owned<IGroup> grp = shrinkRepeatedGroup(epgrp);
  2700. // find replication offset
  2701. ClusterPartDiskMapSpec map;
  2702. if (nc) {
  2703. map.replicateOffset = 0;
  2704. unsigned i2;
  2705. unsigned i3;
  2706. loop {
  2707. for (i2=1;i2<nc;i2++) {
  2708. for (i3=0;i3<epa[i2].ordinality();i3++) {
  2709. INode &node = grp->queryNode((i3+map.replicateOffset*i2)%grp->ordinality());
  2710. if (!node.endpoint().equals(epa[i2].item(i3)))
  2711. break;
  2712. }
  2713. if (i3<epa[i2].ordinality())
  2714. break;
  2715. }
  2716. if (i2==nc)
  2717. break;
  2718. map.replicateOffset++;
  2719. if (map.replicateOffset==grp->ordinality())
  2720. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s cannot determine replication offset",id);
  2721. }
  2722. }
  2723. map.defaultCopies = nc;
  2724. if (iskey) {
  2725. map.repeatedPart = (np-1);
  2726. map.flags |= CPDMSF_repeatedPart;
  2727. }
  2728. if (clustername) {
  2729. #if 0
  2730. Owned<IGroup> cgrp = queryNamedGroupStore().lookup(clustername);
  2731. if (!cgrp)
  2732. throw MakeStringException(-1,"createFileDescriptorFromRoxieXML: Cluster %s not found",clustername);
  2733. if (!cgrp->equals(grp))
  2734. throw MakeStringException(-1,"createFileDescriptorFromRoxieXML: Cluster %s does not match XML",clustername);
  2735. #endif
  2736. res->addCluster(clustername,grp,map);
  2737. }
  2738. else
  2739. res->addCluster(grp,map);
  2740. delete [] epa;
  2741. for (unsigned p=1;;p++) {
  2742. IPropertyTree *part = tree->queryPropTree(xpath.clear().appendf("Part_%d",p));
  2743. if (!part)
  2744. break;
  2745. IPropertyTree &pprop = res->queryPart(p-1)->queryProperties();
  2746. StringBuffer ps;
  2747. if (part->getProp("@crc",ps.clear())&&ps.length())
  2748. pprop.setProp("@fileCRC",ps.str());
  2749. if (part->getProp("@size",ps.clear())&&ps.length())
  2750. pprop.setProp("@size",ps.str());
  2751. if (part->getProp("@modified",ps.clear())&&ps.length())
  2752. pprop.setProp("@modified",ps.str());
  2753. if (iskey&&(p==np-1))
  2754. pprop.setProp("@kind","topLevelKey");
  2755. #ifdef _DEBUG // test parts match
  2756. unsigned c;
  2757. for(c = 0;c<nc;c++) {
  2758. IPropertyTree *loc = part->queryPropTree(xpath.clear().appendf("Loc[%d]",c+1));
  2759. if (loc) {
  2760. const char *path = loc->queryProp("@path");
  2761. if (!path)
  2762. throw MakeStringException(-1,"createFileDescriptorFromRoxie: %s missing part %d loc path",id,c+1);
  2763. StringBuffer fullpath(path);
  2764. addPathSepChar(fullpath).append(tree->queryProp("@directory"));
  2765. expandMask(addPathSepChar(fullpath),mask,p-1,np);
  2766. RemoteFilename rfn;
  2767. rfn.setRemotePath(fullpath.str());
  2768. if (!rfn.queryEndpoint().isNull()) {
  2769. unsigned c2;
  2770. for (c2=0;c2<res->numCopies(p-1);c2++) {
  2771. RemoteFilename rfn2;
  2772. res->getFilename(p-1,c2,rfn2);
  2773. if (rfn2.equals(rfn))
  2774. break;
  2775. StringBuffer tmp;
  2776. rfn2.getPath(tmp);
  2777. //PROGLOG("%s",tmp.str());
  2778. }
  2779. if (c2==res->numCopies(p-1)) {
  2780. res->numCopies(p-1);
  2781. PROGLOG("ERROR: createFileDescriptorFromRoxie [%d,%d] %s not found",p,c,fullpath.str());
  2782. }
  2783. }
  2784. }
  2785. }
  2786. #endif
  2787. }
  2788. IPropertyTree &fprop = res->queryProperties();
  2789. StringBuffer fps;
  2790. if (tree->getProp("@crc",fps.clear())&&fps.length())
  2791. fprop.setProp("@checkSum",fps.str());
  2792. if (tree->getProp("@recordCount",fps.clear())&&fps.length())
  2793. fprop.setProp("@recordCount",fps.str());
  2794. if (tree->getProp("@size",fps.clear())&&fps.length())
  2795. fprop.setProp("@size",fps.str());
  2796. if (tree->getProp("@formatCrc",fps.clear())&&fps.length())
  2797. fprop.setProp("@formatCrc",fps.str());
  2798. MemoryBuffer mb;
  2799. if (tree->getPropBin("_record_layout", mb))
  2800. fprop.setPropBin("_record_layout", mb.length(), mb.toByteArray());
  2801. if (iskey) {
  2802. fprop.setProp("@kind","key");
  2803. }
  2804. return res.getLink();
  2805. }