Przeglądaj źródła

HPCC-20443 Add WsDfu.DFUFileCreate and WsDfu.DFUFilePublish

The methods are used to create a logical file (temp) and publish
it later.

Also add FileId into DFUFileCreateResponse and DFUFilePublishRequest;
Move file name and job Id to request base; Add a warning if failed to
return RecordTypeInfo; Check physical file parts; Create Group name
using PartLocations. Return system group if it's locations match.

Call getFileDafilesrvConfiguration before getFileMeta. Also split
getFileDafilesrvConfiguration() to two.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 6 lat temu
rodzic
commit
006d732300

+ 49 - 0
common/workunit/workunit.cpp

@@ -7269,6 +7269,55 @@ extern WORKUNIT_API StringBuffer &getClusterThorGroupName(StringBuffer &ret, con
     return ret;
 }
 
+extern WORKUNIT_API StringBuffer &getClusterGroupName(StringBuffer &ret, const char *cluster)
+{
+    Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
+    Owned<IConstEnvironment> env = factory->openEnvironment();
+    Owned<IPropertyTree> root = &env->getPTree();
+    StringBuffer path;
+    path.set("Software/ThorCluster[@name=\"").append(cluster).append("\"]");
+    IPropertyTree * child = root->queryPropTree(path);
+    if (child)
+    {
+        return getClusterGroupName(*child, ret);
+    }
+    path.set("Software/RoxieCluster[@name=\"").append(cluster).append("\"]");
+    child = root->queryPropTree(path);
+    if (child)
+    {
+        return getClusterGroupName(*child, ret);
+    }
+    path.set("Software/EclAgentProcess[@name=\"").append(cluster).append("\"]");
+    child = root->queryPropTree(path);
+    if (child)
+    {
+        return ret.setf("hthor__%s", cluster);
+    }
+
+    return ret;
+}
+
+extern WORKUNIT_API ClusterType getClusterTypeByClusterName(const char *cluster)
+{
+    Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
+    Owned<IConstEnvironment> env = factory->openEnvironment();
+    Owned<IPropertyTree> root = &env->getPTree();
+    StringBuffer path;
+    path.set("Software/ThorCluster[@name=\"").append(cluster).append("\"]");
+    if (root->hasProp(path))
+        return ThorLCRCluster;
+
+    path.set("Software/RoxieCluster[@name=\"").append(cluster).append("\"]");
+    if (root->hasProp(path))
+        return RoxieCluster;
+
+    path.set("Software/EclAgentProcess[@name=\"").append(cluster).append("\"]");
+    if (root->hasProp(path))
+        return HThorCluster;
+
+    return NoCluster;
+}
+
 extern WORKUNIT_API StringBuffer &getClusterRoxieQueueName(StringBuffer &ret, const char *cluster)
 {
     return ret.append(cluster).append(ROXIE_QUEUE_EXT);

+ 2 - 0
common/workunit/workunit.hpp

@@ -1550,6 +1550,8 @@ extern WORKUNIT_API IStringVal &getEclSchedulerQueueNames(IStringVal &ret, const
 extern WORKUNIT_API IStringVal &getAgentQueueNames(IStringVal &ret, const char *process);
 extern WORKUNIT_API IStringVal &getRoxieQueueNames(IStringVal &ret, const char *process);
 extern WORKUNIT_API IStringVal &getThorQueueNames(IStringVal &ret, const char *process);
+extern WORKUNIT_API ClusterType getClusterTypeByClusterName(const char *cluster);
+extern WORKUNIT_API StringBuffer &getClusterGroupName(StringBuffer &ret, const char *cluster);
 extern WORKUNIT_API StringBuffer &getClusterThorQueueName(StringBuffer &ret, const char *cluster);
 extern WORKUNIT_API StringBuffer &getClusterThorGroupName(StringBuffer &ret, const char *cluster);
 extern WORKUNIT_API StringBuffer &getClusterRoxieQueueName(StringBuffer &ret, const char *cluster);

+ 22 - 0
dali/base/dafdesc.cpp

@@ -2336,7 +2336,29 @@ IFileDescriptor *createFileDescriptor(const char *lname,IGroup *grp,IPropertyTre
     return res;
 }
 
+IFileDescriptor *createFileDescriptor(const char *lname, const char *clusterType, const char *groupName, IGroup *group)
+{
+    StringBuffer partMask;
+    unsigned parts = group->ordinality();
+    getPartMask(partMask, lname, parts);
+
+    StringBuffer curDir, defaultDir;
+    if (!getConfigurationDirectory(nullptr, "data", clusterType, groupName, defaultDir))
+        makePhysicalPartName(lname, 0, 0, curDir, false, DFD_OSdefault); // legacy
+    else
+        makePhysicalPartName(lname, 0, 0, curDir, false, SepCharBaseOs(getPathSepChar(defaultDir)), defaultDir.str());
+
+    Owned<IFileDescriptor> fileDesc = createFileDescriptor();
+    fileDesc->setNumParts(parts);
+    fileDesc->setPartMask(partMask);
+    fileDesc->setDefaultDir(curDir);
 
+    ClusterPartDiskMapSpec mspec;
+    mspec.defaultCopies = DFD_DefaultCopies;
+    fileDesc->addCluster(groupName, group, mspec);
+
+    return fileDesc.getClear();
+}
 
 IFileDescriptor *deserializeFileDescriptor(MemoryBuffer &mb)
 {

+ 1 - 0
dali/base/dafdesc.hpp

@@ -324,6 +324,7 @@ extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isr
 
 extern da_decl IFileDescriptor *createFileDescriptor();
 extern da_decl IFileDescriptor *createFileDescriptor(IPropertyTree *attr);      // ownership of attr tree is taken
+extern da_decl IFileDescriptor *createFileDescriptor(const char *lname, const char *clusterType, const char *groupName, IGroup *grp);
 extern da_decl IFileDescriptor *createExternalFileDescriptor(const char *logicalname);
 extern da_decl IFileDescriptor *getExternalFileDescriptor(const char *logicalname);
 extern da_decl ISuperFileDescriptor *createSuperFileDescriptor(IPropertyTree *attr);        // ownership of attr tree is taken

+ 8 - 7
esp/clients/wsdfuaccess/wsdfuaccess.cpp

@@ -50,6 +50,7 @@ bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *j
     dfuClient->setUsernameToken(user, token, "");
 
     Owned<IClientDFUFileAccessRequest> dfuReq = dfuClient->createDFUFileAccessRequest();
+    IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
 
     CDfsLogicalFileName lfn;
     lfn.set(logicalName);
@@ -58,12 +59,12 @@ bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *j
     lfn.getCluster(cluster);
     lfn.get(lfnName); // remove cluster if present
 
-    dfuReq->setName(lfnName);
-    dfuReq->setCluster(cluster);
-    dfuReq->setExpirySeconds(expirySecs);
-    dfuReq->setAccessRole(CFileAccessRole_Engine);
-    dfuReq->setAccessType(translateToCSecAccessAccessType(access));
-    dfuReq->setJobId(jobId);
+    requestBase.setName(lfnName);
+    requestBase.setCluster(cluster);
+    requestBase.setExpirySeconds(expirySecs);
+    requestBase.setAccessRole(CFileAccessRole_Engine);
+    requestBase.setAccessType(translateToCSecAccessAccessType(access));
+    requestBase.setJobId(jobId);
 
     Owned<IClientDFUFileAccessResponse> dfuResp = dfuClient->DFUFileAccess(dfuReq);
 
@@ -71,7 +72,7 @@ bool getFileAccess(StringBuffer &metaInfo, const char *serviceUrl, const char *j
     if (excep->ordinality() > 0)
         throw LINK((IMultiException *)excep); // JCSMORE - const IException.. not caught in general..
 
-    metaInfo.append(dfuResp->getMetaInfoBlob());
+    metaInfo.append(dfuResp->getAccessInfo().getMetaInfoBlob());
     return true;
 }
 

+ 55 - 11
esp/scm/ws_dfu.ecm

@@ -832,39 +832,50 @@ ESPresponse [exceptions_inline, nil_remove] EraseHistoryResponse
     ESParray<ESPStruct History, Origin> History;
 };
 
-ESPrequest DFUFileAccessRequest
+ESPStruct DFUFileAccessRequestBase
 {
     string Name;
     string Cluster;
+    string JobId;
     int ExpirySeconds(60);
     ESPenum FileAccessRole AccessRole;
     ESPenum SecAccessType AccessType;
-    string JobId;
     bool ReturnJsonTypeInfo(false);
     bool ReturnBinTypeInfo(false);
 };
 
-// DFUPartLocations and DFUPartCopies part of DFUFileAccessResponse below
-ESPStruct DFUPartLocations
+ESPrequest DFUFileAccessRequest
+{
+    EspStruct DFUFileAccessRequestBase RequestBase;
+};
+
+ESPStruct DFUPartLocation
 {
     int LocationIndex;
-    string Location;
+    string Host;
 };
 
-ESPStruct DFUPartCopies
+ESPStruct DFUFileCopy
+{
+    int CopyIndex;
+    int LocationIndex;
+    string Path;
+};
+
+ESPStruct DFUFilePart
 {
     int PartIndex;
-    ESParray<string> LocationIndexes;
+    ESParray<EspStruct DFUFileCopy> Copies;
 };
 
-ESPresponse [exceptions_inline] DFUFileAccessResponse
+ESPStruct DFUFileAccessInfo
 {
     string MetaInfoBlob;
     string ExpiryTime;
-    // {NumParts, FilePartLocations, FileParts} depend on ReturnFileInfo in request
+    // {NumParts, FileParts} depend on ReturnFileInfo in request
     int NumParts;        // number of parts in logical file
-    ESParray<EspStruct DFUPartLocations> FilePartLocations;
-    ESParray<EspStruct DFUPartCopies> FileParts;
+    ESParray<EspStruct DFUPartLocation> FileLocations;
+    ESParray<EspStruct DFUFilePart> FileParts;
     
     binary RecordTypeInfoBin;   // optional
     string RecordTypeInfoJson;  // optional
@@ -873,6 +884,37 @@ ESPresponse [exceptions_inline] DFUFileAccessResponse
     bool fileAccessSSL;
 };
 
+ESPresponse [exceptions_inline] DFUFileAccessResponse
+{
+    EspStruct DFUFileAccessInfo AccessInfo;
+};
+
+ESPrequest [nil_remove] DFUFileCreateRequest
+{
+    string ECLRecordDefinition;
+    ESParray<string> PartLocations;
+    EspStruct DFUFileAccessRequestBase RequestBase;
+};
+
+ESPresponse [exceptions_inline, nil_remove] DFUFileCreateResponse
+{
+    string FileId;
+    string Warning;
+    EspStruct DFUFileAccessInfo AccessInfo;
+};
+
+ESPrequest [nil_remove] DFUFilePublishRequest
+{
+    string FileId;
+    string ECLRecordDefinition;
+    int64 RecordCount;
+    int64 FileSize;
+};
+
+ESPresponse [exceptions_inline, nil_remove] DFUFilePublishResponse
+{
+};
+
 //  ===========================================================================
 ESPservice [
     auth_feature("DEFERRED"),
@@ -906,6 +948,8 @@ ESPservice [
     ESPmethod EclRecordTypeInfo(EclRecordTypeInfoRequest, EclRecordTypeInfoResponse);
 
     ESPmethod [auth_feature("DfuAccess:READ"), min_var("1.39")] DFUFileAccess(DFUFileAccessRequest, DFUFileAccessResponse);
+    ESPmethod [auth_feature("DfuAccess:FULL"), min_var("1.39")] DFUFileCreate(DFUFileCreateRequest, DFUFileCreateResponse);
+    ESPmethod [auth_feature("DfuAccess:FULL"), min_var("1.39")] DFUFilePublish(DFUFilePublishRequest, DFUFilePublishResponse);
 };
 
 SCMexportdef(WSDFU);

+ 384 - 72
esp/services/ws_dfu/ws_dfuService.cpp

@@ -81,6 +81,11 @@ static const char* FEATURE_URL="DfuAccess";
 
 #define REMOVE_FILE_SDS_CONNECT_TIMEOUT (1000*15)  // 15 seconds
 
+static const char *DFUFileIdSeparator = "|";
+static const char *DFUFileCreate_FileNamePostfix = ".wsdfucreate.tmp";
+static const char *DFUFileCreate_GroupNamePrefix = "wsdfucreate";
+static const char *ConfigurationDirectoryForDataCategory = "data";
+
 const unsigned NODE_GROUP_CACHE_DEFAULT_TIMEOUT = 30*60*1000; //30 minutes
 
 const unsigned MAX_VIEWKEYFILE_ROWS = 1000;
@@ -5878,55 +5883,77 @@ int CWsDfuEx::GetIndexData(IEspContext &context, bool bSchemaOnly, const char* i
     return iRet;
 }
 
-unsigned CWsDfuEx::getFilePartsInfo(IEspContext &context, IDistributedFile *df, const char *clusterName,
-    IArrayOf<IEspDFUPartLocations> &dfuPartLocations, IArrayOf<IEspDFUPartCopies> &dfuPartCopies)
+void CWsDfuEx::getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, unsigned numParts, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo)
 {
-    int nextLocationsIndex = 1; // NB: both LocationIndex and PartIndex are 1 based in response.
-    MapStringTo<int> locationMap;
-    Owned<IFileDescriptor> fdesc = df->getFileDescriptor(clusterName);
+    IArrayOf<IEspDFUFilePart> dfuParts;
+    IArrayOf<IEspDFUPartLocation> dfuPartLocations;
+
+    unsigned newLocationIndex = 0;
+    MapStringTo<unsigned> partLocationMap;
+    // NB: both CopyIndex and PartIndex are 1 based in response.
     Owned<IPartDescriptorIterator> pi = fdesc->getIterator();
     ForEach(*pi)
     {
         IPartDescriptor& part = pi->query();
-        unsigned partIndex = part.queryPartIndex();
+        if (isPartTLK(&part))
+            continue;
 
-        StringArray partCopyLocationsIndexes;
+        IArrayOf<IEspDFUFileCopy> fileCopies;
         for (unsigned int i=0; i<part.numCopies(); i++)
         {
-            StringBuffer host, locationsIndexStr;
+            StringBuffer host, path;
+            part.getPath(path, i);
+
+            Owned<IEspDFUFileCopy> fileCopy = createDFUFileCopy();
+            if (forFileCreate)
+                fileCopy->setPath(path.str());
+            fileCopy->setCopyIndex(i + 1);
+
             part.queryNode(i)->endpoint().getUrlStr(host);
-            int *locationsIndex = locationMap.getValue(host.str());
-            if (locationsIndex)
+            unsigned *locationIndex = partLocationMap.getValue(host.str());
+            if (locationIndex)
+                fileCopy->setLocationIndex(*locationIndex);
+            else
             {
-                partCopyLocationsIndexes.append(locationsIndexStr.append(*locationsIndex).str());
-                continue;
-            }
+                partLocationMap.setValue(host.str(), ++newLocationIndex);
+                fileCopy->setLocationIndex(newLocationIndex);
 
-            Owned<IEspDFUPartLocations> partLocations = createDFUPartLocations();
-            partLocations->setLocationIndex(nextLocationsIndex);
-            partLocations->setLocation(host.str());
-            dfuPartLocations.append(*partLocations.getClear());
+                Owned<IEspDFUPartLocation> partLocation = createDFUPartLocation();
+                partLocation->setLocationIndex(newLocationIndex);
+                partLocation->setHost(host.str());
+                dfuPartLocations.append(*partLocation.getClear());
+            }
 
-            partCopyLocationsIndexes.append(locationsIndexStr.append(nextLocationsIndex).str());
-            locationMap.setValue(host.str(), nextLocationsIndex);
-            nextLocationsIndex++;
+            fileCopies.append(*fileCopy.getClear());
         }
 
-        Owned<IEspDFUPartCopies> partCopies = createDFUPartCopies();
-        partCopies->setPartIndex(partIndex + 1);
-        partCopies->setLocationIndexes(partCopyLocationsIndexes);
-        dfuPartCopies.append(*partCopies.getClear());
+        Owned<IEspDFUFilePart> filePart = createDFUFilePart();
+        filePart->setPartIndex(part.queryPartIndex() + 1);
+        filePart->setCopies(fileCopies);
+        dfuParts.append(*filePart.getClear());
     }
-    return df->numParts();
+
+    accessInfo.setNumParts(numParts);
+    accessInfo.setFileParts(dfuParts);
+    accessInfo.setFileLocations(dfuPartLocations);
 }
 
 static const char *securityInfoVersion="1";
-void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, IDistributedFile &file, IUserDescriptor *user, CFileAccessRole role, const char *expiryTime, const char *keyPairName, IConstDFUFileAccessRequest &req)
+void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, StringBuffer &expiryTime, const char *fileName,
+    IFileDescriptor *fDesc, IUserDescriptor *user, const char *jobId, const char *keyPairName, IConstDFUFileAccessRequestBase &req)
 {
+    // setup "expiryTime"
+    unsigned expirySecs = req.getExpirySeconds();
+    if (expirySecs > maxFileAccessExpirySeconds)
+        expirySecs = maxFileAccessExpirySeconds;
+    time_t now;
+    time(&now);
+    CDateTime expiryDt;
+    expiryDt.set(now + expirySecs);
+    expiryDt.getString(expiryTime);
+
     Owned<IPropertyTree> metaInfoEnvelope = createPTree();
     Owned<IPropertyTree> metaInfo = createPTree();
-    const char *clusterName = req.getCluster(); // can be null
-    Owned<IFileDescriptor> fDesc = file.getFileDescriptor(clusterName);
     extractFilePartInfo(*metaInfo, *fDesc);
 
     MemoryBuffer metaInfoMb;
@@ -5941,8 +5968,8 @@ void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, IDistributedFile &file, IU
     if (!isEmptyString(keyPairName)) // without it, meta data is not encrypted
     {
         metaInfo->setProp("version", securityInfoVersion);
-        metaInfo->setProp("logicalFilename", file.queryLogicalName());
-        metaInfo->setProp("jobId", req.getJobId());
+        metaInfo->setProp("logicalFilename", fileName);
+        metaInfo->setProp("jobId", jobId);
         metaInfo->setProp("accessType", req.getAccessTypeAsString());
         StringBuffer userStr;
         if (user)
@@ -5970,6 +5997,19 @@ void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, IDistributedFile &file, IU
     JBASE64_Encode(compressedMetaInfoMb.bytes(), compressedMetaInfoMb.length(), metaInfoStr, false);
 }
 
+void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, const char *cluster)
+{
+    port = DEFAULT_ROWSERVICE_PORT;
+    secure = false;
+    keyPairName.set(env->getClusterKeyPairName(cluster));
+    Owned<IConstDaFileSrvInfo> daFileSrvInfo = env->getDaFileSrvGroupInfo(cluster);
+    if (daFileSrvInfo)
+    {
+        port = daFileSrvInfo->getPort();
+        secure = daFileSrvInfo->getSecure();
+    }
+}
+
 void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, IDistributedFile &file)
 {
     retPort = DEFAULT_ROWSERVICE_PORT;
@@ -5979,15 +6019,11 @@ void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned
     {
         StringBuffer clusterName;
         const char *cluster = file.getClusterName(c, clusterName.clear()).str();
-        const char *_keyPairName = env->getClusterKeyPairName(cluster);
-        Owned<IConstDaFileSrvInfo> daFileSrvInfo = env->getDaFileSrvGroupInfo(cluster);
-        unsigned port = DEFAULT_ROWSERVICE_PORT;
-        bool secure = false;
-        if (daFileSrvInfo)
-        {
-            port = daFileSrvInfo->getPort();
-            secure = daFileSrvInfo->getSecure();
-        }
+
+        StringBuffer _keyPairName;
+        unsigned port;
+        bool secure;
+        getFileDafilesrvConfiguration(_keyPairName, port, secure, cluster);
         if (0 == c)
         {
             keyPairName.set(_keyPairName);
@@ -6006,12 +6042,12 @@ void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned
     }
 }
 
-void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp)
+void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IConstDFUFileAccessRequestBase &req, IEspDFUFileAccessInfo &accessInfo)
 {
     bool writePermissions = (accessType == SecAccess_Write) || (accessType == SecAccess_Full);
     bool readPermissions = true; // by implication
 
-    StringBuffer fileName(req.getName());
+    StringBuffer fileName = req.getName();
     if (!isEmptyString(req.getCluster()))
         fileName.append("@").append(req.getCluster());
 
@@ -6030,8 +6066,9 @@ void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAc
     }
     Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(fileName, udesc, false, false, true); // lock super-owners
     if (!df)
