Selaa lähdekoodia

HPCC-27339 Store read/write stats for spray

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
Shamser Ahmed 3 vuotta sitten
vanhempi
commit
f4e7ad3d7e

+ 3 - 3
dali/dfu/dfurun.cpp

@@ -117,7 +117,7 @@ class CDFUengine: public CInterface, implements IDFUengine
         void displayProgress(unsigned percentDone, unsigned secsLeft, const char * timeLeft,
                                 unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
                                 unsigned kbPerSecondAve, unsigned kbPerSecondRate,
-                                unsigned slavesDone)
+                                unsigned slavesDone, unsigned __int64 numReads, unsigned __int64 numWrites)
         {
             if (repmode==REPbefore)
                 percentDone /= 2;
@@ -125,7 +125,7 @@ class CDFUengine: public CInterface, implements IDFUengine
                 if (repmode==REPduring)
                     percentDone = percentDone/2+50;
             progress->setProgress(percentDone, secsLeft, timeLeft, scaledDone, scaledTotal, scale,
-                                 kbPerSecondAve, kbPerSecondRate, slavesDone, repmode==REPduring);
+                                 kbPerSecondAve, kbPerSecondRate, slavesDone, repmode==REPduring, numReads, numWrites);
         }
         void displaySummary(const char * timeTaken, unsigned kbPerSecond)
         {
@@ -997,7 +997,7 @@ public:
                 numdone++;
                 subfiles.append(dlfnres.get());
                 if ((ctx.level==1)&&ctx.feedback)
-                    ctx.feedback->displayProgress(numtodo?(numdone*100/numtodo):0,0,"unknown",0,0,"",0,0,0);
+                    ctx.feedback->displayProgress(numtodo?(numdone*100/numtodo):0,0,"unknown",0,0,"",0,0,0,0,0);
             }
             // now construct the superfile
             Owned<IDistributedSuperFile> sfile = queryDistributedFileDirectory().createSuperFile(dlfn.get(),ctx.user,true,false);

+ 3 - 1
dali/dfu/dfuwu.cpp

@@ -466,7 +466,7 @@ public:
     void setProgress(   unsigned percentDone, unsigned secsLeft, const char * timeLeft,
                         unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
                         unsigned kbPerSecAve, unsigned kbPerSecRate,
-                        unsigned slavesDone, bool replicating)
+                        unsigned slavesDone, bool replicating, unsigned __int64 numReads, unsigned __int64 numWrites)
     {
         CriticalBlock block(parent->crit);
         queryRoot()->setPropInt("@percentdone",(int)percentDone);
@@ -479,6 +479,8 @@ public:
         queryRoot()->setPropInt("@kbpersec",(int)kbPerSecRate);
         queryRoot()->setPropInt("@slavesdone",(int)slavesDone);
         queryRoot()->setPropInt("@replicating",replicating?1:0);
+        queryRoot()->setPropInt("@numreads",numReads);
+        queryRoot()->setPropInt("@numwrites",numWrites);
         parent->commit();
     }
     void setPercentDone(unsigned percentDone)

+ 1 - 1
dali/dfu/dfuwu.hpp

@@ -317,7 +317,7 @@ interface IDFUprogress: extends IConstDFUprogress
     virtual void setProgress(unsigned percentDone, unsigned secsLeft, const char * timeLeft,
                              unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
                              unsigned kbPerSecondAve, unsigned kbPerSecondRate,
-                             unsigned slavesDone, bool replicating)=0;
+                             unsigned slavesDone, bool replicating, unsigned __int64 numReads, unsigned __int64 numWrites)=0;
     virtual void setDone(const char * timeTaken, unsigned kbPerSecond, bool set100pc) = 0;
     virtual void setState(DFUstate state) = 0;
     virtual void setTotalNodes(unsigned val) = 0;

+ 1 - 1
dali/ft/daft.hpp

@@ -31,7 +31,7 @@ interface IMultiException;
 
 interface IDaftProgress
 {
-    virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes) = 0;          // how much has been done
+    virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0;          // how much has been done
     virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned totalNodes) = 0;          // how much has been done
 };
 

