Browse Source

Merge pull request #919 from jakesmith/gh-839

gh-839 - linkSubfile ignored transactions

Reviewed-by: Renato Golin <rengolin@hpccsystems.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
e2f5a00c6e
1 changed files with 58 additions and 30 deletions
  1. 58 30
      dali/base/dadfs.cpp

+ 58 - 30
dali/base/dadfs.cpp

@@ -944,7 +944,7 @@ public:
 
     IDistributedSuperFile *lookupSuperFile(const char *logicalname,IUserDescriptor *user,IDistributedFileTransaction *transaction,bool fixmissing=false,unsigned timeout=INFINITE);
 
-    void linkSuperOwner(IDistributedFile &subfile,const char *superfile,bool link);
+    void linkSuperOwner(IDistributedFile &subfile,const char *superfile,bool link,IDistributedFileTransaction *transaction);
 
     int getFilePermissions(const char *lname,IUserDescriptor *user,unsigned auditflags);
     int getNodePermissions(const IpAddress &ip,IUserDescriptor *user,unsigned auditflags);
@@ -2581,7 +2581,7 @@ public:
                     queryLogicalName(), superRepO);
     }
 
-    void linkSuperOwner(const char *superfile,bool link)
+    void linkSuperOwner(const char *superfile,bool link,IDistributedFileTransaction *transaction)
     {
         if (!superfile||!*superfile)
             return;
@@ -2590,14 +2590,20 @@ public:
             if (t) {
                 if (!link) {
                     root->removeTree(t);
-                    conn->commit();
-                    dfCheckRoot("linkSuperOwner",root,conn);
+                    if (!transaction)
+                    {
+                        conn->commit();
+                        dfCheckRoot("linkSuperOwner",root,conn);
+                    }
                 }
             }
             else if (link) {
                 t.setown(addNamedPropTree(root,"SuperOwner","@name",superfile));
-                conn->commit();
-                dfCheckRoot("linkSuperOwner.2",root,conn);
+                if (!transaction)
+                {
+                    conn->commit();
+                    dfCheckRoot("linkSuperOwner.2",root,conn);
+                }
             }
         }
         else 
@@ -4030,6 +4036,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
     {
         StringAttr parentlname;
         Owned<IDistributedSuperFile> parent;
+        Owned<IDistributedFile> sub;
         StringAttr subfile;
         bool before;
         StringAttr other;
@@ -4048,18 +4055,16 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
             if (!parent)
                 throw MakeStringException(-1,"addSubFile: SuperFile %s cannot be found",parentlname.get());
             if (!subfile.isEmpty()) {
-                Owned<IDistributedFile> tmp;
                 try {
-                    tmp.setown(transaction->lookupFile(subfile,SDS_SUB_LOCK_TIMEOUT));
+                    sub.setown(transaction->lookupFile(subfile,SDS_SUB_LOCK_TIMEOUT));
                 }
                 catch (IDFS_Exception *e) {
                     if (e->errorCode()!=DFSERR_LookupConnectionTimout)
                         throw;
                     return false;
                 }
-                if (!tmp.get())
+                if (!sub.get())
                     throw MakeStringException(-1,"addSubFile: File %s cannot be found to add",subfile.get());
-                tmp.clear();
                 if (parent->querySubFileNamed(subfile)) 
                     WARNLOG("addSubFile: File %s is already a subfile of %s", subfile.get(),parent->queryLogicalName());
                 // cannot abort here as may be clearsuperfile that hasn't been done
@@ -4067,27 +4072,30 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
 //                  throw MakeStringException(-1,"addSubFile: File %s is already a subfile of %s", subfile.get(),parent->queryLogicalName());
             }
             if (parent->lockTransaction(SDS_SUB_LOCK_TIMEOUT))
-                return true;
+                if (sub->lockTransaction(SDS_SUB_LOCK_TIMEOUT))
+                    return true;
+            sub.clear();
             parent.clear();
             return false;
         }
         void run()
         {
-            IDistributedFile *sub = transaction->lookupFile(subfile);
             if (!sub)
                 throw MakeStringException(-1,"addSubFile(2): File %s cannot be found to add",subfile.get());
             CDistributedSuperFile *sf = QUERYINTERFACE(parent.get(),CDistributedSuperFile);                 
             if (sf) {
                 CPauseTransaction pausetrans(transaction);
-                sf->doAddSubFile(sub,before,other,transaction);
+                sf->doAddSubFile(LINK(sub),before,other,transaction);
             }
         }
         void unlock(bool commit,bool rollback)
         {
+            sub->unlockTransaction(commit,rollback);
             parent->unlockTransaction(commit,rollback);
             CDistributedSuperFile *sf = QUERYINTERFACE(parent.get(),CDistributedSuperFile);                 
             if (sf&&commit)
                 sf->updateParentFileAttrs(transaction);
+            sub.clear();
             parent.clear();
         }
         
