dag1_calculate_hpc_worker.py

import os, json, logging, requests
from datetime import timedelta
from requests.auth import HTTPBasicAuth
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from igf_airflow.celery.check_celery_queue import fetch_queue_list_from_redis_server
from igf_airflow.celery.check_celery_queue import calculate_new_workers
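
# How the two igf_airflow helpers imported above are used in this DAG
# (a summary inferred only from the calls below, not from the library source):
#   fetch_queue_list_from_redis_server(url=...)
#     returns the list of celery queues with pending jobs on the redis server
#   calculate_new_workers(queue_list=..., active_jobs_dict=...,
#                         max_workers_per_queue=..., max_total_workers=...)
#     returns a tuple of (dict of queue name -> number of workers to submit,
#                         list of unique queue names)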
CELERY_FLOWER_BASE_URL = Variable.get('celery_flower_base_url')

args = {
  'owner': 'airflow',
  'start_date': days_ago(2),
  'retries': 1,
  'retry_delay': timedelta(minutes=2),
  'provide_context': True,
}

hpc_hook = SSHHook(ssh_conn_id='hpc_conn')

dag = DAG(
  dag_id='dag1_calculate_hpc_worker',
  catchup=False,
  max_active_runs=1,
  schedule_interval="*/5 * * * *",
  default_args=args,
  tags=['igf-lims',]
)
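
# Airflow Variables expected by this DAG, with illustrative (made-up) example
# values; the real settings live in the Airflow metadata database:
#   celery_flower_base_url     base url of the flower server, e.g. 'http://flower.example.com:5555'
#   redis_conn_file            path to a json file containing a 'redis_db' connection url
#   hpc_max_workers_per_queue  e.g. '10'
#   hpc_max_total_workers      e.g. '70'
#   hpc_queue_list             json mapping of queue name (also used as task_id) to its
#                              pbs resource string and airflow queue, e.g.
#                              {"hpc_4G": {"pbs_resource": "-lselect=1:ncpus=1:mem=4gb",
#                                          "airflow_queue": "hpc_4G"}}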
def airflow_utils_for_redis(**kwargs):
  """
  A function for fetching the list of celery queues with pending jobs
  from the redis server listed in the redis conf file
  """
  try:
    if 'redis_conf_file' not in kwargs:
      raise ValueError('redis_conf_file info is not present in the kwargs')
    redis_conf_file = kwargs.get('redis_conf_file')
    json_data = dict()
    with open(redis_conf_file, 'r') as jp:
      json_data = json.load(jp)
    if 'redis_db' not in json_data:
      raise ValueError('redis_db key not present in the conf file')
    url = json_data.get('redis_db')
    queue_list = fetch_queue_list_from_redis_server(url=url)
    return queue_list
  except Exception as e:
    logging.error('Failed to run, error: {0}'.format(e))
    raise
def get_new_workers(**kwargs):
  """
  A function for calculating the number of new hpc workers to submit per queue
  and branching to the matching worker submission tasks. It also collects the
  idle celery workers which can be shut down.
  """
  try:
    if 'ti' not in kwargs:
      raise ValueError('ti not present in kwargs')
    ti = kwargs.get('ti')
    active_tasks = ti.xcom_pull(task_ids='fetch_active_jobs_from_hpc')
    active_tasks = active_tasks.decode()
    active_tasks = json.loads(active_tasks)
    queued_tasks = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
    worker_to_submit, unique_queue_list = \
      calculate_new_workers(
        queue_list=queued_tasks,
        active_jobs_dict=active_tasks,
        max_workers_per_queue=int(Variable.get('hpc_max_workers_per_queue')),   # Variable values are strings
        max_total_workers=int(Variable.get('hpc_max_total_workers')))
    for key, value in worker_to_submit.items():
      ti.xcom_push(key=key, value=value)                                        # one xcom entry per hpc queue
    unique_queue_list = \
      [q for q in unique_queue_list if q.startswith('hpc')]
    celery_worker_key = kwargs['params'].get('celery_worker_key')
    base_queue = kwargs['params'].get('base_queue')
    empty_celery_worker_key = kwargs['params'].get('empty_celery_worker_key')
    celery_workers = \
      ti.xcom_pull(task_ids='fetch_celery_workers', key=celery_worker_key)
    cleanup_list = list()
    for flower_entry in celery_workers:
      active_jobs_count = flower_entry.get('active_jobs')
      worker_id = flower_entry.get('worker_id')
      queue_list = flower_entry.get('queue_lists')
      if base_queue not in queue_list and \
         len(set(queue_list).intersection(set(unique_queue_list))) == 0 and \
         active_jobs_count == 0:
        cleanup_list.append(worker_id)          # idle worker not serving any required queue
    if len(cleanup_list) > 0:
      ti.xcom_push(key=empty_celery_worker_key, value=cleanup_list)
      unique_queue_list.append('cleanup_celery_workers')
    return unique_queue_list
  except Exception as e:
    logging.error('Failed to get new workers, error: {0}'.format(e))
    raise
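
# Branching note: get_new_workers drives a BranchPythonOperator, so every entry in
# the returned list must match a downstream task_id. Assuming the hpc_queue_list
# Variable defines a queue named 'hpc_4G' (a made-up example), a possible return
# value is
#   ['hpc_4G', 'cleanup_celery_workers']
# which runs the 'hpc_4G' qsub task and the celery cleanup task while skipping the
# remaining queue tasks.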
def fetch_celery_worker_list(**context):
  """
  A function for fetching the list of celery workers from the flower server
  """
  try:
    ti = context.get('ti')
    celery_worker_key = context['params'].get('celery_worker_key')
    celery_basic_auth = os.environ.get('AIRFLOW__CELERY__FLOWER_BASIC_AUTH')
    if celery_basic_auth is None:
      raise ValueError('Missing env for flower basic auth')
    flower_user, flower_pass = celery_basic_auth.split(':')
    celery_url = '{0}/api/workers'.format(CELERY_FLOWER_BASE_URL)
    res = requests.get(celery_url, auth=HTTPBasicAuth(flower_user, flower_pass))
    if res.status_code != 200:
      raise ValueError('Failed to fetch celery workers')
    data = res.content.decode()
    data = json.loads(data)
    worker_list = list()
    for worker_id, val in data.items():
      worker_list.append({
        'worker_id': worker_id,
        'active_jobs': len(val.get('active')),
        'queue_lists': [i.get('name') for i in val.get('active_queues')]})
    ti.xcom_push(key=celery_worker_key, value=worker_list)
  except Exception as e:
    logging.error('Failed to get celery workers, error: {0}'.format(e))
    raise
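
# Shape of the XCom value pushed by fetch_celery_worker_list, based on the fields
# parsed above (worker and queue names are made-up examples):
#   [{'worker_id': 'celery@hpc_4G-0001',
#     'active_jobs': 0,
#     'queue_lists': ['hpc_4G']}]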
def stop_celery_workers(**context):
  """
  A function for stopping the idle celery workers via the flower server
  """
  try:
    ti = context.get('ti')
    empty_celery_worker_key = context['params'].get('empty_celery_worker_key')
    celery_basic_auth = os.environ['AIRFLOW__CELERY__FLOWER_BASIC_AUTH']
    flower_user, flower_pass = celery_basic_auth.split(':')
    celery_workers = \
      ti.xcom_pull(
        task_ids='calculate_new_worker_size_and_branch',
        key=empty_celery_worker_key)
    for worker_id in celery_workers:
      flower_shutdown_url = \
        '{0}/api/worker/shutdown/{1}'.format(CELERY_FLOWER_BASE_URL, worker_id)
      res = requests.post(
        flower_shutdown_url,
        auth=HTTPBasicAuth(flower_user, flower_pass))
      if res.status_code != 200:
        raise ValueError('Failed to delete worker {0}'.format(worker_id))
  except Exception as e:
    logging.error('Failed to stop celery workers, error: {0}'.format(e))
    raise
with dag:
  ## TASK
  fetch_queue_list_from_redis = \
    PythonOperator(
      task_id='fetch_queue_list_from_redis',
      dag=dag,
      python_callable=airflow_utils_for_redis,
      op_kwargs={"redis_conf_file": Variable.get('redis_conn_file')},
      queue='generic')
  ## TASK
  check_hpc_queue = \
    SSHOperator(
      task_id='check_hpc_queue',
      ssh_hook=hpc_hook,
      dag=dag,
      command='source /etc/bashrc;qstat',
      queue='generic')
  ## TASK
  fetch_active_jobs_from_hpc = \
    SSHOperator(
      task_id='fetch_active_jobs_from_hpc',
      ssh_hook=hpc_hook,
      dag=dag,
      command="""
        source /etc/bashrc;\
        source /project/tgu/data2/airflow_test/secrets/hpc_env.sh;\
        python /project/tgu/data2/airflow_test/github/data-management-python/scripts/hpc/count_active_jobs_in_hpc.py """,
      do_xcom_push=True,
      queue='generic')
  ## TASK
  fetch_celery_workers = \
    PythonOperator(
      task_id='fetch_celery_workers',
      dag=dag,
      queue='generic',
      python_callable=fetch_celery_worker_list,
      params={'celery_worker_key': 'celery_workers'})
  ## TASK
  calculate_new_worker_size_and_branch = \
    BranchPythonOperator(
      task_id='calculate_new_worker_size_and_branch',
      dag=dag,
      python_callable=get_new_workers,
      queue='generic',
      params={'celery_worker_key': 'celery_workers',
              'empty_celery_worker_key': 'empty_celery_worker',
              'base_queue': 'generic'})
  ## TASK
  queue_tasks = list()
  hpc_queue_list = \
    Variable.get('hpc_queue_list', deserialize_json=True)    # json map of queue name -> pbs resource and airflow queue
  for q, data in hpc_queue_list.items():
    pbs_resource = data.get('pbs_resource')
    airflow_queue = data.get('airflow_queue')
    t = SSHOperator(
      task_id=q,
      ssh_hook=hpc_hook,
      dag=dag,
      queue='generic',
      command="""
        {% if ti.xcom_pull(key=params.job_name, task_ids="calculate_new_worker_size_and_branch") > 1 %}
          source /etc/bashrc; \
          qsub \
            -o /dev/null \
            -e /dev/null \
            -k n -m n \
            -N {{ params.job_name }} \
            -J 1-{{ ti.xcom_pull(key=params.job_name, task_ids="calculate_new_worker_size_and_branch") }} {{ params.pbs_resource }} -- \
            /project/tgu/data2/airflow_test/github/data-management-python/scripts/hpc/airflow_worker.sh {{ params.airflow_queue }} {{ params.job_name }}
        {% else %}
          source /etc/bashrc;\
          qsub \
            -o /dev/null \
            -e /dev/null \
            -k n -m n \
            -N {{ params.job_name }} {{ params.pbs_resource }} -- \
            /project/tgu/data2/airflow_test/github/data-management-python/scripts/hpc/airflow_worker.sh {{ params.airflow_queue }} {{ params.job_name }}
        {% endif %}
        """,
      params={'pbs_resource': pbs_resource,
              'airflow_queue': airflow_queue,
              'job_name': q})
    queue_tasks.append(t)
  ## TASK
  cleanup_celery_workers = \
    PythonOperator(
      task_id='cleanup_celery_workers',
      dag=dag,
      queue='generic',
      params={'empty_celery_worker_key': 'empty_celery_worker'},
      python_callable=stop_celery_workers)
  ## PIPELINE
  check_hpc_queue >> fetch_active_jobs_from_hpc
  calculate_new_worker_size_and_branch << \
    [fetch_queue_list_from_redis,
     fetch_active_jobs_from_hpc,
     fetch_celery_workers]
  calculate_new_worker_size_and_branch >> queue_tasks
  calculate_new_worker_size_and_branch >> cleanup_celery_workers