浏览代码

HPCC-8050 RemoveEntry/Physical in transactions

Added removal of files (logically and physically) to transactions
as delayed delete. Unit and regression tests added.

Signed-off-by: Renato Golin <rengolin@hpccsystems.com>
Renato Golin 12 年之前
父节点
当前提交
c6b5eddf33

+ 2 - 2
common/workunit/workunit.cpp

@@ -5581,7 +5581,7 @@ void CLocalWorkUnit::deleteTempFiles(const char *graph, bool deleteOwned, bool d
         {
             const char *name = file.queryProp("@name");
             LOG(MCdebugProgress, unknownJob, "Removing workunit file %s from DFS", name);
-            queryDistributedFileDirectory().removePhysical(name, queryUserDescriptor(), NULL, NULL);
+            queryDistributedFileDirectory().removePhysical(name, queryUserDescriptor());
             toRemove.append(file);
         }
     }
@@ -5689,7 +5689,7 @@ void CLocalWorkUnit::releaseFile(const char *fileName)
             files->removeTree(file);
             if (!name.isEmpty()&&(1 == usageCount))
             {
-                if (queryDistributedFileDirectory().removePhysical(fileName, queryUserDescriptor(), NULL, NULL))
+                if (queryDistributedFileDirectory().removePhysical(fileName, queryUserDescriptor()))
                     LOG(MCdebugProgress, unknownJob, "Removed (released) file %s from DFS", name.get());
             }
         }

+ 92 - 32
dali/base/dadfs.cpp

@@ -912,8 +912,8 @@ public:
     bool existsPhysical(const char *_logicalname,IUserDescriptor *user);
 
     void addEntry(CDfsLogicalFileName &lfn,IPropertyTree *root,bool superfile, bool ignoreexists);
-    bool removeEntry(const char *_logicalname,IUserDescriptor *user, unsigned timeoutms=INFINITE);
-    bool removePhysical(const char *_logicalname,IUserDescriptor *user,const char *cluster,IMultiException *mexcept);
+    bool removeEntry(const char *_logicalname,IUserDescriptor *user, unsigned timeoutms=INFINITE, IDistributedFileTransaction *transaction=NULL);
+    bool removePhysical(const char *_logicalname,IUserDescriptor *user,const char *cluster=NULL,unsigned timeoutms=INFINITE,IDistributedFileTransaction *transaction=NULL);
     void renamePhysical(const char *oldname,const char *newname,IUserDescriptor *user,IDistributedFileTransaction *transaction);
     void removeEmptyScope(const char *name);
 
@@ -1170,30 +1170,29 @@ static StringBuffer &checkNeedFQ(const char *filename, const char *dir, RemoteFi
 
 class CDelayedDelete: public CInterface
 {
-    StringAttr lfn;
+    StringAttr cluster;
+    CDfsLogicalFileName lfn;
     bool remphys;
     Linked<IUserDescriptor> user;
+    unsigned timeoutms;
 public:
-    CDelayedDelete(const char *_lfn,bool _remphys,IUserDescriptor *_user)
-        : lfn(_lfn),user(_user)
+    CDelayedDelete(const char *_lfn,bool _remphys,IUserDescriptor *_user,unsigned _timeoutms,const char* _cluster=NULL)
+        : user(_user), timeoutms(_timeoutms), cluster(_cluster)
     {
+        lfn.set(_lfn);
         remphys = _remphys;
     }
-    void doDelete()
+    void doDelete() // Throw on error!
     {
-        try {
-            if (remphys) 
-                queryDistributedFileDirectory().removePhysical(lfn.get(),user.get(),NULL,NULL);
-            else 
-                queryDistributedFileDirectory().removeEntry(lfn.get(),user);
-        }
-        catch (IException *e) {
-
-            StringBuffer s;
-            s.appendf("Transaction commit deleting %s: ",lfn.get());
-            e->errorMessage(s);
-            WARNLOG("%s",s.str());
-            e->Release();
+        CDistributedFileDirectory *dir = QUERYINTERFACE(&queryDistributedFileDirectory(), CDistributedFileDirectory);
+        assertex(dir);
+        if (remphys) {
+            Owned<IMultiException> exceptions = MakeMultiException("Transaction");
+            dir->doRemovePhysical(lfn,cluster,exceptions,user,false);
+            if (exceptions->ordinality())
+                throw exceptions.getClear();
+        } else {
+            dir->doRemoveEntry(lfn,user,false,timeoutms);
         }
     }
 };
@@ -1402,18 +1401,25 @@ public:
         return udesc;
     }
 
-    bool addDelayedDelete(const char *lfn,bool remphys,IUserDescriptor *user)
+    bool addDelayedDelete(const char *lfn,bool remphys,IUserDescriptor *user,unsigned timeoutms,const char*cluster)
     {
-        delayeddelete.append(*new CDelayedDelete(lfn,remphys,user));
+        delayeddelete.append(*new CDelayedDelete(lfn,remphys,user,timeoutms,cluster));
         return true;
     }
     
-    void deleteFiles()      // no rollback, no exceptions thrown, no regrets
+    void deleteFiles()      // no rollback at this point
     {
+        Owned<IMultiException> me = MakeMultiException("Transaction");
         ForEachItemIn(i,delayeddelete) {
-            delayeddelete.item(i).doDelete();
+            try {
+                delayeddelete.item(i).doDelete();
+            } catch (IException *e) {
+                me->append(*e);
+            }
         }
         delayeddelete.kill();
+        if (me->ordinality())
+            throw me.getClear();
     }
 };
 
