Jelajahi Sumber

HPCC-10666 Operations on Files should not, by default, change the state of
the file. e.g. STD.File.Copy converts compressed files to uncompressed files.

Add preservecompression parameter to preserve compression in copy

Extend filecompcopy.ecl to test preservecompression flag

Update filecompcopy.xml key file

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>

Attila Vamos 10 tahun lalu
induk
melakukan
59c15e3f27

+ 0 - 1
dali/dfu/dfurun.cpp

@@ -1220,7 +1220,6 @@ public:
                             opttree->setPropBool("@recordStructurePresent", true);
 
                         opttree->setPropBool("@quotedTerminator", options->getQuotedTerminator());
-
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
                         if (fdesc) {
                             if (options->getSubfileCopy()) {// need to set destination compressed or not

+ 9 - 0
dali/dfu/dfuwu.cpp

@@ -2057,6 +2057,15 @@ public:
         queryRoot()->setPropBool("@quotedTerminator",val);
     }
 
+    bool getPreserveCompression() const
+    {
+        return queryRoot()->getPropBool("@preserveCompression");
+    }
+
+    void setPreserveCompression(bool val)
+    {
+        queryRoot()->setPropBool("@preserveCompression",val);
+    }
 };
 
 class CExceptionIterator: public CInterface, implements IExceptionIterator

+ 2 - 3
dali/dfu/dfuwu.hpp

@@ -165,13 +165,11 @@ interface IConstDFUoptions : extends IInterface
     virtual bool getSuppressNonKeyRepeats() const = 0;
     virtual bool getSubfileCopy() const = 0;                                // i.e. called by supercopy
     virtual bool getEncDec(StringAttr &enc,StringAttr &dec) = 0;
-
     virtual IPropertyTree *queryTree() const = 0;                   // used by DFU server
     virtual bool getFailIfNoSourceFile() const = 0;
-
     virtual bool getRecordStructurePresent() const = 0;
-
     virtual bool getQuotedTerminator() const = 0;
+    virtual bool getPreserveCompression() const = 0;
 };
 
 interface IDFUoptions : extends IConstDFUoptions
@@ -209,6 +207,7 @@ interface IDFUoptions : extends IConstDFUoptions
     virtual void setFailIfNoSourceFile(bool val=false) = 0;
     virtual void setRecordStructurePresent(bool val=false) = 0;
     virtual void setQuotedTerminator(bool val=true) = 0;
+    virtual void setPreserveCompression(bool val=true) = 0;
 };
 
 interface IConstDFUfileSpec: extends IInterface

+ 2 - 0
dali/dfuplus/dfuplus.cpp

@@ -744,6 +744,8 @@ int CDfuPlusHelper::copy()
     }
     if(globals->hasProp("compress"))
         req->setCompress(globals->getPropBool("compress", false));
+    if(globals->hasProp("preserveCompression"))
+        req->setPreserveCompression(globals->getPropBool("preserveCompression", true));
     if(globals->hasProp("encrypt"))
         req->setEncrypt(globals->queryProp("encrypt"));
     if(globals->hasProp("decrypt"))

+ 1 - 0
dali/dfuplus/main.cpp

@@ -107,6 +107,7 @@ void handleSyntax()
     out.append("        diffkeysrc=<old-key-name>   -- use keydiff/keypatch (src old name)\n");
     out.append("        diffkeydst=<old-key-name>   -- use keydiff/keypatch (dst old name)\n");
     out.append("        multicopy=0|1   -- each destination part gets whole file\n");
+    out.append("        preservecompression=1|0 -- optional, default is 1 (preserve compression)\n");
     out.append("    remove options:\n");
     out.append("        name=<logical-name>\n");
     out.append("        names=<multiple-logical-names-separated-by-comma>\n");

+ 8 - 0
dali/ft/daft.cpp

@@ -57,6 +57,14 @@ void CDistributedFileSystem::copy(IDistributedFile * from, IDistributedFile * to
     sprayer->setPartFilter(filter);
     sprayer->setSource(from);
     sprayer->setTarget(to);
+
+    bool compressInput = from->isCompressed();
+    bool compressOutput = options->getPropBool("@compress");
+    bool preserveCompression = options->getPropBool("@preserveCompression");
+    LOG(MCdebugInfo, unknownJob, "DFS: compressInput:%d, compressOutput:%d, preserveCompression:%d ", compressInput, compressOutput, preserveCompression);
+    if (preserveCompression & compressInput)
+        sprayer->setTargetCompression(compressInput);
+
     sprayer->spray();
 }
 

