Explorar o código

Merge pull request #10023 from jakesmith/hpcc-15675

HPCC-15675 Refactor lazy fileio

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=8) %!d(string=hai) anos
pai
achega
e85d83a69e

+ 6 - 6
roxie/ccd/ccdfile.cpp

@@ -389,22 +389,22 @@ public:
     virtual const char *queryFilename() { return logical->queryFilename(); }
     virtual const char *queryFilename() { return logical->queryFilename(); }
     virtual bool isAlive() const { return CInterface::isAlive(); }
     virtual bool isAlive() const { return CInterface::isAlive(); }
 
 
-    virtual IMemoryMappedFile *queryMappedFile()
+    virtual IMemoryMappedFile *getMappedFile() override
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
         if (mmapped)
         if (mmapped)
-            return mmapped;
+            return mmapped.getLink();
         if (!remote)
         if (!remote)
         {
         {
             mmapped.setown(logical->openMemoryMapped());
             mmapped.setown(logical->openMemoryMapped());
-            return mmapped;
+            return mmapped.getLink();
         }
         }
-        return NULL;
+        return nullptr;
     }
     }
 
 
-    virtual IFileIO *queryFileIO()
+    virtual IFileIO *getFileIO() override
     {
     {
-        return this;
+        return LINK(this);
     }
     }
 
 
 
 

+ 1 - 1
roxie/ccd/ccdfile.hpp

@@ -48,7 +48,7 @@ interface ILazyFileIO : extends IFileIO
     virtual void close() = 0;
     virtual void close() = 0;
     virtual void setCopying(bool copying) = 0;
     virtual void setCopying(bool copying) = 0;
     virtual bool isCopying() const = 0;
     virtual bool isCopying() const = 0;
-    virtual IMemoryMappedFile *queryMappedFile() = 0;
+    virtual IMemoryMappedFile *getMappedFile() = 0;
 
 
     virtual void setCache(const IRoxieFileCache *) = 0;
     virtual void setCache(const IRoxieFileCache *) = 0;
     virtual void removeCache(const IRoxieFileCache *) = 0;
     virtual void removeCache(const IRoxieFileCache *) = 0;

+ 10 - 6
system/jhtree/jhtree.cpp

@@ -1995,7 +1995,8 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 {
 {
     StringAttr keyfile;
     StringAttr keyfile;
     unsigned crc; 
     unsigned crc; 
-    Linked<IDelayedFile> iFileIO;
+    Linked<IDelayedFile> delayedFile;
+    Owned<IFileIO> iFileIO;
     Owned<IKeyIndex> realKey;
     Owned<IKeyIndex> realKey;
     CriticalSection c;
     CriticalSection c;
     bool isTLK;
     bool isTLK;
@@ -2006,11 +2007,14 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
         CriticalBlock b(c);
         CriticalBlock b(c);
         if (!realKey)
         if (!realKey)
         {
         {
-            IMemoryMappedFile *mapped = useMemoryMappedIndexes ? iFileIO->queryMappedFile() : 0;
+            Owned<IMemoryMappedFile> mapped = useMemoryMappedIndexes ? delayedFile->getMappedFile() : nullptr;
             if (mapped)
             if (mapped)
                 realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
                 realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
             else
             else
-                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO->queryFileIO(), isTLK, preloadAllowed));
+            {
+                iFileIO.setown(delayedFile->getFileIO());
+                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
+            }
             if (!realKey)
             if (!realKey)
             {
             {
                 DBGLOG("Lazy key file %s could not be opened", keyfile.get());
                 DBGLOG("Lazy key file %s could not be opened", keyfile.get());
@@ -2022,8 +2026,8 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
-    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_iFileIO, bool _isTLK, bool _preloadAllowed)
-        : keyfile(_keyfile), crc(_crc), iFileIO(_iFileIO), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
+    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
+        : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
     {}
     {}
 
 
     virtual bool IsShared() const { return CInterface::IsShared(); }
     virtual bool IsShared() const { return CInterface::IsShared(); }
@@ -2047,7 +2051,7 @@ public:
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
-    virtual const IFileIO *queryFileIO() const override { return iFileIO->queryFileIO(); }
+    virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
 };
 };
 
 
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)

+ 2 - 2
system/jhtree/jhtree.hpp

