瀏覽代碼

Merge pull request #15853 from jakesmith/HPCC-27294-dfs-for-hthor-roxie

HPCC-27294 Enable hthor+roxie DFS service access

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父節點
當前提交
21c2823dd2

+ 0 - 275
dali/base/dautils.cpp

@@ -3231,281 +3231,6 @@ void safeChangeModeWrite(IRemoteConnection *conn,const char *name,bool &reload,
     }
 }
 
-
-class CLocalOrDistributedFile: implements ILocalOrDistributedFile, public CInterface
-{
-    bool fileExists;
-    Owned<IDistributedFile> dfile;
-    CDfsLogicalFileName lfn;    // set if localpath but prob not useful
-    StringAttr localpath;
-    StringAttr fileDescPath;
-public:
-    IMPLEMENT_IINTERFACE;
-    CLocalOrDistributedFile()
-    {
-        fileExists = false;
-    }
-
-    virtual const char *queryLogicalName() override
-    {
-        return lfn.get();
-    }
-
-    virtual IDistributedFile * queryDistributedFile() override
-    { 
-        return dfile.get(); 
-    }
-
-    bool init(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, bool write, bool isPrivilegedUser, const StringArray *clusters)
-    {
-        fileExists = false;
-        if (!onlydfs)
-            lfn.allowOsPath(true);
-        if (!lfn.setValidate(fname))
-            return false;
-        if (!onlydfs)
-        {
-            bool gotlocal = true;
-            if (isAbsolutePath(fname)||(stdIoHandle(fname)>=0)) 
-                localpath.set(fname);
-            else if (!strstr(fname,"::"))
-            {
-                // treat it as a relative file
-                StringBuffer fn;
-                localpath.set(makeAbsolutePath(fname,fn).str());
-            }
-            else if (!lfn.isExternal())
-                gotlocal = false;
-            if (gotlocal)
-            {
-                if (!write && !onlylocal) // MORE - this means the dali access checks not happening... maybe that's ok?
-                    dfile.setown(queryDistributedFileDirectory().lookup(lfn,user,write,false,false,nullptr,isPrivilegedUser)); // MORE - if dFile is not null then arguably exists should be true
-                Owned<IFile> file = getPartFile(0,0);
-                if (file.get())
-                {
-                    fileExists = file->exists();
-                    return fileExists || write;
-                }
-            }
-        }
-        if (!onlylocal)
-        {
-            if (lfn.isExternal())
-            {
-                Owned<IFileDescriptor> fDesc = createExternalFileDescriptor(lfn.get());
-                dfile.setown(queryDistributedFileDirectory().createExternal(fDesc, lfn.get()));
-                Owned<IFile> file = getPartFile(0,0);
-                if (file.get())
-                    fileExists = file->exists();
-                if (write && lfn.isExternal()&&(dfile->numParts()==1))   // if it is writing to an external file then don't return distributed
-                    dfile.clear();
-                return true;
-            }
-            else
-            {
-                dfile.setown(queryDistributedFileDirectory().lookup(lfn,user,write,false,false,nullptr,isPrivilegedUser));
-                if (dfile.get())
-                    return true;
-            }
-
-            StringBuffer dir;
-            unsigned stripeNum = 0;
-#ifdef _CONTAINERIZED
-            StringBuffer cluster;
-            if (clusters)
-            {
-                if (clusters->ordinality()>1)
-                    throw makeStringExceptionV(0, "Container mode does not yet support output to multiple clusters while writing file %s)", fname);
-                cluster.append(clusters->item(0));
-            }
-            else
-                getDefaultStoragePlane(cluster);
-            Owned<IStoragePlane> plane = getDataStoragePlane(cluster, true);
-            dir.append(plane->queryPrefix());
-            unsigned numStripedDevices = plane->numDevices();
-            stripeNum = calcStripeNumber(0, lfn.get(), numStripedDevices);
-#endif
-            StringBuffer descPath;
-            makePhysicalDirectory(descPath, lfn.get(), 0, DFD_OSdefault, dir);
-            fileDescPath.set(descPath);
-
-            // MORE - should we create the IDistributedFile here ready for publishing (and/or to make sure it's locked while we write)?
-            StringBuffer physicalPath;
-            makePhysicalPartName(lfn.get(), 1, 1, physicalPath, 0, DFD_OSdefault, dir, false, stripeNum); // more - may need to override path for roxie
-            localpath.set(physicalPath);
-            fileExists = (dfile != NULL);
-            return write;
-        }
-        return false;
-    }
-
-    virtual IFileDescriptor *getFileDescriptor() override
-    {
-        if (dfile.get())
-            return dfile->getFileDescriptor();
-        Owned<IFileDescriptor> fileDesc = createFileDescriptor();
-        fileDesc->setTraceName(lfn.get());
-        StringBuffer dir;
-        if (localpath.isEmpty()) { // e.g. external file
-            StringBuffer tail;
-            IException *e=NULL;
-            bool iswin=
-#ifdef _WIN32
-                true;
-#else
-                false;
-#endif
-            if (!lfn.getExternalPath(dir,tail,iswin,&e)) {
-                if (e)
-                    throw e;
-                return NULL;
-            }
-        }
-        else 
-            splitDirTail(fileDescPath,dir);
-        fileDesc->setDefaultDir(dir.str());
-        RemoteFilename rfn;
-        getPartFilename(rfn,0,0);
-        fileDesc->setPart(0,rfn);
-        fileDesc->queryPartDiskMapping(0).defaultCopies = DFD_DefaultCopies;
-        return fileDesc.getClear();
-    }
-
-    virtual bool getModificationTime(CDateTime &dt) override
-    {
-        if (dfile.get())
-            return dfile->getModificationTime(dt);
-        Owned<IFile> file = getPartFile(0,0);
-        if (file.get()) {
-            CDateTime dt;
-            return file->getTime(NULL,&dt,NULL);
-        }
-        return false;
-    }
-
-    virtual unsigned numParts() override 
-    {
-        if (dfile.get()) 
-            return dfile->numParts();
-        return 1;
-    }
-
-    virtual unsigned numPartCopies(unsigned partnum) override
-    {
-        if (dfile.get()) 
-            return dfile->queryPart(partnum).numCopies();
-        return 1;
-    }
-    
-    virtual IFile *getPartFile(unsigned partnum,unsigned copy) override
-    {
-        RemoteFilename rfn;
-        if ((partnum==0)&&(copy==0))
-            return createIFile(getPartFilename(rfn,partnum,copy));
-        return NULL;
-    }
-    
-    virtual void getDirAndFilename(StringBuffer &dir, StringBuffer &filename) override
-    {
-        if (dfile.get())
-        {
-            dir.append(dfile->queryDefaultDir());
-            splitFilename(localpath, nullptr, nullptr, &filename, &filename);
-        }
-        else if (localpath.isEmpty())
-        {
-            RemoteFilename rfn;
-            lfn.getExternalFilename(rfn);
-            StringBuffer fullPath;
-            rfn.getLocalPath(fullPath);
-            splitFilename(localpath, nullptr, &dir, &filename, &filename);
-        }
-        else
-        {
-            dir.append(fileDescPath);
-            splitFilename(localpath, nullptr, nullptr, &filename, &filename);
-        }
-    }
-
-    virtual RemoteFilename &getPartFilename(RemoteFilename &rfn, unsigned partnum,unsigned copy) override
-    {
-        if (dfile.get()) 
-            dfile->queryPart(partnum).getFilename(rfn,copy);
-        else if (localpath.isEmpty())
-            lfn.getExternalFilename(rfn);
-        else
-            rfn.setRemotePath(localpath);
-        return rfn;
-    }
-
-    StringBuffer &getPartFilename(StringBuffer &path, unsigned partnum,unsigned copy)
-    {
-        RemoteFilename rfn;
-        if (dfile.get()) 
-            dfile->queryPart(partnum).getFilename(rfn,copy);
-        else if (localpath.isEmpty())
-            lfn.getExternalFilename(rfn);
-        else 
-            path.append(localpath);
-        if (rfn.isLocal())
-            rfn.getLocalPath(path);
-        else
-            rfn.getRemotePath(path);
-        return path;
-    }
-
-    virtual bool getPartCrc(unsigned partnum, unsigned &crc) override
-    {
-        if (dfile.get())  
-            return dfile->queryPart(partnum).getCrc(crc);
-        Owned<IFile> file = getPartFile(0,0);
-        if (file.get()) {
-            crc = file->getCRC();
-            return true;
-        }
-        return false;
-    }
-
-    virtual offset_t getPartFileSize(unsigned partnum) override
-    {
-        if (dfile.get()) 
-            return dfile->queryPart(partnum).getFileSize(true,false);
-        Owned<IFile> file = getPartFile(0,0);
-        if (file.get())
-            return file->size();
-        return (offset_t)-1;
-    }
-
-    virtual offset_t getFileSize() override
-    {
-        if (dfile.get())
-            dfile->getFileSize(true,false);
-        offset_t ret = 0;
-        unsigned np = numParts();
-        for (unsigned i = 0;i<np;i++)
-            ret += getPartFileSize(i);
-        return ret;
-    }
-
-    virtual bool exists() const override
-    {
-        return fileExists;
-    }
-
-    virtual bool isExternal() const override
-    {
-        return lfn.isExternal();
-    }
-};
-
-ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, bool iswrite, bool isPrivilegedUser, const StringArray *clusters)
-{
-    Owned<CLocalOrDistributedFile> ret = new CLocalOrDistributedFile();
-    if (ret->init(fname,user,onlylocal,onlydfs,iswrite,isPrivilegedUser,clusters))
-        return ret.getClear();
-    return NULL;
-}
-
 static bool transactionLoggingOn=false;
 static cycle_t slowTransactionThreshold=0;
 const bool &queryTransactionLogging() { return transactionLoggingOn; }

