Browse Source

HPCC-26010 Write numDiskReads statistic for index files

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
Shamser Ahmed 4 năm trước cách đây
mục cha
commit
f9736c7b79

+ 2 - 2
system/jlib/jstats.cpp

@@ -1268,8 +1268,8 @@ const StatisticsMapping allStatistics(StKindAll);
 const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
 const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDiskRead, StNumDiskReads, StCycleDiskWriteIOCycles, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
 const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
-const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries});
-const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries});
+const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles});
+const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles});
 
 //--------------------------------------------------------------------------------------------------------------------
 

+ 33 - 0
thorlcr/activities/indexread/thindexread.cpp

@@ -36,6 +36,7 @@ protected:
     bool localKey = false;
     bool partitionKey = false;
     StringBuffer fileName;
+    std::vector<OwnedPtr<CThorStatsCollection>> subIndexFileStats;
 
     rowcount_t aggregateToLimit()
     {
@@ -239,6 +240,12 @@ public:
                 IDistributedFile *sub = super ? &super->querySubFile(0,true) : index.get();
                 if (sub && 1 == sub->numParts())
                     nofilter = true;
+                if (super)
+                {
+                    unsigned numSubFiles = super->numSubFiles();
+                    for (unsigned i=0; i<numSubFiles; i++)
+                        subIndexFileStats.push_back(new CThorStatsCollection(indexReadActivityStatistics));
+                }
             }
             //MORE: Change index getFormatCrc once we support projected rows for indexes.
             checkFormatCrc(this, index, indexBaseHelper->getDiskFormatCrc(), indexBaseHelper->queryDiskRecordSize(), indexBaseHelper->getProjectedFormatCrc(), indexBaseHelper->queryProjectedDiskRecordSize(), true);
@@ -284,6 +291,32 @@ public:
         CMasterActivity::abort();
         cancelReceiveMsg(RANK_ALL, mpTag);
     }
+    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
+    {
+        CMasterActivity::deserializeStats(node, mb);
+        for (auto &indexFileStats: subIndexFileStats)
+            indexFileStats->deserialize(node, mb);
+    }
+    virtual void done() override
+    {
+        if (!subIndexFileStats.empty())
+        {
+            unsigned numSubFiles = subIndexFileStats.size();
+            for (unsigned i=0; i<numSubFiles; i++)
+            {
+                IDistributedFile *file = queryReadFile(i);
+                if (file)
+                    file->addAttrValue("@numDiskReads", subIndexFileStats[i]->getStatisticSum(StNumDiskReads));
+            }
+        }
+        else
+        {
+            IDistributedFile *file = queryReadFile(0);
+            if (file)
+                file->addAttrValue("@numDiskReads", statsCollection.getStatisticSum(StNumDiskReads));
+        }
+        CMasterActivity::done();
+    }
 };
 
 class CIndexReadActivityMaster : public CIndexReadBase

+ 38 - 2
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -75,6 +75,8 @@ protected:
     rowcount_t keyedProcessed = 0;
     rowcount_t rowLimit = RCMAX;
     bool useRemoteStreaming = false;
+    Owned<IFileIO> lazyIFileIO;
+    std::vector<OwnedPtr<CRuntimeStatisticCollection>> subIndexFileStats;
 
     template<class StatProvider>
     class CCaptureIndexStats
@@ -299,7 +301,7 @@ public:
 
                 // local key handling
 
-                Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part);
+                lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics));
 
                 RemoteFilename rfn;
                 part.getFilename(0, rfn);
@@ -351,10 +353,31 @@ public:
         else
             return nullptr;
     }
+    void mergeSubFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
+    {
+        if (subIndexFileStats.size()>0)
+        {
+            ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
+            dbgassertex(superFDesc);
+            unsigned subfile, lnum;
+            if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
+                mergeStats(*subIndexFileStats[subfile], partIO);
+        }
+    }
+    void updateStats()
+    {
+        if (lazyIFileIO)
+        {
+            mergeStats(stats, lazyIFileIO);
+            if (currentPart<partDescs.ordinality())
+                mergeSubFileStats(&partDescs.item(currentPart), lazyIFileIO);
+        }
+    }
     void configureNextInput()
     {
         if (currentManager)
         {
+            updateStats();
             resetManager(currentManager);
             currentManager = nullptr;
         }
@@ -579,7 +602,13 @@ public:
         {
             IPartDescriptor &part0 = partDescs.item(0);
             IFileDescriptor &fileDesc = part0.queryOwner();
-
+            ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
+            if (super)
+            {
+                unsigned numSubFiles = super->querySubFiles();
+                for (unsigned i=0; i<numSubFiles; i++)
+                    subIndexFileStats.push_back(new CRuntimeStatisticCollection(indexReadActivityStatistics));
+            }
             if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
             {
                 if (!inChildQuery())
@@ -690,6 +719,13 @@ public:
     {
         stats.setStatistic(StNumRowsProcessed, progress);
         PARENT::serializeStats(mb);
+        for (auto &indexFileStats: subIndexFileStats)
+            indexFileStats->serialize(mb);
+    }
+    virtual void done() override
+    {
+        updateStats();
+        PARENT::done();
     }
 };
 

+ 2 - 1
thorlcr/graph/thgraph.hpp

@@ -62,6 +62,7 @@
 #include "eclhelper.hpp"
 
 #include "thorplugin.hpp"
+#include "jstats.h"
 
 #define THORDATALINK_STOPPED            (RCMAX&~(RCMAX>>1))                         // dataLinkStop() was called
 #define THORDATALINK_STARTED            (RCMAX&~THORDATALINK_STOPPED&~(RCMAX>>2))   // dataLinkStart() was called
@@ -1198,7 +1199,7 @@ interface IExpander;
 interface IThorFileCache : extends IInterface
 {
     virtual bool remove(const char *filename, unsigned crc) = 0;
-    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr) = 0;
+    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr, const StatisticsMapping & _statMapping=diskLocalStatistics) = 0;
 };
 
 class graph_decl CThorResourceBase : implements IThorResource, public CInterface

+ 4 - 4
thorlcr/graph/thgraphslave.cpp

@@ -1985,8 +1985,8 @@ class CLazyFileIO : public CInterfaceOf<IFileIO>
         return iFileIO.getClear();
     }
 public:
-    CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
-        : cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics)
+    CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander, const StatisticsMapping & _statMapping=diskLocalStatistics)
+        : cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(_statMapping)
     {
     }
     virtual void beforeDispose() override;
@@ -2119,7 +2119,7 @@ public:
         CriticalBlock b(crit);
         return _remove(id);
     }
-    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander) override
+    virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander, const StatisticsMapping & _statMapping) override
     {
         StringBuffer filename;
         RemoteFilename rfn;
@@ -2135,7 +2135,7 @@ public:
         {
             Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
-            file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander);
+            file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander, _statMapping);
             files.replace(* file); // NB: files does not own 'file', CLazyFileIO will remove itself from cache on destruction
 
             /* NB: there will be 1 CLazyFileIO per physical file part name

+ 1 - 1
thorlcr/thorutil/thormisc.cpp

@@ -75,7 +75,7 @@ const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed,
 const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked});
 const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
 const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
-const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed, StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks}, basicActivityStatistics);
+const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed, StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks}, diskReadRemoteStatistics, basicActivityStatistics);
 const StatisticsMapping indexWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
 const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected, StNumIndexWildSeeks}, basicActivityStatistics);
 const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);