@@ -6970,11 +6976,38 @@ bool CDistributedFileDirectory::doRemoveEntry(CDfsLogicalFileName &dlfn,IUserDes
 }
 
 
-bool CDistributedFileDirectory::removeEntry(const char *name,IUserDescriptor *user, unsigned timeoutms)
+bool CDistributedFileDirectory::removeEntry(const char *name,IUserDescriptor *user, unsigned timeoutms, IDistributedFileTransaction *transaction)
 {
-    CDfsLogicalFileName dlfn;   
-    dlfn.set(name);
-    return doRemoveEntry(dlfn,user,false, timeoutms);
+    CDfsLogicalFileName logicalname;
+    logicalname.set(name);
+    checkLogicalName(logicalname,user,true,true,false,"delete");
+
+    // Create a local transaction that will be destroyed (but never touch the external transaction)
+    Linked<IDistributedFileTransaction> localtrans;
+    if (transaction) {
+        localtrans.set(transaction);
+    } else {
+        // TODO: Make it explicit in the API that a transaction is required
+        localtrans.setown(new CDistributedFileTransaction(user));
+    }
+
+    // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
+    localtrans->addDelayedDelete(logicalname.get(), false, user, timeoutms);
+
+    try
+    {
+        localtrans->autoCommit();
+    }
+    catch (IException *e)
+    {
+        // TODO: Transform removeEntry into void
+        StringBuffer msg;
+        e->errorMessage(msg);
+        ERRLOG("Error while deleting %s: %s", logicalname.get(), msg.str());
+        e->Release();
+        return false;
+    }
+    return true;
 }
 
 void CDistributedFileDirectory::removeEmptyScope(const char *scope)
@@ -7031,11 +7064,38 @@ bool CDistributedFileDirectory::doRemovePhysical(CDfsLogicalFileName &dlfn,const
     return true;
 }
 
-bool CDistributedFileDirectory::removePhysical(const char *_logicalname,IUserDescriptor *user,const char *cluster,IMultiException *exceptions)
+bool CDistributedFileDirectory::removePhysical(const char *_logicalname,IUserDescriptor *user,const char *cluster,unsigned timeoutms,IDistributedFileTransaction *transaction)
 {
-    CDfsLogicalFileName dlfn;
-    dlfn.set(_logicalname);
-    return doRemovePhysical(dlfn,cluster,exceptions,user,false);
+    CDfsLogicalFileName logicalname;
+    logicalname.set(_logicalname);
+    checkLogicalName(logicalname,user,true,true,false,"delete physical");
+
+    // Create a local transaction that will be destroyed (but never touch the external transaction)
+    Linked<IDistributedFileTransaction> localtrans;
+    if (transaction) {
+        localtrans.set(transaction);
+    } else {
+        // TODO: Make it explicit in the API that a transaction is required
+        localtrans.setown(new CDistributedFileTransaction(user));
+    }
+
+    // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
+    localtrans->addDelayedDelete(logicalname.get(), true, user, timeoutms, cluster);
+
+    try
+    {
+        localtrans->autoCommit();
+    }
+    catch (IException *e)
+    {
+        // TODO: Transform removePhysical into void
+        StringBuffer msg;
+        e->errorMessage(msg);
+        ERRLOG("Error while deleting %s: %s", logicalname.get(), msg.str());
+        e->Release();
+        return false;
+    }
+    return true;
 }
 
     
@@ -9708,7 +9768,7 @@ void CDistributedFileDirectory::promoteSuperFiles(unsigned numsf,const char **sf
     // MORE - once deletion of logic files are also in transaction we can move this up (and allow promote within transactions)
     if (delsub) {
         ForEachItemIn(j,outunlinked) 
-            removePhysical(outunlinked.item(j),user,NULL,NULL);
+            removePhysical(outunlinked.item(j),user,NULL,timeout,transaction);
     }
 }
 

+ 3 - 3
dali/base/dadfs.hpp