+ 0 - 2
dali/base/dautils.hpp

@@ -449,8 +449,6 @@ interface ILocalOrDistributedFile: extends IInterface
     virtual bool isExternal() const = 0;
 };
 
-extern da_decl ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs,bool iswrite, bool isPrivilegedUser, const StringArray *clusters);
-
 typedef __int64 ConnectionId;
 
 struct LockData

+ 5 - 3
ecl/eclagent/eclagent.cpp

@@ -55,6 +55,8 @@
 #include "anawu.hpp"
 #include "hpccconfig.hpp"
 
+#include "ws_dfsclient.hpp"
+
 using roxiemem::OwnedRoxieString;
 
 #include <memory>
@@ -1438,7 +1440,7 @@ bool EclAgent::fileExists(const char *name)
     StringBuffer lfn;
     expandLogicalName(lfn, name);
 
-    Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn.str(),queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser);
+    Owned<IDistributedFile> f = wsdfs::lookup(lfn.str(), queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser, INFINITE);
     if (f)
         return true;
     return false;
@@ -2758,7 +2760,7 @@ unsigned __int64 EclAgent::getDatasetHash(const char * logicalName, unsigned __i
         return crc;
     }
 
-    Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(fullname.str(),queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser);
+    Owned<IDistributedFile> file = wsdfs::lookup(fullname.str(),queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser, INFINITE);
     if (file)
     {
         WorkunitUpdate wu = updateWorkUnit();
@@ -3068,7 +3070,7 @@ restart:     // If things change beneath us as we are deleting, repeat the proce
                 MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
                 persistLock.setown(getPersistReadLock(goer));
             }