+ 2 - 2
dali/ft/daftprogress.cpp

@@ -64,7 +64,7 @@ void DaftProgress::formatTime(char * buffer, unsigned secs)
         sprintf(buffer, "%d secs", secs);
 }
 
-void DaftProgress::onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes)
+void DaftProgress::onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites)
 {
     cycle_t nowTime = get_cycles_now();
     savedTime[nextSample] = nowTime;
@@ -91,7 +91,7 @@ void DaftProgress::onProgress(unsigned __int64 sizeDone, unsigned __int64 totalS
         displayProgress((unsigned)(sizeDone*100/totalSize), secsLeft, temp, 
                         sizeDone/scale,totalSize/scale,scaleUnit, 
                         (unsigned)(msGone ? (sizeDone-startSize)/msGone : 0),
-                        (unsigned)(recentTimeDelta ? recentSizeDelta / recentTimeDelta : 0), numNodes);
+                        (unsigned)(recentTimeDelta ? recentSizeDelta / recentTimeDelta : 0), numNodes, numReads, numWrites);
 
         if (sizeDone == totalSize)
         {

+ 2 - 2
dali/ft/daftprogress.hpp

@@ -26,11 +26,11 @@ class DALIFT_API DaftProgress : public IDaftProgress
 public:
     DaftProgress();
 
-    virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes);
+    virtual void onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize, unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites);
     virtual void displayProgress(unsigned percentDone, unsigned secsLeft, const char * timeLeft,
                             unsigned __int64 scaledDone, unsigned __int64 scaledTotal, const char * scale,
                             unsigned kbPerSecondAve, unsigned kbPerSecondRate,
-                            unsigned numNodes) = 0;
+                            unsigned numNodes, unsigned __int64 numReads, unsigned __int64 numWrites) = 0;
     virtual void displaySummary(const char * timeTaken, unsigned kbPerSecond) = 0;
     virtual void setRange(unsigned __int64 sizeReadBefore, unsigned __int64 totalSize, unsigned _totalNodes);
 

+ 12 - 2
dali/ft/filecopy.cpp

@@ -110,7 +110,6 @@ inline void setCanAccessDirectly(RemoteFilename & file)
 #define FAsize              "@size"
 #define FAcompressedSize    "@compressedSize"
 
-
 const unsigned operatorUpdateFrequency = 5000;      // time between updates in ms
 const unsigned abortCheckFrequency = 20000;         // time between updates in ms
 const unsigned sdsUpdateFrequency = 20000;          // time between updates in ms
@@ -615,6 +614,8 @@ FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IR
     calcedInputCRC = false;
     aborting = false;
     totalLengthRead = 0;
+    totalNumReads = 0;
+    totalNumWrites = 0;
     throttleNicSpeed = 0;
     compressedInput = false;
     compressOutput = options->getPropBool(ANcompress);
@@ -2828,7 +2829,10 @@ void FileSprayer::updateProgress(const OutputProgress & newProgress)
     OutputProgress & curProgress = progress.item(newProgress.whichPartition);
 
     totalLengthRead += (newProgress.inputLength - curProgress.inputLength);
+    totalNumReads += (newProgress.numReads - curProgress.numReads);
+    totalNumWrites += (newProgress.numWrites - curProgress.numWrites);
     curProgress.set(newProgress);
+
     if (curProgress.tree)
         curProgress.save(curProgress.tree);
 
@@ -2853,7 +2857,7 @@ void FileSprayer::updateSizeRead()
         unsigned numCompleted = (sizeReadSoFar == sizeToBeRead) ? transferSlaves.ordinality() : numSlavesCompleted;
         if (done || (nowTick - lastOperatorTick >= operatorUpdateFrequency))
         {
-            progressReport->onProgress(sizeReadSoFar, sizeToBeRead, numCompleted);
+            progressReport->onProgress(sizeReadSoFar, sizeToBeRead, numCompleted, totalNumReads, totalNumWrites);
             lastOperatorTick = nowTick;
             progressDone = done;
         }
