浏览代码

HPCC-25262 Metrics milestone 1

Implement and collect a counter metric

Signed-off-by: Ken Rowland <kenneth.rowland@lexisnexisrisk.com>
Ken Rowland 4 年之前
父节点
当前提交
53851c7416

+ 1 - 0
CMakeLists.txt

@@ -176,6 +176,7 @@ elseif ( NOT MAKE_DOCS_ONLY )
     HPCC_ADD_SUBDIRECTORY (rtl)
     HPCC_ADD_SUBDIRECTORY (services "PLATFORM")
     HPCC_ADD_SUBDIRECTORY (thorlcr "PLATFORM")
+    HPCC_ADD_SUBDIRECTORY (system/metrics)
     HPCC_ADD_SUBDIRECTORY (testing)
     if (NOT CONTAINERIZED)
       HPCC_ADD_SUBDIRECTORY (deploy)

+ 334 - 0
devdoc/Metrics.rst

@@ -0,0 +1,334 @@
+========================
+Metrics Framework Design
+========================
+
+************
+Introduction
+************
+
+This document describes the design of a metrics framework that allows HPCC Systems components to
+implement a metric collection strategy. Metrics provide the following functionality:
+
+* Alerts and monitoring
+
+  An important DevOps function is to monitor the cluster and providing alerts when
+  problems are detected. Aggregated metric values from multiple sources provide
+  the necessary data to build a complete picture of cluster health that drives
+  monitoring and alerts.
+
+* Scaling
+
+  As described above, aggregated metric data is also used to dynamically respond to changing
+  cluster demands and load. Metrics provide the monitoring capability to react and take
+  action
+
+* Fault diagnosis and resource monitoring
+
+  Metrics provide historical data useful in diagnosing problems by profiling how demand and
+  usage patterns may change prior to a fault. Predictive analysis can also be applied.
+
+* Analysis of jobs/workunits and profiling
+
+  With proper instrumentation, a robust dynamic metric strategy can track workunit
+  processing. Internal problems with queries should be diagnosed from deep drill down logging.
+
+The document consists of several sections in order to provide requirements as well as
+the design of framework components.
+
+Definitions
+===============
+Some definitions are useful.
+
+Metric
+  A measurement defined by a component that represents an internal state that is useful in a system
+  reliability engineering function. In the context of the framework, a metric is an object representing
+  the above.
+
+Metric Value
+  The current value of a metric.
+
+Metric Updating
+  The component task of updating metric state.
+
+Collection
+  A framework process of selecting relevant metrics based on configuration and then retrieving
+  their values.
+
+Reporting
+  A framework process of converting values obtained during a collection into a format suitable for
+  ingestion by a collection system.
+
+Trigger
+  What causes the collection of metric values.
+
+Collection System
+  The store for metric values generated during the reporting framework process.
+
+
+*************
+Use Scenarios
+*************
+This section describes how components expect to use the framework. It is not a complete list of all
+requirements but rather a sample.
+
+
+Roxie
+=====
+Roxie desires to keep a count of many different internal values. Some examples are
+
+* Disk type operations such as seeks and reads
+* Execution totals
+
+  Need to track items such as total numbers of items such as success and failures
+  as well as breaking some counts into individual reasons. For example, failures
+  may need be categorized such as as
+
+  * Busy
+  * Timeout
+  * Bad input
+
+  Or even by priority (high, low, sla, etc.)
+
+* Current operational levels such as the length of internal queues
+* The latency of operations such as queue results, agent responses, and gateway responses
+
+Roxie also has the need to track internal memory usage beyond the pod/system level capabilities.
+Tracking the state of its large fixed memory pool is necessary.
+
+The Roxie buddy system also must track how often and who is completing requests. The "I Beat You To It"
+set of metrics must be collected and exposed in order to detect pending node failure. While specific
+action on these counts is not known up front, it appears clear that these values are useful and should
+be collected.
+
+There does not appear to be a need for creating and destroying metrics dynamically. The set of metrics
+is most likely to be created at startup and remain active through the life of the Roxie. If, however,
+stats collection seeps into the metrics framework, dynamic creation and destruction of stats metrics is
+a likely requirement.
+
+
+ESP
+===
+
+There are some interesting decisions with respect to ESP and collection of metrics. Different
+applications within ESP present different use cases for collection. Ownership of a given task drives
+some of these use cases. Take workunit queues. If ownership of the task, with respect to metrics, is
+WsWorkunits, then use cases are centric to that component. However, if agents listening on the queue
+are to report metrics, then a different set of use cases emerge. It is clear that additional work is
+needed to generate clear ownership of metrics gathered by ESP and/or the tasks it performs.
+
+ESP needs to report the *activeTransactions* value from the TxSummary class(es). This gives an
+indication of how busy the ESP is in terms of client requests.
+
+Direct measurement of response time in requests may not be useful since the type of request causes
+different execution paths within ESP that are expected to take widely varying amounts of time. Creation
+of metrics for each method is not recommended. However, two possible solutions are to a) create a
+metric for request types, or b) use a histogram to measure response time ranges. Another option
+mentioned redefines the meaning of a bucket in a histogram. Instead of a numeric distribution,
+each bucket represents a unique subtask within an overall "metric" representing a measured operation.
+This should be explored whether for operational or developmental purposes.
+
+For tracking specific queries and their health, the feeling is that logging can accomplish this better
+than metrics since the list of queries to monitor will vary between clusters. Additionally, operational
+metrics solving the cases mentioned above will give a view into the overall health of ESP which will
+affect the execution of queries. Depending on actions taken by these metrics, scaling may solve
+overload conditions to keep cluster responsiveness acceptable.
+
+For Roxie a workunit operates as a service. Measuring service performance using a histogram to capture
+response times as a distribution may be appropriate. Extracting the 95th percentile of response time
+may be useful as well.
+
+There are currently no use cases requiring consistency between values of different metrics.
+
+At this time the only concrete metric identified is the number of requests received. As the framework
+design progresses and ESP is instrumented, the list will grow.
+
+
+Dali Use Cases
+==============
+
+From information gathered, Dali plans to keep counts and rates for many of the items it manages.
+
+
+****************
+Framework Design
+****************
+
+This section covers the design and architecture of the framework. It discusses the main areas of the
+design, the interactions between each area, and an overall process model of how the framework operates.
+
+The framework consists of three major areas: metrics, sinks, and the glue logic. These areas work
+together with the platform and the component to provide a reusable metrics collection function.
+
+Metrics represent the quantifiable component state measurements used to track and assess the status
+of the component. Metrics are typically scalar values that are easily aggregated by a collection system.
+Aggregated values provide the necessary input to take component and cluster actions such as scaling
+up and down. The component is responsible for creating metrics and instrumenting the code. The
+framework provides the support for collecting and reporting the values. Metrics provide the following:
+
+* Simple methods for the component to update the metric
+* Simple methods for the framework to retrieve metric value(s)
+* Handling of all synchronization between updating and retrieving metric values
+
+In addition, the framework provides the support for retrieving values so that the component does not
+participate in metric reporting. The component simply creates the metrics it needs, then instruments
+the component to update the metric whenever its state changes. For example, the component may create
+a metric that counts the total number of requests received. Then, wherever the component
+receives a request, a corresponding update to the count is added. Nowhere in the component is any
+code added to retrieve the count as that is handled by the framework.
+
+Sinks provide a pluggable interface to hide the specifics of collection systems so that the metrics
+framework is independent of those dependencies. Sinks:
+
+* Operate independently of other sinks in the system
+* Convert metric native values into collection system specific measurements and reports
+* Drive the collection and reporting processes
+
+The third area of the framework is the glue logic, referred to as the *MetricsReporter*. It manages
+the metrics system for the component. It provides the following:
+
+* Handles framework initialization
+* Loads sinks as required
+* Manages the list of metrics for the component
+* Handles collection and reporting with a set of convenience methods used by sinks
+
+The framework is designed to be instantiated into a component as part of its process and address space.
+All objects instantiated as part of the framework are owned by the component and are not shareable with
+any other component whether local or remote. Any coordination or consistency requirements that may
+arise in the implementation of a sink shall be the sole responsibility of the sink.
+
+************************
+Framework Implementation
+************************
+This section describe the implementation of each area of the framework.
+
+Metrics
+=======
+Components use metrics to measure their internal state. Metrics can represent everything from the
+number of requests received to the average length some value remains cached. The point is that the
+component is responsible for creating and updating the metric. The framework shall provide a set of
+metrics designed to cover the majority of component measurement requirements. All metrics share a
+common interface to allow the framework to manage them in a common way.
+
+To meet the requirement to manage metrics independent of the underlying metric state, all metrics
+implement a common interface. All metrics then add their specific methods to update and retrieve
+internal state. Generally the component uses the update method(s) to change metric state whenever
+an event or other process dictates. The sink, described later, is generally the consumer of the
+retrieval methods. Components create and update metrics and sinks retrieve and consume the values.
+The metric is responsible for synchronizing access between update and retrieval.
+
+Sinks
+=====
+The framework defines a sink interface to support the different requirements of collection systems.
+Examples of collection systems are Prometheus, Datadog, and Elasticsearch. Each has different
+requirements for how and when measurements are ingested. The following are examples of different
+collection system requirements:
+
+* Polled vs Periodic
+* Single measurement vs multiple reports
+* Report format (JSON, text, etc.)
+* Push vs Pull
+
+Sinks are responsible for two main functions: initiating a collection and reporting
+measurements to the collection system. The *Metrics Reporter* provides the support to complete
+these functions.
+
+The sink encapsulates all of the collection system requirements providing a pluggable architecture that
+isolates components from these differences. The framework supports multiple sinks concurrently,
+each operating independently.
+
+Instrumented components are not aware of the sink or sinks in use. Sinks can be changed without
+requiring changes to a component. Therefore, components are independent of the collection system(s)
+in use.
+
+
+Metrics Reporter
+================
+
+The metrics reporter class provides all of the common functions to bind together the component,
+the metrics it creates, and the sinks to which measurements are reported. It is responsible for
+the following:
+
+* Initialization of the framework
+* Managing the metrics created by the component
+* Handling collection and reporting as directed by configured sinks
+
+
+Metrics Implementations
+=======================
+
+The sections that follow discuss metric implementations.
+
+Counter Metric
+--------------
+A counter metric is a monotonically increasing value that "counts" the total occurrences of some event.
+Examples include the number of requests received, or the number of cache misses. Once created, the
+component instruments the code with updates to the count whenever appropriate.
+
+Gauge Metric
+------------
+A gauge metric is a continuously updated value representing the current state of an interesting value
+in the component. For example, the amount of memory used in an internal buffer, or the number of
+requests waiting on a queue. A gauge metric may increase or decrease in value as needed. Reading the
+value of a gauge is a stateless operation in that there are no dependencies on the previous reading.
+The value returned shall always be the current state.
+
+Once created, the component shall update the gauge anytime the state of what is measured is updated.
+The metric shall provide methods to increase and decrease the value. The sink reads the value during
+collection and reporting.
+
+
+*************
+Configuration
+*************
+This section discusses configuration. Since Helm charts are capable of combining configuration data
+at a global level into a component's specific configuration, The combined configuration takes the
+form as shown below. Note that as the design progresses it is expected that there will be additions.
+
+::
+
+  component:
+    metrics:
+      sinks:
+        sink:
+          - type: <sink_type>
+            name: <sink name>
+            settings:
+              sink_setting1: sink_setting_value1
+              sink_setting2: sink_setting_value2
+            metrics:
+              - name: <metric_name>
+
+Where (based on being a child of the current *component*):
+
+metrics
+    Metrics configuration for the component
+
+metrics.sinks
+    List of sinks defined for the component (may have been combined with global config)
+
+metrics.sinks.sink
+    The repeating element
+
+metrics.sinks.sink.type
+    The type for the sink. The type is substituted into the following pattern to determine the lib to load:
+    libhpccmetrics<type><shared_object_extension>
+
+metrics.sinks.sink.name
+    A name for the sink. Note this may not be needed, but can provide a way to combine global and
+    component config based on value
+
+metrics.sinks.sink.settings
+    A set of key/value pairs that passed to the sink when initialized. It should contain information
+    necessary for the operation of the sink. Nested YML is supported. Example settings are the
+    prometheus server name, or the collection period for a periodic sink.
+
+metrics.sinks.sink.metrics
+    Optional list of component-defined metrics reported by the sink to the backend during collection
+    and reporting. If no list if given, all component metrics are reported by default.
+
+
+*************************
+Component Instrumentation
+*************************
+
+This section describes component instrumentation. Will be filled in later.

