dag8_copy_ongoing_seqrun.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.contrib.operators.ssh_operator import SSHOperator
  7. from airflow.operators.python_operator import PythonOperator
  8. from airflow.operators.python_operator import BranchPythonOperator
  9. from airflow.contrib.hooks.ssh_hook import SSHHook
  10. from airflow.operators.dummy_operator import DummyOperator
  11. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  12. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
  13. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
  14. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
  15. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
  16. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
  17. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
  18. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
  19. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
  20. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
  ## DEFAULT ARGS
## Task-level defaults handed to the DAG constructor (default_args=default_args).
## A failed task is retried once, five minutes after the failure, and no
## e-mail is sent on failure or retry.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=days_ago(2),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    provide_context=True,
)
  ## SSH HOOKS
## Hook used by the SSHOperator tasks in this file. Key file, user name and
## host name are all read from Airflow Variables when the file is parsed.
orwell_ssh_hook = SSHHook(
    key_file=Variable.get('hpc_ssh_key_file'),
    username=Variable.get('hpc_user'),
    remote_host=Variable.get('orwell_server_hostname'),
)
  ## DAG
## Scheduled at minute 0 of every second hour. catchup is disabled and at
## most one DAG run may be active at a time.
dag = DAG(
    dag_id='dag8_copy_ongoing_seqrun',
    catchup=False,
    schedule_interval="0 */2 * * *",
    max_active_runs=1,
    tags=['hpc'],
    default_args=default_args,
    orientation='LR',
)
  48. '''
  49. ## FUNCTIONS
  50. '''
  ## TASKS AND PIPELINE
## Fan-out sizes. Task ids are derived from these indices, so changing a
## value changes the set of task ids in the DAG.
MAX_ONGOING_SEQRUNS = 5       # per-run branches, run index 0-4
COPY_CHUNKS_PER_SEQRUN = 10   # copy tasks per run; also passed as 'worker_size'
LANE_IDS = range(1, 9)        # lane ids 1-8, one tile demultiplexing task each

## Every per-run task looks up its seqrun id from the XCom published under
## this key by this task, using its own run index.
SEQRUN_LIST_TASK_ID = 'generate_seqrun_list'
SEQRUN_LIST_XCOM_KEY = 'ongoing_seqruns'

## Read once at parse time instead of once per task: each Variable.get is a
## separate lookup and the value is the same for every task.
hpc_seqrun_path = Variable.get('hpc_seqrun_path')

