Bläddra i källkod

Merge pull request #12046 from jakesmith/hpcc-19155

HPCC-19155 Multiple problems with lazy caching.

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 år sedan
förälder
incheckning
77ba4ffffe

+ 8 - 10
thorlcr/activities/fetch/thfetchslave.cpp

@@ -43,10 +43,11 @@
 
 struct FPosTableEntryIFileIO : public FPosTableEntry
 {
-    FPosTableEntryIFileIO() { file = NULL; }
-    ~FPosTableEntryIFileIO() { ::Release(file); }
-    unsigned location;
-    IDelayedFile *file;
+    ~FPosTableEntryIFileIO()
+    {
+        ::Release(file);
+    }
+    IFileIO *file = nullptr;
 };
 
 class CFetchStream : public IRowStream, implements IStopInput, implements IFetchStream, public CSimpleInterface
@@ -57,7 +58,6 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
     Linked<IExpander> eexp;
 
     FPosTableEntryIFileIO *fPosMultiPartTable;
-    unsigned tableSize;
     unsigned files, offsetCount;
     CriticalSection stopsect;
     CPartDescriptorArray parts;
@@ -157,9 +157,7 @@ public:
                 e->base = part.queryProperties().getPropInt64("@offset");
                 e->top = e->base + part.queryProperties().getPropInt64("@size");
                 e->index = f;
-
-                Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(owner, logicalFilename, part);
-                e->file = lfile.getClear();
+                e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part); // NB: freed by FPosTableEntryIFileIO dtor
             }
         }
     }
@@ -181,8 +179,8 @@ public:
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL, NULL));
     }
     virtual IRowStream *queryOutput() override { return this; }
-    virtual IFileIO *getPartIO(unsigned part) override { assertex(part<files); return fPosMultiPartTable[part].file->getFileIO(); }
-    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) override { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
+    virtual IFileIO *getPartIO(unsigned part) override { assertex(part<files); return LINK(fPosMultiPartTable[part].file); }
+    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) override { return getPartFilename(parts.item(part), 0, out, true); }
     virtual void abort() override
     {
         if (distributor)

+ 2 - 2
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -252,14 +252,14 @@ public:
 
                 // local key handling
 
-                Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, logicalFilename, part);
+                Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part);
 
                 RemoteFilename rfn;
                 part.getFilename(0, rfn);
                 StringBuffer path;
                 rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
 
-                Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lfile, false, false);
+                Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, false, false);
                 Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, nullptr, helper->hasNewSegmentMonitors());
                 if (localMerge)
                 {

+ 6 - 4
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -1641,8 +1641,9 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
 
             IPropertyTree const &props = filePart.queryOwner().queryProperties();
             unsigned publishedFormatCrc = (unsigned)props.getPropInt("@formatCrc", 0);
-            Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, indexName, filePart);
-            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *lfile, false, false);
+            Owned<IFileIO> lazyFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
+            Owned<IDelayedFile> delayedFile = createDelayedFile(lazyFileIO);
+            Owned<IKeyIndex> keyIndex = createKeyIndex(filename, crc, *delayedFile, false, false);
             keyIndexes.append(*keyIndex.getClear());
         }
     }
@@ -2044,8 +2045,9 @@ public:
                 unsigned i=0;
                 for(; i<dataParts.ordinality(); i++)
                 {
-                    Owned<IDelayedFile> dFile = queryThor().queryFileCache().lookup(*this, indexName, dataParts.item(i), eexp);
-                    fetchFiles.append(*dFile.getClear());
+                    Owned<IFileIO> lazyFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, dataParts.item(i), eexp);
+                    Owned<IDelayedFile> delayedFile = createDelayedFile(lazyFileIO);
+                    fetchFiles.append(*delayedFile.getClear());
                 }
             }
         }

+ 18 - 6
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -912,7 +912,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
                 {
                     unsigned partNo = partCopy & partMask;
                     unsigned copy = partCopy >> 24;
-                    Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy);
+                    Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy, false);
                     partKeySet->addIndex(keyIndex.getClear());
                 }
                 keyManager.setown(createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, nullptr, helper->hasNewSegmentMonitors()));
