浏览代码

HPCC-26717 hThor file costs (all except index reads)

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
Shamser Ahmed 3 年之前
父节点
当前提交
63a7e9661e
共有 3 个文件被更改,包括 98 次插入25 次删除
  1. 12 8
      ecl/eclagent/eclgraph.cpp
  2. 75 16
      ecl/hthor/hthor.cpp
  3. 11 1
      ecl/hthor/hthor.ipp

+ 12 - 8
ecl/eclagent/eclgraph.cpp

@@ -887,18 +887,20 @@ void EclSubGraph::updateProgress()
             subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
             if (startGraphTime)
                 parent.updateWUStatistic(lockedwu, SSTsubgraph, subgraphid, StWhenStarted, nullptr, startGraphTime);
+            StringBuffer scope;
+            scope.append(WorkflowScopePrefix).append(parent.queryWfid()).append(":").append(subgraphid);
             if (elapsedGraphCycles)
             {
                 unsigned __int64 elapsedTime = cycle_to_nanosec(elapsedGraphCycles);
                 parent.updateWUStatistic(lockedwu, SSTsubgraph, subgraphid, StTimeElapsed, nullptr, elapsedTime);
                 const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), nanoToMilli(elapsedTime)));
                 if (cost)
-                {
-                    StringBuffer scope;
-                    scope.append(WorkflowScopePrefix).append(parent.queryWfid()).append(":").append(subgraphid);
                     lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
-                }
             }
+            Owned<IStatisticCollection> statsCollection = stats.getResult();
+            const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ;
+            if (costDiskAccess)
+                lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
         }
     }
 }
@@ -1268,13 +1270,15 @@ void EclGraph::execute(const byte * parentExtract)
             unsigned __int64 elapsedNs = milliToNano(elapsed);
             updateWorkunitStat(wu, SSTgraph, queryGraphName(), StTimeElapsed, description.str(), elapsedNs, wfid);
 
+            StringBuffer scope;
+            scope.append(WorkflowScopePrefix).append(wfid).append(":").append(queryGraphName());
             const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
             if (cost)
-            {
-                StringBuffer scope;
-                scope.append(WorkflowScopePrefix).append(wfid).append(":").append(queryGraphName());
                 wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
-            }
+
+            const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope);
+            if (costDiskAccess)
+                wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
         }
 
         if (agent->queryRemoteWorkunit())

+ 75 - 16
ecl/hthor/hthor.cpp

@@ -446,6 +446,7 @@ void CHThorDiskWriteActivity::stop()
     close();
     if((helper.getFlags() & (TDXtemporary | TDXjobtemp) ) == 0 && !agent.queryResolveFilesLocally())
         publish();
+    io.clear();
     incomplete = false;
     if(clusterHandler)
         clusterHandler->finish(file);
@@ -568,7 +569,6 @@ void CHThorDiskWriteActivity::open()
         encrypted = true;
         blockcompressed = true;
     }
-    Owned<IFileIO> io;
     if(blockcompressed)
         io.setown(createCompressedFileWriter(file, groupedMeta->getFixedSize(), extend, true, ecomp, COMPRESS_METHOD_LZW));
     else
@@ -576,7 +576,7 @@ void CHThorDiskWriteActivity::open()
     if(!io)
         throw MakeStringException(errno, "Failed to create%s file %s for writing", (encrypted ? " encrypted" : (blockcompressed ? " compressed" : "")), filename.get());
     incomplete = true;
-    
+
     diskout.setown(createBufferedIOStream(io));
     if(extend)
         diskout->seek(0, IFSend);
@@ -659,9 +659,9 @@ void CHThorDiskWriteActivity::publish()
     desc->setDefaultDir(dir.str());
 
     Owned<IPropertyTree> attrs;
-    if(clusterHandler) 
+    if(clusterHandler)
         attrs.setown(createPTree("Part")); // clusterHandler is going to set attributes
