# Source file: dag14_crick_seqrun_transfer.py (4.1 KB)
  1. from datetime import timedelta
  2. from airflow.models import DAG
  3. from airflow.utils.dates import days_ago
  4. from airflow.operators.python_operator import PythonOperator
  5. from airflow.operators.python_operator import BranchPythonOperator
  6. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
  7. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
  8. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import find_and_split_md5_func
  9. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import validate_md5_chunk_func
  10. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_divide_run_for_remote_copy_func
  11. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import copy_run_file_to_remote_func
  12. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import run_interop_dump_func
  13. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import generate_interop_report_func
  14. args = {
  15. 'owner': 'airflow',
  16. 'start_date': days_ago(2),
  17. 'retries': 4,
  18. 'retry_delay': timedelta(minutes=5),
  19. 'provide_context': True,
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'catchup': False,
  23. 'max_active_runs': 5,
  24. }
  25. dag = \
  26. DAG(
  27. dag_id='dag14_crick_seqrun_transfer',
  28. schedule_interval=None,
  29. default_args=args,
  30. orientation='LR',
  31. tags=['ftp', 'hpc'])
  32. with dag:
  33. ## TASK
  34. check_and_transfer_run = \
  35. PythonOperator(
  36. task_id='check_and_transfer_run',
  37. dag=dag,
  38. pool='crick_ftp_pool',
  39. queue='hpc_4G',
  40. python_callable=check_and_transfer_run_func)
  41. ## TASK
  42. extract_tar_file = \
  43. PythonOperator(
  44. task_id='extract_tar_file',
  45. dag=dag,
  46. queue='hpc_4G',
  47. python_callable=extract_tar_file_func)
  48. ## TASK
  49. find_and_split_md5 = \
  50. BranchPythonOperator(
  51. task_id='find_and_split_md5',
  52. dag=dag,
  53. queue='hpc_4G',
  54. params={'split_count': 50},
  55. python_callable=find_and_split_md5_func)
  56. ## PIPELINE
  57. check_and_transfer_run >> extract_tar_file >> find_and_split_md5
  58. for chunk_id in range(0, 50):
  59. t = \
  60. PythonOperator(
  61. task_id='md5_validate_chunk_{0}'.format(chunk_id),
  62. dag=dag,
  63. queue='hpc_4G',
  64. params={'chunk_id': chunk_id,
  65. 'xcom_task': 'find_and_split_md5',
  66. 'xcom_key': 'md5_file_chunk'},
  67. python_callable=validate_md5_chunk_func)
  68. ## PIPELINE
  69. find_and_split_md5 >> t
  70. ## TASK
  71. check_and_divide_run_for_remote_copy = \
  72. PythonOperator(
  73. task_id='check_and_divide_run_for_remote_copy',
  74. dag=dag,
  75. queue='hpc_4G',
  76. python_callable=check_and_divide_run_for_remote_copy_func)
  77. ## PIPELINE
  78. extract_tar_file >> check_and_divide_run_for_remote_copy
  79. ## TASK
  80. for i in range(1, 9):
  81. t = \
  82. PythonOperator(
  83. task_id='copy_bcl_to_remote_for_lane{0}'.format(i),
  84. dag=dag,
  85. pool='orwell_scp_pool',
  86. queue='hpc_4G',
  87. params={'lane_id': i,
  88. 'xcom_task': 'check_and_divide_run_for_remote_copy',
  89. 'xcom_key': 'bcl_files'},
  90. python_callable=copy_run_file_to_remote_func)
  91. ## PIPELINE
  92. check_and_divide_run_for_remote_copy >> t
  93. copy_additional_file_to_remote = \
  94. PythonOperator(
  95. task_id='copy_additional_file_to_remote',
  96. dag=dag,
  97. pool='orwell_scp_pool',
  98. queue='hpc_4G',
  99. params={'xcom_task': 'check_and_divide_run_for_remote_copy',
  100. 'xcom_key': 'additional_files'},
  101. python_callable=copy_run_file_to_remote_func)
  102. ## PIPELINE
  103. check_and_divide_run_for_remote_copy >> copy_additional_file_to_remote
  104. ## TASK
  105. generate_interop_dump = \
  106. PythonOperator(
  107. task_id='generate_interop_dump',
  108. dag=dag,
  109. queue='hpc_4G',
  110. python_callable=run_interop_dump_func)
  111. generate_interop_report = \
  112. PythonOperator(
  113. task_id='generate_interop_report',
  114. dag=dag,
  115. queue='hpc_4G',
  116. params={'interop_dump_xcom_task': 'generate_interop_dump',
  117. 'timeout': 1200,
  118. 'kernel_name': 'python3'},
  119. python_callable=generate_interop_report_func)
  120. ## PIPELINE
  121. extract_tar_file >> generate_interop_dump >> generate_interop_report