Browse Source

HPCC-10077 Spray compressed does not set @compressedSize on file parts

First update after review.

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 11 years ago
parent
commit
2cf4434896
5 changed files with 18 additions and 21 deletions
  1. 1 12
      dali/ft/filecopy.cpp
  2. 0 1
      dali/ft/filecopy.ipp
  3. 13 0
      dali/ft/ftbase.cpp
  4. 2 0
      dali/ft/ftbase.ipp
  5. 2 8
      dali/ft/fttransform.cpp

+ 1 - 12
dali/ft/filecopy.cpp

@@ -381,17 +381,6 @@ bool FileTransferThread::performTransfer()
 
             OutputProgress newProgress;
             newProgress.deserialize(msg);
-            // We can receive the compressed file size from ftclient after
-            // the file transformed and renamed
-            if( sprayer.compressOutput && (newProgress.status == OutputProgress::StatusRenamed))
-            {
-                size32_t compressedSize;
-                if( msg.remaining() >= sizeof(compressedSize))
-                    msg.read(compressedSize);
-                else
-                    compressedSize = newProgress.outputLength;
-                sprayer.compressedPartSize.append(compressedSize);
-            }
             sprayer.updateProgress(newProgress);
 
             LOG(MCdebugProgress(10000), job, "Update %s: %d %"I64F"d->%"I64F"d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);
@@ -2859,7 +2848,7 @@ void FileSprayer::updateTargetProperties()
 
                 if (compressOutput)
                 {
-                    curProps.setPropInt64(FAcompressedSize, compressedPartSize.item(idx));
+                    curProps.setPropInt64(FAcompressedSize, curProgress.compressedPartSize);
                 }
 
                 TargetLocation & curTarget = targets.item(cur.whichOutput);

+ 0 - 1
dali/ft/filecopy.ipp

@@ -313,7 +313,6 @@ protected:
     size32_t                transferBufferSize;
     StringAttr              encryptKey;
     StringAttr              decryptKey;
-    UnsignedArray           compressedPartSize;
 };
 
 

+ 13 - 0
dali/ft/ftbase.cpp

@@ -509,6 +509,7 @@ void OutputProgress::reset()
     inputLength = 0;
     outputCRC = 0;
     outputLength = 0;
+    compressedPartSize = 0;
 }
 
 MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
@@ -522,6 +523,9 @@ MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
         resultTime.deserialize(in);
     else
         resultTime.clear();
+    in.read(hasCompressed);
+    if (hasCompressed)
+        in.read(compressedPartSize);
     return in;
 }
 
@@ -540,6 +544,9 @@ MemoryBuffer & OutputProgress::serialize(MemoryBuffer & out)
     out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime);
     if (hasTime)
         resultTime.serialize(out);
+    out.append(hasCompressed);
+    if (hasCompressed )
+        out.append(compressedPartSize);
     return out;
 }
 
@@ -554,6 +561,8 @@ void OutputProgress::set(const OutputProgress & other)
     outputLength = other.outputLength;
     status = other.status;
     resultTime = other.resultTime;
+    hasCompressed = other.hasCompressed;
+    compressedPartSize = other.compressedPartSize;
 }
 
 void OutputProgress::restore(IPropertyTree * tree)
@@ -566,6 +575,8 @@ void OutputProgress::restore(IPropertyTree * tree)
     outputCRC = tree->getPropInt("@outputCRC");
     outputLength = tree->getPropInt64("@outputLength");
     resultTime.setString(tree->queryProp("@modified"));
+    hasCompressed = tree->getPropInt("@compressed");
+    compressedPartSize = tree->getPropInt64("@compressedPartSize");
 }
 
 void OutputProgress::save(IPropertyTree * tree)
@@ -582,6 +593,8 @@ void OutputProgress::save(IPropertyTree * tree)
         StringBuffer timestr;
         tree->setProp("@modified", resultTime.getString(timestr));
     }
+    tree->setPropInt("@compressed", hasCompressed);
+    tree->setPropInt64("@compressedPartSize", compressedPartSize);
 }
 
 

+ 2 - 0
dali/ft/ftbase.ipp

@@ -80,6 +80,8 @@ public:
     CDateTime       resultTime;
     byte            status;
     bool            hasInputCRC;
+    bool            hasCompressed;
+    offset_t        compressedPartSize;
 
 //Not saved/serialized - should probably be in a Sprayer-only class that contains an outputProgress.
     Owned<IPropertyTree> tree;

+ 2 - 8
dali/ft/fttransform.cpp

@@ -830,6 +830,7 @@ processedProgress:
                     compressor.setown(createAESCompressor256(key.length(),key.str()));
                 }
                 outio.setown(createCompressedFileWriter(outio, 0, true, compressor));
+                curProgress.hasCompressed = true;
             }
 
             LOG(MCdebugProgress, unknownJob, "Start pulling to file: %s", localFilename.str());
@@ -903,15 +904,8 @@ processedProgress:
                     //Notify the master that the file has been renamed - and send the modified time.
                     msg.setEndian(__BIG_ENDIAN);
                     curProgress.status = OutputProgress::StatusRenamed;
+                    curProgress.compressedPartSize = output->compressedSize();
                     curProgress.serialize(msg.clear().append(false));
-                    // If output compressed then send the compressed size in the end of
-                    // 'StatusRenamed' message
-                    if (compressOutput)
-                    {
-                        size32_t compressedSize = output->compressedSize();
-                        msg.append(compressedSize);
-                    }
-
                     if (!catchWriteBuffer(masterSocket, msg))
                         throwError(RFSERR_TimeoutWaitMaster);
                 }