referencedfilelist.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "referencedfilelist.hpp"
  14. #include "jptree.hpp"
  15. #include "workunit.hpp"
  16. #include "eclhelper.hpp"
  17. #include "dautils.hpp"
  18. #include "dasds.hpp"
  19. #include "dadfs.hpp"
  20. #include "dasess.hpp"
  // Timeout in milliseconds passed to the remote DFS lookups (getFileTree, lookupFileRelationships).
  #define WF_LOOKUP_TIMEOUT (15*1000) // 15 seconds
  22. bool getIsOpt(const IPropertyTree &graphNode)
  23. {
  24. if (graphNode.hasProp("att[@name='_isOpt']"))
  25. return graphNode.getPropBool("att[@name='_isOpt']/@value", false);
  26. else
  27. return graphNode.getPropBool("att[@name='_isIndexOpt']/@value", false);
  28. }
  /**
   * Returns true when lfn is in a foreign dali scope ("foreign::<ip>::...").
   *
   * The accepted syntax mirrors skipForeign(), which ReferencedFile uses to decide
   * whether a file is foreign:
   *   - up to two leading '~';
   *   - optional whitespace;
   *   - "foreign" in any case;
   *   - optional whitespace;
   *   - then "::".
   * Accepting less (e.g. only one '~', or no leading whitespace) would let names
   * such as "~~foreign::ip::x" or " foreign::ip::x" be treated as foreign while
   * escaping the allowForeign check.
   *
   * @param lfn logical file name; NULL is reported as not foreign.
   */
  bool checkForeign(const char *lfn)
  {
      if (!lfn)
          return false;
      for (unsigned tildes = 0; tildes < 2 && *lfn == '~'; tildes++)
          lfn++;
      // isspace/tolower require a value representable as unsigned char
      while (isspace(static_cast<unsigned char>(*lfn)))
          lfn++;
      for (const char *scope = "foreign"; *scope; scope++, lfn++)
      {
          // A terminating NUL never matches, so this cannot read past the string
          if (tolower(static_cast<unsigned char>(*lfn)) != *scope)
              return false;
      }
      while (isspace(static_cast<unsigned char>(*lfn)))
          lfn++;
      return lfn[0]==':' && lfn[1]==':';
  }
  44. const char *skipForeign(const char *name, StringBuffer *ip)
  45. {
  46. unsigned maxTildas = 2;
  47. while (maxTildas-- && *name=='~')
  48. name++;
  49. const char *d1 = strstr(name, "::");
  50. if (d1)
  51. {
  52. StringBuffer cmp;
  53. if (strieq("foreign", cmp.append(d1-name, name).trim().str()))
  54. {
  55. // foreign scope - need to strip off the ip and port
  56. d1 += 2; // skip ::
  57. const char *d2 = strstr(d1,"::");
  58. if (d2)
  59. {
  60. if (ip)
  61. ip->append(d2-d1, d1).trim();
  62. d2 += 2;
  63. while (*d2 == ' ')
  64. d2++;
  65. name = d2;
  66. }
  67. }
  68. }
  69. return name;
  70. }
  71. void splitDfsLocation(const char *address, StringBuffer &cluster, StringBuffer &ip, StringBuffer &prefix, const char *defaultCluster)
  72. {
  73. if (!address || !*address)
  74. {
  75. cluster.append(defaultCluster);
  76. return;
  77. }
  78. const char *s=strchr(address, '@');
  79. if (s)
  80. {
  81. cluster.append(s - address, address);
  82. address = s + 1;
  83. }
  84. else if (defaultCluster && *defaultCluster)
  85. cluster.append(defaultCluster);
  86. s=strchr(address, '/');
  87. if (!s)
  88. ip.append(address);
  89. else
  90. {
  91. ip.append(s - address, address);
  92. prefix.append(s+1);
  93. }
  94. }
  95. void splitDerivedDfsLocation(const char *address, StringBuffer &cluster, StringBuffer &ip, StringBuffer &prefix, const char *defaultCluster, const char *baseCluster, const char *baseIP, const char *basePrefix)
  96. {
  97. if (address && *address)
  98. {
  99. splitDfsLocation(address, cluster, ip, prefix, defaultCluster);
  100. return;
  101. }
  102. ip.append(baseIP);
  103. cluster.append(baseCluster);
  104. prefix.append(basePrefix);
  105. }
  106. class ReferencedFileList;
  107. class ReferencedFile : public CInterface, implements IReferencedFile
  108. {
  109. public:
  110. IMPLEMENT_IINTERFACE;
  111. ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs, bool calcSize)
  112. : flags(_flags), pkgid(_pkgid), noDfsResolution(noDfs), calcFileSize(calcSize), fileSize(0), numParts(0)
  113. {
  114. {
  115. //Scope ensures strings are assigned
  116. StringAttrBuilder logicalNameText(logicalName), daliipText(daliip);
  117. logicalNameText.set(skipForeign(lfn, &daliipText)).toLowerCase();
  118. }
  119. if (daliip.length())
  120. flags |= RefFileForeign;
  121. else
  122. daliip.set(sourceIP);
  123. fileSrcCluster.set(srcCluster);
  124. filePrefix.set(prefix);
  125. if (isSubFile)
  126. flags |= RefSubFile;
  127. }
  128. void reset()
  129. {
  130. flags &= (RefSubFile | RefFileIndex | RefFileForeign | RefFileSuper | RefSubFile | RefFileInPackage);
  131. }
  132. IPropertyTree *getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
  133. IPropertyTree *getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
  134. void processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles);
  135. void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles);
  136. void resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles);
  137. void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
  138. void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
  139. virtual const char *getLogicalName() const {return logicalName.str();}
  140. virtual unsigned getFlags() const {return flags;}
  141. virtual const SocketEndpoint &getForeignIP(SocketEndpoint &ep) const
  142. {
  143. if ((flags & RefFileForeign) && daliip.length())
  144. ep.set(daliip.str());
  145. else
  146. ep.set(NULL);
  147. return ep;
  148. }
  149. virtual void cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
  150. void cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *user, INode *remote, bool overwrite);
  151. virtual const char *queryPackageId() const {return pkgid.get();}
  152. virtual __int64 getFileSize()
  153. {
  154. return fileSize;
  155. }
  156. virtual unsigned getNumParts()
  157. {
  158. return numParts;
  159. }
  160. public:
  161. StringAttr logicalName;
  162. StringAttr pkgid;
  163. StringAttr daliip;
  164. StringAttr filePrefix;
  165. StringAttr fileSrcCluster;
  166. __int64 fileSize;
  167. unsigned numParts;
  168. unsigned flags;
  169. bool noDfsResolution;
  170. bool calcFileSize;
  171. };
  172. class ReferencedFileList : public CInterface, implements IReferencedFileList
  173. {
  174. public:
  175. IMPLEMENT_IINTERFACE;
  176. ReferencedFileList(const char *username, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc)
  177. : allowForeign(allowForeignFiles), allowSizeCalc(allowFileSizeCalc)
  178. {
  179. if (username && pw)
  180. {
  181. user.setown(createUserDescriptor());
  182. user->set(username, pw);
  183. }
  184. }
  185. ReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc)
  186. : allowForeign(allowForeignFiles), allowSizeCalc(allowFileSizeCalc)
  187. {
  188. if (userDesc)
  189. user.set(userDesc);
  190. }
  191. void ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
  192. virtual void addFile(const char *ln, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
  193. virtual void addFiles(StringArray &files);
  194. virtual void addFilesFromWorkUnit(IConstWorkUnit *cw);
  195. virtual void addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid);
  196. virtual void addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg);
  197. virtual void addFilesFromPackageMap(IPropertyTree *pm);
  198. void addFileFromSubFile(IPropertyTree &subFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  199. void addFilesFromSuperFile(IPropertyTree &superFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  200. void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  201. virtual IReferencedFileIterator *getFiles();
  202. virtual void cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
  203. virtual void cloneRelationships();
  204. virtual void cloneAllInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  205. {
  206. cloneFileInfo(helper, overwrite, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
  207. cloneRelationships();
  208. }
  209. virtual void resolveFiles(const char *process, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool resolveForeign=false);
  210. void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool resolveForeign);
  211. public:
  212. Owned<IUserDescriptor> user;
  213. Owned<INode> remote;
  214. MapStringToMyClass<ReferencedFile> map;
  215. StringAttr process;
  216. StringAttr srcCluster;
  217. StringAttr remotePrefix;
  218. bool allowForeign;
  219. bool allowSizeCalc;
  220. };
  221. void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles)
  222. {
  223. IDistributedSuperFile *super = df->querySuperFile();
  224. if (super)
  225. {
  226. flags |= RefFileSuper;
  227. if (subfiles)
  228. {
  229. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true); //supersub = true, no need to deal with LOCAL supersubs
  230. ForEach(*it)
  231. {
  232. IDistributedFile &sub = it->query();
  233. StringBuffer name;
  234. sub.getLogicalName(name);
  235. subfiles->append(name.str());
  236. }
  237. }
  238. }
  239. else
  240. {
  241. flags |= RefSubFile;
  242. if (!dstCluster || !*dstCluster)
  243. return;
  244. if (df->findCluster(dstCluster)==NotFound)
  245. flags |= RefFileNotOnCluster;
  246. if (fileSrcCluster.length())
  247. srcCluster=fileSrcCluster;
  248. if (srcCluster && *srcCluster)
  249. if (NotFound == df->findCluster(srcCluster))
  250. flags |= RefFileNotOnSource;
  251. fileSize = df->getFileSize(calcFileSize, false);
  252. numParts = df->numParts();
  253. }
  254. }
  255. void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles)
  256. {
  257. flags |= RefFileRemote;
  258. if (fileSrcCluster.length())
  259. srcCluster = fileSrcCluster;
  260. if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile)))
  261. {
  262. flags |= RefFileSuper;
  263. if (subfiles)
  264. {
  265. Owned<IPropertyTreeIterator> it = tree->getElements("SubFile");
  266. ForEach(*it)
  267. {
  268. const char *lfn = it->query().queryProp("@name");
  269. StringBuffer foreignLfn;
  270. if (flags & RefFileForeign)
  271. lfn = foreignLfn.append("foreign::").append(this->daliip).append("::").append(lfn).str();
  272. subfiles->append(lfn);
  273. }
  274. }
  275. }
  276. else if (srcCluster && *srcCluster)
  277. {
  278. VStringBuffer xpath("Cluster[@name='%s']", srcCluster);
  279. if (!tree->hasProp(xpath))
  280. flags |= RefFileNotOnSource;
  281. numParts = tree->getPropInt("@numparts", 0);
  282. fileSize = tree->getPropInt64("Attr/@size", 0);
  283. }
  284. }
  285. void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
  286. {
  287. if (flags & RefFileInPackage)
  288. return;
  289. if (noDfsResolution)
  290. {
  291. flags |= RefFileNotFound;
  292. return;
  293. }
  294. reset();
  295. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
  296. if(df)
  297. processLocalFileInfo(df, dstCluster, srcCluster, subfiles);
  298. else
  299. flags |= RefFileNotFound;
  300. }
  301. IPropertyTree *ReferencedFile::getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix)
  302. {
  303. if (!remote)
  304. return NULL;
  305. StringBuffer remoteLFN;
  306. if (remotePrefix && *remotePrefix)
  307. remoteLFN.append(remotePrefix).append("::").append(logicalName);
  308. return queryDistributedFileDirectory().getFileTree(remoteLFN.length() ? remoteLFN.str() : logicalName.str(), user, remote, WF_LOOKUP_TIMEOUT, false, false);
  309. }
  310. IPropertyTree *ReferencedFile::getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix)
  311. {
  312. if (daliip.length())
  313. {
  314. Owned<INode> daliNode;
  315. daliNode.setown(createINode(daliip));
  316. return getRemoteFileTree(user, daliNode, filePrefix);
  317. }
  318. if (!remote)
  319. return NULL;
  320. StringBuffer remoteLFN;
  321. Owned<IPropertyTree> fileTree = getRemoteFileTree(user, remote, remotePrefix);
  322. if (!fileTree)
  323. return NULL;
  324. StringAttrBuilder daliipText(daliip);
  325. remote->endpoint().getUrlStr(daliipText);
  326. filePrefix.set(remotePrefix);
  327. return fileTree.getClear();
  328. }
  329. void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
  330. {
  331. if ((flags & RefFileForeign) && !resolveForeign)
  332. return;
  333. if (flags & RefFileInPackage)
  334. return;
  335. if (noDfsResolution)
  336. {
  337. flags |= RefFileNotFound;
  338. return;
  339. }
  340. reset();
  341. if (checkLocalFirst) //usually means we don't want to overwrite existing file info
  342. {
  343. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
  344. if(df)
  345. {
  346. processLocalFileInfo(df, dstCluster, NULL, subfiles);
  347. return;
  348. }
  349. }
  350. Owned<IPropertyTree> tree = getSpecifiedOrRemoteFileTree(user, remote, remotePrefix);
  351. if (tree)
  352. {
  353. processRemoteFileTree(tree, srcCluster, subfiles);
  354. return;
  355. }
  356. else if (!checkLocalFirst && (!srcCluster || !*srcCluster)) //haven't already checked and not told to use a specific copy
  357. {
  358. resolveLocal(dstCluster, srcCluster, user, subfiles);
  359. return;
  360. }
  361. flags |= RefFileNotFound;
  362. }
  363. void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
  364. {
  365. if (daliip.length() || remote)
  366. resolveRemote(user, remote, remotePrefix, dstCluster, srcCluster, checkLocalFirst, subfiles, resolveForeign);
  367. else
  368. resolveLocal(dstCluster, srcCluster, user, subfiles);
  369. }
  370. void ReferencedFile::cloneInfo(IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool overwrite, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  371. {
  372. if ((flags & RefFileCloned) || (flags & RefFileSuper) || (flags & RefFileInPackage))
  373. return;
  374. if ((flags & RefFileForeign) && !cloneForeign)
  375. return;
  376. if (!(flags & (RefFileRemote | RefFileForeign | RefFileNotOnCluster)))
  377. return;
  378. if (fileSrcCluster.length())
  379. srcCluster = fileSrcCluster;
  380. try
  381. {
  382. StringBuffer srcLFN;
  383. if (filePrefix.length())
  384. srcLFN.append(filePrefix.str()).append("::");
  385. srcLFN.append(logicalName.str());
  386. helper->cloneRoxieSubFile(srcLFN, srcCluster, logicalName, dstCluster, filePrefix, redundancy, channelsPerNode, replicateOffset, defReplicateFolder, user, daliip, overwrite);
  387. flags |= RefFileCloned;
  388. }
  389. catch (IException *e)
  390. {
  391. flags |= RefFileCopyInfoFailed;
  392. DBGLOG(e);
  393. e->Release();
  394. }
  395. catch (...)
  396. {
  397. flags |= RefFileCopyInfoFailed;
  398. DBGLOG("Unknown error copying file info for [%s::] %s, from %s on dfs-dali %s", filePrefix.str(), logicalName.str(), fileSrcCluster.length() ? fileSrcCluster.get() : "*", daliip.str());
  399. }
  400. }
  401. void ReferencedFile::cloneSuperInfo(ReferencedFileList *list, IUserDescriptor *user, INode *remote, bool overwrite)
  402. {
  403. if ((flags & RefFileCloned) || (flags & RefFileInPackage) || !(flags & RefFileSuper) || !(flags & RefFileRemote))
  404. return;
  405. try
  406. {
  407. Owned<IPropertyTree> tree = getSpecifiedOrRemoteFileTree(user, remote, NULL);
  408. if (!tree)
  409. return;
  410. IDistributedFileDirectory &dir = queryDistributedFileDirectory();
  411. Owned<IDistributedFile> df = dir.lookup(logicalName.str(), user);
  412. if(df)
  413. {
  414. if (!overwrite)
  415. return;
  416. df->detach();
  417. df.clear();
  418. }
  419. Owned<IDistributedSuperFile> superfile = dir.createSuperFile(logicalName.str(),user, true, false);
  420. flags |= RefFileCloned;
  421. Owned<IPropertyTreeIterator> subfiles = tree->getElements("SubFile");
  422. ForEach(*subfiles)
  423. {
  424. const char *name = subfiles->query().queryProp("@name");
  425. if (list)
  426. {
  427. //ensure superfile in superfile is cloned, before add
  428. ReferencedFile *subref = list->map.getValue(name);
  429. if (subref)
  430. subref->cloneSuperInfo(list, user, remote, overwrite);
  431. }
  432. if (name && *name)
  433. superfile->addSubFile(name, false, NULL, false);
  434. }
  435. }
  436. catch (IException *e)
  437. {
  438. flags |= RefFileCopyInfoFailed;
  439. DBGLOG(e);
  440. e->Release();
  441. }
  442. catch (...)
  443. {
  444. flags |= RefFileCopyInfoFailed;
  445. DBGLOG("Unknown error copying superfile info for %s", logicalName.str());
  446. }
  447. }
  448. class ReferencedFileIterator : public CInterface, implements IReferencedFileIterator
  449. {
  450. public:
  451. IMPLEMENT_IINTERFACE;
  452. ReferencedFileIterator(ReferencedFileList *_list)
  453. {
  454. list.set(_list);
  455. iter.setown(new HashIterator(list->map));
  456. }
  457. virtual bool first()
  458. {
  459. return iter->first();
  460. }
  461. virtual bool next()
  462. {
  463. return iter->next();
  464. }
  465. virtual bool isValid()
  466. {
  467. return iter->isValid();
  468. }
  469. virtual IReferencedFile & query()
  470. {
  471. return *list->map.mapToValue(&iter->query());
  472. }
  473. virtual ReferencedFile & queryObject()
  474. {
  475. return *(list->map.mapToValue(&iter->query()));
  476. }
  477. public:
  478. Owned<ReferencedFileList> list;
  479. Owned<HashIterator> iter;
  480. };
  481. void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const char *daliip, const char *srcCluster, const char *prefix)
  482. {
  483. if (!allowForeign && checkForeign(ln))
  484. throw MakeStringException(-1, "Foreign file not allowed%s: %s", (flags & RefFileInPackage) ? " (declared in package)" : "", ln);
  485. Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid, noDfsResolution, allowSizeCalc);
  486. if (!file->logicalName.length())
  487. return;
  488. ReferencedFile *existing = map.getValue(file->getLogicalName());
  489. if (existing)
  490. existing->flags |= flags;
  491. else
  492. {
  493. const char *refln = file->getLogicalName();
  494. // NOTE: setValue links its parameter
  495. map.setValue(refln, file);
  496. }
  497. }
  498. void ReferencedFileList::addFile(const char *ln, const char *daliip, const char *srcCluster, const char *prefix)
  499. {
  500. ensureFile(ln, 0, NULL, false, daliip, srcCluster, prefix);
  501. }
  502. void ReferencedFileList::addFiles(StringArray &files)
  503. {
  504. ForEachItemIn(i, files)
  505. addFile(files.item(i));
  506. }
  507. void ReferencedFileList::addFileFromSubFile(IPropertyTree &subFile, const char *ip, const char *cluster, const char *prefix)
  508. {
  509. addFile(subFile.queryProp("@value"), ip, cluster, prefix);
  510. }
  511. void ReferencedFileList::addFilesFromSuperFile(IPropertyTree &superFile, const char *_ip, const char *_cluster, const char *_prefix)
  512. {
  513. StringBuffer ip;
  514. StringBuffer cluster;
  515. StringBuffer prefix;
  516. splitDerivedDfsLocation(superFile.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
  517. if (superFile.hasProp("@sourceCluster"))
  518. cluster.set(superFile.queryProp("@sourceCluster"));
  519. Owned<IPropertyTreeIterator> subFiles = superFile.getElements("SubFile[@value]");
  520. ForEach(*subFiles)
  521. addFileFromSubFile(subFiles->query(), ip, cluster, prefix);
  522. }
  523. void ReferencedFileList::addFilesFromPackage(IPropertyTree &package, const char *_ip, const char *_cluster, const char *_prefix)
  524. {
  525. StringBuffer ip;
  526. StringBuffer cluster;
  527. StringBuffer prefix;
  528. splitDerivedDfsLocation(package.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
  529. if (package.hasProp("@sourceCluster"))
  530. cluster.set(package.queryProp("@sourceCluster"));
  531. Owned<IPropertyTreeIterator> supers = package.getElements("SuperFile");
  532. ForEach(*supers)
  533. addFilesFromSuperFile(supers->query(), ip, cluster, prefix);
  534. }
  535. void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)
  536. {
  537. StringBuffer ip;
  538. StringBuffer cluster;
  539. StringBuffer prefix;
  540. const char *srcCluster = pm->queryProp("@sourceCluster");
  541. splitDerivedDfsLocation(pm->queryProp("@daliip"), cluster, ip, prefix, srcCluster, srcCluster, NULL, NULL);
  542. Owned<IPropertyTreeIterator> packages = pm->getElements("Package");
  543. ForEach(*packages)
  544. addFilesFromPackage(packages->query(), ip, cluster, prefix);
  545. }
  546. void ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
  547. {
  548. Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
  549. ForEach(*graphs)
  550. {
  551. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  552. Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
  553. ForEach(*iter)
  554. {
  555. IPropertyTree &node = iter->query();
  556. const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
  557. if (!logicalName)
  558. logicalName = node.queryProp("att[@name='_indexFileName']/@value");
  559. if (!logicalName)
  560. continue;
  561. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  562. //not likely to be part of roxie queries, but for forward compatibility:
  563. if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
  564. continue;
  565. if (node.getPropBool("att[@name='_isSpill']/@value") ||
  566. node.getPropBool("att[@name='_isTransformSpill']/@value"))
  567. continue;
  568. unsigned flags = 0;
  569. if (pkg)
  570. {
  571. const char *pkgid = pkg->locateSuperFile(logicalName);
  572. if (pkgid)
  573. {
  574. flags |= (RefFileSuper | RefFileInPackage);
  575. Owned<ISimpleSuperFileEnquiry> ssfe = pkg->resolveSuperFile(logicalName);
  576. if (ssfe && ssfe->numSubFiles()>0)
  577. {
  578. unsigned count = ssfe->numSubFiles();
  579. while (count--)
  580. {
  581. StringBuffer subfile;
  582. ssfe->getSubFileName(count, subfile);
  583. ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid, false);
  584. }
  585. }
  586. }
  587. ensureFile(logicalName, flags, pkgid, pkg->isCompulsory());
  588. }
  589. else
  590. ensureFile(logicalName, flags, NULL, false);
  591. }
  592. }
  593. }
  594. void ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid)
  595. {
  596. const IHpccPackage *pkg = NULL;
  597. if (pm && queryid && *queryid)
  598. pkg = pm->matchPackage(queryid);
  599. addFilesFromQuery(cw, pkg);
  600. }
  601. void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw)
  602. {
  603. addFilesFromQuery(cw, NULL, NULL);
  604. }
  605. void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool resolveForeign)
  606. {
  607. StringArray childSubFiles;
  608. ForEachItemIn(i, subfiles)
  609. {
  610. const char *lfn = subfiles.item(i);
  611. if (!allowForeign && checkForeign(lfn))
  612. throw MakeStringException(-1, "Foreign sub file not allowed: %s", lfn);
  613. Owned<ReferencedFile> file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL, false, allowSizeCalc);
  614. if (file->logicalName.length() && !map.getValue(file->getLogicalName()))
  615. {
  616. file->resolve(process.get(), srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, resolveForeign);
  617. const char *ln = file->getLogicalName();
  618. // NOTE: setValue links its parameter
  619. map.setValue(ln, file);
  620. }
  621. }
  622. if (childSubFiles.length())
  623. resolveSubFiles(childSubFiles, checkLocalFirst, resolveForeign);
  624. }
  625. void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool resolveForeign)
  626. {
  627. process.set(_process);
  628. remote.setown((remoteIP && *remoteIP) ? createINode(remoteIP, 7070) : NULL);
  629. srcCluster.set(_srcCluster);
  630. remotePrefix.set(_remotePrefix);
  631. StringArray subfiles;
  632. {
  633. ReferencedFileIterator files(this);
  634. ForEach(files)
  635. files.queryObject().resolve(process, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, resolveForeign);
  636. }
  637. if (expandSuperFiles)
  638. resolveSubFiles(subfiles, checkLocalFirst, resolveForeign);
  639. }
  640. void ReferencedFileList::cloneFileInfo(IDFUhelper *helper, bool overwrite, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  641. {
  642. ReferencedFileIterator files(this);
  643. ForEach(files)
  644. files.queryObject().cloneInfo(helper, user, process, srcCluster, overwrite, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
  645. if (cloneSuperInfo)
  646. ForEach(files)
  647. files.queryObject().cloneSuperInfo(this, user, remote, overwrite);
  648. }
  649. void ReferencedFileList::cloneRelationships()
  650. {
  651. if (!remote || remote->endpoint().isNull())
  652. return;
  653. StringBuffer addr;
  654. remote->endpoint().getUrlStr(addr);
  655. IDistributedFileDirectory &dir = queryDistributedFileDirectory();
  656. ReferencedFileIterator files(this);
  657. ForEach(files)
  658. {
  659. ReferencedFile &file = files.queryObject();
  660. if (!(file.getFlags() & RefFileRemote))
  661. continue;
  662. Owned<IFileRelationshipIterator> iter = dir.lookupFileRelationships(file.getLogicalName(), NULL,
  663. NULL, NULL, NULL, NULL, NULL, addr.str(), WF_LOOKUP_TIMEOUT);
  664. ForEach(*iter)
  665. {
  666. IFileRelationship &r=iter->query();
  667. const char* assoc = r.querySecondaryFilename();
  668. if (!assoc)
  669. continue;
  670. if (*assoc == '~')
  671. assoc++;
  672. IReferencedFile *refAssoc = map.getValue(assoc);
  673. if (refAssoc && !(refAssoc->getFlags() & RefFileCopyInfoFailed))
  674. {
  675. dir.addFileRelationship(file.getLogicalName(), assoc, r.queryPrimaryFields(), r.querySecondaryFields(),
  676. r.queryKind(), r.queryCardinality(), r.isPayload(), user, r.queryDescription());
  677. }
  678. }
  679. }
  680. }
  681. IReferencedFileIterator *ReferencedFileList::getFiles()
  682. {
  683. return new ReferencedFileIterator(this);
  684. }
  685. IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc)
  686. {
  687. return new ReferencedFileList(user, pw, allowForeignFiles, allowFileSizeCalc);
  688. }
  689. IReferencedFileList *createReferencedFileList(IUserDescriptor *user, bool allowForeignFiles, bool allowFileSizeCalc)
  690. {
  691. return new ReferencedFileList(user, allowForeignFiles, allowFileSizeCalc);
  692. }