Преглед изворни кода

Merge pull request #5328 from jakesmith/hpcc-10768

HPCC-10768 - rollback consistent Thor file locking for 4.2.2

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday пре 11 година
родитељ
комит
3b84d0fcd0

+ 2 - 1
thorlcr/activities/diskread/thdiskread.cpp

@@ -30,7 +30,7 @@ class CDiskReadMasterVF : public CDiskReadMasterBase
 {
 public:
     CDiskReadMasterVF(CMasterGraphElement *info) : CDiskReadMasterBase(info) { }
-    virtual void validateFile()
+    virtual void validateFile(IDistributedFile *file)
     {
         IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
         bool codeGenGrouped = 0 != (TDXgrouped & helper->getFlags());
@@ -165,6 +165,7 @@ public:
         {
             if (!helper->hasSegmentMonitors() && !helper->hasFilter() && !(helper->getFlags() & TDXtemporary))
             {
+                Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), fileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()));
                 if (file.get() && canMatch)
                 {
                     if (0 != (TDRunfilteredcount & helper->getFlags()) && file->queryAttributes().hasProp("@recordCount"))

+ 4 - 3
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -127,7 +127,7 @@ public:
 class IndexDistributeActivityMaster : public HashDistributeMasterBase
 {
     MemoryBuffer tlkMb;
-    Owned<IDistributedFile> file;
+    OwnedRoxieString indexFileName;
 
 public:
     IndexDistributeActivityMaster(CMasterGraphElement *info) : HashDistributeMasterBase(DM_index, info) { }
@@ -139,9 +139,9 @@ public:
         IHThorKeyedDistributeArg *helper = (IHThorKeyedDistributeArg *)queryHelper();
 
         StringBuffer scoped;
-        OwnedRoxieString indexFileName(helper->getIndexFileName());
+        indexFileName.setown(helper->getIndexFileName());
         queryThorFileManager().addScope(container.queryJob(), indexFileName, scoped);
-        file.setown(queryThorFileManager().lookup(container.queryJob(), indexFileName));
+        Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), indexFileName);
         if (!file)
             throw MakeActivityException(this, 0, "KeyedDistribute: Failed to find key: %s", scoped.str());
         if (0 == file->numParts())
@@ -173,6 +173,7 @@ public:
     virtual void done()
     {
         HashDistributeMasterBase::done();
+        Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, true);
         if (file)
             file->setAccessed();
     }

+ 11 - 7
thorlcr/activities/keypatch/thkeypatch.cpp

@@ -31,7 +31,7 @@ class CKeyPatchMaster : public CMasterActivity
     bool local;
     unsigned width;
     StringArray clusters;
-    Owned<IDistributedFile> originalIndexFile, patchFile;
+    OwnedRoxieString originalName, patchName;
 
 public:
     CKeyPatchMaster(CMasterGraphElement *info) : CMasterActivity(info)
@@ -44,10 +44,10 @@ public:
     {
         helper = (IHThorKeyPatchArg *)queryHelper();
 
-        OwnedRoxieString originalName(helper->getOriginalName());
-        OwnedRoxieString patchName(helper->getPatchName());
-        originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), originalName));
-        patchFile.setown(queryThorFileManager().lookup(container.queryJob(), patchName));
+        originalName.setown(helper->getOriginalName());
+        patchName.setown(helper->getPatchName());
+        Owned<IDistributedFile> originalIndexFile = queryThorFileManager().lookup(container.queryJob(), originalName);
+        Owned<IDistributedFile> patchFile = queryThorFileManager().lookup(container.queryJob(), patchName);
         
         if (originalIndexFile->numParts() != patchFile->numParts())
             throw MakeActivityException(this, TE_KeyPatchIndexSizeMismatch, "Index %s and patch %s differ in width", originalName.get(), patchName.get());
