瀏覽代碼

HPCC-20996 Stream write support and services

+ add streaing write support to dafilesrv
+ add a dafilesrv generic json handler, so JSON command can
proceed data (needed for write)
+ restructure dafilesrv service so activity deals with processing
+ New DFU file interfaces and streaming interface implementations,
that connect directly to Esp and provide partition stream read
and write services, plus create and publish services.
+ Add standalone unittests for streaming interface interfaction
with dafilersv
+ Add datest test cases for read, write, copy, publish
+ Add getAccessibleServiceURLList service to environment to
discover WsDfu services (similar to fileservices and needs
additional commoning up and clarification)
+ Expose removePhysicalPartFiles to use on file before
publish
+ Add new WsDfu services to handle create without locations etc.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 6 年之前
父節點
當前提交
6e6f6a5633

+ 75 - 24
common/environment/environment.cpp

@@ -154,7 +154,7 @@ private:
     mutable bool dropZoneCacheBuilt;
     mutable bool machineCacheBuilt;
     mutable bool sparkThorCacheBuilt;
-    mutable bool clusterKeyNameCache;
+    mutable bool clusterGroupKeyNameCache;
     StringBuffer fileAccessUrl;
 
     struct KeyPairMapEntity
@@ -162,7 +162,7 @@ private:
         std::string publicKey, privateKey;
     };
     mutable std::unordered_map<std::string, KeyPairMapEntity> keyPairMap;
-    mutable std::unordered_map<std::string, std::string> keyClusterMap;
+    mutable std::unordered_map<std::string, std::string> keyGroupMap;
     StringBuffer xPath;
     mutable unsigned numOfMachines;
     mutable unsigned numOfDropZones;
@@ -179,9 +179,9 @@ private:
     mutable bool dropZoneRestrictionEnabled = true;
 
 
-    void ensureClusterKeyMap() const // keyPairMap and keyClusterMap it alters is mutable
+    void ensureClusterGroupKeyMap() const // keyPairMap and keyGroupMap it alters is mutable
     {
-        if (!clusterKeyNameCache)
+        if (!clusterGroupKeyNameCache)
         {
             StringBuffer keysDir;
             envGetConfigurationDirectory("keys",nullptr, nullptr, keysDir);
@@ -235,40 +235,40 @@ private:
             for (unsigned i=0; i<2; i++) // once for std. "ClusterGroup", 2nd time for legacy "Cluster"
             {
 #endif
-                Owned<IPropertyTreeIterator> clusterIter = p->getElements(groupKeysPath);
+                Owned<IPropertyTreeIterator> clusterGroupIter = p->getElements(groupKeysPath);
 
 #ifdef BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
-                if (clusterIter->first() && keyClusterMap.size()) // NB: always 0 1st time around.
+                if (clusterGroupIter->first() && keyGroupMap.size()) // NB: always 0 1st time around.
                 {
                     WARNLOG("Invalid configuration: mixed 'Keys/ClusterGroup' definitions and legacy 'Keys/Cluster' definitions found, legacy 'Keys/Cluster' definition will be ignored.");
                     break;
                 }
 #endif
-                ForEach(*clusterIter)
+                ForEach(*clusterGroupIter)
                 {
-                    IPropertyTree &cluster = clusterIter->query();
-                    const char *clusterName = cluster.queryProp("@name");
-                    if (isEmptyString(clusterName))
+                    IPropertyTree &clusterGroup = clusterGroupIter->query();
+                    const char *groupName = clusterGroup.queryProp("@name");
+                    if (isEmptyString(groupName))
                     {
                         WARNLOG("skipping %s entry with no name", groupKeysPath);
                         continue;
                     }
-                    if (cluster.hasProp("@keyPairName"))
+                    if (clusterGroup.hasProp("@keyPairName"))
                     {
-                        const char *keyPairName = cluster.queryProp("@keyPairName");
+                        const char *keyPairName = clusterGroup.queryProp("@keyPairName");
                         if (isEmptyString(keyPairName))
                         {
-                            WARNLOG("skipping invalid %s entry, name=%s", groupKeysPath, clusterName);
+                            WARNLOG("skipping invalid %s entry, name=%s", groupKeysPath, groupName);
                             continue;
                         }
-                        keyClusterMap[clusterName] = keyPairName;
+                        keyGroupMap[groupName] = keyPairName;
                     }
                 }
 #ifdef BKWRDCOMPAT_CLUSTER_VS_CLUSTERGROUP
                 groupKeysPath = "EnvSettings/Keys/Cluster";
             }
 #endif
-            clusterKeyNameCache = true;
+            clusterGroupKeyNameCache = true;
         }
     }
 
@@ -316,22 +316,22 @@ public:
     IConstDropZoneInfo * getDropZoneByIndex(unsigned index) const;
     bool isDropZoneRestrictionEnabled() const;
 
-    virtual const char *getClusterKeyPairName(const char *cluster) const override
+    virtual const char *getClusterGroupKeyPairName(const char *group) const override
     {
         synchronized procedure(safeCache);
-        ensureClusterKeyMap();
-        return keyClusterMap[cluster].c_str();
+        ensureClusterGroupKeyMap();
+        return keyGroupMap[group].c_str();
     }
     virtual const char *getPublicKeyPath(const char *keyPairName) const override
     {
         synchronized procedure(safeCache);
-        ensureClusterKeyMap();
+        ensureClusterGroupKeyMap();
         return keyPairMap[keyPairName].publicKey.c_str();
     }
     virtual const char *getPrivateKeyPath(const char *keyPairName) const override
     {
         synchronized procedure(safeCache);
-        ensureClusterKeyMap();
+        ensureClusterGroupKeyMap();
         return keyPairMap[keyPairName].privateKey.c_str();
     }
     virtual const char *getFileAccessUrl() const
@@ -457,8 +457,8 @@ public:
             { return c->getDropZoneIterator(); }
     virtual bool isDropZoneRestrictionEnabled() const
             { return c->isDropZoneRestrictionEnabled(); }
-    virtual const char *getClusterKeyPairName(const char *cluster) const override
-            { return c->getClusterKeyPairName(cluster); }
+    virtual const char *getClusterGroupKeyPairName(const char *cluster) const override
+            { return c->getClusterGroupKeyPairName(cluster); }
     virtual const char *getPublicKeyPath(const char *keyPairName) const override
             { return c->getPublicKeyPath(keyPairName); }
     virtual const char *getPrivateKeyPath(const char *keyPairName) const override
@@ -1322,7 +1322,7 @@ void CLocalEnvironment::init()
     numOfDropZones = 0;
     numOfSparkThors = 0;
     isDropZoneRestrictionLoaded = false;
-    clusterKeyNameCache = false;
+    clusterGroupKeyNameCache = false;
     ::getFileAccessUrl(fileAccessUrl);
 }
 
@@ -1785,7 +1785,7 @@ void CLocalEnvironment::clearCache()
         p.setown(conn->getRoot());
     }
     cache.kill();
-    keyClusterMap.clear();
+    keyGroupMap.clear();
     keyPairMap.clear();
     init();
     resetPasswordsFromSDS();
@@ -2362,3 +2362,54 @@ extern ENVIRONMENT_API unsigned long readSizeSetting(const char * sizeStr, const
     }
     return size;
 }
+
+unsigned getAccessibleServiceURLList(const char *serviceType, std::vector<std::string> &list)
+{
+    unsigned added = 0;
+    Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
+    Owned<IConstEnvironment> daliEnv = factory->openEnvironment();
+    Owned<IPropertyTree> env = &daliEnv->getPTree();
+    if (env.get())
+    {
+        StringBuffer fileMetaServiceUrl;
+        StringBuffer espInstanceComputerName;
+        StringBuffer bindingProtocol;
+        StringBuffer xpath;
+        StringBuffer instanceAddress;
+        StringBuffer espServiceType;
+
+        Owned<IPropertyTreeIterator> espProcessIter = env->getElements("Software/EspProcess");
+        ForEach(*espProcessIter)
+        {
+            Owned<IPropertyTreeIterator> espBindingIter = espProcessIter->query().getElements("EspBinding");
+            ForEach(*espBindingIter)
+            {
+                xpath.setf("Software/EspService[@name=\"%s\"]/Properties/@type",  espBindingIter->query().queryProp("@service"));
+
+                if (strisame(env->queryProp(xpath), serviceType))
+                {
+                    if (espBindingIter->query().getProp("@protocol", bindingProtocol.clear()))
+                    {
+                        Owned<IPropertyTreeIterator> espInstanceIter = espProcessIter->query().getElements("Instance");
+                        ForEach(*espInstanceIter)
+                        {
+                            if (espInstanceIter->query().getProp("@computer", espInstanceComputerName.clear()))
+                            {
+                                xpath.setf("Hardware/Computer[@name=\"%s\"]/@netAddress", espInstanceComputerName.str());
+                                if (env->getProp(xpath.str(), instanceAddress.clear()))
+                                {
+                                    fileMetaServiceUrl.setf("%s://%s:%d", bindingProtocol.str(), instanceAddress.str(), espBindingIter->query().getPropInt("@port",8010));
+                                    list.push_back(fileMetaServiceUrl.str());
+                                    ++added;
+                                }
+                            }
+                        }
+                    }
+                }
+            }//ESPBinding
+        }//ESPProcess
+    }
+    return added;
+}
+
+

+ 2 - 1
common/environment/environment.hpp

@@ -180,7 +180,7 @@ interface IConstEnvironment : extends IConstEnvBase
     virtual IConstDropZoneInfo * getDropZoneByAddressPath(const char * netaddress, const char *targetPath) const = 0;
     virtual IConstDropZoneInfoIterator * getDropZoneIterator() const = 0;
     virtual bool isDropZoneRestrictionEnabled() const = 0;
-    virtual const char *getClusterKeyPairName(const char *cluster) const = 0;
+    virtual const char *getClusterGroupKeyPairName(const char *cluster) const = 0;
     virtual const char *getPublicKeyPath(const char *keyPairName) const = 0;
     virtual const char *getPrivateKeyPath(const char *keyPairName) const = 0;
     virtual const char *getFileAccessUrl() const = 0;
@@ -216,6 +216,7 @@ extern "C" ENVIRONMENT_API void closeEnvironment();
 
 extern ENVIRONMENT_API unsigned long readSizeSetting(const char * sizeStr, const unsigned long defaultSize);
 
+extern ENVIRONMENT_API unsigned getAccessibleServiceURLList(const char *serviceType, std::vector<std::string> &list);
 
 #endif // _ENVIRONMENT_INCL
 //end

+ 19 - 10
common/remote/CMakeLists.txt

@@ -32,6 +32,7 @@ set (    SRCS
          rmtspawn.cpp 
          rmtssh.cpp 
          rmtsmtp.cpp 
+         dafsstream.cpp
          sockfile.cpp
          
          remoteerr.hpp
@@ -51,16 +52,23 @@ if (USE_URIPARSER)
 endif(USE_URIPARSER)
 
 include_directories (
-         ./../../system/hrpc 
-         ./../../system/mp 
-         ./../../system/include 
-         ./../../system/jlib 
-         ./../../system/jhtree
-         ./../../rtl/eclrtl
-         ./../../system/security/securesocket
-         ./../../system/security/cryptohelper
-         ./../../testing/unittests
-         ./../../rtl/include
+         ${HPCC_SOURCE_DIR}/system/hrpc 
+         ${HPCC_SOURCE_DIR}/system/mp 
+         ${HPCC_SOURCE_DIR}/system/include 
+         ${HPCC_SOURCE_DIR}/system/jlib 
+         ${HPCC_SOURCE_DIR}/system/jhtree
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/system/security/securesocket
+         ${HPCC_SOURCE_DIR}/system/security/cryptohelper
+         ${HPCC_SOURCE_DIR}/testing/unittests
+         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/ecl/hql
+         ${HPCC_SOURCE_DIR}/common/deftype
+         ${HPCC_SOURCE_DIR}/dali/base
+         ${CMAKE_BINARY_DIR}
+         ${CMAKE_BINARY_DIR}/oss
     )
 
 ADD_DEFINITIONS( -D_USRDLL -DREMOTE_EXPORTS )
@@ -72,6 +80,7 @@ target_link_libraries ( remote
     eclrtl
     jlib
     jhtree 
+    hql   
     mp
     ${URIPARSER_LIBRARIES}
     ${CPPUNIT_LIBRARIES}

+ 109 - 0
common/remote/dafscommon.hpp

@@ -0,0 +1,109 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef DAFSCOMMON_HPP
+#define DAFSCOMMON_HPP
+
+const unsigned RFEnoerror = 0;
+
+enum
+{
+    RFCopenIO,                                      // 0
+    RFCcloseIO,
+    RFCread,
+    RFCwrite,
+    RFCsize,
+    RFCexists,
+    RFCremove,
+    RFCrename,
+    RFCgetver,
+    RFCisfile,
+    RFCisdirectory,                                 // 10
+    RFCisreadonly,
+    RFCsetreadonly,
+    RFCgettime,
+    RFCsettime,
+    RFCcreatedir,
+    RFCgetdir,
+    RFCstop,
+    RFCexec,                                        // legacy cmd removed
+    RFCdummy1,                                      // legacy placeholder
+    RFCredeploy,                                    // 20
+    RFCgetcrc,
+    RFCmove,
+// 1.5 features below
+    RFCsetsize,
+    RFCextractblobelements,
+    RFCcopy,
+    RFCappend,
+    RFCmonitordir,
+    RFCsettrace,
+    RFCgetinfo,
+    RFCfirewall,    // not used currently          // 30
+    RFCunlock,
+    RFCunlockreply,
+    RFCinvalid,
+    RFCcopysection,
+// 1.7e
+    RFCtreecopy,
+// 1.7e - 1
+    RFCtreecopytmp,
+// 1.8
+    RFCsetthrottle, // legacy version
+// 1.9
+    RFCsetthrottle2,
+    RFCsetfileperms,
+// 2.0
+    RFCreadfilteredindex,    // No longer used     // 40
+    RFCreadfilteredindexcount,
+    RFCreadfilteredindexblob,
+// 2.2
+    RFCStreamRead,                                 // 43
+// 2.4
+    RFCStreamReadTestSocket,                       // 44
+// 2.5
+    RFCStreamGeneral,                              // 45
+    RFCStreamReadJSON = '{',
+    RFCmaxnormal,
+    RFCmax,
+    RFCunknown = 255 // 0 would have been more sensible, but can't break backward compatibility
+};
+
+enum DAFS_ERROR_CODES {
+    DAFSERR_connection_failed               = -1,
+    DAFSERR_authenticate_failed             = -2,
+    DAFSERR_protocol_failure                = -3,
+    DAFSERR_serveraccept_failed             = -4,
+    DAFSERR_serverinit_failed               = -5,
+    DAFSERR_cmdstream_invalidexpiry         = -6,
+    DAFSERR_cmdstream_authexpired           = -7,
+    DAFSERR_cmdstream_unsupported_recfmt    = -8,
+    DAFSERR_cmdstream_openfailure           = -9,
+    DAFSERR_cmdstream_protocol_failure      = -10,
+    DAFSERR_cmdstream_unauthorized          = -11,
+    DAFSERR_cmdstream_unknownwritehandle    = -12,
+    DAFSERR_cmdstream_generalwritefailure   = -13
+};
+
+inline MemoryBuffer & initSendBuffer(MemoryBuffer & buff)
+{
+    buff.setEndian(__BIG_ENDIAN);       // transfer as big endian...
+    buff.append((unsigned)0);           // reserve space for length prefix
+    return buff;
+}
+
+#endif // DAFSCOMMON_HPP

文件差異過大導致無法顯示
+ 1380 - 0
common/remote/dafsstream.cpp


+ 139 - 0
common/remote/dafsstream.hpp

@@ -0,0 +1,139 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef DAFSCLIENT_HPP
+#define DAFSCLIENT_HPP
+
+#ifdef REMOTE_EXPORTS
+#define REMOTE_API DECL_EXPORT
+#else
+#define REMOTE_API DECL_IMPORT
+#endif
+
+#include "seclib.hpp"
+
+interface IOutputMetaData;
+namespace dafsstream
+{
+
+enum DFUFileType { dft_none, dft_flat, dft_index };
+
+enum DaFsExceptionCode
+{
+    DaFsClient_NoStreamWriteSupport = 1,
+    DaFsClient_OpenFailure,
+    DaFsClient_ECLParseError,
+    DaFsClient_ConnectionFailure,
+    DaFsClient_TooOld,
+    DaFsClient_NotStarted,
+    DaFsClient_ReaderEndOfStream,
+    DaFsClient_CompressorSetupError,
+    DaFsClient_InvalidFileType,
+    DaFsClient_WriteError,
+    DaFsClient_InvalidMetaInfo,
+};
+
+interface IDaFsException : extends IException
+{
+};
+
+interface REMOTE_API IDFUFilePartBase : extends IInterface
+{
+    virtual void start() = 0;
+    virtual void finalize() = 0;
+    virtual IOutputMetaData *queryMeta() const = 0;
+};
+
+interface REMOTE_API IDFUFilePartReader : extends IDFUFilePartBase
+{
+    virtual const void *nextRow(size32_t &sz) = 0;
+    virtual const void *getRows(size32_t min, size32_t &got) = 0; // will read at least min, and returned data will contain whole rows only
+
+// NB: these methods should be called before start()
+    virtual void addFieldFilter(const char *filter) = 0;
+    virtual void clearFieldFilters() = 0;
+    virtual void setOutputRecordFormat(const char *eclRecDef) = 0;
+    virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) = 0;
+};
+
+interface REMOTE_API IDFUFilePartWriter : extends IDFUFilePartBase
+{
+    virtual void write(size32_t sz, const void *rowData) = 0; // NB: can be multiple rows
+};
+
+
+interface REMOTE_API IDFUFileAccessExt : extends IInterface
+{
+    virtual IOutputMetaData *queryMeta() const = 0;
+    virtual IFileDescriptor &queryFileDescriptor() const = 0;
+    virtual IPropertyTree &queryProperties() const = 0; // NB: properties of file descriptor
+    virtual void setLayoutBin(size32_t sz, const void *layoutBin) = 0;
+};
+
+enum DFUFileOption { dfo_null=0, dfo_compressedRemoteStreams=1 }; // NB: will be used in bit field
+interface REMOTE_API IDFUFileAccess : extends IInterface
+{
+    virtual const char *queryName() const = 0;
+    virtual const char *queryFileId() const = 0;
+    virtual unsigned queryNumParts() const = 0;
+    virtual SecAccessFlags queryAccessType() const = 0;
+    virtual bool queryIsGrouped() const = 0;
+    virtual DFUFileType queryType() const = 0;
+    virtual bool queryIsCompressed() const = 0;
+    virtual const char *queryClusterGroupName() const = 0;
+    virtual const char *queryPartHost(unsigned part, unsigned copy=0) const = 0;
+    virtual const char *queryJSONTypeInfo() const = 0;
+    virtual const char *queryECLRecordDefinition() const = 0;
+    virtual const void *queryMetaInfoBlob(size32_t &sz) const = 0;
+    virtual const char *queryCommCompressionType() const = 0;
+    virtual DFUFileOption queryFileOptions() const = 0;
+    virtual bool isFileOptionSet(DFUFileOption opt) const = 0;
+    virtual const char *queryFileProperty(const char *name) const = 0;
+    virtual __int64 queryFilePropertyInt(const char *name) const = 0;
+    virtual const char *queryPartProperty(unsigned part, const char *name) const = 0;
+    virtual __int64 queryPartPropertyInt(unsigned part, const char *name) const = 0;
+
+    virtual void setCommCompressionType(const char *compType) = 0;
+    virtual void setFileOption(DFUFileOption opt) = 0;
+    virtual void clearFileOption(DFUFileOption opt) = 0;
+    virtual void setECLRecordDefinition(const char *eclRecDef) = 0;
+    virtual void setFileProperty(const char *name, const char *value) = 0;
+    virtual void setFilePropertyInt(const char *name, __int64 value) = 0;
+    virtual void setPartProperty(unsigned part, const char *name, const char *value) = 0;
+    virtual void setPartPropertyInt(unsigned part, const char *name, __int64 value) = 0;
+
+// NB: these changes effect future creation of IDFUFilePartReader or IDFUFilePartWriter instances
+    virtual void setStreamReplyLimitK(unsigned k) = 0;
+    virtual void setExpirySecs(unsigned secs) = 0;
+
+// NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
+    virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy=0, IOutputMetaData *outMeta=nullptr, bool preserveGrouping=false) = 0;
+    virtual IDFUFilePartWriter *createFilePartWriter(unsigned p) = 0;
+    virtual IDFUFilePartWriter *createFilePartAppender(unsigned p) = 0;
+
+    virtual IDFUFileAccessExt *queryEngineInterface() = 0;
+};
+
+// NB: fileId, supplied/only needed by older esp's at publish time
+REMOTE_API IDFUFileAccess *createDFUFileAccess(const char *metaInfoBlobB64, const char *fileId=nullptr);
+REMOTE_API IRowWriter *createRowWriter(IDFUFilePartWriter *partWriter);
+
+REMOTE_API void setDefaultCommCompression(const char *compType);
+
+} // end of namespace dafsstream
+
+#endif

