/*############################################################################## HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ############################################################################## */ #include #include "jliball.hpp" #include "jflz.hpp" #include "jsecrets.hpp" #include "seclib.hpp" #include "ws_dfs.hpp" #include "workunit.hpp" #include "eclwatch_errorlist.hpp" // only for ECLWATCH_FILE_NOT_EXIST #include "soapmessage.hpp" #include "dafdesc.hpp" #include "dadfs.hpp" #include "dautils.hpp" #ifndef _CONTAINERIZED #include "environment.hpp" #endif #include "ws_dfsclient.hpp" namespace wsdfs { class CKeepAliveThread : public CSimpleInterface, implements IThreaded { CThreaded threaded; unsigned periodMs; Semaphore sem; public: CKeepAliveThread(unsigned _periodSecs) : threaded("CKeepAliveThread", this), periodMs(_periodSecs * 1000) { threaded.start(); } void stop() { sem.signal(); } virtual void threadmain() override { while (true) { if (sem.wait(periodMs)) return; } } }; template class CServiceDistributedFileBase : public CSimpleInterfaceOf { protected: Linked dfsFile; StringAttr logicalName; Owned legacyDFSFile; Owned fileDesc; class CDistributedSuperFileIterator: public CSimpleInterfaceOf { Linked source; Owned cur; std::vector owners; unsigned which = 0; void setCurrent(unsigned w) { VStringBuffer lfn("~remote::%s::%s", source->queryRemoteName(), owners[w].c_str()); Owned dfsFile = lookupDFSFile(lfn, source->queryTimeoutSecs(), keepAliveExpiryFrequency, source->queryUserDescriptor()); if (!dfsFile) throw makeStringExceptionV(0, "Failed to open superfile %s", lfn.str()); if (!dfsFile->numSubFiles()) throwUnexpected(); Owned legacyDFSFile = createLegacyDFSFile(dfsFile); IDistributedSuperFile *super = legacyDFSFile->querySuperFile(); assertex(super); cur.set(super); } public: CDistributedSuperFileIterator(IDFSFile *_source, std::vector _owners) : source(_source), owners(_owners) { } virtual bool first() override { if (owners.empty()) return false; which = 0; setCurrent(which); return true; } virtual bool next() override { if (which == (owners.size()-1)) { cur.clear(); return false; } ++which; setCurrent(which); return true; } virtual bool isValid() override { return cur != nullptr; } virtual IDistributedSuperFile &query() override { return *cur; } virtual const char *queryName() override { if (!isValid()) return nullptr; return owners[which].c_str(); } }; public: CServiceDistributedFileBase(IDFSFile *_dfsFile) : dfsFile(_dfsFile) { logicalName.set(dfsFile->queryFileMeta()->queryProp("@name")); } virtual unsigned numParts() override { return legacyDFSFile->numParts(); } virtual IDistributedFilePart &queryPart(unsigned idx) override { return legacyDFSFile->queryPart(idx); } virtual IDistributedFilePart* getPart(unsigned idx) override { return legacyDFSFile->getPart(idx); } virtual StringBuffer &getLogicalName(StringBuffer &name) override { return legacyDFSFile->getLogicalName(name); } virtual const char *queryLogicalName() override { return legacyDFSFile->queryLogicalName(); } virtual IDistributedFilePartIterator *getIterator(IDFPartFilter *filter=NULL) override { return legacyDFSFile->getIterator(filter); } virtual IFileDescriptor *getFileDescriptor(const char *clustername=NULL) override { return fileDesc.getLink(); } virtual const char *queryDefaultDir() override { return legacyDFSFile->queryDefaultDir(); } virtual const char *queryPartMask() override { return legacyDFSFile->queryPartMask(); } virtual IPropertyTree &queryAttributes() override { return legacyDFSFile->queryAttributes(); } virtual bool lockProperties(unsigned timeoutms=INFINITE) override { // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented. return true; } virtual void unlockProperties(DFTransactionState state=TAS_NONE) override { // TODO: implement. But for now only foreign [read] files are supported, where updates and locking have never been implemented. } virtual bool getModificationTime(CDateTime &dt) override { return legacyDFSFile->getModificationTime(dt); } virtual bool getAccessedTime(CDateTime &dt) override { return legacyDFSFile->getAccessedTime(dt); } virtual unsigned numCopies(unsigned partno) override { return legacyDFSFile->numCopies(partno); } virtual bool existsPhysicalPartFiles(unsigned short port) override { return legacyDFSFile->existsPhysicalPartFiles(port); } virtual __int64 getFileSize(bool allowphysical, bool forcephysical) override { return legacyDFSFile->getFileSize(allowphysical, forcephysical); } virtual __int64 getDiskSize(bool allowphysical, bool forcephysical) override { return legacyDFSFile->getDiskSize(allowphysical, forcephysical); } virtual bool getFileCheckSum(unsigned &checksum) override { return legacyDFSFile->getFileCheckSum(checksum); } virtual unsigned getPositionPart(offset_t pos,offset_t &base) override { return legacyDFSFile->getPositionPart(pos,base); } virtual IDistributedSuperFile *querySuperFile() override { return nullptr; } virtual IDistributedSuperFileIterator *getOwningSuperFiles(IDistributedFileTransaction *transaction=NULL) override { if (transaction) throwUnexpected(); Owned iter = dfsFile->queryFileMeta()->getElements("SuperFile/SuperOwner"); std::vector superOwners; StringBuffer pname; ForEach(*iter) { iter->query().getProp("@name",pname.clear()); if (pname.length()) superOwners.push_back(pname.str()); } return new CDistributedSuperFileIterator(dfsFile, superOwners); } virtual bool isCompressed(bool *blocked=NULL) override { return legacyDFSFile->isCompressed(blocked); } virtual StringBuffer &getClusterName(unsigned clusternum,StringBuffer &name) override { return legacyDFSFile->getClusterName(clusternum,name); } virtual unsigned getClusterNames(StringArray &clusters) override { return legacyDFSFile->getClusterNames(clusters); } // (use findCluster) virtual unsigned numClusters() override { return legacyDFSFile->numClusters(); } virtual unsigned findCluster(const char *clustername) override { return legacyDFSFile->findCluster(clustername); } virtual ClusterPartDiskMapSpec &queryPartDiskMapping(unsigned clusternum) override { return legacyDFSFile->queryPartDiskMapping(clusternum); } virtual IGroup *queryClusterGroup(unsigned clusternum) override { return legacyDFSFile->queryClusterGroup(clusternum); } virtual StringBuffer &getClusterGroupName(unsigned clusternum, StringBuffer &name) override { return fileDesc->getClusterGroupName(clusternum, name); } virtual StringBuffer &getECL(StringBuffer &buf) override { return legacyDFSFile->getECL(buf); } virtual bool canModify(StringBuffer &reason) override { return false; } virtual bool canRemove(StringBuffer &reason,bool ignoresub=false) override { return false; } virtual bool checkClusterCompatible(IFileDescriptor &fdesc, StringBuffer &err) override { return legacyDFSFile->checkClusterCompatible(fdesc,err); } virtual bool getFormatCrc(unsigned &crc) override { return legacyDFSFile->getFormatCrc(crc); } virtual bool getRecordSize(size32_t &rsz) override { return legacyDFSFile->getRecordSize(rsz); } virtual bool getRecordLayout(MemoryBuffer &layout, const char *attrname) override { return legacyDFSFile->getRecordLayout(layout,attrname); } virtual StringBuffer &getColumnMapping(StringBuffer &mapping) override { return legacyDFSFile->getColumnMapping(mapping); } virtual bool isRestrictedAccess() override { return legacyDFSFile->isRestrictedAccess(); } virtual unsigned setDefaultTimeout(unsigned timems) override { return legacyDFSFile->setDefaultTimeout(timems); } virtual void validate() override { legacyDFSFile->validate(); } virtual IPropertyTree *queryHistory() const override { return legacyDFSFile->queryHistory(); } virtual bool isExternal() const override { return false; } virtual bool getSkewInfo(unsigned &maxSkew, unsigned &minSkew, unsigned &maxSkewPart, unsigned &minSkewPart, bool calculateIfMissing) override { return legacyDFSFile->getSkewInfo(maxSkew, minSkew, maxSkewPart, minSkewPart, calculateIfMissing); } virtual int getExpire() override { return legacyDFSFile->getExpire(); } virtual void getCost(const char * cluster, double & atRestCost, double & accessCost) override { legacyDFSFile->getCost(cluster, atRestCost, accessCost); } // setters (change file meta data) virtual void setPreferredClusters(const char *clusters) override { legacyDFSFile->setPreferredClusters(clusters); } virtual void setSingleClusterOnly() override { legacyDFSFile->setSingleClusterOnly(); } virtual void addCluster(const char *clustername,const ClusterPartDiskMapSpec &mspec) override { legacyDFSFile->addCluster(clustername, mspec); } virtual bool removeCluster(const char *clustername) override { return legacyDFSFile->removeCluster(clustername); } virtual void updatePartDiskMapping(const char *clustername,const ClusterPartDiskMapSpec &spec) override { legacyDFSFile->updatePartDiskMapping(clustername, spec); } /* NB/TBD: these modifications are only effecting this instance, the changes are not propagated to Dali * This is the same behaviour when foreign files are used, but will need addressing in future. */ virtual void setModificationTime(const CDateTime &dt) override { legacyDFSFile->setModificationTime(dt); } virtual void setModified() override { legacyDFSFile->setModified(); } virtual void setAccessedTime(const CDateTime &dt) override { legacyDFSFile->setAccessedTime(dt); } virtual void setAccessed() override { legacyDFSFile->setAccessed(); } virtual void addAttrValue(const char *attr, unsigned __int64 value) override { legacyDFSFile->addAttrValue(attr, value); } virtual void setExpire(int expireDays) override { legacyDFSFile->setExpire(expireDays); } virtual void setECL(const char *ecl) override { legacyDFSFile->setECL(ecl); } virtual void resetHistory() override { legacyDFSFile->resetHistory(); } virtual void setProtect(const char *callerid, bool protect=true, unsigned timeoutms=INFINITE) override { legacyDFSFile->setProtect(callerid, protect, timeoutms); } virtual void setColumnMapping(const char *mapping) override { legacyDFSFile->setColumnMapping(mapping); } virtual void setRestrictedAccess(bool restricted) override { legacyDFSFile->setRestrictedAccess(restricted); } virtual bool renamePhysicalPartFiles(const char *newlfn,const char *cluster=NULL,IMultiException *exceptions=NULL,const char *newbasedir=NULL) override { UNIMPLEMENTED_X("CServiceDistributedFileBase::renamePhysicalPartFiles"); } virtual void rename(const char *logicalname,IUserDescriptor *user) override { UNIMPLEMENTED_X("CServiceDistributedFileBase::rename"); } virtual void attach(const char *logicalname,IUserDescriptor *user) override { UNIMPLEMENTED_X("CServiceDistributedFileBase::rename"); } virtual void detach(unsigned timeoutms=INFINITE, ICodeContext *ctx=NULL) override { UNIMPLEMENTED_X("CServiceDistributedFileBase::detach"); } virtual void enqueueReplicate() override { UNIMPLEMENTED_X("CServiceDistributedFileBase::enqueueReplicate"); } }; class CServiceDistributedFile : public CServiceDistributedFileBase { typedef CServiceDistributedFileBase PARENT; public: CServiceDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile) { IPropertyTree *file = dfsFile->queryFileMeta()->queryPropTree("File"); const char *remotePlaneName = file->queryProp("@group"); VStringBuffer planeXPath("planes[@name=\"%s\"]", remotePlaneName); IPropertyTree *filePlane = dfsFile->queryCommonMeta()->queryPropTree(planeXPath); assertex(filePlane); const char *remoteName = dfsFile->queryRemoteName(); // NB: null if local if (!isEmptyString(remoteName)) { // Path translation is necessary, because the local plane will not necessarily have the same // prefix. In particular, both a local and remote plane may want to use the same prefix/mount. // So, the local plane will be defined with a unique prefix locally. // Files backed by URL's or hostGroups will be access directly, are not mounted, and do not require // this translation. const char *filePlanePrefix = filePlane->queryProp("@prefix"); if (isAbsolutePath(filePlanePrefix) && !filePlane->hasProp("@hosts")) // otherwise assume url { #ifndef _CONTAINERIZED throw makeStringException(0, "Bare metal does not support remote file access to planes without hosts"); #endif // A external plane within another environment backed by a PVC, will need a pre-existing // corresponding plane and PVC in the local environment. // The local plane will be associated with the remote environment, via a storage/remote mapping. Owned remoteStorage = getRemoteStorage(remoteName); if (!remoteStorage) throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName); VStringBuffer remotePlaneXPath("planes[@remote='%s']/@local", remotePlaneName); const char *localMappedPlaneName = remoteStorage->queryProp(remotePlaneXPath); if (isEmptyString(localMappedPlaneName)) throw makeStringExceptionV(0, "Remote plane '%s' not found in remote storage definition '%s'", remotePlaneName, remoteName); Owned localPlane = getRemoteStoragePlane(localMappedPlaneName, false); if (!localPlane) throw makeStringExceptionV(0, "Local plane not found, mapped to by remote storage '%s' (%s->%s)", remoteName, remotePlaneName, localMappedPlaneName); DBGLOG("Remote logical file '%s' using remote storage '%s', mapping remote plane '%s' to local plane '%s'", logicalName.str(), remoteName, remotePlaneName, localMappedPlaneName); StringBuffer filePlanePrefix; filePlane->getProp("@prefix", filePlanePrefix); if (filePlane->hasProp("@subPath")) filePlanePrefix.append('/').append(filePlane->queryProp("@subPath")); // the plane prefix should match the base of file's base directory // Q: what if the plane has been redefined since the files were created? VStringBuffer clusterXPath("Cluster[@name=\"%s\"]", remotePlaneName); IPropertyTree *cluster = file->queryPropTree(clusterXPath); assertex(cluster); const char *clusterDir = cluster->queryProp("@defaultBaseDir"); assertex(startsWith(clusterDir, filePlanePrefix)); clusterDir += filePlanePrefix.length(); StringBuffer newPath(localPlane->queryPrefix()); if (strlen(clusterDir)) newPath.append(clusterDir); // add remaining tail of path cluster->setProp("@defaultBaseDir", newPath.str()); const char *dir = file->queryProp("@directory"); assertex(startsWith(dir, filePlanePrefix)); dir += filePlanePrefix.length(); newPath.clear().append(localPlane->queryPrefix()); if (strlen(dir)) newPath.append(dir); // add remaining tail of path DBGLOG("Remapping logical file directory to '%s'", newPath.str()); file->setProp("@directory", newPath.str()); } } fileDesc.setown(deserializeFileDescriptorTree(file)); if (fileDesc) fileDesc->setTraceName(logicalName); legacyDFSFile.setown(queryDistributedFileDirectory().createNew(fileDesc, logicalName)); } }; class CServiceSuperDistributedFile : public CServiceDistributedFileBase { typedef CServiceDistributedFileBase PARENT; Owned legacyDFSSuperFile; public: CServiceSuperDistributedFile(IDFSFile *_dfsFile) : PARENT(_dfsFile) { IArrayOf subFiles; unsigned subs = dfsFile->numSubFiles(); for (unsigned s=0; s subFile = dfsFile->getSubFile(s); Owned legacyDFSFile = createLegacyDFSFile(subFile); subFiles.append(*legacyDFSFile.getClear()); } legacyDFSSuperFile.setown(queryDistributedFileDirectory().createNewSuperFile(dfsFile->queryFileMeta()->queryPropTree("SuperFile"), logicalName, &subFiles)); legacyDFSFile.set(legacyDFSSuperFile); fileDesc.setown(legacyDFSSuperFile->getFileDescriptor()); } // IDistributedFile overrides virtual IDistributedSuperFile *querySuperFile() override { return this; } // IDistributedSuperFile overrides virtual IDistributedFile &querySubFile(unsigned idx,bool sub) override { return legacyDFSSuperFile->querySubFile(idx, sub); } virtual IDistributedFile *querySubFileNamed(const char *name, bool sub) override { return legacyDFSSuperFile->querySubFileNamed(name, sub); } virtual IDistributedFile *getSubFile(unsigned idx,bool sub) override { return legacyDFSSuperFile->getSubFile(idx, sub); } virtual unsigned numSubFiles(bool sub) override { return legacyDFSSuperFile->numSubFiles(sub); } virtual bool isInterleaved() override { return legacyDFSSuperFile->isInterleaved(); } virtual IDistributedFile *querySubPart(unsigned partidx,unsigned &subfileidx) override { return legacyDFSSuperFile->querySubPart(partidx, subfileidx); } virtual unsigned getPositionPart(offset_t pos, offset_t &base) override { return legacyDFSSuperFile->getPositionPart(pos, base); } virtual IDistributedFileIterator *getSubFileIterator(bool supersub=false) override { return legacyDFSSuperFile->getSubFileIterator(supersub); } virtual void validate() override { if (!legacyDFSSuperFile->existsPhysicalPartFiles(0)) { const char * logicalName = queryLogicalName(); throw MakeStringException(-1, "Some physical parts do not exists, for logical file : %s",(isEmptyString(logicalName) ? "[unattached]" : logicalName)); } } // IDistributedSuperFile virtual void addSubFile(const char *subfile, bool before=false, const char *other=NULL, bool addcontents=false, IDistributedFileTransaction *transaction=NULL) override { UNIMPLEMENTED_X("CServiceSuperDistributedFile::addSubFile"); } virtual bool removeSubFile(const char *subfile, bool remsub, bool remcontents=false, IDistributedFileTransaction *transaction=NULL) override { UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeSubFile"); } virtual bool removeOwnedSubFiles(bool remsub, IDistributedFileTransaction *transaction=NULL) override { UNIMPLEMENTED_X("CServiceSuperDistributedFile::removeOwnedSubFiles"); } virtual bool swapSuperFile( IDistributedSuperFile *_file, IDistributedFileTransaction *transaction) override { UNIMPLEMENTED_X("CServiceSuperDistributedFile::swapSuperFile"); } }; static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc); class CDFSFile : public CSimpleInterfaceOf { Linked commonMeta; // e.g. share info between IFDSFiles, e.g. common plane info between subfiles Linked fileMeta; unsigned __int64 lockId; std::vector> subFiles; StringAttr remoteName; unsigned timeoutSecs; Linked userDesc; public: CDFSFile(IPropertyTree *_commonMeta, IPropertyTree *_fileMeta, const char *_remoteName, unsigned _timeoutSecs, IUserDescriptor *_userDesc) : commonMeta(_commonMeta), fileMeta(_fileMeta), remoteName(_remoteName), timeoutSecs(_timeoutSecs), userDesc(_userDesc) { lockId = fileMeta->getPropInt64("@lockId"); if (fileMeta->getPropBool("@isSuper")) { Owned iter = fileMeta->getElements("FileMeta"); ForEach(*iter) { IPropertyTree &subMeta = iter->query(); subFiles.push_back(createDFSFile(commonMeta, &subMeta, remoteName, timeoutSecs, userDesc)); } } } virtual IPropertyTree *queryFileMeta() const override { return fileMeta; } virtual IPropertyTree *queryCommonMeta() const override { return commonMeta; } virtual unsigned __int64 getLockId() const override { return lockId; } virtual unsigned numSubFiles() const override { return (unsigned)subFiles.size(); } virtual IDFSFile *getSubFile(unsigned idx) const override { return LINK(subFiles[idx]); } virtual const char *queryRemoteName() const override { return remoteName; } virtual IUserDescriptor *queryUserDescriptor() const override { return userDesc.getLink(); } virtual unsigned queryTimeoutSecs() const override { return timeoutSecs; } }; static IDFSFile *createDFSFile(IPropertyTree *commonMeta, IPropertyTree *fileMeta, const char *remoteName, unsigned timeoutSecs, IUserDescriptor *userDesc) { return new CDFSFile(commonMeta, fileMeta, remoteName, timeoutSecs, userDesc); } IClientWsDfs *getDfsClient(const char *serviceUrl, IUserDescriptor *userDesc) { // JCSMORE - can I reuse these, are they thread safe (AFishbeck?) VStringBuffer dfsUrl("%s/WsDfs", serviceUrl); Owned dfsClient = createWsDfsClient(); dfsClient->addServiceUrl(dfsUrl); StringBuffer user, token; userDesc->getUserName(user), userDesc->getPassword(token); dfsClient->setUsernameToken(user, token, ""); return dfsClient.getClear(); } static CriticalSection serviceLeaseMapCS; static std::unordered_map serviceLeaseMap; unsigned __int64 ensureClientLease(const char *service, IUserDescriptor *userDesc) { CriticalBlock block(serviceLeaseMapCS); auto r = serviceLeaseMap.find(service); if (r != serviceLeaseMap.end()) return r->second; Owned dfsClient = getDfsClient(service, userDesc); Owned leaseResp; unsigned timeoutSecs = 60; CTimeMon tm(timeoutSecs*1000); while (true) { try { Owned leaseReq = dfsClient->createGetLeaseRequest(); leaseReq->setKeepAliveExpiryFrequency(keepAliveExpiryFrequency); leaseResp.setown(dfsClient->GetLease(leaseReq)); unsigned __int64 leaseId = leaseResp->getLeaseId(); serviceLeaseMap[service] = leaseId; return leaseId; } catch (IException *e) { /* NB: there should really be a different IException class and a specific error code * The server knows it's an unsupported method. */ if (SOAP_SERVER_ERROR != e->errorCode()) throw; e->Release(); } if (tm.timedout()) throw makeStringExceptionV(0, "GetLease timed out: timeoutSecs=%u", timeoutSecs); Sleep(5000); // sanity sleep } } #ifndef _CONTAINERIZED static std::vector dfsServiceUrls; static CriticalSection dfsServiceUrlCrit; static std::atomic currentDfsServiceUrl{0}; static bool dfsServiceUrlsDiscovered = false; #endif IDFSFile *lookupDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc) { CDfsLogicalFileName lfn; lfn.set(logicalName); StringBuffer remoteName, remoteLogicalFileName; StringBuffer serviceUrl; if (lfn.isRemote()) { verifyex(lfn.getRemoteSpec(remoteName, remoteLogicalFileName)); if (!strieq(remoteName, "local")) // "local" is a reserve remote name, used to mean the local environment. { Owned remoteStorage = getRemoteStorage(remoteName.str()); if (!remoteStorage) throw makeStringExceptionV(0, "Remote storage '%s' not found", remoteName.str()); serviceUrl.set(remoteStorage->queryProp("@service")); logicalName = remoteLogicalFileName; } } if (!serviceUrl.length()) { #ifdef _CONTAINERIZED // NB: only expected to be here if experimental option #option('dfsesp-localfiles', true); is in use. // This finds and uses local dfs service for local read lookukup. Owned eclWatchServices = getGlobalConfigSP()->getElements("services[@type='dfs']"); if (!eclWatchServices->first()) throw makeStringException(-1, "Dfs service not defined in esp services"); const IPropertyTree &eclWatch = eclWatchServices->query(); StringBuffer eclWatchName; eclWatch.getProp("@name", eclWatchName); auto result = getExternalService(eclWatchName); if (result.first.empty()) throw makeStringExceptionV(-1, "dfs '%s': service not found", eclWatchName.str()); if (0 == result.second) throw makeStringExceptionV(-1, "dfs '%s': service port not defined", eclWatchName.str()); const char *protocol = eclWatch.getPropBool("@tls") ? "https" : "http"; serviceUrl.appendf("%s://%s:%u", protocol, result.first.c_str(), result.second); #else { CriticalBlock b(dfsServiceUrlCrit); if (!dfsServiceUrlsDiscovered) { dfsServiceUrlsDiscovered = true; getAccessibleServiceURLList("WsSMC", dfsServiceUrls); if (0 == dfsServiceUrls.size()) throw makeStringException(-1, "Could not find any DFS services in the target HPCC configuration."); } } serviceUrl.append(dfsServiceUrls[currentDfsServiceUrl++].c_str()); logicalName = remoteLogicalFileName; remoteName.clear(); // local #endif } DBGLOG("Looking up file '%s' on '%s'", logicalName, serviceUrl.str()); Owned dfsClient = getDfsClient(serviceUrl, userDesc); Owned dfsResp; CTimeMon tm(timeoutSecs*1000); // NB: this timeout loop is to cater for *a* esp disappearing. while (true) { try { Owned dfsReq = dfsClient->createDFSFileLookupRequest(); dfsReq->setName(logicalName); unsigned remaining; if (tm.timedout(&remaining)) break; dfsReq->setRequestTimeout(remaining/1000); unsigned __int64 clientLeaseId = ensureClientLease(serviceUrl, userDesc); dfsReq->setLeaseId(clientLeaseId); dfsResp.setown(dfsClient->DFSFileLookup(dfsReq)); const IMultiException *excep = &dfsResp->getExceptions(); // NB: warning despite getXX name, this does not Link if (excep->ordinality() > 0) throw LINK((IMultiException *)excep); // NB - const IException.. not caught in general.. const char *base64Resp = dfsResp->getMeta(); MemoryBuffer compressedRespMb; JBASE64_Decode(base64Resp, compressedRespMb); MemoryBuffer decompressedRespMb; fastLZDecompressToBuffer(decompressedRespMb, compressedRespMb); Owned meta = createPTree(decompressedRespMb); IPropertyTree *fileMeta = meta->queryPropTree("FileMeta"); if (!fileMeta) // file not found return nullptr; // remoteName empty if local return createDFSFile(meta, fileMeta, remoteName.length()?remoteName.str():nullptr, timeoutSecs, userDesc); } catch (IException *e) { /* NB: there should really be a different IException class and a specific error code * The server knows it's an unsupported method. */ if (SOAP_SERVER_ERROR != e->errorCode()) throw; e->Release(); } if (tm.timedout()) throw makeStringExceptionV(0, "DFSFileLookup timed out: file=%s, timeoutSecs=%u", logicalName, timeoutSecs); Sleep(5000); // sanity sleep } return nullptr; // should never be able to reach here, but keeps compiler happy } IDistributedFile *createLegacyDFSFile(IDFSFile *dfsFile) { if (dfsFile->queryFileMeta()->getPropBool("@isSuper")) return new CServiceSuperDistributedFile(dfsFile); else return new CServiceDistributedFile(dfsFile); } IDistributedFile *lookupLegacyDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc) { Owned dfsFile = lookupDFSFile(logicalName, timeoutSecs, keepAliveExpiryFrequency, userDesc); if (!dfsFile) return nullptr; return createLegacyDFSFile(dfsFile); } } // namespace wsdfs