dag8_copy_ongoing_seqrun.py

from datetime import timedelta
import os,json,logging,subprocess
from airflow.models import DAG,Variable
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.dummy_operator import DummyOperator
from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns,compare_existing_seqrun_files
from igf_airflow.seqrun.ongoing_seqrun_processing import check_for_sequencing_progress
from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
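## NOTE: the airflow.contrib.* and *_operator import paths above are the
## Airflow 1.10.x locations; on Airflow 2.x the SSH hook and operator live in
## the apache-airflow-providers-ssh package instead.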
## DEFAULT ARGS
default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': days_ago(2),
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'provide_context': True,
}
## SSH HOOKS
orwell_ssh_hook = \
  SSHHook(
    key_file=Variable.get('hpc_ssh_key_file'),
    username=Variable.get('hpc_user'),
    remote_host=Variable.get('seqrun_server'))
## DAG
dag = \
  DAG(
    dag_id='dag8_copy_ongoing_seqrun',
    catchup=False,
    schedule_interval="0 */2 * * *",
    max_active_runs=1,
    tags=['hpc'],
    default_args=default_args,
    orientation='LR')
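## NOTE: "0 */2 * * *" triggers a run every two hours; max_active_runs=1 and
## catchup=False ensure only one copy cycle is in flight at any time.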
## FUNCTIONS
def get_ongoing_seqrun_list(**context):
  """
  A function for fetching ongoing sequencing run ids
  """
  try:
    ti = context.get('ti')
    seqrun_server = Variable.get('seqrun_server')
    seqrun_base_path = Variable.get('seqrun_base_path')
    seqrun_server_user = Variable.get('seqrun_server_user')
    database_config_file = Variable.get('database_config_file')
    ongoing_seqruns = \
      fetch_ongoing_seqruns(
        seqrun_server=seqrun_server,
        seqrun_base_path=seqrun_base_path,
        user_name=seqrun_server_user,
        database_config_file=database_config_file)
    ti.xcom_push(key='ongoing_seqruns',value=ongoing_seqruns)
    branch_list = ['generate_seqrun_file_list_{0}'.format(i[0])
                     for i in enumerate(ongoing_seqruns)]
    if len(branch_list) == 0:
      branch_list = ['no_ongoing_seqrun']
    else:
      send_log_to_channels(
        slack_conf=Variable.get('slack_conf'),
        ms_teams_conf=Variable.get('ms_teams_conf'),
        task_id=context['task'].task_id,
        dag_id=context['task'].dag_id,
        comment='Ongoing seqruns found: {0}'.format(ongoing_seqruns),
        reaction='pass')
    return branch_list
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
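## NOTE: the branch names returned above must match the
## 'generate_seqrun_file_list_{i}' task ids created in the `for i in range(5)`
## loop under `with dag:` below, so the list assumes at most 5 ongoing runs.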
def copy_seqrun_manifest_file(**context):
  """
  A function for copying filesize manifest for ongoing sequencing runs to hpc
  """
  try:
    seqrun_server = Variable.get('seqrun_server')
    seqrun_server_user = Variable.get('seqrun_server_user')
    xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
    ti = context.get('ti')
    remote_file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
    if remote_file_path is not None and \
       not isinstance(remote_file_path,str):
      remote_file_path = remote_file_path.decode().strip('\n')
    tmp_work_dir = get_temp_dir(use_ephemeral_space=True)
    local_file_path = \
      os.path.join(
        tmp_work_dir,
        os.path.basename(remote_file_path))
    remote_address = \
      '{0}@{1}'.format(seqrun_server_user,seqrun_server)
    copy_remote_file(
      remote_file_path,
      local_file_path,
      source_address=remote_address)
    return local_file_path
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
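## NOTE: with do_xcom_push=True, the SSHOperator used below pushes the remote
## command's stdout to XCom; on Airflow 1.10.x that value may arrive as bytes,
## which is why the manifest path is decoded and stripped above.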
def reset_manifest_file(**context):
  """
  A function for checking existing files and resetting the manifest json with new files
  """
  try:
    xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
    local_seqrun_path = context['params'].get('local_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    run_index_number = context['params'].get('run_index_number')
    ti = context.get('ti')
    json_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
    if json_path is not None and \
       not isinstance(json_path,str):
      json_path = json_path.decode().strip('\n')
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    compare_existing_seqrun_files(
      json_path=json_path,
      seqrun_id=seqrun_id,
      seqrun_base_path=local_seqrun_path)
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
def get_seqrun_chunks(**context):
  """
  A function for setting file chunk size for seqrun files copy
  """
  try:
    ti = context.get('ti')
    worker_size = context['params'].get('worker_size')
    child_task_prefix = context['params'].get('child_task_prefix')
    seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
    xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
    file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
    if file_path is not None and \
       not isinstance(file_path,str):
      file_path = file_path.decode().strip('\n')
    check_file_path(file_path)
    file_data = read_json_data(file_path)
    chunk_size = None
    if worker_size is None or \
       worker_size == 0:
      raise ValueError(
              'Incorrect worker size: {0}'.format(worker_size))
    if len(file_data) == 0:
      worker_branches = \
        '{0}_{1}'.format(child_task_prefix,'no_work')
    else:
      if len(file_data) < int(5 * worker_size):
        worker_size = 1                                                         # setting worker size to 1 for low input
      if len(file_data) % worker_size == 0:
        chunk_size = int(len(file_data) / worker_size)
      else:
        chunk_size = int(len(file_data) / worker_size)+1
      ti.xcom_push(key=seqrun_chunk_size_key,value=chunk_size)
      worker_branches = \
        ['{0}_{1}'.format(child_task_prefix,i)
           for i in range(worker_size)]
    return worker_branches
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
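## NOTE: chunk_size is the ceiling of len(file_data)/worker_size. E.g. 105
## files across 10 workers gives chunk_size=11 (workers 0-8 copy 11 files
## each, worker 9 the remaining 6); 23 files would trip the low-input rule
## (23 < 5*10), collapsing to a single worker that copies all 23 files.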
def copy_seqrun_chunk(**context):
  """
  A function for copying seqrun chunks
  """
  try:
    ti = context.get('ti')
    file_path_task_ids = context['params'].get('file_path_task_ids')
    seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
    seqrun_chunk_size_task_ids = context['params'].get('seqrun_chunk_size_task_ids')
    chunk_index_number = context['params'].get('chunk_index_number')
    run_index_number = context['params'].get('run_index_number')
    local_seqrun_path = context['params'].get('local_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    seqrun_server = Variable.get('seqrun_server')
    seqrun_server_user = Variable.get('seqrun_server_user')
    seqrun_base_path = Variable.get('seqrun_base_path')
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    file_path = \
      ti.xcom_pull(task_ids=file_path_task_ids)
    chunk_size = \
      ti.xcom_pull(key=seqrun_chunk_size_key,task_ids=seqrun_chunk_size_task_ids)
    check_file_path(file_path)
    file_data = read_json_data(file_path)
    start_index = chunk_index_number*chunk_size
    finish_index = ((chunk_index_number+1)*chunk_size) - 1                      # finish_index is an inclusive position
    if finish_index > len(file_data) - 1:
      finish_index = len(file_data) - 1
    local_seqrun_path = \
      os.path.join(
        local_seqrun_path,
        seqrun_id)
    remote_seqrun_path = \
      os.path.join(
        seqrun_base_path,
        seqrun_id)
    remote_address = \
      '{0}@{1}'.format(
        seqrun_server_user,
        seqrun_server)
    for entry in file_data[start_index:finish_index+1]:                         # +1: slice end is exclusive, keep the last entry of the chunk
      file_path = entry.get('file_path')
      file_size = entry.get('file_size')
      remote_path = \
        os.path.join(
          remote_seqrun_path,
          file_path)
      local_path = \
        os.path.join(
          local_seqrun_path,
          file_path)
      if os.path.exists(local_path) and \
         os.path.getsize(local_path) == file_size:
        pass                                                                    # file already copied with matching size, skip
      else:
        copy_remote_file(
          remote_path,
          local_path,
          source_address=remote_address,
          check_file=False)
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
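## NOTE: chunks are contiguous, inclusive index ranges; e.g. chunk_index 2
## with chunk_size 5 copies entries 10-14. Files already present locally with
## a matching size are skipped, so re-running a chunk only transfers new or
## partially copied files.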
def run_interop_dump(**context):
  """
  A function for generating InterOp dump for seqrun
  """
  try:
    ti = context.get('ti')
    local_seqrun_path = Variable.get('hpc_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    run_index_number = context['params'].get('run_index_number')
    interop_dumptext_exe = Variable.get('interop_dumptext_exe')
    temp_dir = get_temp_dir(use_ephemeral_space=True)
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    seqrun_path = \
      os.path.join(
        local_seqrun_path,
        seqrun_id)
    dump_file = \
      os.path.join(
        temp_dir,
        '{0}_interop_dump.csv'.format(seqrun_id))
    check_file_path(interop_dumptext_exe)
    cmd = \
      [interop_dumptext_exe,seqrun_path,'>',dump_file]
    cmd = ' '.join(cmd)
    subprocess.\
      check_call(cmd,shell=True)
    return dump_file
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
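## NOTE: shell=True is needed above because the joined command relies on shell
## redirection ('>') to capture the interop_dumptext output; the returned
## dump_file path is pushed to XCom for the downstream progress check task.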
def check_progress_for_run_func(**context):
  """
  A function for checking sequencing progress of an ongoing run from its InterOp dump
  """
  try:
    ti = context.get('ti')
    local_seqrun_path = Variable.get('hpc_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    run_index_number = context['params'].get('run_index_number')
    interop_dump_pull_task = context['params'].get('interop_dump_pull_task')
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    runinfo_path = \
      os.path.join(
        local_seqrun_path,
        seqrun_id,
        'RunInfo.xml')
    interop_dump_path = \
      ti.xcom_pull(task_ids=interop_dump_pull_task)
    if interop_dump_path is not None and \
       not isinstance(interop_dump_path,str):
      interop_dump_path = \
        interop_dump_path.decode().strip('\n')
    current_cycle,index_cycle_status,read_format = \
      check_for_sequencing_progress(
        interop_dump=interop_dump_path,
        runinfo_file=runinfo_path)
    comment = \
      'current cycle: {0}, index cycle: {1}, read format: {2}'.\
      format(current_cycle,index_cycle_status,read_format)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=comment,
      reaction='pass')
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
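## NOTE: the block below wires one processing branch per potential ongoing run
## (up to 5): file-list generation on the sequencer host, manifest copy and
## comparison, chunked parallel file copy (10 workers), a join task, an
## InterOp dump and a sequencing progress check.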
with dag:
  ## TASK
  generate_seqrun_list = \
    BranchPythonOperator(
      task_id='generate_seqrun_list',
      dag=dag,
      queue='hpc_4G',
      python_callable=get_ongoing_seqrun_list)
  ## TASK
  no_ongoing_seqrun = \
    DummyOperator(
      task_id='no_ongoing_seqrun',
      dag=dag,
      queue='hpc_4G',
      on_success_callback=log_sleep)
  ## TASK
  for i in range(5):
    generate_seqrun_file_list = \
      SSHOperator(
        task_id='generate_seqrun_file_list_{0}'.format(i),
        dag=dag,
        pool='orwell_exe_pool',
        ssh_hook=orwell_ssh_hook,
        do_xcom_push=True,
        queue='hpc_4G',
        params={'source_task_id':'generate_seqrun_list',
                'pull_key':'ongoing_seqruns',
                'index_number':i},
        command="""
          source /home/igf/igf_code/airflow/env.sh; \
          python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
            --seqrun_base_dir /home/igf/seqrun/illumina \
            --output_path /home/igf/ongoing_run_tracking \
            --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
          """)
    ## TASK
    copy_seqrun_file_list = \
      PythonOperator(
        task_id='copy_seqrun_file_list_{0}'.format(i),
        dag=dag,
        pool='orwell_scp_pool',
        queue='hpc_4G',
        params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
        python_callable=copy_seqrun_manifest_file)
    ## TASK
    compare_seqrun_files = \
      PythonOperator(
        task_id='compare_seqrun_files_{0}'.format(i),
        dag=dag,
        queue='hpc_4G',
        params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
                'seqrun_id_pull_key':'ongoing_seqruns',
                'run_index_number':i,
                'seqrun_id_pull_task_ids':'generate_seqrun_list',
                'local_seqrun_path':Variable.get('hpc_seqrun_path')},
        python_callable=reset_manifest_file)
    ## TASK
    decide_copy_branch = \
      BranchPythonOperator(
        task_id='decide_copy_branch_{0}'.format(i),
        dag=dag,
        queue='hpc_4G',
        params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
                'worker_size':10,
                'seqrun_chunk_size_key':'seqrun_chunk_size',
                'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
        python_callable=get_seqrun_chunks)
    ## TASK
    no_copy_seqrun = \
      DummyOperator(
        task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
        dag=dag,
        queue='hpc_4G',
        on_success_callback=log_sleep)
    ## TASK
    copy_seqrun_files = list()
    for j in range(10):
      copy_file_chunk = \
        PythonOperator(
          task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
          dag=dag,
          queue='hpc_4G',
          pool='orwell_scp_pool',
          params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i),
                  'seqrun_chunk_size_key':'seqrun_chunk_size',
                  'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i),
                  'run_index_number':i,
                  'chunk_index_number':j,
                  'seqrun_id_pull_key':'ongoing_seqruns',
                  'seqrun_id_pull_task_ids':'generate_seqrun_list',
                  'local_seqrun_path':Variable.get('hpc_seqrun_path')},
          python_callable=copy_seqrun_chunk)
      copy_seqrun_files.append(copy_file_chunk)
    ## PIPELINE
    generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
    decide_copy_branch >> no_copy_seqrun
    decide_copy_branch >> copy_seqrun_files
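    ## NOTE: decide_copy_branch returns either the list of chunk task ids to
    ## follow or the single '..._no_work' task id, so only the selected copy
    ## tasks run; the remaining chunk tasks are skipped by the branch operator.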
    ## TASK
    wait_for_copy_chunk = \
      DummyOperator(
        task_id='wait_for_copy_chunk_run_{0}'.format(i),
        dag=dag,
        trigger_rule='none_failed_or_skipped',
        queue='hpc_4G')
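    ## NOTE: the 'none_failed_or_skipped' trigger rule fires once no upstream
    ## chunk has failed and at least one has succeeded, so chunk tasks skipped
    ## by the branch above do not block this join.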
    ## PIPELINE
    copy_seqrun_files >> wait_for_copy_chunk
    ## TASK
    create_interop_dump = \
      PythonOperator(
        task_id='create_interop_dump_run_{0}'.format(i),
        dag=dag,
        queue='hpc_4G',
        params={'run_index_number':i,
                'seqrun_id_pull_key':'ongoing_seqruns',
                'seqrun_id_pull_task_ids':'generate_seqrun_list'},
        python_callable=run_interop_dump)
    ## PIPELINE
    wait_for_copy_chunk >> create_interop_dump
    ## TASK
    check_progress_for_run = \
      PythonOperator(
        task_id='check_progress_for_run_{0}'.format(i),
        dag=dag,
        queue='hpc_4G',
        params={'run_index_number':i,
                'seqrun_id_pull_key':'ongoing_seqruns',
                'seqrun_id_pull_task_ids':'generate_seqrun_list',
                'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
        python_callable=check_progress_for_run_func)
    ## PIPELINE
    create_interop_dump >> check_progress_for_run
  ## PIPELINE
  generate_seqrun_list >> no_ongoing_seqrun