+ 95 - 76
esp/platform/espp.cpp

@@ -48,6 +48,9 @@
 #include "rmtfile.hpp"
 #include "dafdesc.hpp"
 
+#include "jmetrics.hpp"
+using namespace hpccMetrics;
+
 void CEspServer::sendSnmpMessage(const char* msg) { throwUnexpected(); }
 
 bool CEspServer::addCacheClient(const char *id, const char *cacheInitString)
@@ -141,53 +144,53 @@ int start_init_main(int argc, const char** argv, int (*init_main_func)(int,const
 #define RESET_ESP_SIGNAL_HANDLER(sig, handler)
 
 int start_init_main(int argc, const char** argv, int (*init_main_func)(int, const char**))
-{ 
-    if(argc > 1 && !strcmp(argv[1], "work")) 
-    { 
+{
+    if(argc > 1 && !strcmp(argv[1], "work"))
+    {
         const char** newargv = new const char*[argc - 1];
-        newargv[0] = argv[0]; 
-        for(int i = 2; i < argc; i++) 
-        { 
-            newargv[i - 1] = argv[i]; 
-        } 
-        int rtcode = init_main_func(argc - 1, newargv); 
+        newargv[0] = argv[0];
+        for(int i = 2; i < argc; i++)
+        {
+            newargv[i - 1] = argv[i];
+        }
+        int rtcode = init_main_func(argc - 1, newargv);
         delete[] newargv;
         return rtcode;
-    } 
-    else 
-    { 
-        StringBuffer command; 
-        command.append(argv[0]); 
-        command.append(" work "); 
-        for(int i = 1; i < argc; i++) 
-        { 
-            command.append(argv[i]); 
-            command.append(" "); 
-        } 
-        DWORD exitcode = 0; 
-        while(true) 
-        { 
-            PROGLOG("Starting working process: %s", command.str()); 
-            PROCESS_INFORMATION process; 
-            STARTUPINFO si; 
-            GetStartupInfo(&si); 
-            if(!CreateProcess(NULL, (char*)command.str(), NULL, NULL, FALSE, 0, NULL, NULL, &si, &process)) 
-            { 
-                IERRLOG("Process failed: %d\r\n",GetLastError()); 
-                exit(-1); 
-            } 
-            WaitForSingleObject(process.hProcess,INFINITE); 
-            GetExitCodeProcess(process.hProcess, &exitcode); 
-            PROGLOG("Working process exited, exitcode=%d", exitcode); 
-            if(exitcode == TERMINATE_EXITCODE) 
-            { 
-                DBGLOG("This is telling the monitoring process to exit too. Exiting once and for all...."); 
-                exit(exitcode); 
-            } 
-            CloseHandle(process.hProcess); 
-            CloseHandle(process.hThread); 
+    }
+    else
+    {
+        StringBuffer command;
+        command.append(argv[0]);
+        command.append(" work ");
+        for(int i = 1; i < argc; i++)
+        {
+            command.append(argv[i]);
+            command.append(" ");
+        }
+        DWORD exitcode = 0;
+        while(true)
+        {
+            PROGLOG("Starting working process: %s", command.str());
+            PROCESS_INFORMATION process;
+            STARTUPINFO si;
+            GetStartupInfo(&si);
+            if(!CreateProcess(NULL, (char*)command.str(), NULL, NULL, FALSE, 0, NULL, NULL, &si, &process))
+            {
+                IERRLOG("Process failed: %d\r\n",GetLastError());
+                exit(-1);
+            }
+            WaitForSingleObject(process.hProcess,INFINITE);
+            GetExitCodeProcess(process.hProcess, &exitcode);
+            PROGLOG("Working process exited, exitcode=%d", exitcode);
+            if(exitcode == TERMINATE_EXITCODE)
+            {
+                DBGLOG("This is telling the monitoring process to exit too. Exiting once and for all....");
+                exit(exitcode);
+            }
+            CloseHandle(process.hProcess);
+            CloseHandle(process.hThread);
             Sleep(1000);
-        } 
+        }
     }
 }
 
@@ -210,35 +213,35 @@ int start_init_main(int argc, const char** argv, int (*init_main_func)(int, cons
 int work_main(CEspConfig& config, CEspServer& server);
 int do_work_main(CEspConfig& config, CEspServer& server)
 {
-   int result; 
-   int numchildren = 0; 
-   pid_t childpid=0; 
-
-createworker: 
-   childpid = fork(); 
-   if(childpid < 0) 
-   { 
-      IERRLOG("Unable to create new process"); 
-      result = -1; 
-   } 
-   else if(childpid == 0) 
+   int result;
+   int numchildren = 0;
+   pid_t childpid=0;
+
+createworker:
+   childpid = fork();
+   if(childpid < 0)
+   {
+      IERRLOG("Unable to create new process");
+      result = -1;
+   }
+   else if(childpid == 0)
    {
         result = work_main(config, server);
-   } 
-   else 
-   { 
-      DBGLOG("New process generated, pid=%d", childpid); 
-      numchildren++; 
-      if(numchildren < MAX_CHILDREN) 
-         goto createworker; 
-      
-      int status; 
-      childpid = wait3(&status, 0, NULL); 
-      DBGLOG("Attention: child process exited, pid = %d", childpid); 
-      numchildren--; 
-      DBGLOG("Bringing up a new process..."); 
-      sleep(1); 
-      goto createworker; 
+   }
+   else
+   {
+      DBGLOG("New process generated, pid=%d", childpid);
+      numchildren++;
+      if(numchildren < MAX_CHILDREN)
+         goto createworker;
+
+      int status;
+      childpid = wait3(&status, 0, NULL);
+      DBGLOG("Attention: child process exited, pid = %d", childpid);
+      numchildren--;
+      DBGLOG("Bringing up a new process...");
+      sleep(1);
+      goto createworker;
    }
 
    return result;
@@ -304,7 +307,7 @@ void openEspLogFile(IPropertyTree* envpt, IPropertyTree* procpt)
 
     if (procpt->getPropBool("@enableSysLog", false))
         UseSysLogForOperatorMessages();
-}   
+}
 
 
 static constexpr const char * defaultYaml = R"!!(
@@ -351,6 +354,20 @@ static void usage()
 
 IPropertyTree *buildApplicationLegacyConfig(const char *application, const char* argv[]);
 
+//
+// Initialize metrics
+void initializeMetrics(CEspConfig* config)
+{
+    //
+    // Initialize metrics
+    Owned<IPropertyTree> pMetricsTree = config->queryConfigPTree()->getPropTree("metrics");
+    if (pMetricsTree != nullptr)
+    {
+        MetricsReporter &metricsReporter = queryMetricsReporter();
+        metricsReporter.init(pMetricsTree);
+        metricsReporter.startCollecting();
+    }
+}
 
 int init_main(int argc, const char* argv[])
 {
@@ -391,7 +408,7 @@ int init_main(int argc, const char* argv[])
 
     int result = -1;
 
-#ifdef _WIN32 
+#ifdef _WIN32
     if (!interactive)
         ::SetErrorMode(SEM_NOGPFAULTERRORBOX|SEM_FAILCRITICALERRORS);
 #endif
@@ -528,8 +545,10 @@ int init_main(int argc, const char* argv[])
             setEspContainer(server.get());
 
             config->loadAll();
-            config->bindServer(*server.get(), *server.get()); 
+            config->bindServer(*server.get(), *server.get());
             config->checkESPCache(*server.get());
+
+            initializeMetrics(config);
         }
         catch(IException* e)
         {
@@ -552,12 +571,12 @@ int init_main(int argc, const char* argv[])
     {
         OERRLOG("!!! Unable to load ESP configuration.");
     }
-    
+
     return result;
 }
 
 //command line arguments:
-// [pre] if "work", special init behavior, but removed before init_main  
+// [pre] if "work", special init behavior, but removed before init_main
 // [1] process name
 // [2] config location - local file name or dali address
 // [3] config location type - "dali" or ""
@@ -565,9 +584,9 @@ int init_main(int argc, const char* argv[])
 int main(int argc, const char* argv[])
 {
     start_init_main(argc, argv, init_main);
+    queryMetricsReporter().stopCollecting();
     stopPerformanceMonitor();
     UseSysLogForOperatorMessages(false);
     releaseAtoms();
     return 0;
 }
-

+ 16 - 0
esp/platform/txsummary.cpp

@@ -19,6 +19,7 @@
 #include "jlog.hpp"
 #include "jutil.hpp"
 #include <algorithm>
+#include "espcontext.hpp"
 
 using std::find_if;
 using std::for_each;
@@ -26,6 +27,20 @@ using std::for_each;
 #define VALIDATE_KEY(k) if (!(k) || !(*k)) return false
 #define MATCH_KEY       [&](const Entry& entry) { return stricmp(entry.key.str(), key) == 0; }
 
+std::shared_ptr<hpccMetrics::CounterMetric> CTxSummary::pRequestCount;
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    CTxSummary::pRequestCount = hpccMetrics::createMetric<hpccMetrics::CounterMetric>("requests", "Number of Requests");
+    return true;
+}
+
+MODULE_EXIT()
+{
+    CTxSummary::pRequestCount = nullptr;
+}
+
+
 bool operator < (const StringAttr& a, const StringAttr& b)
 {
     return (stricmp(a.str(), b.str()) < 0);
@@ -34,6 +49,7 @@ bool operator < (const StringAttr& a, const StringAttr& b)
 CTxSummary::CTxSummary(unsigned creationTime)
 : m_creationTime(creationTime ? creationTime : msTick())
 {
+    pRequestCount->inc(1);
 }
 
 CTxSummary::~CTxSummary()

+ 4 - 0
esp/platform/txsummary.hpp

@@ -26,6 +26,8 @@
 #include "esphttp.hpp"
 #include <list>
 #include <map>
+#include "jmetrics.hpp"
+
 
 class CTxSummary : extends CInterface
 {
@@ -78,6 +80,8 @@ public:
     // The same conditions as for getTimer apply.
     virtual bool updateTimer(const char* name, unsigned long long delta, const LogLevel logLevel = LogMin);
 
+    static std::shared_ptr<hpccMetrics::CounterMetric> pRequestCount;
+
 protected:
     // Log the summary contents on destruction.
     ~CTxSummary();

+ 8 - 3
initfiles/componentfiles/configxml/esp.xsl

@@ -71,7 +71,6 @@
         </xsl:copy>
     </xsl:template>
 
-
     <xsl:template match="/Environment/Software/EspProcess">
         <!-- note that this can only be @name=$process since it is guided by template
           for /Environment above -->
@@ -85,6 +84,8 @@
 
             <xsl:call-template name="addEnvironmentInfo"/>
 
+            <xsl:call-template name="addMetricsConfig"/>
+
             <xsl:if test="ESPCacheGroup[1]">
                <xsl:value-of disable-output-escaping="yes" select="$break" />
                <xsl:value-of disable-output-escaping="yes" select="$indent" />
@@ -633,6 +634,10 @@
             </Environment>
         </xsl:template>
 
+    <xsl:template name="addMetricsConfig">
+        <xsl:copy-of select="/Environment/Software/metrics"/>
+    </xsl:template>
+
      <xsl:template match="/Environment/Software/EspProcess" mode="EclWatch">
         <xsl:for-each select="EspBinding">
             <xsl:variable name="protocol" select="@protocol"/>
@@ -671,7 +676,7 @@
                  </xsl:for-each>
                  <xsl:for-each select="$bindingNode/AuthenticateFeature[@authenticate='Yes']">
                     <Feature name="{@name}" path="{@path}" resource="{@resource}" required="{@access}" description="{@description}"/>
-                 </xsl:for-each>                              
+                 </xsl:for-each>
                  <xsl:for-each select="$bindingNode/AuthenticateSetting[@include='Yes']">
                     <Setting path="{@path}" resource="{@resource}" description="{@description}"/>
                  </xsl:for-each>
@@ -697,7 +702,7 @@
            </xsl:when>
         </xsl:choose>
      </xsl:template>
-   
+
         <xsl:template name="printUniqueTokens">
             <xsl:param name="s"/><!--space delimited string of tokens with space as last char-->
             <xsl:param name="enclosingTagName"/>

+ 52 - 51
system/jlib/CMakeLists.txt

@@ -14,20 +14,20 @@
 #    limitations under the License.
 ################################################################################
 
-# Component: jlib 
+# Component: jlib
 #####################################################
 # Description:
 # ------------
 #    Cmake Input File for jlib
 #####################################################
 
-project( jlib ) 
+project( jlib )
 
-INCLUDE(CheckLibraryExists) 
+INCLUDE(CheckLibraryExists)
 
 if (NOT WIN32 AND NOT WIN64)
-CHECK_LIBRARY_EXISTS(dl dlopen "" HAVE_LIBDL) 
-CHECK_LIBRARY_EXISTS(crypt crypt "" HAVE_LIBCRYPT) 
+CHECK_LIBRARY_EXISTS(dl dlopen "" HAVE_LIBDL)
+CHECK_LIBRARY_EXISTS(crypt crypt "" HAVE_LIBCRYPT)
 endif ()
 
 if(NOT TARGET lzma)
@@ -46,54 +46,55 @@ if (CMAKE_COMPILER_IS_GNUCC OR CMAKE_COMPILER_IS_CLANG)
   SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-switch -Wno-unused-parameter -Werror -Wno-error=delete-non-virtual-dtor")
 endif()
 
-set (    SRCS 
+set (    SRCS
     ${SCM_GENERATED_INCLUDES}
-         jargv.cpp 
-         jarray.cpp 
-         javahash.cpp 
-         jbsocket.cpp 
-         jbuff.cpp 
-         jcomp.cpp 
-         jcrc.cpp 
-         jdebug.cpp 
-         jencrypt.cpp 
-         jexcept.cpp 
-         jfile.cpp 
-         jflz.cpp 
-         jhash.cpp 
-         jiface.cpp 
-         jio.cpp 
-         jiter.cpp 
-         jkeyboard.cpp 
-         jlib.cpp 
-         jlog.cpp 
+         jargv.cpp
+         jarray.cpp
+         javahash.cpp
+         jbsocket.cpp
+         jbuff.cpp
+         jcomp.cpp
+         jcrc.cpp
+         jdebug.cpp
+         jencrypt.cpp
+         jexcept.cpp
+         jfile.cpp
+         jflz.cpp
+         jhash.cpp
+         jiface.cpp
+         jio.cpp
+         jiter.cpp
+         jkeyboard.cpp
+         jlib.cpp
+         jlog.cpp
          jlz4.cpp
-         jlzma.cpp 
-         jlzw.cpp 
-         jmd5.cpp 
-         jmemleak.cpp 
-         jmisc.cpp 
-         jmutex.cpp 
-         jobserve.cpp 
-         jprop.cpp 
-         jptree.cpp 
+         jlzma.cpp
+         jlzw.cpp
+         jmd5.cpp
+         jmemleak.cpp
+         jmetrics.cpp
+         jmisc.cpp
+         jmutex.cpp
+         jobserve.cpp
+         jprop.cpp
+         jptree.cpp
          jqueue.cpp
-         jregexp.cpp 
+         jregexp.cpp
          jrowstream.cpp
          jsecrets.cpp
-         jsem.cpp 
-         jset.cpp 
-         jsmartsock.cpp 
-         jsocket.cpp 
-         jsort.cpp 
-         jstats.cpp 
-         jstream.cpp 
-         jstring.cpp 
+         jsem.cpp
+         jset.cpp
+         jsmartsock.cpp
+         jsocket.cpp
+         jsort.cpp
+         jstats.cpp
+         jstream.cpp
+         jstring.cpp
          jsuperhash.cpp
-         jthread.cpp 
-         jtime.cpp 
-         junicode.cpp 
-         jutil.cpp 
+         jthread.cpp
+         jtime.cpp
+         junicode.cpp
+         jutil.cpp
          ${HPCC_SOURCE_DIR}/system/globalid/lnuid.cpp
          ${HPCC_SOURCE_DIR}/system/codesigner/codesigner.cpp
          ${HPCC_SOURCE_DIR}/system/codesigner/gpgcodesigner.cpp
@@ -191,7 +192,7 @@ if (CMAKE_COMPILER_IS_GNUCC)
     endif ()
 endif ()
 
-include_directories ( 
+include_directories (
          ${HPCC_SOURCE_DIR}/system/jlib
          ${HPCC_SOURCE_DIR}/system/win32
          ${HPCC_SOURCE_DIR}/system/include
@@ -201,7 +202,7 @@ include_directories (
          ${HPCC_SOURCE_DIR}/system/security/cryptohelper
          ${HPCC_SOURCE_DIR}/system/yaml/libyaml/include
          ${HPCC_SOURCE_DIR}/system/httplib
-         ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file 
+         ${CMAKE_CURRENT_BINARY_DIR}  # for generated jelog.h file
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
     )
@@ -242,10 +243,10 @@ if (NOT PLUGIN)
         add_custom_command ( DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/jelog.mc
                          OUTPUT ${EXECUTABLE_OUTPUT_PATH}/${CMAKE_CFG_INTDIR}/jelog.dll ${CMAKE_CURRENT_BINARY_DIR}/jelog.h
                          COMMAND echo mc -r ${WINDOWS_CMAKE_CURRENT_BINARY_DIR} -h ${WINDOWS_CMAKE_CURRENT_BINARY_DIR} ${WINDOWS_CMAKE_CURRENT_SOURCE_DIR}
-                         COMMAND echo rc -r -fo ${CMAKE_CURRENT_BINARY_DIR}/jelog.res ${CMAKE_CURRENT_BINARY_DIR}/jelog.rc 
+                         COMMAND echo rc -r -fo ${CMAKE_CURRENT_BINARY_DIR}/jelog.res ${CMAKE_CURRENT_BINARY_DIR}/jelog.rc
                          COMMAND echo link -dll -noentry -machine:IX86 -out:${EXECUTABLE_OUTPUT_PATH}/${CMAKE_CFG_INTDIR}/jelog.dll ${CMAKE_CURRENT_BINARY_DIR}/jelog.res
                          COMMAND mc -r ${WINDOWS_CMAKE_CURRENT_BINARY_DIR} -h ${WINDOWS_CMAKE_CURRENT_BINARY_DIR} ${WINDOWS_CMAKE_CURRENT_SOURCE_DIR}
-                         COMMAND rc -r -fo ${CMAKE_CURRENT_BINARY_DIR}/jelog.res ${CMAKE_CURRENT_BINARY_DIR}/jelog.rc 
+                         COMMAND rc -r -fo ${CMAKE_CURRENT_BINARY_DIR}/jelog.res ${CMAKE_CURRENT_BINARY_DIR}/jelog.rc
                          COMMAND link -dll -noentry -machine:IX86 -out:${EXECUTABLE_OUTPUT_PATH}/${CMAKE_CFG_INTDIR}/jelog.dll ${CMAKE_CURRENT_BINARY_DIR}/jelog.res
                         )
         add_custom_target ( jelog ALL DEPENDS ${EXECUTABLE_OUTPUT_PATH}/${CMAKE_CFG_INTDIR}/jelog.dll )

+ 196 - 0
system/jlib/jmetrics.cpp

@@ -0,0 +1,196 @@
+/*##############################################################################
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "jmetrics.hpp"
+#include "jmutex.hpp"
+#include "jlog.hpp"
+
+using namespace hpccMetrics;
+
+static Singleton<MetricsReporter> metricsReporter;
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    return true;
+}
+
+MODULE_EXIT()
+{
+    delete metricsReporter.queryExisting();
+}
+
+
+struct hpccMetrics::SinkInfo
+{
+    explicit SinkInfo(MetricSink *_pSink) : pSink{_pSink} {}
+    MetricSink *pSink = nullptr;             // ptr to the sink
+    std::vector<std::string> reportMetrics;   // vector of metrics to report (empty for none)
+};
+
+MetricsReporter &hpccMetrics::queryMetricsReporter()
+{
+    return *metricsReporter.query([] { return new MetricsReporter; });
+}
+
+
+MetricsReporter::~MetricsReporter()
+{
+    for (auto const &sinkIt : sinks)
+    {
+        sinkIt.second->pSink->stopCollection();
+        delete sinkIt.second->pSink;
+    }
+}
+
+
+void MetricsReporter::init(IPropertyTree *pMetricsTree)
+{
+    Owned<IPropertyTree> pSinkTree = pMetricsTree->getPropTree("sinks");
+    Owned<IPropertyTreeIterator> sinkElementsIt = pSinkTree->getElements("sink");
+    initializeSinks(sinkElementsIt);
+}
+
+
+void MetricsReporter::addMetric(const std::shared_ptr<IMetric> &pMetric)
+{
+    std::unique_lock<std::mutex> lock(metricVectorMutex);
+    auto it = metrics.find(pMetric->queryName());
+    if (it == metrics.end())
+    {
+        metrics.insert({pMetric->queryName(), pMetric});
+    }
+    else
+    {
+        throw MakeStringException(MSGAUD_operator, "addMetric - Attempted to add duplicate named metric with name %s", pMetric->queryName().c_str());
+    }
+}
+
+
+void MetricsReporter::startCollecting()
+{
+    for (auto const &sinkIt : sinks)
+    {
+        sinkIt.second->pSink->startCollection(this);
+    }
+}
+
+
+void MetricsReporter::stopCollecting()
+{
+    for (auto const &sinkIt : sinks)
+    {
+        sinkIt.second->pSink->stopCollection();
+    }
+}
+
+
+std::vector<std::shared_ptr<IMetric>> MetricsReporter::queryMetricsForReport(const std::string &sinkName)
+{
+    std::vector<std::shared_ptr<IMetric>> reportMetrics;
+    reportMetrics.reserve(metrics.size());
+
+    auto it = sinks.find(sinkName);
+    if (it != sinks.end())
+    {
+        //
+        // Lock the list of metrics while it's in use
+        std::unique_lock<std::mutex> lock(metricVectorMutex);   // no one else can mess with it for a bit
+        auto metricIt=metrics.begin();
+        while (metricIt != metrics.end())
+        {
+            auto pMetric = metricIt->second.lock();
+            if (pMetric)
+            {
+                // This is where the metric would be compared against the list of metrics to be reported
+                // by the sink (probably a regex). This allows limiting the metrics reported to the sink.
+                // for now, only the default is supported which is reporting all metrics.
+                reportMetrics.emplace_back(std::move(pMetric));
+                ++metricIt;
+            }
+            else
+            {
+                metricIt = metrics.erase(metricIt);
+            }
+        }
+    }
+    else
+    {
+        throw MakeStringException(MSGAUD_operator, "queryMetricsForReport - sink name %s not found", sinkName.c_str());
+    }
+    return reportMetrics;
+}
+
+
+
+void MetricsReporter::initializeSinks(IPropertyTreeIterator *pSinkIt)
+{
+    for (pSinkIt->first(); pSinkIt->isValid(); pSinkIt->next())
+    {
+        Owned<IPropertyTree> pSinkTree = &pSinkIt->get(); // pSinkIt->query();
+
+        StringBuffer cfgSinkType, cfgSinkName;
+        pSinkTree->getProp("@type", cfgSinkType);
+        pSinkTree->getProp("@name", cfgSinkName);
+
+        //
+        // Make sure both name and type are provided
+        if (cfgSinkType.isEmpty() || cfgSinkName.isEmpty())
+        {
+            throw MakeStringException(MSGAUD_operator, "initializeSinks - All sinks definitions must specify a name and a type");
+        }
+
+        //
+        // If sink already registered, use it, otherwise it's new.
+        auto sinkIt = sinks.find(cfgSinkName.str());
+        if (sinkIt == sinks.end())
+        {
+            Owned<IPropertyTree> pSinkSettings = pSinkTree->getPropTree("settings");
+            MetricSink *pSink = getSinkFromLib(cfgSinkType.str(), cfgSinkName.str(), pSinkSettings);
+            sinks.insert({std::string(cfgSinkName.str()), std::unique_ptr<SinkInfo>(new SinkInfo(pSink))});
+        }
+    }
+}
+
+
+MetricSink *MetricsReporter::getSinkFromLib(const char *type, const char *sinkName, const IPropertyTree *pSettingsTree)
+{
+    std::string libName;
+
+    libName.append("libhpccmetrics_").append(type).append(SharedObjectExtension);
+
+    HINSTANCE libHandle = LoadSharedObject(libName.c_str(), true, false);
+
+    //
+    // If able to load the lib, get the instance proc and create the sink instance
+    MetricSink *pSink = nullptr;
+    if (libHandle != nullptr)
+    {
+        auto getInstanceProc = (getSinkInstance) GetSharedProcedure(libHandle, "getSinkInstance");
+        if (getInstanceProc != nullptr)
+        {
+            pSink = getInstanceProc(sinkName, pSettingsTree);
+            if (pSink == nullptr)
+            {
+                throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get sink instance");
+            }
+        }
+        else
+        {
+            throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get shared procedure (getSinkInstance)");
+        }
+    }
+    else
+    {
+        throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to load sink lib (%s)", libName.c_str());
+    }
+    return pSink;
+}

+ 202 - 0
system/jlib/jmetrics.hpp

@@ -0,0 +1,202 @@
+/*##############################################################################
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <vector>
+#include <map>
+#include <atomic>
+#include <chrono>
+#include <mutex>
+#include <memory>
+#include <unordered_set>
+#include "jiface.hpp"
+#include "jptree.hpp"
+
+
+namespace hpccMetrics {
+
+class MetricsReporter;
+
+MetricsReporter jlib_decl &queryMetricsReporter();
+
+/*
+ * Enumerates the metric type.
+ */
+enum MetricType
+{
+    METRICS_COUNTER,
+    METRICS_GAUGE
+};
+
+
+/*
+ * IMetric
+ *
+ * Interface defining a metric
+ */
+interface IMetric
+{
+    /*
+     * Returns the metric name
+     */
+    virtual const std::string &queryName() const = 0;
+
+    /*
+     * Returns metric description
+     */
+    virtual const std::string &queryDescription() const = 0;
+
+    /*
+     * Returns the metric type.
+     */
+    virtual MetricType queryMetricType() const = 0;
+
+    /*
+     * Get current measurement
+     */
+    virtual uint64_t queryValue() const = 0;
+};
+
+
+/*
+ * Concrete base class implementation of the IMetric interface. All metrics inherit
+ * from this class.
+*/
+class jlib_decl Metric : public IMetric
+{
+public:
+    virtual ~Metric() = default;
+    const std::string &queryName() const override { return name; }
+    const std::string &queryDescription() const override { return description; }
+    MetricType queryMetricType() const override { return metricType; }
+    uint64_t queryValue() const override { return value; }
+
+protected:
+    // No one should be able to create one of these
+    Metric(const char *_name, const char *_desc, MetricType _metricType) :
+            name{_name},
+            description{_desc},
+            metricType{_metricType} { }
+
+protected:
+    std::string name;
+    std::string description;
+    MetricType metricType;
+    std::atomic<uint64_t> value{0};
+};
+
+/*
+ * Metric used to count events. Count is a monotonically increasing value
+ */
+class jlib_decl CounterMetric : public Metric
+{
+public:
+    CounterMetric(const char *name, const char *description) :
+            Metric{name, description, MetricType::METRICS_COUNTER}  { }
+    void inc(uint64_t val)
+    {
+        value.fetch_add(val);
+    }
+};
+
+
+/*
+ * Metric used to track the current state of some internal measurement.
+ */
+class jlib_decl GaugeMetric : public Metric
+{
+public:
+    GaugeMetric(const char *name, const char *description) :
+        Metric{name, description, MetricType::METRICS_GAUGE}  { }
+
+    /*
+     * Update the value as indicated
+     */
+    void add(int64_t delta)
+    {
+        value += delta;
+    }
+
+    /*
+     * Set the value
+     */
+    void set(int64_t val)
+    {
+        value = val;
+    }
+};
+
+
+
+class jlib_decl MetricSink
+{
+public:
+    virtual ~MetricSink() = default;
+    virtual void startCollection(MetricsReporter *pReporter) = 0;
+    virtual void stopCollection() = 0;
+    const std::string &queryName() const { return name; }
+    const std::string &queryType() const { return type; }
+
+protected:
+    MetricSink(const char *_name, const char *_type) :
+            name{_name},
+            type{_type} { }
+
+protected:
+    std::string name;
+    std::string type;
+    MetricsReporter *pReporter = nullptr;
+};
+
+extern "C" { typedef hpccMetrics::MetricSink* (*getSinkInstance)(const char *, const IPropertyTree *pSettingsTree); }
+
+struct SinkInfo;
+
+class jlib_decl MetricsReporter
+{
+public:
+    MetricsReporter() = default;
+    ~MetricsReporter();
+    void init(IPropertyTree *pMetricsTree);
+    void addMetric(const std::shared_ptr<IMetric> &pMetric);
+    void startCollecting();
+    void stopCollecting();
+    std::vector<std::shared_ptr<IMetric>> queryMetricsForReport(const std::string &sinkName);
+
+protected:
+    void initializeSinks(IPropertyTreeIterator *pSinkIt);
+    static MetricSink *getSinkFromLib(const char *type, const char *sinkName, const IPropertyTree *pSettingsTree);
+
+protected:
+    StringBuffer componentPrefix;
+    StringBuffer globalPrefix;
+    std::map<std::string, std::unique_ptr<SinkInfo>> sinks;
+    std::map<std::string, std::weak_ptr<IMetric>> metrics;
+    std::mutex metricVectorMutex;
+};
+
+
+//
+// Convenience function template to create a metric and add it to the reporter
+template <typename T>
+std::shared_ptr<T> createMetric(const char *name, const char* desc)
+{
+    std::shared_ptr<T> pMetric = std::make_shared<T>(name, desc);
+    queryMetricsReporter().addMetric(pMetric);
+    return pMetric;
+}
+
+}

+ 18 - 0
system/metrics/CMakeLists.txt

@@ -0,0 +1,18 @@
+###############################################################################
+#    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+HPCC_ADD_SUBDIRECTORY (sinks)
+HPCC_ADD_SUBDIRECTORY (testing)

+ 17 - 0
system/metrics/sinks/CMakeLists.txt

@@ -0,0 +1,17 @@
+###############################################################################
+#    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+HPCC_ADD_SUBDIRECTORY (file)

+ 31 - 0
system/metrics/sinks/file/CMakeLists.txt

@@ -0,0 +1,31 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+project(hpccmetrics_filesink)
+
+set ( srcs
+        fileSink.cpp
+        )
+
+include_directories(
+        ${HPCC_SOURCE_DIR}/system/include
+        ${HPCC_SOURCE_DIR}/system/jlib
+)
+
+ADD_DEFINITIONS(  -DFILESINK_EXPORTS )
+HPCC_ADD_LIBRARY( hpccmetrics_filesink SHARED ${srcs} )
+TARGET_LINK_LIBRARIES( hpccmetrics_filesink jlib)
+INSTALL ( TARGETS hpccmetrics_filesink RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )

+ 115 - 0
system/metrics/sinks/file/fileSink.cpp

@@ -0,0 +1,115 @@
+/*##############################################################################
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "fileSink.hpp"
+#include <cstdio>
+#include <thread>
+
+using namespace hpccMetrics;
+
+extern "C" MetricSink* getSinkInstance(const char *name, const IPropertyTree *pSettingsTree)
+{
+    MetricSink *pSink = new FileMetricSink(name, pSettingsTree);
+    return pSink;
+}
+
+
+FileMetricSink::FileMetricSink(const char *name, const IPropertyTree *pSettingsTree) :
+        MetricSink(name, "file"),
+        collectionPeriodSeconds{60}
+{
+    if (pSettingsTree->hasProp("@period"))
+    {
+        collectionPeriodSeconds = pSettingsTree->getPropInt("@period");
+    }
+
+    pSettingsTree->getProp("@filename", fileName);
+    clearFileOnStartCollecting = pSettingsTree->getPropBool("@clear", false);
+}
+
+
+FileMetricSink::~FileMetricSink()
+{
+    if (isCollecting)
+    {
+        doStopCollecting();
+    }
+}
+
+
+void FileMetricSink::startCollection(MetricsReporter *_pReporter)
+{
+    fhandle = fopen(fileName.str(), clearFileOnStartCollecting ? "w" : "a");
+    pReporter = _pReporter;
+    isCollecting = true;
+    collectThread = std::thread(&FileMetricSink::collectionThread, this);
+}
+
+
+void FileMetricSink::collectionThread()
+{
+    //
+    // The initial wait for the first report
+    waitSem.wait(collectionPeriodSeconds * 1000);
+    while (!stopCollectionFlag)
+    {
+        auto reportMetrics = pReporter->queryMetricsForReport(name);
+        writeReportHeaderToFile();
+        for (auto &pMetric: reportMetrics)
+        {
+            writeMeasurementToFile(pMetric->queryName(), pMetric->queryValue(), pMetric->queryDescription());
+        }
+
+        // Wait again
+        waitSem.wait(collectionPeriodSeconds * 1000);
+    }
+}
+
+
+void FileMetricSink::stopCollection()
+{
+    if (isCollecting)
+    {
+        doStopCollecting();
+    }
+}
+
+
+void FileMetricSink::doStopCollecting()
+{
+    //
+    // Set the stop collecting flag, then signal the wait semaphore
+    // to wake up and stop the collection thread
+    stopCollectionFlag = true;
+    waitSem.signal();
+    isCollecting = false;
+    collectThread.join();
+    fclose(fhandle);
+}
+
+
+void FileMetricSink::writeMeasurementToFile(const std::string &metricName, uint32_t value, const std::string &metricDescription) const
+{
+    fprintf(fhandle, "  %s -> %d, %s\n", metricName.c_str(), value, metricDescription.c_str());
+    fflush(fhandle);
+}
+
+
+void FileMetricSink::writeReportHeaderToFile() const
+{
+    auto timenow = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    std::string timeStr(ctime(&timenow));
+    timeStr.pop_back();
+    fprintf(fhandle, "------------ Metric Report [%s] ------------\n", timeStr.c_str());
+    fflush(fhandle);
+}

+ 57 - 0
system/metrics/sinks/file/fileSink.hpp

@@ -0,0 +1,57 @@
+/*##############################################################################
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+#pragma once
+
+#include "jmetrics.hpp"
+#include "jptree.hpp"
+#include "jstring.hpp"
+#include "jfile.ipp"
+#include "jsem.hpp"
+#include <thread>
+#include <condition_variable>
+
+
+using namespace hpccMetrics;
+
+#ifdef FILESINK_EXPORTS
+#define FILESINK_API DECL_EXPORT
+#else
+#define FILESINK_API DECL_IMPORT
+#endif
+
+class FILESINK_API FileMetricSink : public MetricSink
+{
+    public:
+        explicit FileMetricSink(const char *name, const IPropertyTree *pSettingsTree);
+        ~FileMetricSink() override;
+        void startCollection(MetricsReporter *pReporter) override;
+        void stopCollection() override;
+
+    protected:
+        void writeReportHeaderToFile() const;
+        void writeMeasurementToFile(const std::string &metricName, uint32_t value, const std::string &metricDescription) const;
+        void collectionThread();
+        void doStopCollecting();
+
+    protected:
+        StringBuffer fileName;
+        unsigned collectionPeriodSeconds;
+        std::thread collectThread;
+        std::atomic<bool> stopCollectionFlag{false};
+        bool isCollecting = false;
+        bool clearFileOnStartCollecting = false;
+        FILE *fhandle = nullptr;
+        Semaphore waitSem;
+};

+ 32 - 0
system/metrics/testing/CMakeLists.txt

@@ -0,0 +1,32 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+project ( metrictest )
+
+SET (SRC_FILES
+        metrictest.cpp
+        )
+
+INCLUDE_DIRECTORIES(
+        ${CMAKE_BINARY_DIR}
+        ${CMAKE_BINARY_DIR}/oss
+        ${HPCC_SOURCE_DIR}/system/include
+        ${HPCC_SOURCE_DIR}/system/jlib
+)
+
+HPCC_ADD_EXECUTABLE ( metrictest ${SRC_FILES} )
+TARGET_LINK_LIBRARIES( metrictest jlib)
+INSTALL ( TARGETS metrictest RUNTIME DESTINATION ${EXEC_DIR} )

+ 146 - 0
system/metrics/testing/metrictest.cpp

@@ -0,0 +1,146 @@
+#include <cstdio>
+#include <thread>
+#include <chrono>
+#include <jptree.hpp>
+#include "jmetrics.hpp"
+
+
+using namespace hpccMetrics;
+
+void processThread(int, unsigned, bool, const std::string&, unsigned, unsigned);
+std::shared_ptr<CounterMetric> pEventCountMetric;
+std::shared_ptr<GaugeMetric> pQueueSizeMetric;
+
+
+MetricsReporter *pReporter;
+
+
+const char *globalConfigYml = R"!!(config:
+  metrics:
+    name: cluster config
+    prefix: global_prefix.
+    sinks:
+      - type: filesink
+        name: default
+        settings:
+          filename: testout.txt
+          clear: true
+          period: 5
+)!!";
+
+
+const char *localConfigYml = R"!!(roxie:
+  metrics:
+    name: config_name
+    prefix: component_prefix.
+    sinks:
+      - name: default
+        metrics:
+          - name: requests
+            measurement_type: count
+          - name: requests
+            measurement_type: resetting_count
+          - name: requests
+            measurement_type: rate
+            description: Number of request arriving per second
+          - name: queuesize
+          - name: requests_dynamic
+            measurement_type: count
+)!!";
+
+
+const char *testConfigYml = R"!!(component:
+  metrics:
+    name: config_name
+    prefix: component_prefix.
+    sinks:
+        sink:
+          - type: filesink
+            name: default
+            settings:
+              filename: testout.txt
+              clear: true
+              period: 5
+)!!";
+
+
+int main(int argc, char *argv[])
+{
+    InitModuleObjects();
+
+    //
+    // Simulate retrieving the component and global config
+    Owned<IPropertyTree> pSettings = createPTreeFromYAMLString(testConfigYml, ipt_none, ptr_ignoreWhiteSpace, nullptr);
+
+    //
+    // Retrieve the global and component metrics config
+    Owned<IPropertyTree> pMetricsTree = pSettings->getPropTree("component/metrics");
+
+    //
+    // Allow override of output file for the file sink
+    if (argc > 1)
+    {
+        auto pSinkTree = pMetricsTree->getPropTree("component/metrics/sinks[1]/settings");
+        pSinkTree->setProp("@filename", argv[1]);
+    }
+
+
+    //
+    // Get singleton
+    MetricsReporter &myReporter = queryMetricsReporter();
+
+    //
+    // Init reporter with config
+    myReporter.init(pMetricsTree);
+
+    //
+    // Now create the metrics and add them to the reporter
+    pEventCountMetric = std::make_shared<CounterMetric>("requests", "The number of requests");
+    myReporter.addMetric(pEventCountMetric);
+
+    pQueueSizeMetric = std::make_shared<GaugeMetric>("queuesize", "request queue size");
+    myReporter.addMetric(pQueueSizeMetric);
+
+    myReporter.startCollecting();
+
+    //
+    // Starts some threads, each updating metrics
+    std::thread first (processThread, 20, 2, true, "requests_dynamic", 4, 10);
+    std::thread second (processThread, 15, 3, false, "", 0, 0);
+
+    first.join();
+    second.join();
+
+    printf("Stopping the collection...");
+    myReporter.stopCollecting();
+    printf("Stopped. Test complete\n");
+}
+
+
+void processThread(int numLoops, unsigned delay, bool addDynamic, const std::string& name, unsigned addAfter, unsigned deleteAfter)
+{
+    std::shared_ptr<GaugeMetric> pDynamicMetric;
+    for (unsigned i=0; i<numLoops; ++i)
+    {
+        if (addDynamic && i == addAfter)
+        {
+            MetricsReporter &myReporter = queryMetricsReporter();
+            pDynamicMetric = std::make_shared<GaugeMetric>(name.c_str(), "The dynamic number of requests");
+            myReporter.addMetric(pDynamicMetric);
+        }
+        else if (addDynamic && i == (addAfter + deleteAfter))
+        {
+            pDynamicMetric.reset();
+        }
+
+        if (pDynamicMetric)
+        {
+            pDynamicMetric->add(1);
+        }
+
+        pEventCountMetric->inc(2u);
+        pQueueSizeMetric->add(3);
+        std::this_thread::sleep_for(std::chrono::seconds(delay));
+        pQueueSizeMetric->add(-1);
+    }
+}