Quellcode durchsuchen

HPCC-15675 Refactor lazy fileio

1) To avoid critical section being held too long - it was being held for
duration of i/o operation, now it will be held whilst the IFileIO
interface is fetched and linked only.
2) Avoid a bug whereby a file had been added to the cache by an old
activity then opened by a new and hit a failure, in doing so it would
reference the old (released) activity and crash. Refactor so the current
activity requesting the cached item passes in the activity.
3) Change IDelayedFile methods to getXXX() methods to better support
the delayed semantics.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith vor 8 Jahren
Ursprung
Commit
a471e7f434

+ 6 - 6
roxie/ccd/ccdfile.cpp

@@ -389,22 +389,22 @@ public:
     virtual const char *queryFilename() { return logical->queryFilename(); }
     virtual const char *queryFilename() { return logical->queryFilename(); }
     virtual bool isAlive() const { return CInterface::isAlive(); }
     virtual bool isAlive() const { return CInterface::isAlive(); }
 
 
-    virtual IMemoryMappedFile *queryMappedFile()
+    virtual IMemoryMappedFile *getMappedFile() override
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
         if (mmapped)
         if (mmapped)
-            return mmapped;
+            return mmapped.getLink();
         if (!remote)
         if (!remote)
         {
         {
             mmapped.setown(logical->openMemoryMapped());
             mmapped.setown(logical->openMemoryMapped());
-            return mmapped;
+            return mmapped.getLink();
         }
         }
-        return NULL;
+        return nullptr;
     }
     }
 
 
-    virtual IFileIO *queryFileIO()
+    virtual IFileIO *getFileIO() override
     {
     {
-        return this;
+        return LINK(this);
     }
     }
 
 
 
 

+ 1 - 1
roxie/ccd/ccdfile.hpp

@@ -48,7 +48,7 @@ interface ILazyFileIO : extends IFileIO
     virtual void close() = 0;
     virtual void close() = 0;
     virtual void setCopying(bool copying) = 0;
     virtual void setCopying(bool copying) = 0;
     virtual bool isCopying() const = 0;
     virtual bool isCopying() const = 0;
-    virtual IMemoryMappedFile *queryMappedFile() = 0;
+    virtual IMemoryMappedFile *getMappedFile() = 0;
 
 
     virtual void setCache(const IRoxieFileCache *) = 0;
     virtual void setCache(const IRoxieFileCache *) = 0;
     virtual void removeCache(const IRoxieFileCache *) = 0;
     virtual void removeCache(const IRoxieFileCache *) = 0;

+ 10 - 6
system/jhtree/jhtree.cpp

