Pārlūkot izejas kodu

Fix gh-3083 - logical rename, not renaming physicals correctly

The rename code was assuming the logical file directory matched the current instance base directory.
Which is not likely to be true, e.g. eclagents vs thor's have different default data directories and the rename context is normally eclagent. So renaming a thor file in eclagent, it will normally mismatch.
If it detected a mismatch, there was some odd legacy code which was effectively taking the 1st two head subdirectories from the existing path and using that as the new head.
The result on a default OSS system, was that renames attempt to rename to /var/lib/<filename>

This commit, deducing the old base directory based on the existing lfn directory.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 gadi atpakaļ
vecāks
revīzija
60eb8709c0

+ 71 - 54
dali/base/dadfs.cpp

@@ -3362,6 +3362,40 @@ public:
         return afor.ok;
     }
 
+    // This method takes an existing physical directory path for a logical file
+    // and a constructed path to the same logical file created in this context
+    // and deduces the original base path e.g. /var/lib/HPCCSystems/hpcc-data/thor
+    // This is necessary, because there is no not enough context to directly fetch the
+    // original base path to construct new paths for the rename
+    bool getBase(const char *oldPath, const char *thisPath, StringBuffer &baseDir)
+    {
+        const char *oldEnd = oldPath+strlen(oldPath)-1;
+        const char *thisEnd = thisPath+strlen(thisPath)-1;
+        if (isPathSepChar(*oldEnd))
+            oldEnd--;
+        if (isPathSepChar(*thisEnd))
+            thisEnd--;
+        const char *oldP = oldEnd, *thisP = thisEnd;
+        loop {
+            if (oldP==oldPath || thisP==thisPath)
+                break;
+            if (*oldP != *thisP) {
+                // unless last was separator, continue until find one
+                if (isPathSepChar(*(oldP+1)))
+                    oldP++;
+                else {
+                    while (oldP != oldPath && (!isPathSepChar(*oldP)))
+                        oldP--;
+                }
+                baseDir.append(oldP-oldPath, oldPath);
+                return true;
+            }
+            --oldP;
+            --thisP;
+        }
+        return false;
+    }
+
     bool renamePhysicalPartFiles(const char *newname,
                                  const char *cluster,
                                  IMultiException *mexcept,
@@ -3377,58 +3411,42 @@ public:
         StringBuffer basedir;
         if (newbasedir)
             diroverride = newbasedir;
-        else
-        {
-            const char * base = queryBaseDirectory(false,os);
-            size32_t l = strlen(base);
-            if ((memcmp(base,directory.get(),l)==0)&&((l==directory.length())||isPathSepChar(directory.get()[l]))) {
-                basedir.append(base);
-                diroverride = basedir.str();
-            }
-            else {  // shouldn't get here normally
-                // now assume 'Base' dir is same as old
-                basedir.append(directory);
-                const char *e = strchr(basedir.str()+((psc=='/')?1:0),psc);
-                if (e) {
-                    const char *e2=strchr(e+1,psc);
-                    if (e2) 
-                        basedir.setLength(e2-basedir.str());
-                    diroverride = basedir.str();
-                }
-            }
-        }
-        makePhysicalPartName(newname, 0, 0, newdir, false, os, diroverride);
-        if (newdir.length()==0)
-            return false;
-        if (isPathSepChar(newdir.charAt(newdir.length()-1)))
-            newdir.setLength(newdir.length()-1);
+
+        const char *myBase = queryBaseDirectory(false,os);
+        StringBuffer baseDir, newPath;
+        makePhysicalPartName(logicalName.get(), 0, 0, newPath, false, os, diroverride);
+        if (!getBase(directory, newPath, baseDir))
+            baseDir.append(myBase); // getBase returns false, if directory==newPath, so have common base
         getPartMask(newmask,newname,width);
         if (newmask.length()==0)
             return false;
-        StringAttrArray newnamesprim;
-        StringAttrArray newnamesrep;
-        StringBuffer tmp;
-        StringBuffer tmp2;
+        makePhysicalPartName(newname, 0, 0, newPath.clear(), false, os, diroverride);
+        if (newPath.length()==0)
+            return false;
+        if (isPathSepChar(newPath.charAt(newPath.length()-1)))
+            newPath.setLength(newPath.length()-1);
+        newPath.remove(0, strlen(myBase));
+        newdir.append(baseDir).append(newPath);
         StringBuffer fullname;
+        CIArrayOf<CIStringArray> newNames;
         unsigned i;
         for (i=0;i<width;i++) {
+            newNames.append(*new CIStringArray);
             CDistributedFilePart &part = parts.item(i);
-            const char *fn = expandMask(tmp.clear(),newmask,i,width).str();
-            fullname.clear();
-            if (findPathSepChar(fn)) 
-                fullname.append(fn);
-            else
-                addPathSepChar(fullname.append(newdir)).append(fn);
-            // ensure on same drive
-            setPathDrive(fullname,part.queryDrive(0));
+            for (unsigned copy=0; copy<part.numCopies(); copy++) {
+                makePhysicalPartName(newname, i+1, width, newPath.clear(), false, os, myBase);
+                newPath.remove(0, strlen(myBase));
+
+                StringBuffer copyDir(baseDir);
+                adjustClusterDir(i, copy, copyDir);
+                fullname.clear().append(copyDir).append(newPath);
 
-            newnamesprim.append(*new StringAttrItem(fullname.str()));
-            if (part.numCopies()>1) {                                            // NH *** TBD
-                setReplicateDir(fullname,tmp2.clear());
-                setPathDrive(tmp2,part.queryDrive(1));
-                newnamesrep.append(*new StringAttrItem(tmp2.str()));
+                PROGLOG("fullname = %s", fullname.str());
+                newNames.item(i).append(fullname);
             }
         }
+        // NB: the code below, specifically deals with 1 primary + 1 replicate
+        // it will need refactoring if it's to deal with multiple clusters/copies
 
         // first check file doestn't exist for any new part
 
@@ -3437,8 +3455,7 @@ public:
         {
         protected:
             CriticalSection &crit;
-            StringAttrArray &newnamesprim;
-            StringAttrArray &newnamesrep;
+            CIStringArray *&newNames;
             IDistributedFile *file;
             unsigned width;
             IMultiException *mexcept;
@@ -3450,8 +3467,8 @@ public:
             bool * donerep;
             IException *except;
 
-            casyncforbase(IDistributedFile *_file,StringAttrArray &_newnamesprim,StringAttrArray &_newnamesrep,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
-                : newnamesprim(_newnamesprim),newnamesrep(_newnamesrep),crit(_crit)
+            casyncforbase(IDistributedFile *_file,CIStringArray *&_newNames,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
+                : newNames(_newNames),crit(_crit)
             {
                 width = _width;
                 file = _file;
@@ -3497,7 +3514,7 @@ public:
                     IException *ex = NULL;
                     RemoteFilename oldrfn;
                     part->getFilename(oldrfn,(unsigned)copy);
-                    const char *newfn = (copy==0)?newnamesprim.item(idx).text.get():newnamesrep.item(idx).text.get();
+                    const char *newfn = newNames[idx].item(copy);
                     if (!newfn||!*newfn)
                         continue;
                     RemoteFilename newrfn;
@@ -3532,8 +3549,8 @@ public:
         class casyncfor1: public casyncforbase
         {
         public:
-            casyncfor1(IDistributedFile *_file,StringAttrArray &_newnamesprim,StringAttrArray &_newnamesrep,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
-                : casyncforbase(_file,_newnamesprim,_newnamesrep,_width,_mexcept,_crit,_ignoreprim,_ignorerep)
+            casyncfor1(IDistributedFile *_file,CIStringArray *&_newNames,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
+                : casyncforbase(_file,_newNames,_width,_mexcept,_crit,_ignoreprim,_ignorerep)
             {
             }
             bool doPart(IDistributedFilePart *part,bool isrep,RemoteFilename &oldrfn,RemoteFilename &newrfn, bool &done)
@@ -3563,7 +3580,7 @@ public:
                 return true;
             }
 
-        } afor1 (this,newnamesprim,newnamesrep,width,mexcept,crit,NULL,NULL);
+        } afor1 (this,*newNames.getArray(),width,mexcept,crit,NULL,NULL);
         afor1.For(width,10,false,true);
         if (afor1.except)
             throw afor1.except; // no recovery needed
@@ -3592,8 +3609,8 @@ public:
         class casyncfor2: public casyncforbase
         {
         public:
-            casyncfor2(IDistributedFile *_file,StringAttrArray &_newnamesprim,StringAttrArray &_newnamesrep,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
-                : casyncforbase(_file,_newnamesprim,_newnamesrep,_width,_mexcept,_crit,_ignoreprim,_ignorerep)
+            casyncfor2(IDistributedFile *_file,CIStringArray *&_newNames,unsigned _width,IMultiException *_mexcept,CriticalSection &_crit,bool *_ignoreprim,bool *_ignorerep)
+                : casyncforbase(_file,_newNames,_width,_mexcept,_crit,_ignoreprim,_ignorerep)
             {
             }
             bool doPart(IDistributedFilePart *part,bool isrep,RemoteFilename &oldrfn,RemoteFilename &newrfn, bool &done)
@@ -3612,7 +3629,7 @@ public:
                 return true;;
             }
 
-        } afor2 (this,newnamesprim,newnamesrep,width,mexcept,crit,ignoreprim,ignorerep);
+        } afor2 (this,*newNames.getArray(),width,mexcept,crit,ignoreprim,ignorerep);
         afor2.For(width,10,false,true);
         if (afor2.ok) {
             // now rename directory and partmask
@@ -3639,7 +3656,7 @@ public:
                     if (done) {
                         RemoteFilename oldrfn;
                         part->getFilename(oldrfn,(unsigned)copy);
-                        const char *newfn = (copy==0)?newnamesprim.item(i).text.get():newnamesrep.item(i).text.get();
+                        const char *newfn = newNames.item(i).item(copy);
                         if (!newfn||!*newfn)
                             continue;
                         RemoteFilename newrfn;

+ 3 - 0
system/jlib/jutil.hpp

@@ -141,6 +141,9 @@ extern jlib_decl StringBuffer& decodeUrlUseridPassword(StringBuffer& out, const
 class StringArray : public ArrayOf<const char *, const char *>
 {
 };
+class CIStringArray : public StringArray, public CInterface
+{
+};
 
 // separated list to array
 extern jlib_decl void DelimToStringArray(const char *csl, StringArray &dst, const char * delim, bool deldup=false);

+ 38 - 0
testing/ecl/fileservice.ecl

@@ -0,0 +1,38 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+//noRoxie
+
+import Std.File;
+
+rec := RECORD
+    STRING6 name;
+    INTEGER8 blah;
+    STRING9 value;
+END;
+
+ds := DATASET([{'fruit', 123, 'apple'}, {'fruit', 246, 'ford'}, {'os', 680, 'bsd'}, {'music', 369, 'rhead'}, {'os', 987, 'os'}], rec);
+
+SEQUENTIAL(
+  OUTPUT(ds, , '~regress::renametest.d00', OVERWRITE),
+  File.RenameLogicalFile('~regress::renametest.d00', '~regress::afterrename1.d00'),
+  File.RenameLogicalFile('~regress::afterrename1.d00', '~regress::scope1::scope2::afterrename2.d00'),
+  File.RenameLogicalFile('~regress::scope1::scope2::afterrename2.d00', '~regress::scope1::afterrename3.d00'),
+  OUTPUT(DATASET('~regress::scope1::afterrename3.d00', rec, FLAT)),
+  File.DeleteLogicalFile('~regress::scope1::afterrename3.d00')
+);

+ 9 - 0
testing/ecl/key/fileservice.xml

@@ -0,0 +1,9 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><name>fruit </name><blah>123</blah><value>apple    </value></Row>
+ <Row><name>fruit </name><blah>246</blah><value>ford     </value></Row>
+ <Row><name>os    </name><blah>680</blah><value>bsd      </value></Row>
+ <Row><name>music </name><blah>369</blah><value>rhead    </value></Row>
+ <Row><name>os    </name><blah>987</blah><value>os       </value></Row>
+</Dataset>