+ 0 - 15
common/remote/remoteerr.hpp

@@ -94,20 +94,5 @@ interface REMOTE_API IDAFS_Exception: extends IException
 { // Raise by dafilesrv calls
 };
 
-enum DAFS_ERROR_CODES {
-    DAFSERR_connection_failed               = -1,
-    DAFSERR_authenticate_failed             = -2,
-    DAFSERR_protocol_failure                = -3,
-    DAFSERR_serveraccept_failed             = -4,
-    DAFSERR_serverinit_failed               = -5,
-    DAFSERR_cmdstream_invalidexpiry         = -6,
-    DAFSERR_cmdstream_authexpired           = -7,
-    DAFSERR_cmdstream_unsupported_recfmt    = -8,
-    DAFSERR_cmdstream_openfailure           = -9,
-    DAFSERR_cmdstream_protocol_failure      = -10,
-    DAFSERR_cmdstream_unauthorized          = -11
-};
-
-
 
 #endif

文件差異過大導致無法顯示
+ 967 - 549
common/remote/sockfile.cpp


+ 27 - 4
common/remote/sockfile.hpp

@@ -27,6 +27,10 @@
 #define REMOTE_API DECL_IMPORT
 #endif
 
+#define DAFILESRV_METAINFOVERSION 2
+
+#define DAFILESRV_STREAMREAD_MINVERSION 22
+#define DAFILESRV_STREAMGENERAL_MINVERSION 25
 
 enum ThrottleClass
 {
@@ -63,6 +67,7 @@ enum RowServiceCfg
 interface IRemoteFileServer : extends IInterface
 {
     virtual void run(DAFSConnectCfg connectMethod, const SocketEndpoint &listenep, unsigned sslPort=0, const SocketEndpoint *rowServiceEp=nullptr, bool rowServiceSSL=false, bool rowServiceOnStdPort=true) = 0;
+    virtual void run(DAFSConnectCfg _connectMethod, ISocket *listenSocket, ISocket *secureSocket, ISocket *rowServiceSocket) = 0;
     virtual void stop() = 0;
     virtual unsigned idleTime() = 0; // in ms
     virtual void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs=DEFAULT_STDCMD_THROTTLEDELAYMS, unsigned cpuThreshold=DEFAULT_STDCMD_THROTTLECPULIMIT, unsigned queueLimit=DEFAULT_STDCMD_THROTTLEQUEUELIMIT) = 0;
@@ -78,13 +83,16 @@ interface IRemoteRowServer : extends IInterface
     virtual StringBuffer &getStats(StringBuffer &stats, bool reset) = 0;
 };
 
-#define FILESRV_VERSION 24 // don't forget VERSTRING in sockfile.cpp
+#define FILESRV_VERSION 25 // don't forget VERSTRING in sockfile.cpp
 
 interface IKeyManager;
 interface IDelayedFile;
+interface IDaFsConnection;
 
 extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename);
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
+extern REMOTE_API unsigned getCachedRemoteVersion(IDaFsConnection &daFsConnection);
+extern REMOTE_API unsigned getCachedRemoteVersion(const SocketEndpoint &ep, bool secure);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();
 extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned maxThreads=DEFAULT_THREADLIMIT, unsigned maxThreadsDelayMs=DEFAULT_THREADLIMITDELAYMS, unsigned maxAsyncCopy=DEFAULT_ASYNCCOPYMAX, IPropertyTree *keyPairInfo=nullptr);
@@ -114,8 +122,26 @@ extern REMOTE_API IRemoteFileIO *createRemoteFilteredFile(SocketEndpoint &ep, co
 interface IIndexLookup;
 extern REMOTE_API IIndexLookup *createRemoteFilteredKey(SocketEndpoint &ep, const char * filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseNLimit);
 
+typedef unsigned char RemoteFileCommandType;
+extern REMOTE_API RemoteFileCommandType queryRemoteStreamCmd(); // used by testsocket only
+
+interface IFileDescriptor;
+typedef IFileDescriptor *(*FileDescriptorFactoryType)(IPropertyTree *);
+extern REMOTE_API void configureRemoteCreateFileDescriptorCB(FileDescriptorFactoryType cb);
+
 
 // client only
+extern FileDescriptorFactoryType queryRemoteCreateFileDescriptorCB();
+interface IDaFsConnection : extends IInterface
+{
+    virtual void close(int handle) = 0;
+    virtual void send(MemoryBuffer &sendMb, MemoryBuffer &reply) = 0;
+    virtual unsigned getVersion(StringBuffer &ver) = 0;
+    virtual const SocketEndpoint &queryEp() const = 0;
+};
+
+IDaFsConnection *createDaFsConnection(const SocketEndpoint &ep, DAFSConnectCfg connectMethod, const char *tracing);
+
 extern void clientSetDaliServixSocketCaching(bool set);
 extern void clientDisconnectRemoteFile(IFile *file);
 extern void clientDisconnectRemoteIoOnExit(IFileIO *fileio,bool set);
@@ -135,7 +161,4 @@ extern bool clientAsyncCopyFileSection(const char *uuid,    // from genUUID - mu
 extern void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime);
 extern void clientAddSocketToCache(SocketEndpoint &ep,ISocket *socket);
 
-typedef unsigned char RemoteFileCommandType;
-extern REMOTE_API RemoteFileCommandType queryRemoteStreamCmd(); // used by testsocket only
-
 #endif

+ 13 - 3
common/thorhelper/thorcommon.cpp

@@ -1998,7 +1998,7 @@ void bindMemoryToLocalNodes()
 #endif
 }
 
-extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &props)
+static IOutputMetaData *_getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props)
 {
     try
     {
@@ -2006,7 +2006,6 @@ extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &pr
         bool isGrouped = props.getPropBool("@grouped", false);
         if (props.hasProp("_rtlType"))
         {
-            MemoryBuffer layoutBin;
             props.getPropBin("_rtlType", layoutBin);
             try
             {
@@ -2034,7 +2033,6 @@ extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &pr
                     props.getPropBin("_record_layout", mb);
                     expr.setown(patchEclRecordDefinitionFromRecordLayout(expr, mb));
                 }
-                MemoryBuffer layoutBin;
                 if (exportBinaryType(layoutBin, expr, isIndex))
                     return createTypeInfoOutputMetaData(layoutBin, isGrouped);
             }
@@ -2056,6 +2054,18 @@ extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &pr
     return nullptr;
 }
 
+extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &props)
+{
+    MemoryBuffer layoutBin;
+    return _getDaliLayoutInfo(layoutBin, props);
+}
+
+extern THORHELPER_API bool getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props)
+{
+    Owned<IOutputMetaData> meta = _getDaliLayoutInfo(layoutBin, props);
+    return nullptr != meta; // meta created to verify, but only returning layoutBin;
+}
+
 static bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<const IKeyTranslator> *keyedTranslator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode)
 {
     if (expectedCrc)

+ 1 - 0
common/thorhelper/thorcommon.hpp

@@ -633,6 +633,7 @@ extern THORHELPER_API void setAutoAffinity(unsigned curProcess, unsigned process
 extern THORHELPER_API void bindMemoryToLocalNodes();
 
 extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &props);
+extern THORHELPER_API bool getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props);
 
 /* Returns a dynamic translator (as 1st parameter) given a generated expected format, the published format and the desired projectedFormat,
  * providing translation mode and crc's allow translation. Returns true if translator created.

+ 72 - 67
dali/base/dadfs.cpp

@@ -1135,6 +1135,7 @@ public:
         defaultTimeout = timems;
         return ret;
     }
+    virtual bool removePhysicalPartFiles(const char *logicalName, IFileDescriptor *fileDesc, IMultiException *mexcept, unsigned numParallelDeletes=0) override;
 };
 
 
@@ -3351,71 +3352,7 @@ protected:
         if (logicalName.isForeign())
             throw MakeStringException(-1,"cannot remove a foreign file (%s)",logicalName.get());
 
-        class casyncfor: public CAsyncFor
-        {
-            IFileDescriptor *fileDesc;
-            CriticalSection errcrit;
-            IMultiException *mexcept;
-        public:
-            bool ok;
-            bool islazy;
-            casyncfor(IFileDescriptor *_fileDesc, IMultiException *_mexcept)
-            {
-                fileDesc = _fileDesc;
-                ok = true;
-                islazy = false;
-                mexcept = _mexcept;
-            }
-            void Do(unsigned i)
-            {
-                IPartDescriptor *part = fileDesc->queryPart(i);
-                unsigned nc = part->numCopies();
-                for (unsigned copy = 0; copy < nc; copy++)
-                {
-                    RemoteFilename rfn;
-                    part->getFilename(copy, rfn);
-                    Owned<IFile> partfile = createIFile(rfn);
-                    StringBuffer eps;
-                    try
-                    {
-                        unsigned start = msTick();
-                        if (!partfile->remove()&&(copy==0)&&!islazy) // only warn about missing primary files
-                            LOG(MCwarning, unknownJob, "Failed to remove file part %s from %s", partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
-                        else
-                        {
-                            unsigned t = msTick()-start;
-                            if (t>5*1000)
-                                LOG(MCwarning, unknownJob, "Removing %s from %s took %ds", partfile->queryFilename(), rfn.queryEndpoint().getUrlStr(eps).str(), t/1000);
-                        }
-                    }
-                    catch (IException *e)
-                    {
-                        CriticalBlock block(errcrit);
-                        if (mexcept)
-                            mexcept->append(*e);
-                        else
-                        {
-                            StringBuffer s("Failed to remove file part ");
-                            s.append(partfile->queryFilename()).append(" from ");
-                            rfn.queryEndpoint().getUrlStr(s);
-                            EXCLOG(e, s.str());
-                            e->Release();
-                        }
-                        ok = false;
-                    }
-                }
-            }
-        } afor(fileDesc, mexcept);
-        afor.islazy = fileDesc->queryProperties().getPropBool("@lazy");
-        if (0 == numParallelDeletes)
-            numParallelDeletes = fileDesc->numParts();
-        if (numParallelDeletes > MAX_PHYSICAL_DELETE_THREADS)
-        {
-            WARNLOG("Limiting parallel physical delete threads to %d", MAX_PHYSICAL_DELETE_THREADS);
-            numParallelDeletes = MAX_PHYSICAL_DELETE_THREADS;
-        }
-        afor.For(fileDesc->numParts(),numParallelDeletes,false,true);
-        return afor.ok;
+        return parent->removePhysicalPartFiles(logicalName.get(), fileDesc, mexcept, numParallelDeletes);
     }
 
 protected: friend class CDistributedFilePart;
@@ -4172,8 +4109,6 @@ public:
                 StringBuffer copyDir(baseDir);
                 adjustClusterDir(i, copy, copyDir);
                 fullname.clear().append(copyDir).append(newPath);
-
-                PROGLOG("fullname = %s", fullname.str());
                 newNames.item(i).append(fullname);
             }
         }
@@ -12598,6 +12533,76 @@ IDFAttributesIterator* CDistributedFileDirectory::getLogicalFilesSorted(
         cacheHint, total, allMatchingFiles, true, true);
 }
 
+bool CDistributedFileDirectory::removePhysicalPartFiles(const char *logicalName, IFileDescriptor *fileDesc, IMultiException *mexcept, unsigned numParallelDeletes)
+{
+    class casyncfor: public CAsyncFor
+    {
+        IFileDescriptor *fileDesc;
+        CriticalSection errcrit;
+        IMultiException *mexcept;
+    public:
+        bool ok;
+        bool islazy;
+        casyncfor(IFileDescriptor *_fileDesc, IMultiException *_mexcept)
+        {
+            fileDesc = _fileDesc;
+            ok = true;
+            islazy = false;
+            mexcept = _mexcept;
+        }
+        void Do(unsigned i)
+        {
+            IPartDescriptor *part = fileDesc->queryPart(i);
+            unsigned nc = part->numCopies();
+            for (unsigned copy = 0; copy < nc; copy++)
+            {
+                RemoteFilename rfn;
+                part->getFilename(copy, rfn);
+                Owned<IFile> partfile = createIFile(rfn);
+                StringBuffer eps;
+                try
+                {
+                    unsigned start = msTick();
+                    if (!partfile->remove()&&(copy==0)&&!islazy) // only warn about missing primary files
+                        LOG(MCwarning, unknownJob, "Failed to remove file part %s from %s", partfile->queryFilename(),rfn.queryEndpoint().getUrlStr(eps).str());
+                    else
+                    {
+                        unsigned t = msTick()-start;
+                        if (t>5*1000)
+                            LOG(MCwarning, unknownJob, "Removing %s from %s took %ds", partfile->queryFilename(), rfn.queryEndpoint().getUrlStr(eps).str(), t/1000);
+                    }
+                }
+                catch (IException *e)
+                {
+                    CriticalBlock block(errcrit);
+                    if (mexcept)
+                        mexcept->append(*e);
+                    else
+                    {
+                        StringBuffer s("Failed to remove file part ");
+                        s.append(partfile->queryFilename()).append(" from ");
+                        rfn.queryEndpoint().getUrlStr(s);
+                        EXCLOG(e, s.str());
+                        e->Release();
+                    }
+                    ok = false;
+                }
+            }
+        }
+    } afor(fileDesc, mexcept);
+    afor.islazy = fileDesc->queryProperties().getPropBool("@lazy");
+    if (0 == numParallelDeletes)
+        numParallelDeletes = fileDesc->numParts();
+    if (numParallelDeletes > MAX_PHYSICAL_DELETE_THREADS)
+    {
+        WARNLOG("Limiting parallel physical delete threads to %d", MAX_PHYSICAL_DELETE_THREADS);
+        numParallelDeletes = MAX_PHYSICAL_DELETE_THREADS;
+    }
+    afor.For(fileDesc->numParts(),numParallelDeletes,false,true);
+    return afor.ok;
+}
+
+
 #ifdef _USE_CPPUNIT
 /*
  * This method removes files only logically. removeEntry() used to do that, but the only

+ 3 - 0
dali/base/dadfs.hpp

@@ -687,6 +687,9 @@ interface IDistributedFileDirectory: extends IInterface
 
     virtual unsigned setDefaultTimeout(unsigned timems) = 0;                                // sets default timeout for SDS connections and locking
                                                                                             // returns previous value
+
+    // useful to clearup after temporary unpublished file.
+    virtual bool removePhysicalPartFiles(const char *logicalName, IFileDescriptor *fileDesc, IMultiException *mexcept, unsigned numParallelDeletes=0) = 0;
 };
 
 

+ 14 - 0
dali/base/dafdesc.cpp

@@ -3108,3 +3108,17 @@ void extractFilePartInfo(IPropertyTree &info, IFileDescriptor &file)
         }
     }
 }
+
+static IFileDescriptor *factoryForRemoteFileDescriptor(IPropertyTree *fileInfo)
+{
+    if (fileInfo)
+        return deserializeFileDescriptorTree(fileInfo);
+    else
+        return createFileDescriptor();
+}
+
+extern da_decl FileDescriptorFactoryType queryFileDescriptorFactory()
+{
+    return factoryForRemoteFileDescriptor;
+}
+

+ 3 - 0
dali/base/dafdesc.hpp

@@ -371,5 +371,8 @@ inline DFD_OS SepCharBaseOs(char c)
 
 extern da_decl void extractFilePartInfo(IPropertyTree &info, IFileDescriptor &file);
 
+typedef IFileDescriptor *(*FileDescriptorFactoryType)(IPropertyTree *);
+extern da_decl FileDescriptorFactoryType queryFileDescriptorFactory();
+
 
 #endif

+ 8 - 7
dali/dafilesrv/dafilesrv.cmake

@@ -28,15 +28,15 @@ set (    SRCS
     )
 
 include_directories ( 
-         ./../../system/hrpc 
-         ./../../common/remote 
-         ./../../system/include 
-         ./../../system/jhtree
-         ./../../system/jlib 
-         ./../../rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/system/hrpc 
+         ${HPCC_SOURCE_DIR}/common/remote 
+         ${HPCC_SOURCE_DIR}/system/include 
+         ${HPCC_SOURCE_DIR}/system/jhtree
+         ${HPCC_SOURCE_DIR}/system/jlib 
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/system/security/shared
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
-         ./../../system/security/shared
     )
 
 if (WIN32)
@@ -49,5 +49,6 @@ install ( TARGETS dafilesrv RUNTIME DESTINATION ${EXEC_DIR} )
 target_link_libraries ( dafilesrv
          jlib
          remote
+         dalibase
     )
 

+ 5 - 0
dali/dafilesrv/dafilesrv.cpp

@@ -27,6 +27,7 @@
 #include "jmisc.hpp"
 #include "rmtfile.hpp"
 #include "dalienv.hpp"
+#include "dafdesc.hpp"
 
 #ifdef _MSC_VER
 #pragma warning (disable : 4355)
@@ -38,6 +39,7 @@ static const char* defaultRowSericeConfiguration = "RowSvc";
 
 
 #include "remoteerr.hpp"
+#include "dafscommon.hpp"
 #include "sockfile.hpp"
 
 void usage()
@@ -334,6 +336,7 @@ int initDaemon()
 
 #endif
 
+
 int main(int argc,char **argv) 
 {
     InitModuleObjects();
@@ -858,6 +861,8 @@ int main(int argc,char **argv)
     server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo));
     server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
     server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
+    configureRemoteCreateFileDescriptorCB(queryFileDescriptorFactory());
+
     class CPerfHook : public CSimpleInterfaceOf<IPerfMonHook>
     {
     public:

+ 12 - 9
dali/datest/datest.cmake

@@ -28,13 +28,14 @@ set (    SRCS
     )
 
 include_directories ( 
-         ./../../common/remote 
-         ./../server 
-         ./../../system/mp 
-         . 
-         ./../../system/include 
-         ./../../system/jlib
-         ./../../system/security/shared
+         ${HPCC_SOURCE_DIR}/dali/server 
+         ${HPCC_SOURCE_DIR}/system/mp 
+         ${HPCC_SOURCE_DIR}/system/include 
+         ${HPCC_SOURCE_DIR}/system/jlib
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/esp/clients/wsdfuaccess
+         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
     )
 
 HPCC_ADD_EXECUTABLE ( datest ${SRCS} )
@@ -43,9 +44,11 @@ target_link_libraries ( datest
          jlib
          mp 
          hrpc 
-         remote 
+         remote
+         eclrtl
+         wsdfuaccess 
          dalibase 
-                 ${CPPUNIT_LIBRARIES}
+         ${CPPUNIT_LIBRARIES}
     )
 
 

+ 293 - 1
dali/datest/datest.cpp

@@ -33,7 +33,14 @@
 #include "dasess.hpp"
 #include "mplog.hpp"
 
+#include "rtlformat.hpp"
+
 #include "jptree.hpp"
+#include "wsdfuaccess.hpp"
+
+using namespace wsdfuaccess;
+using namespace dafsstream;
+
 
 #define DEFAULT_TEST "RANDTEST"
 static const char *whichTest = DEFAULT_TEST;
@@ -2229,6 +2236,285 @@ void TestSDS1()
 #endif
 }
 
+void testDfuStreamRead(const char *fname)
+{
+    configureRemoteCreateFileDescriptorCB(queryFileDescriptorFactory());
+
+    // reads a DFS file
+    try
+    {
+        Owned<IUserDescriptor> userDesc = createUserDescriptor();
+        userDesc->set("jsmith","password");
+
+        Owned<IDFUFileAccess> srcFile = lookupDFUFile(fname, "testDfuStreamRead", 300, userDesc);
+        if (!srcFile)
+        {
+            WARNLOG("File '%s' not found!", fname);
+            return;
+        }
+
+        IOutputMetaData *meta = srcFile->queryEngineInterface()->queryMeta();
+        CommonXmlWriter xmlWriter(0);
+
+        unsigned sourceN = srcFile->queryNumParts();
+        for (unsigned p=0; p<sourceN; p++)
+        {
+            Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p, 0, nullptr, true);
+
+            reader->start();
+
+            while (true)
+            {
+                size32_t sz;
+                const void *row = reader->nextRow(sz);
+                if (!row)
+                {
+                    if (!srcFile->queryIsGrouped())
+                        break;
+                    row = reader->nextRow(sz);
+                    if (!row)
+                        break;
+                }
+                meta->toXML((const byte *)row, xmlWriter.clear());
+                PROGLOG("Row: %s", xmlWriter.str());
+            }
+        }
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e, nullptr);
+        e->Release();
+    }
+}
+
+void testDfuStreamWrite(const char *fname)
+{
+    configureRemoteCreateFileDescriptorCB(queryFileDescriptorFactory());
+
+    // reads a DFS file and writes it to <filename>_copy
+    try
+    {
+        Owned<IUserDescriptor> userDesc = createUserDescriptor();
+        userDesc->set("jsmith","password");
+
+        const char *newFileName = "dfsstream::newfile1";
+        if (!isEmptyString(fname))
+            newFileName = fname;
+        const char *newEclRecDef = "{ string10 fname; string10 sname; unsigned4 age; };";
+        Owned<IDFUFileAccess> tgtFile = createDFUFile(newFileName, "mythor", dft_flat, newEclRecDef, "datest-write-newfile1", 300, true, userDesc); // NB: compressed file
+
+// NB: must match record definition
+        struct Row
+        {
+            std::string fname;
+            std::string sname;
+            unsigned age;
+        };
+        const std::array<Row, 6> rows = { { { "John      ", "Smith     ", 59 },
+                                            { "Samuel    ", "Peeps     ", 39 },
+                                            { "Bob       ", "Marks     ", 12 },
+                                            { "Jake      ", "Smith     ", 12 },
+                                            { "Paul      ", "Smith     ", 12 },
+                                            { "Sarah     ", "Potters   ", 28 }
+                                          }
+                                        };
+
+        offset_t fileSize = 0;
+        unsigned numRecs = 0;
+        unsigned targetN = tgtFile->queryNumParts();
+        for (unsigned p=0; p<targetN; p++)
+        {
+            Owned<IDFUFilePartWriter> writer = tgtFile->createFilePartWriter(p);
+            writer->start();
+            unsigned numPartRecs = 0;
+            offset_t partSize = 0;
+            for (auto &row: rows)
+            {
+                char rowMem[24];
+                memcpy(rowMem, row.fname.c_str(), 10);
+                memcpy(rowMem+10, row.sname.c_str(), 10);
+                memcpy(rowMem+20, &row.age, sizeof(row.age));
+                writer->write(sizeof(rowMem), rowMem);
+                partSize += sizeof(rowMem);
+                ++numPartRecs;
+            }
+            tgtFile->setPartPropertyInt(p, "@recordCount", numPartRecs); // JCSMORE
+            tgtFile->setPartPropertyInt(p, "@size", partSize);
+            numRecs += numPartRecs;
+            fileSize += partSize;
+        }
+        tgtFile->setFilePropertyInt("@recordCount", numRecs);
+        tgtFile->setFilePropertyInt("@size", fileSize);
+        publishDFUFile(tgtFile, true, userDesc);
+        tgtFile.clear();
+
+
+        // Read the file back
+
+
+        Owned<IDFUFileAccess> srcFile = lookupDFUFile(newFileName, "datest-read-newfile1", 300, userDesc);
+        if (!srcFile)
+        {
+            WARNLOG("File '%s' not found!", newFileName);
+            return;
+        }
+
+        unsigned sourceN = srcFile->queryNumParts();
+        for (unsigned p=0; p<sourceN; p++)
+        {
+            Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p);
+
+            // filter by Smith and project to new format
+            reader->addFieldFilter("sname=['Smith']");
+            reader->setOutputRecordFormat("{ string5 age; string20 fname; };");
+
+            reader->start();
+
+            while (true)
+            {
+                size32_t sz;
+                const byte *row = (const byte *)reader->nextRow(sz);
+                if (!row)
+                {
+                    if (!srcFile->queryIsGrouped())
+                        break;
+                    row = (const byte *)reader->nextRow(sz);
+                    if (!row)
+                        break;
+                }
+                PROGLOG("Row: age=%.*s, fname=%.*s", 5, row, 20, row+5);
+            }
+        }
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e, nullptr);
+        e->Release();
+    }
+}
+
+void testDfuStreamCopy(const char *srcFileName)
+{
+    configureRemoteCreateFileDescriptorCB(queryFileDescriptorFactory());
+
+    // reads a DFS file and writes it to <filename>_copy
+    try
+    {
+        if (isEmptyString(srcFileName))
+            throw makeStringException(0, "no source logical filename supplied");
+
+        Owned<IUserDescriptor> userDesc = createUserDescriptor();
+        userDesc->set("jsmith","password");
+
+        Owned<IDFUFileAccess> srcFile = lookupDFUFile(srcFileName, "datest", 60, userDesc);
+        if (!srcFile)
+        {
+            WARNLOG("File '%s' not found", srcFileName);
+            return;
+        }
+        IDFUFileAccessExt *srcFileEx = srcFile->queryEngineInterface();
+
+        const char *eclRecDef = srcFileEx->queryProperties().queryProp("ECL");
+        if (!eclRecDef)
+            throw makeStringExceptionV(0, "File '%s' has no record definition", srcFileName);
+        IOutputMetaData *srcMeta = srcFileEx->queryMeta();
+
+        const char *srcGroup = srcFile->queryClusterGroupName();
+        const char *clusterName = startsWith(srcGroup, "hthor__") ? "myeclagent" : "mythor";
+        StringBuffer tgtFileName(srcFileName);
+        tgtFileName.append("_copy");
+        Owned<IDFUFileAccess> tgtFile = createDFUFile(tgtFileName, clusterName, dft_flat, eclRecDef, "myRequestId", 300, false, userDesc);
+        IDFUFileAccessExt *tgtFileEx = srcFile->queryEngineInterface();
+
+
+        unsigned tgtFileParts = tgtFile->queryNumParts();
+        unsigned currentWriterPart = 0;
+        Owned<IDFUFilePartWriter> writer;
+
+        unsigned numRecs = 0;
+        unsigned srcFileParts = srcFile->queryNumParts();
+        unsigned tally = srcFileParts;
+        for (unsigned p=0; p<srcFileParts; p++)
+        {
+            Owned<IDFUFilePartReader> reader = srcFile->createFilePartReader(p);
+            reader->start();
+
+            if (tally >= srcFileParts)
+            {
+                tally -= srcFileParts;
+                writer.setown(tgtFile->createFilePartWriter(currentWriterPart++));
+                writer->start();
+            }
+            tally += tgtFileParts;
+
+            while (true)
+            {
+                size32_t sz;
+                const void *row = reader->nextRow(sz);
+                if (!row)
+                {
+                    if (!srcFile->queryIsGrouped())
+                        break;
+                    row = reader->nextRow(sz);
+                    if (!row)
+                        break;
+                }
+                ++numRecs;
+                CommonXmlWriter xmlwrite(0);
+                srcMeta->toXML((const byte *)row, xmlwrite);
+                PROGLOG("row: %s", xmlwrite.str());
+
+                writer->write(sz, row);
+            }
+
+        }
+        writer.clear();
+
+        // write some blank parts, if src # parts less than target # parts
+        while (currentWriterPart<tgtFileParts)
+        {
+            writer.setown(tgtFile->createFilePartWriter(currentWriterPart++));
+            writer->start();
+            writer.clear();
+        }
+        PROGLOG("numRecs writtern = %u", numRecs);
+        tgtFileEx->queryProperties().setPropInt64("@recordCount", numRecs);
+        //tgtFileEx->queryProperties().setPropInt64("@size", fileSize);
+        publishDFUFile(tgtFile, true, userDesc);
+
+        // read it back for good measure
+        Owned<IDFUFileAccess> newSrcFile = lookupDFUFile(tgtFileName, "datest", 60, userDesc);
+        if (!newSrcFile)
+        {
+            WARNLOG("File '%s' not found", tgtFileName.str());
+            return;
+        }
+        IOutputMetaData *tgtMeta = tgtFileEx->queryMeta();
+        CommonXmlWriter xmlWriter(0);
+        for (unsigned p=0; p<tgtFileParts; p++)
+        {
+            Owned<IDFUFilePartReader> reader = newSrcFile->createFilePartReader(p);
+            reader->start();
+
+            while (true)
+            {
+                size32_t sz;
+                const void *row = reader->nextRow(sz);
+                if (!row)
+                    break;
+                ++numRecs;
+                tgtMeta->toXML((const byte *)row, xmlWriter);
+                PROGLOG("new file row: %s", xmlWriter.str());
+            }
+        }
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e, nullptr);
+        e->Release();
+    }
+}
+
 
 class CClientTestSDS : public Thread
 {
@@ -3038,7 +3324,7 @@ void usage(const char *error=NULL)
 {
     if (error) printf("%s\n", error);
     printf("usage: DATEST <server_ip:port>* [/test <name> [<test params...>] [/NITER <iterations>]\n");
-    printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE | NODESUBS\n");
+    printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE | NODESUBS | DFUSTREAMREAD | DFUSTREAMWRITE | DFUSTREAMCOPY\n");
     printf("eg:  datest . /test QTEST put          -- one coven server running locally, running qtest with param \"put\"\n");
     printf("     datest eq0001016 eq0001017        -- two coven servers, use default test %s\n", DEFAULT_TEST);
 }
@@ -3241,6 +3527,12 @@ int main(int argc, char* argv[])
                 Test_PartIter();
             else if (TEST("MULTICONNECT"))
                 testMultiConnect();
+            else if (TEST("DFUSTREAMREAD"))
+                testDfuStreamRead(testParams.item(0));
+            else if (TEST("DFUSTREAMWRITE"))
+                testDfuStreamWrite(testParams.ordinality() ? testParams.item(0) : nullptr);
+            else if (TEST("DFUSTREAMCOPY"))
+                testDfuStreamCopy(testParams.ordinality() ? testParams.item(0) : nullptr);
 //          else if (TEST("DALILOG"))
 //              testDaliLog(testParams.ordinality()&&0!=atoi(testParams.item(0)));
             else

+ 2 - 0
esp/bindings/http/client/httpclient.cpp

@@ -910,7 +910,9 @@ HttpClientErrCode CHttpClient::postRequest(ISoapMessage &req, ISoapMessage& resp
         parseSoapFault(httpresponse->getContent(content),errmsg);
 
         response.set_err(errmsg.str());
+#ifdef _DEBUG // JCSMORE - check with AF
         DBGLOG("SOAP_SERVER_ERROR: %s", errmsg.str());
+#endif
 
         return HttpClientErrCode::Error;
     }

+ 20 - 9
esp/clients/wsdfuaccess/CMakeLists.txt

@@ -36,17 +36,25 @@ set (    SRCS
 include_directories ( 
          ${HPCC_SOURCE_DIR}/system/include 
          ${HPCC_SOURCE_DIR}/system/xmllib
-         ${HPCC_SOURCE_DIR}/system/security/shared 
-         ${HPCC_SOURCE_DIR}/system/security/securesocket 
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/system/security/securesocket
+         ${HPCC_SOURCE_DIR}/system/security/cryptohelper
+         ${HPCC_SOURCE_DIR}/common/remote
+         ${HPCC_SOURCE_DIR}/common/environment
+         ${HPCC_SOURCE_DIR}/common/thorhelper
+         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
          ${HPCC_SOURCE_DIR}/dali/base
          ${HPCC_SOURCE_DIR}/system/mp
-         ${HPCC_SOURCE_DIR}/esp/bindings 
-         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/client 
-         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/xpp 
-         ${HPCC_SOURCE_DIR}/system/jlib 
-         ${HPCC_SOURCE_DIR}/esp/platform 
+         ${HPCC_SOURCE_DIR}/esp/bindings
+         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/client
+         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/xpp
+         ${HPCC_SOURCE_DIR}/system/jlib
+         ${HPCC_SOURCE_DIR}/esp/platform
          ${HPCC_SOURCE_DIR}/esp/clients
-         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/Platform 
+         ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/Platform
+         ${HPCC_SOURCE_DIR}/esp/smc/SMCLib
+         ${HPCC_SOURCE_DIR}/testing/unittests
     )
 
 ADD_DEFINITIONS( -D_USRDLL -DWSDFUACCESS_EXPORTS )
@@ -56,8 +64,11 @@ install ( TARGETS wsdfuaccess RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATIO
 target_link_libraries ( wsdfuaccess 
          jlib
          xmllib 
-         esphttp 
+         esphttp
+         remote
+         thorhelper
          dalibase
+         ${CPPUNIT_LIBRARIES}
     )
 
 

+ 725 - 25
esp/clients/wsdfuaccess/wsdfuaccess.cpp

@@ -15,42 +15,85 @@
     limitations under the License.
 ############################################################################## */
 