-            Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(goer, queryUserDescriptor(), true, false, false, nullptr, defaultPrivilegedUser);
+            Owned<IDistributedFile> f = wsdfs::lookup(goer, queryUserDescriptor(), true, false, false, nullptr, defaultPrivilegedUser, INFINITE);
             if (!f)
                 goto restart; // Persist has been deleted since last checked - repeat the whole process
             const char *newAccessTime = f->queryAttributes().queryProp("@accessed");

+ 22 - 20
ecl/eclcc/CMakeLists.txt

@@ -33,22 +33,23 @@ set (    SRCS
 include_directories ( 
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
-         ./../../ecl/hqlcpp 
-         ./../../common/environment
-         ./../../common/workunit 
-         ./../../common/dllserver 
-         ./../../common/deftype 
-         ./../../system/include 
-         ./../../ecl/hql
-         ./../../dali/base
-         ./../../system/mp
-         ./../../rtl/include 
-         ./../../rtl/eclrtl 
-         ./../../system/jlib 
-         ./../../fs/dafsclient
-         ./../../system/security/shared
-         ./../../system/security/zcrypt
+         ${HPCC_SOURCE_DIR}/ecl/hqlcpp 
+         ${HPCC_SOURCE_DIR}/common/environment
+         ${HPCC_SOURCE_DIR}/common/workunit 
+         ${HPCC_SOURCE_DIR}/common/dllserver 
+         ${HPCC_SOURCE_DIR}/common/deftype 
+         ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/ecl/hql
+         ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
+         ${HPCC_SOURCE_DIR}/fs/dafsclient
+         ${HPCC_SOURCE_DIR}/rtl/include 
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl 
          ${HPCC_SOURCE_DIR}/system/codesigner
+         ${HPCC_SOURCE_DIR}/system/include 
+         ${HPCC_SOURCE_DIR}/system/jlib 
+         ${HPCC_SOURCE_DIR}/system/mp
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/system/security/zcrypt
     )
 
 ADD_DEFINITIONS( -D_CONSOLE )
@@ -61,17 +62,18 @@ endif()
 HPCC_ADD_EXECUTABLE ( eclcc ${SRCS} )
 install ( TARGETS eclcc RUNTIME DESTINATION ${EXEC_DIR} )
 target_link_libraries ( eclcc 
-         jlib
-         nbcd 
          eclrtl 
          deftype 
          dafsclient 
          dalibase 
-         workunit 
-         thorhelper 
+         dllserver
          hql
          hqlcpp
-         dllserver
+         jlib
+         nbcd 
+         thorhelper 
+         workunit 
+         ws_dfsclient
     )
 
 if (NOT CONTAINERIZED)

+ 3 - 1
ecl/eclcc/eclcc.cpp

@@ -71,6 +71,8 @@
 #include "zcrypt.hpp"
 #endif
 
+#include "ws_dfsclient.hpp"
+
 //#define TEST_LEGACY_DEPENDENCY_CODE
 
 #define INIFILE "eclcc.ini"
@@ -2500,7 +2502,7 @@ IHqlExpression *EclCC::lookupDFSlayout(const char *filename, IErrorReceiver &err
         // Look up the file in Dali
         try
         {
-            Owned<IDistributedFile> dfsFile = queryDistributedFileDirectory().lookup(filename, udesc, false, false, false, nullptr, defaultPrivilegedUser);
+            Owned<IDistributedFile> dfsFile = wsdfs::lookup(filename, udesc, false, false, false, nullptr, defaultPrivilegedUser, INFINITE);
             if (dfsFile)
             {
                 const char *recordECL = dfsFile->queryAttributes().queryProp("ECL");

+ 26 - 24
ecl/hthor/CMakeLists.txt

@@ -46,31 +46,32 @@ set (    INCLUDES
 include_directories (
          .
          ${HPCC_SOURCE_DIR}/common/remote
-         ${HPCC_SOURCE_DIR}/system/jhtree
-         ${HPCC_SOURCE_DIR}/system/hrpc
-         ${HPCC_SOURCE_DIR}/system/mp
-         ${HPCC_SOURCE_DIR}/common/workunit
+         ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
          ${HPCC_SOURCE_DIR}/common/deftype
-         ${HPCC_SOURCE_DIR}/system/include
-         ${HPCC_SOURCE_DIR}/dali/base
-         ${HPCC_SOURCE_DIR}/rtl/include
-         ${HPCC_SOURCE_DIR}/ecl/eclagent
-         ${HPCC_SOURCE_DIR}/system/jlib
-         ${HPCC_SOURCE_DIR}/common/thorhelper
-         ${HPCC_SOURCE_DIR}/rtl/eclrtl
-         ${HPCC_SOURCE_DIR}/roxie/roxiemem
-         ${HPCC_SOURCE_DIR}/roxie/roxie
-         ${HPCC_SOURCE_DIR}/roxie/ccd
          ${HPCC_SOURCE_DIR}/common/dllserver
          ${HPCC_SOURCE_DIR}/common/environment
-         ${HPCC_SOURCE_DIR}/ecl/schedulectrl
+         ${HPCC_SOURCE_DIR}/common/thorhelper
+         ${HPCC_SOURCE_DIR}/common/workunit
+         ${HPCC_SOURCE_DIR}/common/wuanalysis
+         ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/dali/ft
+         ${HPCC_SOURCE_DIR}/ecl/eclagent
          ${HPCC_SOURCE_DIR}/ecl/hql
+         ${HPCC_SOURCE_DIR}/ecl/schedulectrl
          ${HPCC_SOURCE_DIR}/fs/dafsclient
+         ${HPCC_SOURCE_DIR}/roxie/ccd
+         ${HPCC_SOURCE_DIR}/roxie/roxie
+         ${HPCC_SOURCE_DIR}/roxie/roxiemem
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/system/hrpc
+         ${HPCC_SOURCE_DIR}/system/include
+         ${HPCC_SOURCE_DIR}/system/jlib
+         ${HPCC_SOURCE_DIR}/system/jhtree
+         ${HPCC_SOURCE_DIR}/system/mp
+         ${HPCC_SOURCE_DIR}/system/security/shared
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
-         ${HPCC_SOURCE_DIR}/dali/ft
-         ${HPCC_SOURCE_DIR}/system/security/shared
-         ${HPCC_SOURCE_DIR}/common/wuanalysis
     )
 
 if (CMAKE_COMPILER_IS_GNUCC OR CMAKE_COMPILER_IS_CLANG)
@@ -82,20 +83,21 @@ ADD_DEFINITIONS( -D_USRDLL -DHTHOR_EXPORTS -DSTARTQUERY_EXPORTS )
 HPCC_ADD_LIBRARY( hthorlib SHARED ${SRCS} ${INCLUDES} )
 install ( TARGETS hthorlib RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} ARCHIVE DESTINATION componentfiles/cl/lib )
 target_link_libraries ( hthorlib
-         jlib
-         mp
-         hrpc
          dafsclient
          dalibase
          dllserver
-         nbcd
          eclrtl
          deftype
-         workunit
+         hrpc
          jhtree
-         thorhelper
+         jlib
+         mp
+         nbcd
          roxiemem
          schedulectrl
+         thorhelper
+         workunit
+         ws_dfsclient
          wuanalysis
     )
 

+ 4 - 1
ecl/hthor/hthor.cpp

@@ -58,6 +58,9 @@
 #include "thormeta.hpp"
 #include "thorread.hpp"
 
+#include "ws_dfsclient.hpp"
+
+
 #define EMPTY_LOOP_LIMIT 1000
 
 static unsigned const hthorReadBufferSize = 0x10000;
@@ -1096,7 +1099,7 @@ CHThorIndexWriteActivity::CHThorIndexWriteActivity(IAgentContext &_agent, unsign
     expandLogicalFilename(lfn, fname, agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false);
     if (!agent.queryResolveFilesLocally())
     {
-        Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn, agent.queryCodeContext()->queryUserDescriptor(), true, false, false, nullptr, defaultNonPrivilegedUser);
+        Owned<IDistributedFile> f = wsdfs::lookup(lfn, agent.queryCodeContext()->queryUserDescriptor(), true, false, false, nullptr, defaultNonPrivilegedUser, INFINITE);
 
         if (f)
         {

+ 308 - 2
esp/clients/ws_dfsclient/ws_dfsclient.cpp

@@ -716,8 +716,8 @@ IDFSFile *lookupDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned
                     throw makeStringException(-1, "Could not find any DFS services in the target HPCC configuration.");
             }
         }
-        serviceUrl.append(dfsServiceUrls[currentDfsServiceUrl++].c_str());
-        logicalName = remoteLogicalFileName;
+        serviceUrl.append(dfsServiceUrls[currentDfsServiceUrl].c_str());
+        currentDfsServiceUrl = (currentDfsServiceUrl+1 == dfsServiceUrls.size()) ? 0 : currentDfsServiceUrl+1;
         remoteName.clear(); // local
 #endif
     }
@@ -797,6 +797,312 @@ IDistributedFile *lookupLegacyDFSFile(const char *logicalName, unsigned timeoutS
     return createLegacyDFSFile(dfsFile);
 }
 
+IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, bool write, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout)
+{
+    bool viaDali = false;
+
+    // DFS service currently only supports remote files 
+    if (write)
+        viaDali = true;
+    else
+    {
+        // switch to Dali if non-remote file, unless "dfsesp-localfiles" enabled (and non-external) 
+        if (!lfn.isRemote())
+        {
+            if (lfn.isExternal() || (!getComponentConfigSP()->getPropBool("dfsesp-localfiles")))
+                viaDali = true;
+        }
+    }
+    if (viaDali)
+        return queryDistributedFileDirectory().lookup(lfn, user, write, hold, lockSuperOwner, transaction, priviledged, timeout);
+
+    return wsdfs::lookupLegacyDFSFile(lfn.get(), timeout, wsdfs::keepAliveExpiryFrequency, user);
+}
+
+IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, bool write, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout)
+{
+    CDfsLogicalFileName lfn;
+    lfn.set(logicalFilename);
+    return lookup(lfn, user, write, hold, lockSuperOwner, transaction, priviledged, timeout);
+}
+
 
 } // namespace wsdfs
 