@@ -1797,7 +1797,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         }
         return tlkKeyIndexes.ordinality();
     }
-    IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy)
+    IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy, bool delayed)
     {
         IPartDescriptor &filePart = allIndexParts.item(partNo);
         unsigned crc=0;
@@ -1807,13 +1807,25 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
         StringBuffer filename;
         rfn.getPath(filename);
 
-        Owned<IDelayedFile> lfile = queryThor().queryFileCache().lookup(*this, indexName, filePart);
-
-        return createKeyIndex(filename, crc, *lfile, false, false);
+        if (delayed)
+        {
+            Owned<IFileIO> lazyFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
+            Owned<IDelayedFile> delayedFile = createDelayedFile(lazyFileIO);
+            return createKeyIndex(filename, crc, *delayedFile, false, false);
+        }
+        else
+        {
+            /* NB: createKeyIndex here, will load the key immediately
+             * But that's okay, because we are only here on demand.
+             * The underlying IFileIO can later be closed by fhe file caching mechanism.
+             */
+            Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart);
+            return createKeyIndex(filename, crc, *lazyIFileIO, false, false);
+        }
     }
     IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy)
     {
-        Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy);
+        Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy, false);
         return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, nullptr, helper->hasNewSegmentMonitors());
     }
     const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz)

+ 3 - 1
thorlcr/graph/thgraph.hpp

@@ -1165,9 +1165,11 @@ interface IExpander;
 interface IThorFileCache : extends IInterface
 {
     virtual bool remove(const char *filename) = 0;
-    virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
+    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr) = 0;
 };
 
