Ver código fonte

Merge remote-tracking branch 'origin/candidate-4.2.2' into closedown-4.2.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 anos atrás
pai
commit
1e8b28b881
4 arquivos alterados com 49 adições e 15 exclusões
  1. 7 2
      dali/ft/filecopy.cpp
  2. 30 8
      dali/ft/ftbase.cpp
  3. 4 2
      dali/ft/ftbase.ipp
  4. 8 3
      dali/ft/fttransform.cpp

+ 7 - 2
dali/ft/filecopy.cpp

@@ -351,7 +351,7 @@ bool FileTransferThread::performTransfer()
 
         msg.append(progress.ordinality());
         ForEachItemIn(i, progress)
-            progress.item(i).serialize(msg);
+            progress.item(i).serializeCore(msg);
 
         msg.append(sprayer.throttleNicSpeed);
         msg.append(sprayer.compressedInput);
@@ -364,6 +364,10 @@ bool FileTransferThread::performTransfer()
         sprayer.srcFormat.serializeExtra(msg, 1);
         sprayer.tgtFormat.serializeExtra(msg, 1);
 
+        ForEachItemIn(i2, progress)
+            progress.item(i2).serializeExtra(msg, 1);
+
+        //NB: Any extra data must be appended at the end...
         if (!catchWriteBuffer(socket, msg))
             throwError1(RFSERR_TimeoutWaitConnect, url.str());
 
@@ -380,7 +384,8 @@ bool FileTransferThread::performTransfer()
                 break;
 
             OutputProgress newProgress;
-            newProgress.deserialize(msg);
+            newProgress.deserializeCore(msg);
+            newProgress.deserializeExtra(msg, 1);
             sprayer.updateProgress(newProgress);
 
             LOG(MCdebugProgress(10000), job, "Update %s: %d %"I64F"d->%"I64F"d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);

+ 30 - 8
dali/ft/ftbase.cpp

@@ -513,7 +513,7 @@ void OutputProgress::reset()
     compressedPartSize = 0;
 }
 
-MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
+MemoryBuffer & OutputProgress::deserializeCore(MemoryBuffer & in)
 { 
     unsigned _inputCRC, _outputCRC;
     bool hasTime;
@@ -524,9 +524,22 @@ MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
         resultTime.deserialize(in);
     else
         resultTime.clear();
-    in.read(hasCompressed);
-    if (hasCompressed)
-        in.read(compressedPartSize);
+    return in;
+}
+
+MemoryBuffer & OutputProgress::deserializeExtra(MemoryBuffer & in, unsigned version)
+{
+    if (in.remaining())
+    {
+        switch (version)
+        {
+        case 1:
+            in.read(hasCompressed);
+            if (hasCompressed)
+                in.read(compressedPartSize);
+            break;
+        }
+    }
     return in;
 }
 
@@ -537,7 +550,7 @@ void OutputProgress::trace()
     LOG(MCdebugInfoDetail, unknownJob, "[%d] %s  %"I64F"d[%x]->%"I64F"d[%x]", whichPartition, statusText[status], inputLength, inputCRC, outputLength, outputCRC);
 }
 
-MemoryBuffer & OutputProgress::serialize(MemoryBuffer & out)        
+MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)        
 { 
     bool hasTime = !resultTime.isNull();
     unsigned _inputCRC = inputCRC;
@@ -545,12 +558,21 @@ MemoryBuffer & OutputProgress::serialize(MemoryBuffer & out)
     out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime);
     if (hasTime)
         resultTime.serialize(out);
-    out.append(hasCompressed);
-    if (hasCompressed )
-        out.append(compressedPartSize);
     return out;
 }
 
+MemoryBuffer & OutputProgress::serializeExtra(MemoryBuffer & out, unsigned version)
+{
+    switch (version)
+    {
+    case 1:
+        out.append(hasCompressed);
+        if (hasCompressed )
+            out.append(compressedPartSize);
+        break;
+    }
+    return out;
+}
 
 void OutputProgress::set(const OutputProgress & other)
 {

+ 4 - 2
dali/ft/ftbase.ipp

@@ -64,10 +64,12 @@ public:
     void reset();
     void set(const OutputProgress & other);
 
-    MemoryBuffer & deserialize(MemoryBuffer & in);
+    MemoryBuffer & deserializeCore(MemoryBuffer & in);
+    MemoryBuffer & deserializeExtra(MemoryBuffer & in, unsigned version);
     void restore(IPropertyTree * tree);
     void save(IPropertyTree * tree);
-    MemoryBuffer & serialize(MemoryBuffer & out);
+    MemoryBuffer & serializeCore(MemoryBuffer & out);
+    MemoryBuffer & serializeExtra(MemoryBuffer & out, unsigned version);
     void trace();
 
 public:

+ 8 - 3
dali/ft/fttransform.cpp

@@ -503,7 +503,8 @@ void TransferServer::sendProgress(OutputProgress & curProgress)
 {
     MemoryBuffer msg;
     msg.setEndian(__BIG_ENDIAN);
-    curProgress.serialize(msg.clear().append(false));
+    curProgress.serializeCore(msg.clear().append(false));
+    curProgress.serializeExtra(msg, 1);
     if (!catchWriteBuffer(masterSocket, msg))
         throwError(RFSERR_TimeoutWaitMaster);
 
@@ -618,7 +619,7 @@ void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
     for (unsigned i = 0; i < numProgress; i++)
     {
         OutputProgress & next = *new OutputProgress;
-        next.deserialize(msg);
+        next.deserializeCore(msg);
         progress.append(next);
     }
     if (msg.remaining())
@@ -637,6 +638,9 @@ void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
         tgtFormat.deserializeExtra(msg, 1);
     }
 
+    ForEachItemIn(i1, progress)
+        progress.item(i1).deserializeExtra(msg, 1);
+
     LOG(MCdebugProgress, unknownJob, "throttle(%d), transferBufferSize(%d)", throttleNicSpeed, transferBufferSize);
     PROGLOG("compressedInput(%d), compressedOutput(%d), copyCompressed(%d)", compressedInput?1:0, compressOutput?1:0, copyCompressed?1:0);
     PROGLOG("encrypt(%d), decrypt(%d)", encryptKey.isEmpty()?0:1, decryptKey.isEmpty()?0:1);
@@ -908,7 +912,8 @@ processedProgress:
                         curProgress.compressedPartSize = output->size();
                         curProgress.hasCompressed = true;
                     }
-                    curProgress.serialize(msg.clear().append(false));
+                    curProgress.serializeCore(msg.clear().append(false));
+                    curProgress.serializeExtra(msg, 1);
                     if (!catchWriteBuffer(masterSocket, msg))
                         throwError(RFSERR_TimeoutWaitMaster);
                 }