-    else 
+    else
     {
         // add cluster
         StringBuffer mygroupname;
@@ -740,6 +740,11 @@ void CHThorDiskWriteActivity::publish()
     if (helper.getFlags() & TDWrestricted)
         properties.setPropBool("restricted", true);
 
+    if (io)
+    {
+        numDiskWrites = io->getStatistic(StNumDiskWrites);
+        properties.setPropInt64("@numDiskWrites", numDiskWrites);
+    }
     StringBuffer lfn;
     expandLogicalFilename(lfn, mangledHelperFileName.str(), agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false);
     CDfsLogicalFileName logicalName;
@@ -752,11 +757,26 @@ void CHThorDiskWriteActivity::publish()
         Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(desc);
         if(file->getModificationTime(modifiedTime))
             file->setAccessedTime(modifiedTime);
+        if ((helper.getFlags() & TDXtemporary) == 0)
+        {
+            StringBuffer clusterName;
+            file->getClusterName(0, clusterName);
+            diskAccessCost = money2cost_type(calcFileCost(clusterName, 0, 0, numDiskWrites, 0));
+        }
         file->attach(logicalName.get(), agent.queryCodeContext()->queryUserDescriptor());
         agent.logFileAccess(file, "HThor", "CREATED", graph);
     }
 }
 
+void CHThorDiskWriteActivity::updateProgress(IStatisticGatherer &progress) const
+{
+    CHThorActivityBase::updateProgress(progress);
+    StatsActivityScope scope(progress, activityId);
+    progress.addStatistic(StNumDiskWrites, numDiskWrites);
+    if ((helper.getFlags() & TDXtemporary) == 0)
+        progress.addStatistic(StCostFileAccess, diskAccessCost);
+}
+
 void CHThorDiskWriteActivity::updateWorkUnitResult(unsigned __int64 reccount)
 {
     if(lfn.length()) //this is required as long as temp files don't get a name which can be stored in the WU and automatically deleted by the WU
@@ -781,14 +801,14 @@ void CHThorDiskWriteActivity::updateWorkUnitResult(unsigned __int64 reccount)
                 fileKind = WUFileStandard;
             wu->addFile(lfn.str(), &clusters, helper.getTempUsageCount(), fileKind, NULL);
         }
