jmetrics.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmetrics.hpp"
  14. #include "jmutex.hpp"
  15. #include "jlog.hpp"
  16. using namespace hpccMetrics;
  17. static Singleton<MetricsReporter> metricsReporter;
  18. MODULE_INIT(INIT_PRIORITY_STANDARD)
  19. {
  20. return true;
  21. }
  22. MODULE_EXIT()
  23. {
  24. delete metricsReporter.queryExisting();
  25. }
  26. struct hpccMetrics::SinkInfo
  27. {
  28. explicit SinkInfo(MetricSink *_pSink) : pSink{_pSink} {}
  29. MetricSink *pSink = nullptr; // ptr to the sink
  30. std::vector<std::string> reportMetrics; // vector of metrics to report (empty for none)
  31. };
  32. MetricsReporter &hpccMetrics::queryMetricsReporter()
  33. {
  34. return *metricsReporter.query([] { return new MetricsReporter; });
  35. }
  36. MetricsReporter::~MetricsReporter()
  37. {
  38. for (auto const &sinkIt : sinks)
  39. {
  40. sinkIt.second->pSink->stopCollection();
  41. delete sinkIt.second->pSink;
  42. }
  43. }
  44. void MetricsReporter::init(IPropertyTree *pMetricsTree)
  45. {
  46. Owned<IPropertyTree> pSinkTree = pMetricsTree->getPropTree("sinks");
  47. Owned<IPropertyTreeIterator> sinkElementsIt = pSinkTree->getElements("sink");
  48. initializeSinks(sinkElementsIt);
  49. }
  50. void MetricsReporter::addMetric(const std::shared_ptr<IMetric> &pMetric)
  51. {
  52. std::unique_lock<std::mutex> lock(metricVectorMutex);
  53. auto it = metrics.find(pMetric->queryName());
  54. if (it == metrics.end())
  55. {
  56. metrics.insert({pMetric->queryName(), pMetric});
  57. }
  58. else
  59. {
  60. throw MakeStringException(MSGAUD_operator, "addMetric - Attempted to add duplicate named metric with name %s", pMetric->queryName().c_str());
  61. }
  62. }
  63. void MetricsReporter::startCollecting()
  64. {
  65. for (auto const &sinkIt : sinks)
  66. {
  67. sinkIt.second->pSink->startCollection(this);
  68. }
  69. }
  70. void MetricsReporter::stopCollecting()
  71. {
  72. for (auto const &sinkIt : sinks)
  73. {
  74. sinkIt.second->pSink->stopCollection();
  75. }
  76. }
  77. std::vector<std::shared_ptr<IMetric>> MetricsReporter::queryMetricsForReport(const std::string &sinkName)
  78. {
  79. std::vector<std::shared_ptr<IMetric>> reportMetrics;
  80. reportMetrics.reserve(metrics.size());
  81. auto it = sinks.find(sinkName);
  82. if (it != sinks.end())
  83. {
  84. //
  85. // Lock the list of metrics while it's in use
  86. std::unique_lock<std::mutex> lock(metricVectorMutex); // no one else can mess with it for a bit
  87. auto metricIt=metrics.begin();
  88. while (metricIt != metrics.end())
  89. {
  90. auto pMetric = metricIt->second.lock();
  91. if (pMetric)
  92. {
  93. // This is where the metric would be compared against the list of metrics to be reported
  94. // by the sink (probably a regex). This allows limiting the metrics reported to the sink.
  95. // for now, only the default is supported which is reporting all metrics.
  96. reportMetrics.emplace_back(std::move(pMetric));
  97. ++metricIt;
  98. }
  99. else
  100. {
  101. metricIt = metrics.erase(metricIt);
  102. }
  103. }
  104. }
  105. else
  106. {
  107. throw MakeStringException(MSGAUD_operator, "queryMetricsForReport - sink name %s not found", sinkName.c_str());
  108. }
  109. return reportMetrics;
  110. }
  111. void MetricsReporter::initializeSinks(IPropertyTreeIterator *pSinkIt)
  112. {
  113. for (pSinkIt->first(); pSinkIt->isValid(); pSinkIt->next())
  114. {
  115. Owned<IPropertyTree> pSinkTree = &pSinkIt->get(); // pSinkIt->query();
  116. StringBuffer cfgSinkType, cfgSinkName;
  117. pSinkTree->getProp("@type", cfgSinkType);
  118. pSinkTree->getProp("@name", cfgSinkName);
  119. //
  120. // Make sure both name and type are provided
  121. if (cfgSinkType.isEmpty() || cfgSinkName.isEmpty())
  122. {
  123. throw MakeStringException(MSGAUD_operator, "initializeSinks - All sinks definitions must specify a name and a type");
  124. }
  125. //
  126. // If sink already registered, use it, otherwise it's new.
  127. auto sinkIt = sinks.find(cfgSinkName.str());
  128. if (sinkIt == sinks.end())
  129. {
  130. Owned<IPropertyTree> pSinkSettings = pSinkTree->getPropTree("settings");
  131. MetricSink *pSink = getSinkFromLib(cfgSinkType.str(), cfgSinkName.str(), pSinkSettings);
  132. sinks.insert({std::string(cfgSinkName.str()), std::unique_ptr<SinkInfo>(new SinkInfo(pSink))});
  133. }
  134. }
  135. }
  136. MetricSink *MetricsReporter::getSinkFromLib(const char *type, const char *sinkName, const IPropertyTree *pSettingsTree)
  137. {
  138. std::string libName;
  139. libName.append("libhpccmetrics_").append(type).append(SharedObjectExtension);
  140. HINSTANCE libHandle = LoadSharedObject(libName.c_str(), true, false);
  141. //
  142. // If able to load the lib, get the instance proc and create the sink instance
  143. MetricSink *pSink = nullptr;
  144. if (libHandle != nullptr)
  145. {
  146. auto getInstanceProc = (getSinkInstance) GetSharedProcedure(libHandle, "getSinkInstance");
  147. if (getInstanceProc != nullptr)
  148. {
  149. pSink = getInstanceProc(sinkName, pSettingsTree);
  150. if (pSink == nullptr)
  151. {
  152. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get sink instance");
  153. }
  154. }
  155. else
  156. {
  157. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get shared procedure (getSinkInstance)");
  158. }
  159. }
  160. else
  161. {
  162. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to load sink lib (%s)", libName.c_str());
  163. }
  164. return pSink;
  165. }