prometheusSink.cpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. /*################################################################################
  2. # HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. ################################################################################
  16. */
  17. #include "jlog.hpp"
  18. #include "prometheusSink.hpp"
  19. using namespace hpccMetrics;
  20. extern "C" MetricSink* getSinkInstance(const char *name, const IPropertyTree *pSettingsTree)
  21. {
  22. return new PrometheusMetricSink(name, pSettingsTree);
  23. }
  24. PrometheusMetricSink::PrometheusMetricSink(const char *name, const IPropertyTree *pSettingsTree) :
  25. MetricSink(name, PROMETHEUS_REPORTER_TYPE)
  26. {
  27. m_metricsManager = nullptr;
  28. m_metricsSinkName = name;
  29. m_processing = false;
  30. m_port = DEFAULT_PROMETHEUS_METRICS_SERVICE_PORT;
  31. m_verbose = false;
  32. if (pSettingsTree)
  33. {
  34. m_verbose = pSettingsTree->getPropBool("@verbose", false);
  35. m_port = pSettingsTree->getPropInt("@port", DEFAULT_PROMETHEUS_METRICS_SERVICE_PORT);
  36. pSettingsTree->getProp("@serviceName", m_metricsServiceName);
  37. if (m_metricsServiceName.isEmpty())
  38. m_metricsServiceName = DEFAULT_PROMETHEUS_METRICS_SERVICE_NAME;
  39. else
  40. if (m_metricsServiceName.charAt(0) != '/')
  41. m_metricsServiceName.insert(0, '/');
  42. m_server.set_error_handler([](const Request& req, Response& res)
  43. {
  44. StringBuffer msg(detail::status_message(res.status));
  45. if (res.status == 500)
  46. {
  47. if (res.has_header(HTTPLIB_ERROR_MESSAGE_HEADER_NAME))
  48. msg.append(" - ").append(res.get_header_value(HTTPLIB_ERROR_MESSAGE_HEADER_NAME).c_str());
  49. else
  50. msg.append(" - ").append("encountered unknown error!");
  51. LOG(MCinternalError, "PrometheusMetricsService: %s", msg.str());
  52. }
  53. VStringBuffer respmessage(PROMETHEUS_METRICS_HTTP_ERROR, msg.str(), req.path.c_str(), res.status);
  54. res.set_content(respmessage.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
  55. LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=%s;]\n", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.method.c_str());
  56. });
  57. m_server.Get(m_metricsServiceName.str(), [&](const Request& req, Response& res)
  58. {
  59. StringBuffer payload;
  60. toPrometheusMetrics(m_metricsManager->queryMetricsForReport(std::string(m_metricsSinkName.str())), payload, m_verbose);
  61. res.set_content(payload.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
  62. LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=GET;]\n",res.status, req.remote_addr.c_str(), req.remote_port, req.content_length);
  63. });
  64. }
  65. }
  66. const char * PrometheusMetricSink::mapHPCCMetricTypeToPrometheusStr(MetricType type)
  67. {
  68. switch (type)
  69. {
  70. case hpccMetrics::METRICS_COUNTER:
  71. return "counter";
  72. case hpccMetrics::METRICS_GAUGE:
  73. return "gauge";
  74. default:
  75. LOG(MCdebugInfo, "Encountered unknown metric - cannot map to Prometheus metric!");
  76. return nullptr;
  77. }
  78. }
  79. void PrometheusMetricSink::toPrometheusMetrics(const std::vector<std::shared_ptr<IMetric>> & reportMetrics, StringBuffer & out, bool verbose)
  80. {
  81. /*
  82. * [# HELP <metric name> <metric summary>\n]
  83. * [# TYPE <metric name> <metric type, if missing, metric is un-typed>\n]
  84. * <metric name> [{<label name>=<label value>, ...}] <metric value>\n
  85. *
  86. * where metric name := [a-zA-Z_:][a-zA-Z0-9_:]*
  87. */
  88. for (auto &pMetric: reportMetrics)
  89. {
  90. const std::string & name = pMetric->queryName();
  91. if (verbose)
  92. {
  93. if (!pMetric->queryDescription().empty())
  94. out.append("# HELP ").append(name.c_str()).append(" ").append(pMetric->queryDescription().c_str()).append("\n");
  95. MetricType type = pMetric->queryMetricType();
  96. const char * promtype = mapHPCCMetricTypeToPrometheusStr(type);
  97. if (promtype)
  98. out.append("# TYPE ").append(name.c_str()).append(" ").append(promtype).append("\n");
  99. }
  100. out.append(name.c_str()).append(" ").append(pMetric->queryValue()).append("\n");
  101. }
  102. }
  103. void PrometheusMetricSink::startCollection(MetricsManager *_pManager)
  104. {
  105. if (!_pManager)
  106. throw MakeStringException(-1, "PrometheusMetricsService: NULL MetricsManager detected!");
  107. m_metricsManager = _pManager;
  108. m_collectThread = std::thread(collectionThread, this);
  109. m_processing = true;
  110. }
  111. void PrometheusMetricSink::stopCollection()
  112. {
  113. LOG(MCoperatorProgress, "PrometheusMetricsService stopping: port: '%i' uri: '%s' sinkname: '%s'\n", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
  114. m_processing = false;
  115. m_server.stop();
  116. m_collectThread.join();
  117. }
  118. void PrometheusMetricSink::startServer()
  119. {
  120. LOG(MCoperatorProgress, "PrometheusMetricsService started: port: '%i' uri: '%s' sinkname: '%s'\n", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
  121. m_server.listen(BIND_ALL_LOCAL_NICS, m_port);
  122. }