with dag:
    ## TASK
    ## Entry branch: get_ongoing_seqrun_list picks which downstream task ids run.
    generate_seqrun_list = BranchPythonOperator(
        task_id=SEQRUN_LIST_TASK_ID,
        dag=dag,
        queue='hpc_4G',
        python_callable=get_ongoing_seqrun_list)
    ## TASK
    ## Placeholder branch target; only fires the log_sleep callback on success.
    no_ongoing_seqrun = DummyOperator(
        task_id='no_ongoing_seqrun',
        dag=dag,
        queue='hpc_4G',
        on_success_callback=log_sleep)

    for run_index in range(MAX_ONGOING_SEQRUNS):
        ## Params shared by every task that resolves this run's seqrun id
        seqrun_id_params = {
            'run_index_number': run_index,
            'seqrun_id_pull_key': SEQRUN_LIST_XCOM_KEY,
            'seqrun_id_pull_task_ids': SEQRUN_LIST_TASK_ID}
        ## Task ids that are referenced by more than one task of this run
        file_list_task_id = 'generate_seqrun_file_list_{0}'.format(run_index)
        manifest_task_id = 'copy_seqrun_file_list_{0}'.format(run_index)
        branch_task_id = 'decide_copy_branch_{0}'.format(run_index)
        copy_task_prefix = 'copy_file_run_{0}_chunk'.format(run_index)
        interop_task_id = 'create_interop_dump_run_{0}'.format(run_index)

        ## TASK
        ## Runs the file list script over SSH for the seqrun at this index;
        ## the command output is pushed to XCom (do_xcom_push=True).
        generate_seqrun_file_list = SSHOperator(
            task_id=file_list_task_id,
            dag=dag,
            pool='orwell_exe_pool',
            ssh_hook=orwell_ssh_hook,
            do_xcom_push=True,
            queue='hpc_4G',
            params={
                'source_task_id': SEQRUN_LIST_TASK_ID,
                'pull_key': SEQRUN_LIST_XCOM_KEY,
                'index_number': run_index},
            command="""
            source /home/igf/igf_code/airflow/env.sh; \
            python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
              --seqrun_base_dir /home/igf/seqrun/illumina \
              --output_path /home/igf/ongoing_run_tracking \
              --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
            """)
        ## TASK
        copy_seqrun_file_list = PythonOperator(
            task_id=manifest_task_id,
            dag=dag,
            pool='orwell_scp_pool',
            queue='hpc_4G',
            params={'xcom_pull_task_ids': file_list_task_id},
            python_callable=copy_seqrun_manifest_file)
        ## TASK
        compare_seqrun_files = PythonOperator(
            task_id='compare_seqrun_files_{0}'.format(run_index),
            dag=dag,
            queue='hpc_4G',
            params=dict(
                seqrun_id_params,
                xcom_pull_task_ids=manifest_task_id,
                local_seqrun_path=hpc_seqrun_path),
            python_callable=reset_manifest_file)
        ## TASK
        ## Branch over the copy tasks. 'worker_size' is kept equal to the
        ## number of copy_file_run_<run>_chunk_<n> tasks defined below.
        ## NOTE(review): presumably get_seqrun_chunks returns ids built from
        ## 'child_task_prefix' - confirm against the helper.
        decide_copy_branch = BranchPythonOperator(
            task_id=branch_task_id,
            dag=dag,
            queue='hpc_4G',
            params={
                'xcom_pull_task_ids': manifest_task_id,
                'worker_size': COPY_CHUNKS_PER_SEQRUN,
                'seqrun_chunk_size_key': 'seqrun_chunk_size',
                'child_task_prefix': copy_task_prefix},
            python_callable=get_seqrun_chunks)
        ## TASK
        no_copy_seqrun = DummyOperator(
            task_id='{0}_{1}'.format(copy_task_prefix, 'no_work'),
            dag=dag,
            queue='hpc_4G',
            on_success_callback=log_sleep)
        ## TASK
        copy_seqrun_files = [
            PythonOperator(
                task_id='{0}_{1}'.format(copy_task_prefix, chunk_index),
                dag=dag,
                queue='hpc_4G',
                pool='orwell_scp_pool',
                params=dict(
                    seqrun_id_params,
                    file_path_task_ids=manifest_task_id,
                    seqrun_chunk_size_key='seqrun_chunk_size',
                    seqrun_chunk_size_task_ids=branch_task_id,
                    chunk_index_number=chunk_index,
                    local_seqrun_path=hpc_seqrun_path),
                python_callable=copy_seqrun_chunk)
            for chunk_index in range(COPY_CHUNKS_PER_SEQRUN)]
        ## PIPELINE
        generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
        decide_copy_branch >> no_copy_seqrun
        decide_copy_branch >> copy_seqrun_files
        ## TASK
        ## Join point after the copy tasks. The non-default trigger rule is
        ## presumably there so that chunk tasks skipped by the branch do not
        ## cause this task to be skipped as well.
        wait_for_copy_chunk = DummyOperator(
            task_id='wait_for_copy_chunk_run_{0}'.format(run_index),
            dag=dag,
            trigger_rule='none_failed_or_skipped',
            queue='hpc_4G')
        ## PIPELINE
        copy_seqrun_files >> wait_for_copy_chunk
        ## TASK
        create_interop_dump = PythonOperator(
            task_id=interop_task_id,
            dag=dag,
            queue='hpc_4G',
            params=dict(seqrun_id_params),
            python_callable=run_interop_dump)
        ## PIPELINE
        wait_for_copy_chunk >> create_interop_dump
        ## TASK
        check_progress_for_run = BranchPythonOperator(
            task_id='check_progress_for_run_{0}'.format(run_index),
            dag=dag,
            queue='hpc_4G',
            params=dict(
                seqrun_id_params,
                samplesheet_validation_job_prefix='samplesheet_validation',
                tile_demult_job_prefix='tile_demultiplexing',
                no_job_prefix='no_seqrun_checking',
                next_job_prefix='samplesheet_validation',
                runParameters_xml_file_name='runParameters.xml',
                samplesheet_file_name='SampleSheet.csv',
                interop_dump_pull_task=interop_task_id),
            python_callable=check_progress_for_run_func)
        ## PIPELINE
        create_interop_dump >> check_progress_for_run
        ## TASK
        no_seqrun_checking = DummyOperator(
            task_id='no_seqrun_checking_{0}'.format(run_index),
            dag=dag,
            queue='hpc_4G')
        ## PIPELINE
        check_progress_for_run >> no_seqrun_checking
        ## TASK
        ## 'next_job_range' lists the same lane ids the tile demultiplexing
        ## tasks below are created for.
        samplesheet_validation_and_branch = BranchPythonOperator(
            task_id='samplesheet_validation_{0}'.format(run_index),
            dag=dag,
            queue='hpc_4G',
            params=dict(
                seqrun_id_params,
                samplesheet_file_name='SampleSheet.csv',
                runParameters_xml_file_name='runParameters.xml',
                no_job_prefix='no_seqrun_checking',
                next_job_prefix='tile_demultiplexing',
                next_job_range=list(LANE_IDS)),
            python_callable=samplesheet_validation_and_branch_func)
        ## PIPELINE
        check_progress_for_run >> samplesheet_validation_and_branch
        ## TASK
        run_tile_demult_list = [
            PythonOperator(
                task_id='tile_demultiplexing_{0}_{1}'.format(run_index, lane_id),
                dag=dag,
                queue='hpc_4G',
                params=dict(
                    seqrun_id_params,
                    lane_id=lane_id,
                    samplesheet_file_name='SampleSheet.csv',
                    runinfo_xml_file_name='RunInfo.xml',
                    runParameters_xml_file_name='runParameters.xml',
                    tile_list=[1101],
                    threads=1),
                python_callable=run_tile_demult_list_func)
            for lane_id in LANE_IDS]
        ## PIPELINE
        samplesheet_validation_and_branch >> run_tile_demult_list
        samplesheet_validation_and_branch >> no_seqrun_checking

    ## PIPELINE
    generate_seqrun_list >> no_ongoing_seqrun