# dag1_calculate_hpc_worker.py
  1. import json
  2. from airflow.models import DAG,Variable
  3. from airflow.operators.bash_operator import BashOperator
  4. from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
  5. from airflow.contrib.operators.ssh_operator import SSHOperator
  6. from airflow.contrib.hooks.ssh_hook import SSHHook
  7. from airflow.utils.dates import days_ago
  8. from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server,airflow_utils_for_redis
  9. from igf_airflow.check_celery_queue import calculate_new_workers
  10. args = {
  11. 'owner':'airflow',
  12. 'start_date':days_ago(2),
  13. 'provide_context': True,
  14. }
  15. #hpc_hook = SSHHook(
  16. # remote_host=Variable.get('hpc_host'),
  17. # username=Variable.get('hpc_user'),
  18. # key_file=Variable.get('igf_lims_ssh_key_file')
  19. # )
  20. hpc_hook = SSHHook(ssh_conn_id='hpc_conn')
  21. dag = DAG(
  22. dag_id='dag1_calculate_hpc_worker',
  23. schedule_interval=None,
  24. default_args=args,
  25. tags=['igf-lims',]
  26. )
  27. def get_new_workers(**kwargs):
  28. try:
  29. if 'ti' not in kwargs:
  30. raise ValueError('ti not present in kwargs')
  31. ti = kwargs.get('ti')
  32. active_tasks = ti.xcom_pull(task_ids='qstat_on_remote')
  33. active_tasks = active_tasks.decode()
  34. active_tasks = json.loads(active_tasks)
  35. queued_tasks = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
  36. #print(queued_tasks,active_tasks)
  37. worker_to_submit,unique_queue_list = \
  38. calculate_new_workers(
  39. queue_list=queued_tasks,
  40. active_jobs_dict=active_tasks)
  41. print(worker_to_submit,unique_queue_list)
  42. for key,value in worker_to_submit.items():
  43. ti.xcom_push(key=key,value=value)
  44. return unique_queue_list
  45. except Exception as e:
  46. raise ValueError('Failed to get new workers, error: {0}'.format(e))
  47. with dag:
  48. fetch_queue_list_from_redis = \
  49. PythonOperator(
  50. task_id='fetch_queue_list_from_redis',
  51. dag=dag,
  52. python_callable=airflow_utils_for_redis,
  53. op_kwargs={"redis_conf_file":Variable.get('redis_conn_file')},
  54. queue='igf-lims'
  55. )
  56. check_hpc_queue = \
  57. SSHOperator(
  58. task_id='check_hpc_queue',
  59. ssh_hook=hpc_hook,
  60. dag=dag,
  61. command='source /etc/bashrc;qstat',
  62. queue='igf-lims'
  63. )
  64. fetch_active_jobs_from_hpc = \
  65. SSHOperator(
  66. task_id='fetch_active_jobs_from_hpc',
  67. ssh_hook=hpc_hook,
  68. dag=dag,
  69. command='/project/tgu/data2/airflow_test/github/igf-airflow-hpc;python scripts/hpc/count_active_jobs_in_hpc.py',
  70. do_xcom_push=True,
  71. queue='igf-lims'
  72. )
  73. calculate_new_worker_size_and_branch = \
  74. BranchPythonOperator(
  75. task_id='calculate_new_worker_size_and_branch',
  76. dag=dag,
  77. python_callable=get_new_workers,
  78. queue='igf-lims',
  79. )
  80. """
  81. 1Gb_worker = \
  82. BashOperator(
  83. task_id='1Gb',
  84. dag=dag,
  85. queue='igf-lims',
  86. bash_command='echo "{{ ti.xcom_pull(key="1Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  87. )
  88. 1Gb4t_worker = \
  89. BashOperator(
  90. task_id='1Gb4t',
  91. dag=dag,
  92. queue='igf_lims_queue',
  93. bash_command='echo "{{ ti.xcom_pull(key="1Gb4t",task_ids="calculate_new_worker_size_and_branch") }}"'
  94. )
  95. 2Gb4t_worker = \
  96. BashOperator(
  97. task_id='2Gb4t',
  98. dag=dag,
  99. queue='igf_lims_queue',
  100. bash_command='echo "{{ ti.xcom_pull(key="2Gb4t",task_ids="calculate_new_worker_size_and_branch") }}"'
  101. )
  102. 2Gb72hr_worker = \
  103. BashOperator(
  104. task_id='2Gb72hr',
  105. dag=dag,
  106. queue='igf_lims_queue',
  107. bash_command='echo "{{ ti.xcom_pull(key="2Gb72hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  108. )
  109. 4Gb_worker = \
  110. BashOperator(
  111. task_id='4Gb',
  112. dag=dag,
  113. queue='igf_lims_queue',
  114. bash_command='echo "{{ ti.xcom_pull(key="4Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  115. )
  116. 4Gb8t_worker = \
  117. BashOperator(
  118. task_id='4Gb8t',
  119. dag=dag,
  120. queue='igf_lims_queue',
  121. bash_command='echo "{{ ti.xcom_pull(key="4Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  122. )
  123. 4Gb16t_worker = \
  124. BashOperator(
  125. task_id='4Gb16t',
  126. dag=dag,
  127. queue='igf_lims_queue',
  128. bash_command='echo "{{ ti.xcom_pull(key="4Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  129. )
  130. 8Gb_worker = \
  131. BashOperator(
  132. task_id='8Gb',
  133. dag=dag,
  134. queue='igf_lims_queue',
  135. bash_command='echo "{{ ti.xcom_pull(key="8Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  136. )
  137. 8Gb8t_worker = \
  138. BashOperator(
  139. task_id='8Gb8t',
  140. dag=dag,
  141. queue='igf_lims_queue',
  142. bash_command='echo "{{ ti.xcom_pull(key="8Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  143. )
  144. 8Gb16t_worker = \
  145. BashOperator(
  146. task_id='8Gb16t',
  147. dag=dag,
  148. queue='igf_lims_queue',
  149. bash_command='echo "{{ ti.xcom_pull(key="8Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  150. )
  151. 16Gb_worker = \
  152. BashOperator(
  153. task_id='16Gb',
  154. dag=dag,
  155. queue='igf_lims_queue',
  156. bash_command='echo "{{ ti.xcom_pull(key="16Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  157. )
  158. 16Gb8t_worker = \
  159. BashOperator(
  160. task_id='16Gb8t',
  161. dag=dag,
  162. queue='igf_lims_queue',
  163. bash_command='echo "{{ ti.xcom_pull(key="16Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  164. )
  165. 16Gb16t_worker = \
  166. BashOperator(
  167. task_id='16Gb16t',
  168. dag=dag,
  169. queue='igf_lims_queue',
  170. bash_command='echo "{{ ti.xcom_pull(key="16Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  171. )
  172. 32Gb8t_worker = \
  173. BashOperator(
  174. task_id='32Gb8t',
  175. dag=dag,
  176. queue='igf_lims_queue',
  177. bash_command='echo "{{ ti.xcom_pull(key="32Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  178. )
  179. 32Gb16t_worker = \
  180. BashOperator(
  181. task_id='32Gb16t',
  182. dag=dag,
  183. queue='igf_lims_queue',
  184. bash_command='echo "{{ ti.xcom_pull(key="32Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  185. )
  186. 42Gb16t_worker = \
  187. BashOperator(
  188. task_id='42Gb16t',
  189. dag=dag,
  190. queue='igf_lims_queue',
  191. bash_command='echo "{{ ti.xcom_pull(key="42Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  192. )
  193. 64Gb16t48hr_worker = \
  194. BashOperator(
  195. task_id='64Gb16t48hr',
  196. dag=dag,
  197. queue='igf_lims_queue',
  198. bash_command='echo "{{ ti.xcom_pull(key="64Gb16t48hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  199. )
  200. 128Gb16t72hr_worker = \
  201. BashOperator(
  202. task_id='128Gb16t72hr',
  203. dag=dag,
  204. queue='igf_lims_queue',
  205. bash_command='echo "{{ ti.xcom_pull(key="128Gb16t72hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  206. )
  207. """
  208. check_hpc_queue >> fetch_active_jobs_from_hpc
  209. calculate_new_worker_size_and_branch << [fetch_queue_list_from_redis,fetch_active_jobs_from_hpc]
  210. """
  211. fetch_active_jobs_from_hpc >> [1Gb_worker,
  212. 1Gb4t_worker,
  213. 2Gb4t_worker,
  214. 2Gb72hr_worker,
  215. 4Gb_worker,
  216. 4Gb8t_worker,
  217. 4Gb16t_worker,
  218. 8Gb_worker,
  219. 8Gb8t_worker,
  220. 8Gb16t_worker,
  221. 16Gb_worker,
  222. 16Gb8t_worker,
  223. 16Gb16t_worker,
  224. 32Gb8t_worker,
  225. 32Gb16t_worker,
  226. 42Gb16t_worker,
  227. 64Gb16t48hr_worker,
  228. 128Gb16t72hr_worker
  229. ]
  230. """