@@ -4192,8 +4200,9 @@ protected:
         }
     }
 
-    void addItem(unsigned pos,IDistributedFile *file)
+    void addItem(unsigned pos,IDistributedFile *_file)
     {
+        Owned<IDistributedFile> file = _file;
         partscache.kill();
         // first renumber all above
         StringBuffer path;
@@ -4212,7 +4221,7 @@ protected:
             root->setPropTree("Attr",createPTreeFromIPT(&file->queryProperties()));
         }
         root->addPropTree("SubFile",sub);
-        subfiles.add(*file,pos);
+        subfiles.add(*file.getClear(),pos);
         root->setPropInt("@numsubfiles",subfiles.ordinality());
     }
 
@@ -4352,14 +4361,14 @@ protected:
         }
     };
 
-    void linkSubFile(unsigned pos)
+    void linkSubFile(unsigned pos,IDistributedFileTransaction *transaction)
     {
-        parent->linkSuperOwner(subfiles.item(pos),queryLogicalName(),true);
+        parent->linkSuperOwner(subfiles.item(pos),queryLogicalName(),true,transaction);
     }
 
-    void unlinkSubFile(unsigned pos)
+    void unlinkSubFile(unsigned pos,IDistributedFileTransaction *transaction)
     {
-        parent->linkSuperOwner(subfiles.item(pos),queryLogicalName(),false);
+        parent->linkSuperOwner(subfiles.item(pos),queryLogicalName(),false,transaction);
     }
 
     void checkSubFormatAttr(IDistributedFile *sub, IDistributedFileTransaction *_transaction, const char* exprefix="")
@@ -4988,8 +4997,9 @@ public:
         return new cSubFileIterator(subfiles,supersub);
     }
 
