"""
MIT License
Copyright (c) 2022 Texas Tech University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
"""
This file is part of MonSter.
Author:
    Jie Li, jie.li@ttu.edu
"""
- import utils
- import logger
- from pgcopy import CopyManager
- from datetime import datetime
- log = logger.get_logger(__name__)
def generate_metric_table_sqls(table_schemas: dict,
                               schema_name: str):
    """generate_metric_table_sqls Generate Metric Table Sqls

    Generate sqls for creating metric tables: one CREATE SCHEMA statement
    plus one CREATE TABLE statement per entry in table_schemas.

    Args:
        table_schemas (dict): table schemas; maps a table name to a dict
            with parallel 'column_names' and 'column_types' lists.
        schema_name (str): schema name

    Returns:
        dict: sql statements under keys 'schema_sql' and 'tables_sql';
            may be partial/empty if generation fails.
    """
    sql_statements = {}
    try:
        sql_statements['schema_sql'] = \
            f"CREATE SCHEMA IF NOT EXISTS {schema_name};"

        tables_sql = []
        for table, column in table_schemas.items():
            # Pair each column name with its type via zip; the original
            # indexed loop reused the name `column`, shadowing the
            # schema dict bound by the outer loop.
            column_str = ''
            for col_name, col_type in zip(column['column_names'],
                                          column['column_types']):
                column_str += f'{col_name} {col_type}, '
            # Every metric table references nodes(NodeID).
            table_sql = (f"CREATE TABLE IF NOT EXISTS {schema_name}.{table} "
                         f"({column_str}FOREIGN KEY (NodeID) "
                         f"REFERENCES nodes (NodeID));")
            tables_sql.append(table_sql)
        sql_statements['tables_sql'] = tables_sql
    except Exception as err:
        # Typo fixed: 'Genrerate' -> 'Generate'.
        log.error(f'Cannot Generate Metric Table Sqls: {err}')

    return sql_statements
def generate_slurm_job_table_sql(schema_name: str):
    """generate_slurm_job_table_sql Generate Slurm Job Table Sql

    Generate sqls for creating the table that stores the jobs info.

    Args:
        schema_name (str): schema name

    Returns:
        dict: sql statements under keys 'schema_sql' and 'tables_sql';
            may be partial/empty if generation fails.
    """
    sql_statements = {}
    table = 'jobs'
    try:
        # Normalized to match generate_metric_table_sqls: uppercase 'IF'
        # and a terminating ';'.
        sql_statements['schema_sql'] = \
            f"CREATE SCHEMA IF NOT EXISTS {schema_name};"

        column_names = ['job_id', 'array_job_id', 'array_task_id', 'name',
                        'job_state', 'user_id', 'user_name', 'group_id',
                        'cluster', 'partition', 'command',
                        'current_working_directory', 'batch_flag', 'batch_host',
                        'nodes', 'node_count', 'cpus', 'tasks',
                        'tasks_per_node', 'cpus_per_task', 'memory_per_node',
                        'memory_per_cpu', 'priority', 'time_limit', 'deadline',
                        'submit_time', 'preempt_time', 'suspend_time',
                        'eligible_time', 'start_time', 'end_time',
                        'resize_time', 'restart_cnt', 'exit_code',
                        'derived_exit_code']
        column_types = ['INT PRIMARY KEY', 'INT', 'INT', 'TEXT', 'TEXT', 'INT',
                        'TEXT', 'INT', 'TEXT', 'TEXT', 'TEXT', 'TEXT',
                        'BOOLEAN', 'TEXT', 'TEXT[]', 'INT', 'INT', 'INT', 'INT',
                        'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT',
                        'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT']
        # join() removes the need to trim a trailing ', ' by hand.
        column_str = ', '.join(f'{name} {col_type}'
                               for name, col_type in zip(column_names,
                                                         column_types))
        table_sql = (f"CREATE TABLE IF NOT EXISTS {schema_name}.{table} "
                     f"({column_str});")
        sql_statements['tables_sql'] = [table_sql]
    except Exception as err:
        # Stray debug print(err) removed; typo fixed
        # ('Genrerate' -> 'Generate').
        log.error(f'Cannot Generate Job Table Sqls: {err}')

    return sql_statements
def generate_metric_def_table_sql():
    """generate_metric_def_table_sql Generate Metrics Definition Table Sql

    Build the CREATE TABLE statement for the metrics_definition table.

    Returns:
        str: sql string
    """
    # Column list kept separate from the statement skeleton for
    # readability; id is both the primary key and declared UNIQUE.
    columns = ("id SERIAL PRIMARY KEY, metric_id TEXT NOT NULL, "
               "metric_name TEXT, description TEXT, metric_type TEXT, "
               "metric_data_type TEXT, units TEXT, accuracy REAL, "
               "sensing_interval TEXT, discrete_values TEXT[], "
               "data_type TEXT, UNIQUE (id)")
    return f"CREATE TABLE IF NOT EXISTS metrics_definition ({columns});"
def generate_metadata_table_sql(nodes_metadata: list, table_name: str):
    """generate_metadata_table_sql Generate Metadata Table Sql

    Generate a sql for creating the node metadata table. The column set
    is taken from the keys of the first metadata record; every metadata
    attribute is stored as TEXT.

    Args:
        nodes_metadata (list): nodes metadata list (non-empty; dicts with
            identical keys — assumed from usage, confirm with caller)
        table_name (str): table name

    Returns:
        str: sql string
    """
    # join() replaces the manual concat-then-trim loop whose index from
    # enumerate() was never used; the stray leading space in the original
    # statement is also gone.
    column_names = list(nodes_metadata[0].keys())
    column_str = ", ".join(f"{column} TEXT" for column in column_names)
    metadata_table_sql = (f"CREATE TABLE IF NOT EXISTS {table_name} "
                          f"(NodeID SERIAL PRIMARY KEY, {column_str}, "
                          f"UNIQUE (NodeID));")
    return metadata_table_sql
def update_nodes_metadata(conn: object, nodes_metadata: list, table_name: str):
    """update_nodes_metadata Update Nodes Metadata

    Update nodes metadata table, one UPDATE per record, keyed on
    os_ip_addr.

    Args:
        conn (object): database connection
        nodes_metadata (list): nodes metadata list
        table_name (str): table name
    """
    cur = conn.cursor()
    try:
        for record in nodes_metadata:
            os_ip_addr = record['Os_Ip_Addr']
            # Os_Ip_Addr is the row key and servicetag is excluded, as in
            # the original filter.
            updates = {col.lower(): str(value)
                       for col, value in record.items()
                       if col != 'Os_Ip_Addr' and col != 'servicetag'}
            if not updates:
                continue
            # Values are passed as query parameters instead of being
            # concatenated into the SQL string (injection-safe, and quotes
            # in values no longer break the statement). Identifiers
            # (table/column names) cannot be parameterized; they come from
            # internal metadata, not user input.
            col_sql = ", ".join(f"{col} = %s" for col in updates)
            sql = (f"UPDATE {table_name} SET {col_sql} "
                   f"WHERE os_ip_addr = %s;")
            cur.execute(sql, (*updates.values(), os_ip_addr))
        conn.commit()
    finally:
        # Close the cursor even when an execute fails (original leaked it
        # on error).
        cur.close()
def insert_nodes_metadata(conn: object, nodes_metadata: list, table_name: str):
    """insert_nodes_metadata Insert Nodes Metadata

    Bulk-insert nodes metadata into the metadata table using pgcopy's
    COPY support.

    Args:
        conn (object): database connection
        nodes_metadata (list): nodes metadata list
        table_name (str): table name
    """
    # Lower-cased column names, taken from the first record's keys.
    cols = tuple(key.lower() for key in nodes_metadata[0])
    # Stringify every value so rows match the all-TEXT metadata columns.
    records = [tuple(str(value) for value in record.values())
               for record in nodes_metadata]
    CopyManager(conn, table_name, cols).copy(records)
    conn.commit()
def check_table_exist(conn: object, table_name: str):
    """check_table_exist Check Table Exists

    Check whether the specified table exists AND already contains data.

    NOTE(review): despite the name, this returns True only when the table
    both exists and holds at least one row — callers use it to decide
    whether the table still needs populating.

    Args:
        conn (object): database connection
        table_name (str): table name

    Returns:
        bool: True if the table exists and is non-empty, False otherwise
    """
    cur = conn.cursor()
    try:
        # Pass the table name as a query parameter instead of string
        # concatenation (injection-safe).
        cur.execute(
            "SELECT EXISTS (SELECT FROM pg_tables WHERE tablename = %s);",
            (table_name,))
        (table_exists, ) = cur.fetchone()
        if not table_exists:
            return False
        # Identifiers cannot be parameterized; table_name was just
        # validated against pg_tables above.
        cur.execute(f"SELECT EXISTS (SELECT * FROM {table_name});")
        (data_exists, ) = cur.fetchone()
        return data_exists
    finally:
        # Original never closed the cursor; ensure it is released.
        cur.close()
def write_metric_definitions(conn: object, metric_definitions: list):
    """write_metric_definitions Write Metric Definitions

    Write metric definitions into the metrics_definition table; does
    nothing when the table already holds data.

    Args:
        conn (object): database connection
        metric_definitions (list): the metric definitions
    """
    # Guard clause: check_table_exist is True when the table exists and
    # is non-empty, in which case the definitions were already written.
    if check_table_exist(conn, 'metrics_definition'):
        return

    cols = ('metric_id', 'metric_name', 'description', 'metric_type',
            'metric_data_type', 'units', 'accuracy', 'sensing_interval',
            'discrete_values', 'data_type')
    rows = []
    for item in metric_definitions:
        rows.append((item['Id'], item['Name'], item['Description'],
                     item['MetricType'], item['MetricDataType'],
                     item['Units'], item['Accuracy'],
                     item['SensingInterval'], item['DiscreteValues'],
                     utils.data_type_mapping[item['MetricDataType']]))
    # Sort with the project helper (ordering defined in
    # utils.sort_tuple_list).
    rows = utils.sort_tuple_list(rows)

    CopyManager(conn, 'metrics_definition', cols).copy(rows)
    conn.commit()
def write_nodes_metadata(conn: object, nodes_metadata: list):
    """write_nodes_metadata Write Nodes Metadata

    Write nodes metadata to the 'nodes' table: bulk-insert on first run,
    per-row update when the table already contains data.

    Args:
        conn (object): database connection
        nodes_metadata (list): nodes metadata list
    """
    # check_table_exist is True only for an existing, non-empty table,
    # so a fresh table takes the insert path.
    writer = (update_nodes_metadata if check_table_exist(conn, 'nodes')
              else insert_nodes_metadata)
    writer(conn, nodes_metadata, 'nodes')
def generate_slurm_sql(metric: str,
                       start: str,
                       end: str,
                       interval: str,
                       aggregate: str):
    """generate_slurm_sql Generate Slurm Sql

    Generate sql for querying slurm metrics, gap-filled and aggregated
    per node per time bucket.

    Args:
        metric (str): metric name
        start (str): start of time range
        end (str): end of time range
        interval (str): aggregation interval
        aggregate (str): aggregation function

    Returns:
        string: sql string
    """
    # node_jobs rows carry job/cpu lists and are aggregated into JSON
    # arrays; every other metric aggregates a plain value column.
    if metric == 'node_jobs':
        select_cols = "jsonb_agg(jobs) AS jobs, jsonb_agg(cpus) AS cpus"
    else:
        select_cols = f"{aggregate}(value) AS value"
    return (f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, "
            f"nodeid, {select_cols} "
            f"FROM slurm.{metric} "
            f"WHERE timestamp >= '{start}' "
            f"AND timestamp <= '{end}' "
            f"GROUP BY time, nodeid "
            f"ORDER BY time;")
def generate_slurm_jobs_sql(start: str,end: str):
    """generate_slurm_jobs_sql Generate Slurm Jobs Sql

    Generate Sql for querying slurm jobs whose lifetime overlaps the
    given time range.

    Args:
        start (str): start time
        end (str): end time

    Returns:
        string: sql string
    """
    fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
    epoch = datetime(1970, 1, 1)
    # Job start/end times are stored as Unix epochs, so the ISO-style
    # inputs are converted before comparison (inputs treated as UTC,
    # matching the naive-epoch subtraction).
    epoch_from = int((datetime.strptime(start, fmt) - epoch).total_seconds())
    epoch_to = int((datetime.strptime(end, fmt) - epoch).total_seconds())
    # Overlap: a job intersects the range iff it starts before the range
    # ends and ends after the range starts.
    return (f"SELECT * FROM slurm.jobs "
            f"WHERE start_time < {epoch_to} "
            f"AND end_time > {epoch_from};")
def generate_node_jobs_sql(start: str, end: str, interval: str):
    """generate_node_jobs_sql Generate Node-Jobs Sql

    Generate SQL for querying node-jobs correlation.

    Args:
        start (str): start time
        end (str): end time
        interval (str): interval for aggregation

    Returns:
        string: sql string
    """
    # Collapse per-job rows into JSON arrays per node per time bucket.
    clauses = [
        f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time,",
        "nodeid, jsonb_agg(jobs) AS jobs, jsonb_agg(cpus) AS cpus",
        "FROM slurm.node_jobs",
        f"WHERE timestamp >= '{start}'",
        f"AND timestamp <= '{end}'",
        "GROUP BY time, nodeid",
        "ORDER BY time;",
    ]
    return " ".join(clauses)
|