浏览代码

HPCC-19964 Std.File.Copy not preserving compression

Add code to preserve compression

Remove unused definitions and code

Update filecompcopy.ecl

Tested manually with local and remote copy

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 7 年之前
父节点
当前提交
eac7723c3b
共有 6 个文件被更改,包括 68 次插入和 45 次删除
  1. 56 25
      dali/dfu/dfurun.cpp
  2. 0 8
      dali/ft/daft.cpp
  3. 8 6
      dali/ft/filecopy.cpp
  4. 0 1
      dali/ft/filecopy.hpp
  5. 0 1
      dali/ft/filecopy.ipp
  6. 4 4
      testing/regress/ecl/filecompcopy.ecl

+ 56 - 25
dali/dfu/dfurun.cpp

@@ -1265,30 +1265,35 @@ public:
             case DFUcmd_add:
                 {
                     destination->getLogicalName(tmp.clear());
-                    if (tmp.length()) {
+                    if (tmp.length())
+                    {
                         CDfsLogicalFileName tmpdlfn;
                         StringBuffer newroxieprefix;
                         constructDestinationName(tmp.str(),oldRoxiePrefix,destination,tmpdlfn,newroxieprefix);
                         tmp.clear().append(tmpdlfn.get());
                         bool iswin;
-                        if (!destination->getWindowsOS(iswin)) { // would normally know!
+                        if (!destination->getWindowsOS(iswin)) // would normally know!
+                            {
                             // set default OS to cluster 0
                             Owned<IGroup> grp=destination->getGroup(0);
-                            if (grp.get()) {
-                                switch (queryOS(grp->queryNode(0).endpoint())) {
-                                case MachineOsW2K:
-                                    destination->setWindowsOS(true);
-                                    iswin = false;
-                                    break;
-                                case MachineOsSolaris:
-                                case MachineOsLinux:
-                                    destination->setWindowsOS(false);
-                                    iswin = false;
-                                    break;
+                            if (grp.get())
+                            {
+                                switch (queryOS(grp->queryNode(0).endpoint()))
+                                {
+                                    case MachineOsW2K:
+                                        destination->setWindowsOS(true);
+                                        iswin = false;
+                                        break;
+                                    case MachineOsSolaris:
+                                    case MachineOsLinux:
+                                        destination->setWindowsOS(false);
+                                        iswin = false;
+                                        break;
                                 };
                             }
                         }
-                        if (destination->getWrap()) {
+                        if (destination->getWrap())
+                        {
                             Owned<IFileDescriptor> fdesc = source?source->getFileDescriptor():NULL;
                             if (fdesc)
                                 destination->setNumPartsOverride(fdesc->numParts());
@@ -1304,25 +1309,44 @@ public:
 
                         opttree->setPropBool("@quotedTerminator", options->getQuotedTerminator());
                         Owned<IFileDescriptor> fdesc = destination->getFileDescriptor(iskey,options->getSuppressNonKeyRepeats()&&!iskey);
-                        if (fdesc) {
-                            if (options->getSubfileCopy()) {// need to set destination compressed or not
+                        if (fdesc)
+                        {
+                            if (options->getSubfileCopy()) // need to set destination compressed or not
+                            {
                                 if (opttree->getPropBool("@compress"))
                                     fdesc->queryProperties().setPropBool("@blockCompressed",true);
                                 else
                                     fdesc->queryProperties().removeProp("@blockCompressed");
                             }
-                            if (!encryptkey.isEmpty()) {
+                            if (!encryptkey.isEmpty())
+                            {
                                 fdesc->queryProperties().setPropBool("@encrypted",true);
                                 fdesc->queryProperties().setPropBool("@blockCompressed",true);
                             }
-                            if (multiclusterinsert) {
+                            else if (options->getPreserveCompression())
+                            {
+                                bool dstCompressed = false;
+                                if (srcFile)
+                                    dstCompressed = srcFile->isCompressed();
+                                else
+                                {
+                                    IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : foreignfdesc.get());
+                                    dstCompressed = srcDesc && srcDesc->isCompressed();
+                                }
+                                if (dstCompressed)
+                                    fdesc->queryProperties().setPropBool("@blockCompressed",true);
+                            }
+
+                            if (multiclusterinsert)
+                            {
                                 if (foreigncopy)
                                     throw MakeStringException(-1,"Cannot create multi cluster file in foreign file");
                                 StringBuffer err;
                                 if (!srcFile->checkClusterCompatible(*fdesc,err))
                                     throw MakeStringException(-1,"Incompatible file for multicluster add - %s",err.str());
                             }
-                            else if (multiclustermerge) {
+                            else if (multiclustermerge)
+                            {
                                 dstFile.setown(fdir.lookup(tmp.str(),userdesc,true));
                                 if (!dstFile)
                                     throw MakeStringException(-1,"Destination for merge %s does not exist",tmp.str());
@@ -1330,9 +1354,11 @@ public:
                                 if (!dstFile->checkClusterCompatible(*fdesc,err))
                                     throw MakeStringException(-1,"Incompatible file for multicluster merge - %s",err.str());
                             }
-                            else {
+                            else
+                            {
                                 Owned<IDistributedFile> oldfile = fdir.lookup(tmp.str(),userdesc,true);
-                                if (oldfile) {
+                                if (oldfile)
+                                {
                                     StringBuffer reason;
                                     bool canRemove = oldfile->canRemove(reason);
                                     oldfile.clear();
@@ -1362,7 +1388,8 @@ public:
                             dstName.set(tmp.str());
                         }
                     }
-                    if (!dstFile&&!multiclusterinsert) {
+                    if (!dstFile&&!multiclusterinsert)
+                    {
                         throw MakeStringException(-1,"Destination file %s could not be created",tmp.str());
                     }
                 }
@@ -1457,9 +1484,13 @@ public:
                                 Audit("COPYDIFF",userdesc,srcName.get(),dstName.get());
                             }
                         }
-                        else if (foreigncopy||auxfdesc) {
-                            fsys.import(auxfdesc.get()?auxfdesc.get():foreignfdesc.get(), dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
-                            if (!abortnotify.abortRequested()) {
+                        else if (foreigncopy||auxfdesc)
+                        {
+                            IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : foreignfdesc.get());
+                            fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
+
+                            if (!abortnotify.abortRequested())
+                            {
                                 if (needrep)
                                     replicating = true;
                                 else

+ 0 - 8
dali/ft/daft.cpp

@@ -58,14 +58,6 @@ void CDistributedFileSystem::copy(IDistributedFile * from, IDistributedFile * to
     sprayer->setPartFilter(filter);
     sprayer->setSource(from);
     sprayer->setTarget(to);
-
-    bool compressInput = from->isCompressed();
-    bool compressOutput = options->getPropBool("@compress");
-    bool preserveCompression = options->getPropBool("@preserveCompression");
-    LOG(MCdebugInfo, unknownJob, "DFS: compressInput:%d, compressOutput:%d, preserveCompression:%d ", compressInput, compressOutput, preserveCompression);
-    if (preserveCompression & compressInput)
-        sprayer->setTargetCompression(compressInput);
-
     sprayer->spray();
 }
 

+ 8 - 6
dali/ft/filecopy.cpp

@@ -2504,6 +2504,7 @@ void FileSprayer::setSource(IDistributedFile * source)
     if (history)
         srcHistory.setown(createPTreeFromIPT(history));
 
+    compressedInput = source->isCompressed();
     extractSourceFormat(srcAttr);
     unsigned numParts = source->numParts();
     for (unsigned idx=0; idx < numParts; idx++)
@@ -2538,6 +2539,7 @@ void FileSprayer::setSource(IFileDescriptor * source)
 void FileSprayer::setSource(IFileDescriptor * source, unsigned copy, unsigned mirrorCopy)
 {
     IPropertyTree *attr = &source->queryProperties();
+    compressedInput = source->isCompressed();
     extractSourceFormat(attr);
     srcAttr.setown(createPTreeFromIPT(&source->queryProperties()));
     IPropertyTree *history = source->queryHistory();
@@ -2637,15 +2639,15 @@ void FileSprayer::setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode)
         setCopyCompressedRaw();
 }
 
-void FileSprayer::setTargetCompression(bool compress)
-{
-    compressOutput = compress;
-}
-
 void FileSprayer::setTarget(IDistributedFile * target)
 {
     distributedTarget.set(target);
-    compressOutput = !encryptKey.isEmpty()||target->isCompressed();
+
+    compressOutput = target->isCompressed();
+
+    LOG(MCdebugInfo, unknownJob, "FileSprayer::setTarget: compressedInput:%s, compressOutput:%s",
+                                boolToStr(compressedInput),
+                                boolToStr(compressOutput));
 
     if (tgtFormat.restore(&target->queryAttributes()))
         unknownTargetFormat = false;

+ 0 - 1
dali/ft/filecopy.hpp

@@ -145,7 +145,6 @@ public:
     virtual void setSource(IDistributedFilePart * part) = 0;
     virtual void setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode) = 0;
     virtual void setTarget(IDistributedFile * target) = 0;
-    virtual void setTargetCompression(bool compress) = 0;
     virtual void setTarget(IFileDescriptor * target, unsigned copy=0) = 0;
     virtual void setTarget(IGroup * target) = 0;
     virtual void setTarget(INode * target) = 0;

+ 0 - 1
dali/ft/filecopy.ipp

@@ -192,7 +192,6 @@ public:
     virtual void setSource(IDistributedFilePart * part);
     virtual void setSourceTarget(IFileDescriptor * fd, DaftReplicateMode mode);
     virtual void setTarget(IDistributedFile * target);
-    virtual void setTargetCompression(bool compress);
     virtual void setTarget(IFileDescriptor * target, unsigned copy);
     virtual void setTarget(IGroup * target);
     virtual void setTarget(INode * target);

+ 4 - 4
testing/regress/ecl/filecompcopy.ecl

@@ -64,10 +64,10 @@ rec fillRow(rec L, unsigned4 c) := transform
 
 outdata := NORMALIZE(one_per_node, numrecs, fillRow(LEFT, counter));  
 
-copiedcmp1 := DATASET(prefix + 'testfile_exp_copy_cmp', rec, flat, __compressed__);
-copiedcmp2 := DATASET(prefix + 'testfile_cmp_copy_cmp', rec, flat, __compressed__);
-copiedcmp3 := DATASET(prefix + 'testfile_cmp_copy_cmp2', rec, flat, __compressed__);
-copiedcmp4 := DATASET(prefix + 'testfile_cmp_copy_cmp3', rec, flat, __compressed__);
+copiedcmp1 := DATASET(prefix + 'testfile_exp_copy_cmp', rec, flat);
+copiedcmp2 := DATASET(prefix + 'testfile_cmp_copy_cmp', rec, flat);
+copiedcmp3 := DATASET(prefix + 'testfile_cmp_copy_cmp2', rec, flat);
+copiedcmp4 := DATASET(prefix + 'testfile_cmp_copy_cmp3', rec, flat);
 copiedexp := DATASET(prefix + 'testfile_cmp_copy_exp', rec, flat);
 
 unsigned compareDatasets(dataset(rec) ds1,dataset(rec) ds2) := FUNCTION