-        else if ((TDXtemporary | TDXjobtemp) & flags)          
+        else if ((TDXtemporary | TDXjobtemp) & flags)
             agent.noteTemporaryFilespec(filename);//note for later deletion
         if (!(flags & TDXtemporary) && helper.getSequence() >= 0)
         {
             Owned<IWUResult> result = wu->updateResultBySequence(helper.getSequence());
             if (result)
             {
-                result->setResultTotalRowCount(reccount); 
+                result->setResultTotalRowCount(reccount);
                 result->setResultStatus(ResultStatusCalculated);
                 if (helper.getFlags() & TDWresult)
                     result->setResultFilename(lfn.str());
@@ -1211,6 +1231,7 @@ void CHThorIndexWriteActivity::execute()
         builder->finish(metadata, &fileCrc);
         duplicateKeyCount = builder->getDuplicateCount();
         cummulativeDuplicateKeyCount += duplicateKeyCount;
+        numDiskWrites = io->getStatistic(StNumDiskWrites);
         out->flush();
         out.clear();
     }
@@ -1232,9 +1253,9 @@ void CHThorIndexWriteActivity::execute()
 
     //properties of the first file part.
     Owned<IPropertyTree> attrs;
-    if(clusterHandler) 
+    if(clusterHandler)
         attrs.setown(createPTree("Part"));  // clusterHandler is going to set attributes
-    else 
+    else
     {
         // add cluster
         StringBuffer mygroupname;
@@ -1281,10 +1302,11 @@ void CHThorIndexWriteActivity::execute()
     properties.setProp("@workunit", agent.queryWorkUnit()->queryWuid());
     properties.setProp("@job", agent.queryWorkUnit()->queryJobName());
     properties.setPropInt64("@duplicateKeyCount",duplicateKeyCount);
+    properties.setPropInt64("@numDiskWrites", numDiskWrites);
+
     char const * rececl = helper.queryRecordECL();
     if(rececl && *rececl)
         properties.setProp("ECL", rececl);
-    
 
     if (helper.getFlags() & TIWexpires)
         setExpiryTime(properties, helper.getExpiryDays());
@@ -1332,6 +1354,10 @@ void CHThorIndexWriteActivity::execute()
         expandLogicalFilename(lfn, fname, agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false);
         dfile->attach(lfn.str(),agent.queryCodeContext()->queryUserDescriptor());
         agent.logFileAccess(dfile, "HThor", "CREATED", graph);
+
+        StringBuffer clusterName;
+        dfile->getClusterName(0, clusterName);
+        diskAccessCost = money2cost_type(calcFileCost(clusterName, 0, 0, numDiskWrites, 0));
     }
     else
         lfn = filename;
@@ -8301,10 +8327,12 @@ void CHThorDiskReadBaseActivity::resolve()
 
                 persistent = dFile->queryAttributes().getPropBool("@persistent");
                 dfsParts.setown(dFile->getIterator());
-                if (helper.getFlags() & TDRfilenamecallback)
+                IDistributedSuperFile *super = dFile->querySuperFile();
+                if (super)
                 {
-                    IDistributedSuperFile *super = dFile->querySuperFile();
-                    if (super)
+                    assertex(fdesc);
+                    superfile.set(fdesc->querySuperFileDescriptor());
+                    if (helper.getFlags() & TDRfilenamecallback)
                     {
                         unsigned numsubs = super->numSubFiles(true);
                         unsigned s=0;
@@ -8313,11 +8341,8 @@ void CHThorDiskReadBaseActivity::resolve()
                             IDistributedFile &subfile = super->querySubFile(s, true);
                             subfileLogicalFilenames.append(subfile.queryLogicalName());
                         }
-                        assertex(fdesc);
-                        superfile.set(fdesc->querySuperFileDescriptor());
                         if (!superfile && numsubs>0)
                             logicalFileName.set(subfileLogicalFilenames.item(0));
-
                     }
                 }
                 if((helper.getFlags() & (TDXtemporary | TDXjobtemp)) == 0)
