浏览代码

Merge pull request #15918 from jakesmith/HPCC-26921-cq-avoid-re-lookup

 HPCC-26921 Avoid CQ's re-looking up logical files.

Reviewed-By: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父节点
当前提交
aa7ac959f4

+ 6 - 4
thorlcr/activities/diskread/thdiskread.cpp

@@ -106,9 +106,12 @@ public:
     virtual void done() override
     {
         CDiskReadMasterVF::done();
-        IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
-        if (0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
-            container.queryTempHandler()->deregisterFile(fileName, fileDesc->queryProperties().getPropBool("@pausefile"));
+        if (file)
+        {
+            IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *)queryHelper();
+            if (0 != (helper->getFlags() & TDXtemporary) && !container.queryJob().queryUseCheckpoints())
+                container.queryTempHandler()->deregisterFile(fileName, fileDesc->queryProperties().getPropBool("@pausefile"));
+        }
     }
     virtual void init() override
     {
@@ -192,7 +195,6 @@ public:
         {
             if (!helper->hasSegmentMonitors() && !helper->hasFilter() && !(helper->getFlags() & TDXtemporary))
             {
-                IDistributedFile *file = queryReadFile(0);
                 if (file && canMatch)
                 {
                     if (0 != (TDRunfilteredcount & helper->getFlags()) && file->queryAttributes().hasProp("@recordCount"))

+ 17 - 7
thorlcr/activities/fetch/thfetch.cpp

@@ -34,6 +34,7 @@ class CFetchActivityMaster : public CMasterActivity
     std::vector<OwnedPtr<CThorStatsCollection>> subFileStats;
 
 protected:
+    Owned<IDistributedFile> fetchFile;
     IHThorFetchArg *helper;
 
 public:
@@ -49,11 +50,11 @@ public:
     {
         if (endpoints) free(endpoints);
     }
-    virtual void init()
+    virtual void init() override
     {
         CMasterActivity::init();
         OwnedRoxieString fname(helper->getFileName());
-        Owned<IDistributedFile> fetchFile = queryThorFileManager().lookup(container.queryJob(), fname, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), false, container.activityIsCodeSigned());
+        fetchFile.setown(lookupReadFile(fname, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
         if (fetchFile)
         {
             if (isFileKey(fetchFile))
@@ -77,15 +78,25 @@ public:
                 throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fetchFile->queryLogicalName());
             IDistributedSuperFile *super = fetchFile->querySuperFile();
             unsigned numsubs = super?super->numSubFiles(true):0;
-            for (unsigned i=0; i<numsubs; i++)
-                subFileStats.push_back(new CThorStatsCollection(diskReadPartStatistics));
+
+            /* JCS->SHAMSER - kludge for now, don't add more than max
+                * But it means updateFileReadCostStats will not be querying the correct files,
+                * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                */
+            for (unsigned i=subFileStats.size(); i<numsubs; i++)
+                subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
+
 
             mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, super));
             mapping->serializeFileOffsetMap(offsetMapMb);
-            addReadFile(fetchFile);
         }
     }
-    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void kill() override
+    {
+        CMasterActivity::kill();
+        fetchFile.clear();
+    }
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
     {
         if (mapping)
         {
@@ -120,7 +131,6 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
         CFetchActivityMaster::serializeSlaveData(dst, slave);
-        IDistributedFile *fetchFile = queryReadFile(0);
         if (fetchFile)
             fetchFile->queryAttributes().serialize(dst);
     }

+ 1 - 5
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -100,9 +100,7 @@ public:
         StringBuffer scoped;
         OwnedRoxieString indexFileName(helper->getIndexFileName());
         queryThorFileManager().addScope(container.queryJob(), indexFileName, scoped);
-        Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, false, false, container.activityIsCodeSigned());
-        if (!file)
-            throw MakeActivityException(this, 0, "KeyedDistribute: Failed to find key: %s", scoped.str());
+        Owned<IDistributedFile> file = lookupReadFile(indexFileName, false, false, false);
         if (0 == file->numParts())
             throw MakeActivityException(this, 0, "KeyedDistribute: Can't distribute based on an empty key: %s", scoped.str());
         if (!isFileKey(file))
@@ -123,8 +121,6 @@ public:
 
         tlkMb.append(iFileIO->size());
         ::read(iFileIO, 0, (size32_t)iFileIO->size(), tlkMb);
-
-        addReadFile(file);
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 35 - 29
thorlcr/activities/indexread/thindexread.cpp

@@ -216,46 +216,52 @@ public:
     virtual void init() override
     {
         CMasterActivity::init();
-        OwnedRoxieString helperFileName = indexBaseHelper->getFileName();
-        StringBuffer expandedFileName;
-        queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName);
-        fileName.set(expandedFileName);
-        Owned<IDistributedFile> index = queryThorFileManager().lookup(container.queryJob(), helperFileName, false, 0 != (TIRoptional & indexBaseHelper->getFlags()), true, container.activityIsCodeSigned());
-        if (index)
+        if ((container.queryLocalOrGrouped() || indexBaseHelper->canMatchAny()))
         {
-            checkFileType(this, index, "key", true);
+            OwnedRoxieString helperFileName = indexBaseHelper->getFileName();
+            StringBuffer expandedFileName;
+            queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName);
+            fileName.set(expandedFileName);
+            Owned<IDistributedFile> index = lookupReadFile(helperFileName, false, false, 0 != (TIRoptional & indexBaseHelper->getFlags()));
+            if (index && (0 == index->numParts())) // possible if superfile
+                index.clear();
+            if (index)
+            {
+                checkFileType(this, index, "key", true);
 
-            partitionKey = index->queryAttributes().hasProp("@partitionFieldMask");
-            localKey = index->queryAttributes().getPropBool("@local") && !partitionKey;
+                partitionKey = index->queryAttributes().hasProp("@partitionFieldMask");
+                localKey = index->queryAttributes().getPropBool("@local") && !partitionKey;
 
-            if (container.queryLocalData() && !localKey)
-                throw MakeActivityException(this, 0, "Index Read cannot be LOCAL unless supplied index is local");
+                if (container.queryLocalData() && !localKey)
+                    throw MakeActivityException(this, 0, "Index Read cannot be LOCAL unless supplied index is local");
 
-            nofilter = 0 != (TIRnofilter & indexBaseHelper->getFlags());
-            if (localKey)
-                nofilter = true;
-            else
-            {
-                IDistributedSuperFile *super = index->querySuperFile();
-                IDistributedFile *sub = super ? &super->querySubFile(0,true) : index.get();
-                if (sub && 1 == sub->numParts())
+                nofilter = 0 != (TIRnofilter & indexBaseHelper->getFlags());
+                if (localKey)
                     nofilter = true;
-                if (super)
+                else
                 {
-                    unsigned numSubFiles = super->numSubFiles();
-                    for (unsigned i=0; i<numSubFiles; i++)
-                        subIndexFileStats.push_back(new CThorStatsCollection(indexReadActivityStatistics));
+                    IDistributedSuperFile *super = index->querySuperFile();
+                    IDistributedFile *sub = super ? &super->querySubFile(0,true) : index.get();
+                    if (sub && 1 == sub->numParts())
+                        nofilter = true;
+                    if (super)
+                    {
+                        unsigned numSubFiles = super->numSubFiles(true);
+
+                        /* JCS->SHAMSER - kludge for now, don't add more than max
+                        * But it means updateFileReadCostStats will not be querying the correct files,
+                        * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                        */
+                        for (unsigned i=subIndexFileStats.size(); i<numSubFiles; i++)
+                            subIndexFileStats.push_back(new CThorStatsCollection(indexReadActivityStatistics));
+                    }
                 }
-            }
-            //MORE: Change index getFormatCrc once we support projected rows for indexes.
-            checkFormatCrc(this, index, indexBaseHelper->getDiskFormatCrc(), indexBaseHelper->queryDiskRecordSize(), indexBaseHelper->getProjectedFormatCrc(), indexBaseHelper->queryProjectedDiskRecordSize(), true);
-            if ((container.queryLocalOrGrouped() || indexBaseHelper->canMatchAny()) && index->numParts())
-            {
+                //MORE: Change index getFormatCrc once we support projected rows for indexes.
+                checkFormatCrc(this, index, indexBaseHelper->getDiskFormatCrc(), indexBaseHelper->queryDiskRecordSize(), indexBaseHelper->getProjectedFormatCrc(), indexBaseHelper->queryProjectedDiskRecordSize(), true);
                 fileDesc.setown(getConfiguredFileDescriptor(*index));
                 if (container.queryLocalOrGrouped())
                     nofilter = true;
                 prepareKey(index);
-                addReadFile(index);
                 mapping.setown(getFileSlaveMaps(index->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), true, NULL, index->querySuperFile()));
             }
         }

+ 2 - 4
thorlcr/activities/keydiff/thkeydiff.cpp

@@ -56,10 +56,10 @@ public:
         queryThorFileManager().addScope(container.queryJob(), outputHelperName, expandedFileName.clear(), false);
         outputName.set(expandedFileName);
 
-        originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), originalHelperName,false, false, false, container.activityIsCodeSigned()));
+        originalIndexFile.setown(lookupReadFile(originalHelperName, false, false, false));
+        newIndexFile.setown(lookupReadFile(updatedHelperName, false, false, false));
         if (!isFileKey(originalIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", originalHelperName.get());
-        newIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), updatedHelperName, false, false, false, container.activityIsCodeSigned()));
         if (!isFileKey(newIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", updatedHelperName.get());
         if (originalIndexFile->numParts() != newIndexFile->numParts())
@@ -87,8 +87,6 @@ public:
         fillClusterArray(container.queryJob(), outputName, clusters, groups);
         patchDesc.setown(queryThorFileManager().create(container.queryJob(), outputName, clusters, groups, 0 != (KDPoverwrite & helper->getFlags()), 0, !local, width));
         patchDesc->queryProperties().setProp("@kind", "keydiff");
-        addReadFile(originalIndexFile);
-        addReadFile(newIndexFile);
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 12 - 14
thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp

@@ -30,6 +30,7 @@ namespace LegacyKJ
 class CKeyedJoinMaster : public CMasterActivity
 {
     IHThorKeyedJoinArg *helper;
+    Owned<IDistributedFile> dataFile, indexFile;
     Owned<IFileDescriptor> dataFileDesc;
     Owned<CSlavePartMapping> dataFileMapping;
     MemoryBuffer offsetMapMb, initMb;
@@ -54,12 +55,11 @@ public:
             if (TAG_NULL != tags[i])
                 container.queryJob().freeMPTag(tags[i]);
     }
-    virtual void init()
+    virtual void init() override
     {
         CMasterActivity::init();
         OwnedRoxieString indexFileName(helper->getIndexFileName());
-        Owned<IDistributedFile> dataFile;
-        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, container.activityIsCodeSigned());
+        indexFile.setown(lookupReadFile(indexFileName, false, false, 0 != (helper->getJoinFlags() & JFindexoptional)));
 
         unsigned keyReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJKRR", 0);
         if (!keyReadWidth || keyReadWidth>container.queryJob().querySlaves())
@@ -207,7 +207,7 @@ public:
                     OwnedRoxieString fetchFilename(helper->getFileName());
                     if (fetchFilename)
                     {
-                        dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true, container.activityIsCodeSigned()));
+                        dataFile.setown(lookupReadFile(fetchFilename, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
                         if (dataFile)
                         {
                             if (isFileKey(dataFile))
@@ -260,22 +260,20 @@ public:
             else
                 indexFile.clear();
         }
-        if (indexFile)
-        {
-            addReadFile(indexFile);
-            if (dataFile)
-                addReadFile(dataFile);
-        }
-        else
+        if (!indexFile)
             initMb.append((unsigned)0);
     }
-    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void kill() override
+    {
+        CMasterActivity::kill();
+        indexFile.clear();
+        dataFile.clear();
+    }
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
     {
         dst.append(initMb);
-        IDistributedFile *indexFile = queryReadFile(0); // 0 == indexFile, 1 == dataFile
         if (indexFile && helper->diskAccessRequired())
         {
-            IDistributedFile *dataFile = queryReadFile(1);
             if (dataFile)
             {
                 dst.append(remoteDataFiles);

+ 14 - 10
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -303,7 +303,7 @@ public:
         totalIndexParts = 0;
 
         Owned<IDistributedFile> dataFile;
-        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, container.activityIsCodeSigned());
+        Owned<IDistributedFile> indexFile = lookupReadFile(indexFileName, false, false, 0 != (helper->getJoinFlags() & JFindexoptional));
         if (indexFile)
         {
             if (!isFileKey(indexFile))
@@ -311,7 +311,12 @@ public:
             IDistributedSuperFile *superIndex = indexFile->querySuperFile();
             // One entry for each subfile (unless it is not a superfile => then add one entry for index data file stats)
             unsigned numSuperIndexSubs = superIndex?superIndex->numSubFiles(true):1;
-            for (unsigned i=0; i<numSuperIndexSubs; i++)
+
+            /* JCS->SHAMSER - kludge for now, don't add more than max
+            * But it means updateFileReadCostStats will not be querying the correct files,
+            * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+            */
+            for (unsigned i=fileStats.size(); i<numSuperIndexSubs; i++)
                 fileStats.push_back(new CThorStatsCollection(indexReadStatistics));
 
             if (helper->diskAccessRequired())
@@ -319,7 +324,7 @@ public:
                 OwnedRoxieString fetchFilename(helper->getFileName());
                 if (fetchFilename)
                 {
-                    dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true, container.activityIsCodeSigned()));
+                    dataFile.setown(lookupReadFile(fetchFilename, false, false, 0 != (helper->getFetchFlags() & FFdatafileoptional)));
                     if (dataFile)
                     {
                         if (isFileKey(dataFile))
@@ -360,7 +365,12 @@ public:
                         IDistributedSuperFile *super = dataFile->querySuperFile();
                         // One entry for each subfile (unless it is not a superfile => then have 1 entry for data file stats)
                         unsigned numsubs = super?super->numSubFiles(true):1;
-                        for (unsigned i=0; i<numsubs; i++)
+
+                        /* JCS->SHAMSER - kludge for now, don't add more than max
+                        * But it means updateFileReadCostStats will not be querying the correct files,
+                        * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                        */
+                        for (unsigned i=fileStats.size(); i<numsubs; i++)
                             fileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
                     }
                 }
@@ -511,12 +521,6 @@ public:
         }
         else
             initMb.append(totalIndexParts); // 0
-        if (indexFile)
-        {
-            addReadFile(indexFile);
-            if (dataFile)
-                addReadFile(dataFile);
-        }
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {

+ 2 - 5
thorlcr/activities/keypatch/thkeypatch.cpp

@@ -55,10 +55,10 @@ public:
         queryThorFileManager().addScope(container.queryJob(), outputHelperName, expandedFileName.clear(), false);
         outputName.set(expandedFileName);
 
-        originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), originalHelperName, false, false, false, container.activityIsCodeSigned()));
+        originalIndexFile.setown(lookupReadFile(originalHelperName, false, false, false));
         if (!isFileKey(originalIndexFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", originalHelperName.get());
-        patchFile.setown(queryThorFileManager().lookup(container.queryJob(), patchHelperName, false, false, false, container.activityIsCodeSigned()));
+        patchFile.setown(lookupReadFile(patchHelperName, false, false, false));
         if (isFileKey(patchFile))
             throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a patch file: %s", patchHelperName.get());
         
@@ -67,9 +67,6 @@ public:
         if (originalIndexFile->querySuperFile() || patchFile->querySuperFile())
             throw MakeActivityException(this, 0, "Patching super files not supported");
         
-        addReadFile(originalIndexFile);
-        addReadFile(patchFile);
-
         width = originalIndexFile->numParts();
 
         originalDesc.setown(originalIndexFile->getFileDescriptor());

+ 1 - 2
thorlcr/activities/msort/thmsort.cpp

@@ -91,10 +91,9 @@ protected:
         OwnedRoxieString cosortlogname(helper->getSortedFilename());
         if (cosortlogname&&*cosortlogname)
         {
-            Owned<IDistributedFile> coSortFile = queryThorFileManager().lookup(container.queryJob(), cosortlogname, false, false, false, container.activityIsCodeSigned());
+            Owned<IDistributedFile> coSortFile = lookupReadFile(cosortlogname, false, false, false);
             if (isFileKey(coSortFile))
                 throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a flat file: %s", cosortlogname.get());
-            addReadFile(coSortFile);
             Owned<IFileDescriptor> fileDesc = coSortFile->getFileDescriptor();
             unsigned o;
             for (o=0; o<fileDesc->numParts(); o++)

+ 51 - 39
thorlcr/activities/thdiskbase.cpp

@@ -47,61 +47,73 @@ void CDiskReadMasterBase::init()
     fileName.set(expandedFileName);
     reInit = 0 != (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename));
 
-    Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), helperFileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true, container.activityIsCodeSigned());
-    if (file)
+    if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
     {
-        if (file->isExternal() && (helper->getFlags() & TDXcompress))
-            file->queryAttributes().setPropBool("@blockCompressed", true);
-        if (file->numParts() > 1)
-            fileDesc.setown(getConfiguredFileDescriptor(*file));
-        else
-            fileDesc.setown(file->getFileDescriptor());
-        validateFile(file);
-        if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
+        bool temp = 0 != (TDXtemporary & helper->getFlags());
+        bool jobTemp = 0 != (TDXjobtemp & helper->getFlags());
+        bool opt = 0 != (TDRoptional & helper->getFlags());
+        file.setown(lookupReadFile(helperFileName, jobTemp, temp, opt));
+        if (file)
         {
-            bool temp = 0 != (TDXtemporary & helper->getFlags());
+            if (file->isExternal() && (helper->getFlags() & TDXcompress))
+                file->queryAttributes().setPropBool("@blockCompressed", true);
+            if (file->numParts() > 1)
+                fileDesc.setown(getConfiguredFileDescriptor(*file));
+            else
+                fileDesc.setown(file->getFileDescriptor());
+            validateFile(file);
             bool local;
             if (temp)
                 local = false;
             else
                 local = container.queryLocal();
             mapping.setown(getFileSlaveMaps(file->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), local, false, hash, file->querySuperFile()));
-            addReadFile(file, temp);
-        }
-        IDistributedSuperFile *super = file->querySuperFile();
-        unsigned numsubs = super?super->numSubFiles(true):0;
-        if (0 != (helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
-        {
-            for (unsigned s=0; s<numsubs; s++)
+            IDistributedSuperFile *super = file->querySuperFile();
+            unsigned numsubs = super?super->numSubFiles(true):0;
+            if (0 != (helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
             {
-                IDistributedFile &subfile = super->querySubFile(s, true);
-                subfileLogicalFilenames.append(subfile.queryLogicalName());
+                subfileLogicalFilenames.kill();
+                for (unsigned s=0; s<numsubs; s++)
+                {
+                    IDistributedFile &subfile = super->querySubFile(s, true);
+                    subfileLogicalFilenames.append(subfile.queryLogicalName());
+                }
             }
-        }
-        if (0==(helper->getFlags() & TDXtemporary))
-        {
-            for (unsigned i=0; i<numsubs; i++)
-                subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
-        }
-        void *ekey;
-        size32_t ekeylen;
-        helper->getEncryptKey(ekeylen,ekey);
-        bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
-        if (0 != ekeylen)
-        {
-            memset(ekey,0,ekeylen);
-            free(ekey);
-            if (!encrypted)
+            if (0==(helper->getFlags() & TDXtemporary))
+            {
+                /* JCS->SHAMSER - kludge for now, don't add more than max
+                 * But it means updateFileReadCostStats will not be querying the correct files,
+                 * if the file varies per CQ execution (see other notes in updateFileReadCostStats)
+                 */
+                for (unsigned i=subFileStats.size(); i<numsubs; i++)
+                    subFileStats.push_back(new CThorStatsCollection(diskReadRemoteStatistics));
+            }
+            void *ekey;
+            size32_t ekeylen;
+            helper->getEncryptKey(ekeylen,ekey);
+            bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
+            if (0 != ekeylen)
             {
-                Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fileName.get());
-                queryJobChannel().fireException(e);
+                memset(ekey,0,ekeylen);
+                free(ekey);
+                if (!encrypted)
+                {
+                    Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fileName.get());
+                    queryJobChannel().fireException(e);
+                }
             }
+            else if (encrypted)
+                throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fileName.get());
         }
-        else if (encrypted)
-            throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fileName.get());
     }
 }
 
