Browse Source

HPCC-21871 Ensure lazy file IO objects are removed from cache

Remove CLazyFileIO objects from the cache on destruction.
Also include the CRC of the parts in the cache id.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 6 năm trước cách đây
mục cha
commit
1464d3678b
2 tập tin đã thay đổi với 35 bổ sung và 16 xóa
  1. 1 1
      thorlcr/graph/thgraph.hpp
  2. 34 15
      thorlcr/graph/thgraphslave.cpp

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -1164,7 +1164,7 @@ interface IDelayedFile;
 interface IExpander;
 interface IThorFileCache : extends IInterface
 {
-    virtual bool remove(const char *filename) = 0;
+    virtual bool remove(const char *filename, unsigned crc) = 0;
     virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr) = 0;
 };
 

+ 34 - 15
thorlcr/graph/thgraphslave.cpp

@@ -1943,17 +1943,17 @@ IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename
 ///////////////
 
 class CFileCache;
-class CLazyFileIO : public CSimpleInterfaceOf<IFileIO>
+class CLazyFileIO : public CInterfaceOf<IFileIO>
 {
     CFileCache &cache;
     Owned<IActivityReplicatedFile> repFile;
     Linked<IExpander> expander;
     bool compressed;
-    StringAttr filename;
     CRuntimeStatisticCollection fileStats;
     CriticalSection crit;
     Owned<IFileIO> iFileIO; // real IFileIO
     CActivityBase *activity = nullptr;
+    StringAttr filename, id;
 
     IFileIO *getFileIO()
     {
@@ -1966,16 +1966,17 @@ class CLazyFileIO : public CSimpleInterfaceOf<IFileIO>
         return iFileIO.getClear();
     }
 public:
-    CLazyFileIO(CFileCache &_cache, const char *_filename, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
-        : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
+    CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
+        : cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
     {
     }
+    virtual void beforeDispose() override;
     void setActivity(CActivityBase *_activity)
     {
         activity = _activity;
     }
     IFileIO *getOpenFileIO(CActivityBase &activity);
-    const char *queryFindString() const { return filename.get(); } // for string HT
+    const char *queryFindString() const { return id; } // for string HT
 // IFileIO impl.
     virtual size32_t read(offset_t pos, size32_t len, void * data) override
     {
@@ -2033,7 +2034,7 @@ public:
 
 class CFileCache : public CSimpleInterfaceOf<IThorFileCache>
 {
-    OwningStringSuperHashTableOf<CLazyFileIO> files;
+    StringSuperHashTableOf<CLazyFileIO> files; // NB: table doesn't own entries, entries remove themselves on destruction.
     ICopyArrayOf<CLazyFileIO> openFiles;
     unsigned limit, purgeN;
     CriticalSection crit;
@@ -2047,12 +2048,13 @@ class CFileCache : public CSimpleInterfaceOf<IThorFileCache>
             openFiles.item(i).close();
         openFiles.removen(0, purgeN);
     }
-    bool _remove(const char *filename)
+    bool _remove(const char *id)
     {
-        Linked<CLazyFileIO> lFile = files.find(filename);
+        CLazyFileIO *lFile = files.find(id);
+        if (!lFile) return false;
         bool ret = files.removeExact(lFile);
         if (!ret) return false;
-        openFiles.zap(*lFile.get());
+        openFiles.zap(*lFile);
         return true;
     }
 public:
@@ -2075,11 +2077,19 @@ public:
         openFiles.zap(lFile);
         openFiles.append(lFile);
     }
+    bool remove(const char *id)
+    {
+        CriticalBlock b(crit);
+        return _remove(id);
+    }
 // IThorFileCache impl.
-    virtual bool remove(const char *filename) override
+    virtual bool remove(const char *filename, unsigned crc) override
     {
+        StringBuffer id(filename);
+        if (crc)
+            id.append(crc);
         CriticalBlock b(crit);
-        return _remove(filename);
+        return _remove(id);
     }
     virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander) override
     {
@@ -2087,14 +2097,18 @@ public:
         RemoteFilename rfn;
         partDesc.getFilename(0, rfn);
         rfn.getPath(filename);
+        StringBuffer id(filename);
+        unsigned crc = partDesc.queryProperties().getPropInt("@fileCrc");
+        if (crc)
+            id.append(crc);
         CriticalBlock b(crit);
-        CLazyFileIO *file = files.find(filename.str());
+        Linked<CLazyFileIO> file = files.find(id);
         if (!file)
         {
             Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
-            file = new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander);
-            files.replace(* file); // NB: files HT owns
+            file.setown(new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander));
+            files.replace(* file); // NB: files does not own 'file', CLazyFileIO will remove itself from cache on destruction
 
             /* NB: there will be 1 CLazyFileIO per physical file part name
              * They will be linked by multiple lookups
@@ -2107,11 +2121,16 @@ public:
              */
         }
         file->setActivity(&activity); // an activity needed by IActivityReplicatedFile, mainly for logging purposes.
-        return LINK(file);
+        return file.getClear();
     }
 };
 
 ////
+void CLazyFileIO::beforeDispose()
+{
+    cache.remove(id);
+}
+
 IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity)
 {
     CriticalBlock b(crit);