dag1_calculate_hpc_worker.py 4.0 KB

import json
from airflow.models import DAG, Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.utils.dates import days_ago
from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server, airflow_utils_for_redis
from igf_airflow.check_celery_queue import calculate_new_workers

# default arguments shared by every task in this DAG
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'provide_context': True,
}

# SSH connection to the HPC login node, configured as Airflow connection 'hpc_conn'
hpc_hook = SSHHook(ssh_conn_id='hpc_conn')

# worker scale-out DAG: runs every 15 minutes, with at most one active run at a time
dag = DAG(
    dag_id='dag1_calculate_hpc_worker',
    catchup=False,
    max_active_runs=1,
    schedule_interval="*/15 * * * *",
    default_args=args,
    tags=['igf-lims'])

def get_new_workers(**kwargs):
    """Compare queued Celery tasks with active HPC jobs and decide how many
    new workers to submit per queue; returns the list of task ids to branch to."""
    try:
        if 'ti' not in kwargs:
            raise ValueError('ti not present in kwargs')
        ti = kwargs.get('ti')
        # stdout of the HPC job count script, pulled from XCom as bytes
        active_tasks = ti.xcom_pull(task_ids='fetch_active_jobs_from_hpc')
        active_tasks = active_tasks.decode()
        active_tasks = json.loads(active_tasks)
        # list of Celery queues that currently have pending tasks
        queued_tasks = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
        worker_to_submit, unique_queue_list = \
            calculate_new_workers(
                queue_list=queued_tasks,
                active_jobs_dict=active_tasks,
                max_workers_per_queue=3,
                max_total_workers=10)
        # push the per-queue worker counts for the downstream qsub tasks
        for key, value in worker_to_submit.items():
            ti.xcom_push(key=key, value=value)
        # branch only to the hpc_* queue tasks
        unique_queue_list = \
            [q for q in unique_queue_list if q.startswith('hpc')]
        return unique_queue_list
    except Exception as e:
        raise ValueError('Failed to get new workers, error: {0}'.format(e))
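
# A minimal sketch of the data shapes flowing through get_new_workers, with
# hypothetical example values; calculate_new_workers itself is defined in
# igf_airflow.check_celery_queue, not in this file:
#
#   queued_tasks       ['hpc_4G', 'hpc_8G']           # queues with pending tasks
#   active_tasks       {'hpc_4G': {'R': 2, 'Q': 1}}   # active HPC jobs per queue
#   worker_to_submit   {'hpc_4G': 2, 'hpc_8G': 3}     # pushed to XCom, key = queue name
#   unique_queue_list  ['hpc_4G', 'hpc_8G']           # task ids the branch follows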

with dag:
    # fetch the list of Celery queues with pending tasks from the Redis broker
    fetch_queue_list_from_redis = \
        PythonOperator(
            task_id='fetch_queue_list_from_redis',
            dag=dag,
            python_callable=airflow_utils_for_redis,
            op_kwargs={"redis_conf_file": Variable.get('redis_conn_file')},
            queue='igf-lims')
    # sanity check that the HPC scheduler is reachable before counting jobs
    check_hpc_queue = \
        SSHOperator(
            task_id='check_hpc_queue',
            ssh_hook=hpc_hook,
            dag=dag,
            command='source /etc/bashrc;qstat',
            queue='igf-lims')
    # count active jobs per queue on the HPC and push the script's stdout to XCom
    fetch_active_jobs_from_hpc = \
        SSHOperator(
            task_id='fetch_active_jobs_from_hpc',
            ssh_hook=hpc_hook,
            dag=dag,
            command='source /etc/bashrc;bash /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/hpc_job_count_runner.sh ',
            do_xcom_push=True,
            queue='igf-lims')
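    # The job count script is expected to print a JSON dictionary of active
    # jobs per queue to stdout, e.g. (hypothetical) {"hpc_4G": {"R": 1}};
    # get_new_workers above json.loads() exactly this payload.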
    # decide how many workers to submit per queue and branch to those queue tasks
    calculate_new_worker_size_and_branch = \
        BranchPythonOperator(
            task_id='calculate_new_worker_size_and_branch',
            dag=dag,
            python_callable=get_new_workers,
            queue='igf-lims')
    check_hpc_queue >> fetch_active_jobs_from_hpc
    [fetch_queue_list_from_redis, fetch_active_jobs_from_hpc] >> \
        calculate_new_worker_size_and_branch
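    # Example value of the 'hpc_queue_list' Airflow Variable assumed by the loop
    # below (the exact queue names and resource strings are hypothetical):
    #
    #   {
    #     "hpc_4G": {
    #       "pbs_resource": "-lselect=1:ncpus=1:mem=4gb -lwalltime=12:00:00",
    #       "airflow_queue": "hpc_4G"
    #     }
    #   }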
    # one qsub task per HPC queue; the Variable holds a JSON map of queue
    # settings, so it must be deserialized rather than read as a plain string
    hpc_queue_list = Variable.get('hpc_queue_list', deserialize_json=True)
    for q, data in hpc_queue_list.items():
        pbs_resource = data.get('pbs_resource')
        airflow_queue = data.get('airflow_queue')
        # submit a PBS array job when more than one worker is needed,
        # otherwise submit a single worker job
        t = SSHOperator(
            task_id=q,
            ssh_hook=hpc_hook,
            dag=dag,
            queue='igf-lims',
            command="""
            {% if ti.xcom_pull(key=params.job_name, task_ids="calculate_new_worker_size_and_branch") > 1 %}
              source /etc/bashrc;qsub -o /dev/null -e /dev/null -k n -m n -N {{ params.job_name }} -J 1-{{ ti.xcom_pull(key=params.job_name, task_ids="calculate_new_worker_size_and_branch") }} {{ params.pbs_resource }} -- /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{ params.airflow_queue }} {{ params.job_name }}
            {% else %}
              source /etc/bashrc;qsub -o /dev/null -e /dev/null -k n -m n -N {{ params.job_name }} {{ params.pbs_resource }} -- /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{ params.airflow_queue }} {{ params.job_name }}
            {% endif %}
            """,
            params={
                'pbs_resource': pbs_resource,
                'airflow_queue': airflow_queue,
                'job_name': q})
        calculate_new_worker_size_and_branch >> t
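
    # With the hypothetical Variable value above and an XCom count of 2 for
    # 'hpc_4G', the template renders roughly to:
    #
    #   source /etc/bashrc;qsub -o /dev/null -e /dev/null -k n -m n -N hpc_4G \
    #     -J 1-2 -lselect=1:ncpus=1:mem=4gb -lwalltime=12:00:00 -- \
    #     /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh hpc_4G hpc_4G
    #
    # i.e. a PBS array job of two workers when the count is above one, and a
    # single worker job otherwise.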