Browse Source

HPCC-9202 - DFS transaction remove/add regression

Removing then adding back the same subfile would cause a
'addSubFile: File <sub> is already a subfile of <super>' error

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 12 năm trước cách đây
mục cha
commit
1462264641
1 tập tin đã thay đổi với 32 bổ sung20 xóa
  1. 32 20
      dali/base/dadfs.cpp

+ 32 - 20
dali/base/dadfs.cpp

@@ -1360,19 +1360,13 @@ public:
     IDistributedSuperFile *lookupSuperFile(const char *name, unsigned timeout)
     {
         IDistributedSuperFile *ret;
-        IDistributedFile * f = findFile(name);
-        if (f) {
-            ret = f->querySuperFile();
-            if (ret)
-                return LINK(ret);
-        }
-        ret = queryDistributedFileDirectory().lookupSuperFile(name,udesc,this,timeout);
-        if (!ret)
-            return NULL;
-        if (isactive) {
-            ret->Link();
-            dflist.append(*ret);
-        }
+        IDistributedFile *f = findFile(name);
+        if (f)
+            ret = LINK(f->querySuperFile());
+        else
+            ret = queryDistributedFileDirectory().lookupSuperFile(name,udesc,this,timeout);
+        if (isactive && ret)
+            addFileToCache(ret);
         return ret;
     }
 
@@ -1421,6 +1415,27 @@ public:
         }
         delayeddelete.kill();
     }
+
+private:
+    void addFileToCache(IDistributedFile *file)
+    {
+        file->Link();
+        dflist.append(*file);
+        // Also add subfiles to cache
+        IDistributedSuperFile * sfile = file->querySuperFile();
+        if (sfile) {
+            Owned<IDistributedFileIterator> iter = sfile->getSubFileIterator();
+            ForEach(*iter) {
+                IDistributedFile *f = &iter->query();
+                if (f->querySuperFile()) {
+                    addFileToCache(f);
+                } else {
+                    f->Link();
+                    dflist.append(*f);
+                }
+            }
+        }
+    }
 };
 
 static bool recursiveCheckEmptyScope(IPropertyTree &ct)
@@ -3952,7 +3967,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
         virtual ~cAddSubFileAction() {}
         bool prepare()
         {
-            parent.setown(transaction->lookupSuperFile(parentlname));   
+            parent.setown(transaction->lookupSuperFileCached(parentlname));
             if (!parent)
                 throw MakeStringException(-1,"addSubFile: SuperFile %s cannot be found",parentlname.get());
             if (!subfile.isEmpty())
@@ -3962,9 +3977,6 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
                     sub.setown(transaction->lookupFile(subfile,SDS_SUB_LOCK_TIMEOUT));
                     if (!sub)
                         throw MakeStringException(-1,"cAddSubFileAction: sub file %s not found", subfile.sget());
-                    // Must validate before locking for update below, to check sub is not already in parent (and therefore locked already)
-                    CDistributedSuperFile *sf = dynamic_cast<CDistributedSuperFile *>(parent.get());;
-                    sf->validateAddSubFile(sub);
                 }
                 catch (IDFS_Exception *e)
                 {
@@ -4030,7 +4042,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
         virtual ~cRemoveSubFileAction() {}
         bool prepare()
         {
-            parent.setown(transaction->lookupSuperFile(parentlname,true));
+            parent.setown(transaction->lookupSuperFileCached(parentlname,true));
             if (!parent)
                 throw MakeStringException(-1,"removeSubFile: SuperFile %s cannot be found",parentlname.get());
             if (!subfile.isEmpty())
@@ -4118,10 +4130,10 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
         virtual ~cSwapFileAction() {}
         bool prepare()
         {
-            parent.setown(transaction->lookupSuperFile(parentlname));
+            parent.setown(transaction->lookupSuperFileCached(parentlname));
             if (!parent)
                 throw MakeStringException(-1,"swapSuperFile: SuperFile %s cannot be found",parentlname.get());
-            file.setown(transaction->lookupSuperFile(filelname));
+            file.setown(transaction->lookupSuperFileCached(filelname));
             if (!file)
             {
                 parent.clear();