dag8_copy_ongoing_seqrun.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.contrib.operators.ssh_operator import SSHOperator
  7. from airflow.operators.python_operator import PythonOperator
  8. from airflow.operators.python_operator import BranchPythonOperator
  9. from airflow.contrib.hooks.ssh_hook import SSHHook
  10. from airflow.operators.dummy_operator import DummyOperator
  11. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  12. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
  13. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
  14. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
  15. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
  16. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
  17. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
  18. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
  19. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
  20. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
  21. ## DEFAULT ARGS
  22. default_args = {
  23. 'owner': 'airflow',
  24. 'depends_on_past': False,
  25. 'start_date': days_ago(2),
  26. 'email_on_failure': False,
  27. 'email_on_retry': False,
  28. 'retries': 1,
  29. 'retry_delay': timedelta(minutes=5),
  30. 'provide_context': True,
  31. }
  32. ## SSH HOOKS
  33. orwell_ssh_hook = \
  34. SSHHook(
  35. key_file=Variable.get('hpc_ssh_key_file'),
  36. username=Variable.get('hpc_user'),
  37. remote_host=Variable.get('orwell_server_hostname'))
  38. ## DAG
  39. dag = \
  40. DAG(
  41. dag_id='dag8_copy_ongoing_seqrun',
  42. catchup=False,
  43. schedule_interval="0 */2 * * *",
  44. max_active_runs=1,
  45. tags=['hpc'],
  46. default_args=default_args,
  47. orientation='LR')
  48. with dag:
  49. ## TASK
  50. generate_seqrun_list = \
  51. BranchPythonOperator(
  52. task_id='generate_seqrun_list',
  53. dag=dag,
  54. queue='hpc_4G',
  55. python_callable=get_ongoing_seqrun_list)
  56. ## TASK
  57. no_ongoing_seqrun = \
  58. DummyOperator(
  59. task_id='no_ongoing_seqrun',
  60. dag=dag,
  61. queue='hpc_4G',
  62. on_success_callback=log_sleep)
  63. ## TASK
  64. tasks = list()
  65. for i in range(5):
  66. generate_seqrun_file_list = \
  67. SSHOperator(
  68. task_id='generate_seqrun_file_list_{0}'.format(i),
  69. dag=dag,
  70. pool='orwell_exe_pool',
  71. ssh_hook=orwell_ssh_hook,
  72. do_xcom_push=True,
  73. queue='hpc_4G',
  74. params={'source_task_id':'generate_seqrun_list',
  75. 'pull_key':'ongoing_seqruns',
  76. 'index_number':i},
  77. command="""
  78. source /home/igf/igf_code/airflow/env.sh; \
  79. python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
  80. --seqrun_base_dir /home/igf/seqrun/illumina \
  81. --output_path /home/igf/ongoing_run_tracking \
  82. --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
  83. """)
  84. ## TASK
  85. copy_seqrun_file_list = \
  86. PythonOperator(
  87. task_id='copy_seqrun_file_list_{0}'.format(i),
  88. dag=dag,
  89. pool='orwell_scp_pool',
  90. queue='hpc_4G',
  91. params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
  92. python_callable=copy_seqrun_manifest_file)
  93. ## TASK
  94. compare_seqrun_files = \
  95. PythonOperator(
  96. task_id='compare_seqrun_files_{0}'.format(i),
  97. dag=dag,
  98. queue='hpc_4G',
  99. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  100. 'seqrun_id_pull_key':'ongoing_seqruns',
  101. 'run_index_number':i,
  102. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  103. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  104. python_callable=reset_manifest_file)
  105. ## TASK
  106. decide_copy_branch = \
  107. BranchPythonOperator(
  108. task_id='decide_copy_branch_{0}'.format(i),
  109. dag=dag,
  110. queue='hpc_4G',
  111. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  112. 'worker_size':10,
  113. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  114. 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
  115. python_callable=get_seqrun_chunks)
  116. ## TASK
  117. no_copy_seqrun = \
  118. DummyOperator(
  119. task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
  120. dag=dag,
  121. queue='hpc_4G',
  122. on_success_callback=log_sleep)
  123. ## TASK
  124. copy_seqrun_files = list()
  125. for j in range(10):
  126. copy_file_chunk = \
  127. PythonOperator(
  128. task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
  129. dag=dag,
  130. queue='hpc_4G',
  131. pool='orwell_scp_pool',
  132. params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  133. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  134. 'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i),
  135. 'run_index_number':i,
  136. 'chunk_index_number':j,
  137. 'seqrun_id_pull_key':'ongoing_seqruns',
  138. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  139. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  140. python_callable=copy_seqrun_chunk)
  141. copy_seqrun_files.append(copy_file_chunk)
  142. ## PIPELINE
  143. generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
  144. decide_copy_branch >> no_copy_seqrun
  145. decide_copy_branch >> copy_seqrun_files
  146. ## TASK
  147. wait_for_copy_chunk = \
  148. DummyOperator(
  149. task_id='wait_for_copy_chunk_run_{0}'.format(i),
  150. dag=dag,
  151. trigger_rule='none_failed_or_skipped',
  152. queue='hpc_4G')
  153. ## PIPELINE
  154. copy_seqrun_files >> wait_for_copy_chunk
  155. ## TASK
  156. create_interop_dump = \
  157. PythonOperator(
  158. task_id='create_interop_dump_run_{0}'.format(i),
  159. dag=dag,
  160. queue='hpc_4G',
  161. params={'run_index_number':i,
  162. 'seqrun_id_pull_key':'ongoing_seqruns',
  163. 'seqrun_id_pull_task_ids':'generate_seqrun_list'},
  164. python_callable=run_interop_dump)
  165. ## PIPELINE
  166. wait_for_copy_chunk >> create_interop_dump
  167. ## TASK
  168. check_progress_for_run = \
  169. BranchPythonOperator(
  170. task_id='check_progress_for_run_{0}'.format(i),
  171. dag=dag,
  172. queue='hpc_4G',
  173. params={'run_index_number':i,
  174. 'seqrun_id_pull_key':'ongoing_seqruns',
  175. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  176. 'samplesheet_validation_job_prefix':'samplesheet_validation',
  177. 'tile_demult_job_prefix':'tile_demultiplexing',
  178. 'no_job_prefix':'no_seqrun_checking',
  179. 'next_job_prefix':'samplesheet_validation',
  180. 'runParameters_xml_file_name':'runParameters.xml',
  181. 'samplesheet_file_name':'SampleSheet.csv',
  182. 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
  183. python_callable=check_progress_for_run_func)
  184. ## PIPELINE
  185. create_interop_dump >> check_progress_for_run
  186. ## TASK
  187. no_seqrun_checking = \
  188. DummyOperator(
  189. task_id='no_seqrun_checking_{0}'.format(i),
  190. dag=dag,
  191. queue='hpc_4G')
  192. ## PIPELINE
  193. check_progress_for_run >> no_seqrun_checking
  194. ## TASK
  195. samplesheet_validation_and_branch = \
  196. BranchPythonOperator(
  197. task_id='samplesheet_validation_{0}'.format(i),
  198. dag=dag,
  199. queue='hpc_4G',
  200. params={'run_index_number':i,
  201. 'seqrun_id_pull_key':'ongoing_seqruns',
  202. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  203. 'samplesheet_file_name':'SampleSheet.csv',
  204. 'runParameters_xml_file_name':'runParameters.xml',
  205. 'no_job_prefix':'no_seqrun_checking',
  206. 'next_job_prefix':'tile_demultiplexing',
  207. 'next_job_range':[i for i in range(1,9)]},
  208. python_callable=samplesheet_validation_and_branch_func)
  209. ## PIPELINE
  210. check_progress_for_run >> samplesheet_validation_and_branch
  211. ## TASK
  212. run_tile_demult_list = list()
  213. for j in range(1,9):
  214. run_tile_demult_per_lane = \
  215. PythonOperator(
  216. task_id='tile_demultiplexing_{0}_{1}'.format(i,j),
  217. dag=dag,
  218. queue='hpc_4G',
  219. params={'run_index_number':i,
  220. 'lane_id':j,
  221. 'seqrun_id_pull_key':'ongoing_seqruns',
  222. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  223. 'samplesheet_file_name':'SampleSheet.csv',
  224. 'runinfo_xml_file_name':'RunInfo.xml',
  225. 'runParameters_xml_file_name':'runParameters.xml',
  226. 'tile_list':[1101,],
  227. 'threads':1},
  228. python_callable=run_tile_demult_list_func)
  229. run_tile_demult_list.\
  230. append(run_tile_demult_per_lane)
  231. ## PIPELINE
  232. samplesheet_validation_and_branch >> run_tile_demult_list
  233. samplesheet_validation_and_branch >> no_seqrun_checking
  234. ## PIPELINE
  235. generate_seqrun_list >> no_ongoing_seqrun