Selaa lähdekoodia

updated worker size calculation for airflow

Avik Datta 4 vuotta sitten
vanhempi
commit
383fbe6c67
1 muutettua tiedostoa jossa 14 lisäystä ja 9 poistoa
  1. 14 9
      igf_airflow/check_celery_queue.py

+ 14 - 9
igf_airflow/check_celery_queue.py

@@ -24,11 +24,14 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
                            {queue_name:{job_state:job_count}}
   :param max_workers_per_queue: Max allowed worker per queue, default 10
   :param max_total_workers: Max total worker for the queue, default 70
-  :returns: A list of dictionary containing all the target jobs
-            [{queue_name:target_job_counts}]
+  :returns: A dictionary containing all the target jobs
+            {queue_name:target_job_counts}
+            and a list of unique queue names
+            [queue_name]
   '''
   try:
-    worker_to_submit = list()
+    worker_to_submit = dict()
+    unique_queue_list = list()
     total_active_jobs = 0
     for _,job_data in active_jobs_dict.items():
       for job_state,job_count in job_data.items():
@@ -37,9 +40,8 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
     if isinstance(queue_list,list) and \
        len(queue_list) > 0 and \
        total_active_jobs < max_total_workers:
-      for entry in queue_list:
+      for entry in queue_list:                         # this list should be unique
         for queue_name,waiting_jobs in entry.items():
-          target_job = dict()
           if waiting_jobs > max_workers_per_queue:
             waiting_jobs = max_workers_per_queue
           active_job = active_jobs_dict.get(queue_name)
@@ -55,17 +57,20 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
           if active_queued_job < 1:
             if total_running_for_queue==0 and \
                (total_active_jobs + waiting_jobs) < max_total_workers:
-              target_job.update({queue_name : waiting_jobs})
+              worker_to_submit.\
+                update({queue_name : waiting_jobs})
             
             if total_running_for_queue > 0:
               if waiting_jobs > total_running_for_queue:
                   waiting_jobs = waiting_jobs - total_running_for_queue
               if (total_active_jobs + waiting_jobs) < max_total_workers:
-                target_job.update({queue_name:waiting_jobs})
+                worker_to_submit.\
+                  update({queue_name : waiting_jobs})
                 total_active_jobs += waiting_jobs
           else:
             print('Not submitting new jobs for queue {0}'.format(queue_name))
-          worker_to_submit.append(target_job)
-    return worker_to_submit
+    if len(worker_to_submit.keys()) > 0:
+      unique_queue_list = worker_to_submit.keys()
+    return worker_to_submit,unique_queue_list
   except Exception as e:
     raise ValueError('Failed to calculate airflow worker size, error: {0}'.format(e))