+extern graph_decl IDelayedFile *createDelayedFile(IFileIO *iFileIO);
+
 class graph_decl CThorResourceBase : implements IThorResource, public CInterface
 {
 public:

+ 96 - 101
thorlcr/graph/thgraphslave.cpp

@@ -1938,7 +1938,7 @@ IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename
 ///////////////
 
 class CFileCache;
-class CLazyFileIO : public CInterface
+class CLazyFileIO : public CSimpleInterfaceOf<IFileIO>
 {
     CFileCache &cache;
     Owned<IActivityReplicatedFile> repFile;
@@ -1948,6 +1948,7 @@ class CLazyFileIO : public CInterface
     CRuntimeStatisticCollection fileStats;
     CriticalSection crit;
     Owned<IFileIO> iFileIO; // real IFileIO
+    CActivityBase *activity = nullptr;
 
     IFileIO *getFileIO()
     {
@@ -1960,24 +1961,40 @@ class CLazyFileIO : public CInterface
         return iFileIO.getClear();
     }
 public:
-    IMPLEMENT_IINTERFACE;
-
     CLazyFileIO(CFileCache &_cache, const char *_filename, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
         : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
     {
     }
+    void setActivity(CActivityBase *_activity)
+    {
+        activity = _activity;
+    }
     IFileIO *getOpenFileIO(CActivityBase &activity);
     const char *queryFindString() const { return filename.get(); } // for string HT
-    void close()
+// IFileIO impl.
+    virtual size32_t read(offset_t pos, size32_t len, void * data) override
+    {
+        Owned<IFileIO> iFileIO = getOpenFileIO(*activity);
+        return iFileIO->read(pos, len, data);
+    }
+    virtual offset_t size() override
+    {
+        Owned<IFileIO> iFileIO = getOpenFileIO(*activity);
+        return iFileIO->size();
+    }
+    virtual void close() override
     {
+        /* NB: clears CLazyFileIO's ownership of the underlying IFileIO, there will be disposed on exit of this function if no other references,
+         * and as a result will close the underlying file handle or remote connection.
+         * There can be concurrent threads and theoretically other threads could be in some of the other CLazyFileIO methods and still have
+         * references to the underlying iFileIO, in which case the last thread referencing it will release, dispose and close handle.
+         * But given that we are in probably here via purgeOldest(), then it is very unlikely that there is an active read at this time.
+         */
         Owned<IFileIO> openiFileIO = getClearFileIO();
         if (openiFileIO)
-        {
-            openiFileIO->close();
             mergeStats(fileStats, openiFileIO);
-        }
     }
-    unsigned __int64 getStatistic(StatisticKind kind)
+    virtual unsigned __int64 getStatistic(StatisticKind kind) override
     {
         switch (kind)
         {
@@ -1991,99 +2008,39 @@ public:
         unsigned __int64 openValue = openiFileIO ? openiFileIO->getStatistic(kind) : 0;
         return openValue + fileStats.getStatisticValue(kind);
     }
+    virtual size32_t write(offset_t pos, size32_t len, const void * data) override
+    {
+        throwUnexpectedX("CDelayedFileWrapper::write() called for a cached IFileIO object");
+    }
+    virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
+    {
+        throwUnexpectedX("CDelayedFileWrapper::appendFile() called for a cached IFileIO object");
+    }
+    virtual void setSize(offset_t size) override
+    {
+        throwUnexpectedX("CDelayedFileWrapper::setSize() called for a cached IFileIO object");
+    }
+    virtual void flush() override
+    {
+        throwUnexpectedX("CDelayedFileWrapper::flush() called for a cached IFileIO object");
+    }
 };
 
-class CFileCache : public CInterface, implements IThorFileCache
+class CFileCache : public CSimpleInterfaceOf<IThorFileCache>
 {
     OwningStringSuperHashTableOf<CLazyFileIO> files;
-    CICopyArrayOf<CLazyFileIO> openFiles;
+    ICopyArrayOf<CLazyFileIO> openFiles;
     unsigned limit, purgeN;
     CriticalSection crit;
 
-    class CDelayedFileWapper : public CSimpleInterfaceOf<IDelayedFile>, implements IFileIO
-    {
-        typedef CSimpleInterfaceOf<IDelayedFile> PARENT;
-
-        CFileCache &cache;
-        CActivityBase &activity;
-        Linked<CLazyFileIO> lFile;
-        CriticalSection crit;
-
-    public:
-        IMPLEMENT_IINTERFACE_USING(PARENT);
-
-        CDelayedFileWapper(CFileCache &_cache, CActivityBase &_activity, CLazyFileIO &_lFile) : cache(_cache), activity(_activity), lFile(&_lFile) { }
-
-        ~CDelayedFileWapper()
-        {
-            cache.remove(lFile->queryFindString());
-        }
-        // IDelayedFile impl.
-        virtual IMemoryMappedFile *getMappedFile() override { return nullptr; }
-        virtual IFileIO *getFileIO() override
-        {
-            // NB: lFile needs an activity to open fileIO
-            return lFile->getOpenFileIO(activity);
-        }
-        // IFileIO impl.
-        virtual size32_t read(offset_t pos, size32_t len, void * data) override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            return iFileIO->read(pos, len, data);
-        }
-        virtual offset_t size() override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            return iFileIO->size();
-        }
-        virtual size32_t write(offset_t pos, size32_t len, const void * data) override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            return iFileIO->write(pos, len, data);
-        }
-        virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            return iFileIO->appendFile(file, pos, len);
-        }
-        virtual void setSize(offset_t size) override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            iFileIO->setSize(size);
-        }
-        virtual void flush() override
-        {
-            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
-            iFileIO->flush();
-        }
-        virtual void close() override
-        {
-            lFile->close();
-        }
-        virtual unsigned __int64 getStatistic(StatisticKind kind) override
-        {
-            return lFile->getStatistic(kind);
-        }
-    };
-
     void purgeOldest()
     {
+        // NB: called in crit
         // will be ordered oldest first.
-        unsigned count = 0;
-        CICopyArrayOf<CLazyFileIO> toClose;
-        ForEachItemIn(o, openFiles)
-        {
-            CLazyFileIO &lFile = openFiles.item(o);
-            toClose.append(lFile);
-            if (++count>=purgeN) // crude for now, just remove oldest N
-                break;
-        }
-        ForEachItemIn(r, toClose)
-        {
-            CLazyFileIO &lFile = toClose.item(r);
-            lFile.close();
-            openFiles.zap(lFile);
-        }
+        dbgassertex(purgeN >= openFiles.ordinality()); // purgeOldest() should not be called unless >= limit, and purgeN always >= limit.
+        for (unsigned i=0; i<purgeN; i++)
+            openFiles.item(i).close();
+        openFiles.removen(0, purgeN);
     }
     bool _remove(const char *filename)
     {
@@ -2094,8 +2051,6 @@ class CFileCache : public CInterface, implements IThorFileCache
         return true;
     }
 public:
-    IMPLEMENT_IINTERFACE;
-
     CFileCache(unsigned _limit) : limit(_limit)
     {
         assertex(limit);
@@ -2103,7 +2058,6 @@ public:
         if (purgeN > limit) purgeN=limit; // why would it be, but JIC.
         PROGLOG("FileCache: limit = %d, purgeN = %d", limit, purgeN);
     }
-
     void opening(CLazyFileIO &lFile)
     {
         CriticalBlock b(crit);
@@ -2112,34 +2066,46 @@ public:
             purgeOldest(); // will close purgeN
             assertex(openFiles.ordinality() < limit);
         }
+        // NB: moves to end if already in openFiles, meaning head of openFiles are oldest used
         openFiles.zap(lFile);
         openFiles.append(lFile);
     }
-
 // IThorFileCache impl.
-    virtual bool remove(const char *filename)
+    virtual bool remove(const char *filename) override
     {
         CriticalBlock b(crit);
         return _remove(filename);
     }
-    virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander)
+    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander) override
     {
         StringBuffer filename;
         RemoteFilename rfn;
         partDesc.getFilename(0, rfn);
         rfn.getPath(filename);
         CriticalBlock b(crit);
-        Linked<CLazyFileIO> file = files.find(filename.str());
+        CLazyFileIO *file = files.find(filename.str());
         if (!file)
         {
             Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
-            file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
+            file = new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander);
+            files.replace(* file); // NB: files HT owns
+
+            /* NB: there will be 1 CLazyFileIO per physical file part name
+             * They will be linked by multiple lookups
+             *
+             * When the file cache hits the limit, it will calll CLazyFileIO.close()
+             * This does not actually close the file, but releases CLazyFileIO's underlying real IFileIO
+             * Each active CLazyFileIO file op. has a link to the underlying IFileIO.
+             * Meaning, that only when there are no active ops. and close() has exited, is the underlying
+             * real IFileIO actually freed and the file handle closed.
+             */
         }
-        files.replace(*LINK(file));
-        return new CDelayedFileWapper(*this, activity, *file); // to avoid circular dependency and allow destruction to remove from cache
+        file->setActivity(&activity); // an activity needed by IActivityReplicatedFile, mainly for logging purposes.
+        return LINK(file);
     }
 };
+
 ////
 IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity)
 {
@@ -2165,6 +2131,35 @@ IThorFileCache *createFileCache(unsigned limit)
     return new CFileCache(limit);
 }
 
+IDelayedFile *createDelayedFile(IFileIO *iFileIO)
+{
+    /* NB: all this serves to do, is to create IDelayedFile shell
+     * It does not implement the delay itself, the CLazyFileIO it links to does that.
+     * However a IDelayedFile is expected/used by some jhtree mechanism, as a further
+     * delayed route before invoking key loads.
+     */
+
+    class CDelayedFileWrapper : public CSimpleInterfaceOf<IDelayedFile>
+    {
+        Linked<IFileIO> lFile;
+
+    public:
+        CDelayedFileWrapper(IFileIO *_lFile) : lFile(_lFile) { }
+        ~CDelayedFileWrapper()
+        {
+        }
+        // IDelayedFile impl.
+        virtual IMemoryMappedFile *getMappedFile() override { return nullptr; }
+        virtual IFileIO *getFileIO() override
+        {
+            return lFile.getLink();
+        }
+    };
+
+    // NB: CLazyFileIO can't implement IDelayedFile, purely because it's method would cause circular links
+    return new CDelayedFileWrapper(iFileIO);
+}
+
 /*
  * strand stuff
  */