@@ -30,8 +30,8 @@
 
 
 interface jhtree_decl IDelayedFile : public IInterface
 interface jhtree_decl IDelayedFile : public IInterface
 {
 {
-    virtual IMemoryMappedFile *queryMappedFile() = 0;
-    virtual IFileIO *queryFileIO() = 0;
+    virtual IMemoryMappedFile *getMappedFile() = 0;
+    virtual IFileIO *getFileIO() = 0;
 };
 };
 
 
 interface jhtree_decl IKeyCursor : public IInterface
 interface jhtree_decl IKeyCursor : public IInterface

+ 11 - 8
thorlcr/activities/fetch/thfetchslave.cpp

@@ -172,17 +172,17 @@ public:
     }
     }
 
 
     // IFetchStream
     // IFetchStream
-    virtual void start(IRowStream *_keyIn)
+    virtual void start(IRowStream *_keyIn) override
     {
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
         keyIn.set(_keyIn);
         distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
         distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
     }
     }
-    virtual IRowStream *queryOutput() { return this; }
-    virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }
-    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
-    virtual void abort()
+    virtual IRowStream *queryOutput() override { return this; }
+    virtual IFileIO *getPartIO(unsigned part) override { assertex(part<files); return fPosMultiPartTable[part].file->getFileIO(); }
+    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) override { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
+    virtual void abort() override
     {
     {
         if (distributor)
         if (distributor)
             distributor->abort();
             distributor->abort();
@@ -520,7 +520,8 @@ public:
     CFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
     CFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     {
     {
-        Owned<ISerialStream> stream = createFileSerialStream(fetchStream->queryPartIO(filePartIndex), localFpos);
+        Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
+        Owned<ISerialStream> stream = createFileSerialStream(partIO, localFpos);
         CThorStreamDeserializerSource ds(stream);
         CThorStreamDeserializerSource ds(stream);
         RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
         RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
         size32_t fetchedLen = fetchDiskRowIf->queryRowDeserializer()->deserialize(fetchedRowBuilder, ds);
         size32_t fetchedLen = fetchDiskRowIf->queryRowDeserializer()->deserialize(fetchedRowBuilder, ds);
@@ -557,7 +558,8 @@ public:
     }
     }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     {
     {
-        Owned<ISerialStream> inputStream = createFileSerialStream(fetchStream->queryPartIO(filePartIndex), localFpos);
+        Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
+        Owned<ISerialStream> inputStream = createFileSerialStream(partIO, localFpos);
         if (inputStream->eos())
         if (inputStream->eos())
             return 0;
             return 0;
         size32_t minRequired = 4096; // MORE - make configurable
         size32_t minRequired = 4096; // MORE - make configurable
@@ -630,7 +632,8 @@ public:
         lastMatches = new Owned<IColumnProvider>[files];
         lastMatches = new Owned<IColumnProvider>[files];
         for (f=0; f<files; f++)
         for (f=0; f<files; f++)
         {
         {
-            streams[f].setown(createBufferedIOStream(fetchStream->queryPartIO(f)));
+            Owned<IFileIO> partIO = fetchStream->getPartIO(f);
+            streams[f].setown(createBufferedIOStream(partIO));
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // i.e. getXmlIteratorPath not used (or supplied) here.
             // i.e. getXmlIteratorPath not used (or supplied) here.
             if (container.getKind()==TAKjsonfetch)
             if (container.getKind()==TAKjsonfetch)

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.ipp

@@ -34,7 +34,7 @@ interface IFetchStream : extends IInterface
 {
 {
     virtual void start(IRowStream *input) = 0;
     virtual void start(IRowStream *input) = 0;
     virtual IRowStream *queryOutput() = 0;
     virtual IRowStream *queryOutput() = 0;
-    virtual IFileIO *queryPartIO(unsigned part) = 0;
+    virtual IFileIO *getPartIO(unsigned part) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual void abort() = 0;
     virtual void abort() = 0;
 };
 };

+ 4 - 4
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -821,8 +821,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                                 memcpy(fetchOutPtr, row.get(), FETCHKEY_HEADER_SIZE);
                                 memcpy(fetchOutPtr, row.get(), FETCHKEY_HEADER_SIZE);
                                 fetchOutPtr += FETCHKEY_HEADER_SIZE;
                                 fetchOutPtr += FETCHKEY_HEADER_SIZE;
 
 
-                                IFileIO &iFileIO = owner.queryFilePartIO(filePartIndex);
-                                Owned<ISerialStream> stream = createFileSerialStream(&iFileIO, localFpos);
+                                Owned<IFileIO> iFileIO = owner.getFilePartIO(filePartIndex);
+                                Owned<ISerialStream> stream = createFileSerialStream(iFileIO, localFpos);
                                 CThorStreamDeserializerSource ds(stream);
                                 CThorStreamDeserializerSource ds(stream);
 
 
                                 RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
                                 RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
@@ -1633,10 +1633,10 @@ public:
     }
     }
 #endif
 #endif
 
 
-    IFileIO &queryFilePartIO(unsigned partNum)
+    IFileIO *getFilePartIO(unsigned partNum)
     {
     {
         assertex(partNum<dataParts.ordinality());
         assertex(partNum<dataParts.ordinality());
-        return *fetchFiles.item(partNum).queryFileIO();
+        return fetchFiles.item(partNum).getFileIO();
     }
     }
     inline void noteStats(unsigned seeks, unsigned scans)
     inline void noteStats(unsigned seeks, unsigned scans)
     {
     {

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -1133,7 +1133,7 @@ interface IDelayedFile;
 interface IExpander;
 interface IExpander;
 interface IThorFileCache : extends IInterface
 interface IThorFileCache : extends IInterface
 {
 {
-    virtual bool remove(IDelayedFile &dFile) = 0;
+    virtual bool remove(const char *filename) = 0;
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
 };
 };
 
 

+ 105 - 94
thorlcr/graph/thgraphslave.cpp

@@ -1728,20 +1728,19 @@ bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFil
     return false;
     return false;
 }
 }
 
 
-class CEnsurePrimaryPartFile : public CInterface, implements IReplicatedFile
+class CEnsurePrimaryPartFile : public CInterface, implements IActivityReplicatedFile
 {
 {
-    CActivityBase &activity;
     Linked<IPartDescriptor> partDesc;
     Linked<IPartDescriptor> partDesc;
     StringAttr logicalFilename;
     StringAttr logicalFilename;
     Owned<IReplicatedFile> part;
     Owned<IReplicatedFile> part;
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
 
 
-    CEnsurePrimaryPartFile(CActivityBase &_activity, const char *_logicalFilename, IPartDescriptor *_partDesc) 
-        : activity(_activity), logicalFilename(_logicalFilename), partDesc(_partDesc)
+    CEnsurePrimaryPartFile(const char *_logicalFilename, IPartDescriptor *_partDesc)
+        : logicalFilename(_logicalFilename), partDesc(_partDesc)
     {
     {
     }
     }
-    virtual IFile *open()
+    virtual IFile *open(CActivityBase &activity) override
     {
     {
         unsigned location;
         unsigned location;
         OwnedIFile iFile;
         OwnedIFile iFile;
@@ -1756,7 +1755,7 @@ public:
             throw e;
             throw e;
         }
         }
     }
     }
-
+    virtual IFile *open() override { throwUnexpected(); }
     RemoteFilenameArray &queryCopies() 
     RemoteFilenameArray &queryCopies() 
     { 
     { 
         if(!part.get()) 
         if(!part.get()) 
@@ -1765,18 +1764,18 @@ public:
     }
     }
 };
 };
 
 
-IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc)
+IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc)
 {
 {
-    return new CEnsurePrimaryPartFile(activity, logicalFilename, partDesc);
+    return new CEnsurePrimaryPartFile(logicalFilename, partDesc);
 }
 }
 
 
 ///////////////
 ///////////////
 
 
 class CFileCache;
 class CFileCache;
-class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFile
+class CLazyFileIO : public CInterface
 {
 {
     CFileCache &cache;
     CFileCache &cache;
-    Owned<IReplicatedFile> repFile;
+    Owned<IActivityReplicatedFile> repFile;
     Linked<IExpander> expander;
     Linked<IExpander> expander;
     bool compressed;
     bool compressed;
     StringAttr filename;
     StringAttr filename;
@@ -1784,69 +1783,35 @@ class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFi
     CriticalSection crit;
     CriticalSection crit;
     Owned<IFileIO> iFileIO; // real IFileIO
     Owned<IFileIO> iFileIO; // real IFileIO
 
 
-    void checkOpen(); // references CFileCache method
-
-public:
-    IMPLEMENT_IINTERFACE;
-    CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
-    : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
-    {
-    }
-    ~CLazyFileIO()
-    {
-        iFileIO.clear();
-    }
-
-    const char *queryFindString() const { return filename.get(); } // for string HT
-
-// IFileIO impl.
-    virtual size32_t read(offset_t pos, size32_t len, void * data)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->read(pos, len, data);
-    }
-    virtual offset_t size()
+    IFileIO *getFileIO()
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->size();
+        return iFileIO.getLink();
     }
     }