-        throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.", req.getName());
+        throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file '%s'.", fileName.str());
 
+    Owned<IFileDescriptor> fileDesc = df->getFileDescriptor(req.getCluster());
     CFileAccessRole role = req.getAccessRole();
     switch (role)
     {
@@ -6054,11 +6091,7 @@ void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAc
         }
         case CFileAccessRole_External:
         {
-            IArrayOf<IEspDFUPartLocations> dfuPartLocations;
-            IArrayOf<IEspDFUPartCopies> dfuPartCopies;
-            resp.setNumParts(getFilePartsInfo(context, df, req.getCluster(), dfuPartLocations, dfuPartCopies));
-            resp.setFilePartLocations(dfuPartLocations);
-            resp.setFileParts(dfuPartCopies);
+            getFilePartsInfo(context, fileDesc, df->numParts(), false, accessInfo);
             if (req.getReturnJsonTypeInfo() || req.getReturnJsonTypeInfo())
             {
                 MemoryBuffer binLayout;
@@ -6066,9 +6099,9 @@ void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAc
                 if (!getRecordFormatFromRtlType(binLayout, jsonLayout, df->queryAttributes(), req.getReturnJsonTypeInfo(), req.getReturnJsonTypeInfo()))
                     getRecordFormatFromECL(binLayout, jsonLayout, df->queryAttributes(), req.getReturnJsonTypeInfo(), req.getReturnJsonTypeInfo());
                 if (req.getReturnJsonTypeInfo() && jsonLayout.length())
-                    resp.setRecordTypeInfoJson(jsonLayout.str());
+                    accessInfo.setRecordTypeInfoJson(jsonLayout.str());
                 if (req.getReturnBinTypeInfo() && binLayout.length())
-                    resp.setRecordTypeInfoBin(binLayout);
+                    accessInfo.setRecordTypeInfoBin(binLayout);
             }
             break;
         }
