Pārlūkot izejas kodu

Transaction refactoring - create super file

Adding super-files within transaction. Locking and unlocking within
the transaction, making sure the file exists and onle unlock on
rollback if it wasn't. Transaction is now compulsory, even if
not active, which will trigger an auto-commit. A local transaction
is created in case the pointer is NULL, but eventually, it'll
assertex on transaction existing.

This commit narrowes the windows to which a super file stays in the
file system (from infinite to just between transaction's lock and
unlock), but the gap is still there. There is a FIXME to that effect.

Also, the action's lock/unlock add and remove the entry from the
filesystem, which in case of retry is inefficient, but we must
know if the addition will fail before we lock the other actions.
Would be good to have a solution to that, as well.
Renato Golin 13 gadi atpakaļ
vecāks
revīzija
733cd07639
3 mainītis faili ar 124 papildinājumiem un 20 dzēšanām
  1. 117 16
      dali/base/dadfs.cpp
  2. 5 2
      dali/base/dadfs.hpp
  3. 2 2
      plugins/fileservices/fileservices.cpp

+ 117 - 16
dali/base/dadfs.cpp

@@ -893,7 +893,7 @@ public:
         return createNew(fdesc,NULL,includeports);
     }
     IDistributedFile *createNew(IPropertyTree *tree,bool ignoregroup);
-    IDistributedSuperFile *createSuperFile(const char *logicalname,bool interleaved,bool ifdoesnotexist,IUserDescriptor *user);
+    IDistributedSuperFile *createSuperFile(const char *logicalname,bool interleaved,bool ifdoesnotexist,IUserDescriptor *user,IDistributedFileTransaction *transaction=NULL);
 
     IDistributedFileIterator *getIterator(const char *wildname, bool includesuper,IUserDescriptor *user);
     IDFAttributesIterator *getDFAttributesIterator(const char *wildname, bool recursive, bool includesuper,INode *foreigndali,IUserDescriptor *user,unsigned foreigndalitimeout);
@@ -1152,6 +1152,11 @@ public:
     {
         actions.append(*action);
     }
+    void addSuperFile(IDistributedSuperFile *sfile)
+    {
+        if (!findFile(sfile->queryLogicalName()))
+            dflist.append(*sfile);
+    }
     void start()
     {
         if (isactive)
@@ -1176,6 +1181,7 @@ public:
                     break;
                 while (nlocked) // unlock for retry
                     actions.item(--nlocked).unlock(false,false);
+                // This could be dangerous now that superfiles are in transactions
                 dflist.kill();
                 PROGLOG("CDistributedFileTransaction: Transaction pausing");
                 Sleep(SDS_TRANSACTION_RETRY/2+(getRandom()%SDS_TRANSACTION_RETRY)); 
@@ -1227,12 +1233,26 @@ public:
     }
     void rollback()
     {
+        actions.kill(); // should be empty
+        dflist.kill(); // release locks
         if (!isactive)
             return;
         isactive = false;
+        // this we only want to do if active
         delayeddelete.kill();
-        actions.kill();
-        dflist.kill();
+    }
+    void autoCommit()
+    {
+        if (!isactive) {
+            try {
+                isactive = true;
+                commit();
+            }
+            catch (IException *e) {
+                rollback();
+                throw;
+            }
+        }
     }
 
     IDistributedFile *findFile(const char *name)
@@ -1338,9 +1358,15 @@ public:
     }
     void start()  { if (chain) chain->start(); }
     void commit() { if (chain) chain->commit(); }
+    void autoCommit() { if (chain) chain->autoCommit(); }
     void rollback() { if (chain) chain->rollback(); }
     bool active()  { if (chain) return chain->active(); return isactive; }
     bool setActive(bool on) { if (chain) return chain->setActive(on); bool ret = isactive; isactive = on; return ret; }
+    void addSuperFile(IDistributedSuperFile *sfile)
+    {
+        if (chain)
+            chain->addSuperFile(sfile);
+    }
     IDistributedFile *lookupFile(const char *lfn,unsigned timeout=INFINITE)
     {
         if (owner&&(strcmp(lfn,owner->queryLogicalName())==0))
@@ -3956,7 +3982,6 @@ struct SuperFileSubTreeCache
     }
 };
 
-
 class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
 {
     void checkNotForeign()
@@ -5287,7 +5312,7 @@ public:
         }
         if (transaction&&transaction->active()) {
             cAddSubFileAction *action = new cAddSubFileAction(transaction,queryLogicalName(),subfile,before,other,addcontents);
-            // action is now owned by transaction so don't unlink or delete!
+            // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
             return;
         }
         Owned<IDistributedFile> sub = transaction ? transaction->lookupFile(subfile) : parent->lookup(subfile,udesc,false,NULL,defaultTimeout);