@@ -1995,7 +1995,8 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 {
 {
     StringAttr keyfile;
     StringAttr keyfile;
     unsigned crc; 
     unsigned crc; 
-    Linked<IDelayedFile> iFileIO;
+    Linked<IDelayedFile> delayedFile;
+    Owned<IFileIO> iFileIO;
     Owned<IKeyIndex> realKey;
     Owned<IKeyIndex> realKey;
     CriticalSection c;
     CriticalSection c;
     bool isTLK;
     bool isTLK;
@@ -2006,11 +2007,14 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
         CriticalBlock b(c);
         CriticalBlock b(c);
         if (!realKey)
         if (!realKey)
         {
         {
-            IMemoryMappedFile *mapped = useMemoryMappedIndexes ? iFileIO->queryMappedFile() : 0;
+            Owned<IMemoryMappedFile> mapped = useMemoryMappedIndexes ? delayedFile->getMappedFile() : nullptr;
             if (mapped)
             if (mapped)
                 realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
                 realKey.setown(queryKeyStore()->load(keyfile, crc, mapped, isTLK, preloadAllowed));
             else
             else
-                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO->queryFileIO(), isTLK, preloadAllowed));
+            {
+                iFileIO.setown(delayedFile->getFileIO());
+                realKey.setown(queryKeyStore()->load(keyfile, crc, iFileIO, isTLK, preloadAllowed));
+            }
             if (!realKey)
             if (!realKey)
             {
             {
                 DBGLOG("Lazy key file %s could not be opened", keyfile.get());
                 DBGLOG("Lazy key file %s could not be opened", keyfile.get());
@@ -2022,8 +2026,8 @@ class CLazyKeyIndex : implements IKeyIndex, public CInterface
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
-    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_iFileIO, bool _isTLK, bool _preloadAllowed)
-        : keyfile(_keyfile), crc(_crc), iFileIO(_iFileIO), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
+    CLazyKeyIndex(const char *_keyfile, unsigned _crc, IDelayedFile *_delayedFile, bool _isTLK, bool _preloadAllowed)
+        : keyfile(_keyfile), crc(_crc), delayedFile(_delayedFile), isTLK(_isTLK), preloadAllowed(_preloadAllowed)
     {}
     {}
 
 
     virtual bool IsShared() const { return CInterface::IsShared(); }
     virtual bool IsShared() const { return CInterface::IsShared(); }
@@ -2047,7 +2051,7 @@ public:
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual offset_t queryMetadataHead() { return checkOpen().queryMetadataHead(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
     virtual IPropertyTree * getMetadata() { return checkOpen().getMetadata(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
     virtual unsigned getNodeSize() { return checkOpen().getNodeSize(); }
-    virtual const IFileIO *queryFileIO() const override { return iFileIO->queryFileIO(); }
+    virtual const IFileIO *queryFileIO() const override { return iFileIO; } // NB: if not yet opened, will be null
 };
 };
 
 
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)
 extern jhtree_decl IKeyIndex *createKeyIndex(const char *keyfile, unsigned crc, IFileIO &iFileIO, bool isTLK, bool preloadAllowed)

+ 2 - 2
system/jhtree/jhtree.hpp

@@ -30,8 +30,8 @@
 
 
 interface jhtree_decl IDelayedFile : public IInterface
 interface jhtree_decl IDelayedFile : public IInterface
 {
 {
-    virtual IMemoryMappedFile *queryMappedFile() = 0;
-    virtual IFileIO *queryFileIO() = 0;
+    virtual IMemoryMappedFile *getMappedFile() = 0;
+    virtual IFileIO *getFileIO() = 0;
 };
 };
 
 
 interface jhtree_decl IKeyCursor : public IInterface
 interface jhtree_decl IKeyCursor : public IInterface

+ 11 - 8
thorlcr/activities/fetch/thfetchslave.cpp

@@ -172,17 +172,17 @@ public:
     }
     }
 
 
     // IFetchStream
     // IFetchStream
-    virtual void start(IRowStream *_keyIn)
+    virtual void start(IRowStream *_keyIn) override
     {
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
         keyIn.set(_keyIn);
         distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
         distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
     }
     }
-    virtual IRowStream *queryOutput() { return this; }
-    virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }
-    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
-    virtual void abort()
+    virtual IRowStream *queryOutput() override { return this; }
+    virtual IFileIO *getPartIO(unsigned part) override { assertex(part<files); return fPosMultiPartTable[part].file->getFileIO(); }
+    virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) override { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
+    virtual void abort() override
     {
     {
         if (distributor)
         if (distributor)
             distributor->abort();
             distributor->abort();
@@ -520,7 +520,8 @@ public:
     CFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
     CFetchSlaveActivity(CGraphElementBase *container) : CFetchSlaveBase(container) { }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     {
     {
-        Owned<ISerialStream> stream = createFileSerialStream(fetchStream->queryPartIO(filePartIndex), localFpos);
+        Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
+        Owned<ISerialStream> stream = createFileSerialStream(partIO, localFpos);
         CThorStreamDeserializerSource ds(stream);
         CThorStreamDeserializerSource ds(stream);
         RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
         RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
         size32_t fetchedLen = fetchDiskRowIf->queryRowDeserializer()->deserialize(fetchedRowBuilder, ds);
         size32_t fetchedLen = fetchDiskRowIf->queryRowDeserializer()->deserialize(fetchedRowBuilder, ds);
@@ -557,7 +558,8 @@ public:
     }
     }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
     {
     {
-        Owned<ISerialStream> inputStream = createFileSerialStream(fetchStream->queryPartIO(filePartIndex), localFpos);
+        Owned<IFileIO> partIO = fetchStream->getPartIO(filePartIndex);
+        Owned<ISerialStream> inputStream = createFileSerialStream(partIO, localFpos);
         if (inputStream->eos())
         if (inputStream->eos())
             return 0;
             return 0;
         size32_t minRequired = 4096; // MORE - make configurable
         size32_t minRequired = 4096; // MORE - make configurable
@@ -630,7 +632,8 @@ public:
         lastMatches = new Owned<IColumnProvider>[files];
         lastMatches = new Owned<IColumnProvider>[files];
         for (f=0; f<files; f++)
         for (f=0; f<files; f++)
         {
         {
-            streams[f].setown(createBufferedIOStream(fetchStream->queryPartIO(f)));
+            Owned<IFileIO> partIO = fetchStream->getPartIO(f);
+            streams[f].setown(createBufferedIOStream(partIO));
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // i.e. getXmlIteratorPath not used (or supplied) here.
             // i.e. getXmlIteratorPath not used (or supplied) here.
             if (container.getKind()==TAKjsonfetch)
             if (container.getKind()==TAKjsonfetch)

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.ipp

@@ -34,7 +34,7 @@ interface IFetchStream : extends IInterface
 {
 {
     virtual void start(IRowStream *input) = 0;
     virtual void start(IRowStream *input) = 0;
     virtual IRowStream *queryOutput() = 0;
     virtual IRowStream *queryOutput() = 0;
-    virtual IFileIO *queryPartIO(unsigned part) = 0;
+    virtual IFileIO *getPartIO(unsigned part) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual void abort() = 0;
     virtual void abort() = 0;
 };
 };

+ 4 - 4
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -821,8 +821,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
                                 memcpy(fetchOutPtr, row.get(), FETCHKEY_HEADER_SIZE);
                                 memcpy(fetchOutPtr, row.get(), FETCHKEY_HEADER_SIZE);
                                 fetchOutPtr += FETCHKEY_HEADER_SIZE;
                                 fetchOutPtr += FETCHKEY_HEADER_SIZE;
 
 
-                                IFileIO &iFileIO = owner.queryFilePartIO(filePartIndex);
-                                Owned<ISerialStream> stream = createFileSerialStream(&iFileIO, localFpos);
+                                Owned<IFileIO> iFileIO = owner.getFilePartIO(filePartIndex);
+                                Owned<ISerialStream> stream = createFileSerialStream(iFileIO, localFpos);
                                 CThorStreamDeserializerSource ds(stream);
                                 CThorStreamDeserializerSource ds(stream);
 
 
                                 RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
                                 RtlDynamicRowBuilder fetchedRowBuilder(fetchDiskRowIf->queryRowAllocator());
@@ -1633,10 +1633,10 @@ public:
     }
     }
 #endif
 #endif
 
 
-    IFileIO &queryFilePartIO(unsigned partNum)
+    IFileIO *getFilePartIO(unsigned partNum)
     {
     {
         assertex(partNum<dataParts.ordinality());
         assertex(partNum<dataParts.ordinality());
-        return *fetchFiles.item(partNum).queryFileIO();
+        return fetchFiles.item(partNum).getFileIO();
     }
     }
     inline void noteStats(unsigned seeks, unsigned scans)
     inline void noteStats(unsigned seeks, unsigned scans)
     {
     {

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -1133,7 +1133,7 @@ interface IDelayedFile;
 interface IExpander;
 interface IExpander;
 interface IThorFileCache : extends IInterface
 interface IThorFileCache : extends IInterface
 {
 {
-    virtual bool remove(IDelayedFile &dFile) = 0;
+    virtual bool remove(const char *filename) = 0;
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
 };
 };
 
 

+ 105 - 94
thorlcr/graph/thgraphslave.cpp

@@ -1728,20 +1728,19 @@ bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFil
     return false;
     return false;
 }
 }
 
 
-class CEnsurePrimaryPartFile : public CInterface, implements IReplicatedFile
+class CEnsurePrimaryPartFile : public CInterface, implements IActivityReplicatedFile
 {
 {
-    CActivityBase &activity;
     Linked<IPartDescriptor> partDesc;
     Linked<IPartDescriptor> partDesc;
     StringAttr logicalFilename;
     StringAttr logicalFilename;
     Owned<IReplicatedFile> part;
     Owned<IReplicatedFile> part;
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
 
 
-    CEnsurePrimaryPartFile(CActivityBase &_activity, const char *_logicalFilename, IPartDescriptor *_partDesc) 
-        : activity(_activity), logicalFilename(_logicalFilename), partDesc(_partDesc)
+    CEnsurePrimaryPartFile(const char *_logicalFilename, IPartDescriptor *_partDesc)
+        : logicalFilename(_logicalFilename), partDesc(_partDesc)
     {
     {
     }
     }
-    virtual IFile *open()
+    virtual IFile *open(CActivityBase &activity) override
     {
     {
         unsigned location;
         unsigned location;
         OwnedIFile iFile;
         OwnedIFile iFile;
@@ -1756,7 +1755,7 @@ public:
             throw e;
             throw e;
         }
         }
     }
     }
-
+    virtual IFile *open() override { throwUnexpected(); }
     RemoteFilenameArray &queryCopies() 
     RemoteFilenameArray &queryCopies() 
     { 
     { 
         if(!part.get()) 
         if(!part.get()) 
@@ -1765,18 +1764,18 @@ public:
     }
     }
 };
 };
 
 
-IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc)
+IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc)
 {
 {
-    return new CEnsurePrimaryPartFile(activity, logicalFilename, partDesc);
+    return new CEnsurePrimaryPartFile(logicalFilename, partDesc);
 }
 }
 
 
 ///////////////
 ///////////////
 
 
 class CFileCache;
 class CFileCache;
-class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFile
+class CLazyFileIO : public CInterface
 {
 {
     CFileCache &cache;
     CFileCache &cache;
-    Owned<IReplicatedFile> repFile;
+    Owned<IActivityReplicatedFile> repFile;
     Linked<IExpander> expander;
     Linked<IExpander> expander;
     bool compressed;
     bool compressed;
     StringAttr filename;
     StringAttr filename;
@@ -1784,69 +1783,35 @@ class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFi
     CriticalSection crit;
     CriticalSection crit;
     Owned<IFileIO> iFileIO; // real IFileIO
     Owned<IFileIO> iFileIO; // real IFileIO
 
 
-    void checkOpen(); // references CFileCache method
-
-public:
-    IMPLEMENT_IINTERFACE;
-    CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
-    : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
-    {
-    }
-    ~CLazyFileIO()
-    {
-        iFileIO.clear();
-    }
-
-    const char *queryFindString() const { return filename.get(); } // for string HT
-
-// IFileIO impl.
-    virtual size32_t read(offset_t pos, size32_t len, void * data)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->read(pos, len, data);
-    }
-    virtual offset_t size()
+    IFileIO *getFileIO()
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->size();
+        return iFileIO.getLink();
     }
     }
