Browse Source

HPCC-14572 Add IO timing information to the stats

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 years ago
parent
commit
c02aafd08a

+ 4 - 0
common/remote/hooks/git/gitfile.cpp

@@ -139,6 +139,10 @@ public:
     {
         throwUnexpected();
     }
+    unsigned __int64 getStatistic(StatisticKind kind)   // Stub: reports 0 for every statistic kind.
+    {
+        return 0;
+    }
 protected:
     MemoryBuffer buf;
 };

+ 4 - 0
common/remote/hooks/libarchive/archive.cpp

@@ -277,6 +277,10 @@ public:
     {
         throwUnexpected();
     }
+    unsigned __int64 getStatistic(StatisticKind kind)   // Stub: reports 0 for every statistic kind.
+    {
+        return 0;
+    }
 protected:
     struct archive *archive;
     offset_t fileSize;

+ 41 - 1
common/remote/sockfile.cpp

@@ -38,6 +38,7 @@
 #include "jset.hpp"
 
 #include "remoteerr.hpp"
+#include <atomic>
 
 #define SOCKET_CACHE_MAX 500
 
@@ -2144,6 +2145,12 @@ class CRemoteFileIO : public CInterface, implements IFileIO
 protected:
     Linked<CRemoteFile> parent;
     RemoteFileIOHandle  handle;
+    std::atomic<cycle_t> ioReadCycles;
+    std::atomic<cycle_t> ioWriteCycles;
+    std::atomic<__uint64> ioReadBytes;
+    std::atomic<__uint64> ioWriteBytes;
+    std::atomic<__uint64> ioReads;
+    std::atomic<__uint64> ioWrites;
     IFOmode mode;
     compatIFSHmode compatmode;
     IFEflags extraFlags;
@@ -2151,7 +2158,8 @@ protected:
 public:
     IMPLEMENT_IINTERFACE
     CRemoteFileIO(CRemoteFile *_parent)
-        : parent(_parent)
+        : parent(_parent), ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0)
+
     {
         handle = 0;
         disconnectonexit = false;
@@ -2215,6 +2223,7 @@ public:
             break;
         default:
             mode = _mode;
+            break;
         }
         compatmode = _compatmode;
         extraFlags = _extraFlags;
@@ -2247,12 +2256,39 @@ public:
         return ret;
     }
 
+    virtual unsigned __int64 getStatistic(StatisticKind kind)   // Running total of one I/O statistic for this CRemoteFileIO.
+    {
+        switch (kind)
+        {
+        case StCycleDiskReadIOCycles:
+            return ioReadCycles.load(std::memory_order_relaxed);
+        case StCycleDiskWriteIOCycles:
+            return ioWriteCycles.load(std::memory_order_relaxed);
+        case StTimeDiskReadIO:
+            return cycle_to_nanosec(ioReadCycles.load(std::memory_order_relaxed));     // time is derived from the cycle total, not stored separately
+        case StTimeDiskWriteIO:
+            return cycle_to_nanosec(ioWriteCycles.load(std::memory_order_relaxed));    // likewise for writes
+        case StSizeDiskRead:
+            return ioReadBytes.load(std::memory_order_relaxed);
+        case StSizeDiskWrite:
+            return ioWriteBytes.load(std::memory_order_relaxed);
+        case StNumDiskReads:
+            return ioReads.load(std::memory_order_relaxed);
+        case StNumDiskWrites:
+            return ioWrites.load(std::memory_order_relaxed);
+        }
+        return 0;   // any other kind is not tracked by this class
+    }
 
     size32_t read(offset_t pos, size32_t len, void * data)
     {
         size32_t got;
         MemoryBuffer replyBuffer;
+        CCycleTimer timer;
         const void *b = doRead(pos,len,replyBuffer,got,data);
+        ioReadCycles.fetch_add(timer.elapsedCycles());
+        ioReadBytes.fetch_add(got);
+        ++ioReads;
         if (b!=data)
             memcpy(data,b,got);
         return got;
@@ -2320,6 +2356,7 @@ public:
     {
         unsigned tries=0;
         size32_t ret = 0;
+        CCycleTimer timer;
         loop {
             try {
                 MemoryBuffer replyBuffer;
@@ -2341,6 +2378,9 @@ public:
 
             }
         }
+        ioWriteCycles.fetch_add(timer.elapsedCycles());
+        if (ret != (size32_t)-1) ioWriteBytes.fetch_add(ret);   // ret can be the (size32_t)-1 failure marker tested just below; keep it out of the byte total
+        ++ioWrites;
         if ((ret==(size32_t)-1) || (ret < len))
             throw createDafsException(DISK_FULL_EXCEPTION_CODE,"write failed, disk full?");
         return ret;

+ 6 - 0
common/thorhelper/thorcommon.cpp

@@ -1279,6 +1279,12 @@ public:
         return source.tell();
     }
 
+    virtual unsigned __int64 getStatistic(StatisticKind kind)   // Forwards the query to fileio when one is set.
+    {
+        if (fileio)
+            return fileio->getStatistic(kind);
+        return 0;   // no fileio to ask, so report 0
+    }
 
 };
 

+ 2 - 0
common/thorhelper/thorcommon.hpp

@@ -22,6 +22,7 @@
 #include "jcrc.hpp"
 #include "jsort.hpp"
 #include "jdebug.hpp"
+#include "jfile.hpp"
 #include "eclhelper.hpp"
 #include "thorhelper.hpp"
 #include "thorxmlwrite.hpp"
@@ -101,6 +102,7 @@ interface IExtRowStream: extends IRowStream
     virtual const void *prefetchRow(size32_t *sz=NULL) = 0;
     virtual void prefetchDone() = 0;
     virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
+    virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
 };
 
 interface IExtRowWriter: extends IRowWriter

+ 1 - 0
dali/datest/datest.cpp

@@ -2591,6 +2591,7 @@ NULL
                 verifyex(sz == iFileIOs.item(i).write(pos, len, data));
             return sz;
         }
+        virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }   // stub: reports 0 for every statistic kind
         virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { UNIMPLEMENTED; return 0; }
         virtual void setSize(offset_t size) { UNIMPLEMENTED; }
         virtual void flush() { }

+ 18 - 1
roxie/ccd/ccdfile.cpp

@@ -71,6 +71,7 @@ public:
     virtual void setSize(offset_t size) { UNIMPLEMENTED; }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
     virtual void close() { }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) { return 0; }   // stub: reports 0 for every statistic kind
 } failure;
 
 class CLazyFileIO : public CInterface, implements ILazyFileIO, implements IDelayedFile
@@ -90,6 +91,7 @@ protected:
     bool copying;
     bool isCompressed;
     const IRoxieFileCache *cached;
+    CRuntimeStatisticCollection fileStats;
 
 #ifdef FAIL_20_READ
     unsigned readCount;
@@ -99,7 +101,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     CLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, unsigned _crc, bool _isCompressed)
-        : logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed)
+        : logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed), fileStats(diskLocalStatistics)
     {
         fileDate.set(_date);
         currentIdx = 0;
@@ -361,6 +363,21 @@ public:
         return current->size();
     }
 
+    virtual unsigned __int64 getStatistic(StatisticKind kind)   // Value of one I/O statistic: the currently open file's value plus what fileStats holds.
+    {
+        switch (kind)
+        {
+        case StTimeDiskReadIO:
+            return cycle_to_nanosec(getStatistic(StCycleDiskReadIOCycles));    // times are derived from the summed cycle counts (fileStats is built from diskLocalStatistics)
+        case StTimeDiskWriteIO:
+            return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
+        }
+
+        CriticalBlock b(crit);   // held while current is inspected and queried
+        unsigned __int64 openValue = current ? current->getStatistic(kind) : 0;   // 0 when there is no current file
+        return openValue + fileStats.getStatisticValue(kind);
+    }
+
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
     virtual void setSize(offset_t size) { throwUnexpected(); }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }

+ 2 - 0
system/jlib/CMakeLists.txt

