Browse Source

HPCC-21289 Refactor the disk io and network stats

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 6 years ago
parent
commit
6c582d2a31
6 changed files with 501 additions and 266 deletions
  1. 19 0
      ecl/eclcc/eclcc.cpp
  2. 414 266
      system/jlib/jdebug.cpp
  3. 63 0
      system/jlib/jdebug.hpp
  4. 1 0
      system/jlib/jmutex.hpp
  5. 2 0
      system/jlib/jstatcodes.h
  6. 2 0
      system/jlib/jstats.cpp

+ 19 - 0
ecl/eclcc/eclcc.cpp

@@ -1153,11 +1153,18 @@ void EclCC::processSingleQuery(EclCompileInstance & instance,
     if (syntaxChecking || instance.archive)
         severityMapper->addMapping("security", "ignore");
 
+    //This option isn't particularly useful, but is here to help test the code to gather disk information
+    bool optGatherDiskStats = instance.wu->getDebugValueBool("gatherEclccDiskStats", false);
     size32_t prevErrs = errorProcessor.errCount();
     cycle_t startCycles = get_cycles_now();
     CpuInfo systemStartTime(false, true);
     CpuInfo processStartTime(true, false);
 
+    //Avoid creating the OsDiskStats object unless disk stats are being gathered, to avoid unnecessary initialisation
+    OwnedPtr<OsDiskStats> systemIoStartInfo;
+    if (optGatherDiskStats)
+        systemIoStartInfo.setown(new OsDiskStats(true));
+
     addTimeStamp(instance.wu, SSTcompilestage, "compile", StWhenStarted);
     const char * sourcePathname = queryContents ? str(queryContents->querySourcePath()) : NULL;
     const char * defaultErrorPathname = sourcePathname ? sourcePathname : queryAttributePath;
@@ -1442,6 +1449,9 @@ void EclCC::processSingleQuery(EclCompileInstance & instance,
     unsigned __int64 totalTimeNs = cycle_to_nanosec(get_cycles_now() - startCycles);
     CpuInfo systemFinishTime(false, true);
     CpuInfo processFinishTime(true, false);
+    OwnedPtr<OsDiskStats> systemIoFinishInfo;
+    if (optGatherDiskStats)
+        systemIoFinishInfo.setown(new OsDiskStats(true));
     instance.stats.generateTime = (unsigned)nanoToMilli(totalTimeNs) - instance.stats.parseTime;
     updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StTimeElapsed, NULL, totalTimeNs);
 
@@ -1456,6 +1466,15 @@ void EclCC::processSingleQuery(EclCompileInstance & instance,
         updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StTimeUser, NULL, processElapsed.getUserNs());
         updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StTimeSystem, NULL, processElapsed.getSystemNs());
     }
+
+    if (optGatherDiskStats)
+    {
+        const BlockIoStats summaryIo = systemIoFinishInfo->querySummaryStats() - systemIoStartInfo->querySummaryStats();
+        if (summaryIo.rd_sectors)
+            updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StSizeOsDiskRead, NULL, summaryIo.rd_sectors * summaryIo.getSectorSize());
+        if (summaryIo.wr_sectors)
+            updateWorkunitStat(instance.wu, SSTcompilestage, "compile", StSizeOsDiskWrite, NULL, summaryIo.wr_sectors * summaryIo.getSectorSize());
+    }
 }
 
 void EclCC::processDefinitions(EclRepositoryArray & repositories)

+ 414 - 266
system/jlib/jdebug.cpp

@@ -938,6 +938,54 @@ unsigned CpuInfo::getUserPercent() const
 
 //===========================================================================
 
+//Reset every counter to zero.
+void BlockIoStats::clear()
+{
+    //All members have an in-class initialiser of 0, so a default-constructed object is the cleared state
+    *this = BlockIoStats();
+}
+
+//Add each counter in other to the matching counter in this object, and return this object.
+BlockIoStats & BlockIoStats::operator += (const BlockIoStats & other)
+{
+    //Operation counts
+    rd_ios += other.rd_ios;
+    wr_ios += other.wr_ios;
+    rd_merges += other.rd_merges;
+    wr_merges += other.wr_merges;
+
+    //Sectors transferred
+    rd_sectors += other.rd_sectors;
+    wr_sectors += other.wr_sectors;
+
+    //Timing and queue figures
+    rd_ticks += other.rd_ticks;
+    wr_ticks += other.wr_ticks;
+    ticks += other.ticks;
+    aveq += other.aveq;
+
+    return *this;
+}
+
+//Return the field-by-field difference (this - other), e.g. the activity between two snapshots.
+//The counters are unsigned, so a field that is smaller in this than in other wraps around.
+BlockIoStats BlockIoStats::operator - (const BlockIoStats & other) const
+{
+    BlockIoStats delta(*this);
+    delta.rd_ios -= other.rd_ios;
+    delta.rd_merges -= other.rd_merges;
+    delta.rd_sectors -= other.rd_sectors;
+    delta.rd_ticks -= other.rd_ticks;
+    delta.wr_ios -= other.wr_ios;
+    delta.wr_merges -= other.wr_merges;
+    delta.wr_sectors -= other.wr_sectors;
+    delta.wr_ticks -= other.wr_ticks;
+    delta.ticks -= other.ticks;
+    delta.aveq -= other.aveq;
+    return delta;
+}
+
+//===========================================================================
+
+
 // Performance Monitor
 
 #ifdef _WIN32