-    virtual size32_t write(offset_t pos, size32_t len, const void * data)
+    IFileIO *getClearFileIO()
     {
     {
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->write(pos, len, data);
+        return iFileIO.getClear();
     }
     }
-    virtual void flush()
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CLazyFileIO(CFileCache &_cache, const char *_filename, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
+        : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
     {
     {
-        CriticalBlock b(crit);
-        if (iFileIO)
-            iFileIO->flush();
     }
     }
-    virtual void close()
+    IFileIO *getOpenFileIO(CActivityBase &activity);
+    const char *queryFindString() const { return filename.get(); } // for string HT
+    void close()
     {
     {
-        CriticalBlock b(crit);
-        if (iFileIO)
+        Owned<IFileIO> openiFileIO = getClearFileIO();
+        if (openiFileIO)
         {
         {
-            mergeStats(fileStats, iFileIO);
-            iFileIO->close();
+            openiFileIO->close();
+            mergeStats(fileStats, openiFileIO);
         }
         }
-        iFileIO.clear();
-    }
-    virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        return iFileIO->appendFile(file, pos, len);
-    }
-    virtual void setSize(offset_t size)
-    {
-        CriticalBlock b(crit);
-        checkOpen();
-        iFileIO->setSize(size);
     }
     }
-    virtual unsigned __int64 getStatistic(StatisticKind kind)
+    unsigned __int64 getStatistic(StatisticKind kind)
     {
     {
         switch (kind)
         switch (kind)
         {
         {
@@ -1856,13 +1821,10 @@ public:
             return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
             return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
         }
         }
 
 
-        CriticalBlock b(crit);
-        unsigned __int64 openValue = iFileIO ? iFileIO->getStatistic(kind) : 0;
+        Owned<IFileIO> openiFileIO = getFileIO();
+        unsigned __int64 openValue = openiFileIO ? openiFileIO->getStatistic(kind) : 0;
         return openValue + fileStats.getStatisticValue(kind);
         return openValue + fileStats.getStatisticValue(kind);
     }
     }