@@ -137,6 +137,8 @@ set (    INCLUDES
         jsocket.hpp
         jsort.hpp
         jsorta.hpp
+        jstatcodes.h
+        jstats.h
         jstream.hpp
         jstream.ipp
         jstring.hpp

+ 1 - 1
system/jlib/jdebug.hpp

@@ -114,7 +114,7 @@ class CCycleTimer
 {
     cycle_t start_time;
 public:
-    CCycleTimer()
+    inline CCycleTimer()
     {
         reset();
     }

+ 81 - 8
system/jlib/jfile.cpp

@@ -1582,6 +1582,10 @@ IFileIO *_createIFileIO(const void *buffer, unsigned sz, bool readOnly)
         }
         virtual void flush() {}
         virtual void close() {}
+        virtual unsigned __int64 getStatistic(StatisticKind kind)   // Stub: reports 0 for every statistic kind.
+        {
+            return 0;
+        }
         virtual void setSize(offset_t size)
         {
             if (size > mb.length())
@@ -1761,11 +1765,36 @@ offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
     return ret;
 }
 
+unsigned __int64 CFileIO::getStatistic(StatisticKind kind)   // Running total of one I/O statistic for this CFileIO.
+{
+    switch (kind)
+    {
+    case StCycleDiskReadIOCycles:
+        return ioReadCycles.load(std::memory_order_relaxed);
+    case StCycleDiskWriteIOCycles:
+        return ioWriteCycles.load(std::memory_order_relaxed);
+    case StTimeDiskReadIO:
+        return cycle_to_nanosec(ioReadCycles.load(std::memory_order_relaxed));     // time is derived from the cycle total, not stored separately
+    case StTimeDiskWriteIO:
+        return cycle_to_nanosec(ioWriteCycles.load(std::memory_order_relaxed));    // likewise for writes
+    case StSizeDiskRead:
+        return ioReadBytes.load(std::memory_order_relaxed);
+    case StSizeDiskWrite:
+        return ioWriteBytes.load(std::memory_order_relaxed);
+    case StNumDiskReads:
+        return ioReads.load(std::memory_order_relaxed);
+    case StNumDiskWrites:
+        return ioWrites.load(std::memory_order_relaxed);
+    }
+    return 0;   // any other kind is not tracked by this class
+}
+
 #ifdef _WIN32
 
 //-- Windows implementation -------------------------------------------------
 
 CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags _extraFlags)
+    : ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0)
 {
     assertex(handle != NULLFILE);
     throwOnError = false;
@@ -1828,10 +1857,14 @@ size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 {
     CriticalBlock procedure(cs);
 
+    CCycleTimer timer;
     DWORD numRead;
     setPos(pos);
     if (ReadFile(file,data,len,&numRead,NULL) == 0)
         throw makeOsException(GetLastError(),"CFileIO::read");
+    ioReadCycles.fetch_add(timer.elapsedCycles());
+    ioReadBytes.fetch_add(numRead);
+    ++ioReads;
     return (size32_t)numRead;
 }
 
@@ -1846,12 +1879,16 @@ size32_t CFileIO::write(offset_t pos, size32_t len, const void * data)
 {
     CriticalBlock procedure(cs);
 
+    CCycleTimer timer;
     DWORD numWritten;
     setPos(pos);
     if (!WriteFile(file,data,len,&numWritten,NULL))
         throw makeOsException(GetLastError(),"CFileIO::write");
     if (numWritten != len)
         throw makeOsException(DISK_FULL_EXCEPTION_CODE,"CFileIO::write");
+    ioWriteCycles.fetch_add(timer.elapsedCycles());
+    ioWriteBytes.fetch_add(numWritten);
+    ++ioWrites;
     return (size32_t)numWritten;
 }
 
@@ -1870,6 +1907,8 @@ void CFileIO::setSize(offset_t pos)
 
 // More errno checking TBD
 CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags _extraFlags)