@@ -5346,7 +5371,7 @@ public:
         }
         if (transaction&&transaction->active()) {
             cRemoveSubFileAction *action = new cRemoveSubFileAction(transaction,queryLogicalName(),subfile,remsub,remphys);
-            // action is now owned by transaction so don't unlink or delete!
+            // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
             return true;
         }
 
@@ -5363,7 +5388,7 @@ public:
         partscache.kill();
         if (transaction&&transaction->active()) {
             cSwapFileAction *action = new cSwapFileAction(transaction,queryLogicalName(),_file->queryLogicalName());
-            // action is now owned by transaction so don't unlink or delete!
+            // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
             return true;
         }
 
@@ -6627,19 +6652,95 @@ IDistributedFile *CDistributedFileDirectory::createNew(IFileDescriptor *fdesc,co
     return file;
 }
 
-IDistributedSuperFile *CDistributedFileDirectory::createSuperFile(const char *_logicalname, bool _interleaved,bool ifdoesnotexist,IUserDescriptor *user)
+/**
+ * Creates a super-file within a transaction.
+ */
+class cCreateSuperFileAction: public CDFAction
+{
+    CDfsLogicalFileName logicalname;
+    CDistributedFileDirectory *parent;
+    Owned<IDistributedSuperFile> super;
+    IUserDescriptor *user;
+    IPropertyTree *root;
+    bool ifdoesnotexist;
+    bool created;
+public:
+    cCreateSuperFileAction(IDistributedFileTransaction *_transaction,
+                           CDistributedFileDirectory *_parent,
+                           IUserDescriptor *_user,
+                           const char *_flname,
+                           bool interleaved)
+        : CDFAction(_transaction), parent(_parent), user(_user), created(false)
+    {
+        logicalname.set(_flname);
+        // We *have* to make sure the file doesn't exist here
+        IDistributedSuperFile *sfile = parent->lookupSuperFile(logicalname.get(), user, transaction, false, SDS_SUB_LOCK_TIMEOUT);
+        if (sfile) {
+            super.setown(sfile);
+        } else {
+            // Create file and link to transaction, so subsequent lookups won't fail
+            root = createPTree();
+            root->setPropInt("@interleaved",interleaved?2:0); // this is ill placed
+            super.setown(new CDistributedSuperFile(parent, root, logicalname, user));
+            created = true;
+        }
+        transaction->addSuperFile(LINK(super));
+    }
+    virtual ~cCreateSuperFileAction() {}
+    bool lock()
+    {
+        // Attache the file to DFS
+        parent->addEntry(logicalname,root,true,false);
+        // FIXME: This will introduce a window (until commit) that
+        // the file is accessible. Use super->attach/detach instead.
+        return true;
+    }
+    void run()
+    {
+        // Do nothing, file is already created
+    }
+    void unlock(bool commit, bool rollback)
+    {
+        // on retry, we need to remove the file so next lock doesn't fail
+        if (created && !commit)
+            parent->removeEntry(logicalname.get(), user);
+    }
+
+};
+
+IDistributedSuperFile *CDistributedFileDirectory::createSuperFile(const char *_logicalname, bool _interleaved,bool ifdoesnotexist,IUserDescriptor *user,IDistributedFileTransaction *transaction)
 {
     CDfsLogicalFileName logicalname;
     logicalname.set(_logicalname);
     checkLogicalName(logicalname,user,true,true,false,"have a superfile with");
-    //assertex(!transaction); // transaction TBD
-    StringBuffer tail;
-    IPropertyTree *root = createPTree();
-    root->setPropInt("@interleaved",_interleaved?2:0);
-    addEntry(logicalname,root,true,ifdoesnotexist);
-    StringBuffer query;
-    CFileConnectLock fcl("CDistributedFileDirectory::createSuperFile",logicalname,DXB_SuperFile,false,false,defaultTimeout);
-    return new CDistributedSuperFile(this, fcl.detach(), logicalname, user, false, NULL, false, defaultTimeout);
+
+    IDistributedSuperFile *sfile = lookupSuperFile(logicalname.get(), user, transaction, false, defaultTimeout);
+    if (sfile) {
+        if (ifdoesnotexist) {
+            // Cache, since we're going to use it
+            if (transaction && transaction->active())
+                transaction->addSuperFile(LINK(sfile));
+            return sfile;
+        } else
+            throw MakeStringException(-1,"createSuperFile: SuperFile %s already exists",logicalname.get());
+    }
+
+    // Create a local transaction that will be destroyed (but never touch the external transaction)
+    Linked<IDistributedFileTransaction> localtrans;
+    if (transaction) {
+        localtrans.set(transaction);
+    } else {
+        // TODO: Make it explicit in the API that a transaction is required
+        localtrans.setown(new CDistributedFileTransaction(user));
+    }
+
+    // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
+    cCreateSuperFileAction *action = new cCreateSuperFileAction(localtrans,this,user,_logicalname,_interleaved);
+
+    localtrans->autoCommit();
+
+    // Should have been persisted to the DFS by now
+    return lookupSuperFile(logicalname.get(), user, localtrans, false, defaultTimeout);
 }
 
 static bool checkProtectAttr(const char *logicalname,IPropertyTree *froot,StringBuffer &reason)

