jmetrics.cpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmetrics.hpp"
  14. #include "jlog.hpp"
  15. using namespace hpccMetrics;
  16. static Singleton<MetricsManager> metricsManager;
  17. MODULE_INIT(INIT_PRIORITY_STANDARD)
  18. {
  19. return true;
  20. }
  21. MODULE_EXIT()
  22. {
  23. delete metricsManager.queryExisting();
  24. }
  25. MetricsManager &hpccMetrics::queryMetricsManager()
  26. {
  27. return *metricsManager.query([] { return new MetricsManager; });
  28. }
  29. MetricsManager::~MetricsManager()
  30. {
  31. for (auto const &sinkIt : sinks)
  32. {
  33. sinkIt.second->pSink->stopCollection();
  34. delete sinkIt.second->pSink;
  35. }
  36. }
  37. void MetricsManager::init(IPropertyTree *pMetricsTree)
  38. {
  39. Owned<IPropertyTreeIterator> sinkElementsIt = pMetricsTree->getElements("sinks");
  40. initializeSinks(sinkElementsIt);
  41. }
  42. void MetricsManager::addMetric(const std::shared_ptr<IMetric> &pMetric)
  43. {
  44. std::unique_lock<std::mutex> lock(metricVectorMutex);
  45. auto it = metrics.find(pMetric->queryName());
  46. if (it == metrics.end())
  47. {
  48. metrics.insert({pMetric->queryName(), pMetric});
  49. }
  50. else
  51. {
  52. //If there is a match only report an error if the metric has not been destroyed in the meantime
  53. auto match = it->second.lock();
  54. if (match)
  55. {
  56. #ifdef _DEBUG
  57. throw MakeStringException(MSGAUD_operator, "addMetric - Attempted to add duplicate named metric with name '%s'", pMetric->queryName().c_str());
  58. #else
  59. OERRLOG("addMetric - Adding a duplicate named metric '%s', old metric replaced", pMetric->queryName().c_str());
  60. #endif
  61. }
  62. it->second = pMetric;
  63. }
  64. }
  65. void MetricsManager::startCollecting()
  66. {
  67. for (auto const &sinkIt : sinks)
  68. {
  69. sinkIt.second->pSink->startCollection(this);
  70. }
  71. }
  72. void MetricsManager::stopCollecting()
  73. {
  74. for (auto const &sinkIt : sinks)
  75. {
  76. sinkIt.second->pSink->stopCollection();
  77. }
  78. }
  79. std::vector<std::shared_ptr<IMetric>> MetricsManager::queryMetricsForReport(const std::string &sinkName)
  80. {
  81. std::vector<std::shared_ptr<IMetric>> reportMetrics;
  82. reportMetrics.reserve(metrics.size());
  83. auto it = sinks.find(sinkName);
  84. if (it != sinks.end())
  85. {
  86. //
  87. // Lock the list of metrics while it's in use
  88. std::unique_lock<std::mutex> lock(metricVectorMutex); // no one else can mess with it for a bit
  89. auto metricIt=metrics.begin();
  90. while (metricIt != metrics.end())
  91. {
  92. auto pMetric = metricIt->second.lock();
  93. if (pMetric)
  94. {
  95. // This is where the metric would be compared against the list of metrics to be reported
  96. // by the sink (probably a regex). This allows limiting the metrics reported to the sink.
  97. // for now, only the default is supported which is reporting all metrics.
  98. reportMetrics.emplace_back(std::move(pMetric));
  99. ++metricIt;
  100. }
  101. else
  102. {
  103. metricIt = metrics.erase(metricIt);
  104. }
  105. }
  106. }
  107. else
  108. {
  109. throw MakeStringException(MSGAUD_operator, "queryMetricsForReport - sink name %s not found", sinkName.c_str());
  110. }
  111. return reportMetrics;
  112. }
  113. void MetricsManager::initializeSinks(IPropertyTreeIterator *pSinkIt)
  114. {
  115. for (pSinkIt->first(); pSinkIt->isValid(); pSinkIt->next())
  116. {
  117. Owned<IPropertyTree> pSinkTree = &pSinkIt->get(); // pSinkIt->query();
  118. StringBuffer cfgSinkType, cfgSinkName;
  119. pSinkTree->getProp("@type", cfgSinkType);
  120. pSinkTree->getProp("@name", cfgSinkName);
  121. //
  122. // Make sure both name and type are provided
  123. if (cfgSinkType.isEmpty() || cfgSinkName.isEmpty())
  124. {
  125. throw MakeStringException(MSGAUD_operator, "initializeSinks - All sinks definitions must specify a name and a type");
  126. }
  127. //
  128. // If sink already registered, use it, otherwise it's new.
  129. auto sinkIt = sinks.find(cfgSinkName.str());
  130. if (sinkIt == sinks.end())
  131. {
  132. Owned<IPropertyTree> pSinkSettings = pSinkTree->getPropTree("settings");
  133. MetricSink *pSink = getSinkFromLib(cfgSinkType.str(), cfgSinkName.str(), pSinkSettings);
  134. sinks.insert({std::string(cfgSinkName.str()), std::unique_ptr<SinkInfo>(new SinkInfo(pSink))});
  135. }
  136. }
  137. }
  138. MetricSink *MetricsManager::getSinkFromLib(const char *type, const char *sinkName, const IPropertyTree *pSettingsTree)
  139. {
  140. std::string libName;
  141. libName.append("libhpccmetrics_").append(type).append("sink").append(SharedObjectExtension);
  142. HINSTANCE libHandle = LoadSharedObject(libName.c_str(), true, false);
  143. //
  144. // If able to load the lib, get the instance proc and create the sink instance
  145. MetricSink *pSink = nullptr;
  146. if (libHandle != nullptr)
  147. {
  148. auto getInstanceProc = (getSinkInstance) GetSharedProcedure(libHandle, "getSinkInstance");
  149. if (getInstanceProc != nullptr)
  150. {
  151. pSink = getInstanceProc(sinkName, pSettingsTree);
  152. if (pSink == nullptr)
  153. {
  154. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get sink instance");
  155. }
  156. }
  157. else
  158. {
  159. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to get shared procedure (getSinkInstance)");
  160. }
  161. }
  162. else
  163. {
  164. throw MakeStringException(MSGAUD_operator, "getSinkFromLib - Unable to load sink lib (%s)", libName.c_str());
  165. }
  166. return pSink;
  167. }
  168. // Method for use when testing
  169. void MetricsManager::addSink(MetricSink *pSink, const char *name)
  170. {
  171. //
  172. // Add the sink if it does not already exist, otherwise delete the sink because
  173. // we are taking ownership.
  174. auto sinkIt = sinks.find(name);
  175. if (sinkIt == sinks.end())
  176. {
  177. sinks.insert({std::string(name), std::unique_ptr<SinkInfo>(new SinkInfo(pSink))});
  178. }
  179. else
  180. {
  181. delete pSink;
  182. }
  183. }
  184. PeriodicMetricSink::PeriodicMetricSink(const char *name, const char *type, const IPropertyTree *pSettingsTree) :
  185. MetricSink(name, type),
  186. collectionPeriodSeconds{60}
  187. {
  188. if (pSettingsTree->hasProp("@period"))
  189. {
  190. collectionPeriodSeconds = pSettingsTree->getPropInt("@period");
  191. }
  192. }
  193. PeriodicMetricSink::~PeriodicMetricSink()
  194. {
  195. if (isCollecting)
  196. {
  197. doStopCollecting();
  198. }
  199. }
  200. void PeriodicMetricSink::startCollection(MetricsManager *_pManager)
  201. {
  202. pManager = _pManager;
  203. prepareToStartCollecting();
  204. isCollecting = true;
  205. collectThread = std::thread(&PeriodicMetricSink::collectionThread, this);
  206. }
  207. void PeriodicMetricSink::collectionThread()
  208. {
  209. //
  210. // The initial wait for the first report
  211. waitSem.wait(collectionPeriodSeconds * 1000);
  212. while (!stopCollectionFlag)
  213. {
  214. doCollection();
  215. // Wait again
  216. waitSem.wait(collectionPeriodSeconds * 1000);
  217. }
  218. }
  219. void PeriodicMetricSink::stopCollection()
  220. {
  221. if (isCollecting)
  222. {
  223. doStopCollecting();
  224. }
  225. }
  226. void PeriodicMetricSink::doStopCollecting()
  227. {
  228. //
  229. // Set the stop collecting flag, then signal the wait semaphore
  230. // to wake up and stop the collection thread
  231. stopCollectionFlag = true;
  232. waitSem.signal();
  233. isCollecting = false;
  234. collectThread.join();
  235. collectingHasStopped();
  236. }