dag8_copy_ongoing_seqrun.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.contrib.operators.ssh_operator import SSHOperator
  7. from airflow.operators.python_operator import PythonOperator
  8. from airflow.operators.python_operator import BranchPythonOperator
  9. from airflow.contrib.hooks.ssh_hook import SSHHook
  10. from airflow.operators.dummy_operator import DummyOperator
  11. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  12. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
  13. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
  14. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
  15. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
  16. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
  17. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
  18. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import generate_interop_report_func
  19. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
  20. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
  21. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
  ## DEFAULT ARGS
  # Defaults handed to the DAG below through its `default_args` argument.
  # Failures are retried once after five minutes, no e-mail is sent on
  # failure or retry, and `provide_context` is switched on.
  default_args = dict(
      owner='airflow',
      depends_on_past=False,
      start_date=days_ago(2),
      email_on_failure=False,
      email_on_retry=False,
      retries=1,
      retry_delay=timedelta(minutes=5),
      provide_context=True,
  )
  ## SSH HOOKS
  # SSH hook used by the SSHOperator tasks of this DAG. Key file, user name
  # and host name are all read from Airflow Variables while this module is
  # being parsed.
  orwell_ssh_hook = SSHHook(
      key_file=Variable.get('hpc_ssh_key_file'),
      username=Variable.get('hpc_user'),
      remote_host=Variable.get('orwell_server_hostname'),
  )
  ## DAG
  # Scheduled at minute 0 of every second hour, without catch-up of missed
  # intervals and with at most one active run at a time. The graph view is
  # laid out left-to-right.
  dag = DAG(
      dag_id='dag8_copy_ongoing_seqrun',
      default_args=default_args,
      schedule_interval="0 */2 * * *",
      catchup=False,
      max_active_runs=1,
      orientation='LR',
      tags=['hpc'],
  )
  with dag:
      # Number of per-run task chains created below. Each chain addresses one
      # entry of the 'ongoing_seqruns' XCom by its index.
      seqrun_slot_count = 5
      # Number of copy tasks created per run. The same value is handed to
      # get_seqrun_chunks as 'worker_size' (both were the literal 10).
      # NOTE(review): presumably the two must agree - confirm in the callable.
      copy_chunk_count = 10
      # Lane ids 1..8: one tile demultiplexing task is created per lane id.
      lane_ids = range(1, 9)
      # Read once here. The value is the same for every operator, so there is
      # no reason to query the Variable again for each of them on every parse.
      hpc_seqrun_path = Variable.get('hpc_seqrun_path')

      ## TASK
      # Entry branch: its direct downstream tasks are the
      # 'generate_seqrun_file_list_<n>' tasks and 'no_ongoing_seqrun'.
      generate_seqrun_list = BranchPythonOperator(
          task_id='generate_seqrun_list',
          dag=dag,
          queue='hpc_4G',
          python_callable=get_ongoing_seqrun_list)

      ## TASK
      no_ongoing_seqrun = DummyOperator(
          task_id='no_ongoing_seqrun',
          dag=dag,
          queue='hpc_4G',
          on_success_callback=log_sleep)

      ## TASK
      # Kept from the original script; nothing in the visible code reads it.
      tasks = []

      for run_index in range(seqrun_slot_count):
          ## TASK
          # Runs create_file_list_for_ongoing_seqrun.py over SSH for the run
          # at position `run_index` of the 'ongoing_seqruns' XCom. The command
          # is built from adjacent literals so that the text sent to the
          # remote shell does not depend on how this file is indented.
          generate_seqrun_file_list = SSHOperator(
              task_id='generate_seqrun_file_list_{0}'.format(run_index),
              dag=dag,
              pool='orwell_exe_pool',
              ssh_hook=orwell_ssh_hook,
              do_xcom_push=True,
              queue='hpc_4G',
              params={
                  'source_task_id': 'generate_seqrun_list',
                  'pull_key': 'ongoing_seqruns',
                  'index_number': run_index},
              command=(
                  "\n"
                  "source /home/igf/igf_code/airflow/env.sh; "
                  "python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py "
                  "--seqrun_base_dir /home/igf/seqrun/illumina "
                  "--output_path /home/igf/ongoing_run_tracking "
                  "--seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}\n"))

          ## TASK
          copy_seqrun_file_list = PythonOperator(
              task_id='copy_seqrun_file_list_{0}'.format(run_index),
              dag=dag,
              pool='orwell_scp_pool',
              queue='hpc_4G',
              params={
                  'xcom_pull_task_ids':
                      'generate_seqrun_file_list_{0}'.format(run_index)},
              python_callable=copy_seqrun_manifest_file)

          ## TASK
          compare_seqrun_files = PythonOperator(
              task_id='compare_seqrun_files_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'xcom_pull_task_ids':
                      'copy_seqrun_file_list_{0}'.format(run_index),
                  'seqrun_id_pull_key': 'ongoing_seqruns',
                  'run_index_number': run_index,
                  'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                  'local_seqrun_path': hpc_seqrun_path},
              python_callable=reset_manifest_file)

          ## TASK
          # Branch: its direct downstream tasks are the numbered copy chunk
          # tasks and the '..._chunk_no_work' dummy, all sharing the prefix
          # passed as 'child_task_prefix'.
          decide_copy_branch = BranchPythonOperator(
              task_id='decide_copy_branch_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'xcom_pull_task_ids':
                      'copy_seqrun_file_list_{0}'.format(run_index),
                  'worker_size': copy_chunk_count,
                  'seqrun_chunk_size_key': 'seqrun_chunk_size',
                  'child_task_prefix':
                      'copy_file_run_{0}_chunk'.format(run_index)},
              python_callable=get_seqrun_chunks)

          ## TASK
          no_copy_seqrun = DummyOperator(
              task_id='copy_file_run_{0}_chunk_{1}'.format(run_index, 'no_work'),
              dag=dag,
              queue='hpc_4G',
              on_success_callback=log_sleep)

          ## TASK
          copy_seqrun_files = []
          for chunk_index in range(copy_chunk_count):
              copy_file_chunk = PythonOperator(
                  task_id='copy_file_run_{0}_chunk_{1}'.format(
                      run_index, chunk_index),
                  dag=dag,
                  queue='hpc_4G',
                  pool='orwell_scp_pool',
                  params={
                      'file_path_task_ids':
                          'copy_seqrun_file_list_{0}'.format(run_index),
                      'seqrun_chunk_size_key': 'seqrun_chunk_size',
                      'seqrun_chunk_size_task_ids':
                          'decide_copy_branch_{0}'.format(run_index),
                      'run_index_number': run_index,
                      'chunk_index_number': chunk_index,
                      'seqrun_id_pull_key': 'ongoing_seqruns',
                      'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                      'local_seqrun_path': hpc_seqrun_path},
                  python_callable=copy_seqrun_chunk)
              copy_seqrun_files.append(copy_file_chunk)

          ## PIPELINE
          generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
          decide_copy_branch >> no_copy_seqrun
          decide_copy_branch >> copy_seqrun_files

          ## TASK
          # Join point behind the copy chunk tasks. The non-default trigger
          # rule is needed because the branch above may skip some of them.
          wait_for_copy_chunk = DummyOperator(
              task_id='wait_for_copy_chunk_run_{0}'.format(run_index),
              dag=dag,
              trigger_rule='none_failed_or_skipped',
              queue='hpc_4G')

          ## PIPELINE
          copy_seqrun_files >> wait_for_copy_chunk

          ## TASK
          create_interop_dump = PythonOperator(
              task_id='create_interop_dump_run_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'run_index_number': run_index,
                  'seqrun_id_pull_key': 'ongoing_seqruns',
                  'seqrun_id_pull_task_ids': 'generate_seqrun_list'},
              python_callable=run_interop_dump)

          ## PIPELINE
          wait_for_copy_chunk >> create_interop_dump

          ## TASK
          generate_interop_report = PythonOperator(
              task_id='generate_interop_report_run_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'run_index_number': run_index,
                  'seqrun_id_pull_key': 'ongoing_seqruns',
                  'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                  'runInfo_xml_file_name': 'RunInfo.xml',
                  'interop_dump_pull_task':
                      'create_interop_dump_run_{0}'.format(run_index),
                  'timeout': 1200,
                  'kernel_name': 'python3',
                  'output_notebook_key': 'interop_notebook'},
              python_callable=generate_interop_report_func)

          ## PIPELINE
          create_interop_dump >> generate_interop_report

          ## TASK
          # Branch: its direct downstream tasks are 'no_seqrun_checking_<n>'
          # and 'samplesheet_validation_<n>'.
          check_progress_for_run = BranchPythonOperator(
              task_id='check_progress_for_run_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'run_index_number': run_index,
                  'seqrun_id_pull_key': 'ongoing_seqruns',
                  'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                  'samplesheet_validation_job_prefix': 'samplesheet_validation',
                  'tile_demult_job_prefix': 'tile_demultiplexing',
                  'no_job_prefix': 'no_seqrun_checking',
                  'next_job_prefix': 'samplesheet_validation',
                  'runParameters_xml_file_name': 'runParameters.xml',
                  'samplesheet_file_name': 'SampleSheet.csv',
                  'interop_dump_pull_task':
                      'create_interop_dump_run_{0}'.format(run_index)},
              python_callable=check_progress_for_run_func)

          ## PIPELINE
          create_interop_dump >> check_progress_for_run

          ## TASK
          no_seqrun_checking = DummyOperator(
              task_id='no_seqrun_checking_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G')

          ## PIPELINE
          check_progress_for_run >> no_seqrun_checking

          ## TASK
          # Branch: its direct downstream tasks are the per-lane
          # 'tile_demultiplexing_<n>_<lane>' tasks and 'no_seqrun_checking_<n>'.
          # 'next_job_range' gets its own list built from `lane_ids`; the old
          # comprehension reused the outer loop index as its variable.
          samplesheet_validation_and_branch = BranchPythonOperator(
              task_id='samplesheet_validation_{0}'.format(run_index),
              dag=dag,
              queue='hpc_4G',
              params={
                  'run_index_number': run_index,
                  'seqrun_id_pull_key': 'ongoing_seqruns',
                  'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                  'samplesheet_file_name': 'SampleSheet.csv',
                  'runParameters_xml_file_name': 'runParameters.xml',
                  'no_job_prefix': 'no_seqrun_checking',
                  'next_job_prefix': 'tile_demultiplexing',
                  'next_job_range': list(lane_ids)},
              python_callable=samplesheet_validation_and_branch_func)

          ## PIPELINE
          check_progress_for_run >> samplesheet_validation_and_branch

          ## TASK
          run_tile_demult_list = []
          for lane_id in lane_ids:
              run_tile_demult_per_lane = PythonOperator(
                  task_id='tile_demultiplexing_{0}_{1}'.format(
                      run_index, lane_id),
                  dag=dag,
                  queue='hpc_4G',
                  params={
                      'run_index_number': run_index,
                      'lane_id': lane_id,
                      'seqrun_id_pull_key': 'ongoing_seqruns',
                      'seqrun_id_pull_task_ids': 'generate_seqrun_list',
                      'samplesheet_file_name': 'SampleSheet.csv',
                      'runinfo_xml_file_name': 'RunInfo.xml',
                      'runParameters_xml_file_name': 'runParameters.xml',
                      'tile_list': [1101, ],
                      'threads': 1},
                  python_callable=run_tile_demult_list_func)
              run_tile_demult_list.append(run_tile_demult_per_lane)

          ## PIPELINE
          samplesheet_validation_and_branch >> run_tile_demult_list
          samplesheet_validation_and_branch >> no_seqrun_checking

      ## PIPELINE
      generate_seqrun_list >> no_ongoing_seqrun
  251. samplesheet_validation_and_branch >> no_seqrun_checking
  252. ## PIPELINE
  253. generate_seqrun_list >> no_ongoing_seqrun