Browse Source

fix for worker calculation

Avik Datta 4 years ago
parent
commit
16a9077710
1 changed files with 18 additions and 2 deletions
  1. 18 2
      igf_airflow/check_celery_queue.py

+ 18 - 2
igf_airflow/check_celery_queue.py

@@ -1,4 +1,4 @@
-import redis
+import redis,json
 
 def fetch_queue_list_from_redis_server(url):
   try:
@@ -17,6 +17,22 @@ def fetch_queue_list_from_redis_server(url):
   except Exception as e:
     raise ValueError('Failed to fetch from redis server, error: {0}'.format(e))
 
+def airflow_utils_for_redis(**kwargs):
+  try:
+    if 'redis_conf_file' not in kwargs:
+      raise ValueError('redis_conf_file info is not present in the kwargs')
+
+    redis_conf_file = kwargs.get('redis_conf_file')
+    json_data = dict()
+    with open(redis_conf_file,'r') as jp:
+      json_data = json.load(jp)
+    if 'redis_db' not in json_data:
+      raise ValueError('redis_db key not present in the conf file')
+    url = json_data.get('redis_db')
+    queue_list = fetch_queue_list_from_redis_server(url=url)
+    return queue_list
+  except Exception as e:
+    raise ValueError('Failed to run, error:{0}'.format(e))
 
 def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,max_total_workers=70):
   '''
@@ -74,7 +90,7 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
           else:
             print('Not submitting new jobs for queue {0}'.format(queue_name))
     if len(worker_to_submit.keys()) > 0:
-      unique_queue_list = worker_to_submit.keys()
+      unique_queue_list = list(worker_to_submit.keys())
     return worker_to_submit,unique_queue_list
   except Exception as e:
     raise ValueError('Failed to calculate airflow worker size, error: {0}'.format(e))