瀏覽代碼

fix for worker calc

Avik Datta 4 年之前
父節點
當前提交
17722213a1
共有 1 個文件被更改,包括 8 次插入5 次删除
  1. 8 5
      igf_airflow/check_celery_queue.py

+ 8 - 5
igf_airflow/check_celery_queue.py

@@ -64,10 +64,6 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
         for queue_name,waiting_jobs in entry.items():
           if waiting_jobs > max_workers_per_queue:
             waiting_jobs = max_workers_per_queue
-          if waiting_jobs is not None and \
-             waiting_jobs > 0 and \
-             int(waiting_jobs/2) >= 1:
-            waiting_jobs = int(waiting_jobs/2)                                  #submit half of the workers
           active_job = active_jobs_dict.get(queue_name)
           total_running_for_queue = 0
           active_queued_job = 0
@@ -81,12 +77,19 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
           if active_queued_job < 1:
             if total_running_for_queue==0 and \
                (total_active_jobs + waiting_jobs) < max_total_workers:
+              if waiting_jobs is not None and \
+                 waiting_jobs > 0 and \
+                 int(waiting_jobs/2) > 1:
+                waiting_jobs = int(waiting_jobs/2)
               worker_to_submit.\
                 update({queue_name : waiting_jobs})
-            
             if total_running_for_queue > 0:
               if waiting_jobs > total_running_for_queue:
                   waiting_jobs = waiting_jobs - total_running_for_queue
+              if waiting_jobs is not None and \
+                 waiting_jobs > 0 and \
+                 int(waiting_jobs/2) > 1:
+                waiting_jobs = int(waiting_jobs/2)
               if (total_active_jobs + waiting_jobs) < max_total_workers:
                 worker_to_submit.\
                   update({queue_name : waiting_jobs})