+void CDiskReadMasterBase::kill()
+{
+    CMasterActivity::kill();
+    file.clear();
+}
+
 void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
 {
     dst.append(fileName);

+ 2 - 0
thorlcr/activities/thdiskbase.ipp

@@ -29,6 +29,7 @@ class CDiskReadMasterBase : public CMasterActivity
 protected:
     StringArray subfileLogicalFilenames;
     Owned<IFileDescriptor> fileDesc;
+    Owned<IDistributedFile> file;
     Owned<CSlavePartMapping> mapping;
     IHash *hash;
     StringAttr fileName;
@@ -37,6 +38,7 @@ protected:
 public:
     CDiskReadMasterBase(CMasterGraphElement *info);
     virtual void init() override;
+    virtual void kill() override;
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override;
     virtual void done() override;
     virtual void validateFile(IDistributedFile *file) { }

+ 102 - 39
thorlcr/graph/thgraphmaster.cpp

@@ -388,13 +388,6 @@ CMasterActivity::~CMasterActivity()
     delete [] data;
 }
 
-void CMasterActivity::addReadFile(IDistributedFile *file, bool temp)
-{
-    readFiles.append(*LINK(file));
-    if (!temp) // NB: Temps not listed in workunit
-        queryThorFileManager().noteFileRead(container.queryJob(), file);
-}
-
 IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
 {
     if (f>=readFiles.ordinality())
@@ -402,20 +395,31 @@ IDistributedFile *CMasterActivity::queryReadFile(unsigned f)
     return &readFiles.item(f);
 }
 
