
HPCC-26921 Avoid CQ's re-looking up logical files.

If a CQ performed a logical file lookup, e.g. via a diskread/kj/
indexread etc., and the logical filename could vary, e.g. because it
was computed each time, then each worker would re-initialize the
manager activity, causing a new Dali lookup per iteration.
For example, on a 400-way Thor executing a CQ 1000 times, that meant
400,000 Dali lookups.

Change so that within a CQ, DFS lookups are cached (not cleared
between executions). As a result, an LFN is looked up only once,
provided the computed logical filename remains constant.
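
As a rough illustration only (hypothetical stand-in types, not the actual
CMasterActivity/Thor code; the real change keys readFilesMap by the
normalized LFN and keeps ownership in the readFiles array), the caching
amounts to something like:

    #include <functional>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // Stand-in for IDistributedFile; illustration only.
    struct LogicalFile { std::string name; };

    class PerCQLookupCache
    {
        // key: normalized logical filename -> previously resolved file
        std::unordered_map<std::string, std::shared_ptr<LogicalFile>> cache;
    public:
        std::shared_ptr<LogicalFile> lookup(const std::string &normalizedLfn,
            const std::function<std::shared_ptr<LogicalFile>()> &dfsLookup)
        {
            auto it = cache.find(normalizedLfn);
            if (it != cache.end())
                return it->second;          // hit: no new Dali round trip
            auto file = dfsLookup();        // miss: one Dali lookup
            if (file)
                cache.emplace(normalizedLfn, file);
            return file;
        }
        void clear() { cache.clear(); }     // cleared on kill(), not per CQ execution
    };

If the computed filename is the same on every iteration, only the first
call reaches Dali; a varying filename still costs one lookup per distinct
LFN.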

Also, add a temporary workaround for subFileStats tracking issues,
related to activities being able to have >1 readfile, some of
which can be superfiles.
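
Sketch of the grow-only pattern used by that workaround (stand-in element
type; the real code appends CThorStatsCollection entries up to the
superfile's subfile count):

    #include <vector>

    // Repeated init() calls in a CQ must not keep appending stats entries;
    // only grow the vector up to the current number of subfiles.
    void ensureSubFileStats(std::vector<unsigned> &subFileStats, unsigned numSubs)
    {
        for (unsigned i = subFileStats.size(); i < numSubs; i++)
            subFileStats.push_back(0);      // stand-in for a new stats collection
    }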

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith, 3 years ago
commit de68a48d3b

+ 6 - 4
thorlcr/activities/diskread/thdiskread.cpp

@@ -106,9 +106,12 @@ public:
     virtual void done() override
     {
         CDiskReadMasterVF::done();
-        IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
-        if (0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
-            container.queryTempHandler()->deregisterFile(fileName, fileDesc->queryProperties().getPropBool("@pausefile"));
+        if (file)
+        {
+            IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
+            if (0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
+                container.queryTempHandler()->deregisterFile(fileName, fileDesc->queryProperties().getPropBool("@pausefile"));
+        }
     }
     virtual void init() override
     {
@@ -192,7 +195,6 @@ public:
         {
             if (!helper->hasSegmentMonitors() && !helper->hasFilter() && !(helper->getFlags() & TDXtemporary))
             {
-                IDistributedFile *file = queryReadFile(0);
                 if (file && canMatch)
                 {
                     if (0 != (TDRunfilteredcount & helper->getFlags()) && file->queryAttributes().hasProp("@recordCount"))

+ 17 - 7
thorlcr/activities/fetch/thfetch.cpp

@@ -34,6 +34,7 @@ class CFetchActivityMaster : public CMasterActivity
     std::vector<OwnedPtr<CThorStatsCollection>> subFileStats;
 
 protected:
+    Owned<IDistributedFile> fetchFile;
     IHThorFetchArg *helper;
 
 public:
@@ -49,11 +50,11 @@ public:
     {
         if (endpoints) free(endpoints);
     }
-    virtual void init()
+    virtual void init() override
     {
         CMasterActivity::init();
         OwnedRoxieString fname(helper->getFileName());
-        Owned<IDistributedFile> fetchFile = queryThorFileManager().lookup(container.queryJob(), fname, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), false, container.activityIsCodeSigned());
+        fetchFile.setown(lookupReadFile(fname, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
         if (fetchFile)
         {
             if (isFileKey(fetchFile))
@@ -77,15 +78,25 @@ public:
                 throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fetchFile->queryLogicalName());
             IDistributedSuperFile *super = fetchFile->querySuperFile();
             unsigned numsubs = super?super->numSubFiles(true):0;
-            for (unsigned i=0; i<numsubs; i++)
-                subFileStats.push_back(new CThorStatsCollection(diskReadPartStatistics));
+
+            /* JCS->SHAMSER - kludge for now, don't add more than max
+                * But it means updateFileReadCostStats will not be querying the correct files,
+                * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                */
+            for (unsigned i=subFileStats.size(); i<numsubs; i++)
+                subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
+
 
             mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, super));
             mapping->serializeFileOffsetMap(offsetMapMb);
-            addReadFile(fetchFile);
         }
     }
-    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void kill() override
+    {
+        CMasterActivity::kill();
+        fetchFile.clear();
+    }
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
     {
         if (mapping)
         {
@@ -120,7 +131,6 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
         CFetchActivityMaster::serializeSlaveData(dst, slave);
-        IDistributedFile *fetchFile = queryReadFile(0);
         if (fetchFile)
             fetchFile->queryAttributes().serialize(dst);
     }

+ 1 - 5
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -100,9 +100,7 @@ public:
         StringBuffer scoped;
         OwnedRoxieString indexFileName(helper->getIndexFileName());
         queryThorFileManager().addScope(container.queryJob(), indexFileName, scoped);
-        Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, false, false, container.activityIsCodeSigned());
-        if (!file)
-            throw MakeActivityException(this, 0, "KeyedDistribute: Failed to find key: %s", scoped.str());
+        Owned<IDistributedFile> file = lookupReadFile(indexFileName, false, false, false);
         if (0 == file->numParts())
             throw MakeActivityException(this, 0, "KeyedDistribute: Can't distribute based on an empty key: %s", scoped.str());
         if (!isFileKey(file))
@@ -123,8 +121,6 @@ public:
 
         tlkMb.append(iFileIO->size());
         ::read(iFileIO, 0, (size32_t)iFileIO->size(), tlkMb);
-
-        addReadFile(file);
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 35 - 29
thorlcr/activities/indexread/thindexread.cpp

@@ -216,46 +216,52 @@ public:
     virtual void init() override
     {
         CMasterActivity::init();
-        OwnedRoxieString helperFileName = indexBaseHelper->getFileName();
-        StringBuffer expandedFileName;
-        queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName);
-        fileName.set(expandedFileName);
-        Owned<IDistributedFile> index = queryThorFileManager().lookup(container.queryJob(), helperFileName, false, 0 != (TIRoptional & indexBaseHelper->getFlags()), true, container.activityIsCodeSigned());
-        if (index)
+        if ((container.queryLocalOrGrouped() || indexBaseHelper->canMatchAny()))
         {
-            checkFileType(this, index, "key", true);
+            OwnedRoxieString helperFileName = indexBaseHelper->getFileName();
+            StringBuffer expandedFileName;
+            queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName);
+            fileName.set(expandedFileName);
+            Owned<IDistributedFile> index = lookupReadFile(helperFileName, false, false, 0 != (TIRoptional & indexBaseHelper->getFlags()));
+            if (index && (0 == index->numParts())) // possible if superfile
+                index.clear();
+            if (index)
+            {
+                checkFileType(this, index, "key", true);
 
-            partitionKey = index->queryAttributes().hasProp("@partitionFieldMask");
-            localKey = index->queryAttributes().getPropBool("@local") && !partitionKey;
+                partitionKey = index->queryAttributes().hasProp("@partitionFieldMask");
+                localKey = index->queryAttributes().getPropBool("@local") && !partitionKey;
 
-            if (container.queryLocalData() && !localKey)
-                throw MakeActivityException(this, 0, "Index Read cannot be LOCAL unless supplied index is local");
+                if (container.queryLocalData() && !localKey)
+                    throw MakeActivityException(this, 0, "Index Read cannot be LOCAL unless supplied index is local");
 
-            nofilter = 0 != (TIRnofilter & indexBaseHelper->getFlags());
-            if (localKey)
-                nofilter = true;
-            else
-            {
-                IDistributedSuperFile *super = index->querySuperFile();
-                IDistributedFile *sub = super ? &super->querySubFile(0,true) : index.get();
-                if (sub && 1 == sub->numParts())
+                nofilter = 0 != (TIRnofilter & indexBaseHelper->getFlags());
+                if (localKey)
                     nofilter = true;
-                if (super)
+                else
                 {
-                    unsigned numSubFiles = super->numSubFiles();
-                    for (unsigned i=0; i<numSubFiles; i++)
-                        subIndexFileStats.push_back(new CThorStatsCollection(indexReadActivityStatistics));
+                    IDistributedSuperFile *super = index->querySuperFile();
+                    IDistributedFile *sub = super ? &super->querySubFile(0,true) : index.get();
+                    if (sub && 1 == sub->numParts())
+                        nofilter = true;
+                    if (super)
+                    {
+                        unsigned numSubFiles = super->numSubFiles(true);
+
+                        /* JCS->SHAMSER - kludge for now, don't add more than max
+                        * But it means updateFileReadCostStats will not be querying the correct files,
+                        * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                        */
+                        for (unsigned i=subIndexFileStats.size(); i<numSubFiles; i++)
+                            subIndexFileStats.push_back(new CThorStatsCollection(indexReadActivityStatistics));
+                    }
                 }
-            }
-            //MORE: Change index getFormatCrc once we support projected rows for indexes.
-            checkFormatCrc(this, index, indexBaseHelper->getDiskFormatCrc(), indexBaseHelper->queryDiskRecordSize(), indexBaseHelper->getProjectedFormatCrc(), indexBaseHelper->queryProjectedDiskRecordSize(), true);
-            if ((container.queryLocalOrGrouped() || indexBaseHelper->canMatchAny()) && index->numParts())
-            {
+                //MORE: Change index getFormatCrc once we support projected rows for indexes.
+                checkFormatCrc(this, index, indexBaseHelper->getDiskFormatCrc(), indexBaseHelper->queryDiskRecordSize(), indexBaseHelper->getProjectedFormatCrc(), indexBaseHelper->queryProjectedDiskRecordSize(), true);
                 fileDesc.setown(getConfiguredFileDescriptor(*index));
                 if (container.queryLocalOrGrouped())
                     nofilter = true;
                 prepareKey(index);
-                addReadFile(index);
                 mapping.setown(getFileSlaveMaps(index->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), true, NULL, index->querySuperFile()));
             }
         }

+ 2 - 4
thorlcr/activities/keydiff/thkeydiff.cpp

@@ -56,10 +56,10 @@ public:
         queryThorFileManager().addScope(container.queryJob(), outputHelperName, expandedFileName.clear(), false);
         outputName.set(expandedFileName);
 
-        originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), originalHelperName,false, false, false, container.activityIsCodeSigned()));
+        originalIndexFile.setown(lookupReadFile(originalHelperName, false, false, false));
+        newIndexFile.setown(lookupReadFile(updatedHelperName, false, false, false));
         if (!isFileKey(originalIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", originalHelperName.get());
-        newIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), updatedHelperName, false, false, false, container.activityIsCodeSigned()));
         if (!isFileKey(newIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", updatedHelperName.get());
         if (originalIndexFile->numParts() != newIndexFile->numParts())
@@ -87,8 +87,6 @@ public:
         fillClusterArray(container.queryJob(), outputName, clusters, groups);
         patchDesc.setown(queryThorFileManager().create(container.queryJob(), outputName, clusters, groups, 0 != (KDPoverwrite & helper->getFlags()), 0, !local, width));
         patchDesc->queryProperties().setProp("@kind", "keydiff");
-        addReadFile(originalIndexFile);
-        addReadFile(newIndexFile);
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 12 - 14
thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp

@@ -30,6 +30,7 @@ namespace LegacyKJ
 class CKeyedJoinMaster : public CMasterActivity
 {
     IHThorKeyedJoinArg *helper;
+    Owned<IDistributedFile> dataFile, indexFile;
     Owned<IFileDescriptor> dataFileDesc;
     Owned<CSlavePartMapping> dataFileMapping;
     MemoryBuffer offsetMapMb, initMb;
@@ -54,12 +55,11 @@ public:
             if (TAG_NULL != tags[i])
                 container.queryJob().freeMPTag(tags[i]);
     }
-    virtual void init()
+    virtual void init() override
     {
         CMasterActivity::init();
         OwnedRoxieString indexFileName(helper->getIndexFileName());
-        Owned<IDistributedFile> dataFile;
-        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, container.activityIsCodeSigned());
+        indexFile.setown(lookupReadFile(indexFileName, false, false, 0 != (helper->getJoinFlags() & JFindexoptional)));
 
         unsigned keyReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJKRR", 0);
         if (!keyReadWidth || keyReadWidth>container.queryJob().querySlaves())
@@ -207,7 +207,7 @@ public:
                     OwnedRoxieString fetchFilename(helper->getFileName());
                     if (fetchFilename)
                     {
-                        dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true, container.activityIsCodeSigned()));
+                        dataFile.setown(lookupReadFile(fetchFilename, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
                         if (dataFile)
                         {
                             if (isFileKey(dataFile))
@@ -260,22 +260,20 @@ public:
             else
                 indexFile.clear();
         }
-        if (indexFile)
-        {
-            addReadFile(indexFile);
-            if (dataFile)
-                addReadFile(dataFile);
-        }
-        else
+        if (!indexFile)
             initMb.append((unsigned)0);
     }
-    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void kill() override
+    {
+        CMasterActivity::kill();
+        indexFile.clear();
+        dataFile.clear();
+    }
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
     {
         dst.append(initMb);
-        IDistributedFile *indexFile = queryReadFile(0); // 0 == indexFile, 1 == dataFile
         if (indexFile && helper->diskAccessRequired())
         {
-            IDistributedFile *dataFile = queryReadFile(1);
             if (dataFile)
             {
                 dst.append(remoteDataFiles);

+ 16 - 10
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -303,7 +303,9 @@ public:
         totalIndexParts = 0;
 
         Owned<IDistributedFile> dataFile;
-        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, container.activityIsCodeSigned());
+        
+        
+        Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, false, false, 0 != (helper->getJoinFlags() & JFindexoptional));
         if (indexFile)
         {
             if (!isFileKey(indexFile))
@@ -311,7 +313,12 @@ public:
             IDistributedSuperFile *superIndex = indexFile->querySuperFile();
             // One entry for each subfile (unless it is not a superfile => then add one entry for index data file stats)
             unsigned numSuperIndexSubs = superIndex?superIndex->numSubFiles(true):1;
-            for (unsigned i=0; i<numSuperIndexSubs; i++)
+
+            /* JCS->SHAMSER - kludge for now, don't add more than max
+            * But it means updateFileReadCostStats will not be querying the correct files,
+            * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+            */
+            for (unsigned i=fileStats.size(); i<numSuperIndexSubs; i++)
                 fileStats.push_back(new CThorStatsCollection(indexReadStatistics));
 
             if (helper->diskAccessRequired())
@@ -319,7 +326,7 @@ public:
                 OwnedRoxieString fetchFilename(helper->getFileName());
                 if (fetchFilename)
                 {
-                    dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true, container.activityIsCodeSigned()));
+                    dataFile.setown(lookupReadFile(fetchFilename, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
                     if (dataFile)
                     {
                         if (isFileKey(dataFile))
@@ -360,7 +367,12 @@ public:
                         IDistributedSuperFile *super = dataFile->querySuperFile();
                         // One entry for each subfile (unless it is not a superfile => then have 1 entry for data file stats)
                         unsigned numsubs = super?super->numSubFiles(true):1;
-                        for (unsigned i=0; i<numsubs; i++)
+
+                        /* JCS->SHAMSER - kludge for now, don't add more than max
+                        * But it means updateFileReadCostStats will not be querying the correct files,
+                        * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                        */
+                        for (unsigned i=fileStats.size(); i<numsubs; i++)
                             fileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
                     }
                 }
@@ -511,12 +523,6 @@ public:
         }
         else
             initMb.append(totalIndexParts); // 0
-        if (indexFile)
-        {
-            addReadFile(indexFile);
-            if (dataFile)
-                addReadFile(dataFile);
-        }
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 2 - 5
thorlcr/activities/keypatch/thkeypatch.cpp

@@ -55,10 +55,10 @@ public:
         queryThorFileManager().addScope(container.queryJob(), outputHelperName, expandedFileName.clear(), false);
         outputName.set(expandedFileName);
 
-        originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), originalHelperName, false, false, false, container.activityIsCodeSigned()));
+        originalIndexFile.setown(lookupReadFile(originalHelperName, false, false, false));
         if (!isFileKey(originalIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", originalHelperName.get());
-        patchFile.setown(queryThorFileManager().lookup(container.queryJob(), patchHelperName, false, false, false, container.activityIsCodeSigned()));
+        patchFile.setown(lookupReadFile(patchHelperName, false, false, false));
         if (isFileKey(patchFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a patch file: %s", patchHelperName.get());
         
@@ -67,9 +67,6 @@ public:
         if (originalIndexFile->querySuperFile() || patchFile->querySuperFile())
             throw MakeActivityException(this, 0, "Patching super files not supported");
         
-        addReadFile(originalIndexFile);
-        addReadFile(patchFile);
-
         width = originalIndexFile->numParts();
 
         originalDesc.setown(originalIndexFile->getFileDescriptor());

+ 1 - 2
thorlcr/activities/msort/thmsort.cpp

@@ -91,10 +91,9 @@ protected:
         OwnedRoxieString cosortlogname(helper->getSortedFilename());
         if (cosortlogname&&*cosortlogname)
         {
-            Owned<IDistributedFile> coSortFile = queryThorFileManager().lookup(container.queryJob(), cosortlogname, false, false, false, container.activityIsCodeSigned());
+            Owned<IDistributedFile> coSortFile = lookupReadFile(cosortlogname, false, false, false);
             if (isFileKey(coSortFile))
                 throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a flat file: %s", cosortlogname.get());
-            addReadFile(coSortFile);
             Owned<IFileDescriptor> fileDesc = coSortFile->getFileDescriptor();
             unsigned o;
             for (o=0; o<fileDesc->numParts(); o++)

+ 50 - 39
thorlcr/activities/thdiskbase.cpp

@@ -47,61 +47,72 @@ void CDiskReadMasterBase::init()
     fileName.set(expandedFileName);
     reInit = 0 != (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename));
 
-    Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), helperFileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true, container.activityIsCodeSigned());
-    if (file)
+    if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
     {
-        if (file->isExternal() && (helper->getFlags() & TDXcompress))
-            file->queryAttributes().setPropBool("@blockCompressed", true);
-        if (file->numParts() > 1)
-            fileDesc.setown(getConfiguredFileDescriptor(*file));
-        else
-            fileDesc.setown(file->getFileDescriptor());
-        validateFile(file);
-        if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
+        bool temp = 0 != (TDXtemporary & helper->getFlags());
+        bool jobTemp = 0 != (TDXjobtemp & helper->getFlags());
+        bool opt = 0 != (TDRoptional & helper->getFlags());
+        file.setown(lookupReadFile(helperFileName, jobTemp, temp, opt));
+        if (file)
         {
-            bool temp = 0 != (TDXtemporary & helper->getFlags());
+            if (file->isExternal() && (helper->getFlags() & TDXcompress))
+                file->queryAttributes().setPropBool("@blockCompressed", true);
+            if (file->numParts() > 1)
+                fileDesc.setown(getConfiguredFileDescriptor(*file));
+            else
+                fileDesc.setown(file->getFileDescriptor());
+            validateFile(file);
             bool local;
             if (temp)
                 local = false;
             else
                 local = container.queryLocal();
             mapping.setown(getFileSlaveMaps(file->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), local, false, hash, file->querySuperFile()));
-            addReadFile(file, temp);
-        }
-        IDistributedSuperFile *super = file->querySuperFile();
-        unsigned numsubs = super?super->numSubFiles(true):0;
-        if (0 != (helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
-        {
-            for (unsigned s=0; s<numsubs; s++)
+            IDistributedSuperFile *super = file->querySuperFile();
+            unsigned numsubs = super?super->numSubFiles(true):0;
+            if (0 != (helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
             {
-                IDistributedFile &subfile = super->querySubFile(s, true);
-                subfileLogicalFilenames.append(subfile.queryLogicalName());
+                for (unsigned s=0; s<numsubs; s++)
+                {
+                    IDistributedFile &subfile = super->querySubFile(s, true);
+                    subfileLogicalFilenames.append(subfile.queryLogicalName());
+                }
             }
-        }
-        if (0==(helper->getFlags() & TDXtemporary))
-        {
-            for (unsigned i=0; i<numsubs; i++)
-                subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
-        }
-        void *ekey;
-        size32_t ekeylen;
-        helper->getEncryptKey(ekeylen,ekey);
-        bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
-        if (0 != ekeylen)
-        {
-            memset(ekey,0,ekeylen);
-            free(ekey);
-            if (!encrypted)
+            if (0==(helper->getFlags() & TDXtemporary))
+            {
+                /* JCS->SHAMSER - kludge for now, don't add more than max
+                 * But it means updateFileReadCostStats will not be querying the correct files,
+                 * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                 */
+                for (unsigned i=subFileStats.size(); i<numsubs; i++)
+                    subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
+            }
+            void *ekey;
+            size32_t ekeylen;
+            helper->getEncryptKey(ekeylen,ekey);
+            bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
+            if (0 != ekeylen)
             {
-                Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fileName.get());
-                queryJobChannel().fireException(e);
+                memset(ekey,0,ekeylen);
+                free(ekey);
+                if (!encrypted)
+                {
+                    Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fileName.get());
+                    queryJobChannel().fireException(e);
+                }
             }
+            else if (encrypted)
+                throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fileName.get());
         }
-        else if (encrypted)
-            throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fileName.get());
     }
 }
 
+void CDiskReadMasterBase::kill()
+{
+    CMasterActivity::kill();
+    file.clear();
+}
+
 void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
 {
     dst.append(fileName);

+ 2 - 0
thorlcr/activities/thdiskbase.ipp

@@ -29,6 +29,7 @@ class CDiskReadMasterBase : public CMasterActivity
 protected:
     StringArray subfileLogicalFilenames;
     Owned<IFileDescriptor> fileDesc;
+    Owned<IDistributedFile> file;
     Owned<CSlavePartMapping> mapping;
     IHash *hash;
     StringAttr fileName;
@@ -37,6 +38,7 @@ protected:
 public:
     CDiskReadMasterBase(CMasterGraphElement *info);
     virtual void init() override;
+    virtual void kill() override;
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override;
     virtual void done() override;
     virtual void validateFile(IDistributedFile *file) { }

+ 102 - 39
thorlcr/graph/thgraphmaster.cpp

@@ -388,13 +388,6 @@ CMasterActivity::~CMasterActivity()
     delete [] data;
 }
 
-void CMasterActivity::addReadFile(IDistributedFile *file, bool temp)
-{
-    readFiles.append(*LINK(file));
-    if (!temp) // NB: Temps not listed in workunit
-        queryThorFileManager().noteFileRead(container.queryJob(), file);
-}
-
 IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
 {
     if (f>=readFiles.ordinality())
@@ -402,20 +395,31 @@ IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
     return &readFiles.item(f);
 }
 
-void CMasterActivity::preStart(size32_t parentExtractSz, const byte *parentExtract)
+IDistributedFile *CMasterActivity::findReadFile(const char *lfnName)
+{
+    auto it = readFilesMap.find(lfnName);
+    if (it != readFilesMap.end())
+        return LINK(it->second);
+    return nullptr;
+}
+
+IDistributedFile *CMasterActivity::lookupReadFile(const char *lfnName, bool jobTemp, bool temp, bool opt)
 {
-    CActivityBase::preStart(parentExtractSz, parentExtract);
-    IArrayOf<IDistributedFile> tmpFiles;
-    tmpFiles.swapWith(readFiles);
-    ForEachItemIn(f, tmpFiles)
+    StringBuffer normalizedFileName;
+    queryThorFileManager().addScope(container.queryJob(), lfnName, normalizedFileName, jobTemp|temp);
+    Owned<IDistributedFile> file = findReadFile(normalizedFileName);
+    if (!file)
     {
-        IDistributedFile &file = tmpFiles.item(f);
-        IDistributedSuperFile *super = file.querySuperFile();
-        if (super)
-            getSuperFileSubs(super, readFiles, true);
-        else
-            readFiles.append(*LINK(&file));
+        file.setown(queryThorFileManager().lookup(container.queryJob(), lfnName, jobTemp|temp, opt, true, container.activityIsCodeSigned()));
+        if (file)
+        {
+            readFiles.append(*LINK(file));
+            readFilesMap[normalizedFileName.str()] = file;
+            if (!temp) // NB: Temps not listed in workunit
+                queryThorFileManager().noteFileRead(container.queryJob(), file);
+        }
     }
+    return file.getClear();
 }
 
 MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
@@ -455,7 +459,16 @@ void CMasterActivity::threadmain()
 
 void CMasterActivity::init()
 {
-    readFiles.kill();
+    // Files are added to readFiles during initialization,
+    // If this is a CQ query act., then it will be repeatedly re-initialized if it depends on master context,
+    // e.g. due to a variable filename.
+    // Therefore, do not clear readFiles on reinitialization, to avoid repeatedly [expensively] looking them up.
+    bool inCQ = container.queryOwner().queryOwner() && !container.queryOwner().isGlobal();
+    if (!inCQ)
+    {
+        readFiles.kill();
+        readFilesMap.clear();
+    }
 }
 
 void CMasterActivity::startProcess(bool async)
@@ -480,6 +493,7 @@ void CMasterActivity::kill()
 {
     CActivityBase::kill();
     readFiles.kill();
+    readFilesMap.clear();
 }
 
 bool CMasterActivity::fireException(IException *_e)
@@ -542,35 +556,84 @@ void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
 void CMasterActivity::done()
 {
     CActivityBase::done();
-    ForEachItemIn(s, readFiles)
+    if (readFiles.ordinality())
     {
-        IDistributedFile &file = readFiles.item(s);
-        file.setAccessed();
+        IArrayOf<IDistributedFile> tmpFiles;
+        ForEachItemIn(f, readFiles)
+        {
+            IDistributedFile &file = readFiles.item(f);
+            IDistributedSuperFile *super = file.querySuperFile();
+            if (super)
+            {
+                getSuperFileSubs(super, tmpFiles, true);
+                tmpFiles.append(*LINK(&file));
+            }
+            else
+                tmpFiles.append(*LINK(&file));
+        }
+        ForEachItemIn(s, tmpFiles)
+        {
+            IDistributedFile &file = tmpFiles.item(s);
+            file.setAccessed();
+        }
     }
 }
 
 void CMasterActivity::updateFileReadCostStats(std::vector<OwnedPtr<CThorStatsCollection>> & subFileStats)
 {
-    if (!subFileStats.empty())
-    {
-        unsigned numSubFiles = subFileStats.size();
-        for (unsigned i=0; i<numSubFiles; i++)
-        {
-            IDistributedFile *file = queryReadFile(i);
-            if (file)
+    /* JCSMORE->SHAMSER: (separate JIRA needed)
+     * there can be >1 read file if this act. is in a child query/loop, it could be processing a different file per iteration,
+     * meaning there could be an arbitrary number of readfiles, in that case activity::init would be called multiple times.
+     * 
+     * I hit an assert during testing reading a super in a CQ:
+        libjlib.so!raiseAssertException(const char * assertion, const char * file, unsigned int line) (\home\jsmith\git\HPCC-Platform\system\jlib\jexcept.cpp:660)
+        libjlib.so!MemoryBuffer::read(MemoryBuffer * const this, unsigned char & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:693)
+        libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:813)
+        libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this, unsigned int & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:824)
+        libjlib.so!CRuntimeStatisticCollection::deserialize(CRuntimeStatisticCollection * const this, MemoryBuffer & in) (\home\jsmith\git\HPCC-Platform\system\jlib\jstats.cpp:2524)
+        libactivitymasters_lcr.so!CThorStatsCollection::deserialize(CThorStatsCollection * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.ipp:53)
+        libactivitymasters_lcr.so!CDiskReadMasterBase::deserializeStats(CDiskReadMasterBase * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\activities\thdiskbase.cpp:139)
+        libgraphmaster_lcr.so!CMasterGraph::deserializeStats(CMasterGraph * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.cpp:2781)
+     *
+     * (would be a crash in a Release build)
+     * it's because the diskread init is adding new CThorStatsCollection per init (per execution of the CQ),
+     * which means when deserializing there are too many. The code assumes there is only 1 file being read.
+     * 
+     * I've temporarily changed the code where subFileStats's are added, to prevent more being added per iteration,
+     * but it needs re-thinking to handle the workers potentially dealing with different logical files
+     * (+ index read to handle super files, and case where act. is reading >1 file, i.e. KJ)
+     * I've changed this code to rely on the 1st readFiles for now (not the # of subFileStats, which may be more)
+     * NB: also changed when the expansion of readFiles (from supers to subfiles) happens; a new super was being added
+     * each CQ iteration and re-expanded, meaning readFiles kept growing.
+     *
+     * Also, superkey1.ecl hits a dbgasserex whilst deserializing stats (before and after these PR changes),
+     * but is caught/ignored. I haven't investigated further.
+     */
+
+    IDistributedFile *file = queryReadFile(0);
+    if (file)
+    {
+        IDistributedSuperFile *super = file->querySuperFile();
+        if (super)
+        {       
+            unsigned numSubFiles = super->numSubFiles(true); //subFileStats.size();
+            if (subFileStats.size())
             {
-                stat_type numDiskReads = subFileStats[i]->getStatisticSum(StNumDiskReads);
-                StringBuffer clusterName;
-                file->getClusterName(0, clusterName);
-                diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
-                file->addAttrValue("@numDiskReads", numDiskReads);
+                assertex(numSubFiles <= subFileStats.size());
+                for (unsigned i=0; i<subFileStats.size(); i++)
+                {
+                    IDistributedFile &subFile = super->querySubFile(i, true);
+                    const char *subName = subFile.queryLogicalName();
+                    PROGLOG("subName = %s", subName);
+                    stat_type numDiskReads = subFileStats[i]->getStatisticSum(StNumDiskReads);
+                    StringBuffer clusterName;
+                    subFile.getClusterName(0, clusterName);
+                    diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
+                    subFile.addAttrValue("@numDiskReads", numDiskReads);
+                }
             }
         }
-    }
-    else
-    {
-        IDistributedFile *file = queryReadFile(0);
-        if (file)
+        else
         {
             stat_type numDiskReads = statsCollection.getStatisticSum(StNumDiskReads);
             StringBuffer clusterName;

+ 5 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -18,6 +18,8 @@
 #ifndef _THGRAPHMASTER_IPP
 #define _THGRAPHMASTER_IPP
 
+#include <unordered_map>
+
 #include "jmisc.hpp"
 #include "jsuperhash.hpp"
 #include "workunit.hpp"
@@ -238,6 +240,7 @@ class graphmaster_decl CMasterActivity : public CActivityBase, implements IThrea
     MemoryBuffer *data;
     CriticalSection progressCrit;
     IArrayOf<IDistributedFile> readFiles;
+    std::unordered_map<std::string, IDistributedFile *> readFilesMap;
 
 protected:
     std::vector<OwnedPtr<CThorEdgeCollection>> edgeStatsVector;
@@ -245,8 +248,9 @@ protected:
     IBitSet *notedWarnings;
     cost_type diskAccessCost = 0;
 
-    void addReadFile(IDistributedFile *file, bool temp=false);
     IDistributedFile *queryReadFile(unsigned f);
+    IDistributedFile *findReadFile(const char *lfnName);
+    IDistributedFile *lookupReadFile(const char *lfnName, bool jobTemp, bool temp, bool opt);
     void updateFileReadCostStats(std::vector<OwnedPtr<CThorStatsCollection>> & subFileStats);
     void updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites);
     virtual void process() { }
@@ -269,7 +273,6 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) { }
     virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb) { }
 
-    virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
     virtual void startProcess(bool async=true);
     virtual bool wait(unsigned timeout);
     virtual void done();