Browse Source

Merge remote-tracking branch 'origin/closedown-4.2.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
8e55d578c0
6 changed files with 83 additions and 57 deletions
  1. 32 40
      common/workunit/workunit.cpp
  2. 2 2
      dali/base/dadfs.cpp
  3. 7 2
      dali/ft/filecopy.cpp
  4. 30 8
      dali/ft/ftbase.cpp
  5. 4 2
      dali/ft/ftbase.ipp
  6. 8 3
      dali/ft/fttransform.cpp

+ 32 - 40
common/workunit/workunit.cpp

@@ -787,7 +787,7 @@ public:
     void unlockRemote(bool closing);
     void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=NULL);
     void abort();
-    void cleanupAndDelete(bool deldll,bool deleteOwned);
+    void cleanupAndDelete(bool deldll,bool deleteOwned, const StringArray *deleteExclusions=NULL);
     bool switchThorQueue(const char *cluster, IQueueSwitcher *qs);
     void setAllowedClusters(const char *value);
     IStringVal & getAllowedClusters(IStringVal & str) const;
@@ -1940,25 +1940,20 @@ mapEnums querySortFields[] =
 class asyncRemoveDllWorkItem: public CInterface, implements IWorkQueueItem // class only used in asyncRemoveDll
 {
     StringAttr name;
-    bool removeDlls;
-    bool removeDirectory;
 public:
     IMPLEMENT_IINTERFACE;
 
-    asyncRemoveDllWorkItem(const char * _name, bool _removeDlls, bool _removeDirectory)
-        : name(_name)
+    asyncRemoveDllWorkItem(const char * _name) : name(_name)
     {
-        removeDlls = _removeDlls;
-        removeDirectory = _removeDirectory;
     }
     void execute()
     {
-        PROGLOG("WU removeDll %s",name.get());
-        queryDllServer().removeDll(name,removeDlls,removeDirectory);
+        PROGLOG("WU removeDll %s", name.get());
+        queryDllServer().removeDll(name, true, true); // <name>, removeDlls=true, removeDirectory=true
     }
 };      
 
-class asyncRemoveRemoteFileWorkItem: public CInterface, implements IWorkQueueItem // class only used in asyncRemoveDll
+class asyncRemoveRemoteFileWorkItem: public CInterface, implements IWorkQueueItem // class only used in asyncRemoveFile
 {
     RemoteFilename name;
 public:
@@ -2595,10 +2590,9 @@ public:
         return numWorkUnitsFiltered(filters,filterbuf,NULL,NULL);
     }
 
-    void asyncRemoveDll(const char * name, bool removeDlls, bool removeDirectory)
+    void asyncRemoveDll(const char * name)
     {
-        const char * tail = pathTail(name);
-        deletedllworkq->post(new asyncRemoveDllWorkItem(tail,removeDlls,removeDirectory));
+        deletedllworkq->post(new asyncRemoveDllWorkItem(name));
     }
 
     void asyncRemoveFile(const char * ip, const char * name)
@@ -3017,7 +3011,7 @@ CLocalWorkUnit::~CLocalWorkUnit()
     catch (IException *E) { LOG(MCexception(E, MSGCLS_warning), E, "Exception during ~CLocalWorkUnit"); E->Release(); }
 }
 