-void CMasterActivity::preStart(size32_t parentExtractSz, const byte *parentExtract)
+IDistributedFile *CMasterActivity::findReadFile(const char *lfnName)
+{
+    auto it = readFilesMap.find(lfnName);
+    if (it != readFilesMap.end())
+        return LINK(it->second);
+    return nullptr;
+}
+
+IDistributedFile *CMasterActivity::lookupReadFile(const char *lfnName, bool jobTemp, bool temp, bool opt)
 {
-    CActivityBase::preStart(parentExtractSz, parentExtract);
-    IArrayOf<IDistributedFile> tmpFiles;
-    tmpFiles.swapWith(readFiles);
-    ForEachItemIn(f, tmpFiles)
+    StringBuffer normalizedFileName;
+    queryThorFileManager().addScope(container.queryJob(), lfnName, normalizedFileName, jobTemp|temp);
+    Owned<IDistributedFile> file = findReadFile(normalizedFileName);
+    if (!file)
     {
-        IDistributedFile &file = tmpFiles.item(f);
-        IDistributedSuperFile *super = file.querySuperFile();
-        if (super)
-            getSuperFileSubs(super, readFiles, true);
-        else
-            readFiles.append(*LINK(&file));
+        file.setown(queryThorFileManager().lookup(container.queryJob(), lfnName, jobTemp|temp, opt, true, container.activityIsCodeSigned()));
+        if (file)
+        {
+            readFiles.append(*LINK(file));
+            readFilesMap[normalizedFileName.str()] = file;
+            if (!temp) // NB: Temps not listed in workunit
+                queryThorFileManager().noteFileRead(container.queryJob(), file);
+        }
     }
+    return file.getClear();
 }
 
 MemoryBuffer &CMasterActivity::queryInitializationData(unsigned slave) const