+
+class CLocalOrDistributedFile: implements ILocalOrDistributedFile, public CInterface
+{
+    bool fileExists;
+    Owned<IDistributedFile> dfile;
+    CDfsLogicalFileName lfn;    // set if localpath but prob not useful
+    StringAttr localpath;
+    StringAttr fileDescPath;
+public:
+    IMPLEMENT_IINTERFACE;
+    CLocalOrDistributedFile()
+    {
+        fileExists = false;
+    }
+
+    virtual const char *queryLogicalName() override
+    {
+        return lfn.get();
+    }
+
+    virtual IDistributedFile * queryDistributedFile() override
+    { 
+        return dfile.get(); 
+    }
+
+    bool init(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, bool write, bool isPrivilegedUser, const StringArray *clusters)
+    {
+        fileExists = false;
+        if (!onlydfs)
+            lfn.allowOsPath(true);
+        if (!lfn.setValidate(fname))
+            return false;
+        if (!onlydfs)
+        {
+            bool gotlocal = true;
+            if (isAbsolutePath(fname)||(stdIoHandle(fname)>=0)) 
+                localpath.set(fname);
+            else if (!strstr(fname,"::"))
+            {
+                // treat it as a relative file
+                StringBuffer fn;
+                localpath.set(makeAbsolutePath(fname,fn).str());
+            }
+            else if (!lfn.isExternal())
+                gotlocal = false;
+            if (gotlocal)
+            {
+                if (!write && !onlylocal) // MORE - this means the dali access checks not happening... maybe that's ok?
+                    dfile.setown(wsdfs::lookup(lfn, user, write, false, false, nullptr, isPrivilegedUser, INFINITE));
+                Owned<IFile> file = getPartFile(0,0);
+                if (file.get())
+                {
+                    fileExists = file->exists();
+                    return fileExists || write;
+                }
+            }
+        }
+        if (!onlylocal)
+        {
+            if (lfn.isExternal() && !lfn.isRemote())
+            {
+                Owned<IFileDescriptor> fDesc = createExternalFileDescriptor(lfn.get());
+                dfile.setown(queryDistributedFileDirectory().createExternal(fDesc, lfn.get()));
+                Owned<IFile> file = getPartFile(0,0);
+                if (file.get())
+                    fileExists = file->exists();
+                if (write && lfn.isExternal()&&(dfile->numParts()==1))   // if it is writing to an external file then don't return distributed
+                    dfile.clear();
+                return true;
+            }
+            else
+            {
+                dfile.setown(wsdfs::lookup(lfn, user, write, false, false, nullptr, isPrivilegedUser, INFINITE));
+                if (dfile.get())
+                    return true;
+            }
+
+            StringBuffer dir;
+            unsigned stripeNum = 0;
+#ifdef _CONTAINERIZED
+            StringBuffer cluster;
+            if (clusters)
+            {
+                if (clusters->ordinality()>1)
+                    throw makeStringExceptionV(0, "Container mode does not yet support output to multiple clusters while writing file %s)", fname);
+                cluster.append(clusters->item(0));
+            }
+            else
+                getDefaultStoragePlane(cluster);
+            Owned<IStoragePlane> plane = getDataStoragePlane(cluster, true);
+            dir.append(plane->queryPrefix());
+            unsigned numStripedDevices = plane->numDevices();
+            stripeNum = calcStripeNumber(0, lfn.get(), numStripedDevices);
+#endif
+            StringBuffer descPath;
+            makePhysicalDirectory(descPath, lfn.get(), 0, DFD_OSdefault, dir);
+            fileDescPath.set(descPath);
+
+            // MORE - should we create the IDistributedFile here ready for publishing (and/or to make sure it's locked while we write)?
+            StringBuffer physicalPath;
+            makePhysicalPartName(lfn.get(), 1, 1, physicalPath, 0, DFD_OSdefault, dir, false, stripeNum); // more - may need to override path for roxie
+            localpath.set(physicalPath);
+            fileExists = (dfile != NULL);
+            return write;
+        }
+        return false;
+    }
+
+    virtual IFileDescriptor *getFileDescriptor() override
+    {
+        if (dfile.get())
+            return dfile->getFileDescriptor();
+        Owned<IFileDescriptor> fileDesc = createFileDescriptor();
+        fileDesc->setTraceName(lfn.get());
+        StringBuffer dir;
+        if (localpath.isEmpty()) { // e.g. external file
+            StringBuffer tail;
+            IException *e=NULL;
+            bool iswin=
+#ifdef _WIN32
+                true;
+#else
+                false;
+#endif
+            if (!lfn.getExternalPath(dir,tail,iswin,&e)) {
+                if (e)
+                    throw e;
+                return NULL;
+            }
+        }
+        else 
+            splitDirTail(fileDescPath,dir);
+        fileDesc->setDefaultDir(dir.str());
+        RemoteFilename rfn;
+        getPartFilename(rfn,0,0);
+        fileDesc->setPart(0,rfn);
+        fileDesc->queryPartDiskMapping(0).defaultCopies = DFD_DefaultCopies;
+        return fileDesc.getClear();
+    }
+
+    virtual bool getModificationTime(CDateTime &dt) override
+    {
+        if (dfile.get())
+            return dfile->getModificationTime(dt);
+        Owned<IFile> file = getPartFile(0,0);
+        if (file.get()) {
+            CDateTime dt;
+            return file->getTime(NULL,&dt,NULL);
+        }
+        return false;
+    }
+
+    virtual unsigned numParts() override 
+    {
+        if (dfile.get()) 
+            return dfile->numParts();
+        return 1;
+    }
+
+    virtual unsigned numPartCopies(unsigned partnum) override
+    {
+        if (dfile.get()) 
+            return dfile->queryPart(partnum).numCopies();
+        return 1;
+    }
+    
+    virtual IFile *getPartFile(unsigned partnum,unsigned copy) override
+    {
+        RemoteFilename rfn;
+        if ((partnum==0)&&(copy==0))
+            return createIFile(getPartFilename(rfn,partnum,copy));
+        return NULL;
+    }
+    
+    virtual void getDirAndFilename(StringBuffer &dir, StringBuffer &filename) override
+    {
+        if (dfile.get())
+        {
+            dir.append(dfile->queryDefaultDir());
+            splitFilename(localpath, nullptr, nullptr, &filename, &filename);
+        }
+        else if (localpath.isEmpty())
+        {
+            RemoteFilename rfn;
+            lfn.getExternalFilename(rfn);
+            StringBuffer fullPath;
+            rfn.getLocalPath(fullPath);
+            splitFilename(localpath, nullptr, &dir, &filename, &filename);
+        }
+        else
+        {
+            dir.append(fileDescPath);
+            splitFilename(localpath, nullptr, nullptr, &filename, &filename);
+        }
+    }
+
+    virtual RemoteFilename &getPartFilename(RemoteFilename &rfn, unsigned partnum,unsigned copy) override
+    {
+        if (dfile.get()) 
+            dfile->queryPart(partnum).getFilename(rfn,copy);
+        else if (localpath.isEmpty())
+            lfn.getExternalFilename(rfn);
+        else
+            rfn.setRemotePath(localpath);
+        return rfn;
+    }
+
+    StringBuffer &getPartFilename(StringBuffer &path, unsigned partnum,unsigned copy)
+    {
+        RemoteFilename rfn;
+        if (dfile.get()) 
+            dfile->queryPart(partnum).getFilename(rfn,copy);
+        else if (localpath.isEmpty())
+            lfn.getExternalFilename(rfn);
+        else 
+            path.append(localpath);
+        if (rfn.isLocal())
+            rfn.getLocalPath(path);
+        else
+            rfn.getRemotePath(path);
+        return path;
+    }
+
+    virtual bool getPartCrc(unsigned partnum, unsigned &crc) override
+    {
+        if (dfile.get())  
+            return dfile->queryPart(partnum).getCrc(crc);
+        Owned<IFile> file = getPartFile(0,0);
+        if (file.get()) {
+            crc = file->getCRC();
+            return true;
+        }
+        return false;
+    }
+
+    virtual offset_t getPartFileSize(unsigned partnum) override
+    {
+        if (dfile.get()) 
+            return dfile->queryPart(partnum).getFileSize(true,false);
+        Owned<IFile> file = getPartFile(0,0);
+        if (file.get())
+            return file->size();
+        return (offset_t)-1;
+    }
+
+    virtual offset_t getFileSize() override
+    {
+        if (dfile.get())
+            dfile->getFileSize(true,false);
+        offset_t ret = 0;
+        unsigned np = numParts();
+        for (unsigned i = 0;i<np;i++)
+            ret += getPartFileSize(i);
+        return ret;
+    }
+
+    virtual bool exists() const override
+    {
+        return fileExists;
+    }
+
+    virtual bool isExternal() const override
+    {
+        return lfn.isExternal();
+    }
+};
+
+
+ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs, bool iswrite, bool isPrivilegedUser, const StringArray *clusters)
+{
+    Owned<CLocalOrDistributedFile> ret = new CLocalOrDistributedFile();
+    if (ret->init(fname,user,onlylocal,onlydfs,iswrite,isPrivilegedUser,clusters))
+        return ret.getClear();
+    return NULL;
+}
+
+