+ 11 - 1
dali/ft/filecopy.cpp

@@ -2500,6 +2500,11 @@ void FileSprayer::setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode)
         setCopyCompressedRaw();
 }
 
+void FileSprayer::setTargetCompression(bool compress)
+{
+    compressOutput = compress;
+}
+
 void FileSprayer::setTarget(IDistributedFile * target)
 {
     distributedTarget.set(target);
@@ -2733,6 +2738,8 @@ void FileSprayer::spray()
     if ((sourceSize == 0) && failIfNoSourceFile)
         throwError(DFTERR_NoFilesMatchWildcard);
 
+    LOG(MCdebugInfo, job, "compressedInput:%d, compressOutput:%d", compressedInput, compressOutput);
+
     LocalAbortHandler localHandler(daftAbortHandler);
 
     if (allowRecovery && progressTree->getPropBool(ANcomplete))
@@ -2973,7 +2980,10 @@ void FileSprayer::updateTargetProperties()
                      (stricmp(aname,"@eclCRC")==0)||
                      (stricmp(aname,"@formatCrc")==0)||
                      (stricmp(aname,"@owner")==0)||
-                     ((stricmp(aname,FArecordCount)==0)&&!gotrc))
+                     ((stricmp(aname,FArecordCount)==0)&&!gotrc) ||
+                     ((stricmp(aname,"@blockCompressed")==0)&&copyCompressed) ||
+                     ((stricmp(aname,"@rowCompressed")==0)&&copyCompressed)
+                     )
                     )
                     curProps.setProp(aname,aiter->queryValue());
             }

+ 1 - 0
dali/ft/filecopy.hpp

@@ -122,6 +122,7 @@ public:
     virtual void setSource(IDistributedFilePart * part) = 0;
     virtual void setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode) = 0;
     virtual void setTarget(IDistributedFile * target) = 0;
+    virtual void setTargetCompression(bool compress) = 0;
     virtual void setTarget(IFileDescriptor * target, unsigned copy=0) = 0;
     virtual void setTarget(IGroup * target) = 0;
     virtual void setTarget(INode * target) = 0;

+ 2 - 0
dali/ft/filecopy.ipp

@@ -192,6 +192,7 @@ public:
     virtual void setSource(IDistributedFilePart * part);
     virtual void setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode);
     virtual void setTarget(IDistributedFile * target);
+    virtual void setTargetCompression(bool compress);
     virtual void setTarget(IFileDescriptor * target, unsigned copy);
     virtual void setTarget(IGroup * target);
     virtual void setTarget(INode * target);
@@ -315,6 +316,7 @@ protected:
     size32_t                transferBufferSize;
     StringAttr              encryptKey;
     StringAttr              decryptKey;
+    bool                    preserveCompression;
 };
 
 

+ 4 - 4
ecllibrary/std/File.ecl