+    : ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0)
+
 {
     assertex(handle != NULLFILE);
     throwOnError = false;
@@ -1882,7 +1921,6 @@ CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags
             extraFlags = static_cast<IFEflags>(extraFlags & ~IFEnocache);
     atomic_set(&bytesRead, 0);
     atomic_set(&bytesWritten, 0);
-
 #ifdef CFILEIOTRACE
     DBGLOG("CFileIO::CfileIO(%d,%d,%d,%d)", handle, _openmode, _sharemode, _extraFlags);
 #endif
@@ -1956,7 +1994,13 @@ offset_t CFileIO::size()
 size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 {
     if (0==len) return 0;
+
+    CCycleTimer timer;
     size32_t ret = checked_pread(file, data, len, pos);
+    ioReadCycles.fetch_add(timer.elapsedCycles());
+    ioReadBytes.fetch_add(ret);
+    ++ioReads;
+
     if ( (extraFlags & IFEnocache) && (ret > 0) )
     {
         if (atomic_add_and_read(&bytesRead, ret) >= PGCFLUSH_BLKSIZE)
@@ -1990,7 +2034,12 @@ static void sync_file_region(int fd, offset_t offset, offset_t nbytes)
 
 size32_t CFileIO::write(offset_t pos, size32_t len, const void * data)
 {
+    CCycleTimer timer;
     size32_t ret = pwrite(file,data,len,pos);
+    ioWriteCycles.fetch_add(timer.elapsedCycles());
+    if (ret != (size32_t)-1)            // a failed pwrite yields (size32_t)-1; do not add that to the byte total
+        ioWriteBytes.fetch_add(ret);
+    ++ioWrites;
     if (ret==(size32_t)-1)
         throw makeErrnoException(errno, "CFileIO::write");
     if (ret<len)
@@ -2343,6 +2392,12 @@ offset_t CFileAsyncIO::size()
     return length;
 }
 
+unsigned __int64 CFileAsyncIO::getStatistic(StatisticKind kind)   // Not implemented: reports 0 for every statistic kind.
+{
+    //MORE: Could implement - but I don't think this class is currently used
+    return 0;
+}
+
 size32_t CFileAsyncIO::read(offset_t pos, size32_t len, void * data)
 {
     CriticalBlock procedure(cs);
@@ -2410,7 +2465,6 @@ IFileAsyncResult *CFileAsyncIO::writeAsync(offset_t pos, size32_t len, const voi
 
 #endif
 
-
 //---------------------------------------------------------------------------
 
 CFileIOStream::CFileIOStream(IFileIO * _io)
@@ -6397,6 +6451,7 @@ class CCachedFileIO: public CInterface, implements IFileIO
     CLazyFileIOCache &owner;
     RemoteFilename filename;
     CriticalSection &sect;
+    CRuntimeStatisticCollection fileStats;
     IFOmode mode;
 
     void writeNotSupported(const char *s)
@@ -6411,16 +6466,31 @@ public:
 
 
     CCachedFileIO(CLazyFileIOCache &_owner, CriticalSection &_sect, RemoteFilename &_filename, IFOmode _mode)
-        : owner(_owner), sect(_sect)
+        : owner(_owner), sect(_sect), fileStats(diskLocalStatistics)
     {
         filename.set(_filename);
         mode = _mode;
+        accesst = 0;
     }
 
     virtual void Link(void) const       { CInterface::Link(); }                     \
 
     virtual bool Release(void) const;
     
+    unsigned __int64 getStatistic(StatisticKind kind)   // Value of one I/O statistic: the open handle's value plus the totals kept in fileStats.
+    {
+        switch (kind)
+        {
+        case StTimeDiskReadIO:
+            return cycle_to_nanosec(getStatistic(StCycleDiskReadIOCycles));    // times are derived from the summed cycle counts (fileStats is built from diskLocalStatistics)
+        case StTimeDiskWriteIO:
+            return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
+        }
+
+        CriticalBlock block(sect);   // held while cachedio is inspected and queried
+        unsigned __int64 openValue = cachedio ? cachedio->getStatistic(kind) : 0;   // 0 when no handle is open
+        return openValue + fileStats.getStatisticValue(kind);   // fileStats receives a handle's counters in forceClose()
+    }
 
     IFileIO *open();
 
@@ -6452,10 +6522,7 @@ public:
     {
         CriticalBlock block(sect);
         if (cachedio)
-        {
-            cachedio->close();
-            cachedio.clear();
-        }
+            forceClose();
     }
     offset_t appendFile(IFile *file,offset_t pos,offset_t len)
     {
@@ -6470,6 +6537,12 @@ public:
         io->setSize(size);
     }
 
+    void forceClose()                       // Close the open handle (if any), keeping its statistics in fileStats.
+    {
+        if (cachedio) cachedio->close();    // tolerate being called when no handle is open
+        mergeStats(fileStats, cachedio);    // fold the handle's counters into fileStats before it is released; mergeStats ignores a null file
+        cachedio.clear();
+    }
 };
 
 class CLazyFileIOCache: public CInterface, implements IFileIOCache
@@ -6524,7 +6597,7 @@ public:
                 break;
             if (!oldest)
                 break;
-            oldest->cachedio.clear();
+            oldest->forceClose();
             //If previously had max ios then we now have space.
             if (n == max)
                 break;

+ 8 - 1
system/jlib/jfile.hpp

@@ -24,7 +24,7 @@
 #include "jio.hpp"
 #include "jtime.hpp"
 #include "jsocket.hpp"
-
+#include "jstatcodes.h"
 
 interface IFile;
 interface IFileIO;
@@ -159,6 +159,12 @@ extern jlib_decl unsigned sortDirectory(
 
 
 
+enum FileIOStat {                       // NOTE(review): not referenced by any code visible in this change - confirm it is still needed
+    FIOSiocycles,
+    FIOSiobytes,
+    FIOSmax
+};
+
 //This is closed by releasing the interface
 interface IFileIO : public IInterface
 {
@@ -169,6 +175,7 @@ interface IFileIO : public IInterface
     virtual void setSize(offset_t size) = 0;
     virtual void flush() = 0;
     virtual void close() = 0;       // no other access is allowed after this call
+    virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
 };
 
 interface IFileIOCache : extends IInterface

+ 11 - 1
system/jlib/jfile.ipp

@@ -22,6 +22,7 @@
 #include "jfile.hpp"
 #include "jmutex.hpp"
 #include "jio.ipp"
+#include <atomic>
 
 #ifndef _WIN32
 #include <sys/types.h>
@@ -107,10 +108,11 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
     virtual void flush();
     virtual void close();
+    virtual unsigned __int64 getStatistic(StatisticKind kind);
 
     bool create(const char * filename, bool replace);
     bool open(const char * filename);
-    
+
     HANDLE queryHandle() { return file; } // for debugging
 
 
@@ -123,6 +125,12 @@ protected:
     IFEflags            extraFlags;
     atomic_t            bytesRead;
     atomic_t            bytesWritten;
+    std::atomic<cycle_t> ioReadCycles;
+    std::atomic<cycle_t> ioWriteCycles;
+    std::atomic<__uint64> ioReadBytes;
+    std::atomic<__uint64> ioWriteBytes;
+    std::atomic<__uint64> ioReads;
+    std::atomic<__uint64> ioWrites;
 private:
     void setPos(offset_t pos);
 
@@ -141,6 +149,7 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; return 0; }
     virtual void flush() { io->flush(); }
     virtual void close() { io->close(); }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) { return io->getStatistic(kind); }   // delegates to the wrapped io
 
 protected:
     Linked<IFileIO>     io;
@@ -163,6 +172,7 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len);
     virtual void flush();
     virtual void close();
+    virtual unsigned __int64 getStatistic(StatisticKind kind);
 
     virtual void setSize(offset_t size);
     virtual IFileAsyncResult *readAsync(offset_t pos, size32_t len, void * data);

+ 5 - 0
system/jlib/jlzw.cpp

@@ -2235,6 +2235,11 @@ public:
         return ret;
     }
 
+    virtual unsigned __int64 getStatistic(StatisticKind kind)   // Forwards the query to the underlying fileio.
+    {
+        return fileio->getStatistic(kind);
+    }
+
     void setSize(offset_t size) { UNIMPLEMENTED; }
     offset_t appendFile(IFile *file,offset_t pos,offset_t len) { UNIMPLEMENTED; }
 

+ 191 - 0
system/jlib/jstatcodes.h

@@ -0,0 +1,191 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+#ifndef JSTATCODES_H
+#define JSTATCODES_H
+
+#define ActivityScopePrefix "a"
+#define EdgeScopePrefix "e"
+#define SubGraphScopePrefix "sg"
+#define GraphScopePrefix "graph"
+#define CONST_STRLEN(x) (sizeof(x)-1)       // sizeof(const-string) = strlen(const-string) + 1 byte for the \0 terminator
+#define MATCHES_CONST_PREFIX(search, prefix) (strncmp(search, prefix, CONST_STRLEN(prefix)) == 0)
+
+enum CombineStatsAction
+{
+    MergeStats,
+    ReplaceStats,
+    AppendStats,
+};
+
+enum StatisticCreatorType
+{
+    SCTnone,
+    SCTall,
+    SCTunknown,
+    SCThthor,
+    SCTroxie,
+    SCTroxieSlave,
+    SCTthor,
+    SCTthorMaster,
+    SCTthorSlave,
+    SCTeclcc,
+    SCTesp,
+    SCTsummary,                         // used to maintain the summary time over all thors (mainly for sorting)
+    SCTmax,
+};
+
+enum StatisticScopeType
+{
+    SSTnone,
+    SSTall,
+    SSTglobal,                          // root scope
+    SSTgraph,                           // identifies a graph
+    SSTsubgraph,
+    SSTactivity,
+    SSTallocator,                       // identifies an allocator
+    SSTsection,                         // A section within the query - not a great differentiator
+    SSTcompilestage,                    // a stage within the compilation process
+    SSTdfuworkunit,                     // a reference to an executing dfu workunit
+    SSTedge,
+    SSTmax
+};
+
+enum StatisticMeasure                   // entries correspond positionally to measureNames[] in jstats.cpp
+{
+    SMeasureNone,
+    SMeasureAll,
+    SMeasureTimeNs,                     // Elapsed time in nanoseconds
+    SMeasureTimestampUs,                // timestamp/when - a point in time (to the microsecond)
+    SMeasureCount,                      // a count of the number of occurrences
+    SMeasureSize,                       // a quantity of memory (or disk) measured in bytes
+    SMeasureLoad,                       // measure of cpu activity (stored as 1/1000000 core)
+    SMeasureSkew,                       // a measure of skew. 0 = perfectly balanced, range [-10000..infinity]
+    SMeasureNode,                       // A node number within a cluster (0 = master)
+    SMeasurePercent,                    // actually stored as parts per million, displayed as a percentage
+    SMeasureIPV4,                       // an IPv4 address (formatted with formatIPV4)
+    SMeasureCycle,                      // a count of cycles; formatted as a plain number and merged by summing
+    SMeasureMax,
+};
+
+//This macro can be used to generate multiple variations of a statistics kind, but probably not needed any more
+//e.g.,     DEFINE_SKEW_STAT(Time, Elapsed)
+
+#define DEFINE_SKEW_STAT(x, y) \
+    St ## x ## Min ## y = (St ## x ## y | StMinX), \
+    St ## x ## Max ## y = (St ## x ## y | StMaxX), \
+    St ## x ## Ave ## y = (St ## x ## y | StAvgX), \
+    St ## Skew ## y = (St ## x ## y | StSkew), \
+    St ## SkewMin ## y = (St ## x ## y | StSkewMin), \
+    St ## SkewMax ## y = (St ## x ## y | StSkewMax), \
+    St ## NodeMin ## y = (St ## x ## y | StNodeMin), \
+    St ## NodeMax ## y = (St ## x ## y | StNodeMax),
+
+//The values in this enumeration are stored persistently.  The associated values must not be changed.
+//If you add an entry here you must also update statsMetaData
+//NOTE: All statistic names should be unique with the type prefix removed, since the prefix is replaced with Skew/Min/etc.
+enum StatisticKind
+{
+    StKindNone,
+    StKindAll,
+
+    StWhenGraphStarted,                 // When a graph starts
+    StWhenGraphFinished,                // When a graph stopped
+    StWhenFirstRow,                     // When the first row is processed by slave activity
+    StWhenQueryStarted,
+    StWhenQueryFinished,
+    StWhenCreated,
+    StWhenCompiled,
+    StWhenWorkunitModified,             // Not sure this is very useful
+
+    StTimeElapsed,                      // Elapsed wall time between first row and last row
+    StTimeLocalExecute,                 // Time spent processing just this activity
+    StTimeTotalExecute,                 // Time executing this activity and all inputs
+    StTimeRemaining,
+
+    StSizeGeneratedCpp,
+    StSizePeakMemory,
+    StSizeMaxRowSize,
+
+    StNumRowsProcessed,                 // on edge
+    StNumSlaves,                        // on edge
+    StNumStarted,                       // on edge
+    StNumStopped,                       // on edge
+    StNumIndexSeeks,
+    StNumIndexScans,
+    StNumIndexWildSeeks,
+    StNumIndexSkips,
+    StNumIndexNullSkips,
+    StNumIndexMerges,
+    StNumIndexMergeCompares,
+    StNumPreFiltered,
+    StNumPostFiltered,
+    StNumBlobCacheHits,
+    StNumLeafCacheHits,
+    StNumNodeCacheHits,
+    StNumBlobCacheAdds,
+    StNumLeafCacheAdds,
+    StNumNodeCacheAdds,
+    StNumPreloadCacheHits,
+    StNumPreloadCacheAdds,
+    StNumServerCacheHits,
+    StNumIndexAccepted,
+    StNumIndexRejected,
+    StNumAtmostTriggered,
+    StNumDiskSeeks,
+    StNumIterations,
+    StLoadWhileSorting,                 // Average load while processing a sort?
+    StNumLeftRows,
+    StNumRightRows,
+    StPerReplicated,
+    StNumDiskRowsRead,
+    StNumIndexRowsRead,
+    StNumDiskAccepted,
+    StNumDiskRejected,
+
+    StTimeSoapcall,                     // Time spent waiting for soapcalls
+    StTimeFirstExecute,                 // Time waiting for first record from this activity
+    StTimeDiskReadIO,                   // Time spent in file read calls (nanoseconds)
+    StTimeDiskWriteIO,                  // Time spent in file write calls (nanoseconds)
+    StSizeDiskRead,                     // Number of bytes read
+    StSizeDiskWrite,                    // Number of bytes written
+    StCycleDiskReadIOCycles,            // Cycles spent in file read calls
+    StCycleDiskWriteIOCycles,           // Cycles spent in file write calls
+    StNumDiskReads,                     // Number of read calls
+    StNumDiskWrites,                    // Number of write calls
+
+    StMax,                              // number of base kinds; statsMetaData[] is sized by it
+
+    //For any quantity there are potentially the following variants.
+    //These modifiers are ORed with the values above to form a compound type.
+    StKindMask                          = 0x0ffff,
+    StVariantScale                      = (StKindMask+1),
+    StMinX                              = 0x10000,  // the minimum value
+    StMaxX                              = 0x20000,  // the maximum value
+    StAvgX                              = 0x30000,  // the average value
+    StSkew                              = 0x40000,  // the skew on a particular node
+    StSkewMin                           = 0x50000,  // the minimum skew
+    StSkewMax                           = 0x60000,  // the maximum skew
+    StNodeMin                           = 0x70000,  // the node containing the minimum
+    StNodeMax                           = 0x80000,  // the node containing the maximum
+    StDeltaX                            = 0x90000,  // a difference in the value of X
+    StNextModifier                      = 0xa0000,
+
+};
+
+#endif

+ 47 - 10
system/jlib/jstats.cpp

@@ -22,6 +22,7 @@
 #include "jiter.ipp"
 #include "jlog.hpp"
 #include "jregexp.hpp"
+#include "jfile.hpp"
 
 #ifdef _WIN32
 #include <sys/timeb.h>
@@ -62,7 +63,7 @@ void setStatisticsComponentName(StatisticCreatorType processType, const char * p
 //--------------------------------------------------------------------------------------------------------------------
 
 // Textual forms of the different enumerations, first items are for none and all.
-static const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", NULL };
+static const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", NULL };
 static const char * const creatorTypeNames[]= { "", "all", "unknown", "hthor", "roxie", "roxie:s", "thor", "thor:m", "thor:s", "eclcc", "esp", "summary", NULL };
 static const char * const scopeTypeNames[] = { "", "all", "global", "graph", "subgraph", "activity", "allocator", "section", "compile", "dfu", "edge", NULL };
 
@@ -322,6 +323,9 @@ void formatStatistic(StringBuffer & out, unsigned __int64 value, StatisticMeasur
     case SMeasureIPV4:
         formatIPV4(out, value);
         break;
+    case SMeasureCycle:
+        out.append(value);
+        break;
     default:
         throwUnexpected();
     }
@@ -366,6 +370,7 @@ const char * queryMeasurePrefix(StatisticMeasure measure)
     case SMeasureNode:          return "Node";
     case SMeasurePercent:       return "Per";
     case SMeasureIPV4:          return "Ip";
+    case SMeasureCycle:         return "Cycle";
     default:
         throwUnexpected();
     }
@@ -391,14 +396,15 @@ StatsMergeAction queryMergeMode(StatisticMeasure measure)
     switch (measure)
     {
     case SMeasureTimeNs:        return StatsMergeSum;
-    case SMeasureTimestampUs:   return StatsMergeKeep;
+    case SMeasureTimestampUs:   return StatsMergeKeepNonZero;
     case SMeasureCount:         return StatsMergeSum;
     case SMeasureSize:          return StatsMergeSum;
     case SMeasureLoad:          return StatsMergeMax;
     case SMeasureSkew:          return StatsMergeMax;
-    case SMeasureNode:          return StatsMergeKeep;
+    case SMeasureNode:          return StatsMergeKeepNonZero;
     case SMeasurePercent:       return StatsMergeReplace;
-    case SMeasureIPV4:          return StatsMergeKeep;
+    case SMeasureIPV4:          return StatsMergeKeepNonZero;
+    case SMeasureCycle:         return StatsMergeSum;
     default:
         throwUnexpected();
     }
@@ -482,6 +488,7 @@ extern jlib_decl StatsMergeAction queryMergeMode(StatisticKind kind)
 #define NODESTAT(y) STAT(Node, y, SMeasureNode)
 #define PERSTAT(y) STAT(Per, y, SMeasurePercent)
 #define IPV4STAT(y) STAT(IPV4, y, SMeasureIPV4)
+#define CYCLESTAT(y) STAT(Cycle, y, SMeasureCycle)
 
 //The following variants are used where a different tag name is required
 #define TIMESTAT2(y, dft) TAGSTAT(Time, y, SMeasureTimeNs, dft)
@@ -554,6 +561,14 @@ static const StatisticMeta statsMetaData[StMax] = {
     { NUMSTAT(DiskRejected) },
     { TIMESTAT(Soapcall) },
     { TIMESTAT(FirstExecute) },
+    { TIMESTAT(DiskReadIO) },
+    { TIMESTAT(DiskWriteIO) },
+    { SIZESTAT(DiskRead) },
+    { SIZESTAT(DiskWrite) },
+    { CYCLESTAT(DiskReadIOCycles) },
+    { CYCLESTAT(DiskWriteIOCycles) },
+    { NUMSTAT(DiskReads) },
+    { NUMSTAT(DiskWrites) },
 };
 
 
@@ -683,6 +698,7 @@ inline void mergeUpdate(StatisticMeasure measure, unsigned __int64 & value, cons
     case SMeasureSize:
     case SMeasureLoad:
     case SMeasureSkew:
+    case SMeasureCycle:
         value += otherValue;
         break;
     case SMeasureTimestampUs:
@@ -702,8 +718,10 @@ unsigned __int64 mergeStatisticValue(unsigned __int64 prevValue, unsigned __int6
 {
     switch (mergeAction)
     {
-    case StatsMergeKeep:
-        return prevValue;
+    case StatsMergeKeepNonZero:
+        if (prevValue)
+            return prevValue;
+        return newValue;
     case StatsMergeAppend:
     case StatsMergeReplace:
         return newValue;
@@ -799,6 +817,10 @@ void StatisticsMapping::createMappings()
 }
 
 const StatisticsMapping allStatistics;
+const StatisticsMapping diskLocalStatistics(StCycleDiskReadIOCycles, StSizeDiskRead, StNumDiskReads, StCycleDiskWriteIOCycles, StSizeDiskWrite, StNumDiskWrites, StKindNone);   // read and write cycles, bytes and call counts
+const StatisticsMapping diskRemoteStatistics(StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StKindNone);   // same set, with times in place of raw cycle counts
+const StatisticsMapping diskReadRemoteStatistics(StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StKindNone);   // read-side kinds of diskRemoteStatistics
+const StatisticsMapping diskWriteRemoteStatistics(StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StKindNone);   // write-side kinds of diskRemoteStatistics
 
 //--------------------------------------------------------------------------------------------------------------------
 
@@ -1404,13 +1426,15 @@ void CRuntimeStatisticCollection::merge(const CRuntimeStatisticCollection & othe
         StatisticKind kind = other.getKind(i);
         unsigned __int64 value = other.getStatisticValue(kind);
         if (value)
-        {
-            StatsMergeAction mergeAction = queryMergeMode(kind);
-            mergeStatistic(kind, other.getStatisticValue(kind), mergeAction);
-        }
+            mergeStatistic(kind, other.getStatisticValue(kind));
     }
 }
 
+void CRuntimeStatisticCollection::mergeStatistic(StatisticKind kind, unsigned __int64 value)   // Merge value into the entry for kind, using the merge mode defined for that kind.
+{
+    queryStatistic(kind).merge(value, queryMergeMode(kind));
+}
+
 void CRuntimeStatisticCollection::rollupStatistics(unsigned numTargets, IContextLogger * const * targets) const
 {
     ForEachItem(iStat)
@@ -1529,6 +1553,19 @@ bool CRuntimeStatisticCollection::serialize(MemoryBuffer& out) const
     return numValid != 0;
 }
 
+
+void mergeStats(CRuntimeStatisticCollection & stats, IFileIO * file)   // Fold file's current I/O statistics into stats; does nothing when file is NULL.
+{
+    if (!file)
+        return;
+
+    ForEachItemIn(iStat, stats)         // only the kinds held in stats are queried from the file
+    {
+        StatisticKind kind = stats.getKind(iStat);
+        stats.mergeStatistic(kind, file->getStatistic(kind), queryMergeMode(kind));   // merge mode is chosen per kind
+    }
+}
+
 //---------------------------------------------------
 
 bool ScopedItemFilter::matchDepth(unsigned low, unsigned high) const

+ 16 - 160
system/jlib/jstats.h

@@ -21,165 +21,7 @@
 
 #include "jlib.hpp"
 
-#define ActivityScopePrefix "a"
-#define EdgeScopePrefix "e"
-#define SubGraphScopePrefix "sg"
-#define GraphScopePrefix "graph"
-#define CONST_STRLEN(x) (sizeof(x)-1)       // sizeof(const-string) = strlen(const-string) + 1 byte for the \0 terminator
-#define MATCHES_CONST_PREFIX(search, prefix) (strncmp(search, prefix, CONST_STRLEN(prefix)) == 0)
-
-enum CombineStatsAction
-{
-    MergeStats,
-    ReplaceStats,
-    AppendStats,
-};
-
-enum StatisticCreatorType
-{
-    SCTnone,
-    SCTall,
-    SCTunknown,
-    SCThthor,
-    SCTroxie,
-    SCTroxieSlave,
-    SCTthor,
-    SCTthorMaster,
-    SCTthorSlave,
-    SCTeclcc,
-    SCTesp,
-    SCTsummary,                         // used to maintain the summary time over all thors (mainly for sorting)
-    SCTmax,
-};
-
-enum StatisticScopeType
-{
-    SSTnone,
-    SSTall,
-    SSTglobal,                          // root scope
-    SSTgraph,                           // identifies a graph
-    SSTsubgraph,
-    SSTactivity,
-    SSTallocator,                       // identifies an allocator
-    SSTsection,                         // A section within the query - not a great differentiator
-    SSTcompilestage,                    // a stage within the compilation process
-    SSTdfuworkunit,                     // a reference to an executing dfu workunit
-    SSTedge,
-    SSTmax
-};
-
-enum StatisticMeasure
-{
-    SMeasureNone,
-    SMeasureAll,
-    SMeasureTimeNs,                     // Elapsed time in nanoseconds
-    SMeasureTimestampUs,                // timestamp/when - a point in time (to the microsecond)
-    SMeasureCount,                      // a count of the number of occurrences
-    SMeasureSize,                       // a quantity of memory (or disk) measured in bytes
-    SMeasureLoad,                       // measure of cpu activity (stored as 1/1000000 core)
-    SMeasureSkew,                       // a measure of skew. 0 = perfectly balanced, range [-10000..infinity]
-    SMeasureNode,                       // A node number within a cluster (0 = master)
-    SMeasurePercent,                    // actually stored as parts per million, displayed as a percentage
-    SMeasureIPV4,
-    SMeasureMax,
-};
-
-//This macro can be used to generate multiple variations of a statistics kind, but probably not needed any more
-//e.g.,     DEFINE_SKEW_STAT(Time, Elapsed)
-
-#define DEFINE_SKEW_STAT(x, y) \
-    St ## x ## Min ## y = (St ## x ## y | StMinX), \
-    St ## x ## Max ## y = (St ## x ## y | StMaxX), \
-    St ## x ## Ave ## y = (St ## x ## y | StAvgX), \
-    St ## Skew ## y = (St ## x ## y | StSkew), \
-    St ## SkewMin ## y = (St ## x ## y | StSkewMin), \
-    St ## SkewMax ## y = (St ## x ## y | StSkewMax), \
-    St ## NodeMin ## y = (St ## x ## y | StNodeMin), \
-    St ## NodeMax ## y = (St ## x ## y | StNodeMax),
-
-//The values in this enumeration are stored persistently.  The associated values must not be changed.
-//If you add an entry here you must also update statsMetaData
-//NOTE: All statistic names should be unique with the type prefix removed. Since the prefix is replaced with Skew/Min/etc.
-enum StatisticKind
-{
-    StKindNone,
-    StKindAll,
-
-    StWhenGraphStarted,                 // When a graph starts
-    StWhenGraphFinished,                // When a graph stopped
-    StWhenFirstRow,                     // When the first row is processed by slave activity
-    StWhenQueryStarted,
-    StWhenQueryFinished,
-    StWhenCreated,
-    StWhenCompiled,
-    StWhenWorkunitModified,             // Not sure this is very useful
-
-    StTimeElapsed,                      // Elapsed wall time between first row and last row
-    StTimeLocalExecute,                 // Time spend processing just this activity
-    StTimeTotalExecute,                 // Time executing this activity and all inputs
-    StTimeRemaining,
-
-    StSizeGeneratedCpp,
-    StSizePeakMemory,
-    StSizeMaxRowSize,
-
-    StNumRowsProcessed,                 // on edge
-    StNumSlaves,                        // on edge
-    StNumStarted,                       // on edge
-    StNumStopped,                       // on edge
-    StNumIndexSeeks,
-    StNumIndexScans,
-    StNumIndexWildSeeks,
-    StNumIndexSkips,
-    StNumIndexNullSkips,
-    StNumIndexMerges,
-    StNumIndexMergeCompares,
-    StNumPreFiltered,
-    StNumPostFiltered,
-    StNumBlobCacheHits,
-    StNumLeafCacheHits,
-    StNumNodeCacheHits,
-    StNumBlobCacheAdds,
-    StNumLeafCacheAdds,
-    StNumNodeCacheAdds,
-    StNumPreloadCacheHits,
-    StNumPreloadCacheAdds,
-    StNumServerCacheHits,
-    StNumIndexAccepted,
-    StNumIndexRejected,
-    StNumAtmostTriggered,
-    StNumDiskSeeks,
-    StNumIterations,
-    StLoadWhileSorting,                 // Average load while processing a sort?
-    StNumLeftRows,
-    StNumRightRows,
-    StPerReplicated,
-    StNumDiskRowsRead,
-    StNumIndexRowsRead,
-    StNumDiskAccepted,
-    StNumDiskRejected,
-
-    StTimeSoapcall,                     // Time spent waiting for soapcalls
-    StTimeFirstExecute,                 // Time waiting for first record from this activity
-
-    StMax,
-
-    //For any quantity there is potentially the following variants.
-    //These modifiers ORd with the values above to form a compound type.
-    StKindMask                          = 0x0ffff,
-    StVariantScale                      = (StKindMask+1),
-    StMinX                              = 0x10000,  // the minimum value
-    StMaxX                              = 0x20000,  // the maximum value
-    StAvgX                              = 0x30000,  // the average value
-    StSkew                              = 0x40000,  // the skew on a particular node
-    StSkewMin                           = 0x50000,  // the minimum skew
-    StSkewMax                           = 0x60000,  // the maximum skew
-    StNodeMin                           = 0x70000,  // the node containing the minimum
-    StNodeMax                           = 0x80000,  // the node containing the maximum
-    StDeltaX                            = 0x90000,  // a difference in the value of X
-    StNextModifier                      = 0xa0000,
-
-};
+#include "jstatcodes.h"
 
 inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); } // clears the StKindMask bits, leaving only the bits above the base kind
 
@@ -260,7 +102,7 @@ interface IStatisticCollectionIterator : public IIteratorOf<IStatisticCollection
 
 enum StatsMergeAction
 {
-    StatsMergeKeep,
+    StatsMergeKeepNonZero,
     StatsMergeReplace,
     StatsMergeSum,
     StatsMergeMin,
@@ -429,6 +271,10 @@ protected:
 };
 
 extern const jlib_decl StatisticsMapping allStatistics;
+extern const jlib_decl StatisticsMapping diskLocalStatistics; // disk read + write kinds, IO time as StCycle* kinds
+extern const jlib_decl StatisticsMapping diskRemoteStatistics; // disk read + write kinds, IO time as StTime* kinds
+extern const jlib_decl StatisticsMapping diskReadRemoteStatistics; // read-side subset (time, size, count)
+extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics; // write-side subset (time, size, count)
 
 //---------------------------------------------------------------------------------------------------------------------
 
@@ -470,6 +316,13 @@ public:
         unsigned num = mapping.numStatistics();
         values = new CRuntimeStatistic[num+1]; // extra entry is to gather unexpected stats
     }
