Преглед на файлове

Merge pull request #15930 from shamser/issue27388

HPCC-27388 Update WU file access cost for file ops

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>.
Richard Chapman преди 3 години
родител
ревизия
6e5d7a6139
променени са 9 файла, в които са добавени 42 реда и са изтрити 6 реда
  1. 4 1
      dali/dfu/dfurun.cpp
  2. 10 2
      dali/dfu/dfuwu.cpp
  3. 2 0
      dali/dfu/dfuwu.hpp
  4. 1 0
      dali/ft/daft.hpp
  5. 2 1
      dali/ft/daftprogress.hpp
  6. 18 0
      dali/ft/filecopy.cpp
  7. 2 1
      esp/scm/ws_fs.ecm
  8. 1 0
      esp/services/ws_fs/ws_fsService.cpp
  9. 2 1
      plugins/fileservices/fileservices.cpp

+ 4 - 1
dali/dfu/dfurun.cpp

@@ -140,7 +140,10 @@ class CDFUengine: public CInterface, implements IDFUengine
             DaftProgress::setRange(sizeReadBefore,totalSize,_totalNodes);
             progress->setTotalNodes(_totalNodes);
         }
-
+        void setFileAccessCost(double fileAccessCost)
+        {
+            progress->setFileAccessCost(fileAccessCost);
+        }
     };
 
     class cAbortNotify : public CInterface, implements IAbortRequestCallback, implements IDFUabortSubscriber

+ 10 - 2
dali/dfu/dfuwu.cpp

@@ -580,7 +580,11 @@ public:
         queryRoot()->getProp("@subdone",str);
         return str;
     }
-
+    double getFileAccessCost() const
+    {
+        CriticalBlock block(parent->crit);
+        return queryRoot()->getPropInt64("@fileAccessCost");
+    }
     void setSubInProgress(const char *str)
     {
         CriticalBlock block(parent->crit);
@@ -592,7 +596,11 @@ public:
         CriticalBlock block(parent->crit);
         queryRoot()->setProp("@subdone",str);
     }
-
+    void setFileAccessCost(double fileAccessCost)
+    {
+        CriticalBlock block(parent->crit);
+        queryRoot()->setPropReal("@fileAccessCost", fileAccessCost);
+    }
 };
 
 class CDFUmonitor: public CLinkedDFUWUchild, implements IDFUmonitor

+ 2 - 0
dali/dfu/dfuwu.hpp

@@ -310,6 +310,7 @@ interface IConstDFUprogress: extends IInterface
     virtual unsigned getTotalNodes() const = 0;
     virtual StringBuffer &getSubInProgress(StringBuffer &str) const = 0;    // sub-DFUWUs in progress
     virtual StringBuffer &getSubDone(StringBuffer &str) const = 0;          // sub-DFUWUs done (list)
+    virtual double getFileAccessCost() const = 0;
 };
 
 interface IDFUprogress: extends IConstDFUprogress
@@ -325,6 +326,7 @@ interface IDFUprogress: extends IConstDFUprogress
     virtual void setSubInProgress(const char *str) = 0;     // set sub-DFUWUs in progress
     virtual void setSubDone(const char *str) = 0;           // set sub-DFUWUs done
     virtual void clearProgress() = 0;
+    virtual void setFileAccessCost(double fileAccessCost) = 0;
 };
 
 interface IDFUprogressSubscriber: extends IInterface

+ 1 - 0
dali/ft/daft.hpp

@@ -33,6 +33,7 @@ interface IDaftProgress
 {
     virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0;          // how much has been done
     virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned totalNodes) = 0;          // how much has been done
+    virtual void setFileAccessCost(double fileAccessCost) = 0;
 };
 
 interface IDaftCopyProgress

+ 2 - 1
dali/ft/daftprogress.hpp

@@ -33,7 +33,7 @@ public:
                             unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0;
     virtual void displaySummary(const char * timeTaken, unsigned kbPerSecond) = 0;
     virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned _totalNodes);
