Przeglądaj źródła

added celery worker fetching task

Avik Datta 3 lat temu
rodzic
commit
d6f01f6aac
1 zmienionych plików z 44 dodań i 3 usunięć
  1. 44 3
      dags/dag1_calculate_hpc_worker.py

+ 44 - 3
dags/dag1_calculate_hpc_worker.py

@@ -1,13 +1,16 @@
-import json,logging
+import os, json, logging, requests
 from airflow.models import DAG,Variable
 from airflow.operators.bash_operator import BashOperator
-from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
+from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.utils.dates import days_ago
+from requests.auth import HTTPBasicAuth
 from igf_airflow.celery.check_celery_queue import fetch_queue_list_from_redis_server
 from igf_airflow.celery.check_celery_queue import calculate_new_workers
 
+CELERY_FLOWER_BASE_URL = Variable.get('celery_flower_base_url')
+
 args = {
     'owner':'airflow',
     'start_date':days_ago(2),
@@ -73,6 +76,35 @@ def get_new_workers(**kwargs):
     raise
 
 
+def fetch_celery_worker_list(**context):
+  """
+  A function for fetching list of celery workers from flower server
+  """
+  try:
+    ti = context.get('ti')
+    celery_worker_key = context['params'].get('celery_worker_key')
+    celery_basic_auth = os.environ.get('AIRFLOW__CELERY__FLOWER_BASIC_AUTH')
+    if celery_basic_auth is None:
+      raise ValueError('Missing env for flower basic auth')
+    flower_user, flower_pass = celery_basic_auth.split(':')
+    celery_url = '{0}/api/workers'.format(CELERY_FLOWER_BASE_URL)
+    res = requests.get(celery_url, auth=HTTPBasicAuth(flower_user, flower_pass))
+    if res.status_code != 200:
+      raise ValueError('Failed to fetch celery workers')
+    data = res.content.decode()
+    data = json.loads(data)
+    worker_list = list()
+    for worker_id, val in data.items():
+      worker_list.append({
+        'worker_id': worker_id, 
+        'active_jobs': len(val.get('active')), 
+        'queue_lists': [i.get('name') for i in val.get('active_queues')]})
+    ti.xcom_push(key=celery_worker_key,value=worker_list)
+  except Exception as e:
+    logging.error('Failed to get celery workers, error: {0}'.format(e))
+    raise
+
+
 with dag:
   ## TASK
   fetch_queue_list_from_redis = \
@@ -103,6 +135,14 @@ with dag:
       do_xcom_push=True,
       queue='igf-lims')
   ## TASK
+  fetch_celery_workers = \
+    PythonOperator(
+      task_id='fetch_celery_workers',
+      dag=dag,
+      python_callable=fetch_celery_worker_list,
+      params={'celery_worker_key':'celery_workers'}
+    )
+  ## TASK
   calculate_new_worker_size_and_branch = \
     BranchPythonOperator(
       task_id='calculate_new_worker_size_and_branch',
@@ -150,5 +190,6 @@ with dag:
   check_hpc_queue >> fetch_active_jobs_from_hpc
   calculate_new_worker_size_and_branch << \
     [fetch_queue_list_from_redis,
-     fetch_active_jobs_from_hpc]
+     fetch_active_jobs_from_hpc,
+     fetch_celery_workers]
   calculate_new_worker_size_and_branch >> queue_tasks