-void CLocalWorkUnit::cleanupAndDelete(bool deldll,bool deleteOwned)
+void CLocalWorkUnit::cleanupAndDelete(bool deldll, bool deleteOwned, const StringArray *deleteExclusions)
 {
     TIME_SECTION("WUDELETE cleanupAndDelete total");
     // Delete any related things in SDS etc that might otherwise be forgotten
@@ -3060,20 +3054,22 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll,bool deleteOwned)
             {
                 Owned<IConstWUAssociatedFileIterator> iter = &q->getAssociatedFiles();
                 SCMStringBuffer name;
-                SCMStringBuffer ip;
                 ForEach(*iter)
                 {
                     IConstWUAssociatedFile & cur = iter->query();
-                    cur.getName(name);
-                    if (cur.getType() == FileTypeDll)
+                    cur.getNameTail(name);
+                    if (!deleteExclusions || (NotFound == deleteExclusions->find(name.str())))
                     {
-                        bool removeDir = true;        // this is to keep the code the same as before, but I don't know why it only does it for the dll.
-                        factory->asyncRemoveDll(name.str(), true, removeDir);
-                    }
-                    else
-                    {
-                        cur.getIp(ip);
-                        factory->asyncRemoveFile(ip.str(), name.str());
+                        Owned<IDllEntry> entry = queryDllServer().getEntry(name.str());
+                        if (entry.get())
+                            factory->asyncRemoveDll(name.str());
+                        else
+                        {
+                            SCMStringBuffer ip, localPath;
+                            cur.getName(localPath);
+                            cur.getIp(ip);
+                            factory->asyncRemoveFile(ip.str(), localPath.str());
+                        }
                     }
                 }
             }
@@ -3183,6 +3179,7 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
         return false;   
     }
 
+    StringArray deleteExclusions; // associated files not to delete, added if failure to copy
     Owned<IConstWUAssociatedFileIterator> iter = &q->getAssociatedFiles();
     SCMStringBuffer name;
     Owned<IException> exception;
@@ -3201,7 +3198,6 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
                 Owned<IPropertyTree> generatedDllBranch = createPTree();
                 generatedDllBranch->setProp("@name", entry->queryName());
                 generatedDllBranch->setProp("@kind", entry->queryKind());
-                bool removeDllFiles = true;
                 exception.clear();
                 try
                 {
@@ -3223,7 +3219,11 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
                     try
                     {
                         if (dstfile->exists())
-                            removeDllFiles = false; // i.e. previously archived
+                        {
+                            if (streq(srcfile->queryFilename(), dstfile->queryFilename()))
+                                deleteExclusions.append(name.str()); // restored workunit, referencing archive location for query dll
+                            // still want to delete if already archived but there are source file copies
+                        }
                         else
                             copyFile(dstfile,srcfile);
                         makeAbsolutePath(dstfile->queryFilename(), locpath.clear());
@@ -3240,7 +3240,7 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
                         EXCLOG(exception.get(), "archiveWorkUnit (copying associated file)");
                         //copy failed, so store original (best) location and don't delete the files
                         filename.getRemotePath(locpath.clear());
-                        removeDllFiles = false;
+                        deleteExclusions.append(name.str());
                     }
                     else
                     {
@@ -3249,21 +3249,15 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
                 }
                 generatedDllBranch->setProp("@location", locpath.str());
                 generatedDlls->addPropTree("GeneratedDll", generatedDllBranch.getClear());
-                if (del)
-                {
-                    bool removeDir = (cur.getType() == FileTypeDll); // copied from cleanupAndDelete code, above
-                    if (!p->getPropBool("@isClone", false))         // Leak to protect against cloned WUs
-                        entry->remove(removeDllFiles, removeDir);
-                }
             }
             else // no generated dll entry
             {
-                SCMStringBuffer fname, ip;
-                cur.getName(fname);
+                SCMStringBuffer localPath, ip;
+                cur.getName(localPath);
                 cur.getIp(ip);
                 SocketEndpoint ep(ip.str());
                 RemoteFilename rfn;
-                rfn.setPath(ep, fname.str());
+                rfn.setPath(ep, localPath.str());
                 Owned<IFile> srcFile = createIFile(rfn);
                 addPathSepChar(dst.clear().append(base));
                 rfn.getTail(dst);
@@ -3271,14 +3265,13 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
                 try
                 {
                     copyFile(dstFile, srcFile);
-                    if (del)
-                        srcFile->remove();
                 }
                 catch (IException *e)
                 {
                     VStringBuffer msg("Failed to archive associated file '%s' to destination '%s'", srcFile->queryFilename(), dstFile->queryFilename());
                     EXCLOG(e, msg.str());
                     e->Release();
+                    deleteExclusions.append(name.str());
                 }
             }
         }