+    // Copy constructor: shares the source's mapping but allocates a private
+    // values array, so the copy can be modified without touching _other.
+    CRuntimeStatisticCollection(const CRuntimeStatisticCollection & _other) : mapping(_other.mapping)
+    {
+        // One slot per mapped statistic plus the extra trailing entry.
+        const unsigned numSlots = mapping.numStatistics() + 1;
+        values = new CRuntimeStatistic[numSlots];
+        for (unsigned slot = 0; slot < numSlots; slot++)
+        {
+            const unsigned __int64 sourceValue = _other.values[slot].get();
+            values[slot].set(sourceValue);
+        }
+    }
     ~CRuntimeStatisticCollection()
     {
         delete [] values;
@@ -500,6 +353,7 @@ public:
     {
         queryStatistic(kind).merge(value, mergeAction);
     }
+    void mergeStatistic(StatisticKind kind, unsigned __int64 value); // merges value using queryMergeMode(kind) as the merge action
     void setStatistic(StatisticKind kind, unsigned __int64 value)
     {
         queryStatistic(kind).set(value);
@@ -543,6 +397,8 @@ private:
 };
 
 
+extern jlib_decl void mergeStats(CRuntimeStatisticCollection & stats, IFileIO * file); // merges file->getStatistic(kind) into stats for each kind stats tracks; does nothing when file is null
+
 //---------------------------------------------------------------------------------------------------------------------
 
 //A class for minimizing the overhead of collecting timestamps.

+ 22 - 8
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -51,6 +51,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         Linked<IEngineRowAllocator> allocator;
         CCsvReadSlaveActivity &activity;
         Owned<ISerialStream> inputStream;
+        OwnedIFileIO iFileIO;
         CSVSplitter csvSplitter;
         CRC32 inputCRC;
         bool readFinished;
@@ -83,6 +84,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         CCsvPartHandler(CCsvReadSlaveActivity &_activity) : CDiskPartHandlerBase(_activity), activity(_activity)
         {
             readFinished = false;
+            localOffset = 0;
             //Initialise information...
             ICsvParameters * csvInfo = activity.helper->queryCsvParameters();
             csvSplitter.init(activity.helper->getMaxColumns(), csvInfo, activity.csvQuote, activity.csvSeparate, activity.csvTerminate, activity.csvEscape);
@@ -99,16 +101,19 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
             localOffset = 0;
             CDiskPartHandlerBase::open();
             readFinished = false;
-            OwnedIFileIO iFileIO;
-            if (compressed)
+
             {
-                iFileIO.setown(createCompressedFileReader(iFile, activity.eexp));
-                if (!iFileIO)
-                    throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
-                checkFileCrc = false;
+                CriticalBlock block(statsCs);
+                if (compressed)
+                {
+                    iFileIO.setown(createCompressedFileReader(iFile, activity.eexp));
+                    if (!iFileIO)
+                        throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
+                    checkFileCrc = false;
+                }
+                else
+                    iFileIO.setown(iFile->open(IFOread));
             }
-            else
-                iFileIO.setown(iFile->open(IFOread));
 
             inputStream.setown(createFileSerialStream(iFileIO));
             if (activity.headerLines)
@@ -139,6 +144,9 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         }
         virtual void close(CRC32 &fileCRC) // captures the open file's stats, releases file and stream, and copies inputCRC out to the caller
         {
+            CriticalBlock block(statsCs); // open() and gatherStats() take the same lock around iFileIO
+            mergeStats(fileStats, iFileIO); // fold the file's counters into fileStats before the handle is dropped
+            iFileIO.clear();
             inputStream.clear();
             fileCRC = inputCRC;
         }
@@ -163,6 +171,12 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
             }
         }
         offset_t getLocalOffset() { return localOffset; }
