- """
- MIT License
- Copyright (c) 2022 Texas Tech University
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- SOFTWARE.
- """
- """
- This file is part of MonSter.
- Author:
- Jie Li, jie.li@ttu.edu
- """
- import logger
- import time
- import multiprocessing
- log = logger.get_logger(__name__)
def partition(arr: list, cores: int):
    """partition Partition a list

    Partition urls/nodes into several groups based on the number of cores

    Args:
        arr (list): A list to be partitioned
        cores (int): Number of cores of the compute node running MonSter

    Returns:
        list: partitioned list
    """
    groups = []
    try:
        arr_len = len(arr)
        arr_per_core = arr_len // cores
        arr_surplus = arr_len % cores
        increment = 1
        for i in range(cores):
            # The last group absorbs the surplus items that do not divide evenly.
            if arr_surplus != 0 and i == (cores - 1):
                groups.append(arr[i * arr_per_core:])
            else:
                groups.append(arr[i * arr_per_core: increment * arr_per_core])
                increment += 1
    except Exception as err:
        log.error(f"Cannot Partition the list: {err}")
    return groups
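
# A minimal illustrative sketch of partition(); the numbers are hypothetical.
# With 10 items and 3 cores, the surplus items stay in the last group:
#
#   partition(list(range(10)), 3)
#   # -> [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
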
def fetch(urls: list, nodes: list, username: str, password: str):
    """fetch Fetch Data From Urls

    Fetch data from urls of the specified nodes

    Args:
        urls (list): idrac urls
        nodes (list): a list of ip addresses of idracs
        username (str): idrac username
        password (str): idrac password

    Returns:
        list: idrac metrics in a list
    """
    idrac_metrics = []
    try:
        asyn_requests = AsyncioRequests(auth=(username, password),
                                        timeout=(15, 45),
                                        max_retries=3)
        idrac_metrics = asyn_requests.bulk_fetch(urls, nodes)
    except Exception as err:
        log.error(f"Cannot fetch data from idrac urls: {err}")
    return idrac_metrics
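
# Illustrative single-process use of fetch(); the url, node address and
# credentials below are placeholders, not real endpoints.
#
#   urls = ["https://10.0.0.1/redfish/v1/Systems/System.Embedded.1"]
#   nodes = ["10.0.0.1"]
#   system_info_list = fetch(urls, nodes, "root", "password")
#   # -> [{"node": "10.0.0.1", "metrics": {...}, "timestamp": ...}]
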
def parallel_fetch(urllist: list,
                   nodelist: list,
                   cores: int,
                   username: str,
                   password: str):
    """parallel_fetch Parallel Fetch Data from Urls

    Parallel fetch data from urls of the specified nodes

    Args:
        urllist (list): idrac urls
        nodelist (list): a list of ip addresses of idracs
        cores (int): Number of cores of the compute node running MonSter
        username (str): idrac username
        password (str): idrac password

    Returns:
        list: idrac metrics in a list
    """
    flatten_metrics = []
    try:
        # Partition the urls/nodes into one group per core and fetch each
        # group in its own worker process.
        urls_group = partition(urllist, cores)
        nodes_group = partition(nodelist, cores)

        fetch_args = []
        for i in range(cores):
            urls = urls_group[i]
            nodes = nodes_group[i]
            fetch_args.append((urls, nodes, username, password))

        with multiprocessing.Pool() as pool:
            metrics = pool.starmap(fetch, fetch_args)

        flatten_metrics = [item for sublist in metrics for item in sublist]
    except Exception as err:
        log.error(f"Cannot parallel fetch data from idrac urls: {err}")
    return flatten_metrics
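
# Illustrative parallel use: the urls/nodes are split across the available
# cores and fetched concurrently. The addresses and credentials below are
# placeholders.
#
#   ips = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
#   urls = [f"https://{ip}/redfish/v1/Systems/System.Embedded.1" for ip in ips]
#   cores = multiprocessing.cpu_count()
#   system_info_list = parallel_fetch(urls, ips, cores, "root", "password")
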
def extract(system_info: dict, bmc_info: dict):
    """extract Extract Info

    Extract system and bmc info

    Args:
        system_info (dict): System info
        bmc_info (dict): BMC info

    Returns:
        dict: extracted info
    """
    bmc_ip_addr = system_info["node"]
    system_metrics = system_info["metrics"]
    bmc_metrics = bmc_info["metrics"]

    general = ["UUID", "SerialNumber", "HostName", "Model", "Manufacturer"]
    processor = ["ProcessorModel", "ProcessorCount", "LogicalProcessorCount"]
    memory = ["TotalSystemMemoryGiB"]
    bmc = ["BmcModel", "BmcFirmwareVersion"]

    metrics = {}
    try:
        # Service tag
        if system_metrics:
            service_tag = system_metrics.get("SKU", None)
        else:
            service_tag = None

        metrics.update({
            "ServiceTag": service_tag
        })

        # General, processor and memory info
        if system_metrics:
            for metric in general:
                metrics.update({
                    metric: system_metrics.get(metric, None)
                })
            for metric in processor:
                if metric.startswith("Processor"):
                    # e.g. "ProcessorModel" maps to ProcessorSummary["Model"]
                    metrics.update({
                        metric: system_metrics.get("ProcessorSummary", {}).get(metric[9:], None)
                    })
                else:
                    metrics.update({
                        metric: system_metrics.get("ProcessorSummary", {}).get(metric, None)
                    })
            for metric in memory:
                metrics.update({
                    metric: system_metrics.get("MemorySummary", {}).get("TotalSystemMemoryGiB", None)
                })
        else:
            for metric in general + processor + memory:
                metrics.update({
                    metric: None
                })

        metrics.update({
            "Bmc_Ip_Addr": bmc_ip_addr
        })

        # BMC info; e.g. "BmcModel" maps to the bmc metrics field "Model"
        if bmc_metrics:
            for metric in bmc:
                metrics.update({
                    metric: bmc_metrics.get(metric[3:], None)
                })
        else:
            for metric in bmc:
                metrics.update({
                    metric: None
                })

        # Status
        if (not system_metrics and
            not bmc_metrics):
            metrics.update({
                "Status": "BMC unreachable in this query"
            })
        else:
            metrics.update({
                "Status": system_metrics.get("Status", {}).get("Health", None)
            })
    except Exception as err:
        log.error(f"Cannot extract info from system and bmc: {err}")
    return metrics
def parallel_extract(system_info_list: list,
                     bmc_info_list: list):
    """parallel_extract Parallel Extract Info

    Parallel extract system and bmc info

    Args:
        system_info_list (list): a list of system info
        bmc_info_list (list): a list of bmc info

    Returns:
        list: a list of extracted info
    """
    info = []
    try:
        process_args = zip(system_info_list,
                           bmc_info_list)
        with multiprocessing.Pool() as pool:
            info = pool.starmap(extract, process_args)
    except Exception as err:
        log.error(f"Cannot parallel extract info from system and bmc: {err}")
    return info
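
# Note: system_info_list and bmc_info_list are paired positionally by zip(),
# so both lists are expected to describe the same nodes in the same order
# (e.g. two parallel_fetch() calls over the same nodelist).
#
#   node_details = parallel_extract(system_info_list, bmc_info_list)
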
- """
- Process data in the MetricValues, generate raw records
- """
- def process_idrac(ip: str, report: str, metrics: list):
- """process_idrac Process iDRAC Metrics
- Process iDRAC metircs in the MetricValues and generate records
- Args:
- ip (str): iDRAC ip address
- report (str): report name
- metrics (list): a list of metric names
- Returns:
- dict: processed idrac metrics grouped by table name
- """
- idrac_metrics = {}
- try:
- if report == "PowerStatistics":
-
- pass
- else:
- for metric in metrics:
- table_name = ''
- timestamp = ''
- source = ''
- fqdd = ''
- value = ''
- try:
- table_name = metric['MetricId']
- timestamp = metric['Timestamp']
- source = metric['Oem']['Dell']['Source']
- fqdd = metric['Oem']['Dell']['FQDD']
- value = metric['MetricValue']
-
- except:
- pass
- if table_name and timestamp and source and fqdd and value:
- record = {
- 'Timestamp': timestamp,
- 'Source': source,
- 'FQDD': fqdd,
- 'Value': value
- }
- if table_name not in idrac_metrics:
- idrac_metrics.update({
- table_name: [record]
- })
- else:
- idrac_metrics[table_name].append(record)
-
- except Exception as err:
- log.error(f"Fail to process idrac metrics: {err}")
-
- return idrac_metrics
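
# Illustrative sketch of one MetricValues entry and how it is grouped by
# MetricId; all values, including the report name, are made up.
#
#   metric = {"MetricId": "CPUUsage",
#             "Timestamp": "2022-01-01T00:00:00-06:00",
#             "MetricValue": "12",
#             "Oem": {"Dell": {"Source": "Resource", "FQDD": "CPU.Socket.1"}}}
#   process_idrac("10.0.0.1", "CPUSensor", [metric])
#   # -> {"CPUUsage": [{"Timestamp": "2022-01-01T00:00:00-06:00",
#   #                   "Source": "Resource", "FQDD": "CPU.Socket.1", "Value": "12"}]}
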
class AsyncioRequests:
    import aiohttp
    import asyncio
    from aiohttp import ClientSession

    def __init__(self, verify_ssl: bool = False, auth: tuple = (),
                 timeout: tuple = (15, 45), max_retries: int = 3):
        self.metrics = {}
        self.timestamp = int(time.time() * 1000000000)
        self.retry = 0
        self.connector = self.aiohttp.TCPConnector(verify_ssl=verify_ssl)
        if auth:
            self.auth = self.aiohttp.BasicAuth(*auth)
        else:
            self.auth = None
        self.timeout = self.aiohttp.ClientTimeout(*timeout)
        self.max_retries = max_retries
        self.loop = self.asyncio.get_event_loop()

    async def __fetch_json(self,
                           url: str,
                           node: str,
                           session: ClientSession):
        """__fetch_json Fetch Url

        Get request wrapper to fetch json data from API

        Args:
            url (str): url of idrac
            node (str): ip address of the idrac
            session (ClientSession): Client Session

        Returns:
            dict: The return of url in json format
        """
        try:
            resp = await session.request(method='GET', url=url)
            resp.raise_for_status()
            json = await resp.json()
            return {"node": node,
                    "metrics": json,
                    "timestamp": self.timestamp}
        except self.asyncio.TimeoutError:
            # Retry until max_retries is exhausted, then give up on this node.
            self.retry += 1
            if self.retry >= self.max_retries:
                log.error(f"Cannot fetch data from {node} : {url}")
                return {"node": node,
                        "metrics": {},
                        "timestamp": self.timestamp}
            return await self.__fetch_json(url, node, session)
        except Exception as err:
            log.error(f"Cannot fetch data from {url} : {err}")
            return {"node": node,
                    "metrics": {},
                    "timestamp": self.timestamp}

    async def __requests(self, urls: list, nodes: list):
        async with self.ClientSession(connector=self.connector,
                                      auth=self.auth,
                                      timeout=self.timeout) as session:
            tasks = []
            for i, url in enumerate(urls):
                tasks.append(self.__fetch_json(url=url,
                                               node=nodes[i],
                                               session=session))
            return await self.asyncio.gather(*tasks)

    def bulk_fetch(self, urls: list, nodes: list):
        self.metrics = self.loop.run_until_complete(self.__requests(urls, nodes))
        self.loop.close()
        return self.metrics
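
# Illustrative direct use of AsyncioRequests; the url, node and credentials
# are placeholders. Note that bulk_fetch() closes the event loop, so each
# instance is meant for a single bulk call.
#
#   req = AsyncioRequests(auth=("root", "password"))
#   results = req.bulk_fetch(
#       ["https://10.0.0.1/redfish/v1/Managers/iDRAC.Embedded.1"],
#       ["10.0.0.1"])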