dag1_calculate_hpc_worker.py

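"""Airflow DAG: dag1_calculate_hpc_worker

Compares the queued Celery tasks on the Redis backend with the active jobs
on an HPC cluster, then branches into per-queue worker tasks so that new
HPC workers can be submitted for queues with waiting work."""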
import json
from airflow.models import DAG, Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.utils.dates import days_ago
from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server, airflow_utils_for_redis
from igf_airflow.check_celery_queue import calculate_new_workers
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'provide_context': True,
}

hpc_hook = SSHHook(
    remote_host=Variable.get('hpc_host'),
    username=Variable.get('hpc_user'),
    key_file=Variable.get('igf_lims_ssh_key_file')
)

dag = DAG(
    dag_id='dag1_calculate_hpc_worker',
    schedule_interval=None,
    default_args=args,
    tags=['igf-lims']
)
def get_new_workers(**kwargs):
    """Compare queued Celery tasks with active HPC jobs, push the number of
    new workers to submit per queue to XCom, and return the list of queue
    names (matching downstream task ids) for the branch to follow."""
    try:
        if 'ti' not in kwargs:
            raise ValueError('ti not present in kwargs')
        ti = kwargs.get('ti')
        active_tasks = ti.xcom_pull(task_ids='fetch_active_jobs_from_hpc')
        # SSHOperator may push bytes, depending on XCom pickling settings
        if isinstance(active_tasks, bytes):
            active_tasks = active_tasks.decode()
        active_tasks = json.loads(active_tasks)
        queued_tasks = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
        worker_to_submit, unique_queue_list = \
            calculate_new_workers(
                queue_list=queued_tasks,
                active_jobs_dict=active_tasks)
        print(worker_to_submit, unique_queue_list)
        for key, value in worker_to_submit.items():
            ti.xcom_push(key=key, value=value)
        return unique_queue_list
    except Exception as e:
        raise ValueError('Failed to get new workers, error: {0}'.format(e))
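# Expected XCom payloads (an assumption, inferred from the calls above):
#   'fetch_active_jobs_from_hpc' pushes a JSON string mapping queue name
#   to active job count, e.g. '{"1Gb": 2, "4Gb16t": 0}'
#   'fetch_queue_list_from_redis' returns the queues with pending tasks,
#   e.g. ['1Gb', '4Gb16t']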
with dag:
    # Fetch the list of queued Celery tasks from the Redis server
    fetch_queue_list_from_redis = \
        PythonOperator(
            task_id='fetch_queue_list_from_redis',
            dag=dag,
            python_callable=airflow_utils_for_redis,
            op_kwargs={"redis_conf_file": Variable.get('redis_conf_file')},
            queue='igf_lims_queue'
        )

    # Confirm the HPC scheduler is reachable before counting jobs
    check_hpc_queue = \
        SSHOperator(
            task_id='check_hpc_queue',
            ssh_hook=hpc_hook,
            dag=dag,
            command='qstat',
            queue='igf_lims_queue'
        )

    # Count active jobs on the HPC; the remote script prints JSON that is
    # pushed to XCom for the branch task to read
    fetch_active_jobs_from_hpc = \
        SSHOperator(
            task_id='fetch_active_jobs_from_hpc',
            ssh_hook=hpc_hook,
            dag=dag,
            command='python /path/count_job_script.py',
            do_xcom_push=True,
            queue='igf_lims_queue'
        )

    # Decide which worker tasks to run, based on queued vs active jobs
    calculate_new_worker_size_and_branch = \
        BranchPythonOperator(
            task_id='calculate_new_worker_size_and_branch',
            dag=dag,
            python_callable=get_new_workers,
            queue='igf_lims_queue'
        )
    # One BashOperator per HPC worker profile. Each task id must match a
    # queue name returned by the branch task, and each task echoes the
    # worker count pushed to XCom for its queue.
    hpc_queue_names = [
        '1Gb', '1Gb4t', '2Gb4t', '2Gb72hr',
        '4Gb', '4Gb8t', '4Gb16t',
        '8Gb', '8Gb8t', '8Gb16t',
        '16Gb', '16Gb8t', '16Gb16t',
        '32Gb8t', '32Gb16t', '42Gb16t',
        '64Gb16t48hr', '128Gb16t72hr']
    hpc_worker_tasks = [
        BashOperator(
            task_id=queue_name,
            dag=dag,
            queue='igf_lims_queue',
            params={'hpc_queue': queue_name},
            bash_command='echo "{{ ti.xcom_pull(key=params.hpc_queue, task_ids="calculate_new_worker_size_and_branch") }}"')
        for queue_name in hpc_queue_names]
    # Wire up the pipeline: check the HPC queue, gather job and queue
    # counts, then branch into only the worker tasks that need new workers
    check_hpc_queue >> fetch_active_jobs_from_hpc
    [fetch_queue_list_from_redis, fetch_active_jobs_from_hpc] >> \
        calculate_new_worker_size_and_branch
    calculate_new_worker_size_and_branch >> hpc_worker_tasks
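# Note: schedule_interval is None, so this DAG only runs when triggered
# externally, e.g. via the Airflow 1.10 CLI:
#   airflow trigger_dag dag1_calculate_hpc_worker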