Browse the source code

Merge pull request #748 from Artlands/issue-746

Issue 746: adding telemetry code for slurm
Sujit Jadhav 3 years ago
parent
commit
08ac20c223

+ 204 - 0
telemetry/roles/slurm_telemetry/files/monster/dump.py

@@ -0,0 +1,204 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import utils
+import logger
+from pgcopy import CopyManager
+from dateutil import tz
+from dateutil.parser import parse as parse_time
+
+log = logger.get_logger(__name__)
+
+
+def dump_node_jobs(timestamp: object, node_jobs: dict, conn: object):
+    """dump_node_jobs Dump Node-Jobs
+
+    Dump node-jobs correlation to TimeScaleDB
+
+    Args:
+        timestamp (object): Attached timestamp
+        node_jobs (dict): Node-jobs correlation
+        conn (object): TimeScaleDB connection object
+    """
+    try:
+        all_records = []
+        target_table = 'slurm.node_jobs'
+        cols = ('timestamp', 'nodeid', 'jobs', 'cpus')
+        for node, job_info in node_jobs.items():
+            all_records.append((timestamp, int(node), job_info['jobs'], job_info['cpus']))
+        mgr = CopyManager(conn, target_table, cols)
+        mgr.copy(all_records)
+        conn.commit()
+    except Exception as err:
+        curs = conn.cursor()
+        curs.execute("ROLLBACK")
+        conn.commit()
+        log.error(f"Fail to dump node-jobs correlation: {err}")
+
+
+def dump_job_metrics(job_metrics: dict, conn: object):
+    """dump_job_metrics Dump Job Metircs
+
+    Dump job metrics to TimeScaleDB
+
+    Args:
+        job_metrics (dict): Job Metrics
+        conn (object): TimeScaleDB connection object
+    """
+    try:
+        target_table = 'slurm.jobs'
+        cols = ('job_id', 'array_job_id', 'array_task_id', 'name','job_state', 
+                'user_id', 'user_name', 'group_id', 'cluster', 'partition', 
+                'command', 'current_working_directory', 'batch_flag', 
+                'batch_host', 'nodes', 'node_count', 'cpus', 'tasks', 
+                'tasks_per_node', 'cpus_per_task', 'memory_per_node', 
+                'memory_per_cpu', 'priority', 'time_limit', 'deadline', 
+                'submit_time', 'preempt_time', 'suspend_time', 'eligible_time', 
+                'start_time', 'end_time', 'resize_time', 'restart_cnt', 
+                'exit_code', 'derived_exit_code')
+
+        cur = conn.cursor()
+        all_records = []
+
+        for job in job_metrics:
+            job_id = job[cols.index('job_id')]
+            check_sql = f"SELECT EXISTS(SELECT 1 FROM slurm.jobs WHERE job_id={job_id})"
+            cur.execute(check_sql)
+            (job_exists, ) = cur.fetchall()[0]
+
+            if job_exists:
+                # Update
+                nodes = job[cols.index('nodes')]
+                job_state = job[cols.index('job_state')]
+                user_name = job[cols.index('user_name')]
+                start_time = job[cols.index('start_time')]
+                end_time = job[cols.index('end_time')]
+                resize_time = job[cols.index('resize_time')]
+                restart_cnt = job[cols.index('restart_cnt')]
+                exit_code = job[cols.index('exit_code')]
+                derived_exit_code = job[cols.index('derived_exit_code')]
+                update_sql = """ UPDATE slurm.jobs 
+                                 SET nodes = %s, job_state = %s, user_name = %s, start_time = %s, end_time = %s, resize_time = %s, restart_cnt = %s, exit_code = %s, derived_exit_code = %s
+                                 WHERE job_id = %s """
+                cur.execute(update_sql, (nodes, job_state, user_name, start_time, end_time, resize_time, restart_cnt, exit_code, derived_exit_code, job_id))
+            else:
+                all_records.append(job)
+
+        mgr = CopyManager(conn, target_table, cols)
+        mgr.copy(all_records)
+        conn.commit()
+    except Exception as err:
+        curs = conn.cursor()
+        curs.execute("ROLLBACK")
+        conn.commit()
+        log.error(f"Fail to dump job metrics: {err}")
+
+
+def dump_node_metrics(timestamp: object, 
+                      node_metrics: dict, 
+                      conn: object):
+    """dump_node_metrics Dump Node Metrics
+
+    Dump node metrics to TimeScaleDB
+
+    Args:
+        timestamp (object): attached timestamp
+        node_metrics (dict): node metrics
+        conn (object): TimeScaleDB connection object
+    """
+    schema = 'slurm'
+    try:
+        metric_names = list(list(node_metrics.values())[0].keys())
+
+        for metric_name in metric_names:
+            all_records = []
+            target_table = f'{schema}.{metric_name}'
+            cols = ('timestamp', 'nodeid', 'value')
+            for node, node_data in node_metrics.items():
+                all_records.append((timestamp, int(node), node_data[metric_name]))
+            mgr = CopyManager(conn, target_table, cols)
+            mgr.copy(all_records)
+            conn.commit()
+    except Exception as err:
+        curs = conn.cursor()
+        curs.execute("ROLLBACK")
+        conn.commit()
+        log.error(f"Fail to dump node metrics : {err}")
+
+
+def dump_idrac(ip: str, 
+               idrac_metrics: dict,
+               metric_dtype_mapping: dict, 
+               ip_id_mapping: dict,
+               conn: object):
+    """dump_idrac Dump iDRAC Metrics
+
+    Dump node metrics to TimeScaleDB
+
+    Args:
+        ip (str): ip address of iDRAC
+        idrac_metrics (dict): iDRAC Metrics
+        metric_dtype_mapping (dict): Metric-Datatype mapping
+        ip_id_mapping (dict): ip-id mapping
+        conn (object): TimeScaleDB connection object
+    """
+    try:
+        schema_name = 'idrac'
+        nodeid = ip_id_mapping[ip]
+
+        for table_name, table_metrics in idrac_metrics.items():
+            all_records = []
+            dtype = metric_dtype_mapping[table_name]
+
+            table_name = table_name.lower()
+            target_table = f"{schema_name}.{table_name}"
+
+            cols = ('timestamp', 'nodeid', 'source', 'fqdd', 'value')
+            for metric in table_metrics:
+                # We have to offset the timestamp by -6/-5 hours. For some
+                # unknown reason, the timestamp reported by iDRAC is not
+                # configured correctly.
+                timestamp = parse_time(metric['Timestamp'])
+                timestamp = timestamp.astimezone(tz.tzlocal())
+                timestamp = timestamp.replace(tzinfo=tz.tzutc())
+                timestamp = timestamp.astimezone(tz.tzlocal())
+
+                source = metric['Source']
+                fqdd = metric['FQDD']
+                if metric['Value']:
+                    value = utils.cast_value_type(metric['Value'], dtype)
+                    all_records.append((timestamp, nodeid, source, fqdd, value))
+
+            mgr = CopyManager(conn, target_table, cols)
+            mgr.copy(all_records)
+        conn.commit()
+    except Exception as err:
+        log.error(f"Fail to dump idrac metrics ({ip}): {err}")

+ 60 - 0
telemetry/roles/slurm_telemetry/files/monster/logger.py

@@ -0,0 +1,60 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+from pathlib import Path
+from logging.handlers import TimedRotatingFileHandler
+import logging
+
+
+def setup_logger(file_name):
+    monster_path = Path(__file__).resolve().parent.parent
+    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    
+    logger = logging.getLogger(file_name)
+    formatter = logging.Formatter(log_format)
+
+    log_handler = TimedRotatingFileHandler(filename=f'{monster_path}/log/monster.log', when="midnight", interval=1, backupCount=7)
+    log_handler.setLevel(logging.ERROR)
+    log_handler.setFormatter(formatter)
+
+    if not logger.handlers:
+        stream_handler = logging.StreamHandler()
+        stream_handler.setLevel(logging.ERROR)
+        stream_handler.setFormatter(formatter)
+
+        logger.addHandler(stream_handler)
+        logger.addHandler(log_handler)
+
+    return logger
+
+
+def get_logger(file_name):
+    return setup_logger(file_name)
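
Usage is one call per module; a sketch (the module name is arbitrary). Note that TimedRotatingFileHandler does not create directories, so the log/ directory it writes to must already exist:

    import logger

    log = logger.get_logger(__name__)
    log.error("something went wrong")  # written to stderr and log/monster.log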

+ 117 - 0
telemetry/roles/slurm_telemetry/files/monster/mslurm.py

@@ -0,0 +1,117 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import time
+import pytz
+import utils
+import dump
+import slurm
+import parse
+import logger
+import psycopg2
+import schedule
+
+from datetime import datetime
+
+log = logger.get_logger(__name__)
+
+
+def monitor_slurm():
+    """monitor_slurm Monitor Slurm
+
+    Monitor Slurm Metrics
+    """
+    connection = utils.init_tsdb_connection()
+    node_id_mapping = utils.get_node_id_mapping(connection)
+    os_idrac_hostname_mapping = utils.get_os_idrac_hostname_mapping()
+    slurm_config = utils.get_config('slurm_rest_api')
+    
+    # Schedule the slurm fetch
+    schedule.every().minutes.at(":00").do(fetch_slurm, 
+                                          slurm_config, 
+                                          connection, 
+                                          node_id_mapping,
+                                          os_idrac_hostname_mapping)
+
+    while True:
+        try:
+            schedule.run_pending()
+            time.sleep(1)
+        except KeyboardInterrupt:
+            schedule.clear()
+            break
+        
+
+def fetch_slurm(slurm_config: dict, 
+                connection: str, 
+                node_id_mapping: dict,
+                os_idrac_hostname_mapping: dict):
+    """fetch_slurm Fetch Slurm Metrics
+
+    Fetch Slurm metrics from the Slurm REST API
+
+    Args:
+        slurm_config (dict): slurm configuration
+        connection (str): tsdb connection
+        node_id_mapping (dict): node-id mapping
+        os_idrac_hostname_mapping (dict): OS-iDRAC hostname mapping
+    """
+    token = slurm.read_slurm_token(slurm_config)
+    timestamp = datetime.now(pytz.utc).replace(microsecond=0)
+
+    # Get nodes data
+    nodes_url = slurm.get_slurm_url(slurm_config, 'nodes')
+    nodes_data = slurm.call_slurm_api(slurm_config, token, nodes_url)
+
+    # Get jobs data
+    jobs_url = slurm.get_slurm_url(slurm_config, 'jobs')
+    jobs_data = slurm.call_slurm_api(slurm_config, token, jobs_url)
+
+    # Process slurm data
+    if nodes_data and jobs_data:
+        job_metrics = parse.parse_jobs_metrics(jobs_data, 
+                                               os_idrac_hostname_mapping)
+        node_metrics = parse.parse_node_metrics(nodes_data, 
+                                                node_id_mapping,
+                                                os_idrac_hostname_mapping)
+        node_jobs = parse.parse_node_jobs(jobs_data,
+                                          node_id_mapping,
+                                          os_idrac_hostname_mapping)
+
+        # Dump metrics
+        with psycopg2.connect(connection) as conn:
+            dump.dump_job_metrics(job_metrics, conn)
+            dump.dump_node_metrics(timestamp, node_metrics, conn)
+            dump.dump_node_jobs(timestamp, node_jobs, conn)
+
+
+if __name__ == '__main__':
+    monitor_slurm()
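
The loop above fires fetch_slurm at the top of every minute via the schedule library. A self-contained sketch of the same pattern, with a stand-in job body:

    import time
    import schedule

    def job():
        print("fetching slurm metrics...")

    # Run at second :00 of every minute, as monitor_slurm does.
    schedule.every().minute.at(":00").do(job)

    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            schedule.clear()
            break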

+ 224 - 0
telemetry/roles/slurm_telemetry/files/monster/parse.py

@@ -0,0 +1,224 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import json
+import logger
+import hostlist
+
+
+log = logger.get_logger(__name__)
+
+def parse_jobs_metrics(jobs_data: dict,
+                       os_idrac_hostname_mapping: dict):
+    """parse_jobs_metrics Parse Jobs Metrics
+
+    Parse job metrics fetched from the Slurm API
+
+    Args:
+        jobs_data (dict): Job data fetched from the Slurm API
+        os_idrac_hostname_mapping (dict): OS-iDRAC hostname mapping
+
+    Returns:
+        list: Parsed jobs info
+    """
+    jobs_metrics = []
+
+    all_jobs = jobs_data['jobs']
+    attributes = ['job_id', 'array_job_id', 'array_task_id', 'name','job_state', 
+                  'user_id', 'user_name', 'group_id', 'cluster', 'partition', 
+                  'command', 'current_working_directory', 'batch_flag', 
+                  'batch_host', 'nodes', 'node_count', 'cpus', 'tasks', 
+                  'tasks_per_node', 'cpus_per_task', 'memory_per_node', 
+                  'memory_per_cpu', 'priority', 'time_limit', 'deadline', 
+                  'submit_time', 'preempt_time', 'suspend_time', 
+                  'eligible_time', 'start_time', 'end_time', 'resize_time', 
+                  'restart_cnt', 'exit_code', 'derived_exit_code']
+    
+    for job in all_jobs:
+        nodes = job['nodes']
+        hostnames = hostlist.expand_hostlist(nodes)
+
+        # Map OS hostnames to iDRAC hostnames.
+        if os_idrac_hostname_mapping:
+            try:
+                hostnames = [os_idrac_hostname_mapping[i] for i in hostnames]
+            except Exception as err:
+                log.error(f"Cannot mapping OS-iDRAC hostname: {err}")
+
+        metrics = []
+        for attribute in attributes:
+            if attribute == 'nodes':
+                metrics.append(hostnames)
+            else:
+                # Some attribute values are larger than 2147483647, which
+                # exceeds INT4 and cannot be saved in TSDB
+                if type(job[attribute]) is int and job[attribute] > 2147483647:
+                    metrics.append(2147483647)
+                else:
+                    metrics.append(job[attribute])
+        tuple_metrics = tuple(metrics)
+        jobs_metrics.append(tuple_metrics)
+            
+    return jobs_metrics
+
+
+def parse_node_metrics(nodes_data: dict, 
+                       node_id_mapping: dict,
+                       os_idrac_hostname_mapping: dict):
+    """parse_node_metrics Parse Node Metircs
+
+    Parse node metrics fetched from the Slurm API
+
+    Args:
+        nodes_data (dict): Nodes data fetched from the Slurm API
+        node_id_mapping (dict): Node-Id mapping
+        os_idrac_hostname_mapping (dict): OS-iDRAC hostname mapping
+
+    Returns:
+        dict: Parsed node metrics
+    """
+    all_node_metrics = {}
+    state_mapping = {
+        'allocated': 1,
+        'idle': 0,
+        'down': -1
+    }
+    all_nodes = nodes_data['nodes']
+    for node in all_nodes:
+        hostname = node['hostname']
+
+        # Map the OS hostname to the iDRAC hostname. The hostnames in 
+        # node_id_mapping use iDRAC hostnames.
+        if os_idrac_hostname_mapping:
+            try:
+                hostname = os_idrac_hostname_mapping[hostname]
+            except Exception as err:
+                log.error(f"Cannot mapping OS-iDRAC hostname: {err}")
+
+        # Only process those nodes that are in node_id_mapping dict. 
+        if hostname in node_id_mapping:
+            node_id = node_id_mapping[hostname]
+            # CPU load
+            cpu_load = int(node['cpu_load'])
+            # Some down nodes report a cpu_load larger than 2147483647, which
+            # exceeds INT4 and cannot be saved in TSDB
+            if cpu_load > 2147483647: 
+                cpu_load = 2147483647
+            # Memory usage
+            free_memory = node['free_memory']
+            real_memory = node['real_memory']
+            memory_usage = ((real_memory - free_memory)/real_memory) * 100
+            memory_used = real_memory - free_memory
+            f_memory_usage = float("{:.2f}".format(memory_usage))
+            # Status
+            state = node['state']
+            f_state = state_mapping[state]
+            node_data = {
+                'cpu_load': cpu_load,
+                'memoryusage': f_memory_usage,
+                'memory_used': memory_used,
+                'state': f_state
+            }
+            all_node_metrics.update({
+                node_id: node_data
+            })
+    return all_node_metrics
+
+
+def parse_node_jobs(jobs_metrics: dict, 
+                    node_id_mapping: dict,
+                    os_idrac_hostname_mapping: dict):
+    """parse_node_jobs Parse Node-Jobs
+
+    Parse nodes-job correlation
+
+    Args:
+        jobs_metrics (dict): Job metrics fetched from the Slurm API
+        node_id_mapping (dict): Node-Id mapping
+        os_idrac_hostname_mapping (dict): OS-iDRAC hostname mapping
+
+    Returns:
+        dict: node-jobs correlation
+    """
+  
+    node_jobs = {}
+    all_jobs = jobs_metrics['jobs']
+    # Get job-nodes correlation
+    job_nodes = {}
+    for job in all_jobs:
+        valid_flag = True
+        if job['job_state'] == "RUNNING":
+            job_id = job['job_id']
+            nodes = job['nodes']
+            # Get node ids
+            hostnames = hostlist.expand_hostlist(nodes)
+
+            # Map OS hostnames to iDRAC hostnames.
+            if os_idrac_hostname_mapping:
+                try:
+                    hostnames = [os_idrac_hostname_mapping[i] for i in hostnames]
+                except Exception as err:
+                    log.error(f"Cannot mapping OS-iDRAC hostname: {err}")
+            
+            # Check if hostname is in node_id_mapping. 
+            # If not, ignore this job info.
+            for hostname in hostnames:
+                if hostname not in node_id_mapping:
+                    valid_flag = False
+                    break
+
+            if valid_flag:
+                node_ids = [node_id_mapping[i] for i in hostnames]
+                node_ids.sort()
+                # Get cpu counts for each node
+                allocated_nodes = job['job_resources']['allocated_nodes']
+                cpu_counts = [resource['cpus'] for node, resource in allocated_nodes.items()]
+                job_nodes.update({
+                    job_id: {
+                        'nodes': node_ids,
+                        'cpus': cpu_counts
+                    }
+                })
+    # Get nodes-job correlation
+    for job, nodes_cpus in job_nodes.items():
+        for i, node in enumerate(nodes_cpus['nodes']):
+            if node not in node_jobs:
+                node_jobs.update({
+                    node: {
+                        'jobs':[job],
+                        'cpus':[nodes_cpus['cpus'][i]]
+                    }
+                })
+            else:
+                node_jobs[node]['jobs'].append(job)
+                node_jobs[node]['cpus'].append(nodes_cpus['cpus'][i])
+
+    return node_jobs
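
Both parsers rely on python-hostlist to expand Slurm's compressed node expressions; a quick illustration (the hostnames are made up):

    import hostlist

    # Slurm reports allocated nodes in compressed form:
    hostlist.expand_hostlist("cpu-[1-3,7]")
    # -> ['cpu-1', 'cpu-2', 'cpu-3', 'cpu-7']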

+ 391 - 0
telemetry/roles/slurm_telemetry/files/monster/process.py

@@ -0,0 +1,391 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import logger
+import time
+import multiprocessing
+
+log = logger.get_logger(__name__)
+
+
+def partition(arr: list, cores: int):
+    """partition Partition a list
+
+    Partition urls/nodes into several groups based on # of cores
+
+    Args:
+        arr (list): A list to be partitioned
+        cores (int): Number of cores of the machine running MonSter
+
+    Returns:
+        list: partitioned list
+    """
+    
+    groups = []
+    try:
+        arr_len = len(arr)
+        arr_per_core = arr_len // cores
+        arr_surplus = arr_len % cores
+
+        increment = 1
+        for i in range(cores):
+            if(arr_surplus != 0 and i == (cores-1)):
+                groups.append(arr[i * arr_per_core:])
+            else:
+                groups.append(arr[i * arr_per_core : increment * arr_per_core])
+                increment += 1
+    except Exception as err:
+        log.error(f"Cannot Partition the list: {err}")
+    return groups
+
+
+def fetch(urls: list, nodes: list, username: str, password: str):
+    """fetch Fetch Data From Urls
+
+    Fetch Data From Urls of the specified nodes
+
+    Args:
+        urls (list): idrac urls
+        nodes (list): a list of ip addresses of idracs
+        username (str): idrac username
+        password (str): idrac password
+
+    Returns:
+        list: idrac metrics
+    """
+    idrac_metrics = []
+    try:
+        asyn_requests = AsyncioRequests(auth = (username, password),
+                                        timeout = (15, 45),
+                                        max_retries = 3)
+        idrac_metrics = asyn_requests.bulk_fetch(urls, nodes)
+    except Exception as err:
+        log.error(f"Cannot fetch data from idrac urls: {err}")
+    return idrac_metrics
+
+
+def parallel_fetch(urllist: list, 
+                   nodelist: list, 
+                   cores: int,
+                   username: str, 
+                   password: str):
+    """parallel_fetch Parallel Fetch Data from Urls
+
+    Parallel fetch data from urls of the specified nodes
+
+    Args:
+        urllist (list): idrac urls
+        nodelist (list): a list of ip addresses of idracs
+        cores (int): Number of cores of the machine running MonSter
+        username (str): idrac username
+        password (str): idrac password
+
+    Returns:
+        list: idrac metrics in a list
+    """
+    
+    flatten_metrics = []
+    try:
+        # Partition
+        urls_group = partition(urllist, cores)
+        nodes_group = partition(nodelist, cores)
+
+        fetch_args = []
+        for i in range(cores):
+            urls = urls_group[i]
+            nodes = nodes_group[i]
+            fetch_args.append((urls, nodes, username, password))
+
+        with multiprocessing.Pool() as pool:
+            metrics = pool.starmap(fetch, fetch_args)
+
+        flatten_metrics = [item for sublist in metrics for item in sublist]
+    except Exception as err:
+        log.error(f"Cannot parallel fetch data from idrac urls: {err}")
+
+    return flatten_metrics
+
+
+def extract(system_info: dict, bmc_info: dict):
+    """extract Extract Info
+
+    Extract system and bmc info
+
+    Args:
+        system_info (dict): System info
+        bmc_info (dict): BMC info
+
+    Returns:
+        dict: extracted info
+    """
+    
+    bmc_ip_addr = system_info["node"]
+    system_metrics = system_info["metrics"]
+    bmc_metrics = bmc_info["metrics"]
+    
+    general = ["UUID", "SerialNumber", "HostName", "Model", "Manufacturer"]
+    processor = ["ProcessorModel", "ProcessorCount", "LogicalProcessorCount"]
+    memory = ["TotalSystemMemoryGiB"]
+    bmc = ["BmcModel", "BmcFirmwareVersion"]
+    metrics = {}
+    try:
+        # Update service tag
+        if system_metrics:
+            service_tag = system_metrics.get("SKU", None)
+        else:
+            service_tag = None
+
+        metrics.update({
+            "ServiceTag": service_tag
+        })
+
+        # Update System metrics
+        if system_metrics:
+            for metric in general:
+                metrics.update({
+                    metric: system_metrics.get(metric, None)
+                })
+            for metric in processor:
+                if metric.startswith("Processor"):
+                    metrics.update({
+                        metric: system_metrics.get("ProcessorSummary", {}).get(metric[9:], None)
+                    })
+                else:
+                    metrics.update({
+                        metric: system_metrics.get("ProcessorSummary", {}).get(metric, None)
+                    })
+            for metric in memory:
+                metrics.update({
+                    metric: system_metrics.get("MemorySummary", {}).get("TotalSystemMemoryGiB", None)
+                })
+        else:
+            for metric in general + processor + memory:
+                metrics.update({
+                    metric: None
+                })
+
+        metrics.update({
+            "Bmc_Ip_Addr": bmc_ip_addr
+        })
+
+        # Update BMC metrics
+        if bmc_metrics:
+            for metric in bmc:
+                metrics.update({
+                    metric: bmc_metrics.get(metric[3:], None)
+                })
+        else:
+            for metric in bmc:
+                metrics.update({
+                    metric: None
+                })
+        
+        # Update Status
+        if  (not system_metrics and 
+             not bmc_metrics):
+            metrics.update({
+                "Status": "BMC unreachable in this query"
+            })
+        else:
+            metrics.update({
+                "Status": system_metrics.get("Status", {}).get("Health", None)
+            })
+            
+        return metrics
+    except Exception as err:
+        log.error(f"Cannot extract info from system and bmc: {err}")
+
+
+def parallel_extract(system_info_list: list, 
+                     bmc_info_list: list):
+    """parallel_extract Parallel Extract Info
+
+    Parallel extract system and bmc info
+
+    Args:
+        system_info_list (list): a list of system info
+        bmc_info_list (list): a list of bmc info
+
+    Returns:
+        list: a list of extracted info
+    """
+    
+    info = []
+    try:
+        process_args = zip(system_info_list, 
+                           bmc_info_list)
+        with multiprocessing.Pool() as pool:
+            info = pool.starmap(extract, process_args)
+    except Exception as err:
+        log.error(f"Cannot parallel extract info from system and bmc: {err}")
+    return info
+
+"""
+    Process data in the MetricValues, generate raw records
+    """
+
+
+def process_idrac(ip: str, report: str, metrics: list):
+    """process_idrac Process iDRAC Metrics
+
+    Process iDRAC metrics in the MetricValues and generate records
+
+    Args:
+        ip (str): iDRAC ip address
+        report (str): report name
+        metrics (list): a list of metric names
+
+    Returns:
+        dict: processed idrac metrics grouped by table name
+    """
+    idrac_metrics = {}
+    try:
+        if report == "PowerStatistics":
+            # PowerStatistics reports are better pulled separately; skip them.
+            pass
+        else:
+            for metric in metrics:
+                table_name = ''
+                timestamp = ''
+                source = ''
+                fqdd = ''
+                value = ''
+
+                try:
+                    table_name = metric['MetricId']
+                    timestamp = metric['Timestamp']
+                    source = metric['Oem']['Dell']['Source']
+                    fqdd = metric['Oem']['Dell']['FQDD']
+                    value = metric['MetricValue']
+
+                    # print(f"Time Stamp: {timestamp}")
+
+                except:
+                    pass
+
+                if table_name and timestamp and source and fqdd and value:
+                    record = {
+                        'Timestamp': timestamp,
+                        'Source': source,
+                        'FQDD': fqdd,
+                        'Value': value
+                    }
+
+                    if table_name not in idrac_metrics:
+                        idrac_metrics.update({
+                            table_name: [record]
+                        })
+                    else:
+                        idrac_metrics[table_name].append(record)
+    
+    except Exception as err:
+            log.error(f"Fail to process idrac metrics: {err}")
+    
+    return idrac_metrics
+
+
+class AsyncioRequests:
+    import aiohttp
+    import asyncio
+    from aiohttp import ClientSession
+
+
+    def __init__(self, verify_ssl: bool = False, auth: tuple = (), 
+                 timeout: tuple = (15, 45), max_retries: int = 3):
+        self.metrics = {}
+        self.timestamp = int(time.time() * 1000000000)
+        self.retry = 0
+        self.connector = self.aiohttp.TCPConnector(verify_ssl=verify_ssl)
+        if auth:
+            self.auth = self.aiohttp.BasicAuth(*auth)
+        else:
+            self.auth = None
+        self.timeout = self.aiohttp.ClientTimeout(*timeout)
+        self.max_retries = max_retries
+        self.loop = self.asyncio.get_event_loop()
+        
+    
+    async def __fetch_json(self, 
+                           url: str, 
+                           node: str, 
+                           session: ClientSession):
+        """__fetch_json Fetch Url
+
+        Get request wrapper to fetch json data from API
+
+        Args:
+            url (str): url of idrac
+            node (str): ip address of the idrac
+            session (ClientSession): Client Session
+
+        Returns:
+            dict: The return of url in json format
+        """
+        
+        try:
+            resp = await session.request(method='GET', url=url)
+            resp.raise_for_status()
+            json = await resp.json()
+            return {"node": node, 
+                    "metrics": json, 
+                    "timestamp": self.timestamp}
+        except self.asyncio.TimeoutError:
+            self.retry += 1
+            if self.retry >= self.max_retries:
+                log.error(f"Cannot fetch data from {node} : {url}")
+                return {"node": node, 
+                        "metrics": {}, 
+                        "timestamp": self.timestamp}
+            return await self.__fetch_json(url, node, session)
+        except Exception as err:
+            log.error(f"Cannot fetch data from {url} : {err}")
+            return {"node": node, 
+                    "metrics": {}, 
+                    "timestamp": self.timestamp}
+
+
+    async def __requests(self, urls: list, nodes: list):
+        async with self.ClientSession(connector=self.connector, 
+                                      auth = self.auth, 
+                                      timeout = self.timeout) as session:
+            tasks = []
+            for i, url in enumerate(urls):
+                tasks.append(self.__fetch_json(url=url, 
+                                               node=nodes[i], 
+                                               session=session))
+            return await self.asyncio.gather(*tasks)
+
+
+    def bulk_fetch(self, urls: list, nodes: list):
+        self.metrics = self.loop.run_until_complete(self.__requests(urls, nodes))
+        self.loop.close()
+        return self.metrics
+    
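
partition splits the url/node lists evenly across cores, with the last group absorbing any surplus; a quick check of the behavior (the import path is an assumption):

    from process import partition  # hypothetical import path

    urls = ['u1', 'u2', 'u3', 'u4', 'u5', 'u6', 'u7']
    partition(urls, 3)
    # -> [['u1', 'u2'], ['u3', 'u4'], ['u5', 'u6', 'u7']]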

+ 244 - 0
telemetry/roles/slurm_telemetry/files/monster/rparser.py

@@ -0,0 +1,244 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+JSON_COMMA = ','
+JSON_COLON = ':'
+JSON_LEFTBRACKET = '['
+JSON_RIGHTBRACKET = ']'
+JSON_LEFTBRACE = '{'
+JSON_RIGHTBRACE = '}'
+JSON_QUOTE = '"'
+
+JSON_QUOTE = '"'
+JSON_WHITESPACE = [' ', '\t', '\b', '\n', '\r']
+JSON_SYNTAX = [JSON_COMMA, JSON_COLON, JSON_LEFTBRACKET, JSON_RIGHTBRACKET,
+               JSON_LEFTBRACE, JSON_RIGHTBRACE]
+
+FALSE_LEN = len('false')
+TRUE_LEN = len('true')
+NULL_LEN = len('null')
+
+
+def lex_string(string):
+    json_string = ''
+
+    if string[0] == JSON_QUOTE:
+        string = string[1:]
+    else:
+        return None, string
+
+    for c in string:
+        if c == JSON_QUOTE:
+            return json_string, string[len(json_string)+1:]
+        else:
+            json_string += c
+
+    # raise Exception('Expected end-of-string quote')
+    return json_string, ''
+    
+
+def lex_number(string):
+    json_number = ''
+
+    number_characters = [str(d) for d in range(0, 10)] + ['-', 'e', '.']
+
+    for c in string:
+        if c in number_characters:
+            json_number += c
+        else:
+            break
+
+    rest = string[len(json_number):]
+
+    if not len(json_number):
+        return None, string
+
+    if '.' in json_number:
+        return float(json_number), rest
+
+    return int(json_number), rest
+
+
+def lex_bool(string):
+    string_len = len(string)
+
+    if string_len >= TRUE_LEN and \
+       string[:TRUE_LEN] == 'true':
+        return True, string[TRUE_LEN:]
+    elif string_len >= FALSE_LEN and \
+         string[:FALSE_LEN] == 'false':
+        return False, string[FALSE_LEN:]
+
+    return None, string
+
+
+def lex_null(string):
+    string_len = len(string)
+
+    if string_len >= NULL_LEN and \
+       string[:NULL_LEN] == 'null':
+        return True, string[NULL_LEN:]
+
+    return None, string
+
+
+def lex(string):
+    tokens = []
+
+    while len(string):
+        json_string, string = lex_string(string)
+        if json_string is not None:
+            tokens.append(json_string)
+            continue
+
+        json_number, string = lex_number(string)
+        if json_number is not None:
+            tokens.append(json_number)
+            continue
+
+        json_bool, string = lex_bool(string)
+        if json_bool is not None:
+            tokens.append(json_bool)
+            continue
+
+        json_null, string = lex_null(string)
+        if json_null is not None:
+            tokens.append(None)
+            continue
+
+        if string[0] in JSON_WHITESPACE:
+            string = string[1:]
+        elif string[0] in JSON_SYNTAX:
+            tokens.append(string[0])
+            string = string[1:]
+        else:
+            raise Exception('Unexpected character: {}'.format(string[0]))
+        
+        # print(tokens)
+
+    return tokens
+
+
+def parse_array(tokens):
+    # print("PARSE ARRAY: ")
+    # print(tokens)
+    json_array = []
+
+    if tokens:
+        t = tokens[0]
+        if t == JSON_RIGHTBRACKET:
+            return json_array, tokens[1:]
+
+        while True:
+            json, tokens = parse(tokens)
+            json_array.append(json)
+
+            # print(f'Json array: {json_array}')
+
+            if tokens:
+                t = tokens[0]
+                if t == JSON_RIGHTBRACKET:
+                    return json_array, tokens[1:]
+                elif t != JSON_COMMA:
+                    raise Exception('Expected comma after object in array')
+                else:
+                    tokens = tokens[1:]
+            else:
+                return json_array, None
+    return None, None
+    # raise Exception('Expected end-of-array bracket')
+
+
+def parse_object(tokens):
+    # print("PARSE OBJECT: ")
+    # print(tokens)
+    json_object = {}
+
+    if tokens:
+        t = tokens[0]
+
+        if t == JSON_RIGHTBRACE:
+            return json_object, tokens[1:]
+
+        while True:
+            if tokens:
+                json_key = tokens[0]
+                # print(f'Json key: {json_key}')
+
+                if type(json_key) is str:
+                    tokens = tokens[1:]
+                else:
+                    raise Exception('Expected string key, got: {}'.format(json_key))
+
+                if tokens:
+                    if tokens[0] != JSON_COLON:
+                        raise Exception('Expected colon after key in object, got: {}'.format(t))
+
+                    json_value, tokens = parse(tokens[1:])
+                    # print(f'Json value: {json_value}')
+
+                    json_object[json_key] = json_value
+
+                    # print(f'Json object: {json_object}')
+
+                if tokens:
+                    t = tokens[0]
+                    if t == JSON_RIGHTBRACE:
+                        return json_object, tokens[1:]
+                    elif t != JSON_COMMA:
+                        raise Exception('Expected comma after pair in object, got: {}'.format(t))
+
+                    tokens = tokens[1:]
+            else:
+                return json_object, None
+    return None, None
+    # raise Exception('Expected end-of-object brace')
+
+def parse(tokens):
+    if tokens:
+        t = tokens[0]
+
+        # print("PARSE: ")
+        # print(tokens)
+
+        if t == JSON_LEFTBRACKET:
+            return parse_array(tokens[1:])
+        elif t == JSON_LEFTBRACE:
+            return parse_object(tokens[1:])
+        else:
+            return t, tokens[1:]
+    else:
+        return None, None
+
+
+def report_parser(string):
+    tokens = lex(string)
+    return parse(tokens)[0]
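
report_parser is a small hand-rolled JSON lexer/parser; a quick check of its output (the input string is illustrative, and the import path is an assumption):

    from rparser import report_parser  # hypothetical import path

    report_parser('{"MetricId": "CPUUsage", "Values": [1, 2.5, null, true]}')
    # -> {'MetricId': 'CPUUsage', 'Values': [1, 2.5, None, True]}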

+ 125 - 0
telemetry/roles/slurm_telemetry/files/monster/schema.py

@@ -0,0 +1,125 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import utils
+import logger
+
+log = logger.get_logger(__name__)
+
+
+def build_idrac_table_schemas(metric_definitions: list):
+    """build_table_schemas Build iDRAC Table Schemas
+
+    Build table schemas based on the idrac telemetry metric definitions
+
+    Args:
+        metric_definitions (list): idrac telemetry metric definitions
+    
+    Returns:
+        dict: iDRAC table schemas
+    """
+    table_schemas = {}
+
+    try:
+        for metric in metric_definitions:
+            table_name = metric['Id']
+            metric_type = metric['MetricDataType']
+            metric_unit = metric.get('Units', None)
+
+            # For network metrics, use BIG INT for storing the metric readings
+            if metric_unit == 'By' or metric_unit == 'Pkt':
+                value_type = 'BIGINT'
+            else:
+                value_type = utils.data_type_mapping.get(metric_type, 'TEXT')
+            
+            column_names = ['Timestamp', 'NodeID', 'Source', 'FQDD', 'Value']
+            column_types = ['TIMESTAMPTZ NOT NULL', 'INT NOT NULL', 'TEXT', \
+                            'TEXT', value_type]
+            
+            table_schemas.update({
+                table_name: {
+                    'column_names': column_names,
+                    'column_types': column_types,
+                }
+            })
+    except Exception as err:
+        log.error(f"Cannot build idrac table schemas: {err}")
+    return table_schemas
+
+
+def build_slurm_table_schemas():
+    """build_slurm_table_schemas Build Slurm Table Schemas
+
+    Build slurm table schemas for storing resource usage metrics obtained from 
+    slurm
+
+    Returns:
+        dict: slurm table schemas
+    """
+    table_schemas = {}
+    add_tables = {
+        'memoryusage':{
+            'add_columns': ['Value'],
+            'add_types': ['REAL']
+        },
+        'memory_used':{
+            'add_columns': ['Value'],
+            'add_types': ['INT']
+        },
+        'cpu_load':{
+            'add_columns': ['Value'],
+            'add_types': ['INT']
+        },
+        'state':{
+            'add_columns': ['Value'],
+            'add_types': ['INT']
+        },
+        'node_jobs':{
+            'add_columns': ['Jobs', 'CPUs'],
+            'add_types': ['INTEGER[]', 'INTEGER[]']
+        }
+    }
+    try:
+        for table_name, detail in add_tables.items():
+            column_names = ['Timestamp', 'NodeID']
+            column_types = ['TIMESTAMPTZ NOT NULL', 'INT NOT NULL']
+            column_names.extend(detail['add_columns'])
+            column_types.extend(detail['add_types'])
+
+            table_schemas.update({
+                table_name: {
+                    'column_names': column_names,
+                    'column_types': column_types
+                }
+            })
+    except Exception as err:
+        log.error(f'Cannot build slurm table schemas: {err}')
+    return table_schemas
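
All slurm tables share the Timestamp/NodeID column prefix; the resulting shape for one table (import path assumed):

    from schema import build_slurm_table_schemas  # hypothetical import path

    build_slurm_table_schemas()['node_jobs']
    # -> {'column_names': ['Timestamp', 'NodeID', 'Jobs', 'CPUs'],
    #     'column_types': ['TIMESTAMPTZ NOT NULL', 'INT NOT NULL',
    #                      'INTEGER[]', 'INTEGER[]']}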

+ 155 - 0
telemetry/roles/slurm_telemetry/files/monster/slurm.py

@@ -0,0 +1,155 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import time
+import json
+import logger
+import requests
+import subprocess
+
+from requests.adapters import HTTPAdapter
+
+log = logger.get_logger(__name__)
+
+
+def read_slurm_token(slurm_config: dict):
+    """read_slurm_token Read Slurm token
+
+    Read the token file; if it is out of date, get a new token from Slurm
+
+    Args:
+        slurm_config (dict): Slurm Configuration
+    """
+    token = ""
+    try:
+        with open('./token.json', 'r') as f:
+            token_record = json.load(f)
+            time_interval = int(time.time()) - token_record['time']
+            if time_interval >= 3600:
+                token = get_slurm_token(slurm_config)
+            else:
+                token = token_record['token']
+    except Exception:
+        token = get_slurm_token(slurm_config)
+    return token
+
+
+def get_slurm_token(slurm_config: dict):
+    """get_slurm_token Get Slurm Token
+
+    Get JWT token from Slurm. This requires the public key on this node to be 
+    added to the target cluster headnode.
+
+    Args:
+        slurm_config (dict): Slurm Configuration
+
+    Returns:
+        str: token
+    """
+    
+    while True:
+        try:
+            # Setting command parameters
+            slurm_headnode = slurm_config['headnode']
+            
+            print("Get a new token...")
+            # The command used in cli
+            command = [f"ssh {slurm_headnode} 'scontrol token lifespan=3600'"]
+            # Get the string from command line
+            rtn_str = subprocess.run(command, shell=True, stdout=subprocess.PIPE).stdout.decode('utf-8')
+            # Get token
+            token = rtn_str.splitlines()[0].split('=')[1]
+            timestamp = int(time.time())
+
+            token_record = {
+                'time': timestamp,
+                'token': token
+            }
+
+            with open('./token.json', 'w') as f:
+                json.dump(token_record, f, indent = 4)
+            
+            return token
+        except Exception:
+            print("Failed to get a Slurm token; retrying in 60s.")
+            time.sleep(60)
+
+
+def call_slurm_api(slurm_config: dict, token: str, url: str):
+    """call_slurm_api Call Slurm API
+
+    Call Slurm API and get the data from the specified url
+
+    Args:
+        slurm_config (dict): Slurm Configuration
+        token (str): Slurm JWT token
+        url (str): Url of Slurm API
+
+    Returns:
+        dict: slurm metrics
+    """
+  
+    metrics = {}
+    headers = {"X-SLURM-USER-NAME": slurm_config['user'], 
+               "X-SLURM-USER-TOKEN": token}
+    adapter = HTTPAdapter(max_retries=3)
+    with requests.Session() as session:
+        session.mount(url, adapter)
+        try:
+            response = session.get(url, headers=headers)
+            metrics = response.json()
+        except Exception as err:
+            log.error(f"Fetch slurm metrics error: {err}")
+    return metrics
+
+
+def get_slurm_url(slurm_config: dict, url_type: str):
+    """get_slurm_nodes_url Get Slurm Nodes Url
+
+    Get the url for reading nodes info from slurm
+
+    Args:
+        slurm_config (dict): Slurm Configuration
+        url_type (str): Url type, 'nodes' or 'jobs'
+
+    Returns:
+        str: slurm api url
+    """
+    base_url = f"http://{slurm_config['ip']}:{slurm_config['port']}"
+    url_types = ['nodes', 'jobs']
+    if url_type not in url_types:
+        raise ValueError(f"Invalid url type. Expected one of: {url_types}")
+
+    if url_type == 'nodes':
+        url = f"{base_url}{slurm_config['slurm_nodes']}"
+    else:
+        url = f"{base_url}{slurm_config['slurm_jobs']}"
+    
+    return url
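
A sketch of get_slurm_url with a hypothetical slurm_config; the real one comes from utils.get_config('slurm_rest_api'), and the keys and endpoint paths shown are assumptions:

    slurm_config = {
        'ip': '10.0.0.1',
        'port': 6820,
        'slurm_nodes': '/slurm/v0.0.36/nodes',
        'slurm_jobs': '/slurm/v0.0.36/jobs',
    }
    get_slurm_url(slurm_config, 'nodes')
    # -> 'http://10.0.0.1:6820/slurm/v0.0.36/nodes'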

+ 410 - 0
telemetry/roles/slurm_telemetry/files/monster/sql.py

@@ -0,0 +1,410 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import utils
+import logger
+from pgcopy import CopyManager
+from datetime import datetime
+
+log = logger.get_logger(__name__)
+
+
+def generate_metric_table_sqls(table_schemas: dict,
+                               schema_name: str):
+    """generate_metric_table_sqls General Metric Table Sqls
+
+    Generate sqls for creating metric tables
+
+    Args:
+        table_schemas (dict): table schemas
+        schema_name (str): schema name
+
+    Returns:
+        dict: sql statements
+    """
+    sql_statements = {}
+    try:
+        schema_sql = f"CREATE SCHEMA IF NOT EXISTS {schema_name};"
+        sql_statements.update({
+            'schema_sql': schema_sql
+        })
+        
+        tables_sql = []
+        for table, column in table_schemas.items():
+            column_names = column['column_names']
+            column_types = column['column_types']
+            
+            column_str = ''
+            for i, column in enumerate(column_names):
+                column_str += f'{column} {column_types[i]}, '
+
+            table_sql = f"CREATE TABLE IF NOT EXISTS {schema_name}.{table} \
+                ({column_str}FOREIGN KEY (NodeID) REFERENCES nodes (NodeID));"
+            tables_sql.append(table_sql)
+
+        sql_statements.update({
+            'tables_sql': tables_sql,
+        })
+
+    except Exception as err:
+        log.error(f'Cannot generate metric table SQLs: {err}')
+    
+    return sql_statements
+
+
+def generate_slurm_job_table_sql(schema_name: str):
+    """generate_slurm_job_table_sql Generate Slurm Job Table Sql
+
+    Generate sqls for creating the table that stores the jobs info
+
+    Args:
+        schema_name (str): schema name
+
+    Returns:
+        dict: sql statements
+    """
+    
+    sql_statements = {}
+    table = 'jobs'
+    try:
+        schema_sql = f"CREATE SCHEMA if NOT EXISTS {schema_name}"
+        sql_statements.update({
+            'schema_sql': schema_sql
+        })
+        tables_sql = []
+        column_names = ['job_id', 'array_job_id', 'array_task_id', 'name', 
+                        'job_state', 'user_id', 'user_name', 'group_id', 
+                        'cluster', 'partition', 'command', 
+                        'current_working_directory', 'batch_flag', 'batch_host',
+                        'nodes', 'node_count', 'cpus', 'tasks', 
+                        'tasks_per_node', 'cpus_per_task', 'memory_per_node', 
+                        'memory_per_cpu', 'priority', 'time_limit', 'deadline', 
+                        'submit_time', 'preempt_time', 'suspend_time', 
+                        'eligible_time', 'start_time', 'end_time', 
+                        'resize_time', 'restart_cnt', 'exit_code', 
+                        'derived_exit_code']
+        column_types = ['INT PRIMARY KEY', 'INT', 'INT', 'TEXT', 'TEXT', 'INT', 
+                        'TEXT', 'INT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 
+                        'BOOLEAN', 'TEXT', 'TEXT[]', 'INT', 'INT', 'INT', 'INT', 
+                        'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 
+                        'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT', 'INT']
+        column_str = ''
+        for i, column in enumerate(column_names):
+            column_str += f'{column} {column_types[i]}, '
+
+        table_sql = f"CREATE TABLE IF NOT EXISTS {schema_name}.{table} \
+            ({column_str[:-2]});"
+        tables_sql.append(table_sql)
+
+        sql_statements.update({
+            'tables_sql': tables_sql,
+        })
+    except Exception as err:
+        log.error(f'Cannot generate job table SQLs: {err}')
+    
+    return sql_statements
+
+
+def generate_metric_def_table_sql():
+    """generate_metrics_def_table_sql Generate Metrics Definition Table Sql
+
+    Generate a sql for creating the metrics definition table
+
+    Returns:
+        str: sql string
+    """
+    metric_def_table_sql = "CREATE TABLE IF NOT EXISTS metrics_definition \
+            (id SERIAL PRIMARY KEY, metric_id TEXT NOT NULL, metric_name TEXT, \
+            description TEXT, metric_type TEXT,  metric_data_type TEXT, \
+            units TEXT, accuracy REAL, sensing_interval TEXT, \
+            discrete_values TEXT[], data_type TEXT, UNIQUE (id));"
+    return metric_def_table_sql
+
+
+def generate_metadata_table_sql(nodes_metadata: list, table_name: str):
+    """generate_metadata_table_sql Generate Metadata Table Sql
+
+    Generate a sql for creating the node metadata table
+
+    Args:
+        nodes_metadata (list): nodes metadata list
+        table_name (str): table name 
+
+    Returns:
+        str: sql string
+    """
+    column_names = list(nodes_metadata[0].keys())
+    column_str = ""
+    for column in column_names:
+        column_str += column + " TEXT, "
+    column_str = column_str[:-2]
+    metadata_table_sql = f" CREATE TABLE IF NOT EXISTS {table_name} \
+        ( NodeID SERIAL PRIMARY KEY, {column_str}, UNIQUE (NodeID));"
+    return metadata_table_sql
+
+
+def update_nodes_metadata(conn: object, nodes_metadata: list, table_name: str):
+    """update_nodes_metadata Update Nodes Metadata
+
+    Update nodes metadata table
+
+    Args:
+        conn (object): database connection
+        nodes_metadata (list): nodes metadata list
+        table_name (str): table name
+    """
+    cur = conn.cursor()
+    for record in nodes_metadata:
+        bmc_ip_addr = record['Bmc_Ip_Addr']
+        # Update every column except the key fields; pass the values as query
+        # parameters so quotes in the metadata cannot break the statement.
+        cols = [col for col in record if col not in ('Bmc_Ip_Addr', 'HostName')]
+        col_sql = ", ".join(f"{col.lower()} = %s" for col in cols)
+        sql = f"UPDATE {table_name} SET {col_sql} WHERE bmc_ip_addr = %s;"
+        values = [str(record[col]) for col in cols] + [bmc_ip_addr]
+        cur.execute(sql, values)
+    
+    conn.commit()
+    cur.close()
+
+
+def insert_nodes_metadata(conn: object, nodes_metadata: list, table_name: str):
+    """insert_nodes_metadata Insert Nodes Metadata
+
+    Insert nodes metadata to metadata table
+
+    Args:
+        conn (object): database connection
+        nodes_metadata (list): nodes metadata list
+        table_name (str): table name
+    """
+    cols = tuple(col.lower() for col in nodes_metadata[0].keys())
+    records = []
+    for record in nodes_metadata:
+        values = [str(value) for value in record.values()]
+        records.append(tuple(values))
+
+    mgr = CopyManager(conn, table_name, cols)
+    mgr.copy(records)
+    conn.commit()
+
+
+def check_table_exist(conn: object, table_name: str):
+    """check_table_exist Check Table Exists
+
+    Check if the specified table exists and already contains data
+
+    Args:
+        conn (object): database connection
+        table_name (str): table name
+
+    Returns:
+        bool: True if the table exists and holds at least one row, False otherwise
+    """
+    cur = conn.cursor()
+    cur.execute("SELECT EXISTS (SELECT FROM pg_tables WHERE tablename = %s);",
+                (table_name, ))
+    (table_exists, ) = cur.fetchall()[0]
+
+    data_exists = False
+    if table_exists:
+        cur.execute(f"SELECT EXISTS (SELECT * FROM {table_name});")
+        (data_exists, ) = cur.fetchall()[0]
+    cur.close()
+    return data_exists
+
+
+def write_metric_definitions(conn: object, metric_definitions: list):
+    """write_metric_definitions Write Metric Definitions
+
+    Write metric definitions to the table
+
+    Args:
+        conn (object): database connection
+        metric_definitions (list): the metric definitions
+    """
+    if not check_table_exist(conn, 'metrics_definition'):
+        cols = ('metric_id', 'metric_name', 'description', 'metric_type',
+                    'metric_data_type', 'units', 'accuracy', 'sensing_interval',
+                    'discrete_values', 'data_type')
+
+        metric_definitions_table = [
+            (i['Id'], i['Name'], i['Description'], i['MetricType'],
+             i['MetricDataType'], i['Units'], i['Accuracy'],
+             i['SensingInterval'], i['DiscreteValues'],
+             utils.data_type_mapping[i['MetricDataType']])
+            for i in metric_definitions]
+
+        # Sort by metric id for a stable insert order
+        metric_definitions_table = utils.sort_tuple_list(metric_definitions_table)
+        
+        mgr = CopyManager(conn, 'metrics_definition', cols)
+        mgr.copy(metric_definitions_table)
+    
+    conn.commit()
+
+
+def write_nodes_metadata(conn: object, nodes_metadata: list):
+    """write_nodes_metadata Write Nodes Metadata
+
+    Write nodes metadata to the table
+
+    Args:
+        conn (object): database connection
+        nodes_metadata (list): nodes metadata list
+    """
+    if not check_table_exist(conn, 'nodes'):
+        insert_nodes_metadata(conn, nodes_metadata, 'nodes') 
+    else:
+        update_nodes_metadata(conn, nodes_metadata, 'nodes')
+
+
+def generate_slurm_sql(metric: str, 
+                       start: str, 
+                       end: str, 
+                       interval: str, 
+                       aggregate: str):
+    """generate_slurm_sql Generate Slurm Sql
+
+    Generate sql for querying slurm metrics
+
+    Args:
+        metric (str): metric name
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): aggregation interval
+        aggregate (str): aggregation function
+
+    Returns:
+        string: sql string
+    """
+    sql = ""
+    if metric == 'node_jobs':
+        sql = f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, \
+            nodeid, jsonb_agg(jobs) AS jobs, jsonb_agg(cpus) AS cpus \
+            FROM slurm.{metric} \
+            WHERE timestamp >= '{start}' \
+            AND timestamp <= '{end}' \
+            GROUP BY time, nodeid \
+            ORDER BY time;"
+    else:
+        sql = f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, \
+            nodeid, {aggregate}(value) AS value\
+            FROM slurm.{metric} \
+            WHERE timestamp >= '{start}' \
+            AND timestamp <= '{end}' \
+            GROUP BY time, nodeid \
+            ORDER BY time;"
+    return sql
+
+
+def generate_idrac_sql(metric: str, 
+                       fqdd: str,
+                       start: str, 
+                       end: str, 
+                       interval: str, 
+                       aggregate: str):
+    """generate_idrac_sql Generate iDRAC Sql
+
+    Generate sql for querying idrac metrics
+
+    Args:
+        metric (str): metric name
+        fqdd (str): Fully Qualified Device Descriptor
+        start (str): start of time range
+        end (str): end of time range
+        interval (str): aggregation interval
+        aggregate (str): aggregation function
+    
+    Returns:
+        string: sql string
+    """
+    schema = 'idrac'
+    sql = f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, \
+        nodeid, fqdd AS label, {aggregate}(value) AS value \
+        FROM {schema}.{metric} \
+        WHERE timestamp >= '{start}' \
+        AND timestamp < '{end}' \
+        AND fqdd = '{fqdd}' \
+        GROUP BY time, nodeid, label \
+        ORDER BY time;"
+    return sql
+
+
+def generate_slurm_jobs_sql(start: str, end: str):
+    """generate_slurm_jobs_sql Generate Slurm Jobs Sql
+
+    Generate Sql for querying slurm jobs info
+
+    Args:
+        start (str): start time
+        end (str): end time
+
+    Returns:
+        string: sql string
+    """
+    utc_from = datetime.strptime(start, '%Y-%m-%dT%H:%M:%S.%fZ')
+    epoch_from = int((utc_from - datetime(1970, 1, 1)).total_seconds())
+    utc_to = datetime.strptime(end, '%Y-%m-%dT%H:%M:%S.%fZ')
+    epoch_to = int((utc_to - datetime(1970, 1, 1)).total_seconds())
+
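+    # A job overlaps the queried window iff it started before the window
+    # ends and ended after the window starts.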
+    sql = f"SELECT * FROM slurm.jobs \
+            WHERE start_time < {epoch_to} \
+            AND end_time > {epoch_from};"
+    return sql
+
+
+def generate_node_jobs_sql(start: str, end: str, interval: str):
+    """gene_node_jobs_sql Generate Node-Jobs Sql
+
+    Generate SQL for querying node-jobs correlation
+
+    Args:
+        start (str): start time
+        end (str): end time
+        interval (str): interval for aggregation
+
+    Returns:
+        string: sql string
+    """
+    sql = f"SELECT time_bucket_gapfill('{interval}', timestamp) AS time, \
+            nodeid, jsonb_agg(jobs) AS jobs, jsonb_agg(cpus) AS cpus \
+            FROM slurm.node_jobs \
+            WHERE timestamp >= '{start}' \
+            AND timestamp <= '{end}' \
+            GROUP BY time, nodeid \
+            ORDER BY time;"
+    return sql
+
+

+ 109 - 0
telemetry/roles/slurm_telemetry/files/monster/tsdb.py

@@ -0,0 +1,109 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import sql
+import utils
+import idrac
+import logger
+import schema
+import psycopg2
+
+log = logger.get_logger(__name__)
+
+
+def init_tsdb():
+    """init_tsdb Initialize TimeScaleDB
+
+    Initialize TimeScaleDB; the database specified in the configuration file
+    should be created before running this function.
+    """
+    connection = utils.init_tsdb_connection()
+    username, password = utils.get_idrac_auth()
+    nodelist = utils.get_nodelist()
+
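+    # Metric definitions are assumed identical across iDRACs, so only the
+    # first node is queried.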
+    node = nodelist[0]
+
+    utils.print_status('Getting', 'metric', 'definitions')
+    metric_definitions = idrac.get_metric_definitions(node, username, password)
+    
+    utils.print_status('Getting', 'nodes', 'metadata')
+    nodes_metadata = idrac.get_nodes_metadata(nodelist, username, password)
+    
+    idrac_table_schemas = schema.build_idrac_table_schemas(metric_definitions)
+    slurm_table_schemas = schema.build_slurm_table_schemas()
+
+    with psycopg2.connect(connection) as conn:
+        cur = conn.cursor()
+
+        # Create node metadata table
+        utils.print_status('Creating', 'TimeScaleDB', 'tables')
+        metadata_sql = sql.generate_metadata_table_sql(nodes_metadata, 'nodes')
+        cur.execute(metadata_sql)
+        sql.write_nodes_metadata(conn, nodes_metadata)
+
+        # Create schema for idrac
+        idrac_sqls = sql.generate_metric_table_sqls(idrac_table_schemas, 'idrac')
+        cur.execute(idrac_sqls['schema_sql'])
+
+        # Create schema for slurm
+        slurm_sqls = sql.generate_metric_table_sqls(slurm_table_schemas, 'slurm')
+        cur.execute(slurm_sqls['schema_sql'])
+
+        # Create idrac and slurm tables
+        all_sqls = idrac_sqls['tables_sql'] + slurm_sqls['tables_sql']
+        for s in all_sqls:
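+            # The qualified table name is the 6th whitespace-separated token
+            # of "CREATE TABLE IF NOT EXISTS <schema>.<table> (...".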
+            table_name = s.split(' ')[5]
+            cur.execute(s)
+
+            # Create hypertable
+            create_hypertable_sql = "SELECT create_hypertable(" + "'" \
+                + table_name + "', 'timestamp', if_not_exists => TRUE);"
+            cur.execute(create_hypertable_sql)
+        
+        # Create table for jobs info
+        slurm_job_sql = sql.generate_slurm_job_table_sql('slurm')
+        cur.execute(slurm_job_sql['schema_sql'])
+        for s in slurm_job_sql['tables_sql']:
+            table_name = s.split(' ')[5]
+            cur.execute(s)
+        
+        # Create table for metric definitions
+        metric_def_sql = sql.generate_metric_def_table_sql()
+        cur.execute(metric_def_sql)
+        sql.write_metric_definitions(conn, metric_definitions)
+        
+        conn.commit()
+        cur.close()
+    utils.print_status('Finish', 'tables', 'initialization!')
+
+if __name__ == '__main__':
+    init_tsdb()

+ 296 - 0
telemetry/roles/slurm_telemetry/files/monster/utils.py

@@ -0,0 +1,296 @@
+"""
+MIT License
+
+Copyright (c) 2022 Texas Tech University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+"""
+This file is part of MonSter.
+
+Author:
+    Jie Li, jie.li@ttu.edu
+"""
+
+import yaml
+import logger
+import hostlist
+import psycopg2
+from pathlib import Path
+
+log = logger.get_logger(__name__)
+
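+# Map iDRAC/Redfish metric data types to TimescaleDB column types.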
+data_type_mapping = {
+    'Decimal': 'REAL',
+    'Integer': 'BIGINT',
+    'DateTime': 'TIMESTAMPTZ',
+    'Enumeration': 'TEXT',
+}
+
+DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
+
+class bcolors:
+    HEADER = '\033[95m'
+    OKBLUE = '\033[94m'
+    OKCYAN = '\033[96m'
+    OKGREEN = '\033[92m'
+    WARNING = '\033[93m'
+    FAIL = '\033[91m'
+    ENDC = '\033[0m'
+    BOLD = '\033[1m'
+    UNDERLINE = '\033[4m'
+
+
+def print_status(action: str, target: str, obj: str):
+    """print_status Print Status
+
+    Print status in a nice way
+
+    Args:
+        action (str): the action being performed
+        target (str): the target of the action (printed highlighted)
+        obj (str): the object of the action
+    """
+    print(f'{action} {bcolors.OKBLUE}{target}{bcolors.ENDC} {obj}...')
+
+
+def parse_config():
+    """parse_config Parse Config
+
+    Parse configuration files
+
+    Returns:
+        dict: the parsed configuration
+    """
+    monster_path = Path(__file__).resolve().parent
+    try:
+        with open(f'{monster_path}/config.yml', 'r') as ymlfile:
+            return yaml.safe_load(ymlfile)
+    except Exception as err:
+        log.error(f"Parsing Configuration Error: {err}")
+
+
+def get_idrac_auth():
+    """get_idrac_auth Get iDRAC Authentication
+
+    Get username and password for accessing idrac reports
+    """
+    idrac_config = parse_config()['idrac']
+    username = idrac_config['username']
+    password = idrac_config['password']
+    return username, password
+
+
+def get_config(target: str):
+    """get_config Get Config
+
+    Get Configuration for the specified target 
+
+    Args:
+        target (str): configuration target
+
+    Raises:
+        ValueError: Invalid configuration target
+
+    Returns:
+        dict: configurations of specified target
+    """
+    
+    targets = ['timescaledb', 'idrac', 'slurm_rest_api']
+    if target not in targets:
+        raise ValueError(f"Invalid configuration target. Expected one of: {targets}")
+
+    config = parse_config()[target]
+    return config
+
+
+def init_tsdb_connection():
+    """init_tsdb_connection Initialize TimeScaleDB Connection
+
+    Initialize TimeScaleDB Connection according to the configuration
+    """
+    config_tsdb = parse_config()['timescaledb']
+
+    db_host = config_tsdb['host']
+    db_port = config_tsdb['port']
+    db_user = config_tsdb['username']
+    db_pswd = config_tsdb['password']
+    db_dbnm = config_tsdb['database']
+    connection = f"postgresql://{db_user}:{db_pswd}@{db_host}:{db_port}/{db_dbnm}"
+    return connection
+
+
+def get_nodelist():
+    """get_nodelist Get Nodelist
+
+    Generate the nodelist according to the configuration
+    """
+    idrac_config = parse_config()['idrac']['nodelist']
+    nodelist = []
+
+    try:
+        for i in idrac_config:
+            nodes = hostlist.expand_hostlist(i)
+            nodelist.extend(nodes)
+        
+        return nodelist
+    except Exception as err:
+        log.error(f"Cannot generate nodelist: {err}")
+
+
+def sort_tuple_list(tuple_list:list):
+    """sort_tuple Sort a list of tuple
+
+    Sort tuple. Ref: https://www.geeksforgeeks.org/python-program-to-sort-a-\
+    list-of-tuples-by-second-item/
+
+    Args:
+        tuple_list (list): a list of tuple
+    """
+    tuple_list.sort(key=lambda x: x[0])
+    return tuple_list
+
+
+def get_os_idrac_hostname_mapping():
+    """get_os_idrac_hostname_mapping Get OS iDRAC hostname mapping
+
+    Read configuration file and get OS idrac hostname mapping if configured
+    """
+    hostnames_mapping = parse_config()['hostnames']
+    return hostnames_mapping
+
+
+def get_node_id_mapping(connection: str):
+    """get_node_id_mapping Get Node-Id Mapping
+
+    Get node-id mapping from the nodes metadata table
+
+    Args:
+        connection (str): timescaledb connection
+
+    Returns:
+        dict: node-id mapping
+    """
+    
+    mapping = {}
+    try:
+        with psycopg2.connect(connection) as conn:
+            cur = conn.cursor()
+            query = "SELECT nodeid, hostname FROM nodes"
+            cur.execute(query)
+            for (nodeid, hostname) in cur.fetchall():
+                mapping.update({
+                    hostname: nodeid
+                })
+            cur.close()
+            return mapping
+    except Exception as err:
+        log.error(f"Cannot generate node-id mapping: {err}")
+
+
+def get_metric_dtype_mapping(conn: object):
+    """get_table_dtype_mapping Get Metric-datatype mapping
+
+    Get Metric-datatype mapping from the metric definition
+
+    Args:
+        conn (object): TimeScaleDB connection object
+
+    Returns:
+        dict: Metric-datatype mapping
+    """
+    mapping = {}
+    cur = conn.cursor()
+    query = "SELECT metric_id, data_type FROM metrics_definition;"
+    cur.execute(query)
+    for (metric, data_type) in cur.fetchall():
+        mapping.update({
+            metric: data_type
+        })
+    cur.close()
+    return mapping
+
+
+def get_ip_id_mapping(conn: object):
+    """get_ip_id_mapping Get IP-ID mapping
+
+    Get iDRAC-ip address - node-id mapping
+
+    Args:
+        conn (object): TimeScaleDB connection object
+
+    Returns:
+        dict: ip-id mapping
+    """
+    mapping = {}
+    cur = conn.cursor()
+    query = "SELECT nodeid, bmc_ip_addr FROM nodes"
+    cur.execute(query)
+    for (nodeid, bmc_ip_addr) in cur.fetchall():
+        mapping.update({
+            bmc_ip_addr: nodeid
+        })
+    cur.close()
+    return mapping
+
+
+def cast_value_type(value, dtype):
+    """cast_value_type Cast Value Data Type
+
+    Cast value data type based on the datatype in TimeScaleDB
+
+    Args:
+        value: raw value to be cast
+        dtype (str): TimeScaleDB data type ('BIGINT', 'REAL', ...)
+
+    Returns:
+        object: value cast to the target type
+    """
+    try:
+        if dtype =="BIGINT":
+            return int(value)
+        elif dtype == "REAL":
+            return float(value)
+        else:
+            return value
+    except ValueError:
+        return value
+
+
+def partition(arr:list, cores: int):
+    """
+    Partition urls/nodes into several groups based on # of cores
+    """
+    groups = []
+    try:
+        arr_len = len(arr)
+        arr_per_core = arr_len // cores
+        arr_surplus = arr_len % cores
+
+        increment = 1
+        for i in range(cores):
+            if arr_surplus != 0 and i == (cores - 1):
+                groups.append(arr[i * arr_per_core:])
+            else:
+                groups.append(arr[i * arr_per_core : increment * arr_per_core])
+                increment += 1
+    except Exception as err:
+        log.error(f"Cannot Partition List : {err}")
+    return groups