+#include <vector>
+
 #include "jliball.hpp"
+#include "jflz.hpp"
+#include "daclient.hpp"
 #include "dautils.hpp"
 #include "seclib.hpp"
+#include "environment.hpp"
 #include "ws_dfu.hpp"
+#include "dafsstream.hpp"
+#include "dafdesc.hpp"
+#include "dadfs.hpp"
+#include "dasess.hpp"
+#include "thorcommon.hpp"
+#include "sockfile.hpp"
+#include "digisign.hpp"
+
+#include "eclwatch_errorlist.hpp" // only for ECLWATCH_FILE_NOT_EXIST
+#include "soapmessage.hpp"
 
 #include "wsdfuaccess.hpp"
 
+using namespace dafsstream;
+using namespace cryptohelper;
+
 namespace wsdfuaccess
 {
 
-CSecAccessType translateToCSecAccessAccessType(SecAccessFlags from)
+//#define TEST_RETURNTEXTRESPONSE
+
+static std::vector<std::string> dfuServiceUrls;
+static CriticalSection dfuServiceUrlCrit;
+static std::atomic<unsigned> currentDfuServiceUrl{0};
+static std::atomic<bool> dfuServiceUrlsDiscovered{false};
+
+void ensureAccessibleDfuServiceURLList()
 {
-    switch (from)
+    bool expected = false;
+    if (dfuServiceUrlsDiscovered.compare_exchange_strong(expected, true))
     {
-        case SecAccess_Access:
-            return CSecAccessType_Access;
-        case SecAccess_Read:
-            return CSecAccessType_Read;
-        case SecAccess_Write:
-            return CSecAccessType_Write;
-        case SecAccess_Full:
-            return CSecAccessType_Full;
-        case SecAccess_None:
-        default:
-            return CSecAccessType_None;
+        getAccessibleServiceURLList("WsSMC", dfuServiceUrls);
+        if (0 == dfuServiceUrls.size())
+            throw MakeStringException(-1, "Could not find any DFU services in the target HPCC configuration.");
+
+        for (auto &s: dfuServiceUrls)
+            s = s + "/WsDfu/";
     }
 }
 
-bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *jobId, const char *logicalName, SecAccessFlags access, unsigned expirySecs, const char *user, const char *token)
+static unsigned getNumDfuServiceURL()
 {
-    Owned<IClientWsDfu> dfuClient = createWsDfuClient();
-    dfuClient->addServiceUrl(serviceUrl);
-    dfuClient->setUsernameToken(user, token, "");
+    ensureAccessibleDfuServiceURLList();
+    return dfuServiceUrls.size();
+}
+
+/* advances to next DFU service URL, wraps if necessary.
+ * If concurrent threads are trying to advance, only 1 will succeed, but this call will update current.
+ */
+static const char *advanceToNextAvailableDFUServiceURL(unsigned &currentURL)
+{
+    ensureAccessibleDfuServiceURLList();
+    // 1st check if need to rollover
+    unsigned expected = dfuServiceUrls.size()-1;
+    if (currentDfuServiceUrl.compare_exchange_strong(expected, 0))
+        currentURL = 0;
+    else
+    {
+        // try to advance by 1.
+        if (currentDfuServiceUrl.compare_exchange_strong(currentURL, currentURL+1))
+            currentURL++;
+        else // someone else already has, get current dfu service url
+            currentURL = currentDfuServiceUrl;
+    }
+    return dfuServiceUrls[currentURL].c_str();
+}
 
+static IClientDFUFileAccessResponse *doLookupDFUFileDeprecated(IClientWsDfu *dfuClient, const char *logicalName, const char *requestId, unsigned expirySecs)
+{
     Owned<IClientDFUFileAccessRequest> dfuReq = dfuClient->createDFUFileAccessRequest();
-    IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
 
     CDfsLogicalFileName lfn;
     lfn.set(logicalName);
@@ -59,21 +102,678 @@ bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *j
     lfn.getCluster(cluster);
     lfn.get(lfnName); // remove cluster if present
 
+    IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
     requestBase.setName(lfnName);
     requestBase.setCluster(cluster);
     requestBase.setExpirySeconds(expirySecs);
-    requestBase.setAccessRole(CFileAccessRole_Engine);
-    requestBase.setAccessType(translateToCSecAccessAccessType(access));
-    requestBase.setJobId(jobId);
+    requestBase.setJobId(requestId);
+    requestBase.setAccessRole(CFileAccessRole_External);
+    requestBase.setAccessType(CSecAccessType_Read);
+    requestBase.setReturnBinTypeInfo(true);
+
+    return dfuClient->DFUFileAccess(dfuReq);
+}
+
+static IClientDFUFileAccessResponse *doLookupDFUFile(IClientWsDfu *dfuClient, const char *logicalName, const char *requestId, unsigned expirySecs)
+{
+    Owned<IClientDFUFileAccessV2Request> dfuReq = dfuClient->createDFUFileAccessV2Request();
 
-    Owned<IClientDFUFileAccessResponse> dfuResp = dfuClient->DFUFileAccess(dfuReq);
+    CDfsLogicalFileName lfn;
+    lfn.set(logicalName);
+
+    StringBuffer cluster, lfnName;
+    lfn.getCluster(cluster);
+    lfn.get(lfnName); // remove cluster if present
+
+    dfuReq->setName(lfnName);
+    dfuReq->setCluster(cluster);
+    dfuReq->setExpirySeconds(expirySecs);
+    dfuReq->setRequestId(requestId);
+#ifdef TEST_RETURNTEXTRESPONSE
+    dfuReq->setReturnTextResponse(true);
+#endif
+
+    return dfuClient->DFUFileAccessV2(dfuReq);
+}
+
+static IDFUFileAccess *doLookupDFUFileHandleLegacy(const char *serviceUrl, const char *logicalName, const char *requestId, unsigned expirySecs, const char *user, const char *password)
+{
+    Owned<IClientWsDfu> dfuClient = createWsDfuClient();
+    dfuClient->addServiceUrl(serviceUrl);
+    dfuClient->setUsernameToken(user, password, "");
+    Owned<IClientDFUFileAccessResponse> dfuResp;
+    try
+    {
+        dfuResp.setown(doLookupDFUFile(dfuClient, logicalName, requestId, expirySecs));
+    }
+    catch (IException *e)
+    {
+        /* NB: there should really be a different IException class and a specific error code
+         * The server knows it's an unsupported method.
+         */
+        if (SOAP_SERVER_ERROR != e->errorCode())
+            throw;
+        // fall through and try deprecated method
+        e->Release();
+    }
+    if (!dfuResp)
+        dfuResp.setown(doLookupDFUFileDeprecated(dfuClient, logicalName, requestId, expirySecs));
 
     const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
     if (excep->ordinality() > 0)
-        throw LINK((IMultiException *)excep); // JCSMORE - const IException.. not caught in general..
+        throw LINK((IMultiException *)excep); // NB - const IException.. not caught in general..
+
+    Owned<IDFUFileAccess> ret = createDFUFileAccess(dfuResp->getAccessInfo().getMetaInfoBlob());
+
+    if (!ret->queryEngineInterface()->queryMeta()) // as a result of legacy WsDFU version
+    {
+        const MemoryBuffer &binLayout = dfuResp->getAccessInfo().getRecordTypeInfoBin();
+        if (0 == binLayout.length())
+            throw makeStringExceptionV(0, "lookupDFUFile(%s) - layout missing", logicalName);
+        ret->queryEngineInterface()->setLayoutBin(binLayout.length(), binLayout.bytes());
+    }
+    return ret.getClear();
+}
+
+static IClientDFUFileCreateResponse *doCreateDFUFileDeprecated(IClientWsDfu *dfuClient, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs)
+{
+    Owned<IClientDFUFileCreateRequest> dfuReq = dfuClient->createDFUFileCreateRequest();
+
+    dfuReq->setECLRecordDefinition(recDef);
+    IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
+    requestBase.setName(logicalName);
+    requestBase.setCluster(cluster);
+    requestBase.setExpirySeconds(expirySecs);
+    requestBase.setJobId(requestId);
+    requestBase.setAccessRole(CFileAccessRole_External);
+    requestBase.setAccessType(CSecAccessType_Write);
+    requestBase.setReturnBinTypeInfo(true);
+
+    return dfuClient->DFUFileCreate(dfuReq);
+}
+
+static IClientDFUFileCreateResponse *doCreateDFUFile(IClientWsDfu *dfuClient, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed)
+{
+    Owned<IClientDFUFileCreateV2Request> dfuReq = dfuClient->createDFUFileCreateV2Request();
+
+    dfuReq->setECLRecordDefinition(recDef);
+    dfuReq->setName(logicalName);
+    dfuReq->setCluster(cluster);
+    dfuReq->setExpirySeconds(expirySecs);
+    dfuReq->setRequestId(requestId);
+    dfuReq->setCompressed(compressed);
+#ifdef TEST_RETURNTEXTRESPONSE
+    dfuReq->setReturnTextResponse(true);
+#endif
+
+    CDFUFileType serviceType;
+    switch (type)
+    {
+        case dft_flat:
+            serviceType = CDFUFileType_Flat;
+            break;
+        case dft_index:
+            serviceType = CDFUFileType_Index;
+            break;
+    }
+    dfuReq->setType(serviceType);
+    return dfuClient->DFUFileCreateV2(dfuReq);
+}
+
+static IDFUFileAccess *doCreateDFUFileHandleLegacy(const char *serviceUrl, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, const char *user, const char *password)
+{
+    Owned<IClientWsDfu> dfuClient = createWsDfuClient();
+    dfuClient->addServiceUrl(serviceUrl);
+    dfuClient->setUsernameToken(user, password, "");
+
+    Owned<IClientDFUFileCreateResponse> dfuResp;
+    try
+    {
+        dfuResp.setown(doCreateDFUFile(dfuClient, logicalName, cluster, type, recDef, requestId, expirySecs, compressed));
+    }
+    catch (IException *e)
+    {
+        /* NB: there should really be a different IException class and a specific error code
+         * The server knows it's an unsupported method.
+         */
+        if (SOAP_SERVER_ERROR != e->errorCode())
+            throw;
+        // fall through and try deprecated method
+        e->Release();
+    }
+    if (!dfuResp)
+    {
+        if (compressed)
+            WARNLOG("createDFUFile(%s), legacy esp server does not support creating compressed files", logicalName);
+        dfuResp.setown(doCreateDFUFileDeprecated(dfuClient, logicalName, cluster, type, recDef, requestId, expirySecs));
+    }
+
+    const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
+    if (excep->ordinality() > 0)
+        throw LINK((IMultiException *)excep); // NB: - const IException.. not caught in general..
+
+    Owned<IDFUFileAccess> ret = createDFUFileAccess(dfuResp->getAccessInfo().getMetaInfoBlob(), dfuResp->getFileId());
+    // NB: patch up record definition if server didn't return it (because legacy WsDFU version)
+    if (!ret->queryEngineInterface()->queryProperties().hasProp("ECL"))
+        ret->queryEngineInterface()->queryProperties().setProp("ECL", recDef);
+    if (!ret->queryEngineInterface()->queryMeta()) // as a result of legacy WsDFU version
+    {
+        const MemoryBuffer &binLayout = dfuResp->getAccessInfo().getRecordTypeInfoBin();
+        if (0 == binLayout.length())
+            throw makeStringExceptionV(0, "createDFUFile(%s) - layout missing", logicalName);
+        ret->queryEngineInterface()->setLayoutBin(binLayout.length(), binLayout.bytes());
+    }
+
+    return ret.getClear();
+}
+
+static void doPublishDFUFile(const char *serviceUrl, IDFUFileAccess *dfuFile, bool overwrite, const char *user, const char *password)
+{
+    Owned<IClientWsDfu> dfuClient = createWsDfuClient();
+    dfuClient->addServiceUrl(serviceUrl);
+    dfuClient->setUsernameToken(user, password, "");
+
+    Owned<IClientDFUFilePublishRequest> dfuReq = dfuClient->createDFUFilePublishRequest();
+
+    dfuReq->setFileId(dfuFile->queryFileId());
+
+    dfuReq->setOverwrite(overwrite); // NB: WsDfu min_ver 1.50
+    IFileDescriptor &fileDesc = dfuFile->queryEngineInterface()->queryFileDescriptor();
+
+    MemoryBuffer mb;
+    fileDesc.serialize(mb);
+    dfuReq->setFileDescriptorBlob(mb); // NB: WsDfu min_ver 1.50
+
+    const char *eclRecDef = fileDesc.queryProperties().queryProp("ECL");
+    dfuReq->setECLRecordDefinition(eclRecDef); // NB: WsDfu depv_ver < 1.50
+
+    Owned<IClientDFUFilePublishResponse> dfuResp = dfuClient->DFUFilePublish(dfuReq);
+
+    const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
+    if (excep->ordinality() > 0)
+        throw LINK((IMultiException *)excep); // NB: - const IException.. not caught in general..
+}
+
+// wrapper to the doLookupDFUFile, that discovers and tries DFUService URL's
+IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, const char *user, const char *password)
+{
+    unsigned currentUrl;
+    const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
+    unsigned c = getNumDfuServiceURL(); // max attempts
+    while (c)
+    {
+        try
+        {
+            /* JCSMORE - where would locking fit in?
+             * *IF* Esp established lock, then there'd be no association with this client (no state), and if Esp restarted lock would be lost,
+             * if this client died, the lock would remain (on Esp).
+             *
+             * Idea:
+             * 1) Esp establishes lock on behalf of this client.
+             * 2) This client sends keep-alive packets every N seconds (To Esp).
+             * 3) Esp ensures lock remains alive somehow (something (Esp?) could keep persistent [written] state of active locks?)
+             * 4) If no keep-alive for a lock, Esp closes it.
+             *
+             * Would require the ability (in Dali) to create locks without session association.
+             * As long as Dali is the lock manager, Would probably be best if the keep-alive packets were
+             * forwarded to Dali, and it managed the live/stale locks.
+             */
+
+            return doLookupDFUFileHandleLegacy(espServiceUrl, logicalName, requestId, expirySecs, user, password);
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            EXCLOG(e, nullptr);
+            e->Release();
+        }
+        catch (IException *e)
+        {
+            if (ECLWATCH_FILE_NOT_EXIST == e->errorCode())
+            {
+                e->Release();
+                return nullptr; // not found
+            }
+            throw;
+        }
+        espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
+        --c;
+    }
+    StringBuffer msg("Failed to contact DFU service: { ");
+    for (auto &url: dfuServiceUrls)
+        msg.append(url.c_str());
+    msg.append("}");
+    throw makeStringException(0, msg.str());
+}
+
+IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, IUserDescriptor *userDesc)
+{
+    assertex(userDesc);
+    StringBuffer user, password;
+    userDesc->getUserName(user);
+    userDesc->getPassword(password);
+    IDFUFileAccess *ret = lookupDFUFile(logicalName, requestId, expirySecs, user, password);
+    if (ret)
+        ret->setFileOption(dfo_compressedRemoteStreams);
+    return ret;
+}
+
+
+// wrapper to the doCreateDFUFile, that discovers and tries DFUService URL's
+IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, const char *user, const char *password)
+{
+    unsigned currentUrl;
+    const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
+    unsigned c = getNumDfuServiceURL(); // max attempts
+    while (c)
+    {
+        try
+        {
+            return doCreateDFUFileHandleLegacy(espServiceUrl, logicalName, cluster, type, recDef, requestId, expirySecs, compressed, user, password);
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            EXCLOG(e, nullptr);
+            e->Release();
+        }
+        advanceToNextAvailableDFUServiceURL(currentUrl);
+        --c;
+    }
+    StringBuffer msg("Failed to contact DFU service: { ");
+    for (auto &url: dfuServiceUrls)
+        msg.append(url.c_str());
+    msg.append("}");
+    throw makeStringException(0, msg.str());
+}
+
+// NB: no way to create grouped flat file output at the moment, but not sure would ever want to support that.
+IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, IUserDescriptor *userDesc)
+{
+    assertex(userDesc);
+    StringBuffer user, password;
+    userDesc->getUserName(user);
+    userDesc->getPassword(password);
+    return createDFUFile(logicalName, cluster, type, recDef, requestId, expirySecs, compressed, user, password);
+}
+
+// wrapper to the doPublishDFUFile, that discovers and tries DFUService URL's
+void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, const char *user, const char *password)
+{
+    unsigned currentUrl;
+    const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
+    unsigned c = getNumDfuServiceURL(); // max attempts
+    while (c)
+    {
+        try
+        {
+            doPublishDFUFile(espServiceUrl, dfuFile, overwrite, user, password);
+            return;
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            EXCLOG(e, nullptr);
+            e->Release();
+        }
+        advanceToNextAvailableDFUServiceURL(currentUrl);
+        --c;
+    }
+    StringBuffer msg("Failed to contact DFU service: { ");
+    for (auto &url: dfuServiceUrls)
+        msg.append(url.c_str());
+    msg.append("}");
+    throw makeStringException(0, msg.str());
+}
 
