referencedfilelist.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "referencedfilelist.hpp"
  14. #include "jptree.hpp"
  15. #include "workunit.hpp"
  16. #include "eclhelper.hpp"
  17. #include "dautils.hpp"
  18. #include "dasds.hpp"
  19. #include "dadfs.hpp"
  20. #include "dasess.hpp"
  21. #define WF_LOOKUP_TIMEOUT (1000*15) // 15 seconds
  22. bool getIsOpt(const IPropertyTree &graphNode)
  23. {
  24. if (graphNode.hasProp("att[@name='_isOpt']"))
  25. return graphNode.getPropBool("att[@name='_isOpt']/@value", false);
  26. else
  27. return graphNode.getPropBool("att[@name='_isIndexOpt']/@value", false);
  28. }
  /// Tests whether a logical file name begins with a `foreign::` scope.
  ///
  /// One leading '~' is skipped, the word "foreign" is matched without regard
  /// to case, and whitespace between the word and the "::" separator is
  /// tolerated.
  ///
  /// @param lfn  logical file name; NULL is treated as "not foreign".
  /// @return true when the name starts with foreign followed by "::".
  bool checkForeign(const char *lfn)
  {
      if (!lfn)
          return false;
      if (*lfn=='~')
          lfn++;
      static const char foreignScope[] = "foreign";
      const size_t scopeLen = sizeof(foreignScope) - 1; // compile-time length, excludes the terminator
      if (strnicmp(foreignScope, lfn, scopeLen)!=0)
          return false;
      lfn += scopeLen;
      // isspace() is only defined for unsigned char values (and EOF)
      while (isspace(static_cast<unsigned char>(*lfn)))
          lfn++;
      return lfn[0]==':' && lfn[1]==':';
  }
  44. const char *skipForeign(const char *name, StringBuffer *ip)
  45. {
  46. unsigned maxTildas = 2;
  47. while (maxTildas-- && *name=='~')
  48. name++;
  49. const char *d1 = strstr(name, "::");
  50. if (d1)
  51. {
  52. StringBuffer cmp;
  53. if (strieq("foreign", cmp.append(d1-name, name).trim().str()))
  54. {
  55. // foreign scope - need to strip off the ip and port
  56. d1 += 2; // skip ::
  57. const char *d2 = strstr(d1,"::");
  58. if (d2)
  59. {
  60. if (ip)
  61. ip->append(d2-d1, d1).trim();
  62. d2 += 2;
  63. while (*d2 == ' ')
  64. d2++;
  65. name = d2;
  66. }
  67. }
  68. }
  69. return name;
  70. }
  71. void splitDfsLocation(const char *address, StringBuffer &cluster, StringBuffer &ip, StringBuffer &prefix, const char *defaultCluster)
  72. {
  73. if (!address || !*address)
  74. {
  75. cluster.append(defaultCluster);
  76. return;
  77. }
  78. const char *s=strchr(address, '@');
  79. if (s)
  80. {
  81. cluster.append(s - address, address);
  82. address = s + 1;
  83. }
  84. else if (defaultCluster && *defaultCluster)
  85. cluster.append(defaultCluster);
  86. s=strchr(address, '/');
  87. if (!s)
  88. ip.append(address);
  89. else
  90. {
  91. ip.append(s - address, address);
  92. prefix.append(s+1);
  93. }
  94. }
  95. void splitDerivedDfsLocation(const char *address, StringBuffer &cluster, StringBuffer &ip, StringBuffer &prefix, const char *defaultCluster, const char *baseCluster, const char *baseIP, const char *basePrefix)
  96. {
  97. if (address && *address)
  98. {
  99. splitDfsLocation(address, cluster, ip, prefix, defaultCluster);
  100. return;
  101. }
  102. ip.append(baseIP);
  103. cluster.append(baseCluster);
  104. prefix.append(basePrefix);
  105. }
  106. class ReferencedFileList;
  107. class ReferencedFile : implements IReferencedFile, public CInterface
  108. {
  109. public:
  110. IMPLEMENT_IINTERFACE;
  111. ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs, bool calcSize)
  112. : flags(_flags), pkgid(_pkgid), noDfsResolution(noDfs), calcFileSize(calcSize), fileSize(0), numParts(0), trackSubFiles(false)
  113. {
  114. {
  115. //Scope ensures strings are assigned
  116. StringAttrBuilder logicalNameText(logicalName), daliipText(daliip);
  117. logicalNameText.set(skipForeign(lfn, &daliipText)).toLowerCase();
  118. }
  119. if (daliip.length())
  120. flags |= RefFileForeign;
  121. else
  122. daliip.set(sourceIP);
  123. fileSrcCluster.set(srcCluster);
  124. filePrefix.set(prefix);
  125. if (isSubFile)
  126. flags |= RefSubFile;
  127. }
  128. void reset()
  129. {
  130. flags &= ~(RefFileNotOnCluster | RefFileNotFound | RefFileRemote | RefFileCopyInfoFailed | RefFileCloned | RefFileNotOnSource); //these flags are calculated during resolve
  131. }
  132. IPropertyTree *getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
  133. IPropertyTree *getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix);
  134. void processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles);
  135. void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles);
  136. void resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles);
  137. void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false);
  138. void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveForeign=false);
  139. virtual const char *getLogicalName() const {return logicalName.str();}
  140. virtual unsigned getFlags() const {return flags;}
  141. virtual const SocketEndpoint &getForeignIP(SocketEndpoint &ep) const
  142. {
  143. if ((flags & RefFileForeign) && daliip.length())
  144. ep.set(daliip.str());
  145. else
  146. ep.set(NULL);
  147. return ep;
  148. }
  149. virtual void cloneInfo(unsigned updateFlags, IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
  150. void cloneSuperInfo(unsigned updateFlags, ReferencedFileList *list, IUserDescriptor *user, INode *remote);
  151. virtual const char *queryPackageId() const {return pkgid.get();}
  152. virtual __int64 getFileSize()
  153. {
  154. return fileSize;
  155. }
  156. virtual unsigned getNumParts()
  157. {
  158. return numParts;
  159. }
  160. virtual const StringArray &getSubFileNames() const { return subFileNames; };
  161. virtual void appendSubFileNames(const StringArray &names)
  162. {
  163. ForEachItemIn(i, names)
  164. subFileNames.append(names.item(i));
  165. };
  166. public:
  167. StringArray subFileNames;
  168. StringAttr logicalName;
  169. StringAttr pkgid;
  170. StringAttr daliip;
  171. StringAttr filePrefix;
  172. StringAttr fileSrcCluster;
  173. __int64 fileSize;
  174. unsigned numParts;
  175. unsigned flags;
  176. bool noDfsResolution;
  177. bool calcFileSize;
  178. bool trackSubFiles;
  179. };
  180. class ReferencedFileList : implements IReferencedFileList, public CInterface
  181. {
  182. public:
  183. IMPLEMENT_IINTERFACE;
  184. ReferencedFileList(const char *username, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc)
  185. : allowForeign(allowForeignFiles), allowSizeCalc(allowFileSizeCalc)
  186. {
  187. if (username && pw)
  188. {
  189. user.setown(createUserDescriptor());
  190. user->set(username, pw);
  191. }
  192. }
  193. ReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc)
  194. : allowForeign(allowForeignFiles), allowSizeCalc(allowFileSizeCalc)
  195. {
  196. if (userDesc)
  197. user.set(userDesc);
  198. }
  199. void ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
  200. virtual void addFile(const char *ln, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
  201. virtual void addFiles(StringArray &files);
  202. virtual void addFilesFromWorkUnit(IConstWorkUnit *cw);
  203. virtual bool addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid);
  204. virtual bool addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg);
  205. virtual void addFilesFromPackageMap(IPropertyTree *pm);
  206. void addFileFromSubFile(IPropertyTree &subFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  207. void addFilesFromSuperFile(IPropertyTree &superFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  208. void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
  209. virtual IReferencedFileIterator *getFiles();
  210. virtual void cloneFileInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
  211. virtual void cloneRelationships();
  212. virtual void cloneAllInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  213. {
  214. cloneFileInfo(updateFlags, helper, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
  215. cloneRelationships();
  216. }
  217. virtual void resolveFiles(const char *process, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false);
  218. void resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign);
  219. public:
  220. Owned<IUserDescriptor> user;
  221. Owned<INode> remote;
  222. MapStringToMyClass<ReferencedFile> map;
  223. StringAttr process;
  224. StringAttr srcCluster;
  225. StringAttr remotePrefix;
  226. bool allowForeign;
  227. bool allowSizeCalc;
  228. };
  229. void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles)
  230. {
  231. IDistributedSuperFile *super = df->querySuperFile();
  232. if (super)
  233. {
  234. flags |= RefFileSuper;
  235. if (subfiles)
  236. {
  237. Owned<IDistributedFileIterator> it = super->getSubFileIterator(true); //supersub = true, no need to deal with LOCAL supersubs
  238. ForEach(*it)
  239. {
  240. IDistributedFile &sub = it->query();
  241. StringBuffer name;
  242. sub.getLogicalName(name);
  243. subfiles->append(name.str());
  244. if (trackSubFiles)
  245. subFileNames.append(name.str());
  246. }
  247. }
  248. }
  249. else
  250. {
  251. flags |= RefSubFile;
  252. if (!dstCluster || !*dstCluster)
  253. return;
  254. if (df->findCluster(dstCluster)==NotFound)
  255. flags |= RefFileNotOnCluster;
  256. if (fileSrcCluster.length())
  257. srcCluster=fileSrcCluster;
  258. if (srcCluster && *srcCluster)
  259. if (NotFound == df->findCluster(srcCluster))
  260. flags |= RefFileNotOnSource;
  261. fileSize = df->getFileSize(calcFileSize, false);
  262. numParts = df->numParts();
  263. }
  264. }
  265. void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles)
  266. {
  267. flags |= RefFileRemote;
  268. if (fileSrcCluster.length())
  269. srcCluster = fileSrcCluster;
  270. if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile)))
  271. {
  272. flags |= RefFileSuper;
  273. if (subfiles)
  274. {
  275. Owned<IPropertyTreeIterator> it = tree->getElements("SubFile");
  276. ForEach(*it)
  277. {
  278. const char *lfn = it->query().queryProp("@name");
  279. StringBuffer foreignLfn;
  280. if (flags & RefFileForeign)
  281. lfn = foreignLfn.append("foreign::").append(this->daliip).append("::").append(lfn).str();
  282. subfiles->append(lfn);
  283. if (trackSubFiles)
  284. subFileNames.append(lfn);
  285. }
  286. }
  287. }
  288. else if (srcCluster && *srcCluster)
  289. {
  290. VStringBuffer xpath("Cluster[@name='%s']", srcCluster);
  291. if (!tree->hasProp(xpath))
  292. flags |= RefFileNotOnSource;
  293. numParts = tree->getPropInt("@numparts", 0);
  294. fileSize = tree->getPropInt64("Attr/@size", 0);
  295. }
  296. }
  297. void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
  298. {
  299. if (flags & RefFileInPackage)
  300. return;
  301. if (noDfsResolution)
  302. {
  303. flags |= RefFileNotFound;
  304. return;
  305. }
  306. reset();
  307. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
  308. if(df)
  309. processLocalFileInfo(df, dstCluster, srcCluster, subfiles);
  310. else
  311. {
  312. flags |= RefFileNotFound;
  313. DBGLOG("ReferencedFile not found (local) %s", logicalName.str());
  314. }
  315. }
  316. IPropertyTree *ReferencedFile::getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix)
  317. {
  318. if (!remote)
  319. return NULL;
  320. StringBuffer remoteLFN;
  321. if (remotePrefix && *remotePrefix)
  322. remoteLFN.append(remotePrefix).append("::").append(logicalName);
  323. return queryDistributedFileDirectory().getFileTree(remoteLFN.length() ? remoteLFN.str() : logicalName.str(), user, remote, WF_LOOKUP_TIMEOUT, false, false);
  324. }
  325. IPropertyTree *ReferencedFile::getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix)
  326. {
  327. if (daliip.length())
  328. {
  329. Owned<INode> daliNode;
  330. daliNode.setown(createINode(daliip));
  331. return getRemoteFileTree(user, daliNode, filePrefix);
  332. }
  333. if (!remote)
  334. return NULL;
  335. StringBuffer remoteLFN;
  336. Owned<IPropertyTree> fileTree = getRemoteFileTree(user, remote, remotePrefix);
  337. if (!fileTree)
  338. return NULL;
  339. StringAttrBuilder daliipText(daliip);
  340. remote->endpoint().getUrlStr(daliipText);
  341. filePrefix.set(remotePrefix);
  342. return fileTree.getClear();
  343. }
  344. void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign)
  345. {
  346. if ((flags & RefFileForeign) && !resolveForeign && !trackSubFiles)
  347. return;
  348. if (flags & RefFileInPackage)
  349. return;
  350. if (noDfsResolution)
  351. {
  352. flags |= RefFileNotFound;
  353. return;
  354. }
  355. reset();
  356. if (checkLocalFirst) //usually means we don't want to overwrite existing file info
  357. {
  358. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
  359. if(df)
  360. {
  361. processLocalFileInfo(df, dstCluster, NULL, subfiles);
  362. return;
  363. }
  364. }
  365. Owned<IPropertyTree> tree = getSpecifiedOrRemoteFileTree(user, remote, remotePrefix);
  366. if (tree)
  367. {
  368. processRemoteFileTree(tree, srcCluster, subfiles);
  369. return;
  370. }
  371. else if (!checkLocalFirst && (!srcCluster || !*srcCluster)) //haven't already checked and not told to use a specific copy
  372. {
  373. resolveLocal(dstCluster, srcCluster, user, subfiles);
  374. return;
  375. }
  376. flags |= RefFileNotFound;
  377. StringBuffer dest;
  378. DBGLOG("Remote ReferencedFile not found %s [dali=%s, remote=%s, prefix=%s]", logicalName.str(), daliip.get(), remote ? remote->endpoint().getUrlStr(dest).str() : nullptr, remotePrefix);
  379. }
  380. void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign)
  381. {
  382. trackSubFiles = _trackSubFiles;
  383. if (daliip.length() || remote)
  384. resolveRemote(user, remote, remotePrefix, dstCluster, srcCluster, checkLocalFirst, subfiles, resolveForeign);
  385. else
  386. resolveLocal(dstCluster, srcCluster, user, subfiles);
  387. }
  388. void ReferencedFile::cloneInfo(unsigned updateFlags, IDFUhelper *helper, IUserDescriptor *user, const char *dstCluster, const char *srcCluster, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  389. {
  390. if ((flags & RefFileCloned) || (flags & RefFileSuper) || (flags & RefFileInPackage))
  391. return;
  392. if ((flags & RefFileForeign) && !cloneForeign)
  393. return;
  394. if (!(flags & (RefFileRemote | RefFileForeign | RefFileNotOnCluster)))
  395. return;
  396. if (fileSrcCluster.length())
  397. srcCluster = fileSrcCluster;
  398. try
  399. {
  400. StringBuffer srcLFN;
  401. if (filePrefix.length())
  402. srcLFN.append(filePrefix.str()).append("::");
  403. srcLFN.append(logicalName.str());
  404. helper->cloneRoxieSubFile(srcLFN, srcCluster, logicalName, dstCluster, filePrefix, redundancy, channelsPerNode, replicateOffset, defReplicateFolder, user, daliip, updateFlags);
  405. flags |= RefFileCloned;
  406. }
  407. catch (IException *e)
  408. {
  409. flags |= RefFileCopyInfoFailed;
  410. DBGLOG(e, "ReferencedFile ");
  411. e->Release();
  412. }
  413. catch (...)
  414. {
  415. flags |= RefFileCopyInfoFailed;
  416. DBGLOG("ReferencedFile Unknown error copying file info for [%s::] %s, from %s on dfs-dali %s", filePrefix.str(), logicalName.str(), fileSrcCluster.length() ? fileSrcCluster.get() : "*", daliip.str());
  417. }
  418. }
  419. void ReferencedFile::cloneSuperInfo(unsigned updateFlags, ReferencedFileList *list, IUserDescriptor *user, INode *remote)
  420. {
  421. if ((flags & RefFileCloned) || (flags & RefFileInPackage) || !(flags & RefFileSuper) || !(flags & RefFileRemote))
  422. return;
  423. try
  424. {
  425. Owned<IPropertyTree> tree = getSpecifiedOrRemoteFileTree(user, remote, NULL);
  426. if (!tree)
  427. return;
  428. IDistributedFileDirectory &dir = queryDistributedFileDirectory();
  429. Owned<IDistributedFile> df = dir.lookup(logicalName.str(), user);
  430. if(df)
  431. {
  432. if (!(updateFlags & DALI_UPDATEF_SUPERFILES))
  433. return;
  434. df->detach();
  435. df.clear();
  436. }
  437. Owned<IDistributedSuperFile> superfile = dir.createSuperFile(logicalName.str(),user, true, false);
  438. flags |= RefFileCloned;
  439. Owned<IPropertyTreeIterator> subfiles = tree->getElements("SubFile");
  440. ForEach(*subfiles)
  441. {
  442. const char *name = subfiles->query().queryProp("@name");
  443. if (list)
  444. {
  445. //ensure superfile in superfile is cloned, before add
  446. ReferencedFile *subref = list->map.getValue(name);
  447. if (subref)
  448. subref->cloneSuperInfo(updateFlags, list, user, remote);
  449. }
  450. if (name && *name)
  451. superfile->addSubFile(name, false, NULL, false);
  452. }
  453. }
  454. catch (IException *e)
  455. {
  456. flags |= RefFileCopyInfoFailed;
  457. DBGLOG(e, "ReferencedFile ");
  458. e->Release();
  459. }
  460. catch (...)
  461. {
  462. flags |= RefFileCopyInfoFailed;
  463. DBGLOG("ReferencedFile Unknown error copying superfile info for %s", logicalName.str());
  464. }
  465. }
  466. class ReferencedFileIterator : implements IReferencedFileIterator, public CInterface
  467. {
  468. public:
  469. IMPLEMENT_IINTERFACE;
  470. ReferencedFileIterator(ReferencedFileList *_list)
  471. {
  472. list.set(_list);
  473. iter.setown(new HashIterator(list->map));
  474. }
  475. virtual bool first()
  476. {
  477. return iter->first();
  478. }
  479. virtual bool next()
  480. {
  481. return iter->next();
  482. }
  483. virtual bool isValid()
  484. {
  485. return iter->isValid();
  486. }
  487. virtual IReferencedFile & query()
  488. {
  489. return *list->map.mapToValue(&iter->query());
  490. }
  491. virtual ReferencedFile & queryObject()
  492. {
  493. return *(list->map.mapToValue(&iter->query()));
  494. }
  495. public:
  496. Owned<ReferencedFileList> list;
  497. Owned<HashIterator> iter;
  498. };
  499. void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip, const char *srcCluster, const char *prefix)
  500. {
  501. if (!allowForeign && checkForeign(ln))
  502. throw MakeStringException(-1, "Foreign file not allowed%s: %s", (flags & RefFileInPackage) ? " (declared in package)" : "", ln);
  503. Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid, noDfsResolution, allowSizeCalc);
  504. if (!file->logicalName.length())
  505. return;
  506. if (subfileNames)
  507. file->appendSubFileNames(*subfileNames);
  508. ReferencedFile *existing = map.getValue(file->getLogicalName());
  509. if (existing)
  510. existing->flags |= flags;
  511. else
  512. {
  513. const char *refln = file->getLogicalName();
  514. // NOTE: setValue links its parameter
  515. map.setValue(refln, file);
  516. }
  517. }
  518. void ReferencedFileList::addFile(const char *ln, const char *daliip, const char *srcCluster, const char *prefix)
  519. {
  520. ensureFile(ln, 0, NULL, false, nullptr, daliip, srcCluster, prefix);
  521. }
  522. void ReferencedFileList::addFiles(StringArray &files)
  523. {
  524. ForEachItemIn(i, files)
  525. addFile(files.item(i));
  526. }
  527. void ReferencedFileList::addFileFromSubFile(IPropertyTree &subFile, const char *ip, const char *cluster, const char *prefix)
  528. {
  529. addFile(subFile.queryProp("@value"), ip, cluster, prefix);
  530. }
  531. void ReferencedFileList::addFilesFromSuperFile(IPropertyTree &superFile, const char *_ip, const char *_cluster, const char *_prefix)
  532. {
  533. StringBuffer ip;
  534. StringBuffer cluster;
  535. StringBuffer prefix;
  536. splitDerivedDfsLocation(superFile.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
  537. if (superFile.hasProp("@sourceCluster"))
  538. cluster.set(superFile.queryProp("@sourceCluster"));
  539. Owned<IPropertyTreeIterator> subFiles = superFile.getElements("SubFile[@value]");
  540. ForEach(*subFiles)
  541. addFileFromSubFile(subFiles->query(), ip, cluster, prefix);
  542. }
  543. void ReferencedFileList::addFilesFromPackage(IPropertyTree &package, const char *_ip, const char *_cluster, const char *_prefix)
  544. {
  545. StringBuffer ip;
  546. StringBuffer cluster;
  547. StringBuffer prefix;
  548. splitDerivedDfsLocation(package.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
  549. if (package.hasProp("@sourceCluster"))
  550. cluster.set(package.queryProp("@sourceCluster"));
  551. Owned<IPropertyTreeIterator> supers = package.getElements("SuperFile");
  552. ForEach(*supers)
  553. addFilesFromSuperFile(supers->query(), ip, cluster, prefix);
  554. }
  555. void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)
  556. {
  557. StringBuffer ip;
  558. StringBuffer cluster;
  559. StringBuffer prefix;
  560. const char *srcCluster = pm->queryProp("@sourceCluster");
  561. splitDerivedDfsLocation(pm->queryProp("@daliip"), cluster, ip, prefix, srcCluster, srcCluster, NULL, NULL);
  562. Owned<IPropertyTreeIterator> packages = pm->getElements("Package");
  563. ForEach(*packages)
  564. addFilesFromPackage(packages->query(), ip, cluster, prefix);
  565. packages.setown(pm->getElements("Part/Package"));
  566. ForEach(*packages)
  567. addFilesFromPackage(packages->query(), ip, cluster, prefix);
  568. }
  569. bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
  570. {
  571. Owned<IConstWUGraphIterator> graphs = &cw->getGraphs(GraphTypeActivities);
  572. ForEach(*graphs)
  573. {
  574. Owned <IPropertyTree> xgmml = graphs->query().getXGMMLTree(false);
  575. Owned<IPropertyTreeIterator> iter = xgmml->getElements("//node[att/@name='_*ileName']");
  576. ForEach(*iter)
  577. {
  578. IPropertyTree &node = iter->query();
  579. bool isOpt = false;
  580. const char *logicalName = node.queryProp("att[@name='_fileName']/@value");
  581. if (!logicalName)
  582. logicalName = node.queryProp("att[@name='_indexFileName']/@value");
  583. if (!logicalName)
  584. continue;
  585. isOpt = node.getPropBool("att[@name='_isIndexOpt']/@value");
  586. if (!isOpt)
  587. isOpt = node.getPropBool("att[@name='_isOpt']/@value");
  588. ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
  589. //not likely to be part of roxie queries, but for forward compatibility:
  590. if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
  591. continue;
  592. if (node.getPropBool("att[@name='_isSpill']/@value") ||
  593. node.getPropBool("att[@name='_isTransformSpill']/@value"))
  594. continue;
  595. StringArray subfileNames;
  596. unsigned flags = isOpt ? RefFileOptional : RefFileNotOptional;
  597. if (pkg)
  598. {
  599. const char *pkgid = pkg->locateSuperFile(logicalName);
  600. if (pkgid)
  601. {
  602. flags |= (RefFileSuper | RefFileInPackage);
  603. Owned<ISimpleSuperFileEnquiry> ssfe = pkg->resolveSuperFile(logicalName);
  604. if (ssfe && ssfe->numSubFiles()>0)
  605. {
  606. unsigned count = ssfe->numSubFiles();
  607. while (count--)
  608. {
  609. StringBuffer subfile;
  610. ssfe->getSubFileName(count, subfile);
  611. ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid, false, nullptr);
  612. subfileNames.append(subfile);
  613. }
  614. }
  615. }
  616. ensureFile(logicalName, flags, pkgid, pkg->isCompulsory(), &subfileNames);
  617. }
  618. else
  619. ensureFile(logicalName, flags, NULL, false, &subfileNames);
  620. }
  621. }
  622. return pkg ? pkg->isCompulsory() : false;
  623. }
  624. bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid)
  625. {
  626. const IHpccPackage *pkg = NULL;
  627. if (pm && queryid && *queryid)
  628. {
  629. pkg = pm->matchPackage(queryid);
  630. }
  631. return addFilesFromQuery(cw, pkg);
  632. }
  633. void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw)
  634. {
  635. addFilesFromQuery(cw, NULL, NULL);
  636. }
  637. void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign)
  638. {
  639. StringArray childSubFiles;
  640. ForEachItemIn(i, subfiles)
  641. {
  642. const char *lfn = subfiles.item(i);
  643. if (!allowForeign && checkForeign(lfn))
  644. throw MakeStringException(-1, "Foreign sub file not allowed: %s", lfn);
  645. Owned<ReferencedFile> file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL, false, allowSizeCalc);
  646. if (file->logicalName.length() && !map.getValue(file->getLogicalName()))
  647. {
  648. file->resolve(process.get(), srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveForeign);
  649. const char *ln = file->getLogicalName();
  650. // NOTE: setValue links its parameter
  651. map.setValue(ln, file);
  652. }
  653. }
  654. if (childSubFiles.length())
  655. resolveSubFiles(childSubFiles, checkLocalFirst, trackSubFiles, resolveForeign);
  656. }
  657. void ReferencedFileList::resolveFiles(const char *_process, const char *remoteIP, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool trackSubFiles, bool resolveForeign)
  658. {
  659. process.set(_process);
  660. remote.setown((remoteIP && *remoteIP) ? createINode(remoteIP, 7070) : NULL);
  661. srcCluster.set(_srcCluster);
  662. remotePrefix.set(_remotePrefix);
  663. StringArray subfiles;
  664. {
  665. ReferencedFileIterator files(this);
  666. ForEach(files)
  667. files.queryObject().resolve(process, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveForeign);
  668. }
  669. if (expandSuperFiles)
  670. resolveSubFiles(subfiles, checkLocalFirst, trackSubFiles, resolveForeign);
  671. }
  672. void ReferencedFileList::cloneFileInfo(unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder)
  673. {
  674. ReferencedFileIterator files(this);
  675. ForEach(files)
  676. files.queryObject().cloneInfo(updateFlags, helper, user, process, srcCluster, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder);
  677. if (cloneSuperInfo)
  678. ForEach(files)
  679. files.queryObject().cloneSuperInfo(updateFlags, this, user, remote);
  680. }
  681. void ReferencedFileList::cloneRelationships()
  682. {
  683. if (!remote || remote->endpoint().isNull())
  684. return;
  685. StringBuffer addr;
  686. remote->endpoint().getUrlStr(addr);
  687. IDistributedFileDirectory &dir = queryDistributedFileDirectory();
  688. ReferencedFileIterator files(this);
  689. ForEach(files)
  690. {
  691. ReferencedFile &file = files.queryObject();
  692. if (!(file.getFlags() & RefFileRemote))
  693. continue;
  694. Owned<IFileRelationshipIterator> iter = dir.lookupFileRelationships(file.getLogicalName(), NULL,
  695. NULL, NULL, NULL, NULL, NULL, addr.str(), WF_LOOKUP_TIMEOUT);
  696. ForEach(*iter)
  697. {
  698. IFileRelationship &r=iter->query();
  699. const char* assoc = r.querySecondaryFilename();
  700. if (!assoc)
  701. continue;
  702. if (*assoc == '~')
  703. assoc++;
  704. IReferencedFile *refAssoc = map.getValue(assoc);
  705. if (refAssoc && !(refAssoc->getFlags() & RefFileCopyInfoFailed))
  706. {
  707. dir.addFileRelationship(file.getLogicalName(), assoc, r.queryPrimaryFields(), r.querySecondaryFields(),
  708. r.queryKind(), r.queryCardinality(), r.isPayload(), user, r.queryDescription());
  709. }
  710. }
  711. }
  712. }
  713. IReferencedFileIterator *ReferencedFileList::getFiles()
  714. {
  715. return new ReferencedFileIterator(this);
  716. }
  717. IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc)
  718. {
  719. return new ReferencedFileList(user, pw, allowForeignFiles, allowFileSizeCalc);
  720. }
  721. IReferencedFileList *createReferencedFileList(IUserDescriptor *user, bool allowForeignFiles, bool allowFileSizeCalc)
  722. {
  723. return new ReferencedFileList(user, allowForeignFiles, allowFileSizeCalc);
  724. }