@@ -1706,61 +1754,360 @@ public:
 
 //---------------------------------------------------------------------------
 
-class CExtendedStats  // Disk network and cpu stats
+class OsDiskInfo
 {
+public:
+    OsDiskInfo();
+    ~OsDiskInfo();
 
-    struct blkio_info 
+    unsigned getNumPartitions() const { return nparts; }
+    unsigned mapPartition(unsigned major, unsigned minor) const;
+    const char * queryPartitionName(unsigned p) const { return partition[p].name; }
+
+protected:
+    void gatherPartitions();
+    void initMajorMinor();
+    bool isDisk(unsigned int major, unsigned int minor) const
     {
-        unsigned rd_ios;        // Read I/O operations 
-        unsigned rd_merges;     // Reads merged 
-        __uint64 rd_sectors;    // Sectors read 
-        unsigned rd_ticks;      // Time in queue + service for read 
-        unsigned wr_ios;        // Write I/O operations 
-        unsigned wr_merges;     // Writes merged 
-        __uint64 wr_sectors;    // Sectors written 
-        unsigned wr_ticks;      // Time in queue + service for write 
-        unsigned ticks;         // Time of requests in queue 
-        unsigned aveq;          // Average queue length 
-    };
+        unsigned mm = (major<<16)+minor;
+        bool found = diskMajorMinor.contains(mm);
+        if (found)
+            return true;
+        if (IDE_DISK_MAJOR(major))
+            return ((minor&0x3F)==0);
+        if (SCSI_DISK_MAJOR(major))
+            return ((minor&0x0F)==0);
+        if (OTHER_DISK_MAJOR(major))
+            return ((minor&0x0F)==0);
+        return 0;
+    }
 
-    struct cpu_info 
+private:
+    struct part_info
     {
-        __uint64 user;
-        __uint64 system;
-        __uint64 idle;
-        __uint64 iowait;
+        unsigned int major;
+        unsigned int minor;
+        char name[32];
     };
 
-    struct net_info
+    unsigned nparts = 0;
+    part_info *partition = nullptr;
+    UnsignedArray diskMajorMinor;
+};
+
+OsDiskInfo::OsDiskInfo()
+{
+    initMajorMinor();       // must come first: gatherPartitions() filters via isDisk(), which consults diskMajorMinor
+    gatherPartitions();     // builds the partition[] table from /proc/diskstats
+}
+
+OsDiskInfo::~OsDiskInfo()
+{
+    free(partition);        // grown with realloc() in gatherPartitions(); still nullptr if nothing was gathered
+}
+
+//Build the partition[] table from /proc/diskstats.
+//A line is recorded when its major/minor pair has not been seen before, its first counter (reads) is
+//non-zero, and isDisk() accepts the device.  If the file cannot be opened the table is left as it was.
+void OsDiskInfo::gatherPartitions()
+{
+    char ln[256];
+    part_info pi;
+    FILE* diskfp = fopen("/proc/diskstats", "r");
+    if (!diskfp)
+        return;
+    nparts = 0;
+    while (fgets(ln, sizeof(ln), diskfp))
     {
-        __uint64 rxbytes;
-        __uint64 rxpackets;
-        __uint64 rxerrors;
-        __uint64 rxdrops;
-        __uint64 txbytes;
-        __uint64 txpackets;
-        __uint64 txerrors;
-        __uint64 txdrops;
-    };
+        unsigned reads = 0;
+        //major and minor are unsigned, so scan them with %u rather than %d
+        if (sscanf(ln, "%4u %4u %31s %u", &pi.major, &pi.minor, pi.name, &reads) == 4)
+        {
+            //Look for an existing entry with the same major/minor
+            unsigned p = 0;
+            while ((p<nparts) && (partition[p].major != pi.major || partition[p].minor != pi.minor))
+                p++;
+            if ((p==nparts) && reads && isDisk(pi.major,pi.minor))
+            {
+                //Grow via a temporary so that a failed realloc neither leaks the existing table nor
+                //leaves partition null while nparts is non-zero
+                part_info * grown = (part_info *)realloc(partition,(nparts+1)*sizeof(part_info));
+                if (!grown)
+                    break;
+                partition = grown;
+                partition[nparts++] = pi;
+            }
+        }
+    }
+    fclose(diskfp);
+}
+
+
+
+void OsDiskInfo::initMajorMinor()   // fill diskMajorMinor from the output of lsblk (linux only, otherwise a no-op)
+{
+#ifdef __linux__
+    // MCK - wish libblkid could do this ...
+    // Another way might also be to look for:
+    //   /sys/block/sd*
+    //   /sys/block/nvme*
+    // and match those with entries in /proc/diskstats
+    StringBuffer cmd("lsblk -o TYPE,MAJ:MIN --pairs");
+    Owned<IPipeProcess> pipe = createPipeProcess();
+    if (pipe->run("list disks", cmd, nullptr, false, true, true, 8192))
+    {
+        StringBuffer output;
+        Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
+        readSimpleStream(output, *pipeReader);
+        unsigned exitcode = pipe->wait();
+        if ( (exitcode == 0) && (output.length() > 0) )
+        {
+            StringArray lines;
+            lines.appendList(output, "\n");
+            ForEachItemIn(idx, lines)
+            {
+                // line: TYPE="disk" MAJ:MIN="259:0"
+                unsigned majnum, minnum;
+                if (2 == sscanf(lines.item(idx), "TYPE=\"disk\" MAJ:MIN=\"%u:%u\"", &majnum, &minnum))   // only lines whose TYPE is "disk" match
+                {
+                    unsigned mm = (majnum<<16)+minnum;      // same key layout that isDisk() looks up
+                    diskMajorMinor.appendUniq(mm);
+                }
+            }
+        }
+        else    // lsblk exited with a non-zero code or produced no output
+        {
+            StringBuffer outputErr;
+            Owned<ISimpleReadStream> pipeReaderErr = pipe->getErrorStream();
+            readSimpleStream(outputErr, *pipeReaderErr);
+            if (outputErr.length() > 0)
+                WARNLOG("WARNING: Pipe: output: %s", outputErr.str());
+        }
+    }
+#endif // __linux__
+}
 