-    metaInfo.append(dfuResp->getAccessInfo().getMetaInfoBlob());
-    return true;
+void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, IUserDescriptor *userDesc)
+{
+    assertex(userDesc);
+    StringBuffer user, password;
+    userDesc->getUserName(user);
+    userDesc->getPassword(password);
+    publishDFUFile(dfuFile, overwrite, user, password);
 }
 
+
+
+/*
+ * createDFUFileAccess() and encodeDFUFileMeta() will normally be called by the DFU service
+ * via a DFS file request. So that the meta info blob can be returned to the client of the service.
+ * However, for testing purposes it's also useful to create these blobs elsewhere directly from IFileDescriptor's
+ */
+IPropertyTree *createDFUFileMetaInfo(const char *fileName, IFileDescriptor *fileDesc, const char *requestId, const char *accessType, unsigned expirySecs,
+                                     IUserDescriptor *userDesc, const char *keyPairName, unsigned port, bool secure, unsigned maxFileAccessExpirySeconds)
+{
+    /*
+     * version
+     * fileName
+     * requestId [optional]
+     * accessType [const "READ" for this method]
+     * user
+     * port (int)      // port # of dafilesrv srvice to connect to
+     * secure (bool)   // if true = SSL connection
+     * keyPairName      // name of key pair to use
+     * expiryTime      // (seconds) timeout for validity of this request
+     * jsonTypeInfo     // JSON representation of the file's record definition
+     */
+    Owned<IPropertyTree> metaInfo = createPTree();
+
+    metaInfo->setProp("logicalFilename", fileName);
+    if (!isEmptyString(requestId))
+        metaInfo->setProp("requestId", requestId);
+    metaInfo->setProp("accessType", accessType);
+    StringBuffer userStr;
+    if (userDesc)
+        metaInfo->setProp("user", userDesc->getUserName(userStr).str());
+
+    // key, port, secure
+    metaInfo->setPropInt("port", port);
+    metaInfo->setPropBool("secure", secure);
+    if (!isEmptyString(keyPairName))
+        metaInfo->setProp("keyPairName", keyPairName);
+
+    // expiry time
+    if (expirySecs > maxFileAccessExpirySeconds)
+        expirySecs = maxFileAccessExpirySeconds;
+    time_t now;
+    time(&now);
+    CDateTime expiryDt;
+    expiryDt.set(now + expirySecs);
+    StringBuffer expiryTime;
+    expiryDt.getString(expiryTime);
+    metaInfo->setProp("expiryTime", expiryTime);
+
+    // layout
+    MemoryBuffer binLayout;
+    if (getDaliLayoutInfo(binLayout, fileDesc->queryProperties()))
+        metaInfo->setPropBin("binLayout", binLayout.length(), binLayout.toByteArray());
+
+    // file meta info
+    INode *node1 = fileDesc->queryNode(0);
+    SocketEndpoint ep = node1->endpoint();
+    unsigned dafilesrvVersion = getCachedRemoteVersion(node1->endpoint(), secure);
+
+    if (dafilesrvVersion < DAFILESRV_STREAMGENERAL_MINVERSION)
+    {
+        metaInfo->setPropInt("version", 1); // legacy format
+        extractFilePartInfo(*metaInfo, *fileDesc);
+    }
+    else
+    {
+        metaInfo->setPropInt("version", DAFILESRV_METAINFOVERSION);
+        IPropertyTree *fileInfoTree = metaInfo->setPropTree("FileInfo");
+        fileDesc->serializeTree(*fileInfoTree);
+    }
+    return metaInfo.getClear();
+}
+
+StringBuffer &encodeDFUFileMeta(StringBuffer &metaInfoBlob, IPropertyTree *metaInfo, IConstEnvironment *environment)
+{
+    MemoryBuffer metaInfoMb;
+
+    /* NB: If file access security is disabled in the environment, or on a per cluster basis
+     * keyPairName will be blank. In that case the meta data is returned in plain format.
+     * NB2: Dafilesrv's would also require file access security to be disabled in that case,
+     * otherwise they will be denied access.
+     * Should be part of the same configuration setup.
+     */
+#ifdef _USE_OPENSSL
+    if (metaInfo->hasProp("keyPairName") && environment) // without it, meta data is not encrypted
+    {
+        MemoryBuffer metaInfoBlob;
+        metaInfo->serialize(metaInfoBlob);
+
+        const char *keyPairName = metaInfo->queryProp("keyPairName");
+        const char *privateKeyFName = environment->getPrivateKeyPath(keyPairName);
+        Owned<CLoadedKey> privateKey = loadPrivateKeyFromFile(privateKeyFName, nullptr);
+        StringBuffer metaInfoSignature;
+        digiSign(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *privateKey);
+
+        Owned<IPropertyTree> metaInfoEnvelope = createPTree();
+        metaInfoEnvelope->setProp("signature", metaInfoSignature);
+        metaInfoEnvelope->setPropBin("metaInfoBlob", metaInfoBlob.length(), metaInfoBlob.bytes());
+        metaInfoEnvelope->serialize(metaInfoMb.clear());
+    }
+    else
+#endif
+        metaInfo->serialize(metaInfoMb);
+
+    MemoryBuffer compressedMetaInfoMb;
+    fastLZCompressToBuffer(compressedMetaInfoMb, metaInfoMb.length(), metaInfoMb.bytes());
+    JBASE64_Encode(compressedMetaInfoMb.bytes(), compressedMetaInfoMb.length(), metaInfoBlob, false);
+    return metaInfoBlob;
+}
+
+
+
 } // namespace wsdfuaccess
+
+
+#ifdef _USE_CPPUNIT
+#include "unittests.hpp"
+#include "sockfile.hpp"
+#include "rmtfile.hpp"
+#include "dafscommon.hpp"
+#include "portlist.h"
+
+using namespace wsdfuaccess;
+class DFUAccessTests : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE(DFUAccessTests);
+        CPPUNIT_TEST(testStartServer);
+        CPPUNIT_TEST(testDaFsStreamingStd);
+        CPPUNIT_TEST(testDaFsStreamingCompressed);
+        CPPUNIT_TEST(testDaFsStreamingGrouped);
+        CPPUNIT_TEST(testDaFsStreamingCompressedAndGrouped);
+        CPPUNIT_TEST(testFinish);
+   CPPUNIT_TEST_SUITE_END();
+
+   unsigned serverPort = DAFILESRV_PORT+1; // do not use standard port, which if in a URL will be converted to local path if IP is local
+   StringBuffer basePath;
+   Owned<CSimpleInterface> serverThread;
+   Owned<IFileDescriptor> fileDesc;
+protected:
+    void testStartServer()
+    {
+        Owned<ISocket> socket;
+
+        unsigned endPort = MP_END_PORT;
+        while (1)
+        {
+            try
+            {
+                socket.setown(ISocket::create(serverPort));
+                break;
+            }
+            catch (IJSOCK_Exception *e)
+            {
+                if (e->errorCode() != JSOCKERR_port_in_use)
+                {
+                    StringBuffer eStr;
+                    e->errorMessage(eStr);
+                    e->Release();
+                    CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
+                }
+                else if (serverPort == endPort)
+                {
+                    e->Release();
+                    CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
+                }
+            }
+            ++serverPort;
+        }
+
+        basePath.append("//");
+        SocketEndpoint ep(serverPort);
+        ep.getUrlStr(basePath);
+
+        char cpath[_MAX_DIR];
+        if (!GetCurrentDirectory(_MAX_DIR, cpath))
+            CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
+        else
+            basePath.append(cpath);
+        addPathSepChar(basePath);
+
+        class CServerThread : public CSimpleInterface, implements IThreaded
+        {
+            CThreaded threaded;
+            Owned<IRemoteFileServer> server;
+            Linked<ISocket> socket;
+        public:
+            CServerThread(IRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
+            {
+                threaded.init(this);
+            }
+            ~CServerThread()
+            {
+                threaded.join();
+            }
+        // IThreaded
+            virtual void threadmain() override
+            {
+                DAFSConnectCfg sslCfg = SSLNone;
+                server->run(sslCfg, socket, nullptr, nullptr);
+            }
+        };
+        enableDafsAuthentication(false);
+        Owned<IRemoteFileServer> server = createRemoteFileServer();
+        serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), IRemoteFileServer), socket.getClear()));
+    }
+    void testDaFsStreaming(bool compressed, bool grouped)
+    {
+        configureRemoteCreateFileDescriptorCB(queryFileDescriptorFactory());
+
+        const char *thorInstance = "mythor";
+        const char *groupName = thorInstance;
+        const char *fname = ".::dfuaccess::testfname1";
+        IUserDescriptor *userDesc = nullptr;
+        const char *keyPairName = nullptr;
+        unsigned port = 0;
+        bool secure = false;
+        unsigned expiryTime = 60;
+        unsigned maxFileAccessExpirySeconds = 300;
+
+        unsigned numRecsInTest = 1000;
+
+        const char *eclRecDef = "{ string5 f1; string10 f2; };";
+        size32_t fixedRecSize = 15;
+
+        fileDesc.setown(createFileDescriptor());
+
+        GroupType groupType;
+        StringBuffer basedir;
+
+        SocketEndpointArray eps;
+        SocketEndpoint ep(".", serverPort);
+        eps.append(ep);
+        Owned<IGroup> group = createIGroup(eps);
+
+        fileDesc.setown(createFileDescriptor(fname, "thor", "mythor", group));
+        fileDesc->queryProperties().setProp("ECL", eclRecDef);
+        if (grouped)
+            fileDesc->queryProperties().setPropBool("@grouped", true);
+
+        Owned<IPropertyTree> metaInfo = createDFUFileMetaInfo(fname, fileDesc, "cppunit-test1", "WRITE", 30,
+                                                              userDesc, keyPairName, port, secure, maxFileAccessExpirySeconds);
+        StringBuffer metaInfoBlob;
+        encodeDFUFileMeta(metaInfoBlob, metaInfo, nullptr);
+
+        Owned<IDFUFileAccess> newFile = createDFUFileAccess(metaInfoBlob);
+        newFile->setStreamReplyLimitK(1); // set a low limit to force testing of continuation etc.
+
+        if (compressed)
+            newFile->setFileOption(dfo_compressedRemoteStreams);
+        else
+            newFile->clearFileOption(dfo_compressedRemoteStreams);
+
+        CRC32 writeCrc32;
+        // write
+        unsigned n = newFile->queryNumParts();
+        for (unsigned p=0; p<n; p++)
+        {
+            Owned<IDFUFilePartWriter> writer = newFile->createFilePartWriter(p);
+            writer->start();
+
+            for (unsigned r=0; r<numRecsInTest; r++)
+            {
+                VStringBuffer rowData("%5u%10u", r, r);
+                writer->write(fixedRecSize, rowData.str());
+                writeCrc32.tally(fixedRecSize, rowData.str());
+                if (grouped)
+                    writer->write(0, nullptr); // eog
+            }
+        }
+        newFile->setFilePropertyInt("@recordCount", numRecsInTest);
+
+        // publish would normally happen here, but this unittest is self-contained (no esp etc.)
+
+
+        CRC32 readCrc32;
+        // read back
+        for (unsigned p=0; p<n; p++)
+        {
+            Owned<IDFUFilePartReader> reader = newFile->createFilePartReader(p, 0, nullptr, true);
+            reader->start();
+
+            for (unsigned r=0; r<numRecsInTest; r++)
+            {
+                size32_t sz;
+                const void *row = reader->nextRow(sz);
+                if (!row)
+                {
+                    row = reader->nextRow(sz);
+                    assertex(row);
+                }
+                readCrc32.tally(sz, row);
+#ifdef _DEBUG
+                printf("%.*s%.*s\n", 5, (const char *)row, 10, ((const char *)row)+5);
+#endif
+            }
+        }
+        if (writeCrc32.get() != readCrc32.get())
+        {
+            VStringBuffer errMsg("DFU write/read test: crc's don't match. Write crc=%x, read crc=%x", writeCrc32.get(), readCrc32.get());
+            CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
+        }
+    }
+    void testDaFsStreamingStd()
+    {
+        testDaFsStreaming(false, false);
+    }
+    void testDaFsStreamingCompressed()
+    {
+        testDaFsStreaming(true, false);
+    }
+    void testDaFsStreamingGrouped()
+    {
+        testDaFsStreaming(false, true);
+    }
+    void testDaFsStreamingCompressedAndGrouped()
+    {
+        testDaFsStreaming(true, true);
+    }
+    void testFinish()
+    {
+        // clearup
+        if (fileDesc)
+        {
+            RemoteFilename rfn;
+            fileDesc->getFilename(0, 0, rfn);
+            StringBuffer path;
+            rfn.getPath(path);
+            Owned<IFile> iFile = createIFile(path);
+            iFile->remove();
+        }
+
+        SocketEndpoint ep(serverPort);
+        Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
+        CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
+
+        serverThread.clear();
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( DFUAccessTests );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( DFUAccessTests, "DFUAccessTests" );
+
+
+#endif // _USE_CPPUNIT
+

+ 23 - 1
esp/clients/wsdfuaccess/wsdfuaccess.hpp

@@ -28,13 +28,35 @@
 
 #endif
 
+#include "dafsstream.hpp"
 
 class StringBuffer;
+interface IUserDescriptor;
+interface IFileDescriptor;
+interface IConstEnvironment;
 
+using dafsstream::IDFUFileAccess;
+using dafsstream::DFUFileType;
 namespace wsdfuaccess
 {
 
-WSDFUACCESS_API bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *jobId, const char *logicalName, SecAccessFlags access, unsigned expirySecs, const char *user, const char *token);
+WSDFUACCESS_API IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, const char *user, const char *password);
+WSDFUACCESS_API IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, IUserDescriptor *userDesc);
+
+WSDFUACCESS_API IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, const char *user, const char *password);
+WSDFUACCESS_API IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, IUserDescriptor *userDesc);
+
+WSDFUACCESS_API void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, IUserDescriptor *userDesc);
+WSDFUACCESS_API void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, const char *user, const char *password);
+
+/* createDFUFileAccess() and encodeDFUFileMeta() will normally be called by the DFU service
+ * via a DFS file request. So that the meta info blob can be returned to the client of the service.
+ * However, for testing purposes it's also useful to create these blobs elsewhere directly from IFileDescriptor's
+ */
+WSDFUACCESS_API IPropertyTree *createDFUFileMetaInfo(const char *fileName, IFileDescriptor *fileDesc, const char *requestId, const char *accessType, unsigned expirySecs,
+                                                     IUserDescriptor *userDesc, const char *keyPairName, unsigned port, bool secure, unsigned maxFileAccessExpirySeconds);
+WSDFUACCESS_API StringBuffer &encodeDFUFileMeta(StringBuffer &metaInfoBlob, IPropertyTree *metaInfo, IConstEnvironment *environment);
+
 
 } // end of namespace wsdfuaccess
 

+ 65 - 15
esp/scm/ws_dfu.ecm

@@ -835,6 +835,7 @@ ESPresponse [exceptions_inline, nil_remove] EraseHistoryResponse
     ESParray<ESPStruct History, Origin> History;
 };
 
+// NB: DFUFileAccessRequestBase - depr_ver("1.50")
 ESPStruct DFUFileAccessRequestBase
 {
     string Name;
@@ -847,11 +848,21 @@ ESPStruct DFUFileAccessRequestBase
     bool ReturnBinTypeInfo(false);
 };
 
+// NB: DFUFileAccessRequestBase - depr_ver("1.50")
 ESPrequest DFUFileAccessRequest
 {
     EspStruct DFUFileAccessRequestBase RequestBase;
 };
 