@@ -554,8 +554,8 @@ EXPORT Despray(varstring logicalName, varstring destinationIP, varstring destina
  * @return              The DFU workunit id for the job.
  */
 
-EXPORT varstring fCopy(varstring sourceLogicalName, varstring destinationGroup, varstring destinationLogicalName, varstring sourceDali='', integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowOverwrite=FALSE, boolean replicate=FALSE, boolean asSuperfile=FALSE, boolean compress=FALSE, boolean forcePush=FALSE, integer4 transferBufferSize=0) :=
-    lib_fileservices.FileServices.fCopy(sourceLogicalName, destinationGroup, destinationLogicalName, sourceDali, timeOut, espServerIpPort, maxConnections, allowOverwrite, replicate, asSuperfile, compress, forcePush, transferBufferSize);
+EXPORT varstring fCopy(varstring sourceLogicalName, varstring destinationGroup, varstring destinationLogicalName, varstring sourceDali='', integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowOverwrite=FALSE, boolean replicate=FALSE, boolean asSuperfile=FALSE, boolean compress=FALSE, boolean forcePush=FALSE, integer4 transferBufferSize=0, boolean preserveCompression=TRUE) :=
+    lib_fileservices.FileServices.fCopy(sourceLogicalName, destinationGroup, destinationLogicalName, sourceDali, timeOut, espServerIpPort, maxConnections, allowOverwrite, replicate, asSuperfile, compress, forcePush, transferBufferSize, preserveCompression);
 
 /**
  * Same as fCopy, but does not return the DFU Workunit ID.
@@ -563,8 +563,8 @@ EXPORT varstring fCopy(varstring sourceLogicalName, varstring destinationGroup,
  * @see fCopy
  */
 
-EXPORT Copy(varstring sourceLogicalName, varstring destinationGroup, varstring destinationLogicalName, varstring sourceDali='', integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowOverwrite=FALSE, boolean replicate=FALSE, boolean asSuperfile=FALSE, boolean compress=FALSE, boolean forcePush=FALSE, integer4 transferBufferSize=0) :=
-    lib_fileservices.FileServices.Copy(sourceLogicalName, destinationGroup, destinationLogicalName, sourceDali, timeOut, espServerIpPort, maxConnections, allowOverwrite, replicate, asSuperfile, compress, forcePush, transferBufferSize);
+EXPORT Copy(varstring sourceLogicalName, varstring destinationGroup, varstring destinationLogicalName, varstring sourceDali='', integer4 timeOut=-1, varstring espServerIpPort=GETENV('ws_fs_server'), integer4 maxConnections=-1, boolean allowOverwrite=FALSE, boolean replicate=FALSE, boolean asSuperfile=FALSE, boolean compress=FALSE, boolean forcePush=FALSE, integer4 transferBufferSize=0, boolean preserveCompression=TRUE) :=
+    lib_fileservices.FileServices.Copy(sourceLogicalName, destinationGroup, destinationLogicalName, sourceDali, timeOut, espServerIpPort, maxConnections, allowOverwrite, replicate, asSuperfile, compress, forcePush, transferBufferSize, preserveCompression);
 
 /**
  * Ensures the specified file is replicated to its mirror copies.

+ 3 - 3
esp/scm/ws_fs.ecm

@@ -82,10 +82,9 @@ ESPStruct [nil_remove] DFUWorkunit
     string decrypt;
 
     [min_ver("1.08")] bool failIfNoSourceFile(false);
-
     [min_ver("1.09")] bool recordStructurePresent(false);
-
     [min_ver("1.10")] bool quotedTerminator(true);
+    [min_ver("1.12")] bool preserveCompression(true);
 };
 
 ESPStruct [nil_remove] GroupNode
@@ -455,6 +454,7 @@ ESPrequest [nil_remove] Copy
     string encrypt;
     string decrypt;
 
+    [min_ver("1.12")] bool preserveCompression(true);
 };
 
 ESPresponse [exceptions_inline] 
@@ -629,7 +629,7 @@ ESPresponse [exceptions_inline, nil_remove] GetSprayTargetsResponse
 };
 
 ESPservice [
-    version("1.11"),
+    version("1.12"),
     exceptions_inline("./smc_xslt/exceptions.xslt")] FileSpray
 {
     ESPuses ESPstruct DFUWorkunit;

+ 1 - 0
esp/services/ws_fs/ws_fsService.cpp

@@ -2438,6 +2438,7 @@ bool CFileSprayEx::onCopy(IEspContext &context, IEspCopy &req, IEspCopyResponse
         wuFSpecDest->setLogicalName(dstname);
         wuFSpecDest->setFileMask(fileMask.str());
         wuOptions->setOverwrite(req.getOverwrite());
+        wuOptions->setPreserveCompression(req.getPreserveCompression());
 
         if (bRoxie)
         {

File diff ditekan karena terlalu besar
+ 25 - 8
plugins/fileservices/fileservices.cpp


+ 2 - 0
plugins/fileservices/fileservices.hpp

@@ -58,6 +58,7 @@ FILESERVICES_API void FILESERVICES_CALL fsSprayVariable_v5(ICodeContext *ctx, co
 FILESERVICES_API void FILESERVICES_CALL fsSprayXml(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char *sourceRowTag, const char *sourceEncoding, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress=false, bool failIfNoSourceFile=false);
 FILESERVICES_API void FILESERVICES_CALL fsDespray(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false);
 FILESERVICES_API void FILESERVICES_CALL fsCopy(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize);
+FILESERVICES_API void FILESERVICES_CALL fsCopy_v2(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize, bool preserveCompression);
 FILESERVICES_API void FILESERVICES_CALL fsDkc(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite);
 FILESERVICES_API void FILESERVICES_CALL fsReplicate(ICodeContext *ctx, const char * sourceLogicalName, int timeOut, const char * espServerIpPort);
 FILESERVICES_API void FILESERVICES_CALL fsCreateSuperFile(ICodeContext *ctx, const char *lsuperfn, bool sequentialparts, bool ifdoesnotexist);
@@ -89,6 +90,7 @@ FILESERVICES_API char * FILESERVICES_CALL fsfSprayVariable_v5(ICodeContext *ctx,
 FILESERVICES_API char * FILESERVICES_CALL fsfSprayXml(ICodeContext *ctx, const char * sourceIP, const char * sourcePath, int sourceMaxRecordSize, const char *sourceRowTag, const char *sourceEncoding, const char * destinationGroup, const char * destinationLogicalName, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool compress=false, bool failIfNoSourceFile=false);
 FILESERVICES_API char * FILESERVICES_CALL fsfDespray(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite=false);
 FILESERVICES_API char * FILESERVICES_CALL fsfCopy(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize);
+FILESERVICES_API char * FILESERVICES_CALL fsfCopy_v2(ICodeContext *ctx, const char * sourceLogicalName, const char *destinationGroup, const char * destinationLogicalName, const char * sourceDali, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite, bool replicate, bool asSuperfile, bool compress, bool forcePush, int transferBufferSize, bool preserveCompression);
 FILESERVICES_API char * FILESERVICES_CALL fsfDkc(ICodeContext *ctx, const char * sourceLogicalName, const char * destinationIP, const char * destinationPath, int timeOut, const char * espServerIpPort, int maxConnections, bool overwrite);
 FILESERVICES_API char * FILESERVICES_CALL fsfReplicate(ICodeContext *ctx, const char * sourceLogicalName, int timeOut, const char * espServerIpPort);
 FILESERVICES_API char *  FILESERVICES_CALL fsfMonitorLogicalFileName(ICodeContext *ctx, const char *eventname, const char *_lfn,int shotcount, const char * espServerIpPort);

+ 33 - 1
testing/regress/ecl/filecompcopy.ecl

@@ -65,6 +65,8 @@ outdata := NORMALIZE(one_per_node, numrecs, fillRow(LEFT, counter));
 
 copiedcmp1 := DATASET('nhtest::testfile_exp_copy_cmp', rec, flat, __compressed__);
 copiedcmp2 := DATASET('nhtest::testfile_cmp_copy_cmp', rec, flat, __compressed__);
+copiedcmp3 := DATASET('nhtest::testfile_cmp_copy_cmp2', rec, flat, __compressed__);
+copiedcmp4 := DATASET('nhtest::testfile_cmp_copy_cmp3', rec, flat, __compressed__);
 copiedexp := DATASET('nhtest::testfile_cmp_copy_exp', rec, flat);
 
 unsigned compareDatasets(dataset(rec) ds1,dataset(rec) ds2) := FUNCTION
@@ -106,6 +108,34 @@ sequential (
             true                        // compress
            ),  
 
+// test copy compressed to compressed with preservecompression flag
+
+FileServices.Copy('nhtest::testfile_cmp',   // sourceLogicalName
+            '',                             // destinationGroup
+            'nhtest::testfile_cmp_copy_cmp2', // destinationLogicalName
+            ,                               // sourceDali
+            5*60*1000,                      // timeOut
+            ,                               // espServerIpPort
+            ,                               // maxConnections
+            true,                           // allowoverwrite
+            false,                          // replicate
+            ,                               // asSuperfile,
+            ,
+            PRESERVECOMPRESSION:=true
+           ),
+
+// test copy compressed to compressed without preservecompression flag
+
+FileServices.Copy('nhtest::testfile_cmp',   // sourceLogicalName
+            '',                             // destinationGroup
+            'nhtest::testfile_cmp_copy_cmp3', // destinationLogicalName
+            ,                               // sourceDali
+            5*60*1000,                      // timeOut
+            ,                               // espServerIpPort
+            ,                               // maxConnections
+            true,                           // allowoverwrite
+            false                           // replicate
+           ),
 
 // test copy compressed to expanded  
 
@@ -117,13 +147,15 @@ sequential (
             ,                               // espServerIpPort
             ,                               // maxConnections, 
             true,                           // allowoverwrite
-            true,                       // replicate=false, 
+            false,                       // replicate=false,
             ,                           // asSuperfile, 
             false                       // compress
            ),
            
    OUTPUT(compareDatasets(outdata,copiedcmp1)),
    OUTPUT(compareDatasets(outdata,copiedcmp2)),
+   OUTPUT(compareDatasets(outdata,copiedcmp3)),
+   OUTPUT(compareDatasets(outdata,copiedcmp4)),
    OUTPUT(compareDatasets(outdata,copiedexp))
            
  );

+ 6 - 0
testing/regress/ecl/key/filecompcopy.xml

@@ -11,3 +11,9 @@
 <Dataset name='Result 5'>
  <Row><Result_5>0</Result_5></Row>
 </Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>0</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><Result_7>0</Result_7></Row>
+</Dataset>