# dag9_tenx_single_cell_immune_profiling.py (31 KB)
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
  10. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
  11. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
  12. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
  13. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
  14. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
  15. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
  16. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
  17. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
  18. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
  19. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function
  20. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import upload_analysis_file_to_box
  21. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import convert_bam_to_cram_func
  22. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger
  23. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_samtools_for_cellranger
  24. ## ARGS
  25. default_args = {
  26. 'owner': 'airflow',
  27. 'depends_on_past': False,
  28. 'start_date': days_ago(2),
  29. 'email_on_failure': False,
  30. 'email_on_retry': False,
  31. 'retries': 4,
  32. 'max_active_runs':10,
  33. 'catchup':False,
  34. 'retry_delay': timedelta(minutes=5),
  35. 'provide_context': True,
  36. }
  37. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types').split(',')
  38. ## DAG
  39. dag = \
  40. DAG(
  41. dag_id='dag9_tenx_single_cell_immune_profiling',
  42. schedule_interval=None,
  43. tags=['hpc','analysis','tenx','sc'],
  44. default_args=default_args,
  45. orientation='LR')
  46. with dag:
  47. ## TASK
  48. fetch_analysis_info_and_branch = \
  49. BranchPythonOperator(
  50. task_id='fetch_analysis_info',
  51. dag=dag,
  52. queue='hpc_4G',
  53. params={'no_analysis_task':'no_analysis',
  54. 'analysis_description_xcom_key':'analysis_description',
  55. 'analysis_info_xcom_key':'analysis_info'},
  56. python_callable=fetch_analysis_info_and_branch_func)
  57. ## TASK
  58. configure_cellranger_run = \
  59. PythonOperator(
  60. task_id='configure_cellranger_run',
  61. dag=dag,
  62. queue='hpc_4G',
  63. trigger_rule='none_failed_or_skipped',
  64. params={'xcom_pull_task_id':'fetch_analysis_info',
  65. 'analysis_description_xcom_key':'analysis_description',
  66. 'analysis_info_xcom_key':'analysis_info',
  67. 'library_csv_xcom_key':'cellranger_library_csv'},
  68. python_callable=configure_cellranger_run_func)
  69. for analysis_name in FEATURE_TYPE_LIST:
  70. ## TASK
  71. task_branch = \
  72. BranchPythonOperator(
  73. task_id=analysis_name,
  74. dag=dag,
  75. queue='hpc_4G',
  76. params={'xcom_pull_task_id':'fetch_analysis_info',
  77. 'analysis_info_xcom_key':'analysis_info',
  78. 'analysis_name':analysis_name,
  79. 'task_prefix':'run_trim'},
  80. python_callable=task_branch_function)
  81. run_trim_list = list()
  82. for run_id in range(0,10):
  83. ## TASK
  84. t = \
  85. PythonOperator(
  86. task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
  87. dag=dag,
  88. queue='hpc_4G',
  89. params={'xcom_pull_task_id':'fetch_analysis_info',
  90. 'analysis_info_xcom_key':'analysis_info',
  91. 'analysis_description_xcom_key':'analysis_description',
  92. 'analysis_name':analysis_name,
  93. 'run_id':run_id,
  94. 'r1_length':0,
  95. 'r2_length':0,
  96. 'fastq_input_dir_tag':'fastq_dir',
  97. 'fastq_output_dir_tag':'output_path'},
  98. python_callable=run_sc_read_trimmming_func)
  99. run_trim_list.append(t)
  100. ## TASK
  101. collect_trimmed_files = \
  102. DummyOperator(
  103. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  104. trigger_rule='none_failed_or_skipped',
  105. dag=dag)
  106. ## PIPELINE
  107. fetch_analysis_info_and_branch >> task_branch
  108. task_branch >> run_trim_list
  109. run_trim_list >> collect_trimmed_files
  110. collect_trimmed_files >> configure_cellranger_run
  111. ## TASK
  112. no_analysis = \
  113. DummyOperator(
  114. task_id='no_analysis',
  115. dag=dag)
  116. ## PIPELINE
  117. fetch_analysis_info_and_branch >> no_analysis
  118. ## TASK
  119. run_cellranger = \
  120. PythonOperator(
  121. task_id='run_cellranger',
  122. dag=dag,
  123. queue='hpc_64G16t24hr',
  124. params={'analysis_description_xcom_pull_task':'fetch_analysis_info',
  125. 'analysis_description_xcom_key':'analysis_description',
  126. 'library_csv_xcom_key':'cellranger_library_csv',
  127. 'library_csv_xcom_pull_task':'configure_cellranger_run',
  128. 'cellranger_xcom_key':'cellranger_output',
  129. 'cellranger_options':['--localcores 16','--localmem 64']},
  130. python_callable=run_cellranger_tool)
  131. ## PIPELINE
  132. configure_cellranger_run >> run_cellranger
  133. ## TASK
  134. decide_analysis_branch = \
  135. BranchPythonOperator(
  136. task_id='decide_analysis_branch',
  137. dag=dag,
  138. queue='hpc_4G',
  139. python_callable=decide_analysis_branch_func,
  140. params={'load_cellranger_result_to_db_task':'load_cellranger_result_to_db',
  141. 'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
  142. 'run_scirpy_for_vdj_task':'run_scirpy_for_vdj',
  143. 'run_scirpy_for_vdj_b_task':'run_scirpy_for_vdj_b',
  144. 'run_scirpy_vdj_t_task':'run_scirpy_for_vdj_t',
  145. 'run_seurat_for_sc_5p_task':'run_seurat_for_sc_5p',
  146. 'run_picard_alignment_summary_task':'run_picard_alignment_summary',
  147. 'convert_bam_to_cram_task':'convert_bam_to_cram',
  148. 'library_csv_xcom_key':'cellranger_library_csv',
  149. 'library_csv_xcom_pull_task':'configure_cellranger_run'})
  150. ## PIPELINE
  151. run_cellranger >> decide_analysis_branch
  152. ## TASK
  153. load_cellranger_result_to_db = \
  154. PythonOperator(
  155. task_id='load_cellranger_result_to_db',
  156. dag=dag,
  157. queue='hpc_4G',
  158. python_callable=load_cellranger_result_to_db_func,
  159. params={'analysis_description_xcom_pull_task':'fetch_analysis_info',
  160. 'analysis_description_xcom_key':'analysis_description',
  161. 'cellranger_xcom_key':'cellranger_output',
  162. 'cellranger_xcom_pull_task':'run_cellranger',
  163. 'collection_type':'CELLRANGER_MULTI',
  164. 'collection_table':'sample',
  165. 'xcom_collection_name_key':'sample_igf_id',
  166. 'genome_column':'genome_build',
  167. 'analysis_name':'cellranger_multi',
  168. 'output_xcom_key':'loaded_output_files',
  169. 'html_xcom_key':'html_report_file',
  170. 'html_report_file_name':'web_summary.html'})
  171. upload_cellranger_report_to_ftp = \
  172. PythonOperator(
  173. task_id='upload_cellranger_report_to_ftp',
  174. dag=dag,
  175. queue='hpc_4G',
  176. python_callable=ftp_files_upload_for_analysis,
  177. params={'xcom_pull_task':'load_cellranger_result_to_db',
  178. 'xcom_pull_files_key':'html_report_file',
  179. 'collection_name_task':'load_cellranger_result_to_db',
  180. 'collection_name_key':'sample_igf_id',
  181. 'collection_type':'FTP_CELLRANGER_MULTI',
  182. 'collection_table':'sample',
  183. 'collect_remote_file':True})
  184. upload_cellranger_report_to_box = \
  185. PythonOperator(
  186. task_id='upload_cellranger_report_to_box',
  187. dag=dag,
  188. queue='hpc_4G',
  189. python_callable=None,
  190. params={'xcom_pull_task':'load_cellranger_result_to_db',
  191. 'xcom_pull_files_key':'html_report_file'})
  192. upload_cellranger_results_to_irods = \
  193. PythonOperator(
  194. task_id='upload_cellranger_results_to_irods',
  195. dag=dag,
  196. queue='hpc_4G',
  197. python_callable=irods_files_upload_for_analysis,
  198. params={'xcom_pull_task':'load_cellranger_result_to_db',
  199. 'xcom_pull_files_key':'loaded_output_files',
  200. 'collection_name_key':'sample_igf_id',
  201. 'analysis_name':'cellranger_multi'})
  202. ## PIPELINE
  203. decide_analysis_branch >> load_cellranger_result_to_db
  204. load_cellranger_result_to_db >> upload_cellranger_report_to_ftp
  205. load_cellranger_result_to_db >> upload_cellranger_report_to_box
  206. load_cellranger_result_to_db >> upload_cellranger_results_to_irods
  207. ## TASK
  208. run_scanpy_for_sc_5p = \
  209. PythonOperator(
  210. task_id='run_scanpy_for_sc_5p',
  211. dag=dag,
  212. queue='hpc_4G',
  213. python_callable=run_singlecell_notebook_wrapper_func,
  214. params={'cellranger_xcom_key':'cellranger_output',
  215. 'cellranger_xcom_pull_task':'run_cellranger',
  216. 'scanpy_timeout':1200,
  217. 'allow_errors':False,
  218. 'kernel_name':'python3',
  219. 'count_dir':'count',
  220. 'analysis_name':'scanpy',
  221. 'output_notebook_key':'scanpy_notebook',
  222. 'output_cellbrowser_key':'cellbrowser_dirs',
  223. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  224. 'analysis_description_xcom_key':'analysis_description'})
  225. load_scanpy_report_for_sc_5p_to_db = \
  226. PythonOperator(
  227. task_id='load_scanpy_report_for_sc_5p_to_db',
  228. dag=dag,
  229. queue='hpc_4G',
  230. python_callable=load_analysis_files_func,
  231. params={'collection_name_task':'load_cellranger_result_to_db',
  232. 'collection_name_key':'sample_igf_id',
  233. 'file_name_task':'run_scanpy_for_sc_5p',
  234. 'file_name_key':'scanpy_notebook',
  235. 'analysis_name':'scanpy_5p',
  236. 'collection_type':'SCANPY_HTML',
  237. 'collection_table':'sample',
  238. 'output_files_key':'output_db_files'})
  239. upload_scanpy_report_for_sc_5p_to_ftp = \
  240. PythonOperator(
  241. task_id='upload_scanpy_report_for_sc_5p_to_ftp',
  242. dag=dag,
  243. queue='hpc_4G',
  244. python_callable=ftp_files_upload_for_analysis,
  245. params={'xcom_pull_task':'load_scanpy_report_for_sc_5p_to_db',
  246. 'xcom_pull_files_key':'output_db_files',
  247. 'collection_name_task':'load_cellranger_result_to_db',
  248. 'collection_name_key':'sample_igf_id',
  249. 'collection_type':'FTP_SCANPY_HTML',
  250. 'collection_table':'sample',
  251. 'collect_remote_file':True})
  252. upload_scanpy_report_for_sc_5p_to_box = \
  253. PythonOperator(
  254. task_id='upload_scanpy_report_for_sc_5p_to_box',
  255. dag=dag,
  256. queue='hpc_4G',
  257. python_callable=upload_analysis_file_to_box,
  258. params={'xcom_pull_task':'load_scanpy_report_for_sc_5p_to_db',
  259. 'xcom_pull_files_key':'output_db_files',
  260. 'analysis_tag':'scanpy_single_sample_report'})
  261. upload_cellbrowser_for_sc_5p_to_ftp = \
  262. PythonOperator(
  263. task_id='upload_cellbrowser_for_sc_5p_to_ftp',
  264. dag=dag,
  265. queue='hpc_4G',
  266. python_callable=ftp_files_upload_for_analysis,
  267. params={'xcom_pull_task':'run_scanpy_for_sc_5p',
  268. 'xcom_pull_files_key':'cellbrowser_dirs',
  269. 'collection_name_task':'load_cellranger_result_to_db',
  270. 'collection_name_key':'sample_igf_id',
  271. 'collection_type':'FTP_CELLBROWSER',
  272. 'collection_table':'sample',
  273. 'collect_remote_file':True})
  274. ## PIPELINE
  275. decide_analysis_branch >> run_scanpy_for_sc_5p
  276. run_scanpy_for_sc_5p >> load_scanpy_report_for_sc_5p_to_db
  277. load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_ftp
  278. load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_box
  279. run_scanpy_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
  280. ## TASK
  281. run_scirpy_for_vdj = \
  282. PythonOperator(
  283. task_id='run_scirpy_for_vdj',
  284. dag=dag,
  285. queue='hpc_4G',
  286. python_callable=run_singlecell_notebook_wrapper_func,
  287. params={'cellranger_xcom_key':'cellranger_output',
  288. 'cellranger_xcom_pull_task':'run_cellranger',
  289. 'scanpy_timeout':1200,
  290. 'allow_errors':False,
  291. 'kernel_name':'python3',
  292. 'analysis_name':'scirpy',
  293. 'vdj_dir':'vdj',
  294. 'count_dir':'count',
  295. 'output_notebook_key':'scirpy_notebook',
  296. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  297. 'analysis_description_xcom_key':'analysis_description'})
  298. load_scirpy_report_for_vdj_to_db = \
  299. PythonOperator(
  300. task_id='load_scirpy_report_for_vdj_to_db',
  301. dag=dag,
  302. queue='hpc_4G',
  303. python_callable=load_analysis_files_func,
  304. params={'collection_name_task':'load_cellranger_result_to_db',
  305. 'collection_name_key':'sample_igf_id',
  306. 'file_name_task':'run_scirpy_for_vdj',
  307. 'file_name_key':'scirpy_notebook',
  308. 'analysis_name':'scirpy_vdj',
  309. 'collection_type':'SCIRPY_VDJ_HTML',
  310. 'collection_table':'sample',
  311. 'output_files_key':'output_db_files'})
  312. upload_scirpy_report_for_vdj_to_ftp = \
  313. PythonOperator(
  314. task_id='upload_scirpy_report_for_vdj_to_ftp',
  315. dag=dag,
  316. queue='hpc_4G',
  317. python_callable=ftp_files_upload_for_analysis,
  318. params={'xcom_pull_task':'load_scanpy_report_for_vdj_to_db',
  319. 'xcom_pull_files_key':'output_db_files',
  320. 'collection_name_task':'load_cellranger_result_to_db',
  321. 'collection_name_key':'sample_igf_id',
  322. 'collection_type':'FTP_SCIRPY_VDJ_HTML',
  323. 'collection_table':'sample',
  324. 'collect_remote_file':True})
  325. upload_scirpy_report_for_vdj_to_box = \
  326. PythonOperator(
  327. task_id='upload_scirpy_report_for_vdj_to_box',
  328. dag=dag,
  329. queue='hpc_4G',
  330. python_callable=upload_analysis_file_to_box,
  331. params={'xcom_pull_task':'load_scanpy_report_for_vdj_to_db',
  332. 'xcom_pull_files_key':'output_db_files',
  333. 'analysis_tag':'scirpy_vdj_single_sample_report'})
  334. ## PIPELINE
  335. decide_analysis_branch >> run_scirpy_for_vdj
  336. run_scirpy_for_vdj >> load_scirpy_report_for_vdj_to_db
  337. load_scirpy_report_for_vdj_to_db >> upload_scirpy_report_for_vdj_to_ftp
  338. load_scirpy_report_for_vdj_to_db >> upload_scirpy_report_for_vdj_to_box
  339. ## TASK
  340. run_scirpy_for_vdj_b = \
  341. PythonOperator(
  342. task_id='run_scirpy_for_vdj_b',
  343. dag=dag,
  344. queue='hpc_4G',
  345. python_callable=run_singlecell_notebook_wrapper_func,
  346. params={'cellranger_xcom_key':'cellranger_output',
  347. 'cellranger_xcom_pull_task':'run_cellranger',
  348. 'scanpy_timeout':1200,
  349. 'allow_errors':False,
  350. 'kernel_name':'python3',
  351. 'analysis_name':'scirpy',
  352. 'vdj_dir':'vdj_b',
  353. 'count_dir':'count',
  354. 'output_notebook_key':'scirpy_notebook',
  355. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  356. 'analysis_description_xcom_key':'analysis_description'})
  357. load_scirpy_report_for_vdj_b_to_db = \
  358. PythonOperator(
  359. task_id='load_scirpy_report_for_vdj_b_to_db',
  360. dag=dag,
  361. queue='hpc_4G',
  362. python_callable=load_analysis_files_func,
  363. params={'collection_name_task':'load_cellranger_result_to_db',
  364. 'collection_name_key':'sample_igf_id',
  365. 'file_name_task':'run_scirpy_for_vdj_b',
  366. 'file_name_key':'scirpy_notebook',
  367. 'analysis_name':'scirpy_vdj_b',
  368. 'collection_type':'SCIRPY_VDJ_B_HTML',
  369. 'collection_table':'sample',
  370. 'output_files_key':'output_db_files'})
  371. upload_scirpy_report_for_vdj_b_to_ftp = \
  372. PythonOperator(
  373. task_id='upload_scirpy_report_for_vdj_b_to_ftp',
  374. dag=dag,
  375. queue='hpc_4G',
  376. python_callable=ftp_files_upload_for_analysis,
  377. params={'xcom_pull_task':'load_scanpy_report_for_vdj_b_to_db',
  378. 'xcom_pull_files_key':'output_db_files',
  379. 'collection_name_task':'load_cellranger_result_to_db',
  380. 'collection_name_key':'sample_igf_id',
  381. 'collection_type':'FTP_SCIRPY_VDJ_B_HTML',
  382. 'collection_table':'sample',
  383. 'collect_remote_file':True})
  384. upload_scirpy_report_for_vdj_b_to_box = \
  385. PythonOperator(
  386. task_id='upload_scanpy_report_for_vdj_b_to_box',
  387. dag=dag,
  388. queue='hpc_4G',
  389. python_callable=upload_analysis_file_to_box,
  390. params={'xcom_pull_task':'load_scanpy_report_for_vdj_b_to_db',
  391. 'xcom_pull_files_key':'output_db_files',
  392. 'analysis_tag':'scirpy_vdj_b_single_sample_report'})
  393. ## PIPELINE
  394. decide_analysis_branch >> run_scirpy_for_vdj_b
  395. run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db
  396. load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_ftp
  397. load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box
  398. ## TASK
  399. run_scirpy_for_vdj_t = \
  400. PythonOperator(
  401. task_id='run_scirpy_for_vdj_t',
  402. dag=dag,
  403. queue='hpc_4G',
  404. python_callable=run_singlecell_notebook_wrapper_func,
  405. params={'cellranger_xcom_key':'cellranger_output',
  406. 'cellranger_xcom_pull_task':'run_cellranger',
  407. 'scanpy_timeout':1200,
  408. 'allow_errors':False,
  409. 'kernel_name':'python3',
  410. 'analysis_name':'scirpy',
  411. 'vdj_dir':'vdj_t',
  412. 'count_dir':'count',
  413. 'output_notebook_key':'scirpy_notebook',
  414. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  415. 'analysis_description_xcom_key':'analysis_description'})
  416. load_scirpy_report_for_vdj_t_to_db = \
  417. PythonOperator(
  418. task_id='load_scirpy_report_for_vdj_t_to_db',
  419. dag=dag,
  420. queue='hpc_4G',
  421. python_callable=load_analysis_files_func,
  422. params={'collection_name_task':'load_cellranger_result_to_db',
  423. 'collection_name_key':'sample_igf_id',
  424. 'file_name_task':'run_scirpy_for_vdj_t',
  425. 'file_name_key':'scirpy_notebook',
  426. 'analysis_name':'scirpy_vdj_t',
  427. 'collection_type':'SCIRPY_VDJ_T_HTML',
  428. 'collection_table':'sample',
  429. 'output_files_key':'output_db_files'})
  430. upload_scirpy_report_for_vdj_t_to_ftp = \
  431. PythonOperator(
  432. task_id='upload_scirpy_report_for_vdj_t_to_ftp',
  433. dag=dag,
  434. queue='hpc_4G',
  435. python_callable=ftp_files_upload_for_analysis,
  436. params={'xcom_pull_task':'load_scanpy_report_for_vdj_t_to_db',
  437. 'xcom_pull_files_key':'output_db_files',
  438. 'collection_name_task':'load_cellranger_result_to_db',
  439. 'collection_name_key':'sample_igf_id',
  440. 'collection_type':'FTP_SCIRPY_VDJ_T_HTML',
  441. 'collection_table':'sample',
  442. 'collect_remote_file':True})
  443. upload_scirpy_report_for_vdj_t_to_box = \
  444. PythonOperator(
  445. task_id='upload_scirpy_report_for_vdj_t_to_box',
  446. dag=dag,
  447. queue='hpc_4G',
  448. python_callable=upload_analysis_file_to_box,
  449. params={'xcom_pull_task':'load_scanpy_report_for_vdj_t_to_db',
  450. 'xcom_pull_files_key':'output_db_files',
  451. 'analysis_tag':'scirpy_vdj_t_single_sample_report'})
  452. ## PIPELINE
  453. decide_analysis_branch >> run_scirpy_for_vdj_t
  454. run_scirpy_for_vdj_t >> load_scirpy_report_for_vdj_t_to_db
  455. load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_ftp
  456. load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_box
  457. ## TASK
  458. run_seurat_for_sc_5p = \
  459. PythonOperator(
  460. task_id='run_seurat_for_sc_5p',
  461. dag=dag,
  462. queue='hpc_4G',
  463. python_callable=run_singlecell_notebook_wrapper_func,
  464. params={'cellranger_xcom_key':'cellranger_output',
  465. 'cellranger_xcom_pull_task':'run_cellranger',
  466. 'scanpy_timeout':1200,
  467. 'allow_errors':False,
  468. 'kernel_name':'R',
  469. 'analysis_name':'seurat',
  470. 'vdj_dir':'vdj',
  471. 'count_dir':'count',
  472. 'output_notebook_key':'seurat_notebook',
  473. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  474. 'analysis_description_xcom_key':'analysis_description'})
  475. load_seurat_report_for_sc_5p_db = \
  476. PythonOperator(
  477. task_id='load_seurat_report_for_sc_5p_db',
  478. dag=dag,
  479. queue='hpc_4G',
  480. python_callable=load_analysis_files_func,
  481. params={'collection_name_task':'load_cellranger_result_to_db',
  482. 'collection_name_key':'sample_igf_id',
  483. 'file_name_task':'run_seurat_for_sc_5p',
  484. 'file_name_key':'seurat_notebook',
  485. 'analysis_name':'seurat_5p',
  486. 'collection_type':'SEURAT_HTML',
  487. 'collection_table':'sample',
  488. 'output_files_key':'output_db_files'})
  489. upload_seurat_report_for_sc_5p_ftp = \
  490. PythonOperator(
  491. task_id='upload_seurat_report_for_sc_5p_ftp',
  492. dag=dag,
  493. queue='hpc_4G',
  494. python_callable=ftp_files_upload_for_analysis,
  495. params={'xcom_pull_task':'load_seurat_report_for_sc_5p_db',
  496. 'xcom_pull_files_key':'output_db_files',
  497. 'collection_name_task':'load_cellranger_result_to_db',
  498. 'collection_name_key':'sample_igf_id',
  499. 'collection_type':'FTP_SEURAT_HTML',
  500. 'collection_table':'sample',
  501. 'collect_remote_file':True})
  502. upload_seurat_report_for_sc_5p_to_box = \
  503. PythonOperator(
  504. task_id='upload_seurat_report_for_sc_5p_to_box',
  505. dag=dag,
  506. queue='hpc_4G',
  507. python_callable=upload_analysis_file_to_box,
  508. params={'xcom_pull_task':'load_seurat_report_for_sc_5p_db',
  509. 'xcom_pull_files_key':'output_db_files',
  510. 'analysis_tag':'seurat_single_sample_report'})
  511. ## PIPELINE
  512. decide_analysis_branch >> run_seurat_for_sc_5p
  513. run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db
  514. load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_ftp
  515. load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box
  516. ## TASK
  517. convert_bam_to_cram = \
  518. PythonOperator(
  519. task_id='convert_cellranger_bam_to_cram',
  520. dag=dag,
  521. queue='hpc_4G4t',
  522. python_callable=convert_bam_to_cram_func,
  523. params={'xcom_pull_files_key':'cellranger_output',
  524. 'xcom_pull_task':'run_cellranger',
  525. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  526. 'analysis_description_xcom_key':'analysis_description',
  527. 'use_ephemeral_space':True,
  528. 'threads':4,
  529. 'analysis_name':'cellranger',
  530. 'collection_type':'ALIGNED_CRAM',
  531. 'collection_table':'sample',
  532. 'cram_files_xcom_key':'cram_files'})
  533. upload_cram_to_irods = \
  534. PythonOperator(
  535. task_id='upload_cram_to_irods',
  536. dag=dag,
  537. queue='hpc_4G',
  538. python_callable=irods_files_upload_for_analysis,
  539. params={'xcom_pull_task':'convert_cellranger_bam_to_cram',
  540. 'xcom_pull_files_key':'cram_files',
  541. 'collection_name_key':'sample_igf_id',
  542. 'analysis_name':'cellranger_multi'})
  543. ## PIPELINE
  544. decide_analysis_branch >> convert_bam_to_cram
  545. convert_bam_to_cram >> upload_cram_to_irods
  546. ## TASK
  547. run_picard_alignment_summary = \
  548. PythonOperator(
  549. task_id='run_picard_alignment_summary',
  550. dag=dag,
  551. queue='hpc_4G',
  552. python_callable=run_picard_for_cellranger,
  553. params={'xcom_pull_files_key':'cellranger_output',
  554. 'xcom_pull_task':'run_cellranger',
  555. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  556. 'analysis_description_xcom_key':'analysis_description',
  557. 'use_ephemeral_space':True,
  558. 'load_metrics_to_cram':True,
  559. 'java_param':'-Xmx4g',
  560. 'picard_command':'CollectAlignmentSummaryMetrics',
  561. 'picard_option':{},
  562. 'analysis_files_xcom_key':'picard_alignment_summary',
  563. 'bam_files_xcom_key':None})
  564. ## PIPELINE
  565. decide_analysis_branch >> run_picard_alignment_summary
  566. ## TASK
  567. run_picard_qual_summary = \
  568. PythonOperator(
  569. task_id='run_picard_qual_summary',
  570. dag=dag,
  571. queue='hpc_4G',
  572. python_callable=run_picard_for_cellranger,
  573. params={'xcom_pull_files_key':'cellranger_output',
  574. 'xcom_pull_task':'run_cellranger',
  575. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  576. 'analysis_description_xcom_key':'analysis_description',
  577. 'use_ephemeral_space':True,
  578. 'load_metrics_to_cram':True,
  579. 'java_param':'-Xmx4g',
  580. 'picard_command':'QualityScoreDistribution',
  581. 'picard_option':{},
  582. 'analysis_files_xcom_key':'picard_qual_summary',
  583. 'bam_files_xcom_key':None})
  584. ## PIPELINE
  585. run_picard_alignment_summary >> run_picard_qual_summary
  586. ## TASK
  587. run_picard_rna_summary = \
  588. PythonOperator(
  589. task_id='run_picard_rna_summary',
  590. dag=dag,
  591. queue='hpc_4G',
  592. python_callable=run_picard_for_cellranger,
  593. params={'xcom_pull_files_key':'cellranger_output',
  594. 'xcom_pull_task':'run_cellranger',
  595. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  596. 'analysis_description_xcom_key':'analysis_description',
  597. 'use_ephemeral_space':True,
  598. 'load_metrics_to_cram':True,
  599. 'java_param':'-Xmx4g',
  600. 'picard_command':'CollectRnaSeqMetrics',
  601. 'picard_option':{},
  602. 'analysis_files_xcom_key':'picard_rna_summary',
  603. 'bam_files_xcom_key':None})
  604. ## PIPELINE
  605. run_picard_qual_summary >> run_picard_rna_summary
  606. ## TASK
  607. run_picard_gc_summary = \
  608. PythonOperator(
  609. task_id='run_picard_gc_summary',
  610. dag=dag,
  611. queue='hpc_4G',
  612. python_callable=run_picard_for_cellranger,
  613. params={'xcom_pull_files_key':'cellranger_output',
  614. 'xcom_pull_task':'run_cellranger',
  615. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  616. 'analysis_description_xcom_key':'analysis_description',
  617. 'use_ephemeral_space':True,
  618. 'load_metrics_to_cram':True,
  619. 'java_param':'-Xmx4g',
  620. 'picard_command':'CollectGcBiasMetrics',
  621. 'picard_option':{},
  622. 'analysis_files_xcom_key':'picard_gc_summary',
  623. 'bam_files_xcom_key':None})
  624. ## PIPELINE
  625. run_picard_rna_summary >> run_picard_gc_summary
  626. ## TASK
  627. picard_base_dist_summary = \
  628. PythonOperator(
  629. task_id='run_picard_gc_summary',
  630. dag=dag,
  631. queue='hpc_4G',
  632. python_callable=run_picard_for_cellranger,
  633. params={'xcom_pull_files_key':'cellranger_output',
  634. 'xcom_pull_task':'run_cellranger',
  635. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  636. 'analysis_description_xcom_key':'analysis_description',
  637. 'use_ephemeral_space':True,
  638. 'load_metrics_to_cram':True,
  639. 'java_param':'-Xmx4g',
  640. 'picard_command':'CollectBaseDistributionByCycle',
  641. 'picard_option':{},
  642. 'analysis_files_xcom_key':'picard_base_summary',
  643. 'bam_files_xcom_key':None})
  644. ## PIPELINE
  645. run_picard_gc_summary >> picard_base_dist_summary
  646. ## TASK
  647. run_samtools_stats = \
  648. PythonOperator(
  649. task_id='run_samtools_stats',
  650. dag=dag,
  651. queue='hpc_4G4t',
  652. python_callable=run_samtools_for_cellranger,
  653. params={'xcom_pull_files_key':'cellranger_output',
  654. 'xcom_pull_task':'run_cellranger',
  655. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  656. 'analysis_description_xcom_key':'analysis_description',
  657. 'use_ephemeral_space':True,
  658. 'load_metrics_to_cram':True,
  659. 'samtools_command':'stats',
  660. 'threads':4,
  661. 'analysis_files_xcom_key':'samtools_stats'})
  662. ## PIPELINE
  663. run_picard_gc_summary >> run_samtools_stats
  664. ## TASK
  665. run_samtools_idxstats = \
  666. PythonOperator(
  667. task_id='run_samtools_idxstats',
  668. dag=dag,
  669. queue='hpc_4G4t',
  670. python_callable=run_samtools_for_cellranger,
  671. params={'xcom_pull_files_key':'cellranger_output',
  672. 'xcom_pull_task':'run_cellranger',
  673. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  674. 'analysis_description_xcom_key':'analysis_description',
  675. 'use_ephemeral_space':True,
  676. 'load_metrics_to_cram':True,
  677. 'samtools_command':'idxstats',
  678. 'threads':4,
  679. 'analysis_files_xcom_key':'samtools_idxstats'})
  680. ## PIPELINE
  681. run_samtools_stats >> run_samtools_idxstats
  682. ## TASK
  683. run_multiqc = \
  684. PythonOperator(
  685. task_id='run_multiqc',
  686. dag=dag,
  687. queue='hpc_4G',
  688. python_callable=None)
  689. ## PIPELINE
  690. run_samtools_idxstats >> run_multiqc
  691. ## TASK
  692. upload_multiqc_to_ftp = \
  693. PythonOperator(
  694. task_id='upload_multiqc_to_ftp',
  695. dag=dag,
  696. queue='hpc_4G',
  697. python_callable=None)
  698. ## PIPELINE
  699. run_multiqc >> upload_multiqc_to_ftp
  700. ## TASK
  701. upload_multiqc_to_box = \
  702. PythonOperator(
  703. task_id='upload_multiqc_to_box',
  704. dag=dag,
  705. queue='hpc_4G',
  706. python_callable=None)
  707. ## PIPELINE
  708. run_multiqc >> upload_multiqc_to_box
  709. ## TASK
  710. update_analysis_and_status = \
  711. PythonOperator(
  712. task_id='update_analysis_and_status',
  713. dag=dag,
  714. queue='hpc_4G',
  715. python_callable=None,
  716. trigger_rule='none_failed_or_skipped')
  717. ## PIPELINE
  718. upload_multiqc_to_ftp >> update_analysis_and_status
  719. upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
  720. upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
  721. upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
  722. upload_scirpy_report_for_vdj_to_ftp >> update_analysis_and_status
  723. upload_scirpy_report_for_vdj_to_box >> update_analysis_and_status
  724. upload_scirpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
  725. upload_scirpy_report_for_vdj_b_to_box >> update_analysis_and_status
  726. upload_scirpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
  727. upload_scirpy_report_for_vdj_t_to_box >> update_analysis_and_status
  728. upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
  729. upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
  730. upload_cellranger_results_to_irods >> update_analysis_and_status
  731. upload_cellranger_report_to_ftp >> update_analysis_and_status
  732. upload_cellranger_report_to_box >> update_analysis_and_status
  733. upload_cram_to_irods >> update_analysis_and_status