+ESPrequest DFUFileAccessV2Request
+{
+    string Name;                        // the logical file name
+    string Cluster;                     // needed only if on >1 Cluster and want to disambiguate
+    string RequestId;                   // optional
+    int ExpirySeconds(60);              // how long the retrieved meta info is valid for, to access physical files (in dafilesrv)
+    bool ReturnTextResponse(false);     // Returns response in full not just in blob    
+};
+
 ESPStruct DFUPartLocation
 {
     int LocationIndex;
@@ -873,25 +884,42 @@ ESPStruct DFUFilePart
 
 ESPStruct DFUFileAccessInfo
 {
-    string MetaInfoBlob;
-    string ExpiryTime;
-    // {NumParts, FileParts} depend on ReturnFileInfo in request
-    int NumParts;        // number of parts in logical file
-    ESParray<EspStruct DFUPartLocation> FileLocations;
-    ESParray<EspStruct DFUFilePart> FileParts;
+    string MetaInfoBlob;                               // Contains serialized from of all meta data required + signature if security enabled
+    string ExpiryTime;                                 // After this time, the meta information cannot be used to communicate with dafilesrv's 
+    int NumParts;                                      // number of parts in logical file
+    ESParray<EspStruct DFUPartLocation> FileLocations; // List of hosts of parts  
+    ESParray<EspStruct DFUFilePart> FileParts;         // File parts. NB: these reference (index) the FileLocations 
     
-    binary RecordTypeInfoBin;   // optional
-    string RecordTypeInfoJson;  // optional
+    [depr_ver("1.50")] binary RecordTypeInfoBin;       // Binary encoded type info (also contained in metablob)
+    string RecordTypeInfoJson;                         // JSON encoded type info
     
-    int fileAccessPort;
-    bool fileAccessSSL;
+    int fileAccessPort;                                // dafilesrv port
+    bool fileAccessSSL;                                // use secure SSL?
 };
 
 ESPresponse [exceptions_inline] DFUFileAccessResponse
 {
-    EspStruct DFUFileAccessInfo AccessInfo;
+    /* AccessInfo.MetaInfoBlob contains everything, inc. signature.
+     * We want like this (and not as separate EspStruct) access members,
+     * because:
+     * a) it is more efficient for the factoried object to decode the bare bones meta info.
+     * b) the object can efficiently be serialized and passed to others (e.g. thormaster to slaves, to dafilesrv)
+     *
+     * NB: The other DFUFileAccessInfo fields are only returned if ReturnTextResponse is true in the request 
+     */
+    EspStruct DFUFileAccessInfo AccessInfo; 
+};
+
+
+ESPenum DFUFileType : string
+{
+    Flat("Flat"),
+    Index("Index"),
+    Xml("Xml"),
+    Csv("Csv"),
 };
 
+
 ESPrequest [nil_remove] DFUFileCreateRequest
 {
     string ECLRecordDefinition;
@@ -899,6 +927,18 @@ ESPrequest [nil_remove] DFUFileCreateRequest
     EspStruct DFUFileAccessRequestBase RequestBase;
 };
 
+ESPrequest [nil_remove] DFUFileCreateV2Request
+{
+    string Name;                        // the logical file name
+    string Cluster;                     // needed only if on >1 Cluster and want to disambiguate
+    ESPenum DFUFileType Type;           // e.g. flat, csv, xml
+    string ECLRecordDefinition;                           // ecl text record definition
+    string RequestId;                   // optional 
+    int ExpirySeconds(60);              // how long the retrieved meta info is valid for, to access physical files (in dafilesrv) 
+    bool ReturnTextResponse(false);     // Returns response in full not just in blob
+    bool Compressed(false);             // The new file will be compressed on disk    
+};
+
 ESPresponse [exceptions_inline, nil_remove] DFUFileCreateResponse
 {
     string FileId;
@@ -909,6 +949,10 @@ ESPresponse [exceptions_inline, nil_remove] DFUFileCreateResponse
 ESPrequest [nil_remove] DFUFilePublishRequest
 {
     string FileId;
+    [min_ver("1.50")] bool Overwrite;
+    [min_ver("1.50")] binary FileDescriptorBlob; // optional (see below) 
+
+// if FileDescriptorBlob supplied, these are optional       
     string ECLRecordDefinition;
     int64 RecordCount;
     int64 FileSize;
@@ -921,7 +965,8 @@ ESPresponse [exceptions_inline, nil_remove] DFUFilePublishResponse
 //  ===========================================================================
 ESPservice [
     auth_feature("DEFERRED"),
-    version("1.41"),
+    version("1.50"),
+    default_client_version("1.50"),
     noforms,
     exceptions_inline("./smc_xslt/exceptions.xslt")] WsDfu
 {
@@ -950,9 +995,14 @@ ESPservice [
     ESPmethod DFURecordTypeInfo(DFURecordTypeInfoRequest, DFURecordTypeInfoResponse);
     ESPmethod EclRecordTypeInfo(EclRecordTypeInfoRequest, EclRecordTypeInfoResponse);
 
-    ESPmethod [auth_feature("DfuAccess:READ"), min_var("1.39")] DFUFileAccess(DFUFileAccessRequest, DFUFileAccessResponse);
-    ESPmethod [auth_feature("DfuAccess:FULL"), min_var("1.39")] DFUFileCreate(DFUFileCreateRequest, DFUFileCreateResponse);
-    ESPmethod [auth_feature("DfuAccess:FULL"), min_var("1.39")] DFUFilePublish(DFUFilePublishRequest, DFUFilePublishResponse);
+    ESPmethod [auth_feature("DfuAccess:FULL"), min_ver("1.39")] DFUFilePublish(DFUFilePublishRequest, DFUFilePublishResponse);
+
+    ESPmethod [auth_feature("DfuAccess:READ"), min_ver("1.50")] DFUFileAccessV2(DFUFileAccessV2Request, DFUFileAccessResponse);
+    ESPmethod [auth_feature("DfuAccess:FULL"), min_ver("1.50")] DFUFileCreateV2(DFUFileCreateV2Request, DFUFileCreateResponse);
+
+// NB: these methods are deprecated from ver >= 1.50
+    ESPmethod [auth_feature("DfuAccess:READ"), min_ver("1.39")] DFUFileAccess(DFUFileAccessRequest, DFUFileAccessResponse);
+    ESPmethod [auth_feature("DfuAccess:FULL"), min_ver("1.39")] DFUFileCreate(DFUFileCreateRequest, DFUFileCreateResponse);
 };
 
 SCMexportdef(WSDFU);

+ 1 - 0
esp/services/ws_dfu/CMakeLists.txt

@@ -102,6 +102,7 @@ target_link_libraries ( ws_dfu
          dfuXRefLib 
          dfuwu 
          roxiecommlib 
+         wsdfuaccess
     )
 
 IF (USE_OPENSSL)

+ 381 - 311
esp/services/ws_dfu/ws_dfuService.cpp

@@ -45,12 +45,11 @@
 #include "dfuwu.hpp"
 #include "fverror.hpp"
 #include "nbcd.hpp"
+#include "thorcommon.hpp"
 
 #include "jstring.hpp"
 #include "exception_util.hpp"
 
-#include "ws_dfuService.hpp"
-
 #include "hqlerror.hpp"
 #include "hqlexpr.hpp"
 #include "hqlutil.hpp"
@@ -61,6 +60,11 @@
 #include "jflz.hpp"
 #include "digisign.hpp"
 
+#include "wsdfuaccess/wsdfuaccess.hpp"
+
+#include "ws_dfuService.hpp"
+
+using namespace wsdfuaccess;
 using namespace cryptohelper;
 
 
@@ -85,6 +89,7 @@ static const char *DFUFileIdSeparator = "|";
 static const char *DFUFileCreate_FileNamePostfix = ".wsdfucreate.tmp";
 static const char *DFUFileCreate_GroupNamePrefix = "wsdfucreate";
 static const char *ConfigurationDirectoryForDataCategory = "data";
+static std::atomic<unsigned> dfuCreateUniqId{1};
 
 const unsigned NODE_GROUP_CACHE_DEFAULT_TIMEOUT = 30*60*1000; //30 minutes
 
@@ -5887,7 +5892,7 @@ int CWsDfuEx::GetIndexData(IEspContext &context, bool bSchemaOnly, const char* i
     return iRet;
 }
 
-void CWsDfuEx::getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, unsigned numParts, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo)
+void CWsDfuEx::getFilePartsInfo(IEspContext &context, IFileDescriptor &fileDesc, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo)
 {
     IArrayOf<IEspDFUFilePart> dfuParts;
     IArrayOf<IEspDFUPartLocation> dfuPartLocations;
@@ -5895,7 +5900,7 @@ void CWsDfuEx::getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, un
     unsigned newLocationIndex = 0;
     MapStringTo<unsigned> partLocationMap;
     // NB: both CopyIndex and PartIndex are 1 based in response.
-    Owned<IPartDescriptorIterator> pi = fdesc->getIterator();
+    Owned<IPartDescriptorIterator> pi = fileDesc.getIterator();
     ForEach(*pi)
     {
         IPartDescriptor& part = pi->query();
@@ -5937,76 +5942,17 @@ void CWsDfuEx::getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, un
         dfuParts.append(*filePart.getClear());
     }
 
-    accessInfo.setNumParts(numParts);
+    accessInfo.setNumParts(fileDesc.numParts());
     accessInfo.setFileParts(dfuParts);
     accessInfo.setFileLocations(dfuPartLocations);
 }
 
-static const char *securityInfoVersion="1";
-void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, StringBuffer &expiryTime, const char *fileName,
-    IFileDescriptor *fDesc, IUserDescriptor *user, const char *jobId, const char *keyPairName, IConstDFUFileAccessRequestBase &req)
-{
-    // setup "expiryTime"
-    unsigned expirySecs = req.getExpirySeconds();
-    if (expirySecs > maxFileAccessExpirySeconds)
-        expirySecs = maxFileAccessExpirySeconds;
-    time_t now;
-    time(&now);
-    CDateTime expiryDt;
-    expiryDt.set(now + expirySecs);
-    expiryDt.getString(expiryTime);
-
-    Owned<IPropertyTree> metaInfoEnvelope = createPTree();
-    Owned<IPropertyTree> metaInfo = createPTree();
-    extractFilePartInfo(*metaInfo, *fDesc);
-
-    MemoryBuffer metaInfoMb;
-
-    /* NB: If file access security is disabled in the environment, or on a per cluster basis
-     * keyPairName will be blank. In that case the meta data is returned in plain format.
-     * NB2: Dafilesrv's would also require file access security to be disabled in that case,
-     * otherwise they will be denied access.
-     * Should be part of the same configuration setup.
-     */
-#ifdef _USE_OPENSSL
-    if (!isEmptyString(keyPairName)) // without it, meta data is not encrypted
-    {
-        metaInfo->setProp("version", securityInfoVersion);
-        metaInfo->setProp("logicalFilename", fileName);
-        metaInfo->setProp("jobId", jobId);
-        metaInfo->setProp("accessType", req.getAccessTypeAsString());
-        StringBuffer userStr;
-        if (user)
-            metaInfo->setProp("user", user->getUserName(userStr).str());
-        metaInfo->setProp("keyPairName", keyPairName);
-        metaInfo->setProp("expiryTime", expiryTime);
-
-        MemoryBuffer metaInfoBlob;
-        metaInfo->serialize(metaInfoBlob);
-
-        const char *privateKeyFName = env->getPrivateKeyPath(keyPairName);
-        Owned<CLoadedKey> privateKey = loadPrivateKeyFromFile(privateKeyFName, nullptr);
-        StringBuffer metaInfoSignature;
-        digiSign(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *privateKey);
-        metaInfoEnvelope->setProp("signature", metaInfoSignature);
-        metaInfoEnvelope->setPropBin("metaInfoBlob", metaInfoBlob.length(), metaInfoBlob.bytes());
-        metaInfoEnvelope->serialize(metaInfoMb.clear());
-    }
-    else
-#endif
-        metaInfo->serialize(metaInfoMb);
-
-    MemoryBuffer compressedMetaInfoMb;
-    fastLZCompressToBuffer(compressedMetaInfoMb, metaInfoMb.length(), metaInfoMb.bytes());
-    JBASE64_Encode(compressedMetaInfoMb.bytes(), compressedMetaInfoMb.length(), metaInfoStr, false);
-}
-
-void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, const char *cluster)
+void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, const char *group)
 {
     port = DEFAULT_ROWSERVICE_PORT;
     secure = false;
-    keyPairName.set(env->getClusterKeyPairName(cluster));
-    Owned<IConstDaFileSrvInfo> daFileSrvInfo = env->getDaFileSrvGroupInfo(cluster);
+    keyPairName.set(env->getClusterGroupKeyPairName(group));
+    Owned<IConstDaFileSrvInfo> daFileSrvInfo = env->getDaFileSrvGroupInfo(group);
     if (daFileSrvInfo)
     {
         port = daFileSrvInfo->getPort();
@@ -6014,22 +5960,20 @@ void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned
     }
 }
 
-void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, IDistributedFile &file)
+void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, const char *fileName, std::vector<std::string> &groups)
 {
     retPort = DEFAULT_ROWSERVICE_PORT;
     retSecure = false;
-    unsigned numClusters = file.numClusters();
-    for (unsigned c=0; c<numClusters; c++)
+    bool firstGroup = true;
+    for (auto &group: groups)
     {
-        StringBuffer clusterName;
-        const char *cluster = file.getClusterName(c, clusterName.clear()).str();
-
         StringBuffer _keyPairName;
         unsigned port;
         bool secure;
-        getFileDafilesrvConfiguration(_keyPairName, port, secure, cluster);
-        if (0 == c)
+        getFileDafilesrvConfiguration(_keyPairName, port, secure, group.c_str());
+        if (firstGroup)
         {
+            firstGroup = false;
             keyPairName.set(_keyPairName);
             retPort = port;
             retSecure = secure;
@@ -6037,104 +5981,25 @@ void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned
         else
         {
             if (!strsame(keyPairName, _keyPairName))
-                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, keys for file access must match", file.queryLogicalName());
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, keys for file access must match", fileName);
             if (retPort != port)
-                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's port for file access must match", file.queryLogicalName());
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's port for file access must match", fileName);
             if (retSecure != secure)
-                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's security setting for file access must match", file.queryLogicalName());
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's security setting for file access must match", fileName);
         }
     }
 }
 
-void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IConstDFUFileAccessRequestBase &req, IEspDFUFileAccessInfo &accessInfo)
+static void getJsonTypeInfo(IFileDescriptor &fileDesc, IEspDFUFileAccessInfo &accessInfo)
 {
-    bool writePermissions = (accessType == SecAccess_Write) || (accessType == SecAccess_Full);
-    bool readPermissions = true; // by implication
-
-    CDfsLogicalFileName lfn;
-    lfn.set(req.getName());
-    StringBuffer fileName = lfn.get();
-    if (!isEmptyString(req.getCluster()))
-        fileName.append("@").append(req.getCluster());
-
-    checkLogicalName(fileName, udesc, readPermissions, writePermissions, false, nullptr);
-
-    switch (accessType)
-    {
-        case SecAccess_Access:
-        case SecAccess_Read:
-            break;
-        default:
-        {
-            // NB - no handling for write/full at moment
-            return;
-        }
-    }
-    Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(fileName, udesc, false, false, true); // lock super-owners
-    if (!df)
-        throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file '%s'.", fileName.str());
-
-    Owned<IFileDescriptor> fileDesc = df->getFileDescriptor(req.getCluster());
-    CFileAccessRole role = req.getAccessRole();
-    switch (role)
-    {
-        case CFileAccessRole_Token:
-        {
-            break;
-        }
-        case CFileAccessRole_Engine:
-        {
-            /* JCSMORE - for now do nothing
-             * Ideally, would get the file tree here and add it to 'metaInfo' tree, i.e. outside of uncrypted secureInfo blob
-             * Then client could construct a IDistributeFile from it etc.
-             * However, the way the engines and IDistributedFile work at the moment, means that using this info
-             * at the client side would require a significant amount of refactoring of the IDistributedFile implementation.
-             * Not least because IDF allows updates via IPT -> Dali.
-             *
-             * So for now don't send anything, and rely on engine fetching the legacy way, i.e. direct from Dali, via lazy fetching etc.
-             */
-            break;
-        }
-        case CFileAccessRole_External:
-        {
-            getFilePartsInfo(context, fileDesc, df->numParts(), false, accessInfo);
-            if (req.getReturnJsonTypeInfo() || req.getReturnBinTypeInfo())
-            {
-                MemoryBuffer binLayout;
-                StringBuffer jsonLayout;
-                if (!getRecordFormatFromRtlType(binLayout, jsonLayout, df->queryAttributes(), req.getReturnBinTypeInfo(), req.getReturnJsonTypeInfo()))
-                    getRecordFormatFromECL(binLayout, jsonLayout, df->queryAttributes(), req.getReturnBinTypeInfo(), req.getReturnJsonTypeInfo());
-                if (req.getReturnJsonTypeInfo() && jsonLayout.length())
-                    accessInfo.setRecordTypeInfoJson(jsonLayout.str());
-                if (req.getReturnBinTypeInfo() && binLayout.length())
-                    accessInfo.setRecordTypeInfoBin(binLayout);
-            }
-            break;
-        }
-        default:
-            throwUnexpected();
-    }
-
-    StringBuffer keyPairName;
-    unsigned port;
-    bool secure;
-    getFileDafilesrvConfiguration(keyPairName, port, secure, *df);
-
-    StringBuffer metaInfo, expiryTime;
-    getFileMeta(metaInfo, expiryTime, fileName, fileDesc, udesc, req.getJobId(), keyPairName, req);
-    accessInfo.setMetaInfoBlob(metaInfo);
-    accessInfo.setExpiryTime(expiryTime);
-    accessInfo.setFileAccessPort(port);
-    accessInfo.setFileAccessSSL(secure);
-
-    StringBuffer userName;
-    if (udesc)
-        udesc->getUserName(userName);
-    LOG(daliAuditLogCat,",FileAccess,EspProcess,READ,%s,%s,%s,jobid=%s,expirySecs=%d", req.getCluster(),
-        userName.str(), fileName.str(), req.getJobId(), req.getExpirySeconds());
+    MemoryBuffer binLayout;
+    StringBuffer jsonLayout;
+    if (!getRecordFormatFromRtlType(binLayout, jsonLayout, fileDesc.queryProperties(), false, true))
+        getRecordFormatFromECL(binLayout, jsonLayout, fileDesc.queryProperties(), false, true);
+    if (jsonLayout.length())
+        accessInfo.setRecordTypeInfoJson(jsonLayout.str());
 }
 
-
 SecAccessFlags translateToSecAccessFlags(CSecAccessType from)
 {
     switch (from)
@@ -6153,32 +6018,100 @@ SecAccessFlags translateToSecAccessFlags(CSecAccessType from)
     }
 }
 
