  1. """
  2. MIT License
  3. Copyright (c) 2022 Texas Tech University
  4. Permission is hereby granted, free of charge, to any person obtaining a copy
  5. of this software and associated documentation files (the "Software"), to deal
  6. in the Software without restriction, including without limitation the rights
  7. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. copies of the Software, and to permit persons to whom the Software is
  9. furnished to do so, subject to the following conditions:
  10. The above copyright notice and this permission notice shall be included in all
  11. copies or substantial portions of the Software.
  12. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  15. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  18. SOFTWARE.
  19. """
  20. """
  21. This file is part of MonSter.
  22. Author:
  23. Jie Li, jie.li@ttu.edu
  24. """
import sql  # local MonSter module that builds the SQL query strings

import pandas as pd
import sqlalchemy as db


def get_id_node_mapping(connection: str):
    """get_id_node_mapping Get ID-Node Mapping

    Get the mapping between node IDs and hostnames.

    Args:
        connection (str): connection string
    """
    engine = db.create_engine(connection)
    connect = engine.connect()

    mapping_sql = "SELECT nodeid, hostname FROM nodes;"
    mapping_df = pd.read_sql_query(mapping_sql, con=connect)
    mapping = pd.Series(mapping_df.hostname.values,
                        index=mapping_df.nodeid).to_dict()

    connect.close()
    return mapping
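

# The values below are hypothetical and only illustrate the shape of the
# returned mapping (node ID -> hostname):
#
#   get_id_node_mapping('postgresql://user:password@host:5432/monster')
#   # -> {1: 'cpu-1-1', 2: 'cpu-1-2', ...}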


def get_metric_fqdd_mapping(connection: str):
    """get_metric_fqdd_mapping Get Metric-FQDD Mapping

    Get the mapping between each metric and its available FQDDs.

    Args:
        connection (str): connection string
    """
    engine = db.create_engine(connection)
    metadata = db.MetaData()
    connect = engine.connect()

    mapping = {}
    metric_list = get_avail_metrics(connect, metadata, engine)
    for metric in metric_list:
        fqdd = get_avail_metric_fqdd(connect, metadata, engine, metric)
        if fqdd:
            mapping.update({metric: fqdd})

    connect.close()
    return mapping
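

# Illustrative shape only; the metric and FQDD names are hypothetical:
#
#   get_metric_fqdd_mapping(connection)
#   # -> {'RPMReading': ['FanSensor1', 'FanSensor2'],
#   #     'TemperatureReading': ['CPU1 Temp', 'CPU2 Temp'], ...}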


def get_avail_metrics(connect: object,
                      metadata: object,
                      engine: object):
    """get_avail_metrics Get Available Metrics

    Get the available metrics from the metrics_definition table.

    Args:
        connect (object): SqlAlchemy connection
        metadata (object): SqlAlchemy metadata
        engine (object): SqlAlchemy engine
    """
    metrics_definition = db.Table('metrics_definition',
                                  metadata,
                                  autoload=True,
                                  autoload_with=engine)
    query = db.select([metrics_definition])
    result_proxy = connect.execute(query)
    result = result_proxy.fetchall()
    # The metric name is the second column of metrics_definition
    metric_list = [i[1] for i in result]
    return metric_list


def get_avail_metric_fqdd(connect: object,
                          metadata: object,
                          engine: object,
                          metric: str):
    """get_avail_metric_fqdd Get Available Metric FQDD

    Get the available FQDDs of a metric based on the metrics collected in
    the last half hour.

    Args:
        connect (object): SqlAlchemy connection
        metadata (object): SqlAlchemy metadata
        engine (object): SqlAlchemy engine
        metric (str): metric name
    """
    fqdd = []
    metric = metric.lower()
    table = db.Table(metric,
                     metadata,
                     autoload=True,
                     autoload_with=engine,
                     schema='idrac')
    # Find unique, non-empty fqdd values
    query = db.select([table.columns.fqdd.distinct()]).limit(50)
    result_proxy = connect.execute(query)
    result = result_proxy.fetchall()
    if result:
        fqdd = [i[0] for i in result if i[0]]
    return fqdd


def get_metric_fqdd_tree(metric_fqdd_mapping: dict):
    """get_metric_fqdd_tree Get Metric-FQDD Tree

    Build the metric-fqdd tree used by Grafana.

    Args:
        metric_fqdd_mapping (dict): metric-fqdd mapping
    """
    partition = 'idrac'
    metric_fqdd_tree = {
        'name': 'root',
        'children': []
    }

    # iDRAC metrics
    metric_fqdd_list = []
    for metric, fqdds in metric_fqdd_mapping.items():
        children = []
        for fqdd in fqdds:
            child = {
                'name': fqdd, 'value': f'{partition} | {metric} | {fqdd}'
            }
            children.append(child)
        metric_fqdd_list.append({
            'name': metric, 'children': children
        })

    child_dict = {
        'name': partition,
        'children': metric_fqdd_list
    }
    metric_fqdd_tree['children'].append(child_dict)

    # Slurm metrics
    slurm_child_dict = {
        'name': 'slurm',
        'children': [
            {
                'name': 'memoryusage',
                'children': [
                    {
                        'name': 'Memory Usage',
                        'value': 'slurm | memoryusage | Memory Usage'
                    }
                ]
            },
            {
                'name': 'memory_used',
                'children': [
                    {
                        'name': 'Memory Used',
                        'value': 'slurm | memory_used | Memory Used'
                    }
                ]
            },
            {
                'name': 'cpu_load',
                'children': [
                    {
                        'name': 'CPU Load',
                        'value': 'slurm | cpu_load | CPU Load'
                    }
                ]
            },
        ]
    }
    metric_fqdd_tree['children'].append(slurm_child_dict)
    return metric_fqdd_tree
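

# The tree nests partition -> metric -> fqdd, and each leaf carries the
# 'partition | metric | fqdd' string that query_tsdb() later splits apart.
# Metric/FQDD names in this sketch are hypothetical:
#
#   {'name': 'root', 'children': [
#       {'name': 'idrac', 'children': [
#           {'name': 'RPMReading', 'children': [
#               {'name': 'FanSensor1',
#                'value': 'idrac | RPMReading | FanSensor1'}]}]},
#       {'name': 'slurm', 'children': [...]}]}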


def query_tsdb(request: object, id_node_mapping: dict, connection: str):
    """query_tsdb Query TSDB

    Query the TSDB based on the Flask request.

    Args:
        request (object): Flask request
        id_node_mapping (dict): ID-node mapping
        connection (str): TSDB connection string
    """
    # Initialize sqlalchemy connection
    engine = db.create_engine(connection)
    connect = engine.connect()

    results = []
    req = request.get_json(silent=True)

    # Request details
    time_range = req.get('range')
    interval = req.get('interval')
    targets = req.get('targets')

    # Extract time range (from, to)
    start = time_range.get('from')
    end = time_range.get('to')

    # TODO: add aggregation function to the targets
    for target in targets:
        req_metric = target.get('metric', '')
        req_type = target.get('type', '')
        nodes = target.get('nodes', '')
        if (req_metric and req_type == 'metrics'
                and len(req_metric.split(' | ')) == 3):
            partition, metric, fqdd = req_metric.split(' | ')
            metrics = query_filter_metrics(engine,
                                           metric,
                                           fqdd,
                                           nodes,
                                           id_node_mapping,
                                           start,
                                           end,
                                           interval,
                                           partition)
            results.append(metrics)
        if req_type == 'jobs':
            users = target.get('users', '')
            if not users:
                users = get_users(engine, start, end)
            jobs = query_filter_jobs(engine, users, start, end,
                                     id_node_mapping)
            results.append(jobs)
        if req_type == 'node_core':
            node_core = query_node_core(engine,
                                        start,
                                        end,
                                        interval,
                                        id_node_mapping)
            results.append(node_core)

    connect.close()
    return results
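

# A sketch of the Grafana JSON body query_tsdb() expects; all field values
# here are hypothetical:
#
#   {
#       "range": {"from": "2022-01-01T00:00:00Z",
#                 "to": "2022-01-02T00:00:00Z"},
#       "interval": "5m",
#       "targets": [
#           {"type": "metrics",
#            "metric": "idrac | RPMReading | FanSensor1",
#            "nodes": [1, 2]},
#           {"type": "jobs", "users": []},
#           {"type": "node_core"}
#       ]
#   }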


def query_filter_metrics(engine: object,
                         metric: str,
                         fqdd: str,
                         nodes: list,
                         id_node_mapping: dict,
                         start: str,
                         end: str,
                         interval: str,
                         partition: str,
                         aggregate: str = 'max'):
    """query_filter_metrics Query Filter Metrics

    Query and filter metrics from the TSDB.

    Args:
        engine (object): sqlalchemy engine
        metric (str): metric name
        fqdd (str): fqdd name
        nodes (list): target nodes
        id_node_mapping (dict): id-node mapping
        start (str): start of time range
        end (str): end of time range
        interval (str): aggregation interval
        partition (str): partition name
        aggregate (str, optional): aggregation function. Defaults to 'max'.
    """
    if partition == 'slurm':
        sql_str = sql.generate_slurm_sql(metric,
                                         start,
                                         end,
                                         interval,
                                         aggregate)
    else:
        sql_str = sql.generate_idrac_sql(metric,
                                         fqdd,
                                         start,
                                         end,
                                         interval,
                                         aggregate)
    df = pd.read_sql_query(sql_str, con=engine)

    # Filter nodes
    if nodes:
        fi_df = df[df['nodeid'].isin(nodes)].copy()
    else:
        fi_df = df

    # Add label to slurm metrics
    if partition == 'slurm':
        fi_df['label'] = metric

    # Convert node id to node name
    fi_df['nodeid'] = fi_df['nodeid'].apply(lambda x: id_node_mapping[x])
    fi_df['label'] = fi_df['label'].apply(lambda x: f'{metric}|{x}')

    # Pivot the table
    df_p = fi_df.pivot(index='time', columns=['nodeid', 'label'],
                       values='value')

    # Flatten the multi-level columns into 'nodeid|label' strings
    df_p.columns = ['|'.join([str(c) for c in c_list])
                    for c_list in df_p.columns.values]

    metrics = metrics_df_to_response(df_p)
    return metrics
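

# After the pivot/flatten, each column is named 'hostname|metric|label'
# (the label is the fqdd for idrac metrics and the metric name for slurm
# metrics); metrics_df_to_response() splits on '|' to rebuild the Grafana
# columns. Hypothetical example column: 'cpu-1-1|RPMReading|FanSensor1'.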


def metrics_df_to_response(df: object):
    """metrics_df_to_response Metrics DF to Response

    Convert a dataframe to a response in the table format required by
    Grafana.

    Args:
        df (dataframe): dataframe
    """
    # Convert index to column
    df.reset_index(inplace=True)

    # Convert datetime to epoch time in milliseconds
    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))

    columns = [{'text': 'time', 'type': 'time'}]
    columns_raw = list(df.columns)
    for column in columns_raw[1:]:
        # Column names have the form 'node|metric|label';
        # see query_filter_metrics
        c_text = f"{column.split('|')[1]}-{column.split('|')[2]}"
        c_type = 'number'
        c_label = f"| {column.split('|')[0]}"
        column_info = {'text': c_text, 'type': c_type, 'label': c_label}
        columns.append(column_info)

    rows = df.values.tolist()
    response = {'columns': columns, 'rows': rows}
    return response
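

# Illustrative response in Grafana's table format (values hypothetical):
#
#   {'columns': [{'text': 'time', 'type': 'time'},
#                {'text': 'RPMReading-FanSensor1', 'type': 'number',
#                 'label': '| cpu-1-1'}],
#    'rows': [[1641024000000, 4320.0], ...]}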


def get_users(engine: object, start: str, end: str):
    """get_users Get Users

    Get the users from the slurm jobs info.

    Args:
        engine (object): sqlalchemy engine
        start (str): start time
        end (str): end time
    """
    sql_str = sql.generate_slurm_jobs_sql(start, end)
    df = pd.read_sql_query(sql_str, con=engine)
    # unique() already de-duplicates; convert to a plain list
    all_users = df['user_name'].unique().tolist()
    return all_users


def query_filter_jobs(engine: object,
                      users: list,
                      start: str,
                      end: str,
                      id_node_mapping: dict):
    """query_filter_jobs Query Filter Jobs

    Query and filter jobs from the TSDB.

    Args:
        engine (object): sqlalchemy engine
        users (list): selected users
        start (str): start of time range
        end (str): end of time range
        id_node_mapping (dict): id-node mapping
    """
    sql_str = sql.generate_slurm_jobs_sql(start, end)
    df = pd.read_sql_query(sql_str, con=engine)

    # Node hostnames
    nodes = list(id_node_mapping.values())

    # Keep jobs whose batch_host is one of the monitored nodes and whose
    # user is among the selected users
    fi_df = df[df['batch_host'].isin(nodes)
               & df['user_name'].isin(users)].copy()

    jobs = jobs_df_to_response(fi_df)
    return jobs


def jobs_df_to_response(df: object):
    """jobs_df_to_response Jobs DF to Response

    Convert a dataframe to a response in the table format required by
    Grafana.

    Args:
        df (dataframe): dataframe

    Returns:
        response (dict): response
    """
    columns = []
    selected_columns = ['job_id', 'name', 'user_id', 'user_name',
                        'batch_host', 'nodes', 'node_count', 'cpus',
                        'start_time', 'end_time']
    selected_df = df[selected_columns].copy()
    # Render the node list as an array-style string, e.g. '{node1, node2}'
    selected_df['nodes'] = selected_df['nodes'].apply(
        lambda x: '{' + ', '.join(x) + '}')

    columns_raw = list(selected_df.columns)
    for column in columns_raw:
        if column in ['job_id', 'user_id', 'cpus', 'start_time', 'end_time']:
            c_type = 'number'
        else:
            c_type = 'string'
        column_info = {'text': column, 'type': c_type}
        columns.append(column_info)

    rows = selected_df.values.tolist()
    response = {'columns': columns, 'rows': rows}
    return response


def query_node_core(engine: object,
                    start: str,
                    end: str,
                    interval: str,
                    id_node_mapping: dict):
    """query_node_core Query Node Core

    Query node-core info from the TSDB.

    Args:
        engine (object): sqlalchemy engine
        start (str): start of time range
        end (str): end of time range
        interval (str): time interval for aggregation
        id_node_mapping (dict): id-node mapping
    """
    sql_str = sql.generate_node_jobs_sql(start, end, interval)
    df = pd.read_sql_query(sql_str, con=engine)
    node_jobs = node_jobs_df_to_response(df, id_node_mapping)
    return node_jobs


def node_jobs_df_to_response(df: object, id_node_mapping: dict):
    """node_jobs_df_to_response Node-Jobs DF to Response

    Convert a dataframe to a response in the table format required by
    Grafana.

    Args:
        df (dataframe): dataframe
        id_node_mapping (dict): id-node mapping

    Returns:
        response (dict): response
    """
    # Convert datetime to epoch time in milliseconds
    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))
    # Convert node id to node name
    df['nodeid'] = df['nodeid'].apply(lambda x: id_node_mapping[x])

    # Flatten the nested jobs/cpus arrays into '{...}' strings
    df['fl_jobs'] = df.apply(
        lambda df_row: flatten_array(df_row, 'jobs'), axis=1)
    df['fl_cpus'] = df.apply(
        lambda df_row: flatten_array(df_row, 'cpus'), axis=1)

    df.drop(columns=['jobs', 'cpus'], inplace=True)
    df.rename(columns={'fl_jobs': 'jobs', 'fl_cpus': 'cpus'}, inplace=True)

    columns = [{'text': 'time', 'type': 'time'}]
    columns_raw = list(df.columns)
    for column in columns_raw[1:]:
        column_info = {'text': column, 'type': 'string'}
        columns.append(column_info)

    rows = df.values.tolist()
    response = {'columns': columns, 'rows': rows}
    return response


def flatten_array(df_row: object, column: str):
    """flatten_array Flatten Array

    Flatten the arrays of arrays holding the jobs and cpus info.

    Args:
        df_row (object): dataframe row
        column (str): column name, either 'jobs' or 'cpus'
    """
    jobs = []
    cpus = []
    job_id_array = df_row['jobs']
    cpus_array = df_row['cpus']
    try:
        if job_id_array:
            # Flatten the nested arrays
            fl_job_id_array = [item for sublist in job_id_array
                               for item in sublist]
            fl_cpus_array = [item for sublist in cpus_array
                             for item in sublist]
            # Only keep unique jobs, and keep the cpus aligned with them
            for i, job in enumerate(fl_job_id_array):
                if job not in jobs:
                    jobs.append(str(job))
                    cpus.append(str(fl_cpus_array[i]))
    except Exception:
        # Skip malformed rows; they are rendered as empty arrays
        pass

    if column == 'jobs':
        return '{' + ', '.join(jobs) + '}'
    else:
        return '{' + ', '.join(cpus) + '}'
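

# A minimal usage sketch tying the helpers together. The connection string
# is hypothetical; MonSter's configuration supplies the real one.
if __name__ == '__main__':
    connection = 'postgresql://user:password@localhost:5432/monster'
    id_node_mapping = get_id_node_mapping(connection)
    fqdd_mapping = get_metric_fqdd_mapping(connection)
    tree = get_metric_fqdd_tree(fqdd_mapping)
    print(f'{len(id_node_mapping)} nodes, '
          f'{len(fqdd_mapping)} metrics with FQDDs')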