@@ -6076,28 +6109,17 @@ void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAc
             throwUnexpected();
     }
 
-    // setup "expiryTime"
-    unsigned expirySecs = req.getExpirySeconds();
-    if (expirySecs > maxFileAccessExpirySeconds)
-        expirySecs = maxFileAccessExpirySeconds;
-    time_t now;
-    time(&now);
-    CDateTime expiryDt;
-    expiryDt.set(now + expirySecs);
-    StringBuffer expiryTimeStr;
-    expiryDt.getString(expiryTimeStr);
-
     StringBuffer keyPairName;
     unsigned port;
     bool secure;
     getFileDafilesrvConfiguration(keyPairName, port, secure, *df);
 
-    StringBuffer metaInfo;
-    getFileMeta(metaInfo, *df, udesc, role, expiryTimeStr, keyPairName, req);
-    resp.setMetaInfoBlob(metaInfo);
-    resp.setExpiryTime(expiryTimeStr);
-    resp.setFileAccessPort(port);
-    resp.setFileAccessSSL(secure);
+    StringBuffer metaInfo, expiryTime;
+    getFileMeta(metaInfo, expiryTime, fileName, fileDesc, udesc, req.getJobId(), keyPairName, req);
+    accessInfo.setMetaInfoBlob(metaInfo);
+    accessInfo.setExpiryTime(expiryTime);
+    accessInfo.setFileAccessPort(port);
+    accessInfo.setFileAccessSSL(secure);
 }
 
 