@@ -3294,8 +3287,7 @@ bool CLocalWorkUnit::archiveWorkUnit(const char *base,bool del,bool ignoredllerr
     {
         //setState(WUStateArchived);    // this isn't useful as about to delete it!
         q.clear();
-        //deldll false as should have deleted all those we successfully copied, and archived and removed SDS entries, above
-        cleanupAndDelete(false, deleteOwned);
+        cleanupAndDelete(true, deleteOwned, &deleteExclusions);
     }
 
     return true;

+ 2 - 2
dali/base/dadfs.cpp

@@ -4438,7 +4438,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
         }
         bool prepare()
         {
-            parent.setown(transaction->lookupSuperFile(parentlname,true));
+            parent.setown(transaction->lookupSuperFile(parentlname));
             if (!parent)
                 throw MakeStringException(-1,"removeSubFile: SuperFile %s cannot be found",parentlname.get());
             if (!subfile.isEmpty())
@@ -4530,7 +4530,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
         }
         bool prepare()
         {
-            parent.setown(transaction->lookupSuperFile(parentlname,true));
+            parent.setown(transaction->lookupSuperFile(parentlname));
             if (!parent)
                 throw MakeStringException(-1,"removeOwnedSubFiles: SuperFile %s cannot be found", parentlname.get());
             // Try to lock all files

+ 7 - 2
dali/ft/filecopy.cpp

@@ -351,7 +351,7 @@ bool FileTransferThread::performTransfer()
 
         msg.append(progress.ordinality());
         ForEachItemIn(i, progress)
-            progress.item(i).serialize(msg);
+            progress.item(i).serializeCore(msg);
 
         msg.append(sprayer.throttleNicSpeed);
         msg.append(sprayer.compressedInput);
@@ -364,6 +364,10 @@ bool FileTransferThread::performTransfer()
         sprayer.srcFormat.serializeExtra(msg, 1);
         sprayer.tgtFormat.serializeExtra(msg, 1);
 
+        ForEachItemIn(i2, progress)
+            progress.item(i2).serializeExtra(msg, 1);
+
+        //NB: Any extra data must be appended at the end...
         if (!catchWriteBuffer(socket, msg))
             throwError1(RFSERR_TimeoutWaitConnect, url.str());
 
@@ -380,7 +384,8 @@ bool FileTransferThread::performTransfer()
                 break;
 
             OutputProgress newProgress;
-            newProgress.deserialize(msg);
+            newProgress.deserializeCore(msg);
+            newProgress.deserializeExtra(msg, 1);
             sprayer.updateProgress(newProgress);
 
             LOG(MCdebugProgress(10000), job, "Update %s: %d %"I64F"d->%"I64F"d", url.str(), newProgress.whichPartition, newProgress.inputLength, newProgress.outputLength);

+ 30 - 8
dali/ft/ftbase.cpp

@@ -513,7 +513,7 @@ void OutputProgress::reset()
     compressedPartSize = 0;
 }
 
-MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
+MemoryBuffer & OutputProgress::deserializeCore(MemoryBuffer & in)
 { 
     unsigned _inputCRC, _outputCRC;
     bool hasTime;
@@ -524,9 +524,22 @@ MemoryBuffer & OutputProgress::deserialize(MemoryBuffer & in)
         resultTime.deserialize(in);
     else
         resultTime.clear();
-    in.read(hasCompressed);
-    if (hasCompressed)
-        in.read(compressedPartSize);
+    return in;
+}
+
+MemoryBuffer & OutputProgress::deserializeExtra(MemoryBuffer & in, unsigned version)
+{
+    if (in.remaining())
+    {
+        switch (version)
+        {
+        case 1:
+            in.read(hasCompressed);
+            if (hasCompressed)
+                in.read(compressedPartSize);
+            break;
+        }
+    }
     return in;
 }
 