+        virtual void gatherStats(CRuntimeStatisticCollection & merged) // adds the totals held in fileStats plus the currently open file's counters to merged
+        {
+            CriticalBlock block(statsCs); // open() and close() change iFileIO under this lock
+            CDiskPartHandlerBase::gatherStats(merged); // base version merges fileStats
+            mergeStats(merged, iFileIO); // mergeStats does nothing when iFileIO is null
+        }
     };
 
     unsigned &getHeaderLines(unsigned subFile)

+ 43 - 11
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -108,6 +108,18 @@ friend class CDiskRecordPartHandler;
 
 /////////////////////////////////////////////////
 
+// Merge the row stream's statistics into stats, for each kind that stats tracks.
+// A null stream is accepted and contributes nothing.
+void mergeStats(CRuntimeStatisticCollection & stats, IExtRowStream * in)
+{
+    if (!in)
+        return;
+
+    ForEachItemIn(iStat, stats)
+    {
+        const StatisticKind statKind = stats.getKind(iStat);
+        const unsigned __int64 streamValue = in->getStatistic(statKind);
+        stats.mergeStatistic(statKind, streamValue);
+    }
+}
+
 class CDiskRecordPartHandler : public CDiskPartHandlerBase
 {
     Owned<IExtRowStream> in;
@@ -149,6 +161,13 @@ public:
     {
         in->prefetchDone();
     }
+    virtual void gatherStats(CRuntimeStatisticCollection & merged) // adds the totals held in fileStats plus the current row stream's counters to merged
+    {
+        CriticalBlock block(statsCs); // open() and close() change 'in' under this lock
+        CDiskPartHandlerBase::gatherStats(merged); // base version merges fileStats
+        mergeStats(merged, in); // mergeStats does nothing when 'in' is null
+    }
+
 };
 
 /////////////////////////////////////////////////