@@ -6123,13 +6145,16 @@ bool CWsDfuEx::onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &r
 {
     try
     {
-        SecAccessFlags accessType = translateToSecAccessFlags(req.getAccessType());
+        IConstDFUFileAccessRequestBase &requestBase = req.getRequestBase();
+        SecAccessFlags accessType = translateToSecAccessFlags(requestBase.getAccessType());
         if (SecAccess_None == accessType)
-            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "onDFUFileAccess - Permission denied.");
-        else if (!context.validateFeatureAccess(FEATURE_URL, accessType, false))
-            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "onDFUFileAccess - Permission denied.");
+        {
+            context.setAuthStatus(AUTH_STATUS_NOACCESS);
+            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileAccess - Permission denied.");
+        }
+        context.ensureFeatureAccess(FEATURE_URL, accessType, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileAccess: Permission denied.");
 
-        if (isEmptyString(req.getName()))
+        if (isEmptyString(requestBase.getName()))
              throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Name defined.");
 
         StringBuffer userID;
@@ -6141,7 +6166,294 @@ bool CWsDfuEx::onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &r
             userDesc.setown(createUserDescriptor());
             userDesc->set(userID.str(), context.queryPassword(), context.querySignature());
         }
-        getFileAccess(context, userDesc, accessType, req, resp);
+        getFileAccess(context, userDesc, accessType, requestBase, resp.updateAccessInfo());
+    }
+    catch (IException *e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
+IGroup *CWsDfuEx::getDFUFileIGroup(const char *clusterName, ClusterType clusterType, const char *clusterTypeEx, StringArray &locations, StringBuffer &groupName)
+{
+    GroupType groupType;
+    StringBuffer basedir;
+    getClusterGroupName(groupName, clusterName);
+    Owned<IGroup> groupFound = queryNamedGroupStore().lookup(groupName.str(), basedir, groupType);
+    if (!groupFound)
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to get Group for Group %s.", groupName.str());
+
+    if (!locations.ordinality())
+        return groupFound.getClear();
+
+    StringBuffer locationStr;
+    SocketEndpointArray epa;
+    ForEachItemIn(i, locations)
+    {
+        SocketEndpoint ep(locations.item(i));
+        if (ep.isNull())
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid location '%s'.", locations.item(i));
+        epa.append(ep);
+        locationStr.append(locations.item(i)).append(";");
+    }
+
+    Owned<IGroup> group = createIGroup(epa);
+    if (group->equals(groupFound))
+        return group.getClear();
+
+    groupName.set(DFUFileCreate_GroupNamePrefix).append(hashc((unsigned char*) locationStr.str(), locationStr.length(), 0));
+
+    bool foundGroup = false;
+    unsigned groupNameLength = groupName.length();
+    while (true)
+    {
+        GroupType groupType;
+        StringBuffer basedir;
+        Owned<IGroup> groupFound = queryNamedGroupStore().lookup(groupName.str(), basedir, groupType);
+        if (!groupFound)
+            break;
+
+        if (group->equals(groupFound))
+        {
+            foundGroup = true;
+            ESPLOG(LogMax, "Found DFUFileIGroup %s", groupName.str());
+            break;
+        }
+        //The original group name is used by another group. Rename it to: 'original name + _ + a random number'.
+        //We have to check if the new name is also used by another group. If yes, rename it to:
+        //'the original name + _ + another random number' until we find out a name which is not used.
+        groupName.setLength(groupNameLength); //remove _rand if any
+        groupName.append("_").append(rand());
+    }
+
+    if (!foundGroup)
+    {
+        StringBuffer defaultDir;
+        if (!getConfigurationDirectory(nullptr, ConfigurationDirectoryForDataCategory, clusterTypeEx, groupName.str(), defaultDir))
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed to get ConfigurationDirectory: %s.", groupName.str());
+
+        GroupType grpType = (clusterType == ThorLCRCluster) ? grp_thor : ((clusterType == HThorCluster) ? grp_hthor : grp_roxie);
+        queryNamedGroupStore().add(groupName.str(), group, false, defaultDir.str(), grpType);
+        ESPLOG(LogMin, "DFUFileIGroup %s added", groupName.str());
+    }
+    return group.getClear();
+}
+
+void CWsDfuEx::exportRecordDefinitionBinaryType(const char *recordDefinition, MemoryBuffer &layoutBin)
+{
+    MultiErrorReceiver errs;
+    Owned<IHqlExpression> expr = parseQuery(recordDefinition, &errs);
+    if (errs.errCount() > 0)
+    {
+        StringBuffer errorMsg;
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in parsing ECL %s: %s.", recordDefinition, errs.toString(errorMsg).str());
+    }
+    if (!expr)
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in parsing ECL: %s.", recordDefinition);
+    
+    if (!exportBinaryType(layoutBin, expr, false))
+        throw MakeStringException(ECLWATCH_INVALID_INPUT, "Failed in exportBinaryType.");
+}
+
+void CWsDfuEx::getFileAccessBeforePublish(IEspContext &context, const char *fileName, const char *cluster,
+    const char *jobId, MemoryBuffer& layoutBin, IFileDescriptor *fileDesc, IUserDescriptor *udesc,
+    IConstDFUFileAccessRequestBase &req, IEspDFUFileCreateResponse &resp)
+{
+    IEspDFUFileAccessInfo &accessInfo = resp.updateAccessInfo();
+    getFilePartsInfo(context, fileDesc, fileDesc->numParts(), true, accessInfo);
+
+    StringBuffer keyPairName;
+    unsigned port;
+    bool secure;
+    getFileDafilesrvConfiguration(keyPairName, port, secure, cluster);
+
+    StringBuffer metaInfo, expiryTime;
+    getFileMeta(metaInfo, expiryTime, fileName, fileDesc, udesc, jobId, keyPairName, req);
+    accessInfo.setMetaInfoBlob(metaInfo);
+    accessInfo.setExpiryTime(expiryTime);
+
+    if (layoutBin.length() == 0)
+    {
+        if (!req.getReturnJsonTypeInfo() && !req.getReturnBinTypeInfo())
+            return;
+        resp.setWarning("RecordTypeInfoBin or RecordTypeInfoJson requested, but no type info found from ECLRecordDefinition");
+    }
+
+    if (req.getReturnJsonTypeInfo())
+    {
+        StringBuffer jsonLayout;
+        Owned<IRtlFieldTypeDeserializer> deserializer(createRtlFieldTypeDeserializer());
+        const RtlTypeInfo *typeInfo = deserializer->deserialize(layoutBin);
+        dumpTypeInfo(jsonLayout, typeInfo);
+        if (jsonLayout.length())
+            accessInfo.setRecordTypeInfoJson(jsonLayout.str());
+    }
+    if (req.getReturnBinTypeInfo())
+    {
+        MemoryBuffer binLayout;
+        layoutBin.swapWith(binLayout);
+        if (binLayout.length())
+            accessInfo.setRecordTypeInfoBin(binLayout);
+    }
+}
+
+bool CWsDfuEx::onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &req, IEspDFUFileCreateResponse &resp)
+{
+    try
+    {
+        IConstDFUFileAccessRequestBase &requestBase = req.getRequestBase();
+        SecAccessFlags accessType = translateToSecAccessFlags(requestBase.getAccessType());
+        if (SecAccess_None == accessType)
+        {
+            context.setAuthStatus(AUTH_STATUS_NOACCESS);
+            throw MakeStringException(ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileCreate - Permission denied.");
+        }
+        context.ensureFeatureAccess(FEATURE_URL, accessType, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFileCreate: Permission denied.");
+
+        const char *fileName = requestBase.getName();
+        const char *clusterName = requestBase.getCluster();
+        const char *recordDefinition = req.getECLRecordDefinition();
+        if (isEmptyString(fileName))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Name defined.");
+        if (isEmptyString(clusterName))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No Cluster defined.");
+        if (isEmptyString(recordDefinition))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No ECLRecordDefinition defined.");
+
+        ClusterType clusterType = getClusterTypeByClusterName(clusterName);
+        if (clusterType == NoCluster)
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster %s not found.", clusterName);
+
+        const char *clusterTypeEx = clusterTypeString(clusterType, false);
+
+        StringBuffer groupName;
+        Owned<IGroup> group = getDFUFileIGroup(clusterName, clusterType, clusterTypeEx, req.getPartLocations(), groupName);
+
+        MemoryBuffer layoutBin;
+        exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+
+        StringBuffer userId;
+        Owned<IUserDescriptor> userDesc;
+        context.getUserID(userId);
+        if (!userId.isEmpty())
+        {
+            userDesc.setown(createUserDescriptor());
+            userDesc->set(userId.str(), context.queryPassword(), context.querySignature());
+        }
+
+        StringBuffer tempFileName = fileName;
+        tempFileName.append(DFUFileCreate_FileNamePostfix);
+
+        StringBuffer jobId = requestBase.getJobId();
+        if (jobId.isEmpty())
+            jobId.appendf("Create %s on %s", tempFileName.str(), clusterName);
+
+        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeEx, groupName, group);
+        fileDesc->queryProperties().setProp("@job", jobId);
+        if (!userId.isEmpty())
+            fileDesc->queryProperties().setProp("@owner", userId);
+
+        Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(fileDesc);
+        file->setAccessed();
+        file->setECL(recordDefinition);
+        file->queryAttributes().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
+        file->queryAttributes().setPropInt64("@recordCount", 0);
+        file->queryAttributes().setPropInt64("@size", 0);
+
+        getFileAccessBeforePublish(context, tempFileName, clusterName, jobId, layoutBin, fileDesc, userDesc, requestBase, resp);
+
+        //create FileId
+        StringBuffer fileID;
+        fileID.set(groupName.str()).append(DFUFileIdSeparator).append(clusterName).append(DFUFileIdSeparator).append(tempFileName.str());
+        resp.setFileId(fileID.str());
+    }
+    catch (IException *e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
+bool CWsDfuEx::onDFUFilePublish(IEspContext &context, IEspDFUFilePublishRequest &req, IEspDFUFilePublishResponse &resp)
+{
+    try
+    {
+        context.ensureFeatureAccess(FEATURE_URL, SecAccess_Write, ECLWATCH_DFU_ACCESS_DENIED, "WsDfu::DFUFilePublish: Permission denied.");
+
+        const char *fileId = req.getFileId();
+        const char *recordDefinition = req.getECLRecordDefinition();
+        if (isEmptyString(fileId))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No FileId defined.");
+        if (isEmptyString(recordDefinition))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "No ECLRecordDefinition defined.");
+
+        StringArray fileIdItems;
+        fileIdItems.appendList(fileId, DFUFileIdSeparator);
+        if (fileIdItems.ordinality() < 3)
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId '%s'.", fileId);
+
+        const char *groupName = fileIdItems.item(0);
+        const char *clusterName = fileIdItems.item(1);
+        const char *tempFileName = fileIdItems.item(2);
+        if (isEmptyString(groupName))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty groupName.");
+        if (isEmptyString(clusterName))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty clusterName.");
+        if (isEmptyString(tempFileName))
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: empty FileName.");
+
+        StringBuffer userId, newFileName;
+        newFileName.set(tempFileName);
+        if (newFileName.length() <= strlen(DFUFileCreate_FileNamePostfix))
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid FileId: cannot read FileName from %s.", tempFileName);
+        newFileName.setLength(newFileName.length() - strlen(DFUFileCreate_FileNamePostfix)); //remove DFUFileCreate_FileNamePostfix
+
+        ClusterType clusterType = getClusterTypeByClusterName(clusterName);
+        if (clusterType == NoCluster)
+             throw MakeStringException(ECLWATCH_INVALID_INPUT, "Cluster %s not found.", clusterName);
+
+        const char *clusterTypeEx = clusterTypeString(clusterType, false);
+
+        GroupType groupType;
+        StringBuffer basedir;
+        Owned<IGroup> group = queryNamedGroupStore().lookup(groupName, basedir, groupType);
+        if (!group)
+            throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed to find group %s.", groupName);
+
+        MemoryBuffer layoutBin;
+        exportRecordDefinitionBinaryType(recordDefinition, layoutBin);
+
+        Owned<IUserDescriptor> userDesc;
+        context.getUserID(userId);
+        if (!userId.isEmpty())
+        {
+            userDesc.setown(createUserDescriptor());
+            userDesc->set(userId.str(), context.queryPassword(), context.querySignature());
+        }
+
+        VStringBuffer jobId("Publish %s on %s", newFileName.str(), clusterName);
+        Owned<IFileDescriptor> fileDesc = createFileDescriptor(tempFileName, clusterTypeEx, groupName, group);
+        fileDesc->queryProperties().setProp("@job", jobId);
+        if (!userId.isEmpty())
+            fileDesc->queryProperties().setProp("@owner", userId);
+
+        Owned<IDistributedFile> newFile = queryDistributedFileDirectory().createNew(fileDesc);
+        newFile->validate();
+
+        if (!newFile->renamePhysicalPartFiles(newFileName.str(), nullptr, nullptr, nullptr))
+            throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed in renamePhysicalPartFiles %s.", newFileName.str());
+
+        newFile->setAccessed();
+        newFile->setECL(recordDefinition);
+        newFile->queryAttributes().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
+        if (!req.getRecordCount_isNull())
+            newFile->queryAttributes().setPropInt64("@recordCount", req.getRecordCount());
+        if (!req.getFileSize_isNull())
+            newFile->queryAttributes().setPropInt64("@size", req.getFileSize());
+        newFile->attach(newFileName.str(), userDesc);
+
+        LOG(daliAuditLogCat,",FileAccess,,CREATED,%s,%s,%s", groupName, userId.str(), newFileName.str());
     }
     catch (IException *e)
     {

+ 9 - 5
esp/services/ws_dfu/ws_dfuService.hpp

@@ -131,7 +131,6 @@ public:
 
 class CWsDfuEx : public CWsDfu
 {
-private:
     Owned<IXslProcessor> m_xsl;
     Mutex m_superfilemutex;
     unsigned nodeGroupCacheTimeout;
@@ -170,6 +169,8 @@ public:
     virtual bool onListHistory(IEspContext &context, IEspListHistoryRequest &req, IEspListHistoryResponse &resp);
     virtual bool onEraseHistory(IEspContext &context, IEspEraseHistoryRequest &req, IEspEraseHistoryResponse &resp);
     virtual bool onDFUFileAccess(IEspContext &context, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp);
+    virtual bool onDFUFileCreate(IEspContext &context, IEspDFUFileCreateRequest &req, IEspDFUFileCreateResponse &resp);
+    virtual bool onDFUFilePublish(IEspContext &context, IEspDFUFilePublishRequest &req, IEspDFUFilePublishResponse &resp);
 
 private:
     const char* getPrefixFromLogicalName(const char* logicalName, StringBuffer& prefix);
@@ -236,11 +237,14 @@ private:
     void queryFieldNames(IEspContext &context, const char *fileName, const char *cluster,
         unsigned __int64 fieldMask, StringArray &fieldNames);
     void parseFieldMask(unsigned __int64 fieldMask, unsigned &fieldCount, IntArray &fieldIndexArray);
-    unsigned getFilePartsInfo(IEspContext &context, IDistributedFile *df, const char *clusterName,
-        IArrayOf<IEspDFUPartLocations> &dfuPartLocations, IArrayOf<IEspDFUPartCopies> &dfuPartCopies);
+    void getFilePartsInfo(IEspContext &context, IFileDescriptor *fdesc, unsigned numParts, bool forFileCreate, IEspDFUFileAccessInfo &accessInfo);
     void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, IDistributedFile &file);
-    void getFileMeta(StringBuffer &metaInfo, IDistributedFile &file, IUserDescriptor *user, CFileAccessRole role, const char *expiryTime, const char *keyPairName, IConstDFUFileAccessRequest &req);
-    void getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp);
+    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, const char *cluster);
+    void getFileMeta(StringBuffer &metaInfo, StringBuffer &expiryTime, const char *fileName, IFileDescriptor *fDesc, IUserDescriptor *user, const char *jobID, const char *keyPairName, IConstDFUFileAccessRequestBase &req);
+    void getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IConstDFUFileAccessRequestBase &req, IEspDFUFileAccessInfo &accessInfo);
+    IGroup *getDFUFileIGroup(const char *clusterName, ClusterType clusterType, const char *clusterTypeEx, StringArray &locations, StringBuffer &groupName);
+    void exportRecordDefinitionBinaryType(const char *recordDefinition, MemoryBuffer &layoutBin);
+    void getFileAccessBeforePublish(IEspContext &context, const char *logicalName, const char *cluster, const char *jobId, MemoryBuffer& layoutBin, IFileDescriptor *fileDesc, IUserDescriptor *udesc, IConstDFUFileAccessRequestBase &req, IEspDFUFileCreateResponse &resp);
 
     bool attachServiceToDali() override
     {