-    struct part_info 
+//Return the index in partition[] of the entry with the given major/minor device numbers,
+//or NotFound if that device is not being tracked.
+unsigned OsDiskInfo::mapPartition(unsigned major, unsigned minor) const
+{
+    for (unsigned p = 0; p < nparts; p++)
     {
-        unsigned int major; 
-        unsigned int minor; 
-        char name[32];
-    };
-    
+        if (partition[p].major == major && partition[p].minor == minor)
+            return p;
+    }
+    //Use the same sentinel that OsDiskStats::updateCurrent() tests the result against
+    return NotFound;
+}
+
+
+static Singleton<OsDiskInfo> globalOsDiskInfo;     // shared OsDiskInfo, created on demand by queryGlobalOsDiskInfo()
+MODULE_INIT(INIT_PRIORITY_JDEBUG2)
+{
+    return true;            // nothing to initialise up front - the object is only created when first queried
+}
+MODULE_EXIT()
+{
+    delete globalOsDiskInfo.queryExisting();       // queryExisting() never creates the object; deleting nullptr is harmless
+}
+
+const OsDiskInfo & queryGlobalOsDiskInfo()
+{
+    return *globalOsDiskInfo.query([] { return new OsDiskInfo; });    // the lambda is the factory handed to Singleton::query()
+}
+#endif
+
+//---------------------------------------------------------------------------------------------------------------------
+
+OsDiskStats::OsDiskStats()
+{
+    stats = new BlockIoStats[getNumPartitions()];   // one zero-initialised entry per tracked partition; released in the destructor
+}
+
+OsDiskStats::OsDiskStats(bool updateNow) : OsDiskStats()
+{
+    if (updateNow)
+        updateCurrent();    // take an immediate snapshot; the success flag returned is not checked here
+}
 
