Przeglądaj źródła

simplifiesd job counts

Avik Datta 3 lat temu
rodzic
commit
f83ab9a270

+ 1 - 1
dags/dag1_calculate_hpc_worker.py

@@ -37,7 +37,7 @@ def get_new_workers(**kwargs):
       calculate_new_workers(
         queue_list=queued_tasks,
         active_jobs_dict=active_tasks,
-        max_workers_per_queue=6,
+        max_workers_per_queue=3,
         max_total_workers=70)
     for key,value in worker_to_submit.items():
       ti.xcom_push(key=key,value=value)

+ 0 - 8
igf_airflow/check_celery_queue.py

@@ -77,19 +77,11 @@ def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,m
           if active_queued_job < 1:
             if total_running_for_queue==0 and \
                (total_active_jobs + waiting_jobs) < max_total_workers:
-              if waiting_jobs is not None and \
-                 waiting_jobs > 0 and \
-                 int(waiting_jobs/2) > 1:
-                waiting_jobs = int(waiting_jobs/2)
               worker_to_submit.\
                 update({queue_name : waiting_jobs})
             if total_running_for_queue > 0:
               if waiting_jobs > total_running_for_queue:
                   waiting_jobs = waiting_jobs - total_running_for_queue
-              if waiting_jobs is not None and \
-                 waiting_jobs > 0 and \
-                 int(waiting_jobs/2) > 1:
-                waiting_jobs = int(waiting_jobs/2)
               if (total_active_jobs + waiting_jobs) < max_total_workers:
                 worker_to_submit.\
                   update({queue_name : waiting_jobs})