@@ -185,7 +185,7 @@ interface IDistributedFileTransaction: extends IInterface
     virtual IDistributedSuperFile *lookupSuperFile(const char *slfn,unsigned timeout=INFINITE)=0;
     virtual IDistributedSuperFile *lookupSuperFileCached(const char *slfn,unsigned timeout=INFINITE)=0;
     virtual IUserDescriptor *queryUser()=0;
-    virtual bool addDelayedDelete(const char *lfn,bool remphys,IUserDescriptor *user)=0; // used internally to delay deletes untill commit 
+    virtual bool addDelayedDelete(const char *lfn,bool remphys,IUserDescriptor *user,unsigned timeoutms=INFINITE,const char*cluster=NULL)=0; // used internally to delay deletes untill commit
     virtual void addAction(CDFAction *action)=0; // internal
     virtual void addFile(IDistributedFile *file)=0; // TODO: avoid this being necessary
     virtual void clearFiles()=0; // internal
@@ -457,8 +457,8 @@ interface IDistributedFileDirectory: extends IInterface
 
     virtual IDFScopeIterator *getScopeIterator(IUserDescriptor *user, const char *subscope=NULL,bool recursive=true,bool includeempty=false)=0;
 
-    virtual bool removeEntry(const char *name,IUserDescriptor *user, unsigned timeoutms=INFINITE) = 0;  // equivalent to lookup/detach/release
-    virtual bool removePhysical(const char *name,IUserDescriptor *user,const char *cluster=NULL,IMultiException *exceptions=NULL) = 0;                           // removes the physical parts as well as entry
+    virtual bool removeEntry(const char *name,IUserDescriptor *user, unsigned timeoutms=INFINITE, IDistributedFileTransaction *transaction=NULL) = 0;  // equivalent to lookup/detach/release
+    virtual bool removePhysical(const char *name,IUserDescriptor *user,const char *cluster=NULL,unsigned timeoutms=INFINITE,IDistributedFileTransaction *transaction=NULL) = 0;                           // removes the physical parts as well as entry
     virtual void renamePhysical(const char *oldname,const char *newname,IUserDescriptor *user,IDistributedFileTransaction *transaction) = 0;                         // renames the physical parts as well as entry
     virtual void removeEmptyScope(const char *scope) = 0;   // does nothing if called on non-empty scope
     

+ 46 - 4
dali/regress/daregress.cpp