-
+    virtual void setFileAccessCost(double fileAccessCost) = 0;
 protected:
     void formatTime(char * buffer, unsigned secs);
 
@@ -59,6 +59,7 @@ public:
                             unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
                             unsigned kbPerSecondAve, unsigned kbPerSecondRate, unsigned numNodes);
     virtual void displaySummary(const char * timeTaken, unsigned kbPerSecond);
+    virtual void setFileAccessCost(double fileAccessCost) {};
 };
 
 #endif

+ 18 - 0
dali/ft/filecopy.cpp

@@ -3105,6 +3105,24 @@ void FileSprayer::spray()
 
     //If got here then we have succeeded
     updateTargetProperties();
+
+    //Calculate and store file access cost
+    double fileAccessCost = 0.0;
+    if (distributedTarget)
+    {
+        StringBuffer cluster;
+        distributedTarget->getClusterName(0, cluster);
+        if (!cluster.isEmpty())
+            fileAccessCost += calcFileAccessCost(cluster, totalNumWrites, 0);
+    }
+    if (distributedSource && distributedSource->querySuperFile()==nullptr)
+    {
+        StringBuffer cluster;
+        distributedSource->getClusterName(0, cluster);
+        if (!cluster.isEmpty())
+            fileAccessCost += calcFileAccessCost(cluster, 0, totalNumReads);
+    }
+    progressReport->setFileAccessCost(fileAccessCost);
     StringBuffer copyEventText;     // [logical-source] > [logical-target]
     if (distributedSource)
         copyEventText.append(distributedSource->queryLogicalName());

+ 2 - 1
esp/scm/ws_fs.ecm

@@ -96,6 +96,7 @@ ESPStruct [nil_remove] DFUWorkunit
     [min_ver("1.12")] bool preserveCompression(true);
     [min_ver("1.14")] int expireDays;
     [min_ver("1.21")] bool PreserveFileParts;
+    [min_ver("1.23")] double FileAccessCost;
 };
 
 ESPStruct [nil_remove] GroupNode
@@ -692,7 +693,7 @@ ESPresponse [exceptions_inline, nil_remove] GetDFUServerQueuesResponse
 
 ESPservice [
     auth_feature("DEFERRED"),
-    version("1.22"),
+    version("1.23"),
     exceptions_inline("./smc_xslt/exceptions.xslt")] FileSpray
 {
     ESPmethod EchoDateTime(EchoDateTime, EchoDateTimeResponse);

+ 1 - 0
esp/services/ws_fs/ws_fsService.cpp

@@ -351,6 +351,7 @@ static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWork
         if(secs > 0)
             dest.setSecsLeft(secs);
         dest.setPercentDone(prog->getPercentDone());
+        dest.setFileAccessCost(prog->getFileAccessCost());
     }
 
     IConstDFUoptions *options = src->queryOptions();

+ 2 - 1
plugins/fileservices/fileservices.cpp

@@ -617,10 +617,11 @@ static void blockUntilComplete(const char * label, IClientFileSpray &server, ICo
             wuScope.appendf("%s-%s", label, dfuwu.getID());
             ElapsedLabel.append(wuScope).append(" (Elapsed) ");
             RemainingLabel.append(wuScope).append(" (Remaining) ");
-
             //MORE: I think this are intended to replace the timing information, but will currently combine
             updateWorkunitStat(wu, SSTdfuworkunit, wuScope, StTimeElapsed, ElapsedLabel, milliToNano(time.elapsed()));
             updateWorkunitStat(wu, SSTdfuworkunit, wuScope, StTimeRemaining, RemainingLabel, milliToNano(dfuwu.getSecsLeft()*1000));
+            stat_type costFileAccess = money2cost_type(dfuwu.getFileAccessCost());
+            updateWorkunitStat(wu, SSTdfuworkunit, wuScope, StCostFileAccess, "", costFileAccess);
             wu->setApplicationValue(label, dfuwu.getID(), dfuwu.getSummaryMessage(), true);
             wu->commit();
             wu.clear();