-    virtual size32_t write(offset_t pos, size32_t len, const void * data)
+    IFileIO *getClearFileIO()
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->write(pos, len, data);
+        return iFileIO.getClear();
     }
     }
-    virtual void flush()
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CLazyFileIO(CFileCache &_cache, const char *_filename, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
+        : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
     {
     {
-        CriticalBlock b(crit);
-        if (iFileIO)
-            iFileIO->flush();
     }
     }
-    virtual void close()
+    IFileIO *getOpenFileIO(CActivityBase &activity);
+    const char *queryFindString() const { return filename.get(); } // for string HT
+    void close()
     {
     {
-        CriticalBlock b(crit);
-        if (iFileIO)
+        Owned<IFileIO> openiFileIO = getClearFileIO();
+        if (openiFileIO)
         {
         {
-            mergeStats(fileStats, iFileIO);
-            iFileIO->close();
+            openiFileIO->close();
+            mergeStats(fileStats, openiFileIO);
         }
         }
-        iFileIO.clear();
-    }
-    virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->appendFile(file, pos, len);
-    }
-    virtual void setSize(offset_t size)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        iFileIO->setSize(size);
     }
     }
-    virtual unsigned __int64 getStatistic(StatisticKind kind)
+    unsigned __int64 getStatistic(StatisticKind kind)
     {
     {
         switch (kind)
         switch (kind)
         {
         {
@@ -1856,13 +1821,10 @@ public:
             return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
             return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
         }
         }
 
 
-        CriticalBlock b(crit);
-        unsigned __int64 openValue = iFileIO ? iFileIO->getStatistic(kind) : 0;
+        Owned<IFileIO> openiFileIO = getFileIO();
+        unsigned __int64 openValue = openiFileIO ? openiFileIO->getStatistic(kind) : 0;
         return openValue + fileStats.getStatisticValue(kind);
         return openValue + fileStats.getStatisticValue(kind);
     }
     }
