dag1_calculate_hpc_worker.py

import json
from airflow.models import DAG, Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.utils.dates import days_ago
from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server, airflow_utils_for_redis
from igf_airflow.check_celery_queue import calculate_new_workers
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'provide_context': True,
}

# SSH connection to the HPC login node, configured via Airflow Variables
hpc_hook = SSHHook(
    remote_host=Variable.get('hpc_host'),
    username=Variable.get('hpc_user'),
    key_file=Variable.get('igf_lims_ssh_key_file')
)

dag = DAG(
    dag_id='dag1_calculate_hpc_worker',
    schedule_interval=None,
    default_args=args,
    tags=['igf-lims']
)

def get_new_workers(**kwargs):
    """Compare queued work (from Redis) with active HPC jobs, push the
    per-queue worker counts to XCom and return the list of queue names,
    which is used as the list of downstream task ids to branch to."""
    try:
        if 'ti' not in kwargs:
            raise ValueError('ti not present in kwargs')
        ti = kwargs.get('ti')
        active_tasks = ti.xcom_pull(task_ids='fetch_active_jobs_from_hpc')
        active_tasks = active_tasks.decode()
        active_tasks = json.loads(active_tasks)
        queued_tasks = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
        # print(queued_tasks, active_tasks)
        worker_to_submit, unique_queue_list = \
            calculate_new_workers(
                queue_list=queued_tasks,
                active_jobs_dict=active_tasks)
        print(worker_to_submit, unique_queue_list)
        for key, value in worker_to_submit.items():
            ti.xcom_push(key=key, value=value)
        return unique_queue_list
    except Exception as e:
        raise ValueError('Failed to get new workers, error: {0}'.format(e))
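
# A minimal sketch of the data contract assumed above (not spelled out in this
# file): the 'fetch_active_jobs_from_hpc' task is expected to print a JSON
# dict of per-queue active job counts, and calculate_new_workers() is assumed
# to return a dict of new workers to submit plus the list of queue names to
# branch to, e.g.:
#
#   queued_tasks = ['1Gb', '1Gb', '4Gb16t']      # pulled from Redis
#   active_tasks = {'1Gb': 1}                    # parsed from the HPC output
#   worker_to_submit, unique_queue_list = \
#       calculate_new_workers(
#           queue_list=queued_tasks,
#           active_jobs_dict=active_tasks)
#   # e.g. worker_to_submit == {'1Gb': 1, '4Gb16t': 1}
#   #      unique_queue_list == ['1Gb', '4Gb16t']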

with dag:
    # list pending work per queue from the Redis-backed Celery broker
    fetch_queue_list_from_redis = \
        PythonOperator(
            task_id='fetch_queue_list_from_redis',
            dag=dag,
            python_callable=airflow_utils_for_redis,
            op_kwargs={"redis_conf_file": Variable.get('redis_conf_file')},
            queue='igf-lims'
        )
    # quick qstat check to confirm the HPC scheduler is reachable
    check_hpc_queue = \
        SSHOperator(
            task_id='check_hpc_queue',
            ssh_hook=hpc_hook,
            dag=dag,
            command='qstat',
            queue='igf-lims'
        )
    # count active HPC jobs per queue; the script's stdout (JSON) is pushed to XCom
    fetch_active_jobs_from_hpc = \
        SSHOperator(
            task_id='fetch_active_jobs_from_hpc',
            ssh_hook=hpc_hook,
            dag=dag,
            command='python /path/count_job_script.py',
            do_xcom_push=True,
            queue='igf-lims'
        )
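    # /path/count_job_script.py is not part of this file; a hypothetical
    # sketch (an assumption, not the actual script) of what it is expected to
    # do -- parse `qstat` and print per-queue job counts as JSON -- could be:
    #
    #   import json, subprocess
    #   from collections import Counter
    #   out = subprocess.check_output(['qstat']).decode()
    #   jobs = [line.split()[-1] for line in out.splitlines()[2:] if line.strip()]
    #   print(json.dumps(Counter(jobs)))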
    # decide which worker types (if any) are needed and branch accordingly
    calculate_new_worker_size_and_branch = \
        BranchPythonOperator(
            task_id='calculate_new_worker_size_and_branch',
            dag=dag,
            python_callable=get_new_workers,
            queue='igf-lims'
        )
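    # Note: BranchPythonOperator follows the task id(s) returned by
    # get_new_workers(), so each entry of unique_queue_list is assumed to
    # match a downstream worker task_id (e.g. '1Gb' or '4Gb16t') once the
    # block below is enabled.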
  79. """
  80. 1Gb_worker = \
  81. BashOperator(
  82. task_id='1Gb',
  83. dag=dag,
  84. queue='igf-lims',
  85. bash_command='echo "{{ ti.xcom_pull(key="1Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  86. )
  87. 1Gb4t_worker = \
  88. BashOperator(
  89. task_id='1Gb4t',
  90. dag=dag,
  91. queue='igf_lims_queue',
  92. bash_command='echo "{{ ti.xcom_pull(key="1Gb4t",task_ids="calculate_new_worker_size_and_branch") }}"'
  93. )
  94. 2Gb4t_worker = \
  95. BashOperator(
  96. task_id='2Gb4t',
  97. dag=dag,
  98. queue='igf_lims_queue',
  99. bash_command='echo "{{ ti.xcom_pull(key="2Gb4t",task_ids="calculate_new_worker_size_and_branch") }}"'
  100. )
  101. 2Gb72hr_worker = \
  102. BashOperator(
  103. task_id='2Gb72hr',
  104. dag=dag,
  105. queue='igf_lims_queue',
  106. bash_command='echo "{{ ti.xcom_pull(key="2Gb72hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  107. )
  108. 4Gb_worker = \
  109. BashOperator(
  110. task_id='4Gb',
  111. dag=dag,
  112. queue='igf_lims_queue',
  113. bash_command='echo "{{ ti.xcom_pull(key="4Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  114. )
  115. 4Gb8t_worker = \
  116. BashOperator(
  117. task_id='4Gb8t',
  118. dag=dag,
  119. queue='igf_lims_queue',
  120. bash_command='echo "{{ ti.xcom_pull(key="4Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  121. )
  122. 4Gb16t_worker = \
  123. BashOperator(
  124. task_id='4Gb16t',
  125. dag=dag,
  126. queue='igf_lims_queue',
  127. bash_command='echo "{{ ti.xcom_pull(key="4Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  128. )
  129. 8Gb_worker = \
  130. BashOperator(
  131. task_id='8Gb',
  132. dag=dag,
  133. queue='igf_lims_queue',
  134. bash_command='echo "{{ ti.xcom_pull(key="8Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  135. )
  136. 8Gb8t_worker = \
  137. BashOperator(
  138. task_id='8Gb8t',
  139. dag=dag,
  140. queue='igf_lims_queue',
  141. bash_command='echo "{{ ti.xcom_pull(key="8Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  142. )
  143. 8Gb16t_worker = \
  144. BashOperator(
  145. task_id='8Gb16t',
  146. dag=dag,
  147. queue='igf_lims_queue',
  148. bash_command='echo "{{ ti.xcom_pull(key="8Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  149. )
  150. 16Gb_worker = \
  151. BashOperator(
  152. task_id='16Gb',
  153. dag=dag,
  154. queue='igf_lims_queue',
  155. bash_command='echo "{{ ti.xcom_pull(key="16Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
  156. )
  157. 16Gb8t_worker = \
  158. BashOperator(
  159. task_id='16Gb8t',
  160. dag=dag,
  161. queue='igf_lims_queue',
  162. bash_command='echo "{{ ti.xcom_pull(key="16Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  163. )
  164. 16Gb16t_worker = \
  165. BashOperator(
  166. task_id='16Gb16t',
  167. dag=dag,
  168. queue='igf_lims_queue',
  169. bash_command='echo "{{ ti.xcom_pull(key="16Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  170. )
  171. 32Gb8t_worker = \
  172. BashOperator(
  173. task_id='32Gb8t',
  174. dag=dag,
  175. queue='igf_lims_queue',
  176. bash_command='echo "{{ ti.xcom_pull(key="32Gb8t",task_ids="calculate_new_worker_size_and_branch") }}"'
  177. )
  178. 32Gb16t_worker = \
  179. BashOperator(
  180. task_id='32Gb16t',
  181. dag=dag,
  182. queue='igf_lims_queue',
  183. bash_command='echo "{{ ti.xcom_pull(key="32Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  184. )
  185. 42Gb16t_worker = \
  186. BashOperator(
  187. task_id='42Gb16t',
  188. dag=dag,
  189. queue='igf_lims_queue',
  190. bash_command='echo "{{ ti.xcom_pull(key="42Gb16t",task_ids="calculate_new_worker_size_and_branch") }}"'
  191. )
  192. 64Gb16t48hr_worker = \
  193. BashOperator(
  194. task_id='64Gb16t48hr',
  195. dag=dag,
  196. queue='igf_lims_queue',
  197. bash_command='echo "{{ ti.xcom_pull(key="64Gb16t48hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  198. )
  199. 128Gb16t72hr_worker = \
  200. BashOperator(
  201. task_id='128Gb16t72hr',
  202. dag=dag,
  203. queue='igf_lims_queue',
  204. bash_command='echo "{{ ti.xcom_pull(key="128Gb16t72hr",task_ids="calculate_new_worker_size_and_branch") }}"'
  205. )
  206. """
    # run the qstat check before counting active jobs
    check_hpc_queue >> fetch_active_jobs_from_hpc
    # branching needs both the Redis queue list and the HPC job counts
    calculate_new_worker_size_and_branch << [fetch_queue_list_from_redis, fetch_active_jobs_from_hpc]
  209. """
  210. fetch_active_jobs_from_hpc >> [1Gb_worker,
  211. 1Gb4t_worker,
  212. 2Gb4t_worker,
  213. 2Gb72hr_worker,
  214. 4Gb_worker,
  215. 4Gb8t_worker,
  216. 4Gb16t_worker,
  217. 8Gb_worker,
  218. 8Gb8t_worker,
  219. 8Gb16t_worker,
  220. 16Gb_worker,
  221. 16Gb8t_worker,
  222. 16Gb16t_worker,
  223. 32Gb8t_worker,
  224. 32Gb16t_worker,
  225. 42Gb16t_worker,
  226. 64Gb16t48hr_worker,
  227. 128Gb16t72hr_worker
  228. ]
  229. """