@@ -3288,6 +3292,7 @@ void FileSprayer::updateTargetProperties()
 
         DistributedFilePropertyLock lock(distributedTarget);
         IPropertyTree &curProps = lock.queryAttributes();
+        curProps.setPropInt64("@numDiskWrites", totalNumWrites);
         if (calcCRC())
             curProps.setPropInt(FAcrc, totalCRC.get());
         curProps.setPropInt64(FAsize, totalLength);
@@ -3463,6 +3468,11 @@ void FileSprayer::updateTargetProperties()
         if (expireDays != -1)
             curProps.setPropInt("@expireDays", expireDays);
     }
+    if (distributedSource)
+    {
+        if (distributedSource->querySuperFile()==nullptr)
+            distributedSource->addAttrValue("@numDiskReads", totalNumReads);
+    }
     if (error)
         throw error.getClear();
 }

+ 2 - 0
dali/ft/filecopy.ipp

@@ -336,6 +336,8 @@ protected:
     bool                    compressOutput;
     bool                    copyCompressed;
     unsigned __int64        totalLengthRead;
+    unsigned __int64        totalNumReads;
+    unsigned __int64        totalNumWrites;
     unsigned                throttleNicSpeed;
     unsigned                lastProgressTick;
     StringAttr              wuid; // used for logging

+ 10 - 2
dali/ft/ftbase.cpp

@@ -540,13 +540,15 @@ void OutputProgress::reset()
     outputLength = 0;
     hasCompressed = false;
     compressedPartSize = 0;
+    numWrites = 0;
+    numReads = 0;
 }
 
 MemoryBuffer & OutputProgress::deserializeCore(MemoryBuffer & in)
 {
     unsigned _inputCRC, _outputCRC;
     bool hasTime;
-    in.read(status).read(whichPartition).read(hasInputCRC).read(_inputCRC).read(inputLength).read(_outputCRC).read(outputLength).read(hasTime);
+    in.read(status).read(whichPartition).read(hasInputCRC).read(_inputCRC).read(inputLength).read(_outputCRC).read(outputLength).read(hasTime).read(numWrites).read(numReads);
     inputCRC = _inputCRC;
     outputCRC = _outputCRC;
     if (hasTime)
@@ -584,7 +586,7 @@ MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)
     bool hasTime = !resultTime.isNull();
     unsigned _inputCRC = inputCRC;
     unsigned _outputCRC = outputCRC;
-    out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime);
+    out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime).append(numWrites).append(numReads);
     if (hasTime)
         resultTime.serialize(out);
     return out;
@@ -615,6 +617,8 @@ void OutputProgress::set(const OutputProgress & other)
     resultTime = other.resultTime;
     hasCompressed = other.hasCompressed;
     compressedPartSize = other.compressedPartSize;
+    numWrites = other.numWrites;
+    numReads = other.numReads;
 }
 
 void OutputProgress::restore(IPropertyTree * tree)
@@ -629,6 +633,8 @@ void OutputProgress::restore(IPropertyTree * tree)
     resultTime.setString(tree->queryProp("@modified"));
     hasCompressed = tree->getPropBool("@compressed");
     compressedPartSize = tree->getPropInt64("@compressedPartSize");
+    numWrites = tree->getPropInt64("@numWrites");
+    numReads = tree->getPropInt64("@numReads");
 }
 
 void OutputProgress::save(IPropertyTree * tree)
@@ -647,6 +653,8 @@ void OutputProgress::save(IPropertyTree * tree)
     }
     tree->setPropInt("@compressed", hasCompressed);
     tree->setPropInt64("@compressedPartSize", compressedPartSize);
+    tree->setPropInt64("@numWrites", numWrites);
+    tree->setPropInt64("@numReads", numReads);
 }
 
 

+ 2 - 0
dali/ft/ftbase.ipp

@@ -110,6 +110,8 @@ public:
     bool            hasInputCRC;
     bool            hasCompressed;
     offset_t        compressedPartSize;
+    stat_type       numWrites;
+    stat_type       numReads;
 
 //Not saved/serialized - should probably be in a Sprayer-only class that contains an outputProgress.
     Owned<IPropertyTree> tree;