-// IDelayedFile impl.
-    virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
-    virtual IFileIO *queryFileIO() { return this; }
 };
 };
 
 
 class CFileCache : public CInterface, implements IThorFileCache
 class CFileCache : public CInterface, implements IThorFileCache
@@ -1872,22 +1834,70 @@ class CFileCache : public CInterface, implements IThorFileCache
     unsigned limit, purgeN;
     unsigned limit, purgeN;
     CriticalSection crit;
     CriticalSection crit;
 
 
-    class CDelayedFileWapper : public CInterface, implements IDelayedFile
+    class CDelayedFileWapper : public CSimpleInterfaceOf<IDelayedFile>, implements IFileIO
     {
     {
+        typedef CSimpleInterfaceOf<IDelayedFile> PARENT;
+
         CFileCache &cache;
         CFileCache &cache;
+        CActivityBase &activity;
         Linked<CLazyFileIO> lFile;
         Linked<CLazyFileIO> lFile;
+        CriticalSection crit;
+
     public:
     public:
-        IMPLEMENT_IINTERFACE;
+        IMPLEMENT_IINTERFACE_USING(PARENT);
 
 
-        CDelayedFileWapper(CFileCache &_cache, CLazyFileIO &_lFile) : cache(_cache), lFile(&_lFile) { }
+        CDelayedFileWapper(CFileCache &_cache, CActivityBase &_activity, CLazyFileIO &_lFile) : cache(_cache), activity(_activity), lFile(&_lFile) { }
 
 
         ~CDelayedFileWapper()
         ~CDelayedFileWapper()
         {
         {
-            cache.remove(*lFile);
+            cache.remove(lFile->queryFindString());
         }
         }
         // IDelayedFile impl.
         // IDelayedFile impl.
-        virtual IMemoryMappedFile *queryMappedFile() { return lFile->queryMappedFile(); }
-        virtual IFileIO *queryFileIO() { return lFile->queryFileIO(); }
+        virtual IMemoryMappedFile *getMappedFile() override { return nullptr; }
+        virtual IFileIO *getFileIO() override
+        {
+            // NB: lFile needs an activity to open fileIO
+            return lFile->getOpenFileIO(activity);
+        }
+        // IFileIO impl.
+        virtual size32_t read(offset_t pos, size32_t len, void * data) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->read(pos, len, data);
+        }
+        virtual offset_t size() override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->size();
+        }
+        virtual size32_t write(offset_t pos, size32_t len, const void * data) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->write(pos, len, data);
+        }
+        virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            return iFileIO->appendFile(file, pos, len);
+        }
+        virtual void setSize(offset_t size) override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            iFileIO->setSize(size);
+        }
+        virtual void flush() override
+        {
+            Owned<IFileIO> iFileIO = lFile->getOpenFileIO(activity);
+            iFileIO->flush();
+        }
+        virtual void close() override
+        {
+            lFile->close();
+        }
+        virtual unsigned __int64 getStatistic(StatisticKind kind) override
+        {
+            return lFile->getStatistic(kind);
+        }
     };
     };
 
 
     void purgeOldest()
     void purgeOldest()