+ 5 - 2
dali/base/dadfs.hpp

@@ -165,12 +165,14 @@ class CDFAction ;
 /**
  * File operations can be included in a transaction to ensure that multiple
  * updates are handled atomically. This is the interface to a transaction
- * instance.
+ * instance. Auto-commit state when transaction is not active (ie. you
+ * don't want the actions to be executed on commit).
  */
 interface IDistributedFileTransaction: extends IInterface
 {
     virtual void start()=0;
     virtual void commit()=0;
+    virtual void autoCommit()=0; // if transaction not active, commit straight away
     virtual void rollback()=0;
     virtual bool active()=0;
     virtual bool setActive(bool on)=0; // returns old value (used internally)
@@ -179,6 +181,7 @@ interface IDistributedFileTransaction: extends IInterface
     virtual IUserDescriptor *queryUser()=0;
     virtual bool addDelayedDelete(const char *lfn,bool remphys,IUserDescriptor *user)=0; // used internally to delay deletes untill commit 
     virtual void addAction(CDFAction *action)=0; // internal
+    virtual void addSuperFile(IDistributedSuperFile *sfile)=0; // TODO: avoid this being necessary
     virtual void clearFiles()=0; // internal
     virtual IDistributedFileTransaction *baseTransaction()=0;
 };
@@ -419,7 +422,7 @@ interface IDistributedFileDirectory: extends IInterface
     virtual IPropertyTree *getFileTree(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL, unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT, bool expandnodes=true) =0;
     virtual IFileDescriptor *getFileDescriptor(const char *lname,const INode *foreigndali=NULL,IUserDescriptor *user=NULL, unsigned foreigndalitimeout=FOREIGN_DALI_TIMEOUT) =0;
 
-    virtual IDistributedSuperFile *createSuperFile(const char *logicalname,bool interleaved,bool ifdoesnotexist=false,IUserDescriptor *user=NULL) = 0;
+    virtual IDistributedSuperFile *createSuperFile(const char *logicalname,bool interleaved,bool ifdoesnotexist=false,IUserDescriptor *user=NULL,IDistributedFileTransaction *transaction=NULL) = 0;
     virtual IDistributedSuperFile *lookupSuperFile(const char *logicalname,IUserDescriptor *user=NULL,
                                                     IDistributedFileTransaction *transaction=NULL, // transaction only used for looking up sub files
                                                     bool fixmissing=false,  // used when removing

+ 2 - 2
plugins/fileservices/fileservices.cpp

@@ -1033,11 +1033,11 @@ static void CheckNotInTransaction(ICodeContext *ctx, const char *fn)
 
 FILESERVICES_API void FILESERVICES_CALL fsCreateSuperFile(ICodeContext *ctx, const char *lsuperfn, bool sequentialparts, bool ifdoesnotexist)
 {
+    IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
     Linked<IUserDescriptor> udesc = ctx->queryUserDescriptor();
     StringBuffer lsfn;
     constructLogicalName(ctx, lsuperfn, lsfn);
-    Owned<IDistributedSuperFile> file = queryDistributedFileDirectory().createSuperFile(lsfn,!sequentialparts,ifdoesnotexist,udesc);
-    CheckNotInTransaction(ctx,"CreateSuperFile");
+    Owned<IDistributedSuperFile> file = queryDistributedFileDirectory().createSuperFile(lsfn,!sequentialparts,ifdoesnotexist,udesc,transaction);
     StringBuffer s("CreateSuperFile ('");
     s.append(lsfn).append("') done");
     AuditMessage(ctx,"CreateSuperFile",lsfn.str());