@@ -205,20 +224,25 @@ void CDiskRecordPartHandler::open()
         rwFlags |= rw_crc;
     if (activity.grouped)
         rwFlags |= rw_grouped;
-    if (compressed)
+
     {
-        rwFlags |= rw_compress;
-        in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags, activity.eexp));
-        if (!in.get())
+        CriticalBlock block(statsCs);
+        if (compressed)
         {
-            if (!blockCompressed)
-                throw MakeStringException(-1,"Unsupported compressed file format: %s", filename.get());
-            else 
-                throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
+            rwFlags |= rw_compress;
+            in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags, activity.eexp));
+            if (!in.get())
+            {
+                if (!blockCompressed)
+                    throw MakeStringException(-1,"Unsupported compressed file format: %s", filename.get());
+                else
+                    throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
+            }
         }
+        else
+            in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags));
     }
-    else
-        in.setown(createRowStream(iFile, activity.queryDiskRowInterfaces(), rwFlags));
+
     if (!in)
         throw MakeActivityException(&activity, 0, "Failed to open file '%s'", filename.get());
     ActPrintLog(&activity, "%s[part=%d]: %s (%s)", kindStr, which, activity.isFixedDiskWidth ? "fixed" : "variable", filename.get());
@@ -239,8 +263,10 @@ void CDiskRecordPartHandler::open()
 
 void CDiskRecordPartHandler::close(CRC32 &fileCRC) // stops the row stream (if any), captures its stats into fileStats, then releases it
 {
+    CriticalBlock block(statsCs); // gatherStats() takes the same lock
     if (in) 
         in->stop(&fileCRC);
+    mergeStats(fileStats, in); // tolerates a null stream; has to run before in.clear() drops it
     in.clear();
 }
 
@@ -617,6 +643,7 @@ public:
     {
         if (out)
         {
+            out->stop();
             out->Release();
             out = NULL;
         }
@@ -741,6 +768,8 @@ public:
 // IRowStream
     virtual void stop() // stops the part handler (when one exists), then the data link
     {
+        if (partHandler) // null-guarded before forwarding the stop
+            partHandler->stop();
         dataLinkStop();
     }
     CATCH_NEXTROW()
@@ -864,6 +893,8 @@ public:
 // IRowStream
     virtual void stop() // stops the part handler (when one exists), then the data link
     {
+        if (partHandler) // null-guarded before forwarding the stop
+            partHandler->stop();
         dataLinkStop();
     }
     CATCH_NEXTROW()
@@ -982,7 +1013,8 @@ public:
 // IRowStream
     virtual void stop() // stops the part handler (when one exists), then the data link
     {
-        partHandler.clear();
+        if (partHandler) // the handler is now stopped here instead of being released
+            partHandler->stop();
         dataLinkStop();
     }
     CATCH_NEXTROW()

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -147,7 +147,7 @@ public:
     virtual void getActivityStats(IStatisticGatherer & stats)
     {
         CMasterActivity::getActivityStats(stats);
-        loopCounterProgress->getStats(stats, false);
+        loopCounterProgress->getStats(stats, false, false);
     }
 
 };