-    part_info *partition;
+OsDiskStats::~OsDiskStats()
+{
+    delete [] stats;        // pairs with the new [] in the default constructor
+}
+
+unsigned OsDiskStats::getNumPartitions() const
+{
+#ifdef _WIN32
+    return 0;               // no partitions are tracked on windows
+#else
+    return queryGlobalOsDiskInfo().getNumPartitions();      // count held by the shared OsDiskInfo
+#endif
+}
+
+
+//Re-read /proc/diskstats and store the current counters for each tracked partition.
+//The summary total is cleared and rebuilt from the lines that map to a tracked partition.
+//Returns false if the file cannot be opened (and always on windows), otherwise true.
+bool OsDiskStats::updateCurrent()
+{
+#ifdef _WIN32
+    //MORE: This should be updated (probably using the performance counters api in windows).
+    //Revisit if windows ever becomes a supported server platform
+    return false;
+#else
+    FILE* diskfp = fopen("/proc/diskstats", "r");
+    if (!diskfp)
+        return false;
+
+    //The set of tracked partitions does not change while reading, so look it up once
+    const OsDiskInfo & diskInfo = queryGlobalOsDiskInfo();
+    total.clear();
+    char ln[256];
+    while (fgets(ln, sizeof(ln), diskfp))
+    {
+        unsigned major, minor;
+        BlockIoStats blkio;
+        //major/minor are unsigned so are scanned with %u; sscanf returns an int (possibly EOF)
+        int items = sscanf(ln, "%4u %4u %*s %u %u %llu %u %u %u %llu %u %*u %u %u",
+                   &major, &minor,
+                   &blkio.rd_ios, &blkio.rd_merges,
+                   &blkio.rd_sectors, &blkio.rd_ticks,
+                   &blkio.wr_ios, &blkio.wr_merges,
+                   &blkio.wr_sectors, &blkio.wr_ticks,
+                   &blkio.ticks, &blkio.aveq);
+
+        if (items == 6)
+        {
+            // hopefully not this branch!
+            // Only four counters followed the name: the values scanned into rd_merges and rd_ticks are
+            // taken as the sector counts, and everything else is zeroed.
+            blkio.rd_sectors = blkio.rd_merges;
+            blkio.wr_sectors = blkio.rd_ticks;
+            blkio.rd_ios = 0;
+            blkio.rd_merges = 0;
+            blkio.rd_ticks = 0;
+            blkio.wr_ios = 0;
+            blkio.wr_merges = 0;
+            blkio.wr_ticks = 0;
+            blkio.ticks = 0;
+            blkio.aveq = 0;
+            items = 12;
+        }
+        if (items == 12)
+        {
+            unsigned match = diskInfo.mapPartition(major, minor);
+            if (match != NotFound)
+            {
+                stats[match] = blkio;
+                total += blkio;
+            }
+        }
+    }
+    fclose(diskfp);
+    return true;
+#endif
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+OsNetworkStats::OsNetworkStats(const char * ifname)
+{
+    updateCurrent(ifname);  // snapshot immediately; the success flag returned is not checked here
+}
+
+
+//Return the field-by-field difference (this - other), e.g. the traffic between two snapshots.
+//The counters are unsigned, so a field that is smaller in this than in other wraps around.
+OsNetworkStats OsNetworkStats::operator - (const OsNetworkStats & other) const
+{
+    OsNetworkStats delta(*this);
+
+    //Receive side
+    delta.rxbytes -= other.rxbytes;
+    delta.rxpackets -= other.rxpackets;
+    delta.rxerrors -= other.rxerrors;
+    delta.rxdrops -= other.rxdrops;
+
+    //Transmit side
+    delta.txbytes -= other.txbytes;
+    delta.txpackets -= other.txpackets;
+    delta.txerrors -= other.txerrors;
+    delta.txdrops -= other.txdrops;
+    return delta;
+}
+
+//Refresh the counters from /proc/net/dev.
+//  ifname - the interface to read, or null to gather (sum) the counters of every interface listed.
+//Returns false if the file cannot be opened or its two header lines cannot be read (and always on
+//windows).  Otherwise returns true; if no line matched, the existing counters are left unchanged.
+bool OsNetworkStats::updateCurrent(const char * ifname)
+{
+#ifdef _WIN32
+    //MORE: Implement on windows when we have a requirement
+    return false;
+#else
+    FILE *netfp = fopen("/proc/net/dev", "r");
+    if (!netfp)
+        return false;
+    char ln[512];
+    // Read two lines
+    if (!fgets(ln, sizeof(ln), netfp) || !fgets(ln, sizeof(ln), netfp)) {
+        fclose(netfp);
+        return false;
+    }
+    //The second header line names the columns: it determines whether byte counts are present and
+    //how many values lie between rxdrops and the first transmit counter
+    unsigned txskip = 2;
+    bool hasbyt = false;
+    if (strstr(ln,"compressed")) {
+        txskip = 4;
+        hasbyt = true;
+    }
+    else if (strstr(ln,"bytes"))
+        hasbyt = true;
+
+    size_t ilen = ifname ? strlen(ifname) : 0;
+    OsNetworkStats gathered;    // all counters start at zero
+    bool matched = false;
+    while (fgets(ln, sizeof(ln), netfp)) {
+        const char *s = ln;
+        skipSp(s);
+
+        //Find the start of the counters if this line is for an interface of interest
+        const char * next = nullptr;
+        if (!ifname)
+        {
+            const char * colon = strchr(s, ':');
+            if (colon)
+                next = colon + 1;
+        }
+        else
+        {
+            if ((strncmp(s, ifname, ilen)==0) && (s[ilen]==':'))
+                next = s + ilen + 1;
+        }
+
+        if (next)
+        {
+            s = next;
+            skipSp(s);
+            if (hasbyt) {
+                gathered.rxbytes += readDecNum(s);
+                skipSp(s);
+            }
+            gathered.rxpackets += readDecNum(s);
+            skipSp(s);
+            gathered.rxerrors += readDecNum(s);
+            skipSp(s);
+            gathered.rxdrops += readDecNum(s);
+            skipSp(s);
+            //Count with a per-line variable.  Decrementing txskip itself left it wrapped round to a
+            //huge value after the first matching line, breaking every subsequent line when gathering all.
+            for (unsigned skipped = 0; skipped < txskip; skipped++) {
+                readDecNum(s);
+                skipSp(s);
+            }
+            if (hasbyt) {
+                gathered.txbytes += readDecNum(s);
+                skipSp(s);
+            }
+            gathered.txpackets += readDecNum(s);
+            skipSp(s);
+            gathered.txerrors += readDecNum(s);
+            skipSp(s);
+            gathered.txdrops += readDecNum(s);
+            matched = true;
+            if (ifname)
+                break;
+        }
+    }
+    fclose(netfp);
+    //Only replace the counters when something was read, so an unknown interface keeps the previous values
+    if (matched)
+        *this = gathered;
+    return true;
+#endif
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+#ifndef _WIN32
+//---------------------------------------------------------------------------
+
+class CExtendedStats  // Disk network and cpu stats
+{
     unsigned nparts;
-    blkio_info *newblkio;
-    blkio_info *oldblkio;
+    OsDiskStats *newDiskStats;
+    OsDiskStats *oldDiskStats;
     CpuInfo newcpu;
     unsigned numcpu;
     CpuInfo oldcpu;
     CpuInfo cpu;
-    net_info oldnet;
-    net_info newnet;
+    OsNetworkStats oldnet;
+    OsNetworkStats newnet;
     unsigned ncpu;
     bool first;
     char *kbuf;
@@ -1773,28 +2120,6 @@ class CExtendedStats  // Disk network and cpu stats
 
     StringBuffer ifname;
 
-#ifdef __linux__
-    UnsignedArray diskMajorMinor;
-#endif
-
-    bool isDisk(unsigned int major, unsigned int minor)
-    {
-#ifdef __linux__
-        unsigned mm = (major<<16)+minor;
-        bool found = diskMajorMinor.contains(mm);
-        if (found)
-            return true;
-#endif
-        if (IDE_DISK_MAJOR(major)) 
-            return ((minor&0x3F)==0);
-        if (SCSI_DISK_MAJOR(major)) 
-            return ((minor&0x0F)==0);
-        if (OTHER_DISK_MAJOR(major)) 
-            return ((minor&0x0F)==0);
-        return 0;
-    }
-
-
     bool getNextCPU()
     {
         if (!ncpu) {
@@ -1821,138 +2146,19 @@ class CExtendedStats  // Disk network and cpu stats
 
     bool getDiskInfo()
     {
-        char ln[256];
-        part_info pi;
-        FILE* diskfp = fopen("/proc/diskstats", "r");
-        if (!diskfp)
-            return false;
-        if (!newblkio)
-        {
-            nparts = 0;
-            while (fgets(ln, sizeof(ln), diskfp))
-            {
-                unsigned reads = 0;
-                if (sscanf(ln, "%4d %4d %31s %u", &pi.major, &pi.minor, pi.name, &reads) == 4)
-                {
-                    unsigned p = 0;
-                    while ((p<nparts) && (partition[p].major != pi.major || partition[p].minor != pi.minor))
-                        p++;
-                    if ((p==nparts) && reads && isDisk(pi.major,pi.minor))
-                    {
-                        nparts++;
-                        partition = (part_info *)realloc(partition,nparts*sizeof(part_info));
-                        partition[p] = pi;
-                    }
-                }
-            }
-            free(newblkio);
-            free(oldblkio);
-            newblkio = (blkio_info *)calloc(sizeof(blkio_info),nparts);
-            oldblkio = (blkio_info* )calloc(sizeof(blkio_info),nparts);
-        }
-        rewind(diskfp);
-        // could skip lines we know aren't significant here
-        while (fgets(ln, sizeof(ln), diskfp))
+        if (!newDiskStats)
         {
-            blkio_info blkio;
-            unsigned items = sscanf(ln, "%4d %4d %*s %u %u %llu %u %u %u %llu %u %*u %u %u",
-                       &pi.major, &pi.minor,
-                       &blkio.rd_ios, &blkio.rd_merges,
-                       &blkio.rd_sectors, &blkio.rd_ticks, 
-                       &blkio.wr_ios, &blkio.wr_merges,
-                       &blkio.wr_sectors, &blkio.wr_ticks,
-                       &blkio.ticks, &blkio.aveq);
-            if (items == 6)
-            {
-                // hopefully not this branch!
-                blkio.rd_sectors = blkio.rd_merges;
-                blkio.wr_sectors = blkio.rd_ticks;
-                blkio.rd_ios = 0;
-                blkio.rd_merges = 0;
-                blkio.rd_ticks = 0;
-                blkio.wr_ios = 0;
-                blkio.wr_merges = 0;
-                blkio.wr_ticks = 0;
-                blkio.ticks = 0;
-                blkio.aveq = 0;
-                items = 12;
-            }
-            if (items == 12)
-            {
-                for (unsigned p = 0; p < nparts; p++)
-                {
-                    if (partition[p].major == pi.major && partition[p].minor == pi.minor)
-                    {
-                        newblkio[p] = blkio;
-                        break;
-                    }
-                }
-            }
+            newDiskStats = new OsDiskStats;
+            oldDiskStats = new OsDiskStats;
         }
-        fclose(diskfp);
-        return true;
+
+        return newDiskStats->updateCurrent();
     }
 
 
     bool getNetInfo()
     {
-        FILE *netfp = fopen("/proc/net/dev", "r");
-        if (!netfp)
-            return false;
-        char ln[512];
-        // Read two lines
-        if (!fgets(ln, sizeof(ln), netfp) || !fgets(ln, sizeof(ln), netfp)) {
-            fclose(netfp);
-            return false;
-        }
-        unsigned txskip = 2;
-        bool hasbyt = false;
-        if (strstr(ln,"compressed")) {
-            txskip = 4;
-            hasbyt = true;
-        }
-        else if (strstr(ln,"bytes")) 
-            hasbyt = true;
-
-        while (fgets(ln, sizeof(ln), netfp)) {
-            const char *s = ln;
-            skipSp(s);
-            size_t ilen = ifname.length();
-            if ( (strncmp(s, ifname.str(), ilen)==0) && (s[ilen]==':') ) {
-                s+=(ilen+1);
-                skipSp(s);
-                if (hasbyt) {
-                    newnet.rxbytes = readDecNum(s);
-                    skipSp(s);
-                }
-                else
-                    newnet.rxbytes = 0;
-                newnet.rxpackets = readDecNum(s);
-                skipSp(s);
-                newnet.rxerrors = readDecNum(s);
-                skipSp(s);
-                newnet.rxdrops = readDecNum(s);
-                skipSp(s);
-                while (txskip--) {
-                    readDecNum(s);
-                    skipSp(s);
-                }
-                if (hasbyt) {
-                    newnet.txbytes = readDecNum(s);
-                    skipSp(s);
-                }
-                else
-                    newnet.txbytes = 0;
-                newnet.txpackets = readDecNum(s);
-                skipSp(s);
-                newnet.txerrors = readDecNum(s);
-                skipSp(s);
-                newnet.txdrops = readDecNum(s);
-                break;
-            }
-        }
-        fclose(netfp);
-        return true;
+        return newnet.updateCurrent(ifname);
     }
 
     size32_t getKLog(const char *&data)
@@ -2052,21 +2258,15 @@ public:
 
     CExtendedStats(bool printklog)
     {
-        partition = (part_info *)malloc(sizeof(part_info));
-        nparts = 0;
-        newblkio = NULL;
-        oldblkio = NULL;
+        nparts = queryGlobalOsDiskInfo().getNumPartitions();
+        newDiskStats = NULL;
+        oldDiskStats = NULL;
         first = true;
         ncpu = 0;
         kbuf = nullptr;
         kbufcrc = 0;
-        memset(&oldcpu, 0, sizeof(oldcpu));
-        memset(&newcpu, 0, sizeof(newcpu));
-        memset(&cpu, 0, sizeof(cpu));
         totalcpu = 0;
         numcpu = 0;
-        memset(&oldnet, 0, sizeof(oldnet));
-        memset(&newnet, 0, sizeof(newnet));
         ndisks = 0;
         kbadcnt = 0;
         if (printklog)
@@ -2081,62 +2281,19 @@ public:
 
         if (!getInterfaceName(ifname))
             ifname.set("eth0");
-
-#ifdef __linux__
-        // MCK - wish libblkid could do this ...
-        // Another way might also be to look for:
-        //   /sys/block/sd*
-        //   /sys/block/nvme*
-        // and match those with entries in /proc/diskstats
-        StringBuffer cmd("lsblk -o TYPE,MAJ:MIN --pairs");
-        Owned<IPipeProcess> pipe = createPipeProcess();
-        if (pipe->run("list disks", cmd, nullptr, false, true, true, 8192))
-        {
-            StringBuffer output;
-            Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
-            readSimpleStream(output, *pipeReader);
-            unsigned exitcode = pipe->wait();
-            if ( (exitcode == 0) && (output.length() > 0) )
-            {
-                StringArray lines;
-                lines.appendList(output, "\n");
-                ForEachItemIn(idx, lines)
-                {
-                    // line: TYPE="disk" MAJ:MIN="259:0"
-                    unsigned majnum, minnum;
-                    if (2 == sscanf(lines.item(idx), "TYPE=\"disk\" MAJ:MIN=\"%u:%u\"", &majnum, &minnum))
-                    {
-                        unsigned mm = (majnum<<16)+minnum;
-                        diskMajorMinor.appendUniq(mm);
-                    }
-                }
-            }
-            else
-            {
-                StringBuffer outputErr;
-                Owned<ISimpleReadStream> pipeReaderErr = pipe->getErrorStream();
-                readSimpleStream(outputErr, *pipeReaderErr);
-                if (outputErr.length() > 0)
-                    WARNLOG("WARNING: Pipe: output: %s", outputErr.str());
-            }
-        }
-#endif // __linux__
     }
 
     ~CExtendedStats()
     {
-        free(partition);
-        free(newblkio);
-        free(oldblkio);
+        //Allocated with new in getDiskInfo(), so they must be released with delete (not free) -
+        //otherwise ~OsDiskStats never runs and its stats array is leaked.  Deleting null is a no-op.
+        delete newDiskStats;
+        delete oldDiskStats;
         if (kbuf != nullptr)
             free(kbuf);
     }
 
     bool getLine(StringBuffer &out)
     {
-        blkio_info *t = oldblkio;
-        oldblkio = newblkio;
-        newblkio = t;
+        std::swap(oldDiskStats, newDiskStats);
         oldnet = newnet;
         bool gotdisk = getDiskInfo()&&nparts;
         bool gotnet = getNetInfo();
@@ -2155,21 +2312,18 @@ public:
             out.append("DSK: ");
             for (unsigned p = 0; p < nparts; p++)
             {
-
-                unsigned rd_ios = newblkio[p].rd_ios - oldblkio[p].rd_ios;
-                __uint64 rd_sectors = newblkio[p].rd_sectors - oldblkio[p].rd_sectors;
-                unsigned wr_ios = newblkio[p].wr_ios - oldblkio[p].wr_ios;
-                __uint64 wr_sectors = newblkio[p].wr_sectors - oldblkio[p].wr_sectors;
-                unsigned ticks = newblkio[p].ticks - oldblkio[p].ticks;
-                unsigned busy = (unsigned)(100*ticks/deltams);
+                const BlockIoStats & oldStats = oldDiskStats->queryStats(p);
+                const BlockIoStats & newStats = newDiskStats->queryStats(p);
+                BlockIoStats diff = newStats - oldStats;
+                unsigned busy = (unsigned)(100*diff.ticks/deltams);
                 if (busy>100)
                     busy = 100;
                 out.appendf("[%s] r/s=%0.1f kr/s=%0.1f w/s=%0.1f kw/s=%0.1f bsy=%d",
-                           partition[p].name,
-                           perSec(rd_ios,deltams),
-                           perSec(rd_sectors,deltams)/2.0,
-                           perSec(wr_ios,deltams),
-                           perSec(wr_sectors,deltams)/2.0,
+                           queryGlobalOsDiskInfo().queryPartitionName(p),
+                           perSec(diff.rd_ios,deltams),
+                           perSec(diff.rd_sectors,deltams)/2.0,
+                           perSec(diff.wr_ios,deltams),
+                           perSec(diff.wr_sectors,deltams)/2.0,
                            busy);
                 out.append(' ');
             }
@@ -2179,20 +2333,14 @@ public:
             if (out.length()&&(out.charAt(out.length()-1)!=' '))
                 out.append(' ');
             out.appendf("NIC: [%s] ", ifname.str());
-            __uint64 rxbytes = newnet.rxbytes-oldnet.rxbytes;
-            __uint64 rxpackets = newnet.rxpackets-oldnet.rxpackets;
-            __uint64 txbytes = newnet.txbytes-oldnet.txbytes;
-            __uint64 txpackets = newnet.txpackets-oldnet.txpackets;
-            __uint64 rxerrors = newnet.rxerrors-oldnet.rxerrors;
-            __uint64 rxdrops = newnet.rxdrops-oldnet.rxdrops;
-            __uint64 txerrors = newnet.txerrors-oldnet.txerrors;
-            __uint64 txdrops = newnet.txdrops-oldnet.txdrops;
+
+            OsNetworkStats diff = newnet - oldnet;
             out.appendf("rxp/s=%0.1f rxk/s=%0.1f txp/s=%0.1f txk/s=%0.1f rxerrs=%" I64F "d rxdrps=%" I64F "d txerrs=%" I64F "d txdrps=%" I64F "d",
-                       perSec(rxpackets,deltams),
-                       perSec(rxbytes/1024.0,deltams),
-                       perSec(txpackets,deltams),
-                       perSec(txbytes/1024.0,deltams),
-                       rxerrors, rxdrops, txerrors, txdrops);
+                       perSec(diff.rxpackets,deltams),
+                       perSec(diff.rxbytes/1024.0,deltams),
+                       perSec(diff.txpackets,deltams),
+                       perSec(diff.txbytes/1024.0,deltams),
+                       diff.rxerrors, diff.rxdrops, diff.txerrors, diff.txdrops);
             out.append(' ');
         }
         if (totalcpu)

+ 63 - 0
system/jlib/jdebug.hpp

@@ -301,6 +301,69 @@ protected:
     __uint64 ctx =0;
 };
 
+//Information about a single IO device
+class jlib_decl BlockIoStats
+{
+public:
+    void clear();                                                   // reset every counter to zero
+    BlockIoStats & operator += (const BlockIoStats & other);        // add other's counters to this object
+    BlockIoStats operator - (const BlockIoStats & other) const;     // field-by-field difference (this - other)
+
+    unsigned getSectorSize() const { return 512; }                  // bytes per sector used when converting the sector counts to sizes
+
+public:
+    unsigned rd_ios = 0;        // Read I/O operations
+    unsigned rd_merges = 0;     // Reads merged
+    __uint64 rd_sectors = 0;    // Sectors read
+    unsigned rd_ticks = 0;      // Time in queue + service for read
+    unsigned wr_ios = 0;        // Write I/O operations
+    unsigned wr_merges = 0;     // Writes merged
+    __uint64 wr_sectors = 0;    // Sectors written
+    unsigned wr_ticks = 0;      // Time in queue + service for write
+    unsigned ticks = 0;         // Time of requests in queue
+    unsigned aveq = 0;          // Average queue length
+};
+
+
+//Information about all the block IO devices being tracked
+class jlib_decl OsDiskStats
+{
+public:
+    OsDiskStats();                      // allocates the per-partition array; all counters start at zero
+    OsDiskStats(bool updateNow);        // as above, then calls updateCurrent() if updateNow is true
+    ~OsDiskStats();
+
+    //The object owns the raw stats array, so a memberwise copy would delete [] it twice - prevent copying
+    OsDiskStats(const OsDiskStats &) = delete;
+    OsDiskStats & operator = (const OsDiskStats &) = delete;
+
+    bool updateCurrent();               // re-read the counters; returns false if they could not be read
+    unsigned getNumPartitions() const;  // number of entries that queryStats() can index
+    const BlockIoStats & queryStats(unsigned i) const { return stats[i]; }     // no bounds check: i must be < getNumPartitions()
+    const BlockIoStats & querySummaryStats() const { return total; }           // sum over the partitions seen by the last updateCurrent()
+
+protected:
+    BlockIoStats * stats = nullptr;     // array with one entry per partition, owned by this object
+    BlockIoStats total;
+};
+
+
+class jlib_decl OsNetworkStats
+{
+public:
+    OsNetworkStats() = default;
+    OsNetworkStats(const char * ifname);        // construct, then immediately call updateCurrent(ifname)
+
+    bool updateCurrent(const char * ifname);    // ifname = null gathers all matches
+    OsNetworkStats operator - (const OsNetworkStats & other) const;     // field-by-field difference (this - other)
+
+public:
+    __uint64 rxbytes = 0;       // bytes received
+    __uint64 rxpackets = 0;     // packets received
+    __uint64 rxerrors = 0;      // receive errors
+    __uint64 rxdrops = 0;       // received packets dropped
+    __uint64 txbytes = 0;       // bytes transmitted
+    __uint64 txpackets = 0;     // packets transmitted
+    __uint64 txerrors = 0;      // transmit errors
+    __uint64 txdrops = 0;       // transmitted packets dropped
+};
+
 
 interface IPerfMonHook : extends IInterface
 {

+ 1 - 0
system/jlib/jmutex.hpp

@@ -997,6 +997,7 @@ class Singleton
 {
 public:
     template <typename FUNC> X * query(FUNC factory) { return querySingleton(singleton, cs, factory); }
+    X * queryExisting() const { return singleton.load(std::memory_order_acquire); }     // current pointer only - never invokes a factory; nullptr if nothing has been stored yet
 private:
     std::atomic<X *> singleton{nullptr};
     CriticalSection cs;

+ 2 - 0
system/jlib/jstatcodes.h

@@ -227,6 +227,8 @@ enum StatisticKind
     StCycleUserCycles,                  // Time for in user mode for this process
     StCycleSystemCycles,
     StCycleTotalCycles,
+    StSizeOsDiskRead,
+    StSizeOsDiskWrite,
     StMax,
 
     //For any quantity there is potentially the following variants.

+ 2 - 0
system/jlib/jstats.cpp

@@ -868,6 +868,8 @@ static const StatisticMeta statsMetaData[StMax] = {
     { CYCLESTAT(User) },
     { CYCLESTAT(System) },
     { CYCLESTAT(Total) },
+    { SIZESTAT(OsDiskRead) },
+    { SIZESTAT(OsDiskWrite) },
 };