@@ -8414,6 +8439,32 @@ unsigned __int64 CHThorDiskReadBaseActivity::getLocalFilePosition(const void * r
 
 void CHThorDiskReadBaseActivity::closepart()
 {
+    if (opened && inputfileio && ldFile && partNum > 0)
+    {
+        unsigned previousPartNum = partNum-1;
+        if (previousPartNum < ldFile->numParts())
+        {
+            stat_type curDiskReads = inputfileio->getStatistic(StNumDiskReads);
+            IDistributedFile * dFile = ldFile->queryDistributedFile();
+            if (dFile)
+            {
+                if (superfile)
+                {
+                    unsigned subfile, lnum;
+                    if (superfile->mapSubPart(previousPartNum, subfile, lnum))
+                    {
+                        IDistributedSuperFile *super = dFile->querySuperFile();
+                        dFile = &(super->querySubFile(subfile, true));
+                    }
+                }
+                dFile->addAttrValue("@numDiskReads", curDiskReads);
+                StringBuffer clusterName;
+                dFile->getClusterName(0, clusterName);
+                diskAccessCost += money2cost_type(calcFileCost(clusterName, 0, 0, 0, curDiskReads));
+            }
+            numDiskReads += curDiskReads;
+        }
+    }
     inputstream.clear();
     inputfileio.clear();
     inputfile.clear();
@@ -8447,7 +8498,7 @@ bool CHThorDiskReadBaseActivity::openNext()
             //MORE: Order of copies should be optimized at this point....
             StringBuffer file, filelist;
             closepart();
-            if (dfsParts && superfile && curPart)
+            if (dfsParts && superfile && curPart && !subfileLogicalFilenames.empty())
             {
                 unsigned subfile;
                 unsigned lnum;
@@ -8744,6 +8795,14 @@ void CHThorDiskReadBaseActivity::open()
     opened = true;
 }
 
+void CHThorDiskReadBaseActivity::updateProgress(IStatisticGatherer &progress) const
+{
+    CHThorActivityBase::updateProgress(progress);
+    StatsActivityScope scope(progress, activityId);
+    progress.addStatistic(StNumDiskReads, numDiskReads);
+    progress.addStatistic(StCostFileAccess, diskAccessCost);
+}
+
 //=====================================================================================================
 
 CHThorBinaryDiskReadBase::CHThorBinaryDiskReadBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskReadBaseArg &_arg, IHThorCompoundBaseArg & _segHelper, ThorActivityKind _kind, IPropertyTree *_node, EclGraph & _graph)

+ 11 - 1
ecl/hthor/hthor.ipp

@@ -283,6 +283,7 @@ protected:
     IHThorDiskWriteArg &helper;
     bool extend;
     bool overwrite;
+    Owned<IFileIO> io;
     Linked<IFileIOStream> diskout;
     StringBuffer lfn;
     StringAttr filename;
@@ -295,6 +296,8 @@ protected:
     offset_t uncompressedBytesWritten;
     Owned<IExtRowWriter> outSeq;
     unsigned __int64 numRecords;
+    stat_type numDiskWrites = 0;
+    cost_type diskAccessCost = 0;
     Owned<ClusterWriteHandler> clusterHandler;
     offset_t sizeLimit;
     Owned<IRowInterfaces> rowIf;
@@ -312,6 +315,7 @@ protected:
     void open();
     void close();
     void publish();
+    void updateProgress(IStatisticGatherer &progress) const override;
     void updateWorkUnitResult(unsigned __int64 reccount);
     void finishOutput();
     bool next();
@@ -384,7 +388,8 @@ class CHThorIndexWriteActivity : public CHThorActivityBase
     offset_t sizeLimit;
     unsigned __int64 duplicateKeyCount = 0;
     unsigned __int64 cummulativeDuplicateKeyCount = 0;
-
+    stat_type numDiskWrites = 0;
+    cost_type diskAccessCost = 0;
     void close();
     void buildUserMetadata(Owned<IPropertyTree> & metadata);
     void buildLayoutMetadata(Owned<IPropertyTree> & metadata);
@@ -393,6 +398,8 @@ class CHThorIndexWriteActivity : public CHThorActivityBase
         CHThorActivityBase::updateProgress(progress);
         StatsActivityScope scope(progress, activityId);
         progress.addStatistic(StNumDuplicateKeys, cummulativeDuplicateKeyCount);
+        progress.addStatistic(StNumDiskWrites, numDiskWrites);
+        progress.addStatistic(StCostFileAccess, diskAccessCost);
     }
 
 public:
@@ -2257,6 +2264,8 @@ protected:
     unsigned __int64 remoteLimit = 0;
     unsigned __int64 localOffset;
     unsigned __int64 offsetOfPart;
+    stat_type numDiskReads = 0;
+    cost_type diskAccessCost = 0;
     StringBuffer mangledHelperFileName;
     StringAttr logicalFileName;
     StringArray subfileLogicalFilenames;
@@ -2312,6 +2321,7 @@ public:
     virtual unsigned __int64 getLocalFilePosition(const void * row);
     virtual const char * queryLogicalFilename(const void * row) { return logicalFileName.get(); }
     virtual const byte * lookupBlob(unsigned __int64 id) { UNIMPLEMENTED; }
+    virtual void updateProgress(IStatisticGatherer &progress) const override;
 };
 
 class CHThorBinaryDiskReadBase : public CHThorDiskReadBaseActivity, implements IIndexReadContext