-// IDelayedFile impl.
-    virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
-    virtual IFileIO *queryFileIO() { return this; }
 };
 };
 
 
 class CFileCache : public CInterface, implements IThorFileCache
 class CFileCache : public CInterface, implements IThorFileCache
@@ -1872,22 +1834,70 @@ class CFileCache : public CInterface, implements IThorFileCache
     unsigned limit, purgeN;
     unsigned limit, purgeN;
     CriticalSection crit;
     CriticalSection crit;
 
 
-    class CDelayedFileWapper : public CInterface, implements IDelayedFile
+    class CDelayedFileWapper : public CSimpleInterfaceOf<IDelayedFile>, implements IFileIO
     {
     {
+        typedef CSimpleInterfaceOf<IDelayedFile> PARENT;
+
         CFileCache &cache;
         CFileCache &cache;
+        CActivityBase &activity;
         Linked<CLazyFileIO> lFile;
         Linked<CLazyFileIO> lFile;
+        CriticalSection crit;
+
     public:
     public:
-        IMPLEMENT_IINTERFACE;
+        IMPLEMENT_IINTERFACE_USING(PARENT);
 
 
-        CDelayedFileWapper(CFileCache &_cache, CLazyFileIO &_lFile) : cache(_cache), lFile(&_lFile) { }
+        CDelayedFileWapper(CFileCache &_cache, CActivityBase &_activity, CLazyFileIO &_lFile) : cache(_cache), activity(_activity), lFile(&_lFile) { }
 
 
         ~CDelayedFileWapper()
         ~CDelayedFileWapper()
         {
         {
-            cache.remove(*lFile);
+            cache.remove(lFile->queryFindString());
         }
         }
         // IDelayedFile impl.
         // IDelayedFile impl.
-        virtual IMemoryMappedFile *queryMappedFile() { return lFile->queryMappedFile(); }
-        virtual IFileIO *queryFileIO() { return lFile->queryFileIO(); }
+        virtual IMemoryMappedFile *getMappedFile() override { return nullptr; }
+        virtual IFileIO *getFileIO() override
+        {
+            // NB: lFile needs an activity to open fileIO
+            return lFile->getOpenFileIO(activity);
+        }
+        // IFileIO impl.
+        virtual size32_t read(offset_t pos, size32_t len, void * data) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->read(pos, len, data);
+        }
+        virtual offset_t size() override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->size();
+        }
+        virtual size32_t write(offset_t pos, size32_t len, const void * data) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->write(pos, len, data);
+        }
+        virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->appendFile(file, pos, len);
+        }
+        virtual void setSize(offset_t size) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            iFileIO->setSize(size);
+        }
+        virtual void flush() override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            iFileIO->flush();
+        }
+        virtual void close() override
+        {
+            lFile->close();
+        }
+        virtual unsigned __int64 getStatistic(StatisticKind kind) override
+        {
+            return lFile->getStatistic(kind);
+        }
     };
     };
 
 
     void purgeOldest()
     void purgeOldest()