@@ -1909,11 +1919,12 @@ class CFileCache : public CInterface, implements IThorFileCache
             openFiles.zap(lFile);
             openFiles.zap(lFile);
         }
         }
     }
     }
-    bool _remove(CLazyFileIO &lFile)
+    bool _remove(const char *filename)
     {
     {
-        bool ret = files.removeExact(&lFile);
+        Linked<CLazyFileIO> lFile = files.find(filename);
+        bool ret = files.removeExact(lFile);
         if (!ret) return false;
         if (!ret) return false;
-        openFiles.zap(lFile);
+        openFiles.zap(*lFile.get());
         return true;
         return true;
     }
     }
 public:
 public:
@@ -1940,12 +1951,10 @@ public:
     }
     }
 
 
 // IThorFileCache impl.
 // IThorFileCache impl.
-    virtual bool remove(IDelayedFile &dFile)
+    virtual bool remove(const char *filename)
     {
     {
-        CLazyFileIO *lFile = QUERYINTERFACE(&dFile, CLazyFileIO);
-        assertex(lFile);
         CriticalBlock b(crit);
         CriticalBlock b(crit);
-        return _remove(*lFile);
+        return _remove(filename);
     }
     }
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander)
     virtual IDelayedFile *lookup(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander)
     {
     {
@@ -1957,30 +1966,32 @@ public:
         Linked<CLazyFileIO> file = files.find(filename.str());
         Linked<CLazyFileIO> file = files.find(filename.str());
         if (!file)
         if (!file)
         {
         {
-            Owned<IReplicatedFile> repFile = createEnsurePrimaryPartFile(activity, logicalFilename, &partDesc);
+            Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
             bool compressed = partDesc.queryOwner().isCompressed();
             file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
             file.setown(new CLazyFileIO(*this, filename.str(), repFile.getClear(), compressed, expander));
         }
         }
         files.replace(*LINK(file));
         files.replace(*LINK(file));
-        return new CDelayedFileWapper(*this, *file); // to avoid circular dependency and allow destruction to remove from cache
+        return new CDelayedFileWapper(*this, activity, *file); // to avoid circular dependency and allow destruction to remove from cache
     }
     }
 };
 };
 ////
 ////
