# process.py
  1. """
  2. MIT License
  3. Copyright (c) 2022 Texas Tech University
  4. Permission is hereby granted, free of charge, to any person obtaining a copy
  5. of this software and associated documentation files (the "Software"), to deal
  6. in the Software without restriction, including without limitation the rights
  7. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. copies of the Software, and to permit persons to whom the Software is
  9. furnished to do so, subject to the following conditions:
  10. The above copyright notice and this permission notice shall be included in all
  11. copies or substantial portions of the Software.
  12. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  15. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  18. SOFTWARE.
  19. """
  20. """
  21. This file is part of MonSter.
  22. Author:
  23. Jie Li, jie.li@ttu.edu
  24. """
  25. import logger
  26. import time
  27. import multiprocessing
  28. log = logger.get_logger(__name__)
  29. def partition(arr:list, cores: int):
  30. """partition Partition a list
  31. Partition urls/nodes into several groups based on # of cores
  32. Args:
  33. arr (list): A list to be partitioned
  34. cores (int): Number of cores of the compute running MonSter
  35. Returns:
  36. list: partitioned list
  37. """
  38. groups = []
  39. try:
  40. arr_len = len(arr)
  41. arr_per_core = arr_len // cores
  42. arr_surplus = arr_len % cores
  43. increment = 1
  44. for i in range(cores):
  45. if(arr_surplus != 0 and i == (cores-1)):
  46. groups.append(arr[i * arr_per_core:])
  47. else:
  48. groups.append(arr[i * arr_per_core : increment * arr_per_core])
  49. increment += 1
  50. except Exception as err:
  51. log.error(f"Cannot Partition the list: {err}")
  52. return groups
  53. def fetch(urls: list, nodes: list, username: str, password:str):
  54. """fetch Fetch Data From Urls
  55. Fetch Data From Urls of the specified nodes
  56. Args:
  57. urls (list): idrac urls
  58. nodes (list): a list of ip addresses of idracs
  59. username (str): idrac username
  60. password (str): idrac password
  61. Returns:
  62. [type]: [description]
  63. """
  64. idrac_metrics = []
  65. try:
  66. asyn_requests = AsyncioRequests(auth = (username, password),
  67. timeout = (15, 45),
  68. max_retries = 3)
  69. idrac_metrics = asyn_requests.bulk_fetch(urls, nodes)
  70. except Exception as err:
  71. log.error(f"Cannot fetch data from idrac urls: {err}")
  72. return idrac_metrics
  73. def parallel_fetch(urllist: list,
  74. nodelist: list,
  75. cores: int,
  76. username: str,
  77. password:str):
  78. """parallel_fetch Parallel Fetch Data from Urls
  79. Parallel fetch data from rrls of the specified nodes
  80. Args:
  81. urllist (list): idrac urls
  82. nodelist (list): a list of ip addresses of idracs
  83. cores (int): Number of cores of the compute running MonSter
  84. username (str): idrac username
  85. password (str): idrac password
  86. Returns:
  87. list: idrac metrics in a list
  88. """
  89. flatten_metrics = []
  90. try:
  91. # Partition
  92. urls_group = partition(urllist, cores)
  93. nodes_group = partition(nodelist, cores)
  94. fetch_args = []
  95. for i in range(cores):
  96. urls = urls_group[i]
  97. nodes = nodes_group[i]
  98. fetch_args.append((urls, nodes, username, password))
  99. with multiprocessing.Pool() as pool:
  100. metrics = pool.starmap(fetch, fetch_args)
  101. flatten_metrics = [item for sublist in metrics for item in sublist]
  102. except Exception as err:
  103. log.error(f"Cannot parallel fetch data from idrac urls: {err}")
  104. return flatten_metrics
  105. def extract(system_info: dict, bmc_info: dict):
  106. """extract Extract Info
  107. Extract system and bmc info
  108. Args:
  109. system_info (dict): System info
  110. bmc_info (dict): BMC info
  111. Returns:
  112. dict: extracted info
  113. """
  114. bmc_ip_addr = system_info["node"]
  115. system_metrics = system_info["metrics"]
  116. bmc_metrics = bmc_info["metrics"]
  117. general = ["UUID", "SerialNumber", "HostName", "Model", "Manufacturer"]
  118. processor = ["ProcessorModel", "ProcessorCount", "LogicalProcessorCount"]
  119. memory = ["TotalSystemMemoryGiB"]
  120. bmc = ["BmcModel", "BmcFirmwareVersion"]
  121. metrics = {}
  122. try:
  123. # Update service tag
  124. if system_metrics:
  125. service_tag = system_metrics.get("SKU", None)
  126. else:
  127. service_tag = None
  128. metrics.update({
  129. "ServiceTag": service_tag
  130. })
  131. # Update System metrics
  132. if system_metrics:
  133. for metric in general:
  134. metrics.update({
  135. metric: system_metrics.get(metric, None)
  136. })
  137. for metric in processor:
  138. if metric.startswith("Processor"):
  139. metrics.update({
  140. metric: system_metrics.get("ProcessorSummary", {}).get(metric[9:], None)
  141. })
  142. else:
  143. metrics.update({
  144. metric: system_metrics.get("ProcessorSummary", {}).get(metric, None)
  145. })
  146. for metric in memory:
  147. metrics.update({
  148. metric: system_metrics.get("MemorySummary", {}).get("TotalSystemMemoryGiB", None)
  149. })
  150. else:
  151. for metric in general + processor + memory:
  152. metrics.update({
  153. metric: None
  154. })
  155. metrics.update({
  156. "Bmc_Ip_Addr": bmc_ip_addr
  157. })
  158. # Update BMC metrics
  159. if bmc_metrics:
  160. for metric in bmc:
  161. metrics.update({
  162. metric: bmc_metrics.get(metric[3:], None)
  163. })
  164. else:
  165. for metric in bmc:
  166. metrics.update({
  167. metric: None
  168. })
  169. # Update Status
  170. if (not system_metrics and
  171. not bmc_metrics):
  172. metrics.update({
  173. "Status": "BMC unreachable in this query"
  174. })
  175. else:
  176. metrics.update({
  177. "Status": system_metrics.get("Status", {}).get("Health", None)
  178. })
  179. return metrics
  180. except Exception as err:
  181. log.error(f"Cannot extract info from system and bmc: {err}")
  182. def parallel_extract(system_info_list: list,
  183. bmc_info_list: list):
  184. """parallel_extract Parallel Extract Info
  185. Parallel extract system and bmc info
  186. Args:
  187. system_info_list (list): a list of system info
  188. bmc_info_list (list): a list of bmc info
  189. Returns:
  190. list: a list of extracted info
  191. """
  192. info = []
  193. try:
  194. process_args = zip(system_info_list,
  195. bmc_info_list)
  196. with multiprocessing.Pool() as pool:
  197. info = pool.starmap(extract, process_args)
  198. except Exception as err:
  199. log.error(f"Cannot parallel extract info from system and bmc: {err}")
  200. return info
  201. """
  202. Process data in the MetricValues, generate raw records
  203. """
  204. def process_idrac(ip: str, report: str, metrics: list):
  205. """process_idrac Process iDRAC Metrics
  206. Process iDRAC metircs in the MetricValues and generate records
  207. Args:
  208. ip (str): iDRAC ip address
  209. report (str): report name
  210. metrics (list): a list of metric names
  211. Returns:
  212. dict: processed idrac metrics grouped by table name
  213. """
  214. idrac_metrics = {}
  215. try:
  216. if report == "PowerStatistics":
  217. # PowerStatistics is better to be pulled
  218. pass
  219. else:
  220. for metric in metrics:
  221. table_name = ''
  222. timestamp = ''
  223. source = ''
  224. fqdd = ''
  225. value = ''
  226. try:
  227. table_name = metric['MetricId']
  228. timestamp = metric['Timestamp']
  229. source = metric['Oem']['Dell']['Source']
  230. fqdd = metric['Oem']['Dell']['FQDD']
  231. value = metric['MetricValue']
  232. # print(f"Time Stamp: {timestamp}")
  233. except:
  234. pass
  235. if table_name and timestamp and source and fqdd and value:
  236. record = {
  237. 'Timestamp': timestamp,
  238. 'Source': source,
  239. 'FQDD': fqdd,
  240. 'Value': value
  241. }
  242. if table_name not in idrac_metrics:
  243. idrac_metrics.update({
  244. table_name: [record]
  245. })
  246. else:
  247. idrac_metrics[table_name].append(record)
  248. except Exception as err:
  249. log.error(f"Fail to process idrac metrics: {err}")
  250. return idrac_metrics