@@ -1909,11 +1919,12 @@ class CFileCache : public CInterface, implements IThorFileCache
             openFiles.zap(lFile);
             openFiles.zap(lFile);
         }
         }
     }
     }
-    bool _remove(CLazyFileIO &lFile)
+    bool _remove(const char *filename)
     {
     {
-        bool ret = files.removeExact(&lFile);
+        Linked<CLazyFileIO> lFile = files.find(filename);
+        bool ret = files.removeExact(lFile);
         if (!ret) return false;
         if (!ret) return false;
-        openFiles.zap(lFile);
+        openFiles.zap(*lFile.get());
         return true;
         return true;
     }
     }
 public:
 public:
@@ -1940,12 +1951,10 @@ public:
     }
     }
 
 
 // IThorFileCache impl.
 // IThorFileCache impl.
-    virtual bool remove(IDelayedFile &dFile)
+    virtual bool remove(const char *filename)
     {
     {
-        CLazyFileIO *lFile = QUERYINTERFACE(&dFile, CLazyFileIO);
-        assertex(lFile);
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        return _remove(*lFile);
+        return _remove(filename);
     }
     }
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander)
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander)
     {
     {
@@ -1957,30 +1966,32 @@ public:
         Linked<CLazyFileIO> file = files.find(filename.str());
         Linked<CLazyFileIO> file = files.find(filename.str());
         if (!file)
         if (!file)
         {
         {
-            Owned<IReplicatedFile> repFile = createEnsurePrimaryPartFile(activity, logicalFilename, &partDesc);
+            Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
             bool compressed = partDesc.queryOwner().isCompressed();
             file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
             file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
         }
         }
         files.replace(*LINK(file));
         files.replace(*LINK(file));
-        return new CDelayedFileWapper(*this, *file); // to avoid circular dependency and allow destruction to remove from cache
+        return new CDelayedFileWapper(*this, activity, *file); // to avoid circular dependency and allow destruction to remove from cache
     }
     }
 };
 };
 ////
 ////
-void CLazyFileIO::checkOpen()
+IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity)
 {
 {
     CriticalBlock b(crit);
     CriticalBlock b(crit);
-    if (iFileIO)
-        return;
-    cache.opening(*this);
-    Owned<IFile> iFile = repFile->open();
-    if (NULL != expander.get())
-        iFileIO.setown(createCompressedFileReader(iFile, expander));
-    else if (compressed)
-        iFileIO.setown(createCompressedFileReader(iFile));
-    else
-        iFileIO.setown(iFile->open(IFOread));
-    if (!iFileIO.get())
-        throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
+    if (!iFileIO)
+    {
+        cache.opening(*this);
+        Owned<IFile> iFile = repFile->open(activity);
+        if (NULL != expander.get())
+            iFileIO.setown(createCompressedFileReader(iFile, expander));
+        else if (compressed)
+            iFileIO.setown(createCompressedFileReader(iFile));
+        else
+            iFileIO.setown(iFile->open(IFOread));
+        if (!iFileIO.get())
+            throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
+    }
+    return iFileIO.getLink();
 }
 }
 
 
 IThorFileCache *createFileCache(unsigned limit)
 IThorFileCache *createFileCache(unsigned limit)

+ 6 - 1
thorlcr/graph/thgraphslave.hpp

@@ -483,9 +483,14 @@ public:
     }
     }
 };
 };
 
 
+interface IActivityReplicatedFile : extends IReplicatedFile
+{
+    virtual IFile *open(CActivityBase &activity) = 0;
+};
+
 interface IPartDescriptor;
 interface IPartDescriptor;
 extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path);
 extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path);
-extern graphslave_decl IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc);
+extern graphslave_decl IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc);
 extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
 extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
 
 
 #endif
 #endif