prometheusSink.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*################################################################################
  2. # HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. ################################################################################
  16. */
  17. #include "jlog.hpp"
  18. #include "prometheusSink.hpp"
  19. using namespace hpccMetrics;
  20. extern "C" MetricSink* getSinkInstance(const char *name, const IPropertyTree *pSettingsTree)
  21. {
  22. return new PrometheusMetricSink(name, pSettingsTree);
  23. }
  24. PrometheusMetricSink::PrometheusMetricSink(const char *name, const IPropertyTree *pSettingsTree) :
  25. MetricSink(name, PROMETHEUS_REPORTER_TYPE)
  26. {
  27. m_metricsManager = nullptr;
  28. m_metricsSinkName = name;
  29. m_processing = false;
  30. m_port = DEFAULT_PROMETHEUS_METRICS_SERVICE_PORT;
  31. m_verbose = false;
  32. if (pSettingsTree)
  33. {
  34. m_verbose = pSettingsTree->getPropBool("@verbose", false);
  35. m_port = pSettingsTree->getPropInt("@port", DEFAULT_PROMETHEUS_METRICS_SERVICE_PORT);
  36. pSettingsTree->getProp("@serviceName", m_metricsServiceName);
  37. if (m_metricsServiceName.isEmpty())
  38. m_metricsServiceName = DEFAULT_PROMETHEUS_METRICS_SERVICE_NAME;
  39. else
  40. if (m_metricsServiceName.charAt(0) != '/')
  41. m_metricsServiceName.insert(0, '/');
  42. m_server.set_error_handler([](const Request& req, Response& res)
  43. {
  44. StringBuffer msg(detail::status_message(res.status));
  45. if (res.status == 500)
  46. {
  47. if (res.has_header(HTTPLIB_ERROR_MESSAGE_HEADER_NAME))
  48. msg.append(" - ").append(res.get_header_value(HTTPLIB_ERROR_MESSAGE_HEADER_NAME).c_str());
  49. else
  50. msg.append(" - ").append("encountered unknown error!");
  51. LOG(MCinternalError, "PrometheusMetricsService: %s", msg.str());
  52. }
  53. VStringBuffer respmessage(PROMETHEUS_METRICS_HTTP_ERROR, msg.str(), req.path.c_str(), res.status);
  54. res.set_content(respmessage.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
  55. LOG(MCuserError, "PrometheusMetricsService Error: %s", msg.str());
  56. LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=%s;path=%s]", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.method.c_str(), req.path.c_str());
  57. });
  58. m_server.Get(m_metricsServiceName.str(), [&](const Request& req, Response& res)
  59. {
  60. LOG(MCdebugInfo, "GET PrometheusMetricsService%s, from %s:%d", req.path.c_str(), req.remote_addr.c_str(), req.remote_port);
  61. StringBuffer payload;
  62. toPrometheusMetrics(m_metricsManager->queryMetricsForReport(std::string(m_metricsSinkName.str())), payload, m_verbose);
  63. res.set_content(payload.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
  64. res.status = 200;
  65. LOG(MCdebugInfo, "PrometheusMetricsService Response: %s\n", payload.str());
  66. LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=GET;path=%s]", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.path.c_str());
  67. });
  68. }
  69. }
  70. const char * PrometheusMetricSink::mapHPCCMetricTypeToPrometheusStr(MetricType type)
  71. {
  72. switch (type)
  73. {
  74. case hpccMetrics::METRICS_COUNTER:
  75. return "counter";
  76. case hpccMetrics::METRICS_GAUGE:
  77. return "gauge";
  78. default:
  79. LOG(MCinternalWarning, "Encountered unknown metric - cannot map to Prometheus metric!");
  80. return nullptr;
  81. }
  82. }
  83. void PrometheusMetricSink::toPrometheusMetrics(const std::vector<std::shared_ptr<IMetric>> & reportMetrics, StringBuffer & out, bool verbose)
  84. {
  85. /*
  86. * [# HELP <metric name> <metric summary>\n]
  87. * [# TYPE <metric name> <metric type, if missing, metric is un-typed>\n]
  88. * <metric name> [{<label name>=<label value>, ...}] <metric value>\n
  89. *
  90. * where metric name := [a-zA-Z_:][a-zA-Z0-9_:]*
  91. */
  92. for (auto &pMetric: reportMetrics)
  93. {
  94. std::string name = pMetric->queryName();
  95. //'.' is a known char used in HPCC metric names but invalid in Prometheus
  96. std::replace(name.begin(), name.end(), '.', '_');
  97. if (verbose)
  98. {
  99. if (!pMetric->queryDescription().empty())
  100. out.append("# HELP ").append(name.c_str()).append(" ").append(pMetric->queryDescription().c_str()).append("\n");
  101. MetricType type = pMetric->queryMetricType();
  102. const char * promtype = mapHPCCMetricTypeToPrometheusStr(type);
  103. if (promtype)
  104. out.append("# TYPE ").append(name.c_str()).append(" ").append(promtype).append("\n");
  105. }
  106. out.append(name.c_str());
  107. auto metaData = pMetric->queryMetaData();
  108. if (metaData.size()>0)
  109. {
  110. out.append(" {");
  111. bool firstEntry = true;
  112. for (auto &metaDataIt: metaData)
  113. {
  114. if (!firstEntry)
  115. out.append(",");
  116. else
  117. firstEntry=false;
  118. out.append(metaDataIt.key.c_str()).append("=\"").append(metaDataIt.value.c_str()).append("\"");
  119. }
  120. out.append("}");
  121. }
  122. out.append(" ").append(pMetric->queryValue()).append("\n");
  123. }
  124. }
  125. void PrometheusMetricSink::startCollection(MetricsManager *_pManager)
  126. {
  127. if (!_pManager)
  128. throw MakeStringException(-1, "PrometheusMetricsService: NULL MetricsManager detected!");
  129. m_metricsManager = _pManager;
  130. m_collectThread = std::thread(collectionThread, this);
  131. m_processing = true;
  132. }
  133. void PrometheusMetricSink::stopCollection()
  134. {
  135. LOG(MCoperatorProgress, "PrometheusMetricsService stopping: port: '%i' uri: '%s' sinkname: '%s'", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
  136. m_processing = false;
  137. m_server.stop();
  138. m_collectThread.join();
  139. }
  140. void PrometheusMetricSink::startServer()
  141. {
  142. LOG(MCoperatorProgress, "PrometheusMetricsService started: port: '%i' uri: '%s' sinkname: '%s'", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
  143. m_server.listen(BIND_ALL_LOCAL_NICS, m_port);
  144. }