# dag8_copy_ongoing_seqrun.py (13 KB)
  1. from datetime import timedelta
  2. import os,json,logging
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.contrib.operators.ssh_operator import SSHOperator
  7. from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
  8. from airflow.contrib.hooks.ssh_hook import SSHHook
  9. from airflow.operators.dummy_operator import DummyOperator
  10. from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns,compare_existing_seqrun_files
  11. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  12. from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
  13. ## DEFAULT ARGS
  14. default_args = {
  15. 'owner': 'airflow',
  16. 'depends_on_past': False,
  17. 'start_date': days_ago(2),
  18. 'email_on_failure': False,
  19. 'email_on_retry': False,
  20. 'retries': 1,
  21. 'retry_delay': timedelta(minutes=5),
  22. 'provide_context': True,
  23. }
  24. ## SSH HOOKS
  25. orwell_ssh_hook = \
  26. SSHHook(
  27. key_file=Variable.get('hpc_ssh_key_file'),
  28. username=Variable.get('hpc_user'),
  29. remote_host=Variable.get('seqrun_server'))
  30. ## DAG
  31. dag = \
  32. DAG(
  33. dag_id='dag8_copy_ongoing_seqrun',
  34. catchup=False,
  35. schedule_interval="0 */2 * * *",
  36. max_active_runs=1,
  37. tags=['hpc'],
  38. default_args=default_args,
  39. orientation='LR')
  40. ## FUNCTIONS
  41. def get_ongoing_seqrun_list(**context):
  42. """
  43. A function for fetching ongoing sequencing run ids
  44. """
  45. try:
  46. ti = context.get('ti')
  47. seqrun_server = Variable.get('seqrun_server')
  48. seqrun_base_path = Variable.get('seqrun_base_path')
  49. seqrun_server_user = Variable.get('seqrun_server_user')
  50. database_config_file = Variable.get('database_config_file')
  51. ongoing_seqruns = \
  52. fetch_ongoing_seqruns(
  53. seqrun_server=seqrun_server,
  54. seqrun_base_path=seqrun_base_path,
  55. user_name=seqrun_server_user,
  56. database_config_file=database_config_file)
  57. ti.xcom_push(key='ongoing_seqruns',value=ongoing_seqruns)
  58. branch_list = ['generate_seqrun_file_list_{0}'.format(i[0])
  59. for i in enumerate(ongoing_seqruns)]
  60. if len(branch_list) == 0:
  61. branch_list = ['no_ongoing_seqrun']
  62. else:
  63. send_log_to_channels(
  64. slack_conf=Variable.get('slack_conf'),
  65. ms_teams_conf=Variable.get('ms_teams_conf'),
  66. task_id=context['task'].task_id,
  67. dag_id=context['task'].dag_id,
  68. comment='Ongoing seqruns found: {0}'.format(ongoing_seqruns),
  69. reaction='pass')
  70. return branch_list
  71. except Exception as e:
  72. logging.error(e)
  73. send_log_to_channels(
  74. slack_conf=Variable.get('slack_conf'),
  75. ms_teams_conf=Variable.get('ms_teams_conf'),
  76. task_id=context['task'].task_id,
  77. dag_id=context['task'].dag_id,
  78. comment=e,
  79. reaction='fail')
  80. raise
  81. def copy_seqrun_manifest_file(**context):
  82. """
  83. A function for copying filesize manifest for ongoing sequencing runs to hpc
  84. """
  85. try:
  86. remote_file_path = context['params'].get('file_path')
  87. seqrun_server = Variable.get('seqrun_server')
  88. seqrun_server_user = Variable.get('seqrun_server_user')
  89. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  90. ti = context.get('ti')
  91. remote_file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  92. if remote_file_path is not None and \
  93. not isinstance(remote_file_path,str):
  94. remote_file_path = remote_file_path.decode().strip('\n')
  95. tmp_work_dir = get_temp_dir(use_ephemeral_space=True)
  96. local_file_path = \
  97. os.path.join(
  98. tmp_work_dir,
  99. os.path.basename(remote_file_path))
  100. remote_address = \
  101. '{0}@{1}'.format(seqrun_server_user,seqrun_server)
  102. copy_remote_file(
  103. remote_file_path,
  104. local_file_path,
  105. source_address=remote_address)
  106. return local_file_path
  107. except Exception as e:
  108. logging.error(e)
  109. send_log_to_channels(
  110. slack_conf=Variable.get('slack_conf'),
  111. ms_teams_conf=Variable.get('ms_teams_conf'),
  112. task_id=context['task'].task_id,
  113. dag_id=context['task'].dag_id,
  114. comment=e,
  115. reaction='fail')
  116. raise
  117. def reset_manifest_file(**context):
  118. """
  119. A function for checking existing files and resetting the manifest json with new files
  120. """
  121. try:
  122. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  123. local_seqrun_path = context['params'].get('local_seqrun_path')
  124. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  125. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  126. run_index_number = context['params'].get('run_index_number')
  127. ti = context.get('ti')
  128. json_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  129. if json_path is not None and \
  130. not isinstance(json_path,str):
  131. json_path = json_path.decode().strip('\n')
  132. seqrun_id = \
  133. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  134. compare_existing_seqrun_files(
  135. json_path=json_path,
  136. seqrun_id=seqrun_id,
  137. seqrun_base_path=local_seqrun_path)
  138. except Exception as e:
  139. logging.error(e)
  140. send_log_to_channels(
  141. slack_conf=Variable.get('slack_conf'),
  142. ms_teams_conf=Variable.get('ms_teams_conf'),
  143. task_id=context['task'].task_id,
  144. dag_id=context['task'].dag_id,
  145. comment=e,
  146. reaction='fail')
  147. raise
  148. def get_seqrun_chunks(**context):
  149. """
  150. A function for setting file chunk size for seqrun files copy
  151. """
  152. try:
  153. ti = context.get('ti')
  154. worker_size = context['params'].get('worker_size')
  155. child_task_prefix = context['params'].get('child_task_prefix')
  156. seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
  157. xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
  158. file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
  159. if file_path is not None and \
  160. not isinstance(file_path,str):
  161. file_path = file_path.decode().strip('\n')
  162. check_file_path(file_path)
  163. file_data = read_json_data(file_path)
  164. chunk_size = None
  165. if worker_size is None or \
  166. worker_size == 0:
  167. raise ValueError(
  168. 'Incorrect worker size: {0}'.\
  169. format(worker_size))
  170. if len(file_data) == 0:
  171. worker_branchs = \
  172. '{0}_{1}'.format(child_task_prefix,'no_work')
  173. else:
  174. if len(file_data) < int(5 * worker_size):
  175. worker_size = 1 # setting worker size to 1 for low input
  176. if len(file_data) % worker_size == 0:
  177. chunk_size = int(len(file_data) / worker_size)
  178. else:
  179. chunk_size = int(len(file_data) / worker_size)+1
  180. ti.xcom_push(key=seqrun_chunk_size_key,value=chunk_size)
  181. worker_branchs = \
  182. ['{0}_{1}'.format(child_task_prefix,i)
  183. for i in range(worker_size)]
  184. return worker_branchs
  185. except Exception as e:
  186. logging.error(e)
  187. send_log_to_channels(
  188. slack_conf=Variable.get('slack_conf'),
  189. ms_teams_conf=Variable.get('ms_teams_conf'),
  190. task_id=context['task'].task_id,
  191. dag_id=context['task'].dag_id,
  192. comment=e,
  193. reaction='fail')
  194. raise
  195. def copy_seqrun_chunk(**context):
  196. """
  197. A function for copying seqrun chunks
  198. """
  199. try:
  200. ti = context.get('ti')
  201. file_path_task_ids = context['params'].get('file_path_task_ids')
  202. seqrun_chunk_size_key = context['params'].get('seqrun_chunk_size_key')
  203. seqrun_chunk_size_task_ids = context['params'].get('seqrun_chunk_size_task_ids')
  204. chunk_index_number = context['params'].get('chunk_index_number')
  205. run_index_number = context['params'].get('run_index_number')
  206. local_seqrun_path = context['params'].get('local_seqrun_path')
  207. seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
  208. seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
  209. seqrun_server = Variable.get('seqrun_server')
  210. seqrun_server_user = Variable.get('seqrun_server_user')
  211. seqrun_base_path = Variable.get('seqrun_base_path')
  212. seqrun_id = \
  213. ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
  214. file_path = \
  215. ti.xcom_pull(task_ids=file_path_task_ids)
  216. chunk_size = \
  217. ti.xcom_pull(key=seqrun_chunk_size_key,task_ids=seqrun_chunk_size_task_ids)
  218. check_file_path(file_path)
  219. file_data = read_json_data(file_path)
  220. start_index = chunk_index_number*chunk_size
  221. finish_index = ((chunk_index_number+1)*chunk_size) - 1
  222. if finish_index > len(file_data) - 1:
  223. finish_index = len(file_data) - 1
  224. local_seqrun_path = \
  225. os.path.join(
  226. local_seqrun_path,
  227. seqrun_id)
  228. remote_seqrun_path = \
  229. os.path.join(
  230. seqrun_base_path,
  231. seqrun_id)
  232. remote_address = \
  233. '{0}@{1}'.format(
  234. seqrun_server_user,
  235. seqrun_server)
  236. for entry in file_data[start_index:finish_index]:
  237. file_path = entry.get('file_path')
  238. file_size = entry.get('file_size')
  239. remote_path = \
  240. os.path.join(
  241. remote_seqrun_path,
  242. file_path)
  243. local_path = \
  244. os.path.join(
  245. local_seqrun_path,
  246. file_path)
  247. if os.path.exists(local_path) and \
  248. os.path.getsize(local_path) == file_size:
  249. pass
  250. else:
  251. copy_remote_file(
  252. remote_path,
  253. local_path,
  254. source_address=remote_address,
  255. check_file=False)
  256. except Exception as e:
  257. logging.error(e)
  258. send_log_to_channels(
  259. slack_conf=Variable.get('slack_conf'),
  260. ms_teams_conf=Variable.get('ms_teams_conf'),
  261. task_id=context['task'].task_id,
  262. dag_id=context['task'].dag_id,
  263. comment=e,
  264. reaction='fail')
  265. raise
  266. with dag:
  267. ## TASK
  268. generate_seqrun_list = \
  269. BranchPythonOperator(
  270. task_id='generate_seqrun_list',
  271. dag=dag,
  272. queue='hpc_4G',
  273. python_callable=get_ongoing_seqrun_list)
  274. ## TASK
  275. no_ongoing_seqrun = \
  276. DummyOperator(
  277. task_id='no_ongoing_seqrun',
  278. dag=dag,
  279. queue='hpc_4G',
  280. on_success_callback=log_sleep)
  281. ## TASK
  282. tasks = list()
  283. for i in range(5):
  284. generate_seqrun_file_list = \
  285. SSHOperator(
  286. task_id='generate_seqrun_file_list_{0}'.format(i),
  287. dag=dag,
  288. pool='orwell_exe_pool',
  289. ssh_hook=orwell_ssh_hook,
  290. do_xcom_push=True,
  291. queue='hpc_4G',
  292. params={'source_task_id':'generate_seqrun_list',
  293. 'pull_key':'ongoing_seqruns',
  294. 'index_number':i},
  295. command="""
  296. source /home/igf/igf_code/airflow/env.sh; \
  297. python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
  298. --seqrun_base_dir /home/igf/seqrun/illumina \
  299. --output_path /home/igf/ongoing_run_tracking \
  300. --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
  301. """)
  302. ## TASK
  303. copy_seqrun_file_list = \
  304. PythonOperator(
  305. task_id='copy_seqrun_file_list_{0}'.format(i),
  306. dag=dag,
  307. pool='orwell_scp_pool',
  308. queue='hpc_4G',
  309. params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
  310. python_callable=copy_seqrun_manifest_file)
  311. ## TASK
  312. compare_seqrun_files = \
  313. PythonOperator(
  314. task_id='compare_seqrun_files_{0}'.format(i),
  315. dag=dag,
  316. queue='hpc_4G',
  317. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  318. 'seqrun_id_pull_key':'ongoing_seqruns',
  319. 'run_index_number':i,
  320. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  321. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  322. python_callable=reset_manifest_file)
  323. ## TASK
  324. decide_copy_branch = \
  325. BranchPythonOperator(
  326. task_id='decide_copy_branch_{0}'.format(i),
  327. dag=dag,
  328. queue='hpc_4G',
  329. params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  330. 'worker_size':10,
  331. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  332. 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
  333. python_callable=get_seqrun_chunks)
  334. ## TASK
  335. no_copy_seqrun = \
  336. DummyOperator(
  337. task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
  338. dag=dag,
  339. queue='hpc_4G',
  340. on_success_callback=log_sleep)
  341. ## TASK
  342. copy_seqrun_files = list()
  343. for j in range(10):
  344. copy_file_chunk = \
  345. PythonOperator(
  346. task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
  347. dag=dag,
  348. queue='hpc_4G',
  349. pool='orwell_scp_pool',
  350. params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i),
  351. 'seqrun_chunk_size_key':'seqrun_chunk_size',
  352. 'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i),
  353. 'run_index_number':i,
  354. 'chunk_index_number':j,
  355. 'seqrun_id_pull_key':'ongoing_seqruns',
  356. 'seqrun_id_pull_task_ids':'generate_seqrun_list',
  357. 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
  358. python_callable=copy_seqrun_chunk)
  359. copy_seqrun_files.append(copy_file_chunk)
  360. generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
  361. decide_copy_branch >> no_copy_seqrun
  362. decide_copy_branch >> copy_seqrun_files
  363. ## PIPELINE
  364. generate_seqrun_list >> no_ongoing_seqrun