
Merge branch 'dellhpc:devel' into devel

Cassey Goveas 3 years ago
parent
commit
e1c331a2b3

+ 540 - 0
telemetry/roles/slurm_telemetry/files/monster/api_utils.py

@@ -0,0 +1,540 @@
+"""
+MIT License
+Copyright (c) 2022 Texas Tech University
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import sql
+import pandas as pd
+import sqlalchemy as db
+
+def get_id_node_mapping(connection: str):
+    """get_id_node_mapping Get ID-Node Mapping
+
+    Get ID-Node Mapping
+
+    Args:
+        connection (str): connection string
+
+    """
+    engine = db.create_engine(connection)
+    connect = engine.connect()
+    mapping_sql = "SELECT nodeid, hostname FROM nodes;"
+    mapping_df = pd.read_sql_query(mapping_sql, con=connect)
+    mapping = pd.Series(mapping_df.hostname.values, index=mapping_df.nodeid).to_dict()
+    connect.close()
+    return mapping
+
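+# A minimal usage sketch (the connection string is hypothetical):
+#   mapping = get_id_node_mapping('postgresql://user:pass@host:5432/tsdb')
+#   # -> {1: 'node001', 2: 'node002', ...}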
+
+def get_metric_fqdd_mapping(connection: str):
+    """get_metric_fqdd_mapping Get Metric-FQDD Mapping
+
+    Get metric-fqdd mapping
+
+    Args:
+        connection (str): connection string
+    """
+    engine = db.create_engine(connection)
+    metadata = db.MetaData()
+    connect = engine.connect()
+
+    mapping = {}
+
+    metric_list = get_avail_metrics(connect, metadata, engine)
+
+    for metric in metric_list:
+        fqdd = get_avail_metric_fqdd(connect, metadata, engine, metric)
+        if fqdd:
+            mapping.update({
+                metric: fqdd
+            })
+    connect.close()
+
+    return mapping
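+# The returned mapping has the shape {metric: [fqdd, ...]}; the concrete names
+# depend on the idrac tables present (these examples are hypothetical):
+#   {'CPUUsage': ['CPU.Socket.1'], 'SystemPowerConsumption': ['System.Chassis.1']}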
+
+
+def get_avail_metrics(connect: object, 
+                      metadata: object, 
+                      engine: object):
+    """get_avail_metrics Get Available Metrics
+
+    Get available metrics from the table metrics_definition
+
+    Args:
+        connect (object): SqlAlchemy engine connect
+        metadata (object): SqlAlchemy metadata
+        engine (object): SqlAlchemy engine
+    """
+    result = []
+    metrics_definition = db.Table('metrics_definition',
+                                  metadata,
+                                  autoload=True,
+                                  autoload_with=engine)
+    query = db.select([metrics_definition])
+    result_proxy = connect.execute(query)
+    result = result_proxy.fetchall()
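+    # The metric name is assumed to live in the second column of
+    # metrics_definition; adjust the index if the schema differs.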
+    metric_list = [i[1] for i in result]
+    return metric_list
+
+
+def get_avail_metric_fqdd(connect: object, 
+                          metadata: object, 
+                          engine: object,
+                          metric: str):
+    """get_avail_metric_fqdd Get Available Metric FQDD
+
+    Get available fqdd of a metric based on the metrics collected in the last
+    half hour.
+
+    Args:
+        connect (object): SqlAlchemy engine connect
+        metadata (object): SqlAlchemy metadata
+        engine (object): SqlAlchemy engine
+        metric (str): metric name
+    """
+    fqdd = []
+    metric = metric.lower()
+    table = db.Table(metric,
+                     metadata,
+                     autoload=True,
+                     autoload_with=engine,
+                     schema='idrac')
+
+    # Find unique fqdd values
+    query = db.select([table.columns.fqdd.distinct()]).limit(50)
+    result_proxy = connect.execute(query)
+    result = result_proxy.fetchall()
+
+    if result:
+        fqdd = [i[0] for i in result if i[0]]
+
+    return fqdd
+
+
+def get_metric_fqdd_tree(metric_fqdd_mapping: dict):
+    """get_metric_fqdd_tree Get Metric-FQDD Tree
+
+    Get metric-fqdd tree for grafana
+
+    Args:
+        metric_fqdd_mapping (dict): metric-fqdd mapping
+    """
+    partition = 'idrac'
+    metric_fqdd_tree = {
+        'name': 'root',
+        'children': []
+    }
+
+    # iDRAC metrics
+    metric_fqdd_list = []
+    for metric, fqdds in metric_fqdd_mapping.items():
+        children = []
+        for fqdd in fqdds:
+            child = {
+                'name': fqdd, 'value': f'{partition} | {metric} | {fqdd}'
+            }
+            children.append(child)
+        metric_fqdd_list.append({
+            'name': metric, 'children': children
+        })
+    
+    child_dict = {
+        'name': partition,
+        'children': metric_fqdd_list
+    }
+    metric_fqdd_tree['children'].append(child_dict)
+
+    # Slurm metrics
+    slurm_child_dict = {
+        'name': 'slurm',
+        'children': [
+            {
+                'name': 'memoryusage',
+                'children': [
+                    {
+                        'name': 'Memory Usage', 
+                        'value': 'slurm | memoryusage | Memory Usage'
+                    }
+                ]
+            },
+            {
+                'name': 'memory_used',
+                'children': [
+                    {
+                        'name': 'Memory Used', 
+                        'value': 'slurm | memory_used | Memory Used'
+                    },
+                ]
+            },
+            {
+                'name': 'cpu_load',
+                'children': [
+                    {
+                        'name': 'CPU Load', 
+                        'value': 'slurm | cpu_load | CPU Load'
+                    }
+                ]
+            },
+        ]
+    }
+    metric_fqdd_tree['children'].append(slurm_child_dict)
+    
+    return metric_fqdd_tree
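+# The tree handed to Grafana therefore has the shape:
+#   {'name': 'root', 'children': [
+#       {'name': 'idrac', 'children': [{'name': <metric>,
+#                                       'children': [{'name': <fqdd>, ...}]}]},
+#       {'name': 'slurm', 'children': [...]}]}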
+
+
+def query_tsdb(request: object, id_node_mapping: dict, connection: str):
+    """query_tsdb Query TSDB
+
+    Query TSDB based on the flask request.
+
+    Args:
+        request (object): flask request
+        id_node_mapping (dict): ID-node mapping
+        connection (str): tsdb connection
+    """
+    # Initialize sqlalchemy connection
+    engine = db.create_engine(connection)
+    connect = engine.connect()
+
+    results = []
+    req = request.get_json(silent=True)
+
+    # Request details
+    time_range = req.get('range')
+    interval = req.get('interval')
+    targets = req.get('targets')
+
+    # Extract time range (from, to), metrics
+    start = time_range.get('from')
+    end = time_range.get('to')
+
+    # TODO: add aggregation function to the targets
+
+    for target in targets:
+        req_metric = target.get('metric', '')
+        req_type = target.get('type', '')
+        nodes = target.get('nodes', '')
+        if req_metric and req_type == 'metrics' and len(req_metric.split(' | ')) == 3:
+            partition, metric, fqdd = req_metric.split(' | ')
+            metrics = query_filter_metrics(engine,
+                                           metric, 
+                                           fqdd,
+                                           nodes,
+                                           id_node_mapping, 
+                                           start,
+                                           end,
+                                           interval,
+                                           partition)
+            results.append(metrics)
+
+        if req_type == 'jobs':
+            users = target.get('users', '')
+            if not users:
+                users = get_users(engine, start, end)
+            jobs = query_filter_jobs(engine, users, start, end, id_node_mapping)
+            results.append(jobs)
+
+        if req_type == 'node_core':
+            node_core = query_node_core(engine, 
+                                        start, 
+                                        end, 
+                                        interval, 
+                                        id_node_mapping)
+            results.append(node_core)
+
+    connect.close()
+    return results
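+# A minimal request body accepted by this endpoint (field names are taken from
+# the code above; the values shown are hypothetical):
+#   {"range": {"from": "2022-01-01T00:00:00Z", "to": "2022-01-01T01:00:00Z"},
+#    "interval": "5m",
+#    "targets": [{"type": "metrics",
+#                 "metric": "idrac | <metric> | <fqdd>", "nodes": []}]}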
+
+
+def query_filter_metrics(engine: object,
+                         metric: str,
+                         fqdd: str,
+                         nodes: list,
+                         id_node_mapping: dict,
+                         start: str,
+                         end: str,
+                         interval: str,
+                         partition: str,
+                         aggregate: str = 'max'):
+    """query_filter_metrics Query Filter Metrics
+
+    Query and filter metrics from TSDB
+
+    Args:
+        engine (object): sqlalchemy engine
+        metric (str): metric name
+        fqdd (str): fqdd name
+        nodes (list): target nodes
+        id_node_mapping (dict): id-node mapping
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): aggregation interval
+        partition (str): partition name. 
+        aggregate (str, optional): aggregation function. Defaults to 'max'.
+    
+    """
+    if partition == 'slurm':
+        sql_str = sql.generate_slurm_sql(metric, 
+                                         start, 
+                                         end, 
+                                         interval, 
+                                         aggregate)
+    else:
+        sql_str = sql.generate_idrac_sql(metric,
+                                         fqdd,
+                                         start, 
+                                         end, 
+                                         interval, 
+                                         aggregate)
+    
+    df = pd.read_sql_query(sql_str, con=engine)
+
+    # Filter nodes
+    if nodes:
+        fi_df = df[df['nodeid'].isin(nodes)].copy()
+    else:
+        fi_df = df
+
+    # Add label in slurm metrics
+    if partition == 'slurm':
+        fi_df['label'] = metric
+
+    # Convert node id to node name
+    fi_df['nodeid'] = fi_df['nodeid'].apply(lambda x: id_node_mapping[x])
+    fi_df['label'] = fi_df['label'].apply(lambda x: f'{metric}|{x}')
+
+    # Pivot the table
+    df_p = fi_df.pivot(index='time', columns=['nodeid', 'label'], values='value')
+    
+    # Flatten the table
+    df_p.columns = ['|'.join([str(c) for c in c_list]) for c_list in df_p.columns.values]
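+    # Flattened column names take the form '<hostname>|<metric>|<fqdd-or-label>',
+    # which metrics_df_to_response() splits back apart on '|'.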
+    
+    metrics = metrics_df_to_response(df_p)
+
+    return metrics
+
+
+def metrics_df_to_response(df: object):
+    """metrics_df_to_response Metrics DF to Response
+
+    Convert dataframe to response in the format required by Grafana
+
+    Args:
+        df (dataframe): dataframe
+
+    """
+    # Convert index to column
+    df.reset_index(inplace=True)
+
+    # Convert datetime to epoch time
+    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))
+
+    columns = [{'text': 'time', 'type': 'time'}]
+    columns_raw = list(df.columns)
+    for column in columns_raw[1:]:
+        parts = column.split('|')
+        c_text = f"{parts[1]}-{parts[2]}"
+        c_type = 'number'
+        c_label = f"| {parts[0]}"
+        column_info = {'text': c_text, 'type': c_type, 'label': c_label}
+        columns.append(column_info)
+    
+    rows = df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
+
+def get_users(engine: object, start: str, end: str):
+    """get_users Get Users
+
+    Get users from the slurm jobs info
+
+    Args:
+        engine (object): sqlalchemy engine
+        start (str): start time
+        end (str): end time
+
+    """    
+    sql_str = sql.generate_slurm_jobs_sql(start, end)
+    df = pd.read_sql_query(sql_str, con=engine)
+    users = df['user_name'].unique()
+    
+    # unique() already de-duplicates; convert to a plain list
+    all_users = list(set(users))
+    return all_users
+
+
+def query_filter_jobs(engine: object,
+                      users: list,
+                      start: str,
+                      end: str,
+                      id_node_mapping: dict):
+    """query_filter_jobs Query Filter jobs
+
+    Query and filter jobs from TSDB
+
+    Args:
+        engine (object): sqlalchemy engine
+        users (list): selected users
+        start (str): start of time range
+        end (str): end of time range
+        id_node_mapping (dict): id-node mapping
+
+    """
+    sql_str = sql.generate_slurm_jobs_sql(start, end)
+    df = pd.read_sql_query(sql_str, con=engine)
+
+    # Node hostname
+    nodes = list(id_node_mapping.values())
+
+    # Keep jobs whose batch_host is one of the cluster nodes and whose user is selected
+    fi_df = df[df['batch_host'].isin(nodes) & df['user_name'].isin(users)].copy()
+
+    jobs = jobs_df_to_response(fi_df)
+    return jobs
+
+
+def jobs_df_to_response(df: object):
+    """jobs_df_to_response Jobs DF to Response
+
+    Convert dataframe to response in the format required by Grafana
+
+    Args:
+        df (dataframe): dataframe
+
+    Returns:
+        response (dict): response
+    """
+    columns = []
+    selected_columns = ['job_id', 'name', 'user_id', 'user_name', 'batch_host', 
+                        'nodes', 'node_count', 'cpus', 'start_time', 'end_time']
+    selected_df = df[selected_columns].copy()
+
+    selected_df['nodes'] = selected_df['nodes'].apply(lambda x: '{' + ', '.join(x) + '}')
+    
+    columns_raw = list(selected_df.columns)
+    for column in columns_raw:
+        if column in ['job_id', 'user_id', 'cpus', 'start_time', 'end_time']:
+            c_type = 'number'
+        else:
+            c_type = 'string'
+
+        column_info = {'text': column, 'type': c_type}
+        columns.append(column_info)
+    
+    rows = selected_df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
+
+def query_node_core(engine: object,
+                    start: str,
+                    end: str,
+                    interval: str,
+                    id_node_mapping: dict):
+    """query_node_core Query Node Core
+
+    Query Node-Core info from TSDB
+
+    Args:
+        engine (object): sqlalchemy engine
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): time interval for aggregation
+        id_node_mapping (dict): id-node mapping
+
+    """
+    sql_str = sql.generate_node_jobs_sql(start, end, interval)
+    df = pd.read_sql_query(sql_str, con=engine)
+    node_jobs = node_jobs_df_to_response(df, id_node_mapping)
+    return node_jobs
+
+
+def node_jobs_df_to_response(df: object, id_node_mapping: dict):
+    """node_jobs_df_to_response Node-Jobs DF to Response
+
+    Convert dataframe to response in the format required by Grafana
+
+    Args:
+        df (dataframe): dataframe
+
+    Returns:
+        response (dict): response
+    """
+
+    df['time'] = df['time'].apply(lambda x: int(x.timestamp() * 1000))
+
+    df['nodeid'] = df['nodeid'].apply(lambda x: id_node_mapping[x])
+    
+    df['fl_jobs'] = df.apply(
+        lambda df_row: flatten_array(df_row, 'jobs'), axis=1)
+    df['fl_cpus'] = df.apply(
+        lambda df_row: flatten_array(df_row, 'cpus'), axis=1)
+    df.drop(columns=['jobs', 'cpus'], inplace=True)
+    df.rename(columns={'fl_jobs': 'jobs', 'fl_cpus': 'cpus'}, inplace=True)
+    
+    columns = [{'text': 'time', 'type': 'time'}]
+    columns_raw = list(df.columns)
+    for column in columns_raw[1:]:
+        column_info = {'text': column, 'type': 'string'}
+        columns.append(column_info)
+    
+    rows = df.values.tolist()
+    response = {'columns': columns, 'rows': rows}
+    return response
+
+
+def flatten_array(df_row: object, column: str):
+    """flatten_array Flatten Array
+
+    Flatten arrays of arrays for jobs and CPUs info
+
+    Args:
+        df_row (object): dataframe row
+        column (str): column name
+
+    """
+    jobs = []
+    cpus = []
+    job_id_array = df_row['jobs']
+    cpus_array = df_row['cpus']
+    try:
+        if job_id_array:
+            # Flatten array
+            fl_job_id_array = [item for sublist in job_id_array for item in sublist]
+            fl_cpus_array = [item for sublist in cpus_array for item in sublist]
+
+            # Only keep unique jobs
+            for i, job in enumerate(fl_job_id_array):
+                if job not in jobs:
+                    jobs.append(str(job))
+                    cpus.append(str(fl_cpus_array[i]))
+    except Exception:
+        # print(f"{df_row['time']} - {df_row['nodeid']}")
+        pass
+    if column == 'jobs':
+        return '{' + ', '.join(jobs) + '}'
+    else:
+        return '{' + ', '.join(cpus) + '}'
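+# Worked example: a row with jobs=[[101, 102], [102]] and cpus=[[4, 8], [8]]
+# yields '{101, 102}' for the jobs column and '{4, 8}' for cpus
+# (duplicate job ids are dropped along with their cpu counts).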

+ 1 - 56
telemetry/roles/slurm_telemetry/files/monster/dump.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -151,54 +146,4 @@ def dump_node_metrics(timestamp: object,
         curs = conn.cursor()
         curs.execute("ROLLBACK")
         conn.commit()
-        log.error(f"Fail to dump node metrics : {err}")
-
-
-def dump_idrac(ip: str, 
-               idrac_metrics: dict,
-               metric_dtype_mapping: dict, 
-               ip_id_mapping: dict,
-               conn: object, ):
-    """dump_idrac Dump iDRAC Metrics
-
-    Dump node metrics to TimeScaleDB
-
-    Args:
-        ip (str): ip address of iDRAC
-        idrac_metrics (dict): iDRAC Metrics
-        metric_dtype_mapping (dict): Metric-Datatype mapping
-        ip_id_mapping (dict): ip-id mapping
-        conn (object): TimeScaleDB connection object
-    """
-    try:
-        schema_name = 'idrac'
-        nodeid = ip_id_mapping[ip]
-
-        for table_name, table_metrics in idrac_metrics.items():
-            all_records = []
-            dtype = metric_dtype_mapping[table_name]
-
-            table_name = table_name.lower()
-            target_table = f"{schema_name}.{table_name}"
-
-            cols = ('timestamp', 'nodeid', 'source', 'fqdd', 'value')
-            for metric in table_metrics:
-                # We have to offset timestamp by -6/-5 hours. For some unknow
-                # reasons, the timestamp reported in iDRAC is not configured
-                # correctly.
-                timestamp = parse_time(metric['Timestamp'])
-                timestamp = timestamp.astimezone(tz.tzlocal())
-                timestamp = timestamp.replace(tzinfo=tz.tzutc())
-                timestamp = timestamp.astimezone(tz.tzlocal())
-
-                source = metric['Source']
-                fqdd = metric['FQDD']
-                if metric['Value']:
-                    value = utils.cast_value_type(metric['Value'], dtype)
-                    all_records.append((timestamp, nodeid, source, fqdd, value))
-
-            mgr = CopyManager(conn, target_table, cols)
-            mgr.copy(all_records)
-        conn.commit()
-    except Exception as err:
-        log.error(f"Fail to dump idrac metrics ({ip}): {err}")
+        log.error(f"Failed to dump node metrics: {err}")

+ 9 - 20
telemetry/roles/slurm_telemetry/files/monster/logger.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,34 +20,27 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
 
 from pathlib import Path
-from logging.handlers import TimedRotatingFileHandler
 import logging
 
 
 def setup_logger(file_name):
     monster_path = Path(__file__).resolve().parent.parent
     log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
-    
+    logging.basicConfig(filename=f'{monster_path}/log/monster.log',
+                        format=log_format,
+                        level=logging.ERROR,
+                        filemode='w')
     logger = logging.getLogger(file_name)
-    formatter = logging.Formatter(log_format)
-
-    log_handler = TimedRotatingFileHandler(filename=f'{monster_path}/log/monster.log', when="midnight", interval=1, backupCount=7)
-    log_handler.setLevel(logging.ERROR)
-    log_handler.setFormatter(formatter)
-
-    if not logger.handlers:
-        stream_handler = logging.StreamHandler()
-        stream_handler.setLevel(logging.ERROR)
-        stream_handler.setFormatter(formatter)
-
-        logger.addHandler(stream_handler)
-        logger.addHandler(log_handler)
+    
+    console = logging.StreamHandler()
+    console.setLevel(logging.DEBUG)
+    console.setFormatter(logging.Formatter(log_format))
+    logger.addHandler(console)
 
     return logger
 

+ 3 - 8
telemetry/roles/slurm_telemetry/files/monster/mslurm.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -51,7 +46,7 @@ def monitor_slurm():
     """
     connection = utils.init_tsdb_connection()
     node_id_mapping = utils.get_node_id_mapping(connection)
-    os_idrac_hostname_mapping = utils.get_os_idrac_hostname_mapping()
+    os_idrac_hostname_mapping = utils.get_os_idrac_hostname_mapping()    
     slurm_config = utils.get_config('slurm_rest_api')
     
     #Schedule fetch slurm
@@ -89,11 +84,11 @@ def fetch_slurm(slurm_config: dict,
 
     # Get nodes data
     nodes_url = slurm.get_slurm_url(slurm_config, 'nodes')
-    nodes_data = slurm.call_slurm_api(slurm_config, token, nodes_url)
+    nodes_data = slurm.call_slurm_api(slurm_config, token, nodes_url)    
 
     # Get jobs data
     jobs_url = slurm.get_slurm_url(slurm_config, 'jobs')
-    jobs_data = slurm.call_slurm_api(slurm_config, token, jobs_url)
+    jobs_data = slurm.call_slurm_api(slurm_config, token, jobs_url)    
 
     # Process slurm data
     if nodes_data and jobs_data:

+ 1 - 6
telemetry/roles/slurm_telemetry/files/monster/parse.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -121,7 +116,7 @@ def parse_node_metrics(nodes_data: dict,
             try:
                 hostname = os_idrac_hostname_mapping[hostname]
             except Exception as err:
-                log.error(f"Cannot mapping OS-iDRAC hostname: {err}")
+                log.error(f"Cannot map OS-hostname and IP: {err}")
 
         # Only process those nodes that are in node_id_mapping dict. 
         if hostname in node_id_mapping:

+ 0 - 329
telemetry/roles/slurm_telemetry/files/monster/process.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -65,327 +60,3 @@ def partition(arr:list, cores: int):
     except Exception as err:
         log.error(f"Cannot Partition the list: {err}")
     return groups
-
-
-def fetch(urls: list, nodes: list, username: str, password:str):
-    """fetch Fetch Data From Urls
-
-    Fetch Data From Urls of the specified nodes
-
-    Args:
-        urls (list): idrac urls
-        nodes (list): a list of ip addresses of idracs
-        username (str): idrac username
-        password (str): idrac password
-
-    Returns:
-        [type]: [description]
-    """
-    idrac_metrics = []
-    try:
-        asyn_requests = AsyncioRequests(auth = (username, password),
-                                        timeout = (15, 45),
-                                        max_retries = 3)
-        idrac_metrics = asyn_requests.bulk_fetch(urls, nodes)
-    except Exception as err:
-        log.error(f"Cannot fetch data from idrac urls: {err}")
-    return idrac_metrics
-
-
-def parallel_fetch(urllist: list, 
-                   nodelist: list, 
-                   cores: int,
-                   username: str, 
-                   password:str):
-    """parallel_fetch Parallel Fetch Data from Urls
-
-    Parallel fetch data from rrls of the specified nodes
-
-    Args:
-        urllist (list): idrac urls
-        nodelist (list): a list of ip addresses of idracs
-        cores (int): Number of cores of the compute running MonSter
-        username (str): idrac username
-        password (str): idrac password
-
-    Returns:
-        list: idrac metrics in a list
-    """
-    
-    flatten_metrics = []
-    try:
-        # Partition
-        urls_group = partition(urllist, cores)
-        nodes_group = partition(nodelist, cores)
-
-        fetch_args = []
-        for i in range(cores):
-            urls = urls_group[i]
-            nodes = nodes_group[i]
-            fetch_args.append((urls, nodes, username, password))
-
-        with multiprocessing.Pool() as pool:
-            metrics = pool.starmap(fetch, fetch_args)
-
-        flatten_metrics = [item for sublist in metrics for item in sublist]
-    except Exception as err:
-        log.error(f"Cannot parallel fetch data from idrac urls: {err}")
-
-    return flatten_metrics
-
-
-def extract(system_info: dict, bmc_info: dict):
-    """extract Extract Info
-
-    Extract system and bmc info
-
-    Args:
-        system_info (dict): System info
-        bmc_info (dict): BMC info
-
-    Returns:
-        dict: extracted info
-    """
-    
-    bmc_ip_addr = system_info["node"]
-    system_metrics = system_info["metrics"]
-    bmc_metrics = bmc_info["metrics"]
-    
-    general = ["UUID", "SerialNumber", "HostName", "Model", "Manufacturer"]
-    processor = ["ProcessorModel", "ProcessorCount", "LogicalProcessorCount"]
-    memory = ["TotalSystemMemoryGiB"]
-    bmc = ["BmcModel", "BmcFirmwareVersion"]
-    metrics = {}
-    try:
-        # Update service tag
-        if system_metrics:
-            service_tag = system_metrics.get("SKU", None)
-        else:
-            service_tag = None
-
-        metrics.update({
-            "ServiceTag": service_tag
-        })
-
-        # Update System metrics
-        if system_metrics:
-            for metric in general:
-                metrics.update({
-                    metric: system_metrics.get(metric, None)
-                })
-            for metric in processor:
-                if metric.startswith("Processor"):
-                    metrics.update({
-                        metric: system_metrics.get("ProcessorSummary", {}).get(metric[9:], None)
-                    })
-                else:
-                    metrics.update({
-                        metric: system_metrics.get("ProcessorSummary", {}).get(metric, None)
-                    })
-            for metric in memory:
-                metrics.update({
-                    metric: system_metrics.get("MemorySummary", {}).get("TotalSystemMemoryGiB", None)
-                })
-        else:
-            for metric in general + processor + memory:
-                metrics.update({
-                    metric: None
-                })
-
-        metrics.update({
-            "Bmc_Ip_Addr": bmc_ip_addr
-        })
-
-        # Update BMC metrics
-        if bmc_metrics:
-            for metric in bmc:
-                metrics.update({
-                    metric: bmc_metrics.get(metric[3:], None)
-                })
-        else:
-            for metric in bmc:
-                metrics.update({
-                    metric: None
-                })
-        
-        # Update Status
-        if  (not system_metrics and 
-             not bmc_metrics):
-            metrics.update({
-                "Status": "BMC unreachable in this query"
-            })
-        else:
-            metrics.update({
-                "Status": system_metrics.get("Status", {}).get("Health", None)
-            })
-            
-        return metrics
-    except Exception as err:
-        log.error(f"Cannot extract info from system and bmc: {err}")
-
-
-def parallel_extract(system_info_list: list, 
-                     bmc_info_list: list):
-    """parallel_extract Parallel Extract Info
-
-    Parallel extract system and bmc info
-
-    Args:
-        system_info_list (list): a list of system info
-        bmc_info_list (list): a list of bmc info
-
-    Returns:
-        list: a list of extracted info
-    """
-    
-    info = []
-    try:
-        process_args = zip(system_info_list, 
-                           bmc_info_list)
-        with multiprocessing.Pool() as pool:
-            info = pool.starmap(extract, process_args)
-    except Exception as err:
-        log.error(f"Cannot parallel extract info from system and bmc: {err}")
-    return info
-
-"""
-    Process data in the MetricValues, generate raw records
-    """
-
-
-def process_idrac(ip: str, report: str, metrics: list):
-    """process_idrac Process iDRAC Metrics
-
-    Process iDRAC metircs in the MetricValues and generate records
-
-    Args:
-        ip (str): iDRAC ip address
-        report (str): report name
-        metrics (list): a list of metric names
-
-    Returns:
-        dict: processed idrac metrics grouped by table name
-    """
-    idrac_metrics = {}
-    try:
-        if report == "PowerStatistics":
-            # PowerStatistics is better to be pulled
-            pass
-        else:
-            for metric in metrics:
-                table_name = ''
-                timestamp = ''
-                source = ''
-                fqdd = ''
-                value = ''
-
-                try:
-                    table_name = metric['MetricId']
-                    timestamp = metric['Timestamp']
-                    source = metric['Oem']['Dell']['Source']
-                    fqdd = metric['Oem']['Dell']['FQDD']
-                    value = metric['MetricValue']
-
-                    # print(f"Time Stamp: {timestamp}")
-
-                except:
-                    pass
-
-                if table_name and timestamp and source and fqdd and value:
-                    record = {
-                        'Timestamp': timestamp,
-                        'Source': source,
-                        'FQDD': fqdd,
-                        'Value': value
-                    }
-
-                    if table_name not in idrac_metrics:
-                        idrac_metrics.update({
-                            table_name: [record]
-                        })
-                    else:
-                        idrac_metrics[table_name].append(record)
-    
-    except Exception as err:
-            log.error(f"Fail to process idrac metrics: {err}")
-    
-    return idrac_metrics
-
-
-class AsyncioRequests:
-    import aiohttp
-    import asyncio
-    from aiohttp import ClientSession
-
-
-    def __init__(self, verify_ssl: bool = False, auth: tuple = (), 
-                 timeout: tuple = (15, 45), max_retries: int = 3):
-        self.metrics = {}
-        self.timestamp = int(time.time() * 1000000000)
-        self.retry = 0
-        self.connector=self.aiohttp.TCPConnector(verify_ssl=verify_ssl)
-        if auth:
-            self.auth = self.aiohttp.BasicAuth(*auth)
-        else:
-            self.auth = None
-        self.timeout = self.aiohttp.ClientTimeout(*timeout)
-        self.max_retries = max_retries
-        self.loop = self.asyncio.get_event_loop()
-        
-    
-    async def __fetch_json(self, 
-                           url: str, 
-                           node: str, 
-                           session: ClientSession):
-        """__fetch_json Fetch Url
-
-        Get request wrapper to fetch json data from API
-
-        Args:
-            url (str): url of idrac
-            node (str): ip address of the idrac
-            session (ClientSession): Client Session
-
-        Returns:
-            dict: The return of url in json format
-        """
-        
-        try:
-            resp = await session.request(method='GET', url=url)
-            resp.raise_for_status()
-            json = await resp.json()
-            return {"node": node, 
-                    "metrics": json, 
-                    "timestamp": self.timestamp}
-        except (TimeoutError):
-            self.retry += 1
-            if self.retry >= self.max_retries:
-                log.error(f"Cannot fetch data from {node} : {url}")
-                return {"node": node, 
-                        "metrics": {}, 
-                        "timestamp": self.timestamp}
-            return await self.__fetch_json(url, node, session)
-        except Exception as err:
-            log.error(f"Cannot fetch data from {url} : {err}")
-            return {"node": node, 
-                    "metrics": {}, 
-                    "timestamp": self.timestamp}
-
-
-    async def __requests(self, urls: list, nodes: list):
-        async with self.ClientSession(connector=self.connector, 
-                                      auth = self.auth, 
-                                      timeout = self.timeout) as session:
-            tasks = []
-            for i, url in enumerate(urls):
-                tasks.append(self.__fetch_json(url=url, 
-                                               node=nodes[i], 
-                                               session=session))
-            return await self.asyncio.gather(*tasks)
-
-
-    def bulk_fetch(self, urls: list, nodes: list):
-        self.metrics =  self.loop.run_until_complete(self.__requests(urls, nodes))
-        self.loop.close()
-        return self.metrics
-    

+ 0 - 5
telemetry/roles/slurm_telemetry/files/monster/rparser.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """

+ 0 - 45
telemetry/roles/slurm_telemetry/files/monster/schema.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -35,46 +30,6 @@ import logger
 log = logger.get_logger(__name__)
 
 
-def build_idrac_table_schemas(metric_definitions: list):
-    """build_table_schemas Build iDRAC Table Schemas
-
-    Build table schemas based on the idrac telemetry metric definitions
-
-    Args:
-        metric_definitions (list): idrac telemetry metric definitions
-    
-    Returns:
-        dict: iDRAC table schemas
-    """
-    table_schemas = {}
-
-    try:
-        for metric in metric_definitions:
-            table_name = metric['Id']
-            metric_type = metric['MetricDataType']
-            metric_unit = metric.get('Units', None)
-
-            # For network metrics, use BIG INT for storing the metric readings
-            if metric_unit == 'By' or metric_unit == 'Pkt':
-                value_type = 'BIGINT'
-            else:
-                value_type = utils.data_type_mapping.get(metric_type, 'TEXT')
-            
-            column_names = ['Timestamp', 'NodeID', 'Source', 'FQDD', 'Value']
-            column_types = ['TIMESTAMPTZ NOT NULL', 'INT NOT NULL', 'TEXT', \
-                            'TEXT', value_type]
-            
-            table_schemas.update({
-                table_name: {
-                    'column_names': column_names,
-                    'column_types': column_types,
-                }
-            })
-    except Exception as err:
-        log.error(f"Cannot build idrac table schemas: {err}")
-    return table_schemas
-
-
 def build_slurm_table_schemas():
     """build_slurm_table_schemas Build Slurm Table Schemas
 

+ 1 - 6
telemetry/roles/slurm_telemetry/files/monster/slurm.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -152,4 +147,4 @@ def get_slurm_url(slurm_config: dict, url_type: str):
     else:
         url = f"{base_url}{slurm_config['slurm_jobs']}"
     
-    return url
+    return url

+ 4 - 42
telemetry/roles/slurm_telemetry/files/monster/sql.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -181,18 +176,18 @@ def update_nodes_metadata(conn: object, nodes_metadata: list, table_name: str):
         conn (object): database connection
         nodes_metadata (list): nodes metadata list
         table_name (str): table name
-    """
+    """    
     cur = conn.cursor()
     for record in nodes_metadata:
         col_sql = ""
-        bmc_ip_addr = record['Bmc_Ip_Addr']
+        os_ip_addr = record['Os_Ip_Addr']
         for col, value in record.items():
-            if col != 'Bmc_Ip_Addr' and col != 'HostName':
+            if col != 'Os_Ip_Addr' and col != 'servicetag':
                 col_value = col.lower() + " = '" + str(value) + "', "
                 col_sql += col_value
         col_sql = col_sql[:-2]
         sql =  "UPDATE " + table_name + " SET " + col_sql \
-            + " WHERE bmc_ip_addr = '" + bmc_ip_addr + "';"
+            + " WHERE os_ip_addr = '" + os_ip_addr + "';"
         cur.execute(sql)
     
     conn.commit()
@@ -329,39 +324,6 @@ def generate_slurm_sql(metric: str,
     return sql
 
 
-def generate_idrac_sql(metric: str, 
-                       fqdd: str,
-                       start: str, 
-                       end: str, 
-                       interval: str, 
-                       aggregate: str):
-    """generate_idrac_sql Generate iDRAC Sql
-
-    Generate sql for querying idrac metrics
-
-    Args:
-        metric (str): metric name
-        fqdd (str): Fully Qualified Device Descriptor
-        start (str): start of time range
-        end (str): end of time range
-        interval (str): aggregation interval
-        aggregate (str): aggregation function
-    
-    Returns:
-        string: sql string
-    """
-    schema = 'idrac'
-    sql = f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, \
-        nodeid, fqdd AS label, {aggregate}(value) AS value \
-        FROM {schema}.{metric} \
-        WHERE timestamp >= '{start}' \
-        AND timestamp < '{end}' \
-        AND fqdd = '{fqdd}' \
-        GROUP BY time, nodeid, label \
-        ORDER BY time;"
-    return sql
-
-
 def generate_slurm_jobs_sql(start: str,end: str):
     """generate_slurm_jobs_sql Generate Slurm Jobs Sql
 

+ 5 - 29
telemetry/roles/slurm_telemetry/files/monster/tsdb.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,14 +20,12 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
 
 import sql
 import utils
-import idrac
 import logger
 import schema
 import psycopg2
@@ -46,20 +40,10 @@ def init_tsdb():
     should be created before run this function.
     """
     connection = utils.init_tsdb_connection()
-    username, password = utils.get_idrac_auth()
-    nodelist = utils.get_nodelist()
-
-    node = nodelist[0]
-
-    utils.print_status('Getting', 'metric' , 'definitions')
-    metric_definitions = idrac.get_metric_definitions(node, username, password)
     
     utils.print_status('Getting', 'nodes' , 'metadata')
-    nodes_metadata = idrac.get_nodes_metadata(nodelist, username, password)
-    
-    idrac_table_schemas = schema.build_idrac_table_schemas(metric_definitions)
+    nodes_metadata = utils.get_clusternodes()
     slurm_table_schemas = schema.build_slurm_table_schemas()
-    
 
     with psycopg2.connect(connection) as conn:
         cur = conn.cursor()
@@ -70,23 +54,20 @@ def init_tsdb():
         cur.execute(metadata_sql)
         sql.write_nodes_metadata(conn, nodes_metadata)
 
-        # Create schema for idrac
-        idrac_sqls = sql.generate_metric_table_sqls(idrac_table_schemas, 'idrac')
-        cur.execute(idrac_sqls['schema_sql'])
-
         # Create schema for slurm
         slurm_sqls = sql.generate_metric_table_sqls(slurm_table_schemas, 'slurm')
         cur.execute(slurm_sqls['schema_sql'])
 
-        # Create idrac and slurm tables
-        all_sqls = idrac_sqls['tables_sql'] + slurm_sqls['tables_sql']
+        # Create slurm tables
+        all_sqls = slurm_sqls['tables_sql']
         for s in all_sqls:
             table_name = s.split(' ')[5]
             cur.execute(s)
 
             # Create hypertable
             create_hypertable_sql = "SELECT create_hypertable(" + "'" \
-                + table_name + "', 'timestamp', if_not_exists => TRUE);"
+                + table_name + "', 'timestamp', if_not_exists => TRUE)"
+            print(create_hypertable_sql)
             cur.execute(create_hypertable_sql)
         
         # Create table for jobs info
@@ -96,11 +77,6 @@ def init_tsdb():
             table_name = s.split(' ')[5]
             cur.execute(s)
         
-        # Create table for metric definitions
-        metric_def_sql = sql.generate_metric_def_table_sql()
-        cur.execute(metric_def_sql)
-        sql.write_metric_definitions(conn, metric_definitions)
-        
         conn.commit()
         cur.close()
     utils.print_status('Finish', 'tables' , 'initialization!')

+ 12 - 82
telemetry/roles/slurm_telemetry/files/monster/utils.py

@@ -1,18 +1,14 @@
 """
 MIT License
-
 Copyright (c) 2022 Texas Tech University
-
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
-
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
-
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -24,7 +20,6 @@ SOFTWARE.
 
 """
 This file is part of MonSter.
-
 Author:
     Jie Li, jie.li@ttu.edu
 """
@@ -87,17 +82,6 @@ def parse_config():
         log.error(f"Parsing Configuration Error: {err}")
 
 
-def get_idrac_auth():
-    """get_idrac_auth Get iDRAC Authentication
-
-    Get username and password for accessing idrac reports
-    """
-    idrac_config = parse_config()['idrac']
-    username = idrac_config['username']
-    password = idrac_config['password']
-    return(username, password)
-
-
 def get_config(target: str):
     """get_config Get Config
 
@@ -113,7 +97,7 @@ def get_config(target: str):
         dict: configurations of specified target
     """
     
-    targets = ['timescaledb', 'idrac', 'slurm_rest_api']
+    targets = ['timescaledb', 'slurm_rest_api']
     if target not in targets:
         raise ValueError(f"Invalid configuration target. Expected one of: {targets}")
 
@@ -137,22 +121,13 @@ def init_tsdb_connection():
     return connection
 
 
-def get_nodelist():
-    """get_nodelist Get Nodelist
+def get_clusternodes():
+    """get_clusternodes Get ClusterNodes
 
-    Generate the nodelist according to the configuration
+    Generate the nodes list in the cluster
     """
-    idrac_config = parse_config()['idrac']['nodelist']
-    nodelist = []
-
-    try:
-        for i in idrac_config:
-            nodes = hostlist.expand_hostlist(i)
-            nodelist.extend(nodes)
-        
-        return nodelist
-    except Exception as err:
-        log.error(f"Cannot generate nodelist: {err}")
+    nodes_config = parse_config()['clusternodes']
+    return nodes_config
 
 
 def sort_tuple_list(tuple_list:list):
@@ -193,11 +168,11 @@ def get_node_id_mapping(connection: str):
     try:
         with psycopg2.connect(connection) as conn:
             cur = conn.cursor()
-            query = "SELECT nodeid, hostname FROM nodes"
+            query = "SELECT nodeid, os_ip_addr from nodes"
             cur.execute(query)
-            for (nodeid, hostname) in cur.fetchall():
+            for (nodeid, os_ip_addr) in cur.fetchall():
                 mapping.update({
-                    hostname: nodeid
+                    os_ip_addr: nodeid
                 })
             cur.close()
             return mapping
@@ -205,29 +180,6 @@ def get_node_id_mapping(connection: str):
         log.error(f"Cannot generate node-id mapping: {err}")
 
 
-def get_metric_dtype_mapping(conn: object):
-    """get_table_dtype_mapping Get Metric-datatype mapping
-
-    Get Metric-datatype mapping from the metric definition
-
-    Args:
-        conn (object): TimeScaleDB connection object
-
-    Returns:
-        dict: Metric-datatype mapping
-    """
-    mapping = {}
-    cur = conn.cursor()
-    query = "SELECT metric_id, data_type FROM metrics_definition;"
-    cur.execute(query)
-    for (metric, data_type) in cur.fetchall():
-        mapping.update({
-            metric: data_type
-        })
-    cur.close()
-    return mapping
-
-
 def get_ip_id_mapping(conn: object):
     """get_ip_id_mapping Get IP-ID mapping
 
@@ -241,11 +193,11 @@ def get_ip_id_mapping(conn: object):
     """
     mapping = {}
     cur = conn.cursor()
-    query = "SELECT nodeid, bmc_ip_addr FROM nodes"
+    query = "SELECT nodeid, os_ip_addr FROM nodes"
     cur.execute(query)
-    for (nodeid, bmc_ip_addr) in cur.fetchall():
+    for (nodeid, os_ip_addr) in cur.fetchall():
         mapping.update({
-            bmc_ip_addr: nodeid
+            os_ip_addr: nodeid
         })
     cur.close()
     return mapping
@@ -272,25 +224,3 @@ def cast_value_type(value, dtype):
             return value
     except ValueError:
         return value
-
-
-def partition(arr:list, cores: int):
-    """
-    Partition urls/nodes into several groups based on # of cores
-    """
-    groups = []
-    try:
-        arr_len = len(arr)
-        arr_per_core = arr_len // cores
-        arr_surplus = arr_len % cores
-
-        increment = 1
-        for i in range(cores):
-            if(arr_surplus != 0 and i == (cores-1)):
-                groups.append(arr[i * arr_per_core:])
-            else:
-                groups.append(arr[i * arr_per_core : increment * arr_per_core])
-                increment += 1
-    except Exception as err:
-        log.error(f"Cannot Partition List : {err}")
-    return groups