+ 1 - 0
thorlcr/activities/thactivityutil.cpp

@@ -745,6 +745,7 @@ public:
     virtual offset_t size() { return primaryio->size(); }
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) { return primaryio->getStatistic(kind); } // forwarded to primaryio, like the neighbouring methods
     virtual void setSize(offset_t size) { primaryio->setSize(size); }
     virtual void flush() { primaryio->flush(); }
     virtual void close() { primaryio->close(); }

+ 30 - 0
thorlcr/activities/thdiskbase.cpp

@@ -33,6 +33,9 @@ CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterAct
 {
     hash = NULL;
     inputProgress.setown(new ProgressInfo);
+    statTimeDiskRead.setown(new CThorStats(StTimeDiskReadIO));
+    statSizeDiskRead.setown(new CThorStats(StSizeDiskRead));
+    statNumDiskReads.setown(new CThorStats(StNumDiskReads));
 }
 
 void CDiskReadMasterBase::init()
@@ -120,6 +123,20 @@ void CDiskReadMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
     rowcount_t progress;
     mb.read(progress);
     inputProgress->set(node, progress);
+
+    CRuntimeStatisticCollection fileStats(diskReadRemoteStatistics);
+    fileStats.deserialize(mb);
+    statTimeDiskRead->set(node, fileStats.getStatisticValue(StTimeDiskReadIO));
+    statSizeDiskRead->set(node, fileStats.getStatisticValue(StSizeDiskRead));
+    statNumDiskReads->set(node, fileStats.getStatisticValue(StNumDiskReads));
+}
+
+void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats) // reports the base activity stats followed by the three disk-read statistics
+{
+    CMasterActivity::getActivityStats(stats);
+    statTimeDiskRead->getStats(stats, false, false); // arguments: suppressMinMaxWhenEqual=false, suppressIfZero=false
+    statSizeDiskRead->getStats(stats, false, false);
+    statNumDiskReads->getStats(stats, false, false);
 }
 
 void CDiskReadMasterBase::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
@@ -234,6 +251,10 @@ CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(
 {
     publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
     replicateProgress.setown(new ProgressInfo);
+    statTimeDiskWrite.setown(new CThorStats(StTimeDiskWriteIO));
+    statSizeDiskWrite.setown(new CThorStats(StSizeDiskWrite));
+    statNumDiskWrites.setown(new CThorStats(StNumDiskWrites));
+
     diskHelperBase = (IHThorDiskWriteArg *)queryHelper();
     targetOffset = 0;
 }
@@ -244,6 +265,12 @@ void CWriteMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
     unsigned repPerc;
     mb.read(repPerc);
     replicateProgress->set(node, repPerc);
+
+    CRuntimeStatisticCollection fileStats(diskWriteRemoteStatistics);
+    fileStats.deserialize(mb);
+    statTimeDiskWrite->set(node, fileStats.getStatisticValue(StTimeDiskWriteIO));
+    statSizeDiskWrite->set(node, fileStats.getStatisticValue(StSizeDiskWrite));
+    statNumDiskWrites->set(node, fileStats.getStatisticValue(StNumDiskWrites));
 }
 
 void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
@@ -254,6 +281,9 @@ void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
         replicateProgress->processInfo();
         stats.addStatistic(StPerReplicated, replicateProgress->queryAverage() * 10000);
     }
+    statTimeDiskWrite->getStats(stats, false, false);
+    statSizeDiskWrite->getStats(stats, false, false);
+    statNumDiskWrites->getStats(stats, false, false);
 }
 
 void CWriteMasterBase::preStart(size32_t parentExtractSz, const byte *parentExtract)

+ 7 - 0
thorlcr/activities/thdiskbase.ipp

@@ -32,6 +32,9 @@ protected:
     Owned<CSlavePartMapping> mapping;
     IHash *hash;
     Owned<ProgressInfo> inputProgress;
+    Owned<CThorStats> statTimeDiskRead;
+    Owned<CThorStats> statSizeDiskRead;
+    Owned<CThorStats> statNumDiskReads;
     StringAttr fileName;
 
 public:
@@ -40,6 +43,7 @@ public:
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
     virtual void validateFile(IDistributedFile *file) { }
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb);
+    virtual void getActivityStats(IStatisticGatherer & stats);
     virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx);
 };
 