@@ -164,8 +164,12 @@ public:
 
         container.queryTempHandler()->registerFile(outputName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
         queryThorFileManager().publish(container.queryJob(), outputName, false, *newIndexDesc);
-        originalIndexFile->setAccessed();
-        patchFile->setAccessed();
+        Owned<IDistributedFile> originalIndexFile = queryThorFileManager().lookup(container.queryJob(), originalName, false, true);
+        if (originalIndexFile)
+	        originalIndexFile->setAccessed();
+        Owned<IDistributedFile> patchFile = queryThorFileManager().lookup(container.queryJob(), patchName, false, true);
+        if (patchFile)
+	        patchFile->setAccessed();
     }
     void preStart(size32_t parentExtractSz, const byte *parentExtract)
     {

+ 10 - 6
thorlcr/activities/msort/thmsort.cpp

@@ -56,8 +56,8 @@ class CMSortActivityMaster : public CMasterActivity
     IThorSorterMaster *imaster;
     mptag_t mpTagRPC, barrierMpTag;
     Owned<IBarrier> barrier;
-    Owned<IDistributedFile> coSortFile;
-    
+    OwnedRoxieString cosortlogname;
+
 public:
     CMSortActivityMaster(CMasterGraphElement *info)
       : CMasterActivity(info)
@@ -85,6 +85,7 @@ protected:
             Owned<IException> e = MakeActivityException(this, 0, "Ignoring, unsupported sort order algorithm '%s'", algoname.get());
             reportExceptionToWorkunit(container.queryJob().queryWorkUnit(), e);
         }
+        cosortlogname.setown(helper->getSortedFilename());
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
@@ -137,10 +138,9 @@ protected:
                 skewThreshold = container.queryJob().getWorkUnitValueInt("defaultSkewThreshold", 0);
         }
         StringBuffer cosortfilenames;
-        OwnedRoxieString cosortlogname(helper->getSortedFilename());
         if (cosortlogname&&*cosortlogname)
         {
-            coSortFile.setown(queryThorFileManager().lookup(container.queryJob(), cosortlogname));
+            Owned<IDistributedFile> coSortFile = queryThorFileManager().lookup(container.queryJob(), cosortlogname);
             Owned<IFileDescriptor> fileDesc = coSortFile->getFileDescriptor();
             queryThorFileManager().noteFileRead(container.queryJob(), coSortFile);
             unsigned o;
@@ -195,8 +195,12 @@ protected:
     {
         ActPrintLog("done");
         CMasterActivity::done();
-        if (coSortFile)
-            coSortFile->setAccessed();
+        if (cosortlogname&&*cosortlogname)
+        {
+            Owned<IDistributedFile> coSortFile = queryThorFileManager().lookup(container.queryJob(), cosortlogname, false, true);
+            if (coSortFile)
+                coSortFile->setAccessed();
+        }
         ActPrintLog("done exit");
     }
 };

+ 3 - 2
thorlcr/activities/thdiskbase.cpp

@@ -39,7 +39,7 @@ void CDiskReadMasterBase::init()
 {
     IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *) queryHelper();
     fileName.setown(helper->getFileName());
-    file.setown(queryThorFileManager().lookup(container.queryJob(), fileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true));
+    Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), fileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true);
 
     if (file)
     {
@@ -74,7 +74,7 @@ void CDiskReadMasterBase::init()
                 }
             }
         }
-        validateFile();
+        validateFile(file);
         void *ekey;
         size32_t ekeylen;
         helper->getEncryptKey(ekeylen,ekey);
@@ -116,6 +116,7 @@ void CDiskReadMasterBase::done()
     fileDesc.clear();
     if (!abortSoon) // in case query has relinquished control of file usage to another query (e.g. perists)
     {
+        Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), fileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true);
         if (file)
             file->setAccessed();
     }

+ 1 - 2
thorlcr/activities/thdiskbase.ipp

@@ -33,14 +33,13 @@ protected:
     IHash *hash;
     Owned<ProgressInfo> inputProgress;
     OwnedRoxieString fileName;
-    Owned<IDistributedFile> file;
 
 public:
     CDiskReadMasterBase(CMasterGraphElement *info);
     void init();
     void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
     void done();
-    virtual void validateFile() { }
+    virtual void validateFile(IDistributedFile *file) { }
     void deserializeStats(unsigned node, MemoryBuffer &mb);
     void getXGMML(unsigned idx, IPropertyTree *edge);
 };