
added worker clean up task

Avik Datta, 3 years ago
Commit 97b1e5a071
1 changed file with 61 additions and 2 deletions
  1. dags/dag1_calculate_hpc_worker.py  +61 −2

dags/dag1_calculate_hpc_worker.py  (+61 −2)

@@ -70,6 +70,25 @@ def get_new_workers(**kwargs):
       ti.xcom_push(key=key,value=value)
     unique_queue_list = \
       [q for q in unique_queue_list if q.startswith('hpc')]
+    celery_worker_key = kwargs['params'].get('celery_worker_key')
+    base_queue = kwargs['params'].get('base_queue')
+    empty_celery_worker_key = kwargs['params'].get('empty_celery_worker_key')
+    celery_workers = \
+      ti.xcom_pull(task_ids='fetch_celery_workers',key=celery_worker_key)
+    cleanup_list = list()
+    for flower_entry in celery_workers:
+      active_jobs_count = flower_entry.get('active_jobs')
+      worker_id = flower_entry.get('worker_id')
+      queue_list = flower_entry.get('queue_lists')
+      if base_queue not in queue_list and \
+         len(set(queue_list).intersection(set(unique_queue_list))) == 0 and \
+         active_jobs_count == 0:
+        cleanup_list.\
+          append(worker_id)
+    if len(cleanup_list) > 0:
+      ti.xcom_push(key=empty_celery_worker_key,value=cleanup_list)
+      unique_queue_list.\
+        append('cleanup_celery_workers')
     return unique_queue_list
   except Exception as e:
     logging.error('Failed to get new workers, error: {0}'.format(e))
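
Note: the block added above pushes the idle-worker ids to XCom under empty_celery_worker_key and appends 'cleanup_celery_workers' to the returned queue list so the cleanup branch gets selected. A minimal sketch of the Flower entry it expects (keys taken from the loop above; the worker name and queue values are illustrative only):

# Illustrative only: shape of one entry in the list pulled from the
# fetch_celery_workers task; the worker name and queue are made up.
flower_entry = {
  'worker_id': 'celery@hpc_4G_worker_1',   # hypothetical worker name
  'active_jobs': 0,                        # no tasks currently running
  'queue_lists': ['hpc_4G']}               # queues this worker consumes
# The worker goes into cleanup_list only when active_jobs is 0 and
# queue_lists shares nothing with base_queue or with the queues that
# still need new workers (unique_queue_list).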
@@ -105,6 +124,34 @@ def fetch_celery_worker_list(**context):
     raise
 
 
+def stop_celery_workers(**context):
+  """
+  A function for stopping celery workers
+  """
+  try:
+    ti = context.get('ti')
+    empty_celery_worker_key = context['params'].get('empty_celery_worker_key')
+    celery_basic_auth = os.environ['AIRFLOW__CELERY__FLOWER_BASIC_AUTH']
+    flower_user, flower_pass = celery_basic_auth.split(':')
+    celery_workers = \
+      ti.xcom_pull(
+        task_ids='calculate_new_worker_size_and_branch',
+        key=empty_celery_worker_key)
+    for worker_id in celery_workers:
+      flower_shutdown_url = \
+        '{0}/api/worker/shutdown/{1}'.\
+          format(CELERY_FLOWER_BASE_URL, worker_id)
+      res = requests.post(
+              flower_shutdown_url,
+              auth=HTTPBasicAuth(flower_user, flower_pass))
+      if res.status_code != 200:
+        raise ValueError('Failed to delete worker {0}'.\
+          format(worker_id))
+  except Exception as e:
+    logging.error('Failed to stop celery workers, error: {0}'.format(e))
+    raise
+
+
 with dag:
   ## TASK
   fetch_queue_list_from_redis = \
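
Note: stop_celery_workers above drives Flower's worker shutdown endpoint (POST {base_url}/api/worker/shutdown/{worker_id}) with the basic-auth pair read from AIRFLOW__CELERY__FLOWER_BASIC_AUTH. A standalone sketch of that call, with placeholder base URL, credentials and worker name:

import requests
from requests.auth import HTTPBasicAuth

# Placeholders, not the real deployment values.
CELERY_FLOWER_BASE_URL = 'http://flower.example.com:5555'
flower_user, flower_pass = 'user', 'pass'
worker_id = 'celery@hpc_4G_worker_1'   # hypothetical worker name

res = requests.post(
  '{0}/api/worker/shutdown/{1}'.format(CELERY_FLOWER_BASE_URL, worker_id),
  auth=HTTPBasicAuth(flower_user, flower_pass))
if res.status_code != 200:   # Flower answers 200 once the shutdown message is sent
  raise ValueError('Failed to stop worker {0}'.format(worker_id))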
@@ -149,7 +196,10 @@ with dag:
       task_id='calculate_new_worker_size_and_branch',
       dag=dag,
       python_callable=get_new_workers,
-      queue='igf-lims')
+      queue='igf-lims',
+      params={'celery_worker_key':'celery_workers',
+              'empty_celery_worker_key':'empty_celery_worker',
+              'base_queue':'igf-lims'})
   ## TASK
   queue_tasks = list()
   hpc_queue_list = Variable.get('hpc_queue_list')
@@ -187,10 +237,19 @@ with dag:
     queue_tasks.\
       append(t)
 
+  ## TASK
+  cleanup_celery_workers = \
+    PythonOperator(
+      task_id='cleanup_celery_workers',
+      dag=dag,
+      queue='igf-lims',
+      params={'empty_celery_worker_key':'empty_celery_worker'},
+      python_callable=stop_celery_workers)
   ## PIPELINE
   check_hpc_queue >> fetch_active_jobs_from_hpc
   calculate_new_worker_size_and_branch << \
     [fetch_queue_list_from_redis,
      fetch_active_jobs_from_hpc,
      fetch_celery_workers]
-  calculate_new_worker_size_and_branch >> queue_tasks
+  calculate_new_worker_size_and_branch >> queue_tasks
+  calculate_new_worker_size_and_branch >> cleanup_celery_workers
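
Note: stop_celery_workers pulls the idle-worker list that get_new_workers pushed, so the 'empty_celery_worker' key must stay identical in both params dicts added by this commit. A minimal restatement of that handshake (values copied from the diff above):

# The shared XCom key used by the branch task and the cleanup task.
branch_params = {
  'celery_worker_key': 'celery_workers',
  'empty_celery_worker_key': 'empty_celery_worker',
  'base_queue': 'igf-lims'}
cleanup_params = {'empty_celery_worker_key': 'empty_celery_worker'}
assert branch_params['empty_celery_worker_key'] == \
       cleanup_params['empty_celery_worker_key']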