+void CWsDfuEx::dFUFileAccessCommon(IEspContext &context, const CDfsLogicalFileName &lfn, const char *requestId, unsigned expirySecs, bool returnTextResponse, IEspDFUFileAccessInfo &accessInfo)
+{
+    StringBuffer fileName;
+    lfn.get(fileName, false, true);
+    if (0 == fileName.length())
+         throw MakeStringException(ECLWATCH_INVALID_INPUT, "DFU File lookup: No Name defined. (requestId=%s, expirySecs=%u)", requestId, expirySecs);
+
+    StringBuffer userID;
+    context.getUserID(userID);
+
+    Owned<IUserDescriptor> userDesc;
+    if (!userID.isEmpty())
+    {
+        userDesc.setown(createUserDescriptor());
+        userDesc->set(userID.str(), context.queryPassword(), context.querySignature());
+    }
+
+    checkLogicalName(fileName, userDesc, true, false, false, nullptr); // check for read permissions
+
+    Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(fileName, userDesc, false, false, true); // lock super-owners
+    if (!df)
+        throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file '%s'.", fileName.str());
+
+    StringBuffer cluster;
+    lfn.getCluster(cluster);
+    Owned<IFileDescriptor> fileDesc = df->getFileDescriptor(cluster);
+
+    // NB: if file has copies on >1 cluster, they must share the same key
+
+    std::vector<std::string> groups;
+    unsigned numClusters = df->numClusters();
+    for (unsigned c=0; c<numClusters; c++)
+    {
+        StringBuffer clusterName;
+        const char *clusterGroup = df->getClusterName(c, clusterName.clear());
+        groups.push_back(clusterGroup);
+    }
+
+    StringBuffer keyPairName;
+    unsigned port;
+    bool secure;
+    getFileDafilesrvConfiguration(keyPairName, port, secure, fileName, groups);
+
+    Owned<IPropertyTree> metaInfo = createDFUFileMetaInfo(fileName, fileDesc, requestId, "READ", expirySecs, userDesc, keyPairName, port, secure, maxFileAccessExpirySeconds);
+    StringBuffer metaInfoBlob;
+    encodeDFUFileMeta(metaInfoBlob, metaInfo, env);
+    accessInfo.setMetaInfoBlob(metaInfoBlob);
+
+    if (returnTextResponse)
+    {
+        getFilePartsInfo(context, *fileDesc, false, accessInfo);
+        getJsonTypeInfo(*fileDesc, accessInfo);
+
+        accessInfo.setExpiryTime(metaInfo->queryProp("expiryTime"));
+        accessInfo.setFileAccessPort(metaInfo->getPropInt("port"));
+        accessInfo.setFileAccessSSL(metaInfo->getPropBool("secure"));
+    }
+
+    LOG(daliAuditLogCat,",FileAccess,EspProcess,READ,%s,%s,%s,jobid=%s,expirySecs=%d", cluster.str(), userID.str(), fileName.str(), requestId, expirySecs);
+}
+
+// NB: deprecated from ver >= 1.50
 bool CWsDfuEx::onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp)
 {
     try
     {
         IConstDFUFileAccessRequestBase &requestBase = req.getRequestBase();
-        SecAccessFlags accessType = translateToSecAccessFlags(requestBase.getAccessType());
-        if (SecAccess_None == accessType)
-        {
-            context.setAuthStatus(AUTH_STATUS_NOACCESS);
-            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileAccess - Permission denied.");
-        }
-        context.ensureFeatureAccess(FEATURE_URL, accessType, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileAccess: Permission denied.");
 
-        if (isEmptyString(requestBase.getName()))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Name defined.");
+        bool returnTextResponse = CFileAccessRole_External == requestBase.getAccessRole();
 
-        StringBuffer userID;
-        context.getUserID(userID);
+        CDfsLogicalFileName lfn;
+        lfn.set(requestBase.getName());
+        lfn.setCluster(requestBase.getCluster());
 
-        Owned<IUserDescriptor> userDesc;
-        if (!userID.isEmpty())
-        {
-            userDesc.setown(createUserDescriptor());
-            userDesc->set(userID.str(), context.queryPassword(), context.querySignature());
-        }
-        getFileAccess(context, userDesc, accessType, requestBase, resp.updateAccessInfo());
+        IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
+        dFUFileAccessCommon(context, lfn, requestBase.getJobId(), requestBase.getExpirySeconds(), returnTextResponse, accessInfo);
+    }
+    catch (IException *e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
+bool CWsDfuEx::onDFUFileAccessV2(IEspContext &context, IEspDFUFileAccessV2Request &req, IEspDFUFileAccessResponse &resp)
+{
+    try
+    {
+        CDfsLogicalFileName lfn;
+        lfn.set(req.getName());
+        lfn.setCluster(req.getCluster());
+
+        IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
+        dFUFileAccessCommon(context, lfn, req.getRequestId(), req.getExpirySeconds(), req.getReturnTextResponse(), accessInfo);
     }
     catch (IException *e)
     {
@@ -6187,14 +6120,15 @@ bool CWsDfuEx::onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &r
     return true;
 }
 
-IGroup *CWsDfuEx::getDFUFileIGroup(const char *clusterName, ClusterType clusterType, const char *clusterTypeEx, StringArray &locations, StringBuffer &groupName)
+// NB: deprecated from ver >= 1.50
+static IGroup *getDFUFileIGroup(const char *clusterName, ClusterType clusterType, const char *clusterTypeEx, StringArray &locations, StringBuffer &groupName)
 {
     GroupType groupType;
     StringBuffer basedir;
     getClusterGroupName(groupName, clusterName);
     Owned<IGroup> groupFound = queryNamedGroupStore().lookup(groupName.str(), basedir, groupType);
     if (!groupFound)
-        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to get Group for Group %s.", groupName.str());
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "getDFUFileIGroup: Failed to get Group for Group %s.", groupName.str());
 
     if (!locations.ordinality())
         return groupFound.getClear();
@@ -6205,7 +6139,7 @@ IGroup *CWsDfuEx::getDFUFileIGroup(const char *clusterName, ClusterType clusterT
     {
         SocketEndpoint ep(locations.item(i));
         if (ep.isNull())
-            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid location '%s'.", locations.item(i));
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "getDFUFileIGroup: Invalid location '%s'.", locations.item(i));
         epa.append(ep);
         locationStr.append(locations.item(i)).append(";");
     }
@@ -6243,7 +6177,7 @@ IGroup *CWsDfuEx::getDFUFileIGroup(const char *clusterName, ClusterType clusterT
     {
         StringBuffer defaultDir;
         if (!getConfigurationDirectory(nullptr, ConfigurationDirectoryForDataCategory, clusterTypeEx, groupName.str(), defaultDir))
-            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to get ConfigurationDirectory: %s.", groupName.str());
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "getDFUFileIGroup: Failed to get ConfigurationDirectory: %s.", groupName.str());
 
         GroupType grpType = (clusterType == ThorLCRCluster) ? grp_thor : ((clusterType == HThorCluster) ? grp_hthor : grp_roxie);
         queryNamedGroupStore().add(groupName.str(), group, false, defaultDir.str(), grpType);
@@ -6259,128 +6193,210 @@ void CWsDfuEx::exportRecordDefinitionBinaryType(const char *recordDefinition, Me
     if (errs.errCount() > 0)
     {
         StringBuffer errorMsg;
-        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in parsing ECL %s: %s.", recordDefinition, errs.toString(errorMsg).str());
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "exportRecordDefinitionBinaryType: Failed in parsing ECL %s: %s.", recordDefinition, errs.toString(errorMsg).str());
     }
     if (!expr)
-        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in parsing ECL: %s.", recordDefinition);
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "exportRecordDefinitionBinaryType: Failed in parsing ECL: %s.", recordDefinition);
     
     if (!exportBinaryType(layoutBin, expr, false))
-        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in exportBinaryType.");
-}
-
-void CWsDfuEx::getFileAccessBeforePublish(IEspContext &context, const char *fileName, const char *cluster,
-    const char *jobId, MemoryBuffer& layoutBin, IFileDescriptor *fileDesc, IUserDescriptor *udesc,
-    IConstDFUFileAccessRequestBase &req, IEspDFUFileCreateResponse &resp)
-{
-    IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
-    getFilePartsInfo(context, fileDesc, fileDesc->numParts(), true, accessInfo);
-
-    StringBuffer keyPairName;
-    unsigned port;
-    bool secure;
-    getFileDafilesrvConfiguration(keyPairName, port, secure, cluster);
-
-    StringBuffer metaInfo, expiryTime;
-    getFileMeta(metaInfo, expiryTime, fileName, fileDesc, udesc, jobId, keyPairName, req);
-    accessInfo.setMetaInfoBlob(metaInfo);
-    accessInfo.setExpiryTime(expiryTime);
-
-    if (layoutBin.length() == 0)
-    {
-        if (!req.getReturnJsonTypeInfo() && !req.getReturnBinTypeInfo())
-            return;
-        resp.setWarning("RecordTypeInfoBin or RecordTypeInfoJson requested, but no type info found from ECLRecordDefinition");
-    }
-
-    if (req.getReturnJsonTypeInfo())
-    {
-        StringBuffer jsonLayout;
-        Owned<IRtlFieldTypeDeserializer> deserializer(createRtlFieldTypeDeserializer());
-        const RtlTypeInfo *typeInfo = deserializer->deserialize(layoutBin);
-        dumpTypeInfo(jsonLayout, typeInfo);
-        if (jsonLayout.length())
-            accessInfo.setRecordTypeInfoJson(jsonLayout.str());
-    }
-    if (req.getReturnBinTypeInfo())
-    {
-        MemoryBuffer binLayout;
-        layoutBin.swapWith(binLayout);
-        if (binLayout.length())
-            accessInfo.setRecordTypeInfoBin(binLayout);
-    }
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "exportRecordDefinitionBinaryType: Failed in exportBinaryType.");
 }
 
+// NB: deprecated from ver >= 1.50
 bool CWsDfuEx::onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &req, IEspDFUFileCreateResponse &resp)
 {
     try
     {
         IConstDFUFileAccessRequestBase &requestBase = req.getRequestBase();
-        SecAccessFlags accessType = translateToSecAccessFlags(requestBase.getAccessType());
-        if (SecAccess_None == accessType)
-        {
-            context.setAuthStatus(AUTH_STATUS_NOACCESS);
-            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileCreate - Permission denied.");
-        }
-        context.ensureFeatureAccess(FEATURE_URL, accessType, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileCreate: Permission denied.");
-
         const char *fileName = requestBase.getName();
         const char *clusterName = requestBase.getCluster();
         const char *recordDefinition = req.getECLRecordDefinition();
+        StringBuffer requestId = requestBase.getJobId();
+        unsigned expirySecs = requestBase.getExpirySeconds();
+        bool returnTextResponse = true;
+
         if (isEmptyString(fileName))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Name defined.");
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreate: No Name defined.");
         if (isEmptyString(clusterName))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Cluster defined.");
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreate: No Cluster defined.");
         if (isEmptyString(recordDefinition))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No ECLRecordDefinition defined.");
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreate: No ECLRecordDefinition defined.");
+
+        StringBuffer userId;
+        context.getUserID(userId);
+        Owned<IUserDescriptor> userDesc;
+        if (!userId.isEmpty())
+        {
+            userDesc.setown(createUserDescriptor());
+            userDesc->set(userId.str(), context.queryPassword(), context.querySignature());
+        }
 
         ClusterType clusterType = getClusterTypeByClusterName(clusterName);
         if (clusterType == NoCluster)
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster %s not found.", clusterName);
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreate: Cluster %s not found.", clusterName);
 
-        const char *clusterTypeEx = clusterTypeString(clusterType, false);
+        CDfsLogicalFileName lfn;
+        lfn.set(fileName);
+        StringBuffer normalizedFileName = lfn.get();
 
+        checkLogicalName(normalizedFileName, userDesc, false, true, false, nullptr);
+
+        const char *clusterTypeEx = clusterTypeString(clusterType, false);
         StringBuffer groupName;
         Owned<IGroup> group = getDFUFileIGroup(clusterName, clusterType, clusterTypeEx, req.getPartLocations(), groupName);
 
+        StringBuffer tempFileName = normalizedFileName;
+        tempFileName.append(DFUFileCreate_FileNamePostfix);
+
+        //create FileId
+        StringBuffer fileId;
+        fileId.set(groupName.str()).append(DFUFileIdSeparator).append(clusterName).append(DFUFileIdSeparator).append(tempFileName.str());
+        resp.setFileId(fileId.str());
+
+        if (requestId.isEmpty())
+            requestId.appendf("Create %s on %s", normalizedFileName.str(), clusterName);
+
+        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeString(clusterType, false), groupName, group);
+        fileDesc->queryProperties().setProp("@job", requestId);
+        if (!userId.isEmpty())
+            fileDesc->queryProperties().setProp("@owner", userId);
+
         MemoryBuffer layoutBin;
         exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+        if (0 == layoutBin.length())
+            throw makeStringExceptionV(ECLWATCH_INVALID_ECLRECDEF, "DFUFileCreate: Failed to parse ECL record definition: %s.", recordDefinition);
+        fileDesc->queryProperties().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
+        fileDesc->queryProperties().setProp("ECL", recordDefinition);
+
+        // NB: if file has copies on >1 cluster, they must share the same key
+        StringBuffer keyPairName;
+        unsigned port;
+        bool secure;
+
+        std::vector<std::string> groups;
+        groups.push_back(groupName.str());
+        getFileDafilesrvConfiguration(keyPairName, port, secure, normalizedFileName, groups);
+
+        Owned<IPropertyTree> metaInfo = createDFUFileMetaInfo(tempFileName, fileDesc, requestId, "WRITE", expirySecs, userDesc, keyPairName, port, secure, maxFileAccessExpirySeconds);
+        metaInfo->setProp("clusterName", clusterName);
+
+        StringBuffer metaInfoBlob;
+        encodeDFUFileMeta(metaInfoBlob, metaInfo, env);
+
+        IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
+        accessInfo.setMetaInfoBlob(metaInfoBlob);
+
+        getFilePartsInfo(context, *fileDesc, true, accessInfo);
+        getJsonTypeInfo(*fileDesc, accessInfo);
+
+        accessInfo.setExpiryTime(metaInfo->queryProp("expiryTime"));
+        accessInfo.setFileAccessPort(metaInfo->getPropInt("port"));
+        accessInfo.setFileAccessSSL(metaInfo->getPropBool("secure"));
+    }
+    catch (IException *e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
+bool CWsDfuEx::onDFUFileCreateV2(IEspContext &context, IEspDFUFileCreateV2Request &req, IEspDFUFileCreateResponse &resp)
+{
+    try
+    {
+        const char *fileName = req.getName();
+        const char *clusterName = req.getCluster();
+        const char *recordDefinition = req.getECLRecordDefinition();
+        unsigned expirySecs = req.getExpirySeconds();
+        StringBuffer requestId = req.getRequestId();
+        bool returnTextResponse = req.getReturnTextResponse();
+
+        if (isEmptyString(fileName))
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: No Name defined.");
+        if (isEmptyString(clusterName))
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: No Cluster defined.");
+        if (isEmptyString(recordDefinition))
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: No ECLRecordDefinition defined.");
 
         StringBuffer userId;
-        Owned<IUserDescriptor> userDesc;
         context.getUserID(userId);
+        Owned<IUserDescriptor> userDesc;
         if (!userId.isEmpty())
         {
             userDesc.setown(createUserDescriptor());
             userDesc->set(userId.str(), context.queryPassword(), context.querySignature());
         }
 
+        ClusterType clusterType = getClusterTypeByClusterName(clusterName);
+        if (clusterType == NoCluster)
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: Cluster %s not found.", clusterName);
+
+        StringBuffer groupName;
+        getClusterGroupName(groupName, clusterName);
+        if (isEmptyString(groupName))
+             throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: Group for cluster %s not found.", clusterName);
+        Owned<IGroup> group = queryNamedGroupStore().lookup(groupName.str());
+        if (!group)
+            throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: Failed to get Group %s.", groupName.str());
+
         CDfsLogicalFileName lfn;
         lfn.set(fileName);
-        StringBuffer tempFileName = lfn.get();
+        StringBuffer normalizedFileName = lfn.get();
+
+        checkLogicalName(normalizedFileName, userDesc, false, true, false, nullptr);
+
+        StringBuffer tempFileName = normalizedFileName;
+        tempFileName.append(".").append(dfuCreateUniqId++); // avoid potential clash if >1 creating file. One will succeed at publish time.
         tempFileName.append(DFUFileCreate_FileNamePostfix);
 
-        StringBuffer jobId = requestBase.getJobId();
-        if (jobId.isEmpty())
-            jobId.appendf("Create %s on %s", tempFileName.str(), clusterName);
+        //create FileId
+        StringBuffer fileId;
+        fileId.set(groupName).append(DFUFileIdSeparator).append(clusterName).append(DFUFileIdSeparator).append(tempFileName);
+        resp.setFileId(fileId.str());
 
-        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeEx, groupName, group);
-        fileDesc->queryProperties().setProp("@job", jobId);
+        if (requestId.isEmpty())
+            requestId.appendf("Create %s on %s", normalizedFileName.str(), clusterName);
+
+        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeString(clusterType, false), groupName, group);
+        fileDesc->queryProperties().setProp("@job", requestId);
         if (!userId.isEmpty())
             fileDesc->queryProperties().setProp("@owner", userId);
+        fileDesc->queryProperties().setProp("ECL", recordDefinition);
 
-        Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
-        file->setAccessed();
-        file->setECL(recordDefinition);
-        file->queryAttributes().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
-        file->queryAttributes().setPropInt64("@recordCount", 0);
-        file->queryAttributes().setPropInt64("@size", 0);
+        MemoryBuffer layoutBin;
+        exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+        if (0 == layoutBin.length())
+            throw makeStringExceptionV(ECLWATCH_INVALID_ECLRECDEF, "DFUFileCreateV2: Failed to parse ECL record definition: %s.", recordDefinition);
+        fileDesc->queryProperties().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
+        if (req.getCompressed())
+            fileDesc->queryProperties().setPropBool("@blockCompressed", true);
+
+        // NB: if file has copies on >1 cluster, they must share the same key
+        StringBuffer keyPairName;
+        unsigned port;
+        bool secure;
 
-        getFileAccessBeforePublish(context, tempFileName, clusterName, jobId, layoutBin, fileDesc, userDesc, requestBase, resp);
+        std::vector<std::string> groups;
+        groups.push_back(groupName.str());
+        getFileDafilesrvConfiguration(keyPairName, port, secure, normalizedFileName, groups);
 
-        //create FileId
-        StringBuffer fileID;
-        fileID.set(groupName.str()).append(DFUFileIdSeparator).append(clusterName).append(DFUFileIdSeparator).append(tempFileName.str());
-        resp.setFileId(fileID.str());
+        Owned<IPropertyTree> metaInfo = createDFUFileMetaInfo(tempFileName, fileDesc, requestId, "WRITE", expirySecs, userDesc, keyPairName, port, secure, maxFileAccessExpirySeconds);
+        metaInfo->setProp("clusterName", clusterName);
+
+        StringBuffer metaInfoBlob;
+        encodeDFUFileMeta(metaInfoBlob, metaInfo, env);
+
+        IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
+        accessInfo.setMetaInfoBlob(metaInfoBlob);
+        if (returnTextResponse)
+        {
+            getFilePartsInfo(context, *fileDesc, true, accessInfo);
+            getJsonTypeInfo(*fileDesc, accessInfo);
+
+            accessInfo.setExpiryTime(metaInfo->queryProp("expiryTime"));
+            accessInfo.setFileAccessPort(metaInfo->getPropInt("port"));
+            accessInfo.setFileAccessSSL(metaInfo->getPropBool("secure"));
+        }
     }
     catch (IException *e)
     {
@@ -6391,55 +6407,97 @@ bool CWsDfuEx::onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &r
 
 bool CWsDfuEx::onDFUFilePublish(IEspContext &context, IEspDFUFilePublishRequest &req, IEspDFUFilePublishResponse &resp)
 {
+    Owned<IException> exception;
+    Owned<IDistributedFile> newFile;
+    Owned<IFileDescriptor> fileDesc;
+    StringBuffer normalizeTempFileName;
     try
     {
-        context.ensureFeatureAccess(FEATURE_URL, SecAccess_Write, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFilePublish: Permission denied.");
-
         const char *fileId = req.getFileId();
-        const char *recordDefinition = req.getECLRecordDefinition();
         if (isEmptyString(fileId))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No FileId defined.");
-        if (isEmptyString(recordDefinition))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No ECLRecordDefinition defined.");
+             throw makeStringException(ECLWATCH_INVALID_INPUT, "DFUFilePublish: No FileId defined.");
+
+        const char *recordDefinition = req.getECLRecordDefinition();
 
         StringArray fileIdItems;
         fileIdItems.appendList(fileId, DFUFileIdSeparator);
-        if (fileIdItems.ordinality() < 3)
-            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId '%s'.", fileId);
-
         const char *groupName = fileIdItems.item(0);
         const char *clusterName = fileIdItems.item(1);
         const char *tempFileName = fileIdItems.item(2);
         if (isEmptyString(groupName))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty groupName.");
+             throw makeStringException(ECLWATCH_INVALID_INPUT, "DFUFilePublish: Invalid FileId: empty groupName.");
         if (isEmptyString(clusterName))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty clusterName.");
+             throw makeStringException(ECLWATCH_INVALID_INPUT, "DFUFilePublish: Invalid FileId: empty clusterName.");
         if (isEmptyString(tempFileName))
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty FileName.");
+             throw makeStringException(ECLWATCH_INVALID_INPUT, "DFUFilePublish: Invalid FileId: empty FileName.");
 
-        StringBuffer userId, newFileName;
         CDfsLogicalFileName lfn;
         lfn.set(tempFileName);
-        newFileName.set(lfn.get());
-        if (newFileName.length() <= strlen(DFUFileCreate_FileNamePostfix))
-            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: cannot read FileName from %s.", tempFileName);
-        newFileName.setLength(newFileName.length() - strlen(DFUFileCreate_FileNamePostfix)); //remove DFUFileCreate_FileNamePostfix
+        normalizeTempFileName.set(lfn.get());
 
-        ClusterType clusterType = getClusterTypeByClusterName(clusterName);
-        if (clusterType == NoCluster)
-             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster %s not found.", clusterName);
+        size32_t postFixLen = strlen(DFUFileCreate_FileNamePostfix);
+        if (normalizeTempFileName.length() <= postFixLen)
+            throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFilePublish: Invalid temporary filename: %s.", fileId);
 
-        const char *clusterTypeEx = clusterTypeString(clusterType, false);
+        const MemoryBuffer &fDescBlob = req.getFileDescriptorBlob();
+        if (fDescBlob.length())
+        {
+            MemoryBuffer mb;
+            mb.setBuffer(fDescBlob.length(), (void *)fDescBlob.toByteArray());
+            fileDesc.setown(deserializeFileDescriptor(mb));
+        }
+        else
+        {
+            if (isEmptyString(recordDefinition))
+                 throw makeStringException(ECLWATCH_INVALID_INPUT, "DFUFilePublish: No ECLRecordDefinition defined.");
 
-        GroupType groupType;
-        StringBuffer basedir;
-        Owned<IGroup> group = queryNamedGroupStore().lookup(groupName, basedir, groupType);
-        if (!group)
-            throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed to find group %s.", groupName);
+            ClusterType clusterType = getClusterTypeByClusterName(clusterName);
+            const char *clusterTypeEx = clusterTypeString(clusterType, false);
+            GroupType groupType;
+            StringBuffer basedir;
+            Owned<IGroup> group = queryNamedGroupStore().lookup(groupName, basedir, groupType);
+            if (!group)
+                throw makeStringExceptionV(ECLWATCH_FILE_NOT_EXIST, "DFUFilePublish: Failed to find group %s.", groupName);
 
-        MemoryBuffer layoutBin;
-        exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+            fileDesc.setown(createFileDescriptor(normalizeTempFileName, clusterTypeEx, groupName, group));
+        }
 
+        StringBuffer newFileName = normalizeTempFileName;
+        const char *start = newFileName;
+        const char *pos = start + (newFileName.length() - postFixLen);
+        const char *startPos = pos;
+        if (!streq(pos, DFUFileCreate_FileNamePostfix))
+            throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFilePublish: Invalid temporary filename: %s.", fileId);
+        while (true)
+        {
+            --pos;
+            if (pos == start)
+            {
+                // old style ( <= 1.39 ), without unique id
+                newFileName.setLength(startPos-start);
+                break;
+            }
+            else if ('.' == *pos)
+            {
+                newFileName.setLength(pos-start);
+                break;
+            }
+        }
+        newFileName.setLength(pos-start);
+
+        if (!isEmptyString(recordDefinition))
+        {
+            MemoryBuffer layoutBin;
+            exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+            fileDesc->queryProperties().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
+            fileDesc->queryProperties().setProp("ECL", recordDefinition);
+        }
+        if (!req.getRecordCount_isNull())
+            fileDesc->queryProperties().setPropInt64("@recordCount", req.getRecordCount());
+        if (!req.getFileSize_isNull())
+            fileDesc->queryProperties().setPropInt64("@size", req.getFileSize());
+
+        StringBuffer userId;
         Owned<IUserDescriptor> userDesc;
         context.getUserID(userId);
         if (!userId.isEmpty())
@@ -6447,37 +6505,49 @@ bool CWsDfuEx::onDFUFilePublish(IEspContext &context, IEspDFUFilePublishRequest
             userDesc.setown(createUserDescriptor());
             userDesc->set(userId.str(), context.queryPassword(), context.querySignature());
         }
+        Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(newFileName, userDesc, false, false, true);
+        if (df)
+        {
+            if (!req.getOverwrite())
+                throw makeStringExceptionV(ECLWATCH_FILE_ALREADY_EXISTS, "DFUFilePublish: File already exists (%s) and overwrite not specified.", newFileName.str());
+            df->detach(30000);
+        }
 
-        VStringBuffer jobId("Publish %s on %s", newFileName.str(), clusterName);
-        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeEx, groupName, group);
-        Owned<IDistributedFile> oldFile = queryDistributedFileDirectory().createNew(fileDesc);
-        oldFile->validate();
-
-        if (!oldFile->renamePhysicalPartFiles(newFileName.str(), nullptr, nullptr, nullptr))
-            throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed in renamePhysicalPartFiles %s.", newFileName.str());
+        newFile.setown(queryDistributedFileDirectory().createNew(fileDesc));
+        newFile->validate();
+        newFile->setAccessed();
+        newFile->attach(normalizeTempFileName, userDesc);
 
-        Owned<IFileDescriptor> newFileDesc = createFileDescriptor(newFileName, clusterTypeEx, groupName, group);
-        newFileDesc->queryProperties().setProp("@job", jobId);
-        if (!userId.isEmpty())
-            newFileDesc->queryProperties().setProp("@owner", userId);
+        if (!newFile->renamePhysicalPartFiles(newFileName, nullptr, nullptr))//, fileDesc->queryDefaultDir()))
+            throw makeStringExceptionV(ECLWATCH_FILE_NOT_EXIST, "DFUFilePublish: Failed in renamePhysicalPartFiles %s.", newFileName.str());
 
-        Owned<IDistributedFile> newFile = queryDistributedFileDirectory().createNew(newFileDesc);
-        newFile->setAccessed();
-        newFile->setECL(recordDefinition);
-        newFile->queryAttributes().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
-        if (!req.getRecordCount_isNull())
-            newFile->queryAttributes().setPropInt64("@recordCount", req.getRecordCount());
-        if (!req.getFileSize_isNull())
-            newFile->queryAttributes().setPropInt64("@size", req.getFileSize());
-        newFile->attach(newFileName.str(), userDesc);
+        newFile->rename(newFileName, userDesc);
 
         LOG(daliAuditLogCat,",FileAccess,EspProcess,CREATED,%s,%s,%s", groupName, userId.str(), newFileName.str());
     }
     catch (IException *e)
     {
-        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+        exception.setown(e);
+    }
+
+    if (exception)
+    {
+        if (fileDesc)
+        {
+            Owned<IMultiException> exceptions = MakeMultiException("CWsDfuEx::onDFUFilePublish");
+            queryDistributedFileDirectory().removePhysicalPartFiles(normalizeTempFileName, fileDesc, exceptions);
+            if (exceptions->ordinality())
+            {
+                StringBuffer errMsg("Error whilst clearing up temporary file: ");
+                EXCLOG(exceptions, errMsg.append(normalizeTempFileName).str());
+            }
+        }
+        if (newFile)
+            newFile->detach(30000);
+        FORWARDEXCEPTION(context, exception.getClear(), ECLWATCH_INTERNAL_ERROR);
     }
     return true;
 }
 
+
 //////////////////////HPCC Browser//////////////////////////

+ 12 - 9
esp/services/ws_dfu/ws_dfuService.hpp

@@ -140,6 +140,7 @@ class CWsDfuEx : public CWsDfu
     Owned<IConstEnvironment> env;
     static const unsigned defaultMaxFileAccessExpirySeconds=86400; // 24 hours
 
+    void dFUFileAccessCommon(IEspContext &context, const CDfsLogicalFileName &lfn, const char *requestId, unsigned expirySecs, bool returnTextResponse, IEspDFUFileAccessInfo &accessInfo);
 public:
     IMPLEMENT_IINTERFACE;
     virtual ~CWsDfuEx(){};
@@ -168,10 +169,16 @@ public:
     virtual bool onSuperfileAction(IEspContext &context, IEspSuperfileActionRequest &req, IEspSuperfileActionResponse &resp);
     virtual bool onListHistory(IEspContext &context, IEspListHistoryRequest &req, IEspListHistoryResponse &resp);
     virtual bool onEraseHistory(IEspContext &context, IEspEraseHistoryRequest &req, IEspEraseHistoryResponse &resp);
-    virtual bool onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp);
-    virtual bool onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &req, IEspDFUFileCreateResponse &resp);
+
     virtual bool onDFUFilePublish(IEspContext &context, IEspDFUFilePublishRequest &req, IEspDFUFilePublishResponse &resp);
 
+    virtual bool onDFUFileAccessV2(IEspContext &context, IEspDFUFileAccessV2Request &req, IEspDFUFileAccessResponse &resp);
+    virtual bool onDFUFileCreateV2(IEspContext &context, IEspDFUFileCreateV2Request &req, IEspDFUFileCreateResponse &resp);
+
+    // NB: the following 3 methods are deprecated from ver >= 1.50
+        virtual bool onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp);
+        virtual bool onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &req, IEspDFUFileCreateResponse &resp);
+
 private:
     const char* getPrefixFromLogicalName(const char* logicalName, StringBuffer& prefix);
     bool addDFUQueryFilter(DFUQResultField *filters, unsigned short &count, MemoryBuffer &buff, const char* value, DFUQResultField name);
