ws_dfsclient.cpp 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <vector>
  14. #include "jliball.hpp"
  15. #include "jflz.hpp"
  16. #include "jsecrets.hpp"
  17. #include "seclib.hpp"
  18. #include "ws_dfs.hpp"
  19. #include "workunit.hpp"
  20. #include "eclwatch_errorlist.hpp" // only for ECLWATCH_FILE_NOT_EXIST
  21. #include "soapmessage.hpp"
  22. #include "dafdesc.hpp"
  23. #include "dadfs.hpp"
  24. #include "dautils.hpp"
  25. #ifndef _CONTAINERIZED
  26. #include "environment.hpp"
  27. #endif
  28. #include "ws_dfsclient.hpp"
  29. namespace wsdfs
  30. {
  31. class CKeepAliveThread : public CSimpleInterface, implements IThreaded
  32. {
  33. CThreaded threaded;
  34. unsigned periodMs;
  35. Semaphore sem;
  36. public:
  37. CKeepAliveThread(unsigned _periodSecs) : threaded("CKeepAliveThread", this), periodMs(_periodSecs * 1000)
  38. {
  39. threaded.start();
  40. }
  41. void stop()
  42. {
  43. sem.signal();
  44. }
  45. virtual void threadmain() override
  46. {
  47. while (true)
  48. {
  49. if (sem.wait(periodMs))
  50. return;
  51. }
  52. }
  53. };
  54. template <class INTERFACE>
  55. class CServiceDistributedFileBase : public CSimpleInterfaceOf<INTERFACE>
  56. {
  57. protected:
  58. Linked<IDFSFile> dfsFile;
  59. StringAttr logicalName;
  60. Owned<IDistributedFile> legacyDFSFile;
  61. Owned<IFileDescriptor> fileDesc;
  62. class CDistributedSuperFileIterator: public CSimpleInterfaceOf<IDistributedSuperFileIterator>
  63. {
  64. Linked<IDFSFile> source;
  65. Owned<IDistributedSuperFile> cur;
  66. std::vector<std::string> owners;
  67. unsigned which = 0;
  68. void setCurrent(unsigned w)
  69. {
  70. VStringBuffer lfn("~remote::%s::%s", source->queryRemoteName(), owners[w].c_str());
  71. Owned<IDFSFile> dfsFile = lookupDFSFile(lfn, source->queryTimeoutSecs(), keepAliveExpiryFrequency, source->queryUserDescriptor());
  72. if (!dfsFile)
  73. throw makeStringExceptionV(0, "Failed to open superfile %s", lfn.str());
  74. if (!dfsFile->numSubFiles())
  75. throwUnexpected();
  76. Owned<IDistributedFile> legacyDFSFile = createLegacyDFSFile(dfsFile);
  77. IDistributedSuperFile *super = legacyDFSFile->querySuperFile();
  78. assertex(super);
  79. cur.set(super);
  80. }
  81. public:
  82. CDistributedSuperFileIterator(IDFSFile *_source, std::vector<std::string> _owners) : source(_source), owners(_owners)
  83. {
  84. }
  85. virtual bool first() override
  86. {
  87. if (owners.empty())
  88. return false;
  89. which = 0;
  90. setCurrent(which);
  91. return true;
  92. }
  93. virtual bool next() override
  94. {
  95. if (which == (owners.size()-1))
  96. {
  97. cur.clear();
  98. return false;
  99. }
  100. ++which;
  101. setCurrent(which);
  102. return true;
  103. }
  104. virtual bool isValid() override
  105. {
  106. return cur != nullptr;
  107. }
  108. virtual IDistributedSuperFile &query() override
  109. {
  110. return *cur;
  111. }
  112. virtual const char *queryName() override
  113. {
  114. if (!isValid())
  115. return nullptr;
  116. return owners[which].c_str();
  117. }
  118. };
  119. public:
  120. CServiceDistributedFileBase(IDFSFile *_dfsFile) : dfsFile(_dfsFile)
  121. {
  122. logicalName.set(dfsFile->queryFileMeta()->queryProp("@name"));
  123. }
  124. virtual unsigned numParts() override { return legacyDFSFile->numParts(); }
  125. virtual IDistributedFilePart &queryPart(unsigned idx) override { return legacyDFSFile->queryPart(idx); }
  126. virtual IDistributedFilePart* getPart(unsigned idx) override { return legacyDFSFile->getPart(idx); }
  127. virtual StringBuffer &getLogicalName(StringBuffer &name) override { return legacyDFSFile->getLogicalName(name); }
  128. virtual const char *queryLogicalName() override { return legacyDFSFile->queryLogicalName(); }
  129. virtual IDistributedFilePartIterator *getIterator(IDFPartFilter *filter=NULL) override { return legacyDFSFile->getIterator(filter); }
  130. virtual IFileDescriptor *getFileDescriptor(const char *clustername=NULL) override { return fileDesc.getLink(); }
  131. virtual const char *queryDefaultDir() override { return legacyDFSFile->queryDefaultDir(); }
  132. virtual const char *queryPartMask() override { return legacyDFSFile->queryPartMask(); }
  133. virtual IPropertyTree &queryAttributes() override { return legacyDFSFile->queryAttributes(); }
  134. virtual bool lockProperties(unsigned timeoutms=INFINITE) override
  135. {
  136. // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented.
  137. return true;
  138. }
  139. virtual void unlockProperties(DFTransactionState state=TAS_NONE) override
  140. {
  141. // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented.
  142. }
  143. virtual bool getModificationTime(CDateTime &dt) override { return legacyDFSFile->getModificationTime(dt); }
  144. virtual bool getAccessedTime(CDateTime &dt) override { return legacyDFSFile->getAccessedTime(dt); }
  145. virtual unsigned numCopies(unsigned partno) override { return legacyDFSFile->numCopies(partno); }
  146. virtual bool existsPhysicalPartFiles(unsigned short port) override
  147. {
  148. return legacyDFSFile->existsPhysicalPartFiles(port);
  149. }
  150. virtual __int64 getFileSize(bool allowphysical, bool forcephysical) override
  151. {
  152. return legacyDFSFile->getFileSize(allowphysical, forcephysical);
  153. }
  154. virtual __int64 getDiskSize(bool allowphysical, bool forcephysical) override
  155. {
  156. return legacyDFSFile->getDiskSize(allowphysical, forcephysical);
  157. }
  158. virtual bool getFileCheckSum(unsigned &checksum) override { return legacyDFSFile->getFileCheckSum(checksum); }
  159. virtual unsigned getPositionPart(offset_t pos,offset_t &base) override { return legacyDFSFile->getPositionPart(pos,base); }
  160. virtual IDistributedSuperFile *querySuperFile() override
  161. {
  162. return nullptr;
  163. }
  164. virtual IDistributedSuperFileIterator *getOwningSuperFiles(IDistributedFileTransaction *transaction=NULL) override
  165. {
  166. if (transaction)
  167. throwUnexpected();
  168. Owned<IPropertyTreeIterator> iter = dfsFile->queryFileMeta()->getElements("SuperFile/SuperOwner");
  169. std::vector<std::string> superOwners;
  170. StringBuffer pname;
  171. ForEach(*iter)
  172. {
  173. iter->query().getProp("@name",pname.clear());
  174. if (pname.length())
  175. superOwners.push_back(pname.str());
  176. }
  177. return new CDistributedSuperFileIterator(dfsFile, superOwners);
  178. }
  179. virtual bool isCompressed(bool *blocked=NULL) override { return legacyDFSFile->isCompressed(blocked); }
  180. virtual StringBuffer &getClusterName(unsigned clusternum,StringBuffer &name) override { return legacyDFSFile->getClusterName(clusternum,name); }
  181. virtual unsigned getClusterNames(StringArray &clusters) override { return legacyDFSFile->getClusterNames(clusters); } // (use findCluster)
  182. virtual unsigned numClusters() override { return legacyDFSFile->numClusters(); }
  183. virtual unsigned findCluster(const char *clustername) override { return legacyDFSFile->findCluster(clustername); }
  184. virtual ClusterPartDiskMapSpec &queryPartDiskMapping(unsigned clusternum) override { return legacyDFSFile->queryPartDiskMapping(clusternum); }
  185. virtual IGroup *queryClusterGroup(unsigned clusternum) override { return legacyDFSFile->queryClusterGroup(clusternum); }
  186. virtual StringBuffer &getClusterGroupName(unsigned clusternum, StringBuffer &name) override
  187. {
  188. return fileDesc->getClusterGroupName(clusternum, name);
  189. }
  190. virtual StringBuffer &getECL(StringBuffer &buf) override { return legacyDFSFile->getECL(buf); }
  191. virtual bool canModify(StringBuffer &reason) override
  192. {
  193. return false;
  194. }
  195. virtual bool canRemove(StringBuffer &reason,bool ignoresub=false) override
  196. {
  197. return false;
  198. }
  199. virtual bool checkClusterCompatible(IFileDescriptor &fdesc, StringBuffer &err) override { return legacyDFSFile->checkClusterCompatible(fdesc,err); }
  200. virtual bool getFormatCrc(unsigned &crc) override { return legacyDFSFile->getFormatCrc(crc); }
  201. virtual bool getRecordSize(size32_t &rsz) override { return legacyDFSFile->getRecordSize(rsz); }
  202. virtual bool getRecordLayout(MemoryBuffer &layout, const char *attrname) override { return legacyDFSFile->getRecordLayout(layout,attrname); }
  203. virtual StringBuffer &getColumnMapping(StringBuffer &mapping) override { return legacyDFSFile->getColumnMapping(mapping); }
  204. virtual bool isRestrictedAccess() override { return legacyDFSFile->isRestrictedAccess(); }
  205. virtual unsigned setDefaultTimeout(unsigned timems) override { return legacyDFSFile->setDefaultTimeout(timems); }
  206. virtual void validate() override { legacyDFSFile->validate(); }
  207. virtual IPropertyTree *queryHistory() const override { return legacyDFSFile->queryHistory(); }
  208. virtual bool isExternal() const override { return false; }
  209. virtual bool getSkewInfo(unsigned &maxSkew, unsigned &minSkew, unsigned &maxSkewPart, unsigned &minSkewPart, bool calculateIfMissing) override { return legacyDFSFile->getSkewInfo(maxSkew, minSkew, maxSkewPart, minSkewPart, calculateIfMissing); }
  210. virtual int getExpire() override { return legacyDFSFile->getExpire(); }
  211. virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override { legacyDFSFile->getCost(cluster, atRestCost, accessCost); }
  212. // setters (change file meta data)
  213. virtual void setPreferredClusters(const char *clusters) override { legacyDFSFile->setPreferredClusters(clusters); }
  214. virtual void setSingleClusterOnly() override { legacyDFSFile->setSingleClusterOnly(); }
  215. virtual void addCluster(const char *clustername,const ClusterPartDiskMapSpec &mspec) override { legacyDFSFile->addCluster(clustername, mspec); }
  216. virtual bool removeCluster(const char *clustername) override { return legacyDFSFile->removeCluster(clustername); }
  217. virtual void updatePartDiskMapping(const char *clustername,const ClusterPartDiskMapSpec &spec) override { legacyDFSFile->updatePartDiskMapping(clustername, spec); }
  218. /* NB/TBD: these modifications are only effecting this instance, the changes are not propagated to Dali
  219. * This is the same behaviour when foreign files are used, but will need addressing in future.
  220. */
  221. virtual void setModificationTime(const CDateTime &dt) override
  222. {
  223. legacyDFSFile->setModificationTime(dt);
  224. }
  225. virtual void setModified() override
  226. {
  227. legacyDFSFile->setModified();
  228. }
  229. virtual void setAccessedTime(const CDateTime &dt) override
  230. {
  231. legacyDFSFile->setAccessedTime(dt);
  232. }
  233. virtual void setAccessed() override
  234. {
  235. legacyDFSFile->setAccessed();
  236. }
  237. virtual void addAttrValue(const char *attr, unsigned __int64 value) override
  238. {
  239. legacyDFSFile->addAttrValue(attr, value);
  240. }
  241. virtual void setExpire(int expireDays) override
  242. {
  243. legacyDFSFile->setExpire(expireDays);
  244. }
  245. virtual void setECL(const char *ecl) override
  246. {
  247. legacyDFSFile->setECL(ecl);
  248. }
  249. virtual void resetHistory() override
  250. {
  251. legacyDFSFile->resetHistory();
  252. }
  253. virtual void setProtect(const char *callerid, bool protect=true, unsigned timeoutms=INFINITE) override
  254. {
  255. legacyDFSFile->setProtect(callerid, protect, timeoutms);
  256. }
  257. virtual void setColumnMapping(const char *mapping) override
  258. {
  259. legacyDFSFile->setColumnMapping(mapping);
  260. }
  261. virtual void setRestrictedAccess(bool restricted) override
  262. {
  263. legacyDFSFile->setRestrictedAccess(restricted);
  264. }
  265. virtual bool renamePhysicalPartFiles(const char *newlfn,const char *cluster=NULL,IMultiException *exceptions=NULL,const char *newbasedir=NULL) override
  266. {
  267. UNIMPLEMENTED_X("CServiceDistributedFileBase::renamePhysicalPartFiles");
  268. }
  269. virtual void rename(const char *logicalname,IUserDescriptor *user) override
  270. {
  271. UNIMPLEMENTED_X("CServiceDistributedFileBase::rename");
  272. }
  273. virtual void attach(const char *logicalname,IUserDescriptor *user) override
  274. {
  275. UNIMPLEMENTED_X("CServiceDistributedFileBase::rename");
  276. }
  277. virtual void detach(unsigned timeoutms=INFINITE, ICodeContext *ctx=NULL) override
  278. {
  279. UNIMPLEMENTED_X("CServiceDistributedFileBase::detach");
  280. }
  281. virtual void enqueueReplicate() override
  282. {
  283. UNIMPLEMENTED_X("CServiceDistributedFileBase::enqueueReplicate");
  284. }
  285. };
  286. class CServiceDistributedFile : public CServiceDistributedFileBase<IDistributedFile>
  287. {
  288. typedef CServiceDistributedFileBase<IDistributedFile> PARENT;
  289. public:
  290. CServiceDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile)
  291. {
  292. IPropertyTree *file = dfsFile->queryFileMeta()->queryPropTree("File");
  293. const char *remotePlaneName = file->queryProp("@group");
  294. VStringBuffer planeXPath("planes[@name=\"%s\"]", remotePlaneName);
  295. IPropertyTree *filePlane = dfsFile->queryCommonMeta()->queryPropTree(planeXPath);
  296. assertex(filePlane);
  297. const char *remoteName = dfsFile->queryRemoteName(); // NB: null if local
  298. if (!isEmptyString(remoteName))
  299. {
  300. // Path translation is necessary, because the local plane will not necessarily have the same
  301. // prefix. In particular, both a local and remote plane may want to use the same prefix/mount.
  302. // So, the local plane will be defined with a unique prefix locally.
  303. // Files backed by URL's or hostGroups will be access directly, are not mounted, and do not require
  304. // this translation.
  305. const char *filePlanePrefix = filePlane->queryProp("@prefix");
  306. if (isAbsolutePath(filePlanePrefix) && !filePlane->hasProp("@hosts")) // otherwise assume url
  307. {
  308. #ifndef _CONTAINERIZED
  309. throw makeStringException(0, "Bare metal does not support remote file access to planes without hosts");
  310. #endif
  311. // A external plane within another environment backed by a PVC, will need a pre-existing
  312. // corresponding plane and PVC in the local environment.
  313. // The local plane will be associated with the remote environment, via a storage/remote mapping.
  314. Owned<IPropertyTree> remoteStorage = getRemoteStorage(remoteName);
  315. if (!remoteStorage)
  316. throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName);
  317. VStringBuffer remotePlaneXPath("planes[@remote='%s']/@local", remotePlaneName);
  318. const char *localMappedPlaneName = remoteStorage->queryProp(remotePlaneXPath);
  319. if (isEmptyString(localMappedPlaneName))
  320. throw makeStringExceptionV(0, "Remote plane '%s' not found in remote storage definition '%s'", remotePlaneName, remoteName);
  321. Owned<IStoragePlane> localPlane = getRemoteStoragePlane(localMappedPlaneName, false);
  322. if (!localPlane)
  323. throw makeStringExceptionV(0, "Local plane not found, mapped to by remote storage '%s' (%s->%s)", remoteName, remotePlaneName, localMappedPlaneName);
  324. DBGLOG("Remote logical file '%s' using remote storage '%s', mapping remote plane '%s' to local plane '%s'", logicalName.str(), remoteName, remotePlaneName, localMappedPlaneName);
  325. StringBuffer filePlanePrefix;
  326. filePlane->getProp("@prefix", filePlanePrefix);
  327. if (filePlane->hasProp("@subPath"))
  328. filePlanePrefix.append('/').append(filePlane->queryProp("@subPath"));
  329. // the plane prefix should match the base of file's base directory
  330. // Q: what if the plane has been redefined since the files were created?
  331. VStringBuffer clusterXPath("Cluster[@name=\"%s\"]", remotePlaneName);
  332. IPropertyTree *cluster = file->queryPropTree(clusterXPath);
  333. assertex(cluster);
  334. const char *clusterDir = cluster->queryProp("@defaultBaseDir");
  335. assertex(startsWith(clusterDir, filePlanePrefix));
  336. clusterDir += filePlanePrefix.length();
  337. StringBuffer newPath(localPlane->queryPrefix());
  338. if (strlen(clusterDir))
  339. newPath.append(clusterDir); // add remaining tail of path
  340. cluster->setProp("@defaultBaseDir", newPath.str());
  341. const char *dir = file->queryProp("@directory");
  342. assertex(startsWith(dir, filePlanePrefix));
  343. dir += filePlanePrefix.length();
  344. newPath.clear().append(localPlane->queryPrefix());
  345. if (strlen(dir))
  346. newPath.append(dir); // add remaining tail of path
  347. DBGLOG("Remapping logical file directory to '%s'", newPath.str());
  348. file->setProp("@directory", newPath.str());
  349. }
  350. }
  351. fileDesc.setown(deserializeFileDescriptorTree(file));
  352. if (fileDesc)
  353. fileDesc->setTraceName(logicalName);
  354. legacyDFSFile.setown(queryDistributedFileDirectory().createNew(fileDesc, logicalName));
  355. }
  356. };
  357. class CServiceSuperDistributedFile : public CServiceDistributedFileBase<IDistributedSuperFile>
  358. {
  359. typedef CServiceDistributedFileBase<IDistributedSuperFile> PARENT;
  360. Owned<IDistributedSuperFile> legacyDFSSuperFile;
  361. public:
  362. CServiceSuperDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile)
  363. {
  364. IArrayOf<IDistributedFile> subFiles;
  365. unsigned subs = dfsFile->numSubFiles();
  366. for (unsigned s=0; s<subs; s++)
  367. {
  368. Owned<IDFSFile> subFile = dfsFile->getSubFile(s);
  369. Owned<IDistributedFile> legacyDFSFile = createLegacyDFSFile(subFile);
  370. subFiles.append(*legacyDFSFile.getClear());
  371. }
  372. legacyDFSSuperFile.setown(queryDistributedFileDirectory().createNewSuperFile(dfsFile->queryFileMeta()->queryPropTree("SuperFile"), logicalName, &subFiles));
  373. legacyDFSFile.set(legacyDFSSuperFile);
  374. fileDesc.setown(legacyDFSSuperFile->getFileDescriptor());
  375. }
  376. // IDistributedFile overrides
  377. virtual IDistributedSuperFile *querySuperFile() override
  378. {
  379. return this;
  380. }
  381. // IDistributedSuperFile overrides
  382. virtual IDistributedFile &querySubFile(unsigned idx,bool sub) override
  383. {
  384. return legacyDFSSuperFile->querySubFile(idx, sub);
  385. }
  386. virtual IDistributedFile *querySubFileNamed(const char *name, bool sub) override
  387. {
  388. return legacyDFSSuperFile->querySubFileNamed(name, sub);
  389. }
  390. virtual IDistributedFile *getSubFile(unsigned idx,bool sub) override
  391. {
  392. return legacyDFSSuperFile->getSubFile(idx, sub);
  393. }
  394. virtual unsigned numSubFiles(bool sub) override
  395. {
  396. return legacyDFSSuperFile->numSubFiles(sub);
  397. }
  398. virtual bool isInterleaved() override
  399. {
  400. return legacyDFSSuperFile->isInterleaved();
  401. }
  402. virtual IDistributedFile *querySubPart(unsigned partidx,unsigned &subfileidx) override
  403. {
  404. return legacyDFSSuperFile->querySubPart(partidx, subfileidx);
  405. }
  406. virtual unsigned getPositionPart(offset_t pos, offset_t &base) override
  407. {
  408. return legacyDFSSuperFile->getPositionPart(pos, base);
  409. }
  410. virtual IDistributedFileIterator *getSubFileIterator(bool supersub=false) override
  411. {
  412. return legacyDFSSuperFile->getSubFileIterator(supersub);
  413. }
  414. virtual void validate() override
  415. {
  416. if (!legacyDFSSuperFile->existsPhysicalPartFiles(0))
  417. {
  418. const char * logicalName = queryLogicalName();
  419. throw MakeStringException(-1, "Some physical parts do not exists, for logical file : %s",(isEmptyString(logicalName) ? "[unattached]" : logicalName));
  420. }
  421. }
  422. // IDistributedSuperFile
  423. virtual void addSubFile(const char *subfile, bool before=false, const char *other=NULL, bool addcontents=false, IDistributedFileTransaction *transaction=NULL) override
  424. {
  425. UNIMPLEMENTED_X("CServiceSuperDistributedFile::addSubFile");
  426. }
  427. virtual bool removeSubFile(const char *subfile, bool remsub, bool remcontents=false, IDistributedFileTransaction *transaction=NULL) override
  428. {
  429. UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeSubFile");
  430. }
  431. virtual bool removeOwnedSubFiles(bool remsub, IDistributedFileTransaction *transaction=NULL) override
  432. {
  433. UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeOwnedSubFiles");
  434. }
  435. virtual bool swapSuperFile( IDistributedSuperFile *_file, IDistributedFileTransaction *transaction) override
  436. {
  437. UNIMPLEMENTED_X("CServiceSuperDistributedFile::swapSuperFile");
  438. }
  439. };
  440. static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc);
  441. class CDFSFile : public CSimpleInterfaceOf<IDFSFile>
  442. {
  443. Linked<IPropertyTree> commonMeta; // e.g. share info between IFDSFiles, e.g. common plane info between subfiles
  444. Linked<IPropertyTree> fileMeta;
  445. unsigned __int64 lockId;
  446. std::vector<Owned<IDFSFile>> subFiles;
  447. StringAttr remoteName;
  448. unsigned timeoutSecs;
  449. Linked<IUserDescriptor> userDesc;
  450. public:
  451. CDFSFile(IPropertyTree *_commonMeta, IPropertyTree *_fileMeta, const char *_remoteName, unsigned _timeoutSecs, IUserDescriptor *_userDesc)
  452. : commonMeta(_commonMeta), fileMeta(_fileMeta), remoteName(_remoteName), timeoutSecs(_timeoutSecs), userDesc(_userDesc)
  453. {
  454. lockId = fileMeta->getPropInt64("@lockId");
  455. if (fileMeta->getPropBool("@isSuper"))
  456. {
  457. Owned<IPropertyTreeIterator> iter = fileMeta->getElements("FileMeta");
  458. ForEach(*iter)
  459. {
  460. IPropertyTree &subMeta = iter->query();
  461. subFiles.push_back(createDFSFile(commonMeta, &subMeta, remoteName, timeoutSecs, userDesc));
  462. }
  463. }
  464. }
  465. virtual IPropertyTree *queryFileMeta() const override
  466. {
  467. return fileMeta;
  468. }
  469. virtual IPropertyTree *queryCommonMeta() const override
  470. {
  471. return commonMeta;
  472. }
  473. virtual unsigned __int64 getLockId() const override
  474. {
  475. return lockId;
  476. }
  477. virtual unsigned numSubFiles() const override
  478. {
  479. return (unsigned)subFiles.size();
  480. }
  481. virtual IDFSFile *getSubFile(unsigned idx) const override
  482. {
  483. return LINK(subFiles[idx]);
  484. }
  485. virtual const char *queryRemoteName() const override
  486. {
  487. return remoteName;
  488. }
  489. virtual IUserDescriptor *queryUserDescriptor() const override
  490. {
  491. return userDesc.getLink();
  492. }
  493. virtual unsigned queryTimeoutSecs() const override
  494. {
  495. return timeoutSecs;
  496. }
  497. };
  498. static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc)
  499. {
  500. return new CDFSFile(commonMeta, fileMeta, remoteName, timeoutSecs, userDesc);
  501. }
  502. IClientWsDfs *getDfsClient(const char *serviceUrl, IUserDescriptor *userDesc)
  503. {
  504. // JCSMORE - can I reuse these, are they thread safe (AFishbeck?)
  505. VStringBuffer dfsUrl("%s/WsDfs", serviceUrl);
  506. Owned<IClientWsDfs> dfsClient = createWsDfsClient();
  507. dfsClient->addServiceUrl(dfsUrl);
  508. StringBuffer user, token;
  509. userDesc->getUserName(user),
  510. userDesc->getPassword(token);
  511. dfsClient->setUsernameToken(user, token, "");
  512. return dfsClient.getClear();
  513. }
  514. static CriticalSection serviceLeaseMapCS;
  515. static std::unordered_map<std::string, unsigned __int64> serviceLeaseMap;
  516. unsigned __int64 ensureClientLease(const char *service, IUserDescriptor *userDesc)
  517. {
  518. CriticalBlock block(serviceLeaseMapCS);
  519. auto r = serviceLeaseMap.find(service);
  520. if (r != serviceLeaseMap.end())
  521. return r->second;
  522. Owned<IClientWsDfs> dfsClient = getDfsClient(service, userDesc);
  523. Owned<IClientLeaseResponse> leaseResp;
  524. unsigned timeoutSecs = 60;
  525. CTimeMon tm(timeoutSecs*1000);
  526. while (true)
  527. {
  528. try
  529. {
  530. Owned<IClientLeaseRequest> leaseReq = dfsClient->createGetLeaseRequest();
  531. leaseReq->setKeepAliveExpiryFrequency(keepAliveExpiryFrequency);
  532. leaseResp.setown(dfsClient->GetLease(leaseReq));
  533. unsigned __int64 leaseId = leaseResp->getLeaseId();
  534. serviceLeaseMap[service] = leaseId;
  535. return leaseId;
  536. }
  537. catch (IException *e)
  538. {
  539. /* NB: there should really be a different IException class and a specific error code
  540. * The server knows it's an unsupported method.
  541. */
  542. if (SOAP_SERVER_ERROR != e->errorCode())
  543. throw;
  544. e->Release();
  545. }
  546. if (tm.timedout())
  547. throw makeStringExceptionV(0, "GetLease timed out: timeoutSecs=%u", timeoutSecs);
  548. Sleep(5000); // sanity sleep
  549. }
  550. }
  551. #ifndef _CONTAINERIZED
  552. static std::vector<std::string> dfsServiceUrls;
  553. static CriticalSection dfsServiceUrlCrit;
  554. static std::atomic<unsigned> currentDfsServiceUrl{0};
  555. static bool dfsServiceUrlsDiscovered = false;
  556. #endif
  557. IDFSFile *lookupDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc)
  558. {
  559. CDfsLogicalFileName lfn;
  560. lfn.set(logicalName);
  561. StringBuffer remoteName, remoteLogicalFileName;
  562. StringBuffer serviceUrl;
  563. if (lfn.isRemote())
  564. {
  565. verifyex(lfn.getRemoteSpec(remoteName, remoteLogicalFileName));
  566. if (!strieq(remoteName, "local")) // "local" is a reserve remote name, used to mean the local environment.
  567. {
  568. Owned<IPropertyTree> remoteStorage = getRemoteStorage(remoteName.str());
  569. if (!remoteStorage)
  570. throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName.str());
  571. serviceUrl.set(remoteStorage->queryProp("@service"));
  572. logicalName = remoteLogicalFileName;
  573. }
  574. }
  575. if (!serviceUrl.length())
  576. {
  577. #ifdef _CONTAINERIZED
  578. // NB: only expected to be here if experimental option #option('dfsesp-localfiles', true); is in use.
  579. // This finds and uses local dfs service for local read lookukup.
  580. Owned<IPropertyTreeIterator> eclWatchServices = getGlobalConfigSP()->getElements("services[@type='dfs']");
  581. if (!eclWatchServices->first())
  582. throw makeStringException(-1, "Dfs service not defined in esp services");
  583. const IPropertyTree &eclWatch = eclWatchServices->query();
  584. StringBuffer eclWatchName;
  585. eclWatch.getProp("@name", eclWatchName);
  586. auto result = getExternalService(eclWatchName);
  587. if (result.first.empty())
  588. throw makeStringExceptionV(-1, "dfs '%s': service not found", eclWatchName.str());
  589. if (0 == result.second)
  590. throw makeStringExceptionV(-1, "dfs '%s': service port not defined", eclWatchName.str());
  591. const char *protocol = eclWatch.getPropBool("@tls") ? "https" : "http";
  592. serviceUrl.appendf("%s://%s:%u", protocol, result.first.c_str(), result.second);
  593. #else
  594. {
  595. CriticalBlock b(dfsServiceUrlCrit);
  596. if (!dfsServiceUrlsDiscovered)
  597. {
  598. dfsServiceUrlsDiscovered = true;
  599. getAccessibleServiceURLList("WsSMC", dfsServiceUrls);
  600. if (0 == dfsServiceUrls.size())
  601. throw makeStringException(-1, "Could not find any DFS services in the target HPCC configuration.");
  602. }
  603. }
  604. serviceUrl.append(dfsServiceUrls[currentDfsServiceUrl++].c_str());
  605. logicalName = remoteLogicalFileName;
  606. remoteName.clear(); // local
  607. #endif
  608. }
  609. DBGLOG("Looking up file '%s' on '%s'", logicalName, serviceUrl.str());
  610. Owned<IClientWsDfs> dfsClient = getDfsClient(serviceUrl, userDesc);
  611. Owned<IClientDFSFileLookupResponse> dfsResp;
  612. CTimeMon tm(timeoutSecs*1000); // NB: this timeout loop is to cater for *a* esp disappearing.
  613. while (true)
  614. {
  615. try
  616. {
  617. Owned<IClientDFSFileLookupRequest> dfsReq = dfsClient->createDFSFileLookupRequest();
  618. dfsReq->setName(logicalName);
  619. unsigned remaining;
  620. if (tm.timedout(&remaining))
  621. break;
  622. dfsReq->setRequestTimeout(remaining/1000);
  623. unsigned __int64 clientLeaseId = ensureClientLease(serviceUrl, userDesc);
  624. dfsReq->setLeaseId(clientLeaseId);
  625. dfsResp.setown(dfsClient->DFSFileLookup(dfsReq));
  626. const IMultiException *excep = &dfsResp->getExceptions(); // NB: warning despite getXX name, this does not Link
  627. if (excep->ordinality() > 0)
  628. throw LINK((IMultiException *)excep); // NB - const IException.. not caught in general..
  629. const char *base64Resp = dfsResp->getMeta();
  630. MemoryBuffer compressedRespMb;
  631. JBASE64_Decode(base64Resp, compressedRespMb);
  632. MemoryBuffer decompressedRespMb;
  633. fastLZDecompressToBuffer(decompressedRespMb, compressedRespMb);
  634. Owned<IPropertyTree> meta = createPTree(decompressedRespMb);
  635. IPropertyTree *fileMeta = meta->queryPropTree("FileMeta");
  636. if (!fileMeta) // file not found
  637. return nullptr;
  638. // remoteName empty if local
  639. return createDFSFile(meta, fileMeta, remoteName.length()?remoteName.str():nullptr, timeoutSecs, userDesc);
  640. }
  641. catch (IException *e)
  642. {
  643. /* NB: there should really be a different IException class and a specific error code
  644. * The server knows it's an unsupported method.
  645. */
  646. if (SOAP_SERVER_ERROR != e->errorCode())
  647. throw;
  648. e->Release();
  649. }
  650. if (tm.timedout())
  651. throw makeStringExceptionV(0, "DFSFileLookup timed out: file=%s, timeoutSecs=%u", logicalName, timeoutSecs);
  652. Sleep(5000); // sanity sleep
  653. }
  654. return nullptr; // should never be able to reach here, but keeps compiler happy
  655. }
  656. IDistributedFile *createLegacyDFSFile(IDFSFile *dfsFile)
  657. {
  658. if (dfsFile->queryFileMeta()->getPropBool("@isSuper"))
  659. return new CServiceSuperDistributedFile(dfsFile);
  660. else
  661. return new CServiceDistributedFile(dfsFile);
  662. }
  663. IDistributedFile *lookupLegacyDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc)
  664. {
  665. Owned<IDFSFile> dfsFile = lookupDFSFile(logicalName, timeoutSecs, keepAliveExpiryFrequency, userDesc);
  666. if (!dfsFile)
  667. return nullptr;
  668. return createLegacyDFSFile(dfsFile);
  669. }
  670. } // namespace wsdfs