added job checking scripts

Avik Datta 4 years ago
parent commit ce9edb4417

+ 0 - 0
igf_airflow/__init__.py


+ 71 - 0
igf_airflow/check_celery_queue.py

@@ -0,0 +1,71 @@
+import redis
+
+def fetch_queue_list_from_redis_server(url):
+  '''
+  A function for fetching queue names and their pending task counts
+  from the Redis server backing the Celery broker
+
+  :param url: A Redis connection URL, e.g. redis://hostname:6379/0
+  :returns: A list of dictionaries with the following structure
+            [{queue_name: pending_task_count}]
+  '''
+  try:
+    queue_list = list()
+    r = redis.from_url(url)
+    for i in r.keys():
+      queue = i.decode()                                           # Redis returns keys as bytes
+      if not queue.startswith('_'):                                # skip internal keys, e.g. _kombu.binding.*
+        q_len = r.llen(queue)                                      # pending tasks sit in a Redis list per queue
+        queue_list.append({queue: q_len})
+    return queue_list
+  except Exception as e:
+    raise ValueError('Failed to fetch from redis server, error: {0}'.format(e))
+
+
+def calculate_new_workers(queue_list, active_jobs_dict, max_workers_per_queue=10, max_total_workers=70):
+  '''
+  A function for calculating the number of new workers to submit
+
+  :param queue_list: A list of dictionaries containing the queued job counts
+                      [{queue_name: job_count}]
+  :param active_jobs_dict: A dictionary containing the job counts for each queue
+                           {queue_name: {job_state: job_count}}
+  :param max_workers_per_queue: Max allowed workers per queue, default 10
+  :param max_total_workers: Max total workers across all queues, default 70
+  :returns: A list of dictionaries containing the target worker counts
+            [{queue_name: target_worker_count}]
+  '''
+  try:
+    worker_to_submit = list()
+    total_active_jobs = 0
+    for _,job_data in active_jobs_dict.items():                    # count queued and running jobs across all queues
+      for job_state,job_count in job_data.items():
+        if job_state in ('Q','R'):
+          total_active_jobs += job_count
+    if isinstance(queue_list,list) and \
+       len(queue_list) > 0 and \
+       total_active_jobs < max_total_workers:
+      for entry in queue_list:
+        for queue_name,waiting_jobs in entry.items():
+          target_job = dict()
+          if waiting_jobs > max_workers_per_queue:                 # cap new workers at the per-queue limit
+            waiting_jobs = max_workers_per_queue
+          active_job = active_jobs_dict.get(queue_name)
+          total_running_for_queue = 0
+          active_queued_job = 0
+          if active_job is not None:
+            for job_state,job_counts in active_job.items():        # tally queued and running workers for this queue
+              if job_state in ('Q','R'):
+                total_running_for_queue += job_counts
+              if job_state == 'Q':
+                active_queued_job += job_counts
+
+          if active_queued_job < 1:                                # only add workers if none are waiting to start
+            if total_running_for_queue == 0 and \
+               (total_active_jobs + waiting_jobs) < max_total_workers:
+              target_job.update({queue_name: waiting_jobs})
+              total_active_jobs += waiting_jobs                    # keep the global count in step with new workers
+            elif total_running_for_queue > 0:
+              if waiting_jobs > total_running_for_queue:           # discount workers already serving this queue
+                waiting_jobs = waiting_jobs - total_running_for_queue
+              if (total_active_jobs + waiting_jobs) < max_total_workers:
+                target_job.update({queue_name: waiting_jobs})
+                total_active_jobs += waiting_jobs
+          else:
+            print('Not submitting new jobs for queue {0}'.format(queue_name))
+          if len(target_job) > 0:                                  # skip queues with nothing to submit
+            worker_to_submit.append(target_job)
+    return worker_to_submit
+  except Exception as e:
+    raise ValueError('Failed to calculate airflow worker size, error: {0}'.format(e))
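Taken together, the two functions above implement a simple scaling check: read the per-queue backlog from the Celery broker, then cap new workers by the per-queue and global limits. A minimal usage sketch follows; the Redis URL, queue name, and counts are illustrative assumptions, not part of this commit:

import json
from igf_airflow.check_celery_queue import (
  fetch_queue_list_from_redis_server,
  calculate_new_workers)

redis_url = 'redis://localhost:6379/0'                             # hypothetical broker URL
queue_list = fetch_queue_list_from_redis_server(url=redis_url)     # e.g. [{'hpc_queue': 12}]
active_jobs_dict = {'hpc_queue': {'Q': 0, 'R': 5}}                 # normally from get_pbspro_job_count()
workers = calculate_new_workers(
  queue_list=queue_list,
  active_jobs_dict=active_jobs_dict)
print(json.dumps(workers, indent=2))                               # e.g. [{"hpc_queue": 5}]

With 12 waiting tasks, the per-queue cap first trims the request to 10, and the 5 workers already running are then discounted, leaving 5 new workers to submit.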

+ 38 - 0
igf_airflow/hpc_queue.py

@@ -0,0 +1,38 @@
+import json
+import subprocess
+from collections import defaultdict
+from tempfile import TemporaryFile
+
+def get_pbspro_job_count(job_name_prefix=''):
+  '''
+  A function for fetching running and queued job information from a PBSPro HPC cluster
+
+  :param job_name_prefix: A prefix string for filtering jobs by name, default '' (no filtering)
+  :returns: A defaultdict object with the following structure
+            { job_name: {'Q': counts, 'R': counts }}
+  '''
+  try:
+    with TemporaryFile() as tmp_file:
+      subprocess.\
+        check_call(
+          ['qstat','-f','-F','json'],                              # full job listing in JSON format
+          stdout=tmp_file)
+      tmp_file.seek(0)
+      json_data = json.loads(tmp_file.read())                      # parse the qstat JSON output
+    jobs = json_data.get('Jobs')                                   # 'Jobs' key is absent when no jobs are listed
+    active_jobs = defaultdict(lambda: defaultdict(int))            # always return the documented defaultdict shape
+    if jobs is not None:
+      for _,job_data in jobs.items():
+        job_name = job_data.get('Job_Name')
+        job_state = job_data.get('job_state')
+        if job_name is not None and \
+           job_name.startswith(job_name_prefix):                   # guard against entries without a Job_Name
+          if job_state == 'Q':                                     # queued
+            active_jobs[job_name]['Q'] += 1
+          if job_state == 'R':                                     # running
+            active_jobs[job_name]['R'] += 1
+    return active_jobs
+  except Exception as e:
+    raise ValueError('Failed to get job counts from hpc, error: {0}'.format(e))
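The PBSPro counts feed straight into calculate_new_workers as active_jobs_dict. A sketch of the end-to-end check, assuming qstat is available on PATH and that worker jobs are named after their Celery queue (both assumptions; this commit does not itself wire the pieces together):

from igf_airflow.check_celery_queue import (
  fetch_queue_list_from_redis_server,
  calculate_new_workers)
from igf_airflow.hpc_queue import get_pbspro_job_count

def check_new_workers(redis_url, job_name_prefix=''):
  # backlog per Celery queue, read from the Redis broker
  queue_list = fetch_queue_list_from_redis_server(url=redis_url)
  # queued ('Q') and running ('R') worker jobs on the PBSPro cluster
  active_jobs = get_pbspro_job_count(job_name_prefix=job_name_prefix)
  # new workers to submit per queue, honouring both caps
  return calculate_new_workers(
    queue_list=queue_list,
    active_jobs_dict=active_jobs)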

+ 2 - 0
requirements.txt

@@ -0,0 +1,2 @@
+redis==3.5.3
+apache-airflow[postgres,redis,slack,celery]==1.10.12
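A note on the pins: the explicit redis entry matches the direct import redis in check_celery_queue.py, while apache-airflow 1.10.12 with the celery and redis extras pulls in the Celery executor and broker dependencies; the postgres and slack extras presumably cover the metadata database driver and notification hooks used elsewhere in this deployment.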