@@ -237,14 +244,10 @@ private:
     void queryFieldNames(IEspContext &context, const char *fileName, const char *cluster,
         unsigned __int64 fieldMask, StringArray &fieldNames);
     void parseFieldMask(unsigned __int64 fieldMask, unsigned &fieldCount, IntArray &fieldIndexArray);
-    void getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, unsigned numParts, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo);
-    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, IDistributedFile &file);
-    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, const char *cluster);
-    void getFileMeta(StringBuffer &metaInfo, StringBuffer &expiryTime, const char *fileName, IFileDescriptor *fDesc, IUserDescriptor *user, const char *jobID, const char *keyPairName, IConstDFUFileAccessRequestBase &req);
-    void getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IConstDFUFileAccessRequestBase &req, IEspDFUFileAccessInfo &accessInfo);
-    IGroup *getDFUFileIGroup(const char *clusterName, ClusterType clusterType, const char *clusterTypeEx, StringArray &locations, StringBuffer &groupName);
+    void getFilePartsInfo(IEspContext &context, IFileDescriptor &fileDesc, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo);
+    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, const char *fileName, std::vector<std::string> &groups);
+    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, const char *group);
     void exportRecordDefinitionBinaryType(const char *recordDefinition, MemoryBuffer &layoutBin);
-    void getFileAccessBeforePublish(IEspContext &context, const char *logicalName, const char *cluster, const char *jobId, MemoryBuffer& layoutBin, IFileDescriptor *fileDesc, IUserDescriptor *udesc, IConstDFUFileAccessRequestBase &req, IEspDFUFileCreateResponse &resp);
 
     bool attachServiceToDali() override
     {

+ 2 - 0
esp/smc/SMCLib/eclwatch_errorlist.hpp

@@ -125,6 +125,8 @@
 
 #define ECLWATCH_VIEW_ACCESS_DENIED         ECLWATCH_ERROR_START+105
 #define ECLWATCH_CANNOT_COPY_DLL            ECLWATCH_ERROR_START+106
+#define ECLWATCH_INVALID_ECLRECDEF          ECLWATCH_ERROR_START+107
+
 
 #endif //_ECLWATCH_ERRORLIST_HPP__
 

+ 36 - 1
system/jlib/jlzw.hpp

@@ -144,7 +144,42 @@ extern jlib_decl bool removeCompressorHandler(ICompressHandler *handler); // ret
 extern jlib_decl ICompressor *getCompressor(const char *type, const char *options=NULL);
 extern jlib_decl IExpander *getExpander(const char *type, const char *options=NULL);
 
-
+inline unsigned translateToCompMethod(const char *compStr)
+{
+    unsigned compMethod = COMPRESS_METHOD_LZ4;
+    if (!isEmptyString(compStr))
+    {
+        if (strieq("FLZ", compStr))
+            compMethod = COMPRESS_METHOD_FASTLZ;
+        else if (strieq("LZW", compStr))
+            compMethod = COMPRESS_METHOD_LZW;
+        else if (strieq("RDIFF", compStr))
+            compMethod = COMPRESS_METHOD_ROWDIF;
+        else if (strieq("LZMA", compStr))
+            compMethod = COMPRESS_METHOD_LZMA;
+        //else // default is LZ4
+    }
+    return compMethod;
+}
+
+inline const char *translateFromCompMethod(unsigned compMethod)
+{
+    switch (compMethod)
+    {
+        case COMPRESS_METHOD_ROWDIF:
+            return "RDIFF";
+        case COMPRESS_METHOD_LZW:
+            return "LZW";
+        case COMPRESS_METHOD_FASTLZ:
+            return "FLZ";
+        case COMPRESS_METHOD_LZ4:
+            return "LZ4";
+        case COMPRESS_METHOD_LZMA:
+            return "LZMA";
+        default:
+            return ""; // none
+    }
+}
 
 #define MIN_ROWCOMPRESS_RECSIZE 8
 #endif

+ 1 - 0
system/jlib/jstring.hpp

@@ -574,6 +574,7 @@ extern jlib_decl bool endsWithIgnoreCase(const char* src, const char* dst);
 inline bool strieq(const char* s, const char* t) { return stricmp(s,t)==0; }
 inline bool streq(const char* s, const char* t) { return strcmp(s,t)==0; }
 inline bool strsame(const char* s, const char* t) { return (s == t) || (s && t && strcmp(s,t)==0); }  // also allow nulls
+inline bool strisame(const char* s, const char* t) { return (s == t) || (s && t && stricmp(s,t)==0); }  // also allow nulls
 inline bool isEmptyString(const char *text) { return !text||!*text; }
 inline bool hasPrefix(const char * text, const char * prefix, bool caseSensitive)
 {