ws_dfsclient.cpp 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <vector>
  14. #include "jliball.hpp"
  15. #include "jflz.hpp"
  16. #include "jsecrets.hpp"
  17. #include "seclib.hpp"
  18. #include "ws_dfs.hpp"
  19. #include "workunit.hpp"
  20. #include "eclwatch_errorlist.hpp" // only for ECLWATCH_FILE_NOT_EXIST
  21. #include "soapmessage.hpp"
  22. #include "soapbind.hpp"
  23. #include "dafdesc.hpp"
  24. #include "dadfs.hpp"
  25. #include "dautils.hpp"
  26. #ifndef _CONTAINERIZED
  27. #include "environment.hpp"
  28. #endif
  29. #include "ws_dfsclient.hpp"
  30. namespace wsdfs
  31. {
  32. class CKeepAliveThread : public CSimpleInterface, implements IThreaded
  33. {
  34. CThreaded threaded;
  35. unsigned periodMs;
  36. Semaphore sem;
  37. public:
  38. CKeepAliveThread(unsigned _periodSecs) : threaded("CKeepAliveThread", this), periodMs(_periodSecs * 1000)
  39. {
  40. threaded.start();
  41. }
  42. void stop()
  43. {
  44. sem.signal();
  45. }
  46. virtual void threadmain() override
  47. {
  48. while (true)
  49. {
  50. if (sem.wait(periodMs))
  51. return;
  52. }
  53. }
  54. };
  55. template <class INTERFACE>
  56. class CServiceDistributedFileBase : public CSimpleInterfaceOf<INTERFACE>
  57. {
  58. protected:
  59. Linked<IDFSFile> dfsFile;
  60. StringAttr logicalName;
  61. Owned<IDistributedFile> legacyDFSFile;
  62. Owned<IFileDescriptor> fileDesc;
  63. class CDistributedSuperFileIterator: public CSimpleInterfaceOf<IDistributedSuperFileIterator>
  64. {
  65. Linked<IDFSFile> source;
  66. Owned<IDistributedSuperFile> cur;
  67. std::vector<std::string> owners;
  68. unsigned which = 0;
  69. void setCurrent(unsigned w)
  70. {
  71. VStringBuffer lfn("~remote::%s::%s", source->queryRemoteName(), owners[w].c_str());
  72. Owned<IDFSFile> dfsFile = lookupDFSFile(lfn, source->queryTimeoutSecs(), keepAliveExpiryFrequency, source->queryUserDescriptor());
  73. if (!dfsFile)
  74. throw makeStringExceptionV(0, "Failed to open superfile %s", lfn.str());
  75. if (!dfsFile->numSubFiles())
  76. throwUnexpected();
  77. Owned<IDistributedFile> legacyDFSFile = createLegacyDFSFile(dfsFile);
  78. IDistributedSuperFile *super = legacyDFSFile->querySuperFile();
  79. assertex(super);
  80. cur.set(super);
  81. }
  82. public:
  83. CDistributedSuperFileIterator(IDFSFile *_source, std::vector<std::string> _owners) : source(_source), owners(_owners)
  84. {
  85. }
  86. virtual bool first() override
  87. {
  88. if (owners.empty())
  89. return false;
  90. which = 0;
  91. setCurrent(which);
  92. return true;
  93. }
  94. virtual bool next() override
  95. {
  96. if (which == (owners.size()-1))
  97. {
  98. cur.clear();
  99. return false;
  100. }
  101. ++which;
  102. setCurrent(which);
  103. return true;
  104. }
  105. virtual bool isValid() override
  106. {
  107. return cur != nullptr;
  108. }
  109. virtual IDistributedSuperFile &query() override
  110. {
  111. return *cur;
  112. }
  113. virtual const char *queryName() override
  114. {
  115. if (!isValid())
  116. return nullptr;
  117. return owners[which].c_str();
  118. }
  119. };
  120. public:
  121. CServiceDistributedFileBase(IDFSFile *_dfsFile) : dfsFile(_dfsFile)
  122. {
  123. logicalName.set(dfsFile->queryFileMeta()->queryProp("@name"));
  124. }
  125. virtual unsigned numParts() override { return legacyDFSFile->numParts(); }
  126. virtual IDistributedFilePart &queryPart(unsigned idx) override { return legacyDFSFile->queryPart(idx); }
  127. virtual IDistributedFilePart* getPart(unsigned idx) override { return legacyDFSFile->getPart(idx); }
  128. virtual StringBuffer &getLogicalName(StringBuffer &name) override { return legacyDFSFile->getLogicalName(name); }
  129. virtual const char *queryLogicalName() override { return legacyDFSFile->queryLogicalName(); }
  130. virtual IDistributedFilePartIterator *getIterator(IDFPartFilter *filter=NULL) override { return legacyDFSFile->getIterator(filter); }
  131. virtual IFileDescriptor *getFileDescriptor(const char *clustername=NULL) override { return fileDesc.getLink(); }
  132. virtual const char *queryDefaultDir() override { return legacyDFSFile->queryDefaultDir(); }
  133. virtual const char *queryPartMask() override { return legacyDFSFile->queryPartMask(); }
  134. virtual IPropertyTree &queryAttributes() override { return legacyDFSFile->queryAttributes(); }
  135. virtual bool lockProperties(unsigned timeoutms=INFINITE) override
  136. {
  137. // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented.
  138. return true;
  139. }
  140. virtual void unlockProperties(DFTransactionState state=TAS_NONE) override
  141. {
  142. // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented.
  143. }
  144. virtual bool getModificationTime(CDateTime &dt) override { return legacyDFSFile->getModificationTime(dt); }
  145. virtual bool getAccessedTime(CDateTime &dt) override { return legacyDFSFile->getAccessedTime(dt); }
  146. virtual unsigned numCopies(unsigned partno) override { return legacyDFSFile->numCopies(partno); }
  147. virtual bool existsPhysicalPartFiles(unsigned short port) override
  148. {
  149. return legacyDFSFile->existsPhysicalPartFiles(port);
  150. }
  151. virtual __int64 getFileSize(bool allowphysical, bool forcephysical) override
  152. {
  153. return legacyDFSFile->getFileSize(allowphysical, forcephysical);
  154. }
  155. virtual __int64 getDiskSize(bool allowphysical, bool forcephysical) override
  156. {
  157. return legacyDFSFile->getDiskSize(allowphysical, forcephysical);
  158. }
  159. virtual bool getFileCheckSum(unsigned &checksum) override { return legacyDFSFile->getFileCheckSum(checksum); }
  160. virtual unsigned getPositionPart(offset_t pos,offset_t &base) override { return legacyDFSFile->getPositionPart(pos,base); }
  161. virtual IDistributedSuperFile *querySuperFile() override
  162. {
  163. return nullptr;
  164. }
  165. virtual IDistributedSuperFileIterator *getOwningSuperFiles(IDistributedFileTransaction *transaction=NULL) override
  166. {
  167. if (transaction)
  168. throwUnexpected();
  169. Owned<IPropertyTreeIterator> iter = dfsFile->queryFileMeta()->getElements("SuperFile/SuperOwner");
  170. std::vector<std::string> superOwners;
  171. StringBuffer pname;
  172. ForEach(*iter)
  173. {
  174. iter->query().getProp("@name",pname.clear());
  175. if (pname.length())
  176. superOwners.push_back(pname.str());
  177. }
  178. return new CDistributedSuperFileIterator(dfsFile, superOwners);
  179. }
  180. virtual bool isCompressed(bool *blocked=NULL) override { return legacyDFSFile->isCompressed(blocked); }
  181. virtual StringBuffer &getClusterName(unsigned clusternum,StringBuffer &name) override { return legacyDFSFile->getClusterName(clusternum,name); }
  182. virtual unsigned getClusterNames(StringArray &clusters) override { return legacyDFSFile->getClusterNames(clusters); } // (use findCluster)
  183. virtual unsigned numClusters() override { return legacyDFSFile->numClusters(); }
  184. virtual unsigned findCluster(const char *clustername) override { return legacyDFSFile->findCluster(clustername); }
  185. virtual ClusterPartDiskMapSpec &queryPartDiskMapping(unsigned clusternum) override { return legacyDFSFile->queryPartDiskMapping(clusternum); }
  186. virtual IGroup *queryClusterGroup(unsigned clusternum) override { return legacyDFSFile->queryClusterGroup(clusternum); }
  187. virtual StringBuffer &getClusterGroupName(unsigned clusternum, StringBuffer &name) override
  188. {
  189. return fileDesc->getClusterGroupName(clusternum, name);
  190. }
  191. virtual StringBuffer &getECL(StringBuffer &buf) override { return legacyDFSFile->getECL(buf); }
  192. virtual bool canModify(StringBuffer &reason) override
  193. {
  194. return false;
  195. }
  196. virtual bool canRemove(StringBuffer &reason,bool ignoresub=false) override
  197. {
  198. return false;
  199. }
  200. virtual bool checkClusterCompatible(IFileDescriptor &fdesc, StringBuffer &err) override { return legacyDFSFile->checkClusterCompatible(fdesc,err); }
  201. virtual bool getFormatCrc(unsigned &crc) override { return legacyDFSFile->getFormatCrc(crc); }
  202. virtual bool getRecordSize(size32_t &rsz) override { return legacyDFSFile->getRecordSize(rsz); }
  203. virtual bool getRecordLayout(MemoryBuffer &layout, const char *attrname) override { return legacyDFSFile->getRecordLayout(layout,attrname); }
  204. virtual StringBuffer &getColumnMapping(StringBuffer &mapping) override { return legacyDFSFile->getColumnMapping(mapping); }
  205. virtual bool isRestrictedAccess() override { return legacyDFSFile->isRestrictedAccess(); }
  206. virtual unsigned setDefaultTimeout(unsigned timems) override { return legacyDFSFile->setDefaultTimeout(timems); }
  207. virtual void validate() override { legacyDFSFile->validate(); }
  208. virtual IPropertyTree *queryHistory() const override { return legacyDFSFile->queryHistory(); }
  209. virtual bool isExternal() const override { return false; }
  210. virtual bool getSkewInfo(unsigned &maxSkew, unsigned &minSkew, unsigned &maxSkewPart, unsigned &minSkewPart, bool calculateIfMissing) override { return legacyDFSFile->getSkewInfo(maxSkew, minSkew, maxSkewPart, minSkewPart, calculateIfMissing); }
  211. virtual int getExpire() override { return legacyDFSFile->getExpire(); }
  212. virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override { legacyDFSFile->getCost(cluster, atRestCost, accessCost); }
  213. // setters (change file meta data)
  214. virtual void setPreferredClusters(const char *clusters) override { legacyDFSFile->setPreferredClusters(clusters); }
  215. virtual void setSingleClusterOnly() override { legacyDFSFile->setSingleClusterOnly(); }
  216. virtual void addCluster(const char *clustername,const ClusterPartDiskMapSpec &mspec) override { legacyDFSFile->addCluster(clustername, mspec); }
  217. virtual bool removeCluster(const char *clustername) override { return legacyDFSFile->removeCluster(clustername); }
  218. virtual void updatePartDiskMapping(const char *clustername,const ClusterPartDiskMapSpec &spec) override { legacyDFSFile->updatePartDiskMapping(clustername, spec); }
  219. /* NB/TBD: these modifications are only effecting this instance, the changes are not propagated to Dali
  220. * This is the same behaviour when foreign files are used, but will need addressing in future.
  221. */
  222. virtual void setModificationTime(const CDateTime &dt) override
  223. {
  224. legacyDFSFile->setModificationTime(dt);
  225. }
  226. virtual void setModified() override
  227. {
  228. legacyDFSFile->setModified();
  229. }
  230. virtual void setAccessedTime(const CDateTime &dt) override
  231. {
  232. legacyDFSFile->setAccessedTime(dt);
  233. }
  234. virtual void setAccessed() override
  235. {
  236. legacyDFSFile->setAccessed();
  237. }
  238. virtual void addAttrValue(const char *attr, unsigned __int64 value) override
  239. {
  240. legacyDFSFile->addAttrValue(attr, value);
  241. }
  242. virtual void setExpire(int expireDays) override
  243. {
  244. legacyDFSFile->setExpire(expireDays);
  245. }
  246. virtual void setECL(const char *ecl) override
  247. {
  248. legacyDFSFile->setECL(ecl);
  249. }
  250. virtual void resetHistory() override
  251. {
  252. legacyDFSFile->resetHistory();
  253. }
  254. virtual void setProtect(const char *callerid, bool protect=true, unsigned timeoutms=INFINITE) override
  255. {
  256. legacyDFSFile->setProtect(callerid, protect, timeoutms);
  257. }
  258. virtual void setColumnMapping(const char *mapping) override
  259. {
  260. legacyDFSFile->setColumnMapping(mapping);
  261. }
  262. virtual void setRestrictedAccess(bool restricted) override
  263. {
  264. legacyDFSFile->setRestrictedAccess(restricted);
  265. }
  266. virtual bool renamePhysicalPartFiles(const char *newlfn,const char *cluster=NULL,IMultiException *exceptions=NULL,const char *newbasedir=NULL) override
  267. {
  268. UNIMPLEMENTED_X("CServiceDistributedFileBase::renamePhysicalPartFiles");
  269. }
  270. virtual void rename(const char *logicalname,IUserDescriptor *user) override
  271. {
  272. UNIMPLEMENTED_X("CServiceDistributedFileBase::rename");
  273. }
  274. virtual void attach(const char *logicalname,IUserDescriptor *user) override
  275. {
  276. UNIMPLEMENTED_X("CServiceDistributedFileBase::rename");
  277. }
  278. virtual void detach(unsigned timeoutms=INFINITE, ICodeContext *ctx=NULL) override
  279. {
  280. UNIMPLEMENTED_X("CServiceDistributedFileBase::detach");
  281. }
  282. virtual void enqueueReplicate() override
  283. {
  284. UNIMPLEMENTED_X("CServiceDistributedFileBase::enqueueReplicate");
  285. }
  286. };
  287. class CServiceDistributedFile : public CServiceDistributedFileBase<IDistributedFile>
  288. {
  289. typedef CServiceDistributedFileBase<IDistributedFile> PARENT;
  290. public:
  291. CServiceDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile)
  292. {
  293. IPropertyTree *file = dfsFile->queryFileMeta()->queryPropTree("File");
  294. const char *remoteName = dfsFile->queryRemoteName(); // NB: null if local
  295. if (!isEmptyString(remoteName))
  296. {
  297. Owned<IPropertyTree> remoteStorage = getRemoteStorage(remoteName);
  298. if (!remoteStorage)
  299. throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName);
  300. if (!remoteStorage->getPropBool("@useDafilesrv"))
  301. {
  302. // Path translation is necessary, because the local plane will not necessarily have the same
  303. // prefix. In particular, both a local and remote plane may want to use the same prefix/mount.
  304. // So, the local plane will be defined with a unique prefix locally.
  305. // Files backed by URL's or hostGroups will be access directly, are not mounted, and do not require
  306. // this translation.
  307. const char *remotePlaneName = file->queryProp("@group");
  308. VStringBuffer planeXPath("planes[@name=\"%s\"]", remotePlaneName);
  309. IPropertyTree *filePlane = dfsFile->queryCommonMeta()->queryPropTree(planeXPath);
  310. assertex(filePlane);
  311. const char *filePlanePrefix = filePlane->queryProp("@prefix");
  312. if (isAbsolutePath(filePlanePrefix) && !filePlane->hasProp("@hosts")) // otherwise assume url
  313. {
  314. #ifndef _CONTAINERIZED
  315. throw makeStringException(0, "Bare metal does not support remote file access to planes without hosts");
  316. #endif
  317. // A external plane within another environment backed by a PVC, will need a pre-existing
  318. // corresponding plane and PVC in the local environment.
  319. // The local plane will be associated with the remote environment, via a storage/remote mapping.
  320. VStringBuffer remotePlaneXPath("planes[@remote='%s']/@local", remotePlaneName);
  321. const char *localMappedPlaneName = remoteStorage->queryProp(remotePlaneXPath);
  322. if (isEmptyString(localMappedPlaneName))
  323. throw makeStringExceptionV(0, "Remote plane '%s' not found in remote storage definition '%s'", remotePlaneName, remoteName);
  324. Owned<IStoragePlane> localPlane = getRemoteStoragePlane(localMappedPlaneName, false);
  325. if (!localPlane)
  326. throw makeStringExceptionV(0, "Local plane not found, mapped to by remote storage '%s' (%s->%s)", remoteName, remotePlaneName, localMappedPlaneName);
  327. DBGLOG("Remote logical file '%s' using remote storage '%s', mapping remote plane '%s' to local plane '%s'", logicalName.str(), remoteName, remotePlaneName, localMappedPlaneName);
  328. StringBuffer filePlanePrefix;
  329. filePlane->getProp("@prefix", filePlanePrefix);
  330. if (filePlane->hasProp("@subPath"))
  331. filePlanePrefix.append('/').append(filePlane->queryProp("@subPath"));
  332. // the plane prefix should match the base of file's base directory
  333. // Q: what if the plane has been redefined since the files were created?
  334. VStringBuffer clusterXPath("Cluster[@name=\"%s\"]", remotePlaneName);
  335. IPropertyTree *cluster = file->queryPropTree(clusterXPath);
  336. assertex(cluster);
  337. const char *clusterDir = cluster->queryProp("@defaultBaseDir");
  338. assertex(startsWith(clusterDir, filePlanePrefix));
  339. clusterDir += filePlanePrefix.length();
  340. StringBuffer newPath(localPlane->queryPrefix());
  341. if (strlen(clusterDir))
  342. newPath.append(clusterDir); // add remaining tail of path
  343. cluster->setProp("@defaultBaseDir", newPath.str());
  344. const char *dir = file->queryProp("@directory");
  345. assertex(startsWith(dir, filePlanePrefix));
  346. dir += filePlanePrefix.length();
  347. newPath.clear().append(localPlane->queryPrefix());
  348. if (strlen(dir))
  349. newPath.append(dir); // add remaining tail of path
  350. DBGLOG("Remapping logical file directory to '%s'", newPath.str());
  351. file->setProp("@directory", newPath.str());
  352. }
  353. }
  354. }
  355. fileDesc.setown(deserializeFileDescriptorTree(file));
  356. if (fileDesc)
  357. fileDesc->setTraceName(logicalName);
  358. legacyDFSFile.setown(queryDistributedFileDirectory().createNew(fileDesc, logicalName));
  359. }
  360. };
  361. class CServiceSuperDistributedFile : public CServiceDistributedFileBase<IDistributedSuperFile>
  362. {
  363. typedef CServiceDistributedFileBase<IDistributedSuperFile> PARENT;
  364. Owned<IDistributedSuperFile> legacyDFSSuperFile;
  365. public:
  366. CServiceSuperDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile)
  367. {
  368. IArrayOf<IDistributedFile> subFiles;
  369. unsigned subs = dfsFile->numSubFiles();
  370. for (unsigned s=0; s<subs; s++)
  371. {
  372. Owned<IDFSFile> subFile = dfsFile->getSubFile(s);
  373. Owned<IDistributedFile> legacyDFSFile = createLegacyDFSFile(subFile);
  374. subFiles.append(*legacyDFSFile.getClear());
  375. }
  376. legacyDFSSuperFile.setown(queryDistributedFileDirectory().createNewSuperFile(dfsFile->queryFileMeta()->queryPropTree("SuperFile"), logicalName, &subFiles));
  377. legacyDFSFile.set(legacyDFSSuperFile);
  378. fileDesc.setown(legacyDFSSuperFile->getFileDescriptor());
  379. }
  380. // IDistributedFile overrides
  381. virtual IDistributedSuperFile *querySuperFile() override
  382. {
  383. return this;
  384. }
  385. // IDistributedSuperFile overrides
  386. virtual IDistributedFile &querySubFile(unsigned idx,bool sub) override
  387. {
  388. return legacyDFSSuperFile->querySubFile(idx, sub);
  389. }
  390. virtual IDistributedFile *querySubFileNamed(const char *name, bool sub) override
  391. {
  392. return legacyDFSSuperFile->querySubFileNamed(name, sub);
  393. }
  394. virtual IDistributedFile *getSubFile(unsigned idx,bool sub) override
  395. {
  396. return legacyDFSSuperFile->getSubFile(idx, sub);
  397. }
  398. virtual unsigned numSubFiles(bool sub) override
  399. {
  400. return legacyDFSSuperFile->numSubFiles(sub);
  401. }
  402. virtual bool isInterleaved() override
  403. {
  404. return legacyDFSSuperFile->isInterleaved();
  405. }
  406. virtual IDistributedFile *querySubPart(unsigned partidx,unsigned &subfileidx) override
  407. {
  408. return legacyDFSSuperFile->querySubPart(partidx, subfileidx);
  409. }
  410. virtual unsigned getPositionPart(offset_t pos, offset_t &base) override
  411. {
  412. return legacyDFSSuperFile->getPositionPart(pos, base);
  413. }
  414. virtual IDistributedFileIterator *getSubFileIterator(bool supersub=false) override
  415. {
  416. return legacyDFSSuperFile->getSubFileIterator(supersub);
  417. }
  418. virtual void validate() override
  419. {
  420. if (!legacyDFSSuperFile->existsPhysicalPartFiles(0))
  421. {
  422. const char * logicalName = queryLogicalName();
  423. throw MakeStringException(-1, "Some physical parts do not exists, for logical file : %s",(isEmptyString(logicalName) ? "[unattached]" : logicalName));
  424. }
  425. }
  426. // IDistributedSuperFile
  427. virtual void addSubFile(const char *subfile, bool before=false, const char *other=NULL, bool addcontents=false, IDistributedFileTransaction *transaction=NULL) override
  428. {
  429. UNIMPLEMENTED_X("CServiceSuperDistributedFile::addSubFile");
  430. }
  431. virtual bool removeSubFile(const char *subfile, bool remsub, bool remcontents=false, IDistributedFileTransaction *transaction=NULL) override
  432. {
  433. UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeSubFile");
  434. }
  435. virtual bool removeOwnedSubFiles(bool remsub, IDistributedFileTransaction *transaction=NULL) override
  436. {
  437. UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeOwnedSubFiles");
  438. }
  439. virtual bool swapSuperFile( IDistributedSuperFile *_file, IDistributedFileTransaction *transaction) override
  440. {
  441. UNIMPLEMENTED_X("CServiceSuperDistributedFile::swapSuperFile");
  442. }
  443. };
  444. static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc);
  445. class CDFSFile : public CSimpleInterfaceOf<IDFSFile>
  446. {
  447. Linked<IPropertyTree> commonMeta; // e.g. share info between IFDSFiles, e.g. common plane info between subfiles
  448. Linked<IPropertyTree> fileMeta;
  449. unsigned __int64 lockId;
  450. std::vector<Owned<IDFSFile>> subFiles;
  451. StringAttr remoteName;
  452. unsigned timeoutSecs;
  453. Linked<IUserDescriptor> userDesc;
  454. public:
  455. CDFSFile(IPropertyTree *_commonMeta, IPropertyTree *_fileMeta, const char *_remoteName, unsigned _timeoutSecs, IUserDescriptor *_userDesc)
  456. : commonMeta(_commonMeta), fileMeta(_fileMeta), remoteName(_remoteName), timeoutSecs(_timeoutSecs), userDesc(_userDesc)
  457. {
  458. lockId = fileMeta->getPropInt64("@lockId");
  459. if (fileMeta->getPropBool("@isSuper"))
  460. {
  461. Owned<IPropertyTreeIterator> iter = fileMeta->getElements("FileMeta");
  462. ForEach(*iter)
  463. {
  464. IPropertyTree &subMeta = iter->query();
  465. subFiles.push_back(createDFSFile(commonMeta, &subMeta, remoteName, timeoutSecs, userDesc));
  466. }
  467. }
  468. }
  469. virtual IPropertyTree *queryFileMeta() const override
  470. {
  471. return fileMeta;
  472. }
  473. virtual IPropertyTree *queryCommonMeta() const override
  474. {
  475. return commonMeta;
  476. }
  477. virtual unsigned __int64 getLockId() const override
  478. {
  479. return lockId;
  480. }
  481. virtual unsigned numSubFiles() const override
  482. {
  483. return (unsigned)subFiles.size();
  484. }
  485. virtual IDFSFile *getSubFile(unsigned idx) const override
  486. {
  487. return LINK(subFiles[idx]);
  488. }
  489. virtual const char *queryRemoteName() const override
  490. {
  491. return remoteName;
  492. }
  493. virtual IUserDescriptor *queryUserDescriptor() const override
  494. {
  495. return userDesc.getLink();
  496. }
  497. virtual unsigned queryTimeoutSecs() const override
  498. {
  499. return timeoutSecs;
  500. }
  501. };
  502. static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc)
  503. {
  504. return new CDFSFile(commonMeta, fileMeta, remoteName, timeoutSecs, userDesc);
  505. }
  506. IClientWsDfs *getDfsClient(const char *serviceUrl, IUserDescriptor *userDesc)
  507. {
  508. // JCSMORE - can I reuse these, are they thread safe (AFishbeck?)
  509. VStringBuffer dfsUrl("%s/WsDfs", serviceUrl);
  510. Owned<IClientWsDfs> dfsClient = createWsDfsClient();
  511. dfsClient->addServiceUrl(dfsUrl);
  512. StringBuffer user, token;
  513. userDesc->getUserName(user),
  514. userDesc->getPassword(token);
  515. dfsClient->setUsernameToken(user, token, "");
  516. return dfsClient.getClear();
  517. }
  518. static CriticalSection localSecretCrit;
  519. static void configureClientSSL(IEspClientRpcSettings &rpc, const char *secretName)
  520. {
  521. /*
  522. * This is a bit of a kludge, it gets the certificates from secrets, and writes them to local temp strorage.
  523. * It does this so that it can pass the filename paths to rpc ssl / secure socket layer, which currently only
  524. * accepts filenames, not binary blobs from memory.
  525. */
  526. StringBuffer clientCertFilename, clientPrivateKeyFilename, caCertFilename;
  527. StringBuffer tempDirStr;
  528. verifyex(getConfigurationDirectory(getGlobalConfigSP()->queryPropTree("Directories"), "temp", "ssl", "ssl", tempDirStr));
  529. addPathSepChar(tempDirStr);
  530. tempDirStr.append(secretName);
  531. addPathSepChar(tempDirStr);
  532. clientCertFilename.append(tempDirStr).append("tls.crt");
  533. clientPrivateKeyFilename.append(tempDirStr).append("tls.key");
  534. caCertFilename.append(tempDirStr).append("ca.crt");
  535. CriticalBlock b(localSecretCrit);
  536. if (!checkDirExists(tempDirStr.str()))
  537. {
  538. Owned<IFile> dir = createIFile(tempDirStr.str());
  539. dir->createDirectory();
  540. StringBuffer secretValue;
  541. Owned<IFile> file = createIFile(clientCertFilename);
  542. Owned<IFileIO> io = file->open(IFOcreate);
  543. getSecretValue(secretValue, "storage", secretName, "tls.crt", true);
  544. io->write(0, secretValue.length(), secretValue.str());
  545. io->close();
  546. file.setown(createIFile(clientPrivateKeyFilename));
  547. io.setown(file->open(IFOcreate));
  548. getSecretValue(secretValue.clear(), "storage", secretName, "tls.key", true);
  549. io->write(0, secretValue.length(), secretValue.str());
  550. io->close();
  551. file.setown(createIFile(caCertFilename));
  552. io.setown(file->open(IFOcreate));
  553. getSecretValue(secretValue.clear(), "storage", secretName, "ca.crt", true);
  554. io->write(0, secretValue.length(), secretValue.str());
  555. io->close();
  556. }
  557. setRpcSSLOptions(rpc, true, clientCertFilename, clientPrivateKeyFilename, caCertFilename, false);
  558. }
  559. static CriticalSection serviceLeaseMapCS;
  560. static std::unordered_map<std::string, unsigned __int64> serviceLeaseMap;
  561. unsigned __int64 ensureClientLease(IClientWsDfs *dfsClient, const char *service, const char *secretName, IUserDescriptor *userDesc)
  562. {
  563. CriticalBlock block(serviceLeaseMapCS);
  564. auto r = serviceLeaseMap.find(service);
  565. if (r != serviceLeaseMap.end())
  566. return r->second;
  567. Owned<IClientLeaseRequest> leaseReq = dfsClient->createGetLeaseRequest();
  568. if (!isEmptyString(secretName))
  569. configureClientSSL(leaseReq->rpc(), secretName);
  570. leaseReq->setKeepAliveExpiryFrequency(keepAliveExpiryFrequency);
  571. Owned<IClientLeaseResponse> leaseResp;
  572. unsigned timeoutSecs = 60;
  573. CTimeMon tm(timeoutSecs*1000);
  574. while (true)
  575. {
  576. try
  577. {
  578. leaseResp.setown(dfsClient->GetLease(leaseReq));
  579. unsigned __int64 leaseId = leaseResp->getLeaseId();
  580. serviceLeaseMap[service] = leaseId;
  581. return leaseId;
  582. }
  583. catch (IException *e)
  584. {
  585. /* NB: there should really be a different IException class and a specific error code
  586. * The server knows it's an unsupported method.
  587. */
  588. if (SOAP_SERVER_ERROR != e->errorCode())
  589. throw;
  590. e->Release();
  591. }
  592. if (tm.timedout())
  593. throw makeStringExceptionV(0, "GetLease timed out: timeoutSecs=%u", timeoutSecs);
  594. Sleep(5000); // sanity sleep
  595. }
  596. }
  597. #ifndef _CONTAINERIZED
  598. static std::vector<std::string> dfsServiceUrls;
  599. static CriticalSection dfsServiceUrlCrit;
  600. static std::atomic<unsigned> currentDfsServiceUrl{0};
  601. static bool dfsServiceUrlsDiscovered = false;
  602. #endif
  603. IDFSFile *lookupDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc)
  604. {
  605. CDfsLogicalFileName lfn;
  606. lfn.set(logicalName);
  607. StringBuffer remoteName, remoteLogicalFileName;
  608. StringBuffer serviceUrl;
  609. StringBuffer serviceSecret;
  610. bool useDafilesrv = false;
  611. if (lfn.isRemote())
  612. {
  613. verifyex(lfn.getRemoteSpec(remoteName, remoteLogicalFileName));
  614. // "local" is a reserved remote name, used to mean the local environment
  615. // which will be auto-discovered.
  616. if (!strieq(remoteName, "local"))
  617. {
  618. Owned<IPropertyTree> remoteStorage = getRemoteStorage(remoteName.str());
  619. if (!remoteStorage)
  620. throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName.str());
  621. serviceUrl.set(remoteStorage->queryProp("@service"));
  622. serviceSecret.set(remoteStorage->queryProp("@secret"));
  623. logicalName = remoteLogicalFileName;
  624. useDafilesrv = remoteStorage->getPropBool("@useDafilesrv");
  625. }
  626. }
  627. if (!serviceUrl.length())
  628. {
  629. // auto-discover local environment dfs service.
  630. #ifdef _CONTAINERIZED
  631. // NB: only expected to be here if experimental option #option('dfsesp-localfiles', true); is in use.
  632. // This finds and uses local dfs service for local read lookukup.
  633. Owned<IPropertyTreeIterator> eclWatchServices = getGlobalConfigSP()->getElements("services[@type='dfs']");
  634. if (!eclWatchServices->first())
  635. throw makeStringException(-1, "Dfs service not defined in esp services");
  636. const IPropertyTree &eclWatch = eclWatchServices->query();
  637. StringBuffer eclWatchName;
  638. eclWatch.getProp("@name", eclWatchName);
  639. auto result = getExternalService(eclWatchName);
  640. if (result.first.empty())
  641. throw makeStringExceptionV(-1, "dfs '%s': service not found", eclWatchName.str());
  642. if (0 == result.second)
  643. throw makeStringExceptionV(-1, "dfs '%s': service port not defined", eclWatchName.str());
  644. const char *protocol = eclWatch.getPropBool("@tls") ? "https" : "http";
  645. serviceUrl.appendf("%s://%s:%u", protocol, result.first.c_str(), result.second);
  646. #else
  647. {
  648. CriticalBlock b(dfsServiceUrlCrit);
  649. if (!dfsServiceUrlsDiscovered)
  650. {
  651. dfsServiceUrlsDiscovered = true;
  652. getAccessibleServiceURLList("WsSMC", dfsServiceUrls);
  653. if (0 == dfsServiceUrls.size())
  654. throw makeStringException(-1, "Could not find any DFS services in the target HPCC configuration.");
  655. }
  656. }
  657. serviceUrl.append(dfsServiceUrls[currentDfsServiceUrl].c_str());
  658. currentDfsServiceUrl = (currentDfsServiceUrl+1 == dfsServiceUrls.size()) ? 0 : currentDfsServiceUrl+1;
  659. remoteName.clear(); // local
  660. #endif
  661. }
  662. bool useSSL = startsWith(serviceUrl, "https");
  663. if (!useSSL)
  664. serviceSecret.clear();
  665. DBGLOG("Looking up file '%s' on '%s'", logicalName, serviceUrl.str());
  666. Owned<IClientWsDfs> dfsClient = getDfsClient(serviceUrl, userDesc);
  667. unsigned __int64 clientLeaseId = ensureClientLease(dfsClient, serviceUrl, serviceSecret, userDesc);
  668. Owned<IClientDFSFileLookupResponse> dfsResp;
  669. Owned<IClientDFSFileLookupRequest> dfsReq = dfsClient->createDFSFileLookupRequest();
  670. if (useSSL && serviceSecret.length())
  671. configureClientSSL(dfsReq->rpc(), serviceSecret.str());
  672. dfsReq->setAccessViaDafilesrv(useDafilesrv);
  673. dfsReq->setName(logicalName);
  674. dfsReq->setLeaseId(clientLeaseId);
  675. CTimeMon tm(timeoutSecs*1000); // NB: this timeout loop is to cater for *a* esp disappearing (e.g. if behind load balancer)
  676. while (true)
  677. {
  678. try
  679. {
  680. unsigned remaining;
  681. if (tm.timedout(&remaining))
  682. break;
  683. dfsReq->setRequestTimeout(remaining/1000);
  684. dfsResp.setown(dfsClient->DFSFileLookup(dfsReq));
  685. const IMultiException *excep = &dfsResp->getExceptions(); // NB: warning despite getXX name, this does not Link
  686. if (excep->ordinality() > 0)
  687. throw LINK((IMultiException *)excep); // NB - const IException.. not caught in general..
  688. const char *base64Resp = dfsResp->getMeta();
  689. MemoryBuffer compressedRespMb;
  690. JBASE64_Decode(base64Resp, compressedRespMb);
  691. MemoryBuffer decompressedRespMb;
  692. fastLZDecompressToBuffer(decompressedRespMb, compressedRespMb);
  693. Owned<IPropertyTree> meta = createPTree(decompressedRespMb);
  694. IPropertyTree *fileMeta = meta->queryPropTree("FileMeta");
  695. if (!fileMeta) // file not found
  696. return nullptr;
  697. // remoteName empty if local
  698. return createDFSFile(meta, fileMeta, remoteName.length()?remoteName.str():nullptr, timeoutSecs, userDesc);
  699. }
  700. catch (IException *e)
  701. {
  702. /* NB: there should really be a different IException class and a specific error code
  703. * The server knows it's an unsupported method.
  704. */
  705. if (SOAP_SERVER_ERROR != e->errorCode())
  706. throw;
  707. e->Release();
  708. }
  709. if (tm.timedout())
  710. throw makeStringExceptionV(0, "DFSFileLookup timed out: file=%s, timeoutSecs=%u", logicalName, timeoutSecs);
  711. Sleep(5000); // sanity sleep
  712. }
  713. return nullptr; // should never be able to reach here, but keeps compiler happy
  714. }
  715. IDistributedFile *createLegacyDFSFile(IDFSFile *dfsFile)
  716. {
  717. if (dfsFile->queryFileMeta()->getPropBool("@isSuper"))
  718. return new CServiceSuperDistributedFile(dfsFile);
  719. else
  720. return new CServiceDistributedFile(dfsFile);
  721. }
  722. IDistributedFile *lookupLegacyDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc)
  723. {
  724. Owned<IDFSFile> dfsFile = lookupDFSFile(logicalName, timeoutSecs, keepAliveExpiryFrequency, userDesc);
  725. if (!dfsFile)
  726. return nullptr;
  727. return createLegacyDFSFile(dfsFile);
  728. }
  729. IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout)
  730. {
  731. bool viaDali = false;
  732. // DFS service currently only supports remote files
  733. if (isWrite(accessMode))
  734. viaDali = true;
  735. else
  736. {
  737. // switch to Dali if non-remote file, unless "dfsesp-localfiles" enabled (and non-external)
  738. if (!lfn.isRemote())
  739. {
  740. if (lfn.isExternal() || (!getComponentConfigSP()->getPropBool("dfsesp-localfiles")))
  741. viaDali = true;
  742. }
  743. }
  744. if (viaDali)
  745. return queryDistributedFileDirectory().lookup(lfn, user, accessMode, hold, lockSuperOwner, transaction, priviledged, timeout);
  746. return wsdfs::lookupLegacyDFSFile(lfn.get(), timeout, wsdfs::keepAliveExpiryFrequency, user);
  747. }
  748. IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout)
  749. {
  750. CDfsLogicalFileName lfn;
  751. lfn.set(logicalFilename);
  752. return lookup(lfn, user, accessMode, hold, lockSuperOwner, transaction, priviledged, timeout);
  753. }
  754. } // namespace wsdfs
  755. class CLocalOrDistributedFile: implements ILocalOrDistributedFile, public CInterface
  756. {
  757. bool fileExists;
  758. Owned<IDistributedFile> dfile;
  759. CDfsLogicalFileName lfn; // set if localpath but prob not useful
  760. StringAttr localpath;
  761. StringAttr fileDescPath;
  762. public:
  763. IMPLEMENT_IINTERFACE;
  764. CLocalOrDistributedFile()
  765. {
  766. fileExists = false;
  767. }
  768. virtual const char *queryLogicalName() override
  769. {
  770. return lfn.get();
  771. }
  772. virtual IDistributedFile * queryDistributedFile() override
  773. {
  774. return dfile.get();
  775. }
  776. bool init(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, AccessMode accessMode, bool isPrivilegedUser, const StringArray *clusters)
  777. {
  778. fileExists = false;
  779. if (!onlydfs)
  780. lfn.allowOsPath(true);
  781. if (!lfn.setValidate(fname))
  782. return false;
  783. bool write = isWrite(accessMode);
  784. if (!onlydfs)
  785. {
  786. bool gotlocal = true;
  787. if (isAbsolutePath(fname)||(stdIoHandle(fname)>=0))
  788. localpath.set(fname);
  789. else if (!strstr(fname,"::"))
  790. {
  791. // treat it as a relative file
  792. StringBuffer fn;
  793. localpath.set(makeAbsolutePath(fname,fn).str());
  794. }
  795. else if (!lfn.isExternal())
  796. gotlocal = false;
  797. if (gotlocal)
  798. {
  799. if (!write && !onlylocal) // MORE - this means the dali access checks not happening... maybe that's ok?
  800. dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE));
  801. Owned<IFile> file = getPartFile(0,0);
  802. if (file.get())
  803. {
  804. fileExists = file->exists();
  805. return fileExists || write;
  806. }
  807. }
  808. }
  809. if (!onlylocal)
  810. {
  811. if (lfn.isExternal() && !lfn.isRemote())
  812. {
  813. Owned<IFileDescriptor> fDesc = createExternalFileDescriptor(lfn.get());
  814. dfile.setown(queryDistributedFileDirectory().createExternal(fDesc, lfn.get()));
  815. Owned<IFile> file = getPartFile(0,0);
  816. if (file.get())
  817. fileExists = file->exists();
  818. if (write && lfn.isExternal()&&(dfile->numParts()==1)) // if it is writing to an external file then don't return distributed
  819. dfile.clear();
  820. return true;
  821. }
  822. else
  823. {
  824. dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE));
  825. if (dfile.get())
  826. return true;
  827. }
  828. StringBuffer dir;
  829. unsigned stripeNum = 0;
  830. #ifdef _CONTAINERIZED
  831. StringBuffer cluster;
  832. if (clusters)
  833. {
  834. if (clusters->ordinality()>1)
  835. throw makeStringExceptionV(0, "Container mode does not yet support output to multiple clusters while writing file %s)", fname);
  836. cluster.append(clusters->item(0));
  837. }
  838. else
  839. getDefaultStoragePlane(cluster);
  840. Owned<IStoragePlane> plane = getDataStoragePlane(cluster, true);
  841. dir.append(plane->queryPrefix());
  842. unsigned numStripedDevices = plane->numDevices();
  843. stripeNum = calcStripeNumber(0, lfn.get(), numStripedDevices);
  844. #endif
  845. StringBuffer descPath;
  846. makePhysicalDirectory(descPath, lfn.get(), 0, DFD_OSdefault, dir);
  847. fileDescPath.set(descPath);
  848. // MORE - should we create the IDistributedFile here ready for publishing (and/or to make sure it's locked while we write)?
  849. StringBuffer physicalPath;
  850. makePhysicalPartName(lfn.get(), 1, 1, physicalPath, 0, DFD_OSdefault, dir, false, stripeNum); // more - may need to override path for roxie
  851. localpath.set(physicalPath);
  852. fileExists = (dfile != NULL);
  853. return write;
  854. }
  855. return false;
  856. }
  857. virtual IFileDescriptor *getFileDescriptor() override
  858. {
  859. if (dfile.get())
  860. return dfile->getFileDescriptor();
  861. Owned<IFileDescriptor> fileDesc = createFileDescriptor();
  862. fileDesc->setTraceName(lfn.get());
  863. StringBuffer dir;
  864. if (localpath.isEmpty()) { // e.g. external file
  865. StringBuffer tail;
  866. IException *e=NULL;
  867. bool iswin=
  868. #ifdef _WIN32
  869. true;
  870. #else
  871. false;
  872. #endif
  873. if (!lfn.getExternalPath(dir,tail,iswin,&e)) {
  874. if (e)
  875. throw e;
  876. return NULL;
  877. }
  878. }
  879. else
  880. splitDirTail(fileDescPath,dir);
  881. fileDesc->setDefaultDir(dir.str());
  882. RemoteFilename rfn;
  883. getPartFilename(rfn,0,0);
  884. fileDesc->setPart(0,rfn);
  885. fileDesc->queryPartDiskMapping(0).defaultCopies = DFD_DefaultCopies;
  886. return fileDesc.getClear();
  887. }
  888. virtual bool getModificationTime(CDateTime &dt) override
  889. {
  890. if (dfile.get())
  891. return dfile->getModificationTime(dt);
  892. Owned<IFile> file = getPartFile(0,0);
  893. if (file.get()) {
  894. CDateTime dt;
  895. return file->getTime(NULL,&dt,NULL);
  896. }
  897. return false;
  898. }
  899. virtual unsigned numParts() override
  900. {
  901. if (dfile.get())
  902. return dfile->numParts();
  903. return 1;
  904. }
  905. virtual unsigned numPartCopies(unsigned partnum) override
  906. {
  907. if (dfile.get())
  908. return dfile->queryPart(partnum).numCopies();
  909. return 1;
  910. }
  911. virtual IFile *getPartFile(unsigned partnum,unsigned copy) override
  912. {
  913. RemoteFilename rfn;
  914. if ((partnum==0)&&(copy==0))
  915. return createIFile(getPartFilename(rfn,partnum,copy));
  916. return NULL;
  917. }
  918. virtual void getDirAndFilename(StringBuffer &dir, StringBuffer &filename) override
  919. {
  920. if (dfile.get())
  921. {
  922. dir.append(dfile->queryDefaultDir());
  923. splitFilename(localpath, nullptr, nullptr, &filename, &filename);
  924. }
  925. else if (localpath.isEmpty())
  926. {
  927. RemoteFilename rfn;
  928. lfn.getExternalFilename(rfn);
  929. StringBuffer fullPath;
  930. rfn.getLocalPath(fullPath);
  931. splitFilename(localpath, nullptr, &dir, &filename, &filename);
  932. }
  933. else
  934. {
  935. dir.append(fileDescPath);
  936. splitFilename(localpath, nullptr, nullptr, &filename, &filename);
  937. }
  938. }
  939. virtual RemoteFilename &getPartFilename(RemoteFilename &rfn, unsigned partnum,unsigned copy) override
  940. {
  941. if (dfile.get())
  942. dfile->queryPart(partnum).getFilename(rfn,copy);
  943. else if (localpath.isEmpty())
  944. lfn.getExternalFilename(rfn);
  945. else
  946. rfn.setRemotePath(localpath);
  947. return rfn;
  948. }
  949. StringBuffer &getPartFilename(StringBuffer &path, unsigned partnum,unsigned copy)
  950. {
  951. RemoteFilename rfn;
  952. if (dfile.get())
  953. dfile->queryPart(partnum).getFilename(rfn,copy);
  954. else if (localpath.isEmpty())
  955. lfn.getExternalFilename(rfn);
  956. else
  957. path.append(localpath);
  958. if (rfn.isLocal())
  959. rfn.getLocalPath(path);
  960. else
  961. rfn.getRemotePath(path);
  962. return path;
  963. }
  964. virtual bool getPartCrc(unsigned partnum, unsigned &crc) override
  965. {
  966. if (dfile.get())
  967. return dfile->queryPart(partnum).getCrc(crc);
  968. Owned<IFile> file = getPartFile(0,0);
  969. if (file.get()) {
  970. crc = file->getCRC();
  971. return true;
  972. }
  973. return false;
  974. }
  975. virtual offset_t getPartFileSize(unsigned partnum) override
  976. {
  977. if (dfile.get())
  978. return dfile->queryPart(partnum).getFileSize(true,false);
  979. Owned<IFile> file = getPartFile(0,0);
  980. if (file.get())
  981. return file->size();
  982. return (offset_t)-1;
  983. }
  984. virtual offset_t getFileSize() override
  985. {
  986. if (dfile.get())
  987. dfile->getFileSize(true,false);
  988. offset_t ret = 0;
  989. unsigned np = numParts();
  990. for (unsigned i = 0;i<np;i++)
  991. ret += getPartFileSize(i);
  992. return ret;
  993. }
  994. virtual bool exists() const override
  995. {
  996. return fileExists;
  997. }
  998. virtual bool isExternal() const override
  999. {
  1000. return lfn.isExternal();
  1001. }
  1002. };
  1003. ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, AccessMode accessMode, bool isPrivilegedUser, const StringArray *clusters)
  1004. {
  1005. Owned<CLocalOrDistributedFile> ret = new CLocalOrDistributedFile();
  1006. if (ret->init(fname,user,onlylocal,onlydfs,accessMode,isPrivilegedUser,clusters))
  1007. return ret.getClear();
  1008. return NULL;
  1009. }