Procházet zdrojové kódy

Merge pull request #15189 from shamser/issue26224

HPCC-26224 Generate numDiskReads for fetch

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 3 roky
rodič
revize
b1161a187a

+ 35 - 2
thorlcr/activities/fetch/thfetch.cpp

@@ -20,6 +20,7 @@
 #include "thbufdef.hpp"
 #include "mptag.hpp"
 #include "dadfs.hpp"
+#include "jstats.h"
 #include "thexception.hpp"
 
 #include "../hashdistrib/thhashdistrib.ipp"
@@ -30,12 +31,13 @@ class CFetchActivityMaster : public CMasterActivity
     Owned<CSlavePartMapping> mapping;
     MemoryBuffer offsetMapMb;
     SocketEndpoint *endpoints;
+    std::vector<OwnedPtr<CThorStatsCollection>> subFileStats;
 
 protected:
     IHThorFetchArg *helper;
 
 public:
-    CFetchActivityMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    CFetchActivityMaster(CMasterGraphElement *info) : CMasterActivity(info, diskReadActivityStatistics)
     {
         endpoints = NULL;
         if (!container.queryLocalOrGrouped())
@@ -73,7 +75,12 @@ public:
             }
             else if (encrypted)
                 throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fetchFile->queryLogicalName());
-            mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, fetchFile->querySuperFile()));
+            IDistributedSuperFile *super = fetchFile->querySuperFile();
+            unsigned numsubs = super?super->numSubFiles(true):0;
+            for (unsigned i=0; i<numsubs; i++)
+                subFileStats.push_back(new CThorStatsCollection(diskReadActivityStatistics));
+
+            mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, super));
             mapping->serializeFileOffsetMap(offsetMapMb);
             addReadFile(fetchFile);
         }
@@ -93,6 +100,32 @@ public:
         if (!container.queryLocalOrGrouped())
             dst.append((int)mpTag);
     }
+    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
+    {
+        CMasterActivity::deserializeStats(node, mb);
+        for (auto &stats: subFileStats)
+            stats->deserialize(node, mb);
+    }
+    virtual void done() override
+    {
+        if (!subFileStats.empty())
+        {
+            unsigned numSubFiles = subFileStats.size();
+            for (unsigned i=0; i<numSubFiles; i++)
+            {
+                IDistributedFile *file = queryReadFile(i);
+                if (file)
+                    file->addAttrValue("@numDiskReads", subFileStats[i]->getStatisticSum(StNumDiskReads));
+            }
+        }
+        else
+        {
+            IDistributedFile *file = queryReadFile(0);
+            if (file)
+                file->addAttrValue("@numDiskReads", statsCollection.getStatisticSum(StNumDiskReads));
+        }
+        CMasterActivity::done();
+    }
 };
 
 class CCsvFetchActivityMaster : public CFetchActivityMaster

+ 44 - 2
thorlcr/activities/fetch/thfetchslave.cpp

@@ -159,7 +159,7 @@ public:
                 e->base = part.queryProperties().getPropInt64("@offset");
                 e->top = e->base + part.queryProperties().getPropInt64("@size");
                 e->index = f;
-                e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part); // NB: freed by FPosTableEntryIFileIO dtor
+                e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part, nullptr, diskReadActivityStatistics); // NB: freed by FPosTableEntryIFileIO dtor
             }
         }
     }
@@ -262,6 +262,32 @@ public:
         }
         return NULL;
     }
+    virtual void getFileStats(CRuntimeStatisticCollection & stats) override
+    {
+        for (unsigned f=0; f<files; f++)
+        {
+            IFileIO *file = fPosMultiPartTable[f].file;
+            mergeStats(stats, file);
+        }
+    }
+    virtual void getSubFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & subFileStats) override
+    {
+        if (subFileStats.size()>0)
+        {
+            ISuperFileDescriptor *super = parts.item(0).queryOwner().querySuperFileDescriptor();
+            dbgassertex(super);
+            for (unsigned f=0; f<files; f++)
+            {
+                IPartDescriptor &part = parts.item(f);
+                unsigned subfile, lnum;
+                if(super->mapSubPart(part.queryPartIndex(), subfile, lnum))
+                {
+                    IFileIO *file = fPosMultiPartTable[f].file;
+                    mergeStats(*subFileStats[subfile], file);
+                }
+            }
+        }
+    }
 };
 
 
@@ -281,6 +307,7 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
     MemoryBuffer offsetMapBytes;
     Owned<IExpander> eexp;
     Owned<IEngineRowAllocator> keyRowAllocator;
+    std::vector<OwnedPtr<CRuntimeStatisticCollection>> subFileStats;
 
 protected:
     Owned<IThorRowInterfaces> fetchDiskRowIf;
@@ -299,7 +326,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
-    CFetchSlaveBase(CGraphElementBase *_container) : CSlaveActivity(_container)
+    CFetchSlaveBase(CGraphElementBase *_container) : CSlaveActivity(_container, diskReadActivityStatistics)
     {
         fetchBaseHelper = (IHThorFetchBaseArg *)queryHelper();
         reInit = 0 != (fetchBaseHelper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
@@ -359,6 +386,10 @@ public:
                     prefetchers.append(prefetcher.getClear());
                 }
             }
+            ISuperFileDescriptor *super = parts.item(0).queryOwner().querySuperFileDescriptor();
+            if (super)
+                for (unsigned i=0; i<files; i++)
+                    subFileStats.push_back(new CRuntimeStatisticCollection(diskReadActivityStatistics));
         }
 
         unsigned encryptedKeyLen;
@@ -541,6 +572,17 @@ public:
             return fpos;
         }
     }
+    virtual void serializeStats(MemoryBuffer &mb) override
+    {
+        if (fetchStream)
+        {
+            fetchStream->getFileStats(stats);
+            fetchStream->getSubFileStats(subFileStats);
+        }
+        PARENT::serializeStats(mb);
+        for (auto &stats: subFileStats)
+            stats->serialize(mb);
+    }
     virtual void onLimitExceeded() = 0;
 };
 

+ 2 - 0
thorlcr/activities/fetch/thfetchslave.ipp

@@ -37,6 +37,8 @@ interface IFetchStream : extends IInterface
     virtual IFileIO *getPartIO(unsigned part) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual void abort() = 0;
+    virtual void getFileStats(CRuntimeStatisticCollection & stats) = 0;
+    virtual void getSubFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & subFileStats) = 0;
 };
 
 IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);