+ 7 - 0
esp/clients/ws_dfsclient/ws_dfsclient.hpp

@@ -51,6 +51,13 @@ WS_DFSCLIENT_API IDFSFile *lookupDFSFile(const char *logicalName, unsigned timeo
 WS_DFSCLIENT_API IDistributedFile *createLegacyDFSFile(IDFSFile *dfsFile);
 WS_DFSCLIENT_API IDistributedFile *lookupLegacyDFSFile(const char *logicalName, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc);
 
+WS_DFSCLIENT_API IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, bool write, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout);
+WS_DFSCLIENT_API IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, bool write, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout);
+
+
 } // end of namespace wsdfs
 
+interface ILocalOrDistributedFile;
+WS_DFSCLIENT_API ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs,bool iswrite, bool isPrivilegedUser, const StringArray *clusters);
+
 #endif // _WS_DFSCLIENT_HPP

+ 29 - 27
roxie/ccd/CMakeLists.txt

@@ -64,34 +64,35 @@ set (   SRCS
 
 include_directories ( 
          .
-         ${HPCC_SOURCE_DIR}/fs/dafsclient
-         ${HPCC_SOURCE_DIR}/system/jhtree
-         ${HPCC_SOURCE_DIR}/system/mp
-         ${HPCC_SOURCE_DIR}/common/workunit
-         ${HPCC_SOURCE_DIR}/roxie/udplib
-         ${HPCC_SOURCE_DIR}/roxie/ccdcache
-         ${HPCC_SOURCE_DIR}/roxie/roxie
+         ${HPCC_SOURCE_DIR}/common/deftype
+         ${HPCC_SOURCE_DIR}/common/dllserver
          ${HPCC_SOURCE_DIR}/common/environment
+         ${HPCC_SOURCE_DIR}/common/thorhelper
+         ${HPCC_SOURCE_DIR}/common/workunit
+         ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/dali/dfu
+         ${HPCC_SOURCE_DIR}/dali/ft
          ${HPCC_SOURCE_DIR}/ecl/hthor
          ${HPCC_SOURCE_DIR}/ecl/schedulectrl
+         ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
+         ${HPCC_SOURCE_DIR}/fs/dafsclient
+         ${HPCC_SOURCE_DIR}/roxie/ccdcache
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/rtl/include
          ${HPCC_SOURCE_DIR}/rtl/nbcd
-         ${HPCC_SOURCE_DIR}/common/deftype
-         ${HPCC_SOURCE_DIR}/system/include
-         ${HPCC_SOURCE_DIR}/dali/base
-         ${HPCC_SOURCE_DIR}/dali/dfu
+         ${HPCC_SOURCE_DIR}/roxie/roxie
          ${HPCC_SOURCE_DIR}/roxie/roxiemem
-         ${HPCC_SOURCE_DIR}/common/dllserver
+         ${HPCC_SOURCE_DIR}/roxie/udplib
+         ${HPCC_SOURCE_DIR}/system/include
+         ${HPCC_SOURCE_DIR}/system/jhtree
          ${HPCC_SOURCE_DIR}/system/jlib
-         ${HPCC_SOURCE_DIR}/common/thorhelper
-         ${HPCC_SOURCE_DIR}/rtl/eclrtl
-         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/system/libbase58
+         ${HPCC_SOURCE_DIR}/system/mp
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/system/security/securesocket
          ${HPCC_SOURCE_DIR}/testing/unittests
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
-         ${HPCC_SOURCE_DIR}/dali/ft
-         ${HPCC_SOURCE_DIR}/system/security/shared
-         ${HPCC_SOURCE_DIR}/system/security/securesocket
-         ${HPCC_SOURCE_DIR}/system/libbase58
     )
 
 if (CMAKE_COMPILER_IS_GNUCC OR CMAKE_COMPILER_IS_CLANG)
@@ -112,20 +113,21 @@ HPCC_ADD_LIBRARY( ccd SHARED ${SRCS} )
 install ( TARGETS ccd RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} ARCHIVE DESTINATION componentfiles/cl/lib )
 
 target_link_libraries ( ccd 
-         jlib
-         nbcd
-         roxiemem 
-         udplib 
          dafsclient 
-         eclrtl 
          dalibase 
          deftype 
-         thorhelper 
+         dllserver 
+         eclrtl 
          jhtree 
+         jlib
+         libbase58
+         nbcd
+         roxiemem 
          schedulectrl
-         dllserver 
+         thorhelper 
+         udplib 
          workunit 
-         libbase58
+         ws_dfsclient
     )
 
 if (NOT CONTAINERIZED)

