# dag14_crick_seqrun_transfer.py
  1. from datetime import timedelta
  2. from airflow.models import DAG
  3. from airflow.utils.dates import days_ago
  4. from airflow.operators.python_operator import PythonOperator
  5. from airflow.operators.python_operator import BranchPythonOperator
  6. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
  7. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
  8. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import find_and_split_md5_func
  9. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import validate_md5_chunk_func
  10. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_divide_run_for_remote_copy_func
  11. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import copy_run_file_to_remote_func
  12. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import run_interop_dump_func
  13. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import generate_interop_report_func
  14. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import send_message_to_channels_with_mention_func
  15. args = {
  16. 'owner': 'airflow',
  17. 'start_date': days_ago(2),
  18. 'retries': 4,
  19. 'retry_delay': timedelta(minutes=5),
  20. 'provide_context': True,
  21. 'email_on_failure': False,
  22. 'email_on_retry': False,
  23. 'catchup': False,
  24. 'max_active_runs': 5,
  25. }
  26. dag = \
  27. DAG(
  28. dag_id='dag14_crick_seqrun_transfer',
  29. schedule_interval=None,
  30. default_args=args,
  31. orientation='LR',
  32. tags=['ftp', 'hpc'])
  33. with dag:
  34. ## TASK
  35. check_and_transfer_run = \
  36. PythonOperator(
  37. task_id='check_and_transfer_run',
  38. dag=dag,
  39. pool='crick_ftp_pool',
  40. queue='hpc_4G',
  41. python_callable=check_and_transfer_run_func)
  42. ## TASK
  43. extract_tar_file = \
  44. PythonOperator(
  45. task_id='extract_tar_file',
  46. dag=dag,
  47. queue='hpc_4G',
  48. python_callable=extract_tar_file_func)
  49. ## TASK
  50. find_and_split_md5 = \
  51. BranchPythonOperator(
  52. task_id='find_and_split_md5',
  53. dag=dag,
  54. queue='hpc_4G',
  55. params={'split_count': 50},
  56. python_callable=find_and_split_md5_func)
  57. ## PIPELINE
  58. check_and_transfer_run >> extract_tar_file >> find_and_split_md5
  59. for chunk_id in range(0, 50):
  60. t = \
  61. PythonOperator(
  62. task_id='md5_validate_chunk_{0}'.format(chunk_id),
  63. dag=dag,
  64. queue='hpc_4G',
  65. params={'chunk_id': chunk_id,
  66. 'xcom_task': 'find_and_split_md5',
  67. 'xcom_key': 'md5_file_chunk'},
  68. python_callable=validate_md5_chunk_func)
  69. ## PIPELINE
  70. find_and_split_md5 >> t
  71. ## TASK
  72. check_and_divide_run_for_remote_copy = \
  73. PythonOperator(
  74. task_id='check_and_divide_run_for_remote_copy',
  75. dag=dag,
  76. queue='hpc_4G',
  77. python_callable=check_and_divide_run_for_remote_copy_func)
  78. ## PIPELINE
  79. extract_tar_file >> check_and_divide_run_for_remote_copy
  80. ## TASK
  81. send_message_to_channels_with_mention = \
  82. PythonOperator(
  83. task_id='send_message_to_channels_with_mention',
  84. dag=dag,
  85. queue='hpc_4G',
  86. python_callable=send_message_to_channels_with_mention_func)
  87. ## TASK
  88. for i in range(1, 9):
  89. t = \
  90. PythonOperator(
  91. task_id='copy_bcl_to_remote_for_lane{0}'.format(i),
  92. dag=dag,
  93. pool='orwell_scp_pool',
  94. queue='hpc_4G',
  95. params={'lane_id': i,
  96. 'xcom_task': 'check_and_divide_run_for_remote_copy',
  97. 'xcom_key': 'bcl_files'},
  98. python_callable=copy_run_file_to_remote_func)
  99. ## PIPELINE
  100. check_and_divide_run_for_remote_copy >> t >> send_message_to_channels_with_mention
  101. copy_additional_file_to_remote = \
  102. PythonOperator(
  103. task_id='copy_additional_file_to_remote',
  104. dag=dag,
  105. pool='orwell_scp_pool',
  106. queue='hpc_4G',
  107. params={'xcom_task': 'check_and_divide_run_for_remote_copy',
  108. 'xcom_key': 'additional_files'},
  109. python_callable=copy_run_file_to_remote_func)
  110. ## PIPELINE
  111. check_and_divide_run_for_remote_copy >> copy_additional_file_to_remote
  112. ## TASK
  113. generate_interop_dump = \
  114. PythonOperator(
  115. task_id='generate_interop_dump',
  116. dag=dag,
  117. queue='hpc_4G',
  118. python_callable=run_interop_dump_func)
  119. generate_interop_report = \
  120. PythonOperator(
  121. task_id='generate_interop_report',
  122. dag=dag,
  123. queue='hpc_4G',
  124. params={'interop_dump_xcom_task': 'generate_interop_dump',
  125. 'timeout': 1200,
  126. 'kernel_name': 'python3'},
  127. python_callable=generate_interop_report_func)
  128. ## PIPELINE
  129. extract_tar_file >> generate_interop_dump >> generate_interop_report