class AsyncioRequests:
    """Async HTTP GET helper for bulk-fetching JSON from idrac urls."""
    # Class-level imports: the modules become class attributes, reachable as
    # self.aiohttp / self.asyncio. aiohttp is a third-party dependency.
    import aiohttp
    import asyncio
    from aiohttp import ClientSession

    def __init__(self, verify_ssl: bool = False, auth: tuple = (),
                 timeout: tuple = (15, 45), max_retries: int = 3):
        # Results of the last bulk_fetch call.
        self.metrics = {}
        # One timestamp (nanoseconds since epoch) stamped on every record
        # produced by this instance's fetches.
        self.timestamp = int(time.time() * 1000000000)
        # Retry counter shared across ALL urls of a batch (see __fetch_json).
        self.retry = 0
        self.connector=self.aiohttp.TCPConnector(verify_ssl=verify_ssl)
        if auth:
            self.auth = self.aiohttp.BasicAuth(*auth)
        else:
            self.auth = None
        # NOTE(review): the pair is passed positionally to ClientTimeout,
        # presumably as (total, connect) — confirm against the installed
        # aiohttp version's ClientTimeout signature.
        self.timeout = self.aiohttp.ClientTimeout(*timeout)
        self.max_retries = max_retries
        # NOTE(review): get_event_loop() is deprecated for this use in newer
        # Python; verify behavior on the deployed interpreter version.
        self.loop = self.asyncio.get_event_loop()

    async def __fetch_json(self,
                           url: str,
                           node: str,
                           session: ClientSession):
        """__fetch_json Fetch Url

        Get request wrapper to fetch json data from API. On timeout, retries
        until the shared retry budget (max_retries) is exhausted; on any
        other error returns a record with empty "metrics" so callers always
        get a uniform shape.

        Args:
            url (str): url of idrac
            node (str): ip address of the idrac
            session (ClientSession): Client Session

        Returns:
            dict: {"node", "metrics", "timestamp"} with the url's json payload
        """
        try:
            resp = await session.request(method='GET', url=url)
            resp.raise_for_status()
            json = await resp.json()
            return {"node": node,
                    "metrics": json,
                    "timestamp": self.timestamp}
        except (TimeoutError):
            # NOTE(review): self.retry is instance-wide, so the retry budget
            # is shared across all concurrent urls, not per-url — confirm
            # this is intended. Also, aiohttp timeouts may raise
            # asyncio.TimeoutError, which is only an alias of builtin
            # TimeoutError on Python 3.11+ — verify on the target version.
            self.retry += 1
            if self.retry >= self.max_retries:
                log.error(f"Cannot fetch data from {node} : {url}")
                return {"node": node,
                        "metrics": {},
                        "timestamp": self.timestamp}
            return await self.__fetch_json(url, node, session)
        except Exception as err:
            log.error(f"Cannot fetch data from {url} : {err}")
            return {"node": node,
                    "metrics": {},
                    "timestamp": self.timestamp}

    async def __requests(self, urls: list, nodes: list):
        # Fan out one __fetch_json task per url inside a single shared
        # session, and gather all results in input order.
        async with self.ClientSession(connector=self.connector,
                                      auth = self.auth,
                                      timeout = self.timeout) as session:
            tasks = []
            for i, url in enumerate(urls):
                tasks.append(self.__fetch_json(url=url,
                                               node=nodes[i],
                                               session=session))
            return await self.asyncio.gather(*tasks)

    def bulk_fetch(self, urls: list, nodes: list):
        """Fetch all urls concurrently and return the list of records.

        Closes the event loop afterwards, so each instance is single-use.
        """
        self.metrics = self.loop.run_until_complete(self.__requests(urls, nodes))
        self.loop.close()
        return self.metrics