+ 4 - 3
roxie/ccd/ccdcontext.cpp

@@ -39,6 +39,7 @@
 #include "ccdstate.hpp"
 #include "roxiehelper.hpp"
 #include "enginecontext.hpp"
+#include "ws_dfsclient.hpp"
 
 #include <list>
 #include <string>
@@ -450,7 +451,7 @@ private:
 
     inline bool fileExists(const char *lfn)
     {
-        Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(lfn, queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser);
+        Owned<IDistributedFile> f = wsdfs::lookup(lfn, queryUserDescriptor(), false, false, false, nullptr, defaultPrivilegedUser, INFINITE);
         if (f)
             return true;
         return false;
@@ -769,7 +770,7 @@ private:
                     MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP));
                     persistLock.setown(getPersistReadLock(goer));
                 }
-                Owned<IDistributedFile> f = queryDistributedFileDirectory().lookup(goer, queryUserDescriptor(), true, false, false, nullptr, defaultPrivilegedUser);
+                Owned<IDistributedFile> f = wsdfs::lookup(goer, queryUserDescriptor(), true, false, false, nullptr, defaultPrivilegedUser, INFINITE);
                 if (!f)
                     goto restart; // Persist has been deleted since last checked - repeat the whole process
                 const char *newAccessTime = f->queryAttributes().queryProp("@accessed");