@@ -328,7 +328,7 @@ static bool setupDFS(const char *scope, unsigned supersToDel=3, unsigned subsToC
         sub.append("::").append(name);
 
         // Remove first
-        if (dir.exists(sub.str(),user,true,false) && !dir.removePhysical(sub.str(), user, NULL, NULL)) {
+        if (dir.exists(sub.str(),user,true,false) && !dir.removePhysical(sub.str(), user)) {
             ERROR1("Can't remove %s", sub.str());
             return false;
         }
@@ -913,27 +913,69 @@ static void testDFSDel()
         e->Release();
     }
 
+    // Logical Remove
     printf("Deleting 'regress::del::super1, should work\n");
     if (!dir.removeEntry("regress::del::super1", user)) {
         ERROR("Can't remove super1");
         return;
     }
-    printf("Deleting 'regress::del::sub1, should work\n");
+    printf("Deleting 'regress::del::sub1 autoCommit, should work\n");
     if (!dir.removeEntry("regress::del::sub1", user)) {
         ERROR("Can't remove sub1");
         return;
     }
+
+    printf("Removing 'regress::del::sub2 - rollback\n");
+    transaction->start();
+    dir.removeEntry("regress::del::sub2", user, INFINITE, transaction);
+    transaction->rollback();
+
+    if (!dir.exists("regress::del::sub2", user, true, false)) {
+        ERROR("Shouldn't have removed sub2 on rollback");
+        return;
+    }
+
+    printf("Removing 'regress::del::sub2 - commit\n");
+    transaction->start();
+    dir.removeEntry("regress::del::sub2", user, INFINITE, transaction);
+    transaction->commit();
+
+    if (dir.exists("regress::del::sub2", user, true, false)) {
+        ERROR("Should have removed sub2 on commit");
+        return;
+    }
+
+    // Physical Remove
+    printf("Physically removing 'regress::del::sub3 - rollback\n");
+    transaction->start();
+    dir.removeEntry("regress::del::sub3", user, INFINITE, transaction);
+    transaction->rollback();
+
+    if (!dir.exists("regress::del::sub3", user, true, false)) {
+        ERROR("Shouldn't have removed sub3 on rollback");
+        return;
+    }
+
+    printf("Physically removing 'regress::del::sub3 - commit\n");
+    transaction->start();
+    dir.removeEntry("regress::del::sub3", user, INFINITE, transaction);
+    transaction->commit();
+
+    if (dir.exists("regress::del::sub3", user, true, false)) {
+        ERROR("Should have removed sub3 on commit");
+        return;
+    }
 }
 
 static void testDFSRename()
 {
     Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction(user); // disabled, auto-commit
 
-    if (dir.exists("regress::rename::other1",user,false,false) && !dir.removePhysical("regress::rename::other1", user, NULL, NULL)) {
+    if (dir.exists("regress::rename::other1",user,false,false) && !dir.removePhysical("regress::rename::other1", user)) {
         ERROR("Can't remove 'regress::rename::other1'");
         return;
     }
-    if (dir.exists("regress::rename::other2",user,false,false) && !dir.removePhysical("regress::rename::other2", user, NULL, NULL)) {
+    if (dir.exists("regress::rename::other2",user,false,false) && !dir.removePhysical("regress::rename::other2", user)) {
         ERROR("Can't remove 'regress::rename::other2'");
         return;
     }

+ 7 - 2
plugins/fileservices/fileservices.cpp

@@ -345,13 +345,18 @@ FILESERVICES_API void FILESERVICES_CALL fsDeleteLogicalFile(ICodeContext *ctx, c
     StringBuffer lfn;
     constructLogicalName(ctx, name, lfn);
 
+    IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
     Linked<IUserDescriptor> udesc = ctx->queryUserDescriptor();
     StringBuffer uname;
     PrintLog("Deleting NS logical file %s for user %s", lfn.str(),udesc?udesc->getUserName(uname).str():"");
-    if (queryDistributedFileDirectory().removePhysical(lfn.str(),udesc,NULL,NULL))
+    if (queryDistributedFileDirectory().removePhysical(lfn.str(),udesc,NULL,INFINITE,transaction))
     {
         StringBuffer s("DeleteLogicalFile ('");         // ** TBD use removephysical (handles cluster)
-        s.append(lfn).append("') done");
+        s.append(lfn);
+        if (transaction->active())
+            s.append("') added to transaction");
+        else
+            s.append("') done");
         WUmessage(ctx,ExceptionSeverityInformation,NULL,s.str());
         AuditMessage(ctx,"DeleteLogicalFile",lfn.str());
 

+ 28 - 0
testing/ecl/key/superfile10.xml

@@ -0,0 +1,28 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>true</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>true</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>false</Result_5></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>true</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><Result_7>false</Result_7></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><Result_8>true</Result_8></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><Result_9>false</Result_9></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><Result_10>false</Result_10></Row>
+</Dataset>

+ 69 - 0
testing/ecl/superfile10.ecl

@@ -0,0 +1,69 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+import Std.System.Thorlib;
+import Std.File AS FileServices;
+import Std.Str;
+// Super File regression test
+//noRoxie
+
+rec :=
+RECORD
+        integer i;
+    string1 id;
+END;
+
+ds1 := DATASET([{1,'A'}, {1,'B'}, {1,'C'}], rec);
+ds2 := DATASET([{1,'A'}, {1,'B'}, {1,'C'}], rec);
+
+clusterLFNPrefix := thorlib.getExpandLogicalName('regress::');
+
+string stripPrefix(string qlfn) := IF (Str.Find(qlfn, clusterLFNprefix, 1) = 1, Str.FindReplace(qlfn, clusterLFNPrefix, ''), qlfn);
+
+conditionalDelete(string lfn) := FUNCTION
+        RETURN IF(FileServices.FileExists(lfn), FileServices.DeleteLogicalFile(lfn));
+END;
+
+
+SEQUENTIAL(
+  // Prepare
+  conditionalDelete ('regress::subfile14'),
+  conditionalDelete ('regress::subfile15'),
+  OUTPUT(ds1,,'regress::subfile14',overwrite),
+  OUTPUT(ds2,,'regress::subfile15',overwrite),
+  OUTPUT(FileServices.FileExists('regress::subfile14')), // true
+  OUTPUT(FileServices.FileExists('regress::subfile15')), // true
+
+  // Remove Auto-commit
+  FileServices.DeleteLogicalFile('regress::subfile14'),
+  OUTPUT(FileServices.FileExists('regress::subfile14')), // false
+  OUTPUT(FileServices.FileExists('regress::subfile15')), // true
+
+  // Remove + Rollback
+  FileServices.StartSuperFileTransaction(),
+  FileServices.DeleteLogicalFile('regress::subfile15'),
+  FileServices.FinishSuperFileTransaction(true),    // rollback
+  OUTPUT(FileServices.FileExists('regress::subfile14')), // false
+  OUTPUT(FileServices.FileExists('regress::subfile15')), // true
+
+  // Remove + Commit
+  FileServices.StartSuperFileTransaction(),
+  FileServices.DeleteLogicalFile('regress::subfile15'),
+  FileServices.FinishSuperFileTransaction(),    // commit
+  OUTPUT(FileServices.FileExists('regress::subfile14')), // false
+  OUTPUT(FileServices.FileExists('regress::subfile15')), // false
+);