# dag8_copy_ongoing_seqrun.py
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.contrib.operators.ssh_operator import SSHOperator
  7. from airflow.operators.python_operator import PythonOperator
  8. from airflow.operators.python_operator import BranchPythonOperator
  9. from airflow.contrib.hooks.ssh_hook import SSHHook
  10. from airflow.operators.dummy_operator import DummyOperator
  11. from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns
  12. from igf_airflow.seqrun.ongoing_seqrun_processing import compare_existing_seqrun_files
  13. from igf_airflow.seqrun.ongoing_seqrun_processing import check_for_sequencing_progress
  14. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  15. from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
  16. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
  17. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
  18. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
  19. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
  20. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
  21. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
  22. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
  23. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
  24. from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
  25. ## DEFAULT ARGS
  26. default_args = {
  27. 'owner': 'airflow',
  28. 'depends_on_past': False,
  29. 'start_date': days_ago(2),
  30. 'email_on_failure': False,
  31. 'email_on_retry': False,
  32. 'retries': 1,
  33. 'retry_delay': timedelta(minutes=5),
  34. 'provide_context': True,
  35. }
  36. ## SSH HOOKS
  37. orwell_ssh_hook = \
  38. SSHHook(
  39. key_file=Variable.get('hpc_ssh_key_file'),
  40. username=Variable.get('hpc_user'),
  41. remote_host=Variable.get('seqrun_server'))
  42. ## DAG
  43. dag = \
  44. DAG(
  45. dag_id='dag8_copy_ongoing_seqrun',
  46. catchup=False,
  47. schedule_interval="0 */2 * * *",
  48. max_active_runs=1,
  49. tags=['hpc'],
  50. default_args=default_args,
  51. orientation='LR')
  52. '''
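## FUNCTIONS (disabled: this string literal only keeps the old in-file copies; the tasks below use the versions imported from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils)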
  54. def get_ongoing_seqrun_list(**context):
  55. """
  56. A function for fetching ongoing sequencing run ids
  57. """
  58. try:
  59. ti = context.get('ti')
  60. seqrun_server = Variable.get('seqrun_server')
  61. seqrun_base_path = Variable.get('seqrun_base_path')
  62. seqrun_server_user = Variable.get('seqrun_server_user')
  63. database_config_file = Variable.get('database_config_file')
  64. ongoing_seqruns = \
  65. fetch_ongoing_seqruns(
  66. seqrun_server=seqrun_server,
  67. seqrun_base_path=seqrun_base_path,
  68. user_name=seqrun_server_user,
  69. database_config_file=database_config_file)
  70. ti.xcom_push(key='ongoing_seqruns',value=ongoing_seqruns)
  71. branch_list = ['generate_seqrun_file_list_{0}'.format(i[0])
  72. for i in enumerate(ongoing_seqruns)]
  73. if len(branch_list) == 0:
  74. branch_list = ['no_ongoing_seqrun']
  75. else:
  76. send_log_to_channels(
  77. slack_conf=Variable.get('slack_conf'),
  78. ms_teams_conf=Variable.get('ms_teams_conf'),
  79. task_id=context['task'].task_id,
  80. dag_id=context['task'].dag_id,
  81. comment='Ongoing seqruns found: {0}'.format(ongoing_seqruns),
  82. reaction='pass')
  83. return branch_list
  84. except Exception as e:
  85. logging.error(e)
  86. send_log_to_channels(
  87. slack_conf=Variable.get('slack_conf'),
  88. ms_teams_conf=Variable.get('ms_teams_conf'),
  89. task_id=context['task'].task_id,
  90. dag_id=context['task'].dag_id,
  91. comment=e,
  92. reaction='fail')
  93. raise
  94. def copy_seqrun_manifest_file(**context):
  95. """
  96. A function for copying filesize manifest for ongoing sequencing runs to hpc
  97. """
  98. try:
  99. remote_file_path = context['params'].get('file_path')
  100. seqrun_server = Variable.get('seqrun_server')
  101. seqrun_server_user = Variable.get('seqrun_server_user')
  102. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  103. ti = context.get('ti')
  104. remote_file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  105. if remote_file_path is not None and \
  106. not isinstance(remote_file_path,str):
  107. remote_file_path = remote_file_path.decode().strip('\n')
  108. tmp_work_dir = get_temp_dir(use_ephemeral_space=True)
  109. local_file_path = \
  110. os.path.join(
  111. tmp_work_dir,
  112. os.path.basename(remote_file_path))
  113. remote_address = \
  114. '{0}@{1}'.format(seqrun_server_user,seqrun_server)
  115. copy_remote_file(
  116. remote_file_path,
  117. local_file_path,
  118. source_address=remote_address)
  119. return local_file_path
  120. except Exception as e:
  121. logging.error(e)
  122. send_log_to_channels(
  123. slack_conf=Variable.get('slack_conf'),
  124. ms_teams_conf=Variable.get('ms_teams_conf'),
  125. task_id=context['task'].task_id,
  126. dag_id=context['task'].dag_id,
  127. comment=e,
  128. reaction='fail')
  129. raise
  130. def reset_manifest_file(**context):
  131. """
  132. A function for checking existing files and resetting the manifest json with new files
  133. """
  134. try:
  135. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  136. local_seqrun_path = context['params'].get('local_seqrun_path')
  137. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  138. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  139. run_index_number = context['params'].get('run_index_number')
  140. ti = context.get('ti')
  141. json_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  142. if json_path is not None and \
  143. not isinstance(json_path,str):
  144. json_path = json_path.decode().strip('\n')
  145. seqrun_id = \
  146. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  147. compare_existing_seqrun_files(
  148. json_path=json_path,
  149. seqrun_id=seqrun_id,
  150. seqrun_base_path=local_seqrun_path)
  151. except Exception as e:
  152. logging.error(e)
  153. send_log_to_channels(
  154. slack_conf=Variable.get('slack_conf'),
  155. ms_teams_conf=Variable.get('ms_teams_conf'),
  156. task_id=context['task'].task_id,
  157. dag_id=context['task'].dag_id,
  158. comment=e,
  159. reaction='fail')
  160. raise
  161. def get_seqrun_chunks(**context):
  162. """
  163. A function for setting file chunk size for seqrun files copy
  164. """
  165. try:
  166. ti = context.get('ti')
  167. worker_size = context['params'].get('worker_size')
  168. child_task_prefix = context['params'].get('child_task_prefix')
  169. seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
  170. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  171. file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  172. if file_path is not None and \
  173. not isinstance(file_path,str):
  174. file_path = file_path.decode().strip('\n')
  175. check_file_path(file_path)
  176. file_data = read_json_data(file_path)
  177. chunk_size = None
  178. if worker_size is None or \
  179. worker_size == 0:
  180. raise ValueError(
  181. 'Incorrect worker size: {0}'.\
  182. format(worker_size))
  183. if len(file_data) == 0:
  184. worker_branchs = \
  185. '{0}_{1}'.format(child_task_prefix,'no_work')
  186. else:
  187. if len(file_data) < int(5 * worker_size):
  188. worker_size = 1 # setting worker size to 1 for low input
  189. if len(file_data) % worker_size == 0:
  190. chunk_size = int(len(file_data) / worker_size)
  191. else:
  192. chunk_size = int(len(file_data) / worker_size)+1
  193. ti.xcom_push(key=seqrun_chunk_size_key,value=chunk_size)
  194. worker_branchs = \
  195. ['{0}_{1}'.format(child_task_prefix,i)
  196. for i in range(worker_size)]
  197. return worker_branchs
  198. except Exception as e:
  199. logging.error(e)
  200. send_log_to_channels(
  201. slack_conf=Variable.get('slack_conf'),
  202. ms_teams_conf=Variable.get('ms_teams_conf'),
  203. task_id=context['task'].task_id,
  204. dag_id=context['task'].dag_id,
  205. comment=e,
  206. reaction='fail')
  207. raise
  208. def copy_seqrun_chunk(**context):
  209. """
  210. A function for copying seqrun chunks
  211. """
  212. try:
  213. ti = context.get('ti')
  214. file_path_task_ids = context['params'].get('file_path_task_ids')
  215. seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
  216. seqrun_chunk_size_task_ids = context['params'].get('seqrun_chunk_size_task_ids')
  217. chunk_index_number = context['params'].get('chunk_index_number')
  218. run_index_number = context['params'].get('run_index_number')
  219. local_seqrun_path = context['params'].get('local_seqrun_path')
  220. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  221. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  222. seqrun_server = Variable.get('seqrun_server')
  223. seqrun_server_user = Variable.get('seqrun_server_user')
  224. seqrun_base_path = Variable.get('seqrun_base_path')
  225. seqrun_id = \
  226. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  227. file_path = \
  228. ti.xcom_pull(task_ids=file_path_task_ids)
  229. chunk_size = \
  230. ti.xcom_pull(key=seqrun_chunk_size_key,task_ids=seqrun_chunk_size_task_ids)
  231. check_file_path(file_path)
  232. file_data = read_json_data(file_path)
  233. start_index = chunk_index_number*chunk_size
  234. finish_index = ((chunk_index_number+1)*chunk_size) - 1
  235. if finish_index > len(file_data) - 1:
  236. finish_index = len(file_data) - 1
  237. local_seqrun_path = \
  238. os.path.join(
  239. local_seqrun_path,
  240. seqrun_id)
  241. remote_seqrun_path = \
  242. os.path.join(
  243. seqrun_base_path,
  244. seqrun_id)
  245. remote_address = \
  246. '{0}@{1}'.format(
  247. seqrun_server_user,
  248. seqrun_server)
  249. for entry in file_data[start_index:finish_index]:
  250. file_path = entry.get('file_path')
  251. file_size = entry.get('file_size')
  252. remote_path = \
  253. os.path.join(
  254. remote_seqrun_path,
  255. file_path)
  256. local_path = \
  257. os.path.join(
  258. local_seqrun_path,
  259. file_path)
  260. if os.path.exists(local_path) and \
  261. os.path.getsize(local_path) == file_size:
  262. pass
  263. else:
  264. copy_remote_file(
  265. remote_path,
  266. local_path,
  267. source_address=remote_address,
  268. check_file=False)
  269. except Exception as e:
  270. logging.error(e)
  271. send_log_to_channels(
  272. slack_conf=Variable.get('slack_conf'),
  273. ms_teams_conf=Variable.get('ms_teams_conf'),
  274. task_id=context['task'].task_id,
  275. dag_id=context['task'].dag_id,
  276. comment=e,
  277. reaction='fail')
  278. raise
  279. def run_interop_dump(**context):
  280. """
  281. A function for generating InterOp dump for seqrun
  282. """
  283. try:
  284. ti = context.get('ti')
  285. local_seqrun_path = Variable.get('hpc_seqrun_path')
  286. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  287. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  288. run_index_number = context['params'].get('run_index_number')
  289. interop_dumptext_exe = Variable.get('interop_dumptext_exe')
  290. temp_dir = get_temp_dir(use_ephemeral_space=True)
  291. seqrun_id = \
  292. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  293. seqrun_path = \
  294. os.path.join(
  295. local_seqrun_path,
  296. seqrun_id)
  297. dump_file = \
  298. os.path.join(
  299. temp_dir,
  300. '{0}_interop_dump.csv'.format(seqrun_id))
  301. check_file_path(interop_dumptext_exe)
  302. cmd = \
  303. [interop_dumptext_exe,seqrun_path,'>',dump_file]
  304. cmd = ' '.join(cmd)
  305. subprocess.\
  306. check_call(cmd,shell=True)
  307. return dump_file
  308. except Exception as e:
  309. logging.error(e)
  310. send_log_to_channels(
  311. slack_conf=Variable.get('slack_conf'),
  312. ms_teams_conf=Variable.get('ms_teams_conf'),
  313. task_id=context['task'].task_id,
  314. dag_id=context['task'].dag_id,
  315. comment=e,
  316. reaction='fail')
  317. raise
  318. def check_progress_for_run_func(**context):
  319. try:
  320. ti = context.get('ti')
  321. local_seqrun_path = Variable.get('hpc_seqrun_path')
  322. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  323. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  324. run_index_number = context['params'].get('run_index_number')
  325. interop_dump_pull_task = context['params'].get('interop_dump_pull_task')
  326. no_job_prefix = context['params'].get('no_job_prefix')
  327. next_job_prefix = context['params'].get('next_job_prefix')
  328. job_list = \
  329. ['{0}_{1}'.format(no_job_prefix,run_index_number)]
  330. seqrun_id = \
  331. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  332. runinfo_path = \
  333. os.path.join(
  334. local_seqrun_path,
  335. seqrun_id,
  336. 'RunInfo.xml')
  337. interop_dump_path = \
  338. ti.xcom_pull(task_ids=interop_dump_pull_task)
  339. if interop_dump_path is not None and \
  340. not isinstance(interop_dump_path,str):
  341. interop_dump_path = \
  342. interop_dump_path.decode().strip('\n')
  343. current_cycle,index_cycle_status,read_format = \
  344. check_for_sequencing_progress(
  345. interop_dump=interop_dump_path,
  346. runinfo_file=runinfo_path)
  347. comment = \
  348. 'seqrun: {0}, current cycle: {1}, index cycle: {2}, read format: {3}'.\
  349. format(seqrun_id,current_cycle,index_cycle_status,read_format)
  350. send_log_to_channels(
  351. slack_conf=Variable.get('slack_conf'),
  352. ms_teams_conf=Variable.get('ms_teams_conf'),
  353. task_id=context['task'].task_id,
  354. dag_id=context['task'].dag_id,
  355. comment=comment,
  356. reaction='pass')
  357. if index_cycle_status is 'complete':
  358. job_list = \
  359. ['{0}_{1}'.format(next_job_prefix,run_index_number)]
  360. return job_list
  361. except Exception as e:
  362. logging.error(e)
  363. send_log_to_channels(
  364. slack_conf=Variable.get('slack_conf'),
  365. ms_teams_conf=Variable.get('ms_teams_conf'),
  366. task_id=context['task'].task_id,
  367. dag_id=context['task'].dag_id,
  368. comment=e,
  369. reaction='fail')
  370. raise
  371. '''
  372. with dag:
  373. ## TASK
  374. generate_seqrun_list = \
  375. BranchPythonOperator(
  376. task_id='generate_seqrun_list',
  377. dag=dag,
  378. queue='hpc_4G',
  379. python_callable=get_ongoing_seqrun_list)
  380. ## TASK
  381. no_ongoing_seqrun = \
  382. DummyOperator(
  383. task_id='no_ongoing_seqrun',
  384. dag=dag,
  385. queue='hpc_4G',
  386. on_success_callback=log_sleep)
  387. ## TASK
  388. tasks = list()
  389. for i in range(5):
  390. generate_seqrun_file_list = \
  391. SSHOperator(
  392. task_id='generate_seqrun_file_list_{0}'.format(i),
  393. dag=dag,
  394. pool='orwell_exe_pool',
  395. ssh_hook=orwell_ssh_hook,
  396. do_xcom_push=True,
  397. queue='hpc_4G',
  398. params={'source_task_id':'generate_seqrun_list',
  399. 'pull_key':'ongoing_seqruns',
  400. 'index_number':i},
  401. command="""
  402. source /home/igf/igf_code/airflow/env.sh; \
  403. python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
  404. --seqrun_base_dir /home/igf/seqrun/illumina \
  405. --output_path /home/igf/ongoing_run_tracking \
  406. --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
  407. """)
  408. ## TASK
  409. copy_seqrun_file_list = \
  410. PythonOperator(
  411. task_id='copy_seqrun_file_list_{0}'.format(i),
  412. dag=dag,
  413. pool='orwell_scp_pool',
  414. queue='hpc_4G',
  415. params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
  416. python_callable=copy_seqrun_manifest_file)
  417. ## TASK
  418. compare_seqrun_files = \
  419. PythonOperator(
  420. task_id='compare_seqrun_files_{0}'.format(i),
  421. dag=dag,
  422. queue='hpc_4G',
  423. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  424. 'seqrun_id_pull_key':'ongoing_seqruns',
  425. 'run_index_number':i,
  426. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  427. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  428. python_callable=reset_manifest_file)
  429. ## TASK
  430. decide_copy_branch = \
  431. BranchPythonOperator(
  432. task_id='decide_copy_branch_{0}'.format(i),
  433. dag=dag,
  434. queue='hpc_4G',
  435. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  436. 'worker_size':10,
  437. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  438. 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
  439. python_callable=get_seqrun_chunks)
  440. ## TASK
  441. no_copy_seqrun = \
  442. DummyOperator(
  443. task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
  444. dag=dag,
  445. queue='hpc_4G',
  446. on_success_callback=log_sleep)
  447. ## TASK
  448. copy_seqrun_files = list()
  449. for j in range(10):
  450. copy_file_chunk = \
  451. PythonOperator(
  452. task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
  453. dag=dag,
  454. queue='hpc_4G',
  455. pool='orwell_scp_pool',
  456. params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  457. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  458. 'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i),
  459. 'run_index_number':i,
  460. 'chunk_index_number':j,
  461. 'seqrun_id_pull_key':'ongoing_seqruns',
  462. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  463. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  464. python_callable=copy_seqrun_chunk)
  465. copy_seqrun_files.append(copy_file_chunk)
  466. ## PIPELINE
  467. generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
  468. decide_copy_branch >> no_copy_seqrun
  469. decide_copy_branch >> copy_seqrun_files
  470. ## TASK
  471. wait_for_copy_chunk = \
  472. DummyOperator(
  473. task_id='wait_for_copy_chunk_run_{0}'.format(i),
  474. dag=dag,
  475. trigger_rule='none_failed_or_skipped',
  476. queue='hpc_4G')
  477. ## PIPELINE
  478. copy_seqrun_files >> wait_for_copy_chunk
  479. ## TASK
  480. create_interop_dump = \
  481. PythonOperator(
  482. task_id='create_interop_dump_run_{0}'.format(i),
  483. dag=dag,
  484. queue='hpc_4G',
  485. params={'run_index_number':i,
  486. 'seqrun_id_pull_key':'ongoing_seqruns',
  487. 'seqrun_id_pull_task_ids':'generate_seqrun_list'},
  488. python_callable=run_interop_dump)
  489. ## PIPELINE
  490. wait_for_copy_chunk >> create_interop_dump
  491. ## TASK
  492. check_progress_for_run = \
  493. BranchPythonOperator(
  494. task_id='check_progress_for_run_{0}'.format(i),
  495. dag=dag,
  496. queue='hpc_4G',
  497. params={'run_index_number':i,
  498. 'seqrun_id_pull_key':'ongoing_seqruns',
  499. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  500. 'samplesheet_validation_job_prefix':'samplesheet_validation',
  501. 'tile_demult_job_prefix':'tile_demultiplexing',
  502. 'no_job_prefix':'no_seqrun_checking',
  503. 'next_job_prefix':'samplesheet_validation',
  504. 'runParameters_xml_file_name':'runParameters.xml',
  505. 'samplesheet_file_name':'SampleSheet.csv',
  506. 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
  507. python_callable=check_progress_for_run_func)
  508. ## PIPELINE
  509. create_interop_dump >> check_progress_for_run
  510. ## TASK
  511. no_seqrun_checking = \
  512. DummyOperator(
  513. task_id='no_seqrun_checking_{0}'.format(i),
  514. dag=dag,
  515. queue='hpc_4G')
  516. ## PIPELINE
  517. check_progress_for_run >> no_seqrun_checking
  518. ## TASK
  519. samplesheet_validation_and_branch = \
  520. BranchPythonOperator(
  521. task_id='samplesheet_validation_{0}'.format(i),
  522. dag=dag,
  523. queue='hpc_4G',
  524. params={'run_index_number':i,
  525. 'seqrun_id_pull_key':'ongoing_seqruns',
  526. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  527. 'samplesheet_file_name':'SampleSheet.csv',
  528. 'runParameters_xml_file_name':'runParameters.xml',
  529. 'no_job_prefix':'no_seqrun_checking',
  530. 'next_job_prefix':'tile_demultiplexing',
  531. 'next_job_range':[i for i in range(1,9)]},
  532. python_callable=samplesheet_validation_and_branch_func)
  533. ## PIPELINE
  534. check_progress_for_run >> samplesheet_validation_and_branch
  535. ## TASK
  536. run_tile_demult_list = list()
  537. for j in range(1,9):
  538. run_tile_demult_per_lane = \
  539. PythonOperator(
  540. task_id='tile_demultiplexing_{0}_{1}'.format(i,j),
  541. dag=dag,
  542. queue='hpc_4G',
  543. params={'run_index_number':i,
  544. 'lane_id':j,
  545. 'seqrun_id_pull_key':'ongoing_seqruns',
  546. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  547. 'samplesheet_file_name':'SampleSheet.csv',
  548. 'runinfo_xml_file_name':'RunInfo.xml',
  549. 'runParameters_xml_file_name':'runParameters.xml',
  550. 'tile_list':[1101,],
  551. 'threads':1},
  552. python_callable=run_tile_demult_list_func)
  553. run_tile_demult_list.\
  554. append(run_tile_demult_per_lane)
  555. ## PIPELINE
  556. samplesheet_validation_and_branch >> run_tile_demult_list
  557. samplesheet_validation_and_branch >> no_seqrun_checking
  558. ## PIPELINE
  559. generate_seqrun_list >> no_ongoing_seqrun