@@ -455,7 +459,16 @@ void CMasterActivity::threadmain()
 
 void CMasterActivity::init()
 {
-    readFiles.kill();
+    // Files are added to readFiles during initialization,
+    // If this is a CQ query act., then it will be repeatedly re-initialized if it depends on master context,
+    // e.g. due to a variable filename.
+    // Therefore, do not clear readFiles on reinitialization, to avoid repeatedly [expensively] looking them up.
+    bool inCQ = container.queryOwner().queryOwner() && !container.queryOwner().isGlobal();
+    if (!inCQ)
+    {
+        readFiles.kill();
+        readFilesMap.clear();
+    }
 }
 
 void CMasterActivity::startProcess(bool async)
@@ -480,6 +493,7 @@ void CMasterActivity::kill()
 {
     CActivityBase::kill();
     readFiles.kill();
+    readFilesMap.clear();
 }
 
 bool CMasterActivity::fireException(IException *_e)
@@ -542,35 +556,84 @@ void CMasterActivity::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
 void CMasterActivity::done()
 {
     CActivityBase::done();
-    ForEachItemIn(s, readFiles)
+    if (readFiles.ordinality())
     {
-        IDistributedFile &file = readFiles.item(s);
-        file.setAccessed();
+        IArrayOf<IDistributedFile> tmpFiles;
+        ForEachItemIn(f, readFiles)
+        {
+            IDistributedFile &file = readFiles.item(f);
+            IDistributedSuperFile *super = file.querySuperFile();
+            if (super)
+            {
+                getSuperFileSubs(super, tmpFiles, true);
+                tmpFiles.append(*LINK(&file));
+            }
+            else
+                tmpFiles.append(*LINK(&file));
+        }
+        ForEachItemIn(s, tmpFiles)
+        {
+            IDistributedFile &file = tmpFiles.item(s);
+            file.setAccessed();
+        }
     }
 }
 
 void CMasterActivity::updateFileReadCostStats(std::vector<OwnedPtr<CThorStatsCollection>> & subFileStats)
 {
-    if (!subFileStats.empty())
-    {
-        unsigned numSubFiles = subFileStats.size();
-        for (unsigned i=0; i<numSubFiles; i++)
-        {
-            IDistributedFile *file = queryReadFile(i);
-            if (file)
+    /* JCSMORE->SHAMSER: (separate JIRA needed)
+     * there can be >1 read file if this act. is in a child query/loop, it could be processing a different file per iteration,
+     * meaning there could be an arbitrary number of readfiles, in that case activity::init would be called multiple times.
+     * 
+     * I hit an assert during testing reading a super in a CQ:
+        libjlib.so!raiseAssertException(const char * assertion, const char * file, unsigned int line) (\home\jsmith\git\HPCC-Platform\system\jlib\jexcept.cpp:660)
+        libjlib.so!MemoryBuffer::read(MemoryBuffer * const this, unsigned char & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:693)
+        libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:813)
+        libjlib.so!MemoryBuffer::readPacked(MemoryBuffer * const this, unsigned int & value) (\home\jsmith\git\HPCC-Platform\system\jlib\jbuff.cpp:824)
+        libjlib.so!CRuntimeStatisticCollection::deserialize(CRuntimeStatisticCollection * const this, MemoryBuffer & in) (\home\jsmith\git\HPCC-Platform\system\jlib\jstats.cpp:2524)
+        libactivitymasters_lcr.so!CThorStatsCollection::deserialize(CThorStatsCollection * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.ipp:53)
+        libactivitymasters_lcr.so!CDiskReadMasterBase::deserializeStats(CDiskReadMasterBase * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\activities\thdiskbase.cpp:139)
+        libgraphmaster_lcr.so!CMasterGraph::deserializeStats(CMasterGraph * const this, unsigned int node, MemoryBuffer & mb) (\home\jsmith\git\HPCC-Platform\thorlcr\graph\thgraphmaster.cpp:2781)
+     *
+     * (would be a crash in a Release build)
+     * it's because the diskread init is adding new CThorStatsCollection per init (per execution of the CQ),
+     * which means when deserializing there are too many. The code assumes there is only 1 file being read.
+     * 
+     * I've temporarily changed the code where subFileStats's are added, to prevent more being added per iteration,
+     * but it needs re-thinking to handle the workers potentially dealing with different logical files
+     * (+ index read to handle super files, and case where act. is reading >1 file, i.e. KJ)
+     * I've changed this code rely on the 1st readFiles for now (not the # of subFileStats, which may be more)
+     * NB: also changed when the expansion of readFiles (from supers to subfiles) happens, a new super was being added
+     * each CQ iteration and re-expanded, meaing readFiles kept growing.
+     * 
+     * Also, superkey1.ecl hits a dbgasserex whilst deserializing stats (before and after these PR changes),
+     * but is caught/ignored. I haven't investigated further.
+     */
+
+    IDistributedFile *file = queryReadFile(0);
+    if (file)
+    {
+        IDistributedSuperFile *super = file->querySuperFile();
+        if (super)
+        {       
+            unsigned numSubFiles = super->numSubFiles(true); //subFileStats.size();
+            if (subFileStats.size())
             {
-                stat_type numDiskReads = subFileStats[i]->getStatisticSum(StNumDiskReads);
-                StringBuffer clusterName;
-                file->getClusterName(0, clusterName);
-                diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
-                file->addAttrValue("@numDiskReads", numDiskReads);
+                assertex(numSubFiles <= subFileStats.size());
+                for (unsigned i=0; i<subFileStats.size(); i++)
+                {
+                    IDistributedFile &subFile = super->querySubFile(i, true);
+                    const char *subName = subFile.queryLogicalName();
+                    PROGLOG("subName = %s", subName);
+                    stat_type numDiskReads = subFileStats[i]->getStatisticSum(StNumDiskReads);
+                    StringBuffer clusterName;
+                    subFile.getClusterName(0, clusterName);
+                    diskAccessCost += money2cost_type(calcFileAccessCost(clusterName, 0, numDiskReads));
+                    subFile.addAttrValue("@numDiskReads", numDiskReads);
+                }
             }
         }
-    }
-    else
-    {
-        IDistributedFile *file = queryReadFile(0);
-        if (file)
+        else
         {
             stat_type numDiskReads = statsCollection.getStatisticSum(StNumDiskReads);
             StringBuffer clusterName;

+ 5 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -18,6 +18,8 @@
 #ifndef _THGRAPHMASTER_IPP
 #define _THGRAPHMASTER_IPP
 
+#include <unordered_map>
+
 #include "jmisc.hpp"
 #include "jsuperhash.hpp"
 #include "workunit.hpp"
@@ -238,6 +240,7 @@ class graphmaster_decl CMasterActivity : public CActivityBase, implements IThrea
     MemoryBuffer *data;
     CriticalSection progressCrit;
     IArrayOf<IDistributedFile> readFiles;
+    std::unordered_map<std::string, IDistributedFile *> readFilesMap; // NB: IDistributedFile pointers are owned by readFiles
 
 protected:
     std::vector<OwnedPtr<CThorEdgeCollection>> edgeStatsVector;
@@ -245,8 +248,9 @@ protected:
     IBitSet *notedWarnings;
     cost_type diskAccessCost = 0;
 
-    void addReadFile(IDistributedFile *file, bool temp=false);
     IDistributedFile *queryReadFile(unsigned f);
+    IDistributedFile *findReadFile(const char *lfnName);
+    IDistributedFile *lookupReadFile(const char *lfnName, bool jobTemp, bool temp, bool opt);
     void updateFileReadCostStats(std::vector<OwnedPtr<CThorStatsCollection>> & subFileStats);
     void updateFileWriteCostStats(IFileDescriptor & fileDesc, IPropertyTree &props, stat_type numDiskWrites);
     virtual void process() { }
@@ -269,7 +273,6 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) { }
     virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb) { }
 
-    virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
     virtual void startProcess(bool async=true);
     virtual bool wait(unsigned timeout);
     virtual void done();