+ 12 - 1
dali/ft/fttransform.cpp

@@ -521,7 +521,8 @@ void TransferServer::appendTransformed(unsigned chunkIndex, ITransformer * input
 
     const offset_t startInputOffset = curPartition.inputOffset;
     const offset_t startOutputOffset = curPartition.outputOffset;
-
+    stat_type prevNumWrites =  out->getStatistic(StNumDiskWrites);
+    stat_type prevNumReads = input->getStatistic(StNumDiskReads);
     for (;;)
     {
         unsigned gotLength = input->getBlock(out);
@@ -541,6 +542,12 @@ void TransferServer::appendTransformed(unsigned chunkIndex, ITransformer * input
             curProgress.status = (gotLength == 0) ? OutputProgress::StatusCopied : OutputProgress::StatusActive;
             curProgress.inputLength = input->tell()-startInputOffset;
             curProgress.outputLength = out->tell()-startOutputOffset;
+            stat_type curNumWrites = out->getStatistic(StNumDiskWrites);
+            stat_type curNumReads = input->getStatistic(StNumDiskReads);
+            curProgress.numWrites += (curNumWrites - prevNumWrites);
+            curProgress.numReads += (curNumReads - prevNumReads);
+            prevNumWrites = curNumWrites;
+            prevNumReads = curNumReads;
             if (crcOut)
                 curProgress.outputCRC = crcOut->getCRC();
             if (calcInputCRC)
@@ -688,10 +695,12 @@ void TransferServer::transferChunk(unsigned chunkIndex)
     size32_t fixedTextLength = (size32_t)curPartition.fixedText.length();
     if (fixedTextLength || curPartition.inputName.isNull())
     {
+        stat_type prevWrites = out->getStatistic(StNumDiskWrites);
         out->write(fixedTextLength, curPartition.fixedText.get());
         curProgress.status = OutputProgress::StatusCopied;
         curProgress.inputLength = fixedTextLength;
         curProgress.outputLength = fixedTextLength;
+        curProgress.numWrites += (out->getStatistic(StNumDiskWrites)-prevWrites);
         if (crcOut)
             curProgress.outputCRC = crcOut->getCRC();
         sendProgress(curProgress);
@@ -864,7 +873,9 @@ processedProgress:
                 {
                     char null = 0;
                     offset_t lastOffset = lastChunk.outputOffset+lastChunk.outputLength;
+                    stat_type prevWrites = outio->getStatistic(StNumDiskWrites);
                     outio->write(lastOffset-sizeof(null),sizeof(null),&null);
+                    curProgress.numWrites += (outio->getStatistic(StNumDiskWrites)-prevWrites);
                     LOG(MCdebugProgress, unknownJob, "Extend length of target file to %" I64F "d", lastOffset);
                 }
             }

+ 1 - 0
dali/ft/fttransform.hpp

@@ -31,6 +31,7 @@ public:
     virtual void setInputCRC(crc32_t value) = 0;
     virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey) = 0;
     virtual offset_t tell() = 0;
+    virtual stat_type getStatistic(StatisticKind kind) = 0;
 };
 
 

+ 3 - 0
dali/ft/fttransform.ipp

@@ -38,6 +38,7 @@ public:
     virtual bool getInputCRC(crc32_t & value) { return false; }
     virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey) = 0;
     virtual void setInputCRC(crc32_t value);
+    virtual stat_type getStatistic(StatisticKind kind) = 0;
 
 protected:
     bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length);
@@ -60,6 +61,7 @@ public:
     virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey);
     virtual size32_t getBlock(IFileIOStream * out);
     virtual offset_t tell();
+    virtual stat_type getStatistic(StatisticKind kind) override { return input->getStatistic(kind); }
 
 protected:
     size32_t read(size32_t maxLength, void * buffer);
@@ -192,6 +194,7 @@ public:
     virtual bool setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey);
     virtual void setInputCRC(crc32_t value);
     virtual offset_t tell();
+    virtual stat_type getStatistic(StatisticKind kind) override { UNIMPLEMENTED; }
 
 protected:
     Owned<IFormatProcessor> processor;