@@ -537,7 +550,7 @@ void OutputProgress::trace()
     LOG(MCdebugInfoDetail, unknownJob, "[%d] %s  %"I64F"d[%x]->%"I64F"d[%x]", whichPartition, statusText[status], inputLength, inputCRC, outputLength, outputCRC);
 }
 
-MemoryBuffer & OutputProgress::serialize(MemoryBuffer & out)        
+MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)        
 { 
     bool hasTime = !resultTime.isNull();
     unsigned _inputCRC = inputCRC;
@@ -545,12 +558,21 @@ MemoryBuffer & OutputProgress::serialize(MemoryBuffer & out)
     out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime);
     if (hasTime)
         resultTime.serialize(out);
-    out.append(hasCompressed);
-    if (hasCompressed )
-        out.append(compressedPartSize);
     return out;
 }
 
+MemoryBuffer & OutputProgress::serializeExtra(MemoryBuffer & out, unsigned version)
+{
+    switch (version)
+    {
+    case 1:
+        out.append(hasCompressed);
+        if (hasCompressed )
+            out.append(compressedPartSize);
+        break;
+    }
+    return out;
+}
 
 void OutputProgress::set(const OutputProgress & other)
 {

+ 4 - 2
dali/ft/ftbase.ipp

@@ -64,10 +64,12 @@ public:
     void reset();
     void set(const OutputProgress & other);
 
-    MemoryBuffer & deserialize(MemoryBuffer & in);
+    MemoryBuffer & deserializeCore(MemoryBuffer & in);
+    MemoryBuffer & deserializeExtra(MemoryBuffer & in, unsigned version);
     void restore(IPropertyTree * tree);
     void save(IPropertyTree * tree);
-    MemoryBuffer & serialize(MemoryBuffer & out);
+    MemoryBuffer & serializeCore(MemoryBuffer & out);
+    MemoryBuffer & serializeExtra(MemoryBuffer & out, unsigned version);
     void trace();
 
 public:

+ 8 - 3
dali/ft/fttransform.cpp

@@ -503,7 +503,8 @@ void TransferServer::sendProgress(OutputProgress & curProgress)
 {
     MemoryBuffer msg;
     msg.setEndian(__BIG_ENDIAN);
-    curProgress.serialize(msg.clear().append(false));
+    curProgress.serializeCore(msg.clear().append(false));
+    curProgress.serializeExtra(msg, 1);
     if (!catchWriteBuffer(masterSocket, msg))
         throwError(RFSERR_TimeoutWaitMaster);
 
@@ -618,7 +619,7 @@ void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
     for (unsigned i = 0; i < numProgress; i++)
     {
         OutputProgress & next = *new OutputProgress;
-        next.deserialize(msg);
+        next.deserializeCore(msg);
         progress.append(next);
     }
     if (msg.remaining())
@@ -637,6 +638,9 @@ void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
         tgtFormat.deserializeExtra(msg, 1);
     }
 
+    ForEachItemIn(i1, progress)
+        progress.item(i1).deserializeExtra(msg, 1);
+
     LOG(MCdebugProgress, unknownJob, "throttle(%d), transferBufferSize(%d)", throttleNicSpeed, transferBufferSize);
     PROGLOG("compressedInput(%d), compressedOutput(%d), copyCompressed(%d)", compressedInput?1:0, compressOutput?1:0, copyCompressed?1:0);
     PROGLOG("encrypt(%d), decrypt(%d)", encryptKey.isEmpty()?0:1, decryptKey.isEmpty()?0:1);
@@ -908,7 +912,8 @@ processedProgress:
                         curProgress.compressedPartSize = output->size();
                         curProgress.hasCompressed = true;
                     }
-                    curProgress.serialize(msg.clear().append(false));
+                    curProgress.serializeCore(msg.clear().append(false));
+                    curProgress.serializeExtra(msg, 1);
                     if (!catchWriteBuffer(masterSocket, msg))
                         throwError(RFSERR_TimeoutWaitMaster);
                 }