Explorar o código

HPCC-16173 Add tracing to note lost UDP packets in roxie log

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=8) %!d(string=hai) anos
pai
achega
5cfdded752
Modificáronse 2 ficheiros con 179 adicións e 2 borrados
  1. 175 0
      system/jlib/jdebug.cpp
  2. 4 2
      system/jlib/jdebug.hpp

+ 175 - 0
system/jlib/jdebug.cpp

@@ -2019,6 +2019,169 @@ static struct CNtKernelInformation
 } NtKernelFunctions;
 #endif
 
+struct PortStats
+{
+    unsigned port;
+    unsigned drops;
+    unsigned rx_queue;
+};
+typedef MapBetween<unsigned, unsigned, PortStats, PortStats> MapPortToPortStats;
+
+class CUdpStatsReporter
+{
+public:
+    CUdpStatsReporter()
+    {
+        dropsCol = -1;
+        portCol = -1;
+        uidCol = -1;
+        queueCol = -1;
+    }
+
+    bool reportUdpInfo(unsigned traceLevel)
+    {
+        if (uidCol==-1 && columnNames.length())
+            return false;
+        FILE *netfp = fopen("/proc/net/udp", "r");
+        if (!netfp)
+            return false;
+        char ln[512];
+        // Read header
+        if (!fgets(ln, sizeof(ln), netfp)) {
+            fclose(netfp);
+            return false;
+        }
+        if (!columnNames.length())
+        {
+            columnNames.appendList(ln, " ");
+            ForEachItemInRev(idx, columnNames)
+            {
+                if (streq(columnNames.item(idx), "rem_address"))
+                    columnNames.add("rem_port", idx+1);
+                else if (streq(columnNames.item(idx), "local_address"))
+                    columnNames.add("local_port", idx+1);
+            }
+            ForEachItemIn(idx2, columnNames)
+            {
+                if (streq(columnNames.item(idx2), "drops"))
+                    dropsCol = idx2;
+                else if (streq(columnNames.item(idx2), "local_port"))
+                    portCol = idx2;
+                else if (streq(columnNames.item(idx2), "rx_queue"))
+                    queueCol = idx2;
+                else if (streq(columnNames.item(idx2), "uid"))
+                    uidCol = idx2;
+            }
+            if (portCol == -1 || queueCol == -1 || uidCol == -1)
+            {
+                uidCol = -1;
+                fclose(netfp);
+                return false;
+            }
+        }
+        int myUid = geteuid();
+        while (fgets(ln, sizeof(ln), netfp))
+        {
+            StringArray cols;
+            cols.appendList(ln, " :");
+            if (cols.length() >= columnNames.length() && atoi(cols.item(uidCol))==myUid)
+            {
+                unsigned queue = strtoul(cols.item(queueCol), NULL, 16);
+                unsigned drops = 0;
+                if (dropsCol)
+                    drops =  strtoul(cols.item(dropsCol), NULL, 10);
+                if (queue || drops)
+                {
+                    unsigned port = strtoul(cols.item(portCol), NULL, 16);
+                    if (traceLevel > 0)
+                        DBGLOG("From /proc/net/udp: port %d rx_queue=%u drops=%u", port, queue, drops);
+                    PortStats *ret = map.getValue(port);
+                    if (!ret)
+                    {
+                        PortStats e = {port, 0, 0};
+                        map.setValue(port, e);
+                        ret = map.getValue(port);
+                        assertex(ret);
+                    }
+                    if (queue > ret->rx_queue)
+                    {
+                        DBGLOG("UDP queue: new max rx_queue: port %d rx_queue=%u drops=%u", port, queue, drops);
+                        ret->rx_queue = queue;
+                    }
+                    if (drops > ret->drops)
+                    {
+                        LOG(MCoperatorError, unknownJob, "DROPPED UDP PACKETS: port %d rx_queue=%u (peak %u) drops=%u (total %i)", port, queue, ret->rx_queue, drops-ret->drops, drops);
+                        ret->drops = drops;
+                    }
+                }
+            }
+        }
+        fclose(netfp);
+        return true;
+    }
+private:
+    MapPortToPortStats map;
+    StringArray columnNames;
+    int dropsCol;
+    int portCol;
+    int uidCol;
+    int queueCol;
+};
+
+class CSnmpStatsReporter
+{
+public:
+    CSnmpStatsReporter()
+    {
+        inErrorsCol = -1;
+        prevErrors = 0;
+    }
+    bool reportSnmpInfo()
+    {
+        if (inErrorsCol==-1 && columnNames.length())
+            return false;
+        FILE *netfp = fopen("/proc/net/snmp", "r");
+        if (!netfp)
+            return false;
+        char ln[512];
+        bool ok = false;
+        while (fgets(ln, sizeof(ln), netfp))
+        {
+            if (strncmp(ln, "Udp:", 4)==0)
+            {
+                if (!columnNames.length())
+                {
+                    columnNames.appendList(ln, " ");
+                    ForEachItemIn(idx, columnNames)
+                    {
+                        if (streq(columnNames.item(idx), "InErrors"))
+                            inErrorsCol = idx;
+                    }
+                }
+                if (fgets(ln, sizeof(ln), netfp))
+                {
+                    StringArray cols;
+                    cols.appendList(ln, " ");
+                    if (cols.length() >= columnNames.length())
+                    {
+                        ok = true;
+                        unsigned errors = strtoul(cols.item(inErrorsCol), NULL, 10);
+                        if (errors > prevErrors)
+                            LOG(MCoperatorError, unknownJob, "UDP InErrors: %u (total %u)", errors-prevErrors, errors);
+                        prevErrors = errors;
+                    }
+                }
+                break;
+            }
+        }
+        fclose(netfp);
+        return ok;
+    }
+private:
+    StringArray columnNames;
+    int inErrorsCol;
+    unsigned prevErrors;
+};
 
 static class CMemoryUsageReporter: public Thread
 {
@@ -2044,6 +2207,8 @@ static class CMemoryUsageReporter: public Thread
     StringBuffer                   secondaryfs;
     CriticalSection                sect; // for getSystemTraceInfo
 
+    CSnmpStatsReporter             snmpStats;
+    CUdpStatsReporter              udpStats;
 
 public:
     CMemoryUsageReporter(unsigned _interval, PerfMonMode _traceMode, IPerfMonHook * _hook, bool printklog)
@@ -2320,6 +2485,11 @@ public:
     {
         StringBuffer str;
         getSystemTraceInfo(str, traceMode&~PerfMonExtended); // initializes the values so that first one we print is meaningful rather than always saying PU=0%
+        if (traceMode&PerfMonUDP)
+        {
+            snmpStats.reportSnmpInfo();
+            udpStats.reportUdpInfo(0);
+        }
         CTimeMon tm(NAMEDCOUNTPERIOD*1000);
         while (!term) {
             if (sem.wait(interval))
@@ -2333,6 +2503,11 @@ public:
                 tm.reset(NAMEDCOUNTPERIOD*1000);
             }
 #endif
+            if (traceMode&PerfMonUDP)
+            {
+                snmpStats.reportSnmpInfo();
+                udpStats.reportUdpInfo(0);
+            }
             if(traceMode&&str.length()) {
                 LOG(MCdebugInfo, unknownJob, "SYS: %s", str.str());
 #ifndef _WIN32

+ 4 - 2
system/jlib/jdebug.hpp

@@ -256,11 +256,13 @@ enum
     PerfMonPackets   = 0x02,
     PerfMonDiskUsage = 0x04,
     //default and full modes:
-    PerfMonExtended  = 0x08,   
+    PerfMonExtended  = 0x08,
+    // UDP packet loss tracing
+    PerfMonUDP       = 0x10,
 #ifdef _WIN32
     PerfMonStandard  = PerfMonProcMem
 #else
-    PerfMonStandard  = PerfMonProcMem|PerfMonExtended
+    PerfMonStandard  = PerfMonProcMem|PerfMonExtended|PerfMonUDP
 #endif
 
 };