@@ -47,6 +51,9 @@ class CWriteMasterBase : public CMasterActivity
 {
     bool publishReplicatedDone;
     Owned<ProgressInfo> replicateProgress;
+    Owned<CThorStats> statTimeDiskWrite;
+    Owned<CThorStats> statSizeDiskWrite;
+    Owned<CThorStats> statNumDiskWrites;
     __int64 recordsProcessed;
     bool published;
     StringAttr fileName;

+ 27 - 5
thorlcr/activities/thdiskbaseslave.cpp

@@ -64,7 +64,7 @@ void getPartsMetaInfo(ThorDataLinkMetaInfo &metaInfo, CThorDataLink &link, unsig
 //////////////////////////////////////////////
 
 CDiskPartHandlerBase::CDiskPartHandlerBase(CDiskReadSlaveActivityBase &_activity) 
-    : activity(_activity)
+    : activity(_activity), fileStats(diskReadRemoteStatistics)
 {
     checkFileCrc = activity.checkFileCrc;
     which = 0;
@@ -281,6 +281,11 @@ void CDiskReadSlaveActivityBase::serializeStats(MemoryBuffer &mb)
 {
     CSlaveActivity::serializeStats(mb);
     mb.append(diskProgress);
+
+    CRuntimeStatisticCollection activeStats(diskReadRemoteStatistics);
+    if (partHandler)
+        partHandler->gatherStats(activeStats);
+    activeStats.serialize(mb);
 }
 
 
@@ -342,7 +347,10 @@ void CDiskWriteSlaveActivityBase::open()
     if (extend||(external&&!query))
         twFlags |= TW_Extend;
 
-    Owned<IFileIO> iFileIO = createMultipleWrite(this, *partDesc, diskRowMinSz, twFlags, compress, ecomp, this, &abortSoon, (external&&!query) ? &tempExternalName : NULL);
+    {
+        CriticalBlock block(statsCs);
+        outputIO.setown(createMultipleWrite(this, *partDesc, diskRowMinSz, twFlags, compress, ecomp, this, &abortSoon, (external&&!query) ? &tempExternalName : NULL));
+    }
 
     if (compress)
     {
@@ -353,12 +361,12 @@ void CDiskWriteSlaveActivityBase::open()
     Owned<IFileIOStream> stream;
     if (wantRaw())
     {
-        outraw.setown(createBufferedIOStream(iFileIO));
+        outraw.setown(createBufferedIOStream(outputIO));
         stream.set(outraw);
     }
     else
     {
-        stream.setown(createIOStream(iFileIO));
+        stream.setown(createIOStream(outputIO));
         unsigned rwFlags = 0;
         if (grouped)
             rwFlags |= rw_grouped;
@@ -403,6 +411,13 @@ void CDiskWriteSlaveActivityBase::close()
             uncompressedBytesWritten = outraw->tell();
             outraw.clear();
         }
+
+        {
+            CriticalBlock block(statsCs);
+            mergeStats(fileStats, outputIO);
+            outputIO.clear();
+        }
+
         if (!rfsQueryParallel && dlfn.isExternal() && !lastNode())
         {
             rowcount_t rows = processed & THORDATALINK_COUNT_MASK;
@@ -424,7 +439,8 @@ void CDiskWriteSlaveActivityBase::close()
         removeFiles();
 }
 
-CDiskWriteSlaveActivityBase::CDiskWriteSlaveActivityBase(CGraphElementBase *container) : ProcessSlaveActivity(container)
+CDiskWriteSlaveActivityBase::CDiskWriteSlaveActivityBase(CGraphElementBase *container)
+: ProcessSlaveActivity(container), fileStats(diskWriteRemoteStatistics)
 {
     grouped = false;
     compress = calcFileCrc = false;
@@ -480,8 +496,14 @@ void CDiskWriteSlaveActivityBase::abort()
 
 void CDiskWriteSlaveActivityBase::serializeStats(MemoryBuffer &mb) // appends base stats, replicateDone, then the disk-write file statistics
 {
+    CriticalBlock block(statsCs); // open() and close() change outputIO / fileStats under this lock
+
     ProcessSlaveActivity::serializeStats(mb);
     mb.append(replicateDone);
+
+    CRuntimeStatisticCollection activeStats(fileStats); // work on a copy so the open file's counters are not folded into fileStats here
+    mergeStats(activeStats, outputIO); // add the currently open output file, if there is one
+    activeStats.serialize(mb);
 }
 
 // ICopyFileProgress

+ 9 - 1
thorlcr/activities/thdiskbaseslave.ipp

@@ -38,8 +38,9 @@ protected:
     StringAttr filename, logicalFilename;
     unsigned __int64 fileBaseOffset;
     const char *kindStr;
-
+    CRuntimeStatisticCollection fileStats;
     CDiskReadSlaveActivityBase &activity;
+    CriticalSection statsCs;
 
     bool eoi;
 public:
@@ -51,6 +52,10 @@ public:
 
     virtual void stop();
     virtual const void *nextRow() = 0;
+    virtual void gatherStats(CRuntimeStatisticCollection & merged) // base version: merges only fileStats and takes no lock itself
+    {
+        merged.merge(fileStats);
+    }
 
 // IThorDiskCallback
     virtual offset_t getFilePosition(const void * row);
@@ -101,6 +106,7 @@ class CDiskWriteSlaveActivityBase : public ProcessSlaveActivity, implements ICop
 {
 protected:
     IHThorDiskWriteArg *diskHelperBase;
+    Owned<IFileIO> outputIO;
     Owned<IExtRowWriter> out;
     Owned<IFileIOStream> outraw;
     Owned<IPartDescriptor> partDesc;
@@ -114,6 +120,8 @@ protected:
     unsigned usageCount;
     CDfsLogicalFileName dlfn;
     StringBuffer tempExternalName;
+    CRuntimeStatisticCollection fileStats;
+    CriticalSection statsCs;
 
     void open();
     void removeFiles();

+ 22 - 8
thorlcr/activities/xmlread/thxmlreadslave.cpp

@@ -47,6 +47,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         Owned<ICrcIOStream> crcStream;
         Owned<IXMLParse> xmlParser;
         CRC32 inputCRC;
+        OwnedIFileIO iFileIO;
         Owned<IIOStream> inputIOstream;
         offset_t localOffset;  // not sure what this is for 
         Linked<IEngineRowAllocator> allocator;
@@ -64,16 +65,20 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         virtual void open() 
         {
             CDiskPartHandlerBase::open();
-            OwnedIFileIO iFileIO;
-            if (compressed)
+
             {
-                iFileIO.setown(createCompressedFileReader(iFile, activity.eexp));
-                if (!iFileIO)
-                    throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
-                checkFileCrc = false;
+                CriticalBlock block(statsCs);
+                if (compressed)
+                {
+                    iFileIO.setown(createCompressedFileReader(iFile, activity.eexp));
+                    if (!iFileIO)
+                        throw MakeActivityException(&activity, 0, "Failed to open block compressed file '%s'", filename.get());
+                    checkFileCrc = false;
+                }
+                else
+                    iFileIO.setown(iFile->open(IFOread));
             }
-            else
-                iFileIO.setown(iFile->open(IFOread));
+
             Owned<IIOStream> stream = createIOStream(iFileIO);
             if (stream && checkFileCrc)
             {
@@ -89,10 +94,13 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         }
         virtual void close(CRC32 &fileCRC) // releases parser and stream, reports the CRC when checked, and captures the file's stats
         {
+            CriticalBlock block(statsCs); // open() and gatherStats() take the same lock around iFileIO
             xmlParser.clear();
             inputIOstream.clear();
             if (checkFileCrc)
                 fileCRC.reset(~crcStream->queryCrc()); // MORE should prob. change stream to use CRC32
+            mergeStats(fileStats, iFileIO); // fold the file's counters into fileStats before the handle is dropped
+            iFileIO.clear();
         }
 
         const void *nextRow()
@@ -177,6 +185,12 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
             return localOffset; // NH->JCS is this what is wanted? (or should it be stream position relative?
         }
     
+        virtual void gatherStats(CRuntimeStatisticCollection & merged) // adds the totals held in fileStats plus the currently open file's counters to merged
+        {
+            CriticalBlock block(statsCs); // open() and close() change iFileIO under this lock
+            CDiskPartHandlerBase::gatherStats(merged); // base version merges fileStats
+            mergeStats(merged, iFileIO); // mergeStats does nothing when iFileIO is null
+        }
     };
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

+ 5 - 2
thorlcr/graph/thgraphmaster.cpp

@@ -2825,9 +2825,12 @@ void CThorStats::processInfo()
     calculateSkew();
 }
 
-void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual)
+void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual, bool suppressIfZero)
 {
     processInfo();
+    if (suppressIfZero && (0 == tot))
+        return;
+
     //MORE: For most measures (not time stamps etc.) it would be sensible to output the total here....
     if (!suppressMinMaxWhenEqual || (maxSkew != minSkew))
     {
@@ -2876,7 +2879,7 @@ void ProgressInfo::processInfo() // reimplement as counts have special flags (i.
 
 void ProgressInfo::getStats(IStatisticGatherer & stats)
 {
-    CThorStats::getStats(stats, true);
+    CThorStats::getStats(stats, true, false);
     stats.addStatistic(kind, tot);
     stats.addStatistic(StNumSlaves, counts.ordinality());
     stats.addStatistic(StNumStarted, startcount);

+ 2 - 2
thorlcr/graph/thgraphmaster.ipp

@@ -211,7 +211,7 @@ public:
     unsigned queryMinNode() { return minNode; }
 
     void set(unsigned node, unsigned __int64 count);
-    void getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual);
+    void getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual, bool suppressIfZero);
 
 protected:
     void calculateSkew();
@@ -222,7 +222,7 @@ class graphmaster_decl CTimingInfo : public CThorStats
 {
 public:
     CTimingInfo();
-    void getStats(IStatisticGatherer & stats) { CThorStats::getStats(stats, false); }
+    void getStats(IStatisticGatherer & stats) { CThorStats::getStats(stats, false, false); }
 };
 
 class graphmaster_decl ProgressInfo : public CThorStats

+ 20 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1336,6 +1336,7 @@ class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFi
     Linked<IExpander> expander;
     bool compressed;
     StringAttr filename;
+    CRuntimeStatisticCollection fileStats;
     CriticalSection crit;
     Owned<IFileIO> iFileIO; // real IFileIO
 
@@ -1343,7 +1344,8 @@ class CLazyFileIO : public CInterface, implements IFileIO, implements IDelayedFi
 
 public:
     IMPLEMENT_IINTERFACE;
-    CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander) : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander)
+    CLazyFileIO(CFileCache &_cache, const char *_filename, IReplicatedFile *_repFile, bool _compressed, IExpander *_expander)
+    : cache(_cache), filename(_filename), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(diskLocalStatistics) // fileStats tracks the diskLocalStatistics kinds
     {
     }
     ~CLazyFileIO()
@@ -1382,7 +1384,10 @@ public:
     {
         CriticalBlock b(crit);
         if (iFileIO)
+        {
+            mergeStats(fileStats, iFileIO);
             iFileIO->close();
+        }
         iFileIO.clear();
     }
     virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1)
@@ -1397,6 +1402,20 @@ public:
         checkOpen();
         iFileIO->setSize(size);
     }
+    // Returns the requested statistic for this lazily opened file.
+    // The two elapsed-time kinds are derived by converting the matching cycle
+    // counter to nanoseconds; every other kind is the value held in fileStats
+    // plus whatever the currently open underlying file reports.
+    virtual unsigned __int64 getStatistic(StatisticKind kind)
+    {
+        // These recursive lookups happen before crit is taken below.
+        if (kind == StTimeDiskReadIO)
+            return cycle_to_nanosec(getStatistic(StCycleDiskReadIOCycles));
+        if (kind == StTimeDiskWriteIO)
+            return cycle_to_nanosec(getStatistic(StCycleDiskWriteIOCycles));
+
+        CriticalBlock b(crit);
+        unsigned __int64 liveValue = 0;
+        if (iFileIO)
+            liveValue = iFileIO->getStatistic(kind);
+        const unsigned __int64 storedValue = fileStats.getStatisticValue(kind);
+        return liveValue + storedValue;
+    }
 // IDelayedFile impl.
     virtual IMemoryMappedFile *queryMappedFile() { return NULL; }
     virtual IFileIO *queryFileIO() { return this; }

+ 4 - 0
thorlcr/thorutil/thormisc.hpp

@@ -274,6 +274,10 @@ public:
     {
         stream->reinit(offset, len, maxRows);
     }
+    virtual unsigned __int64 getStatistic(StatisticKind kind) // forwarded unchanged to the wrapped stream
+    {
+        return stream->getStatistic(kind);
+    }
 };