@@ -0,0 +1,540 @@
+"""
+MIT License
+Copyright (c) 2022 Texas Tech University
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import sql
+import pandas as pd
+import sqlalchemy as db
+
+def get_id_node_mapping(connection: str):
+    """get_id_node_mapping Get ID-Node Mapping
+
+    Get the mapping between node ids and node hostnames from the nodes table.
+
+    Args:
+        connection (str): database connection string
+
+    Returns:
+        dict: node id -> hostname mapping
+    """
+    engine = db.create_engine(connection)
+    connect = engine.connect()
+    mapping_sql = "SELECT nodeid, hostname FROM nodes;"
+    mapping_df = pd.read_sql_query(mapping_sql, con=connect)
+    mapping = pd.Series(mapping_df.hostname.values, index=mapping_df.nodeid).to_dict()
+    connect.close()
+    return mapping
+
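+# A minimal usage sketch (hypothetical): the connection string below is an
+# assumption for illustration, not part of MonSter's configuration.
+#
+#   connection = 'postgresql://user:password@localhost:5432/demo'
+#   id_node_mapping = get_id_node_mapping(connection)
+#   # e.g. {1: 'cpu-1-1', 2: 'cpu-1-2', ...}
+
+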
+def get_metric_fqdd_mapping(connection: str):
+    """get_metric_fqdd_mapping Get Metric-FQDD Mapping
+
+    Map each available metric to the FQDDs (Fully Qualified Device
+    Descriptors) under which it is reported.
+
+    Args:
+        connection (str): database connection string
+
+    Returns:
+        dict: metric -> list of FQDDs
+    """
+    engine = db.create_engine(connection)
+    metadata = db.MetaData()
+    connect = engine.connect()
+
+    mapping = {}
+    metric_list = get_avail_metrics(connect, metadata, engine)
+
+    for metric in metric_list:
+        fqdd = get_avail_metric_fqdd(connect, metadata, engine, metric)
+        if fqdd:
+            mapping[metric] = fqdd
+    connect.close()
+
+    return mapping
+
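+# The returned shape, with hypothetical metric and FQDD names for
+# illustration only:
+#
+#   {'systempowerconsumption': ['System Board Pwr Consumption'],
+#    'temperaturereading': ['CPU1 Temp', 'CPU2 Temp'], ...}
+
+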
+def get_avail_metrics(connect: object,
+                      metadata: object,
+                      engine: object):
+    """get_avail_metrics Get Available Metrics
+
+    Get the available metrics from the metrics_definition table.
+
+    Args:
+        connect (object): SqlAlchemy engine connection
+        metadata (object): SqlAlchemy metadata
+        engine (object): SqlAlchemy engine
+
+    Returns:
+        list: available metric names
+    """
+    metrics_definition = db.Table('metrics_definition',
+                                  metadata,
+                                  autoload=True,
+                                  autoload_with=engine)
+    query = db.select([metrics_definition])
+    result_proxy = connect.execute(query)
+    result = result_proxy.fetchall()
+    # The metric name is the second column of metrics_definition
+    metric_list = [i[1] for i in result]
+    return metric_list
+
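+# Example return value (hypothetical metric names, for illustration only):
+#
+#   ['SystemPowerConsumption', 'TemperatureReading', 'FanSpeed', ...]
+
+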
+def get_avail_metric_fqdd(connect: object,
+                          metadata: object,
+                          engine: object,
+                          metric: str):
+    """get_avail_metric_fqdd Get Available Metric FQDD
+
+    Get the available FQDDs of a metric by sampling distinct fqdd values
+    from the metric's table in the idrac schema.
+
+    Args:
+        connect (object): SqlAlchemy engine connection
+        metadata (object): SqlAlchemy metadata
+        engine (object): SqlAlchemy engine
+        metric (str): metric name
+
+    Returns:
+        list: FQDD values observed for the metric
+    """
+    fqdd = []
+    metric = metric.lower()
+    table = db.Table(metric,
+                     metadata,
+                     autoload=True,
+                     autoload_with=engine,
+                     schema='idrac')
+
+    # Find unique fqdd values (sample at most 50)
+    query = db.select([table.columns.fqdd.distinct()]).limit(50)
+    result_proxy = connect.execute(query)
+    result = result_proxy.fetchall()
+
+    if result:
+        # Drop NULL/empty fqdd values
+        fqdd = [i[0] for i in result if i[0]]
+
+    return fqdd
+
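+# Note: autoload=True together with select([...]) is the SQLAlchemy 1.3-era
+# API. If MonSter is run against SQLAlchemy 1.4+, the equivalent (a sketch,
+# not verified against this codebase) would be:
+#
+#   table = db.Table(metric, metadata, autoload_with=engine, schema='idrac')
+#   query = db.select(table.columns.fqdd.distinct()).limit(50)
+
+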
+def get_metric_fqdd_tree(metric_fqdd_mapping: dict):
+    """get_metric_fqdd_tree Get Metric-FQDD Tree
+
+    Build the metric-fqdd tree consumed by the Grafana frontend.
+
+    Args:
+        metric_fqdd_mapping (dict): metric-fqdd mapping
+
+    Returns:
+        dict: nested tree of partitions, metrics, and FQDDs
+    """
+    partition = 'idrac'
+    metric_fqdd_tree = {
+        'name': 'root',
+        'children': []
+    }
+
+    # iDRAC metrics
+    metric_fqdd_list = []
+    for metric, fqdds in metric_fqdd_mapping.items():
+        children = []
+        for fqdd in fqdds:
+            child = {
+                'name': fqdd, 'value': f'{partition} | {metric} | {fqdd}'
+            }
+            children.append(child)
+        metric_fqdd_list.append({
+            'name': metric, 'children': children
+        })
+
+    child_dict = {
+        'name': partition,
+        'children': metric_fqdd_list
+    }
+    metric_fqdd_tree['children'].append(child_dict)
+
+    # Slurm metrics (hard-coded: memoryusage, memory_used, cpu_load)
+    slurm_child_dict = {
+        'name': 'slurm',
+        'children': [
+            {
+                'name': 'memoryusage',
+                'children': [
+                    {
+                        'name': 'Memory Usage',
+                        'value': 'slurm | memoryusage | Memory Usage'
+                    }
+                ]
+            },
+            {
+                'name': 'memory_used',
+                'children': [
+                    {
+                        'name': 'Memory Used',
+                        'value': 'slurm | memory_used | Memory Used'
+                    },
+                ]
+            },
+            {
+                'name': 'cpu_load',
+                'children': [
+                    {
+                        'name': 'CPU Load',
+                        'value': 'slurm | cpu_load | CPU Load'
+                    }
+                ]
+            },
+        ]
+    }
+    metric_fqdd_tree['children'].append(slurm_child_dict)
+
+    return metric_fqdd_tree
+
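+# Resulting tree shape (hypothetical iDRAC entries, for illustration only):
+#
+#   {'name': 'root', 'children': [
+#       {'name': 'idrac', 'children': [
+#           {'name': 'temperaturereading', 'children': [
+#               {'name': 'CPU1 Temp',
+#                'value': 'idrac | temperaturereading | CPU1 Temp'}]}]},
+#       {'name': 'slurm', 'children': [...]}]}
+
+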
+def query_tsdb(request: object, id_node_mapping: dict, connection: str):
+    """query_tsdb Query TSDB
+
+    Query the TSDB based on the Flask request.
+
+    Args:
+        request (object): Flask request
+        id_node_mapping (dict): ID-node mapping
+        connection (str): TSDB connection string
+
+    Returns:
+        list: query results for each target
+    """
+    # Initialize sqlalchemy connection
+    engine = db.create_engine(connection)
+    connect = engine.connect()
+
+    results = []
+    req = request.get_json(silent=True)
+
+    # Request details
+    time_range = req.get('range')
+    interval = req.get('interval')
+    targets = req.get('targets')
+
+    # Extract time range (from, to)
+    start = time_range.get('from')
+    end = time_range.get('to')
+
+    # TODO: add aggregation function in the targets
+
+    for target in targets:
+        req_metric = target.get('metric', '')
+        req_type = target.get('type', '')
+        nodes = target.get('nodes', '')
+        if req_metric and req_type == 'metrics' and len(req_metric.split(' | ')) == 3:
+            partition, metric, fqdd = req_metric.split(' | ')
+            metrics = query_filter_metrics(engine,
+                                           metric,
+                                           fqdd,
+                                           nodes,
+                                           id_node_mapping,
+                                           start,
+                                           end,
+                                           interval,
+                                           partition)
+            results.append(metrics)
+
+        if req_type == 'jobs':
+            users = target.get('users', '')
+            if not users:
+                users = get_users(engine, start, end)
+            jobs = query_filter_jobs(engine, users, start, end, id_node_mapping)
+            results.append(jobs)
+
+        if req_type == 'node_core':
+            node_core = query_node_core(engine,
+                                        start,
+                                        end,
+                                        interval,
+                                        id_node_mapping)
+            results.append(node_core)
+
+    connect.close()
+    return results
+
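+# A sketch of the JSON payload this function expects (field names taken from
+# the lookups above; the concrete values are hypothetical):
+#
+#   {'range': {'from': '2022-01-01T00:00:00Z', 'to': '2022-01-01T01:00:00Z'},
+#    'interval': '5m',
+#    'targets': [
+#        {'type': 'metrics',
+#         'metric': 'idrac | temperaturereading | CPU1 Temp', 'nodes': [1, 2]},
+#        {'type': 'jobs', 'users': ['alice']},
+#        {'type': 'node_core'}]}
+
+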
+def query_filter_metrics(engine: object,
+                         metric: str,
+                         fqdd: str,
+                         nodes: list,
+                         id_node_mapping: dict,
+                         start: str,
+                         end: str,
+                         interval: str,
+                         partition: str,
+                         aggregate: str = 'max'):
+    """query_filter_metrics Query Filter Metrics
+
+    Query metrics from the TSDB and filter them by node.
+
+    Args:
+        engine (object): sqlalchemy engine
+        metric (str): metric name
+        fqdd (str): fqdd name
+        nodes (list): target nodes
+        id_node_mapping (dict): id-node mapping
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): aggregation interval
+        partition (str): partition name
+        aggregate (str, optional): aggregation function. Defaults to 'max'.
+    """
+    if partition == 'slurm':
+        sql_str = sql.generate_slurm_sql(metric,
+                                         start,
+                                         end,
+                                         interval,
+                                         aggregate)
+    else:
+        sql_str = sql.generate_idrac_sql(metric,
+                                         fqdd,
+                                         start,
+                                         end,
+                                         interval,
+                                         aggregate)
+
+    df = pd.read_sql_query(sql_str, con=engine)
+
+    # Filter nodes
+    if nodes:
+        fi_df = df[df['nodeid'].isin(nodes)].copy()
+    else:
+        fi_df = df
+
+    # Slurm metrics carry no fqdd; use the metric name as the label
+    if partition == 'slurm':
+        fi_df['label'] = metric
+
+    # Convert node id to node name and prefix each label with the metric
+    fi_df['nodeid'] = fi_df['nodeid'].apply(lambda x: id_node_mapping[x])
+    fi_df['label'] = fi_df['label'].apply(lambda x: f'{metric}|{x}')
+
+    # Pivot the table: one column per (node, label) pair
+    df_p = fi_df.pivot(index='time', columns=['nodeid', 'label'], values='value')
+
+    # Flatten the column MultiIndex into 'node|metric|fqdd' strings
+    df_p.columns = ['|'.join([str(c) for c in c_list]) for c_list in df_p.columns.values]
+
+    metrics = metrics_df_to_response(df_p)
+
+    return metrics
+
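+# A worked sketch of the pivot/flatten step (hypothetical values): rows of
+# (time, nodeid, label, value) such as
+#
+#   ('t0', 'cpu-1-1', 'temperaturereading|CPU1 Temp', 35.0)
+#
+# become one column per node/label pair, named
+#
+#   'cpu-1-1|temperaturereading|CPU1 Temp'
+#
+# with one row per timestamp.
+
+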
+def metrics_df_to_response(df: object):
+    """metrics_df_to_response Metrics DF to Response
+
+    Convert a metrics dataframe to the response format required by Grafana.
+
+    Args:
+        df (dataframe): dataframe
+
+    Returns:
+        response (dict): response
+    """
+    # Convert index to column
+    df.reset_index(inplace=True)
+
+    # Convert datetime to epoch time in milliseconds
+    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))
+
+    # Column names are 'node|metric|fqdd'; report them to Grafana as
+    # 'metric-fqdd' with a '| node' label
+    columns = [{'text': 'time', 'type': 'time'}]
+    columns_raw = list(df.columns)
+    for column in columns_raw[1:]:
+        c_text = f"{column.split('|')[1]}-{column.split('|')[2]}"
+        c_type = 'number'
+        c_label = f"| {column.split('|')[0]}"
+        column_info = {'text': c_text, 'type': c_type, 'label': c_label}
+        columns.append(column_info)
+
+    rows = df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
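+# Response shape (a Grafana table frame; values are hypothetical):
+#
+#   {'columns': [{'text': 'time', 'type': 'time'},
+#                {'text': 'temperaturereading-CPU1 Temp', 'type': 'number',
+#                 'label': '| cpu-1-1'}],
+#    'rows': [[1640995200000, 35.0], ...]}
+
+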
+def get_users(engine: object, start: str, end: str):
+    """get_users Get Users
+
+    Get users from the slurm jobs info.
+
+    Args:
+        engine (object): sqlalchemy engine
+        start (str): start time
+        end (str): end time
+
+    Returns:
+        list: unique user names
+    """
+    sql_str = sql.generate_slurm_jobs_sql(start, end)
+    df = pd.read_sql_query(sql_str, con=engine)
+
+    # unique() already de-duplicates; convert to a plain list
+    all_users = list(df['user_name'].unique())
+    return all_users
+
+
+def query_filter_jobs(engine: object,
+                      users: list,
+                      start: str,
+                      end: str,
+                      id_node_mapping: dict):
+    """query_filter_jobs Query Filter Jobs
+
+    Query jobs from the TSDB and filter them by user and host.
+
+    Args:
+        engine (object): sqlalchemy engine
+        users (list): selected users
+        start (str): start of time range
+        end (str): end of time range
+        id_node_mapping (dict): id-node mapping
+    """
+    sql_str = sql.generate_slurm_jobs_sql(start, end)
+    df = pd.read_sql_query(sql_str, con=engine)
+
+    # Node hostnames
+    nodes = list(id_node_mapping.values())
+
+    # Keep jobs whose batch_host is a known node and whose user is selected
+    fi_df = df[df['batch_host'].isin(nodes) & df['user_name'].isin(users)].copy()
+
+    jobs = jobs_df_to_response(fi_df)
+    return jobs
+
+
+def jobs_df_to_response(df: object):
+    """jobs_df_to_response Jobs DF to Response
+
+    Convert a jobs dataframe to the response format required by Grafana.
+
+    Args:
+        df (dataframe): dataframe
+
+    Returns:
+        response (dict): response
+    """
+    columns = []
+    selected_columns = ['job_id', 'name', 'user_id', 'user_name', 'batch_host',
+                        'nodes', 'node_count', 'cpus', 'start_time', 'end_time']
+    selected_df = df[selected_columns].copy()
+
+    # Render the node list as a Postgres-style array literal, e.g. '{a, b}'
+    selected_df['nodes'] = selected_df['nodes'].apply(lambda x: '{' + ', '.join(x) + '}')
+
+    columns_raw = list(selected_df.columns)
+    for column in columns_raw:
+        if column in ['job_id', 'user_id', 'cpus', 'start_time', 'end_time']:
+            c_type = 'number'
+        else:
+            c_type = 'string'
+        column_info = {'text': column, 'type': c_type}
+        columns.append(column_info)
+
+    rows = selected_df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
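+# Row shape after conversion (hypothetical job, for illustration only),
+# in selected_columns order:
+#
+#   [12345, 'bash', 1001, 'alice', 'cpu-1-1', '{cpu-1-1, cpu-1-2}',
+#    2, 8, 1640995200, 1640998800]
+
+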
+def query_node_core(engine: object,
+                    start: str,
+                    end: str,
+                    interval: str,
+                    id_node_mapping: dict):
+    """query_node_core Query Node Core
+
+    Query node-core info from the TSDB.
+
+    Args:
+        engine (object): sqlalchemy engine
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): time interval for aggregation
+        id_node_mapping (dict): id-node mapping
+    """
+    sql_str = sql.generate_node_jobs_sql(start, end, interval)
+    df = pd.read_sql_query(sql_str, con=engine)
+    node_jobs = node_jobs_df_to_response(df, id_node_mapping)
+    return node_jobs
+
+
+def node_jobs_df_to_response(df: object, id_node_mapping: dict):
+    """node_jobs_df_to_response Node-Jobs DF to Response
+
+    Convert a node-jobs dataframe to the response format required by Grafana.
+
+    Args:
+        df (dataframe): dataframe
+        id_node_mapping (dict): id-node mapping
+
+    Returns:
+        response (dict): response
+    """
+    # Convert datetime to epoch time in milliseconds
+    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))
+
+    # Convert node id to node name
+    df['nodeid'] = df['nodeid'].apply(lambda x: id_node_mapping[x])
+
+    # Flatten the per-node jobs and cpus arrays into string literals
+    df['fl_jobs'] = df.apply(
+        lambda df_row: flatten_array(df_row, 'jobs'), axis=1)
+    df['fl_cpus'] = df.apply(
+        lambda df_row: flatten_array(df_row, 'cpus'), axis=1)
+    df.drop(columns=['jobs', 'cpus'], inplace=True)
+    df.rename(columns={'fl_jobs': 'jobs', 'fl_cpus': 'cpus'}, inplace=True)
+
+    columns = [{'text': 'time', 'type': 'time'}]
+    columns_raw = list(df.columns)
+    for column in columns_raw[1:]:
+        column_info = {'text': column, 'type': 'string'}
+        columns.append(column_info)
+
+    rows = df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
+
+def flatten_array(df_row: object, column: str):
+    """flatten_array Flatten Array
+
+    Flatten the array-of-arrays jobs and cpus info of a dataframe row into a
+    string literal such as '{1234, 5678}'.
+
+    Args:
+        df_row (object): dataframe row
+        column (str): column name, either 'jobs' or 'cpus'
+    """
+    jobs = []
+    cpus = []
+    job_id_array = df_row['jobs']
+    cpus_array = df_row['cpus']
+    try:
+        if job_id_array:
+            # Flatten array
+            fl_job_id_array = [item for sublist in job_id_array for item in sublist]
+            fl_cpus_array = [item for sublist in cpus_array for item in sublist]
+
+            # Only keep unique jobs
+            for i, job in enumerate(fl_job_id_array):
+                if job not in jobs:
+                    jobs.append(str(job))
+                    cpus.append(str(fl_cpus_array[i]))
+    except Exception:
+        # Skip rows with malformed or NULL jobs/cpus arrays
+        pass
+    if column == 'jobs':
+        return '{' + ', '.join(jobs) + '}'
+    else:
+        return '{' + ', '.join(cpus) + '}'
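+
+# A worked sketch of flatten_array (hypothetical row): with
+#
+#   df_row['jobs'] == [[101, 102], [101]] and df_row['cpus'] == [[4, 8], [4]]
+#
+# the duplicated job 101 is kept once, so flatten_array(df_row, 'jobs')
+# returns '{101, 102}' and flatten_array(df_row, 'cpus') returns '{4, 8}'.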