@@ -3817,7 +3818,7 @@ public:
     {
         StringBuffer fullname;
         expandLogicalFilename(fullname, logicalName, workUnit, false, false);
-        Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(fullname.str(),queryUserDescriptor(),false,false,false,nullptr,defaultPrivilegedUser);
+        Owned<IDistributedFile> file = wsdfs::lookup(fullname.str(),queryUserDescriptor(),false,false,false,nullptr,defaultPrivilegedUser,INFINITE);
         if (file)
         {
             WorkunitUpdate wu = updateWorkUnit();

+ 2 - 1
roxie/ccd/ccddali.cpp

@@ -32,6 +32,7 @@
 #include "thorplugin.hpp"
 #include "workflow.hpp"
 #include "mpcomm.hpp"
+#include "ws_dfsclient.hpp"
 
 #ifndef _CONTAINERIZED
 #define ROXIE_DALI_CACHE
@@ -626,7 +627,7 @@ public:
             unsigned start = msTick();
             CDfsLogicalFileName lfn;
             lfn.set(logicalName);
-            Owned<IDistributedFile> dfsFile = queryDistributedFileDirectory().lookup(lfn, userdesc.get(), writeAccess, cacheIt,false,nullptr,isPrivilegedUser);
+            Owned<IDistributedFile> dfsFile = wsdfs::lookup(lfn, userdesc.get(), writeAccess, cacheIt,false,nullptr,isPrivilegedUser,INFINITE);
             if (dfsFile)
             {
                 IDistributedSuperFile *super = dfsFile->querySuperFile();

+ 3 - 25
roxie/ccd/ccdfile.cpp

@@ -1669,31 +1669,9 @@ public:
         }
         else
         {
-            // MORE - not at all sure about this. Foreign files should stay foreign ?
-            CDfsLogicalFileName dlfn;
-            dlfn.set(lfn);
-            if (dlfn.isForeign())
-                dlfn.clearForeign();
-
-            bool defaultDirPerPart = false;
-            StringBuffer defaultDir;
-            unsigned stripeNum = 0;
-#ifdef _CONTAINERIZED
-            if (!dlfn.isExternal())
-            {
-                IFileDescriptor &fileDesc = pdesc->queryOwner();
-                StringBuffer planeName;
-                fileDesc.getClusterGroupName(0, planeName);
-                Owned<IStoragePlane> plane = getDataStoragePlane(planeName, true);
-                defaultDir.append(plane->queryPrefix());
-                unsigned numStripedDevices = plane->numDevices();
-                stripeNum = calcStripeNumber(partNo-1, dlfn.get(), numStripedDevices);
-                FileDescriptorFlags fileFlags = static_cast<FileDescriptorFlags>(fileDesc.queryProperties().getPropInt("@flags"));
-                if (FileDescriptorFlags::none != (fileFlags & FileDescriptorFlags::dirperpart))
-                    defaultDirPerPart = true;
-            }
-#endif
-            makePhysicalPartName(dlfn.get(), partNo, numParts, localLocation, replicationLevel, DFD_OSdefault, defaultDir.str(), defaultDirPerPart, stripeNum);
+            RemoteFilename rfn;
+            pdesc->getFilename(replicationLevel, rfn);
+            rfn.getLocalPath(localLocation);
         }
         Owned<ILazyFileIO> ret;
         try

+ 3 - 0
roxie/ccd/ccdstate.cpp

@@ -43,6 +43,9 @@
 #include "pkgimpl.hpp"
 #include "roxiehelper.hpp"
 
+#include "ws_dfsclient.hpp"
+
+
 //-------------------------------------------------------------------------------------------
 // class CRoxiePluginCtx - provide the environments for plugins loaded by roxie. 
 // Base class handles making sure memory allocation comes from the right heap. 

+ 6 - 13
thorlcr/mfilemanager/thmfilemanager.cpp

@@ -313,22 +313,15 @@ public:
 
     IDistributedFile *timedLookup(CJobBase &job, CDfsLogicalFileName &lfn, bool write, bool privilegedUser=false, unsigned timeout=INFINITE)
     {
-        VStringBuffer blockedMsg("lock file '%s' for %s access", lfn.get(), write ? "WRITE" : "READ");
-        if (!write)
+        auto func = [&job, &lfn, write, privilegedUser](unsigned timeout)
         {
-            if (lfn.isRemote() || (!lfn.isExternal() && job.getOptBool("dfsesp-localfiles")))
-            {
-                auto func = [&job, &lfn](unsigned timeout)
-                {
-                    return wsdfs::lookupLegacyDFSFile(lfn.get(), timeout, wsdfs::keepAliveExpiryFrequency, job.queryUserDescriptor());
-                };
-                return blockReportFunc<IDistributedFile *>(job, func, timeout, blockedMsg);
-            }
-        }
-        // NB: if we're here, we're not using DFSESP
-        auto func = [&job, &lfn, write, privilegedUser](unsigned timeout) { return queryDistributedFileDirectory().lookup(lfn, job.queryUserDescriptor(), write, false, false, nullptr, privilegedUser, timeout); };
+            return wsdfs::lookup(lfn, job.queryUserDescriptor(), write, false, false, nullptr, privilegedUser, timeout);
+        };
+
+        VStringBuffer blockedMsg("lock file '%s' for %s access", lfn.get(), write ? "WRITE" : "READ");
         return blockReportFunc<IDistributedFile *>(job, func, timeout, blockedMsg);
     }
+    
     IDistributedFile *timedLookup(CJobBase &job, const char *logicalName, bool write, bool privilegedUser=false, unsigned timeout=INFINITE)
     {
         CDfsLogicalFileName lfn;