-void CLazyFileIO::checkOpen()
+IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity)
 {
 {
     CriticalBlock b(crit);
     CriticalBlock b(crit);
-    if (iFileIO)
-        return;
-    cache.opening(*this);
-    Owned<IFile> iFile = repFile->open();
-    if (NULL != expander.get())
-        iFileIO.setown(createCompressedFileReader(iFile, expander));
-    else if (compressed)
-        iFileIO.setown(createCompressedFileReader(iFile));
-    else
-        iFileIO.setown(iFile->open(IFOread));
-    if (!iFileIO.get())
-        throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
+    if (!iFileIO)
+    {
+        cache.opening(*this);
+        Owned<IFile> iFile = repFile->open(activity);
+        if (NULL != expander.get())
+            iFileIO.setown(createCompressedFileReader(iFile, expander));
+        else if (compressed)
+            iFileIO.setown(createCompressedFileReader(iFile));
+        else
+            iFileIO.setown(iFile->open(IFOread));
+        if (!iFileIO.get())
+            throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
+    }
+    return iFileIO.getLink();
 }
 }
 
 
 IThorFileCache *createFileCache(unsigned limit)
 IThorFileCache *createFileCache(unsigned limit)

+ 6 - 1
thorlcr/graph/thgraphslave.hpp

@@ -483,9 +483,14 @@ public:
     }
     }
 };
 };
 
 
+interface IActivityReplicatedFile : extends IReplicatedFile
+{
+    virtual IFile *open(CActivityBase &activity) = 0;
+};
+
 interface IPartDescriptor;
 interface IPartDescriptor;
 extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path);
 extern graphslave_decl bool ensurePrimary(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path);
-extern graphslave_decl IReplicatedFile *createEnsurePrimaryPartFile(CActivityBase &activity, const char *logicalFilename, IPartDescriptor *partDesc);
+extern graphslave_decl IActivityReplicatedFile *createEnsurePrimaryPartFile(const char *logicalFilename, IPartDescriptor *partDesc);
 extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
 extern graphslave_decl IThorFileCache *createFileCache(unsigned limit);
 
 
 #endif
 #endif