-    void doAddSubFile(IDistributedFile *sub,bool before,const char *other,IDistributedFileTransaction *transaction) // takes ownership of sub
+    void doAddSubFile(IDistributedFile *_sub,bool before,const char *other,IDistributedFileTransaction *transaction) // takes ownership of sub
     {
+        Owned<IDistributedFile> sub = _sub;
         if (strcmp(sub->queryLogicalName(),queryLogicalName())==0)
             throw MakeStringException(-1,"addSubFile: Cannot add file %s to itself", queryLogicalName());
         if (subfiles.ordinality())
@@ -5010,10 +5020,10 @@ public:
         else
             pos = before?0:subfiles.ordinality();
         unsigned cmppos = (pos==0)?1:0;
-        addItem(pos,sub);     // remove if failure TBD?
+        addItem(pos,sub.getClear());     // remove if failure TBD?
         setModified();
         updateFileAttrs();
-        linkSubFile(pos);     
+        linkSubFile(pos, transaction);
     }
 
     void addSubFile(const char * subfile,
@@ -5164,6 +5174,7 @@ public:
             {
                 StringAttr parentlname;
                 Owned<IDistributedSuperFile> parent;
+                Owned<IDistributedFile> sub;
                 StringAttr subfile;
                 bool remsub;
                 bool remphys;
@@ -5182,9 +5193,23 @@ public:
                     parent.setown(transaction->lookupSuperFile(parentlname,true));
                     if (!parent)
                         throw MakeStringException(-1,"removeSubFile: SuperFile %s cannot be found",parentlname.get());
+                    if (!subfile.isEmpty()) {
+                        try {
+                            sub.setown(transaction->lookupFile(subfile,SDS_SUB_LOCK_TIMEOUT));
+                        }
+                        catch (IDFS_Exception *e) {
+                            if (e->errorCode()!=DFSERR_LookupConnectionTimout)
+                                throw;
+                            return false;
+                        }
+                        if (!parent->querySubFileNamed(subfile))
+                            WARNLOG("addSubFile: File %s is not a subfile of %s", subfile.get(),parent->queryLogicalName());
+                    }
                     if (parent->lockTransaction(SDS_SUB_LOCK_TIMEOUT))
-                        return true;
+                        if (!sub || sub->lockTransaction(SDS_SUB_LOCK_TIMEOUT))
+                            return true;
                     parent.clear();
+                    sub.clear();
                     return false;
                 }
                 void run()
@@ -5194,8 +5219,11 @@ public:
                 }
                 void unlock(bool commit,bool rollback)
                 {
+                    if (sub)
+                        sub->unlockTransaction(commit,rollback);
                     parent->unlockTransaction(commit,rollback); 
                     parent.clear();
+                    sub.clear();
                 }
             } *action = new cRemoveSubFileAction(transaction,queryLogicalName(),subfile,remsub,remphys,remcontents);
             // action is now owned by transaction so don't unlink or delete!
@@ -5220,7 +5248,7 @@ public:
                 unlockProperties();
                 return false;
             }
-            unlinkSubFile(pos);
+            unlinkSubFile(pos,transaction);
             removeItem(pos,subname.clear());
             subnames.append(* new StringAttrItem(subname.str()));
             setModified();
@@ -5239,7 +5267,7 @@ public:
                 if (pos) {
                     do {
                         pos--;
-                        unlinkSubFile(pos);
+                        unlinkSubFile(pos,transaction);
                         removeItem(pos,subname.clear());
                         subnames.append(* new StringAttrItem(subname.str()));
                     } while (pos);
@@ -5346,7 +5374,7 @@ public:
         if (pos) {
             do {
                 pos--;
-                unlinkSubFile(pos);
+                unlinkSubFile(pos,transaction);
                 removeItem(pos,subname.clear());
                 subnames1.append(* new StringAttrItem(subname.str()));
             } while (pos);
@@ -5356,7 +5384,7 @@ public:
         if (pos) {
             do {
                 pos--;
-                file->unlinkSubFile(pos);
+                file->unlinkSubFile(pos,transaction);
                 file->removeItem(pos,subname.clear());
                 subnames2.append(* new StringAttrItem(subname.str()));
             } while (pos);
@@ -8539,16 +8567,16 @@ void CDistributedFileDirectory::resolveForeignFiles(IPropertyTree *tree,const IN
     // do origname?
 }
 
-void CDistributedFileDirectory::linkSuperOwner(IDistributedFile &subfile,const char *superfile,bool link)
+void CDistributedFileDirectory::linkSuperOwner(IDistributedFile &subfile,const char *superfile,bool link,IDistributedFileTransaction *transaction)
 {
     IDistributedSuperFile *ssub = subfile.querySuperFile();
     if (ssub) {
         CDistributedSuperFile *cdsuper = QUERYINTERFACE(ssub,CDistributedSuperFile);
-        cdsuper->linkSuperOwner(superfile,link);
+        cdsuper->linkSuperOwner(superfile,link,transaction);
     }
     else {
         CDistributedFile *cdfile = QUERYINTERFACE(&subfile,CDistributedFile);
-        cdfile->linkSuperOwner(superfile,link);
+        cdfile->linkSuperOwner(superfile,link,transaction);
     }
 }