# dag9_tenx_single_cell_immune_profiling.py (45 KB)
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
  10. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
  11. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
  12. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
  13. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
  14. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
  15. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
  16. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
  17. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
  18. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function
  19. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import upload_analysis_file_to_box
  20. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import convert_bam_to_cram_func
  21. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger
  22. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_samtools_for_cellranger
  23. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_multiqc_for_cellranger
  24. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import index_and_copy_bam_for_parallel_analysis
  25. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import change_pipeline_status
  26. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import clean_up_files
  27. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import create_and_update_qc_pages
  28. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_metrices_to_collection
  29. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import generate_cell_sorted_bam_func
  30. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_velocyto_func
  31. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scvelo_for_sc_5p_func
  32. ## ARGS
  33. default_args = {
  34. 'owner': 'airflow',
  35. 'depends_on_past': False,
  36. 'start_date': days_ago(2),
  37. 'email_on_failure': False,
  38. 'email_on_retry': False,
  39. 'retries': 4,
  40. 'max_active_runs': 10,
  41. 'catchup': True,
  42. 'retry_delay': timedelta(minutes=5),
  43. 'provide_context': True,
  44. }
  45. FEATURE_TYPE_LIST = \
  46. Variable.get('tenx_single_cell_immune_profiling_feature_types', default_var={})
  47. ## DAG
  48. dag = \
  49. DAG(
  50. dag_id='dag9_tenx_single_cell_immune_profiling',
  51. schedule_interval=None,
  52. tags=['hpc', 'analysis', 'tenx', 'sc'],
  53. default_args=default_args,
  54. concurrency=100,
  55. max_active_runs=20,
  56. orientation='LR')
  57. with dag:
  58. ## TASK
  59. fetch_analysis_info_and_branch = \
  60. BranchPythonOperator(
  61. task_id='fetch_analysis_info',
  62. dag=dag,
  63. queue='hpc_4G',
  64. params={
  65. 'no_analysis_task': 'no_analysis',
  66. 'analysis_description_xcom_key': 'analysis_description',
  67. 'analysis_info_xcom_key': 'analysis_info'},
  68. python_callable=fetch_analysis_info_and_branch_func)
  69. ## TASK
  70. configure_cellranger_run = \
  71. PythonOperator(
  72. task_id='configure_cellranger_run',
  73. dag=dag,
  74. queue='hpc_4G',
  75. trigger_rule='none_failed_or_skipped',
  76. params={
  77. 'xcom_pull_task_id': 'fetch_analysis_info',
  78. 'analysis_description_xcom_key': 'analysis_description',
  79. 'analysis_info_xcom_key': 'analysis_info',
  80. 'library_csv_xcom_key': 'cellranger_library_csv'},
  81. python_callable=configure_cellranger_run_func)
  82. for analysis_name in FEATURE_TYPE_LIST.keys():
  83. ## TASK
  84. task_branch = \
  85. BranchPythonOperator(
  86. task_id=analysis_name,
  87. dag=dag,
  88. queue='hpc_4G',
  89. params={
  90. 'xcom_pull_task_id': 'fetch_analysis_info',
  91. 'analysis_info_xcom_key': 'analysis_info',
  92. 'analysis_name': analysis_name,
  93. 'task_prefix': 'run_trim'},
  94. python_callable=task_branch_function)
  95. run_trim_list = list()
  96. for run_id in range(0,10):
  97. ## TASK
  98. t = \
  99. PythonOperator(
  100. task_id='run_trim_{0}_{1}'.format(analysis_name, run_id),
  101. dag=dag,
  102. queue='hpc_4G',
  103. params={
  104. 'xcom_pull_task_id': 'fetch_analysis_info',
  105. 'analysis_info_xcom_key': 'analysis_info',
  106. 'analysis_description_xcom_key': 'analysis_description',
  107. 'analysis_name': analysis_name,
  108. 'run_id': run_id,
  109. 'r1-length': 0,
  110. 'r2-length': 0,
  111. 'fastq_input_dir_tag': 'fastq_dir',
  112. 'use_ephemeral_space': True,
  113. 'fastq_output_dir_tag': 'output_path'},
  114. python_callable=run_sc_read_trimmming_func)
  115. run_trim_list.append(t)
  116. ## TASK
  117. collect_trimmed_files = \
  118. DummyOperator(
  119. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  120. trigger_rule='none_failed_or_skipped',
  121. dag=dag)
  122. ## PIPELINE
  123. fetch_analysis_info_and_branch >> task_branch
  124. task_branch >> run_trim_list
  125. run_trim_list >> collect_trimmed_files
  126. collect_trimmed_files >> configure_cellranger_run
  127. ## TASK
  128. no_analysis = \
  129. DummyOperator(
  130. task_id='no_analysis',
  131. dag=dag)
  132. ## PIPELINE
  133. fetch_analysis_info_and_branch >> no_analysis
  134. ## TASK
  135. run_cellranger = \
  136. PythonOperator(
  137. task_id='run_cellranger',
  138. dag=dag,
  139. queue='hpc_64G16t24hr',
  140. params={
  141. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  142. 'analysis_description_xcom_key': 'analysis_description',
  143. 'library_csv_xcom_key': 'cellranger_library_csv',
  144. 'library_csv_xcom_pull_task': 'configure_cellranger_run',
  145. 'cellranger_xcom_key': 'cellranger_output',
  146. 'cellranger_options': ['--localcores 16', '--localmem 64']},
  147. python_callable=run_cellranger_tool)
  148. ## PIPELINE
  149. configure_cellranger_run >> run_cellranger
  150. ## TASK
  151. decide_analysis_branch = \
  152. BranchPythonOperator(
  153. task_id='decide_analysis_branch',
  154. dag=dag,
  155. queue='hpc_4G',
  156. python_callable=decide_analysis_branch_func,
  157. params={
  158. 'load_cellranger_result_to_db_task': 'load_cellranger_result_to_db',
  159. 'run_scanpy_for_sc_5p_task': 'run_scanpy_for_sc_5p',
  160. 'run_scirpy_for_vdj_task': 'run_scirpy_for_vdj',
  161. 'run_scirpy_for_vdj_b_task': 'run_scirpy_for_vdj_b',
  162. 'run_scirpy_vdj_t_task': 'run_scirpy_for_vdj_t',
  163. 'run_seurat_for_sc_5p_task': 'run_seurat_for_sc_5p',
  164. 'convert_cellranger_bam_to_cram_task': 'convert_cellranger_bam_to_cram',
  165. 'library_csv_xcom_key': 'cellranger_library_csv',
  166. 'library_csv_xcom_pull_task': 'configure_cellranger_run'})
  167. ## PIPELINE
  168. run_cellranger >> decide_analysis_branch
  169. ## TASK
  170. load_cellranger_result_to_db = \
  171. PythonOperator(
  172. task_id='load_cellranger_result_to_db',
  173. dag=dag,
  174. queue='hpc_4G',
  175. python_callable=load_cellranger_result_to_db_func,
  176. params={
  177. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  178. 'analysis_description_xcom_key': 'analysis_description',
  179. 'cellranger_xcom_key': 'cellranger_output',
  180. 'cellranger_xcom_pull_task': 'run_cellranger',
  181. 'collection_type': 'CELLRANGER_MULTI',
  182. 'html_collection_type': 'CELLRANGER_HTML',
  183. 'collection_table': 'sample',
  184. 'xcom_collection_name_key': 'sample_igf_id',
  185. 'genome_column': 'genome_build',
  186. 'analysis_name': 'cellranger_multi',
  187. 'output_xcom_key': 'loaded_output_files',
  188. 'html_xcom_key': 'html_report_file',
  189. 'html_report_file_name': 'web_summary.html'})
  190. upload_cellranger_report_to_ftp = \
  191. PythonOperator(
  192. task_id='upload_cellranger_report_to_ftp',
  193. dag=dag,
  194. queue='hpc_4G',
  195. python_callable=ftp_files_upload_for_analysis,
  196. params={
  197. 'xcom_pull_task': 'load_cellranger_result_to_db',
  198. 'xcom_pull_files_key': 'html_report_file',
  199. 'collection_name_task': 'load_cellranger_result_to_db',
  200. 'collection_name_key': 'sample_igf_id',
  201. 'collection_type': 'FTP_CELLRANGER_HTML',
  202. 'collection_table': 'sample',
  203. 'collect_remote_file': True})
  204. upload_cellranger_report_to_box = \
  205. PythonOperator(
  206. task_id='upload_cellranger_report_to_box',
  207. dag=dag,
  208. queue='hpc_4G',
  209. python_callable=upload_analysis_file_to_box,
  210. params={
  211. 'xcom_pull_task': 'load_cellranger_result_to_db',
  212. 'xcom_pull_files_key': 'html_report_file',
  213. 'analysis_tag': 'cellranger_multi'})
  214. upload_cellranger_results_to_irods = \
  215. PythonOperator(
  216. task_id='upload_cellranger_results_to_irods',
  217. dag=dag,
  218. queue='hpc_4G',
  219. python_callable=irods_files_upload_for_analysis,
  220. params={
  221. 'xcom_pull_task': 'load_cellranger_result_to_db',
  222. 'xcom_pull_files_key': 'loaded_output_files',
  223. 'collection_name_key': 'sample_igf_id',
  224. 'collection_name_task': 'load_cellranger_result_to_db',
  225. 'analysis_name': 'cellranger_multi'})
  226. ## PIPELINE
  227. decide_analysis_branch >> load_cellranger_result_to_db
  228. load_cellranger_result_to_db >> upload_cellranger_report_to_ftp
  229. load_cellranger_result_to_db >> upload_cellranger_report_to_box
  230. load_cellranger_result_to_db >> upload_cellranger_results_to_irods
  231. ## TASK
  232. run_scanpy_for_sc_5p = \
  233. PythonOperator(
  234. task_id='run_scanpy_for_sc_5p',
  235. dag=dag,
  236. queue='hpc_4G',
  237. python_callable=run_singlecell_notebook_wrapper_func,
  238. params={
  239. 'cellranger_xcom_key': 'cellranger_output',
  240. 'cellranger_xcom_pull_task': 'run_cellranger',
  241. 'scanpy_timeout': 1200,
  242. 'allow_errors': False,
  243. 'kernel_name': 'python3',
  244. 'count_dir': 'count',
  245. 'analysis_name': 'scanpy',
  246. 'output_notebook_key': 'scanpy_notebook',
  247. 'output_cellbrowser_key': 'cellbrowser_dirs',
  248. 'output_scanpy_h5ad_key': 'scanpy_h5ad',
  249. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  250. 'analysis_description_xcom_key': 'analysis_description'})
  251. load_cellranger_gex_matrics_to_db = \
  252. PythonOperator(
  253. task_id='load_cellranger_gex_matrics_to_db',
  254. dag=dag,
  255. queue='hpc_4G',
  256. python_callable=load_cellranger_metrices_to_collection,
  257. params={
  258. 'cellranger_xcom_key': 'cellranger_output',
  259. 'cellranger_xcom_pull_task': 'run_cellranger',
  260. 'collection_type': 'CELLRANGER_MULTI',
  261. 'collection_name_task': 'load_cellranger_result_to_db',
  262. 'collection_name_key': 'sample_igf_id',
  263. 'metrics_summary_file': 'metrics_summary.csv',
  264. 'attribute_prefix': 'CELLRANGER_COUNT'})
  265. load_scanpy_report_for_sc_5p_to_db = \
  266. PythonOperator(
  267. task_id='load_scanpy_report_for_sc_5p_to_db',
  268. dag=dag,
  269. queue='hpc_4G',
  270. python_callable=load_analysis_files_func,
  271. params={
  272. 'collection_name_task': 'load_cellranger_result_to_db',
  273. 'collection_name_key': 'sample_igf_id',
  274. 'file_name_task': 'run_scanpy_for_sc_5p',
  275. 'file_name_key': 'scanpy_notebook',
  276. 'analysis_name': 'scanpy_5p',
  277. 'collection_type': 'SCANPY_HTML',
  278. 'collection_table': 'sample',
  279. 'output_files_key': 'output_db_files'})
  280. upload_scanpy_report_for_sc_5p_to_ftp = \
  281. PythonOperator(
  282. task_id='upload_scanpy_report_for_sc_5p_to_ftp',
  283. dag=dag,
  284. queue='hpc_4G',
  285. python_callable=ftp_files_upload_for_analysis,
  286. params={
  287. 'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
  288. 'xcom_pull_files_key': 'output_db_files',
  289. 'collection_name_task': 'load_cellranger_result_to_db',
  290. 'collection_name_key': 'sample_igf_id',
  291. 'collection_type': 'FTP_SCANPY_HTML',
  292. 'collection_table': 'sample',
  293. 'collect_remote_file': True})
  294. upload_scanpy_report_for_sc_5p_to_box = \
  295. PythonOperator(
  296. task_id='upload_scanpy_report_for_sc_5p_to_box',
  297. dag=dag,
  298. queue='hpc_4G',
  299. python_callable=upload_analysis_file_to_box,
  300. params={
  301. 'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
  302. 'xcom_pull_files_key': 'output_db_files',
  303. 'analysis_tag': 'scanpy_single_sample_report'})
  304. upload_cellbrowser_for_sc_5p_to_ftp = \
  305. PythonOperator(
  306. task_id='upload_cellbrowser_for_sc_5p_to_ftp',
  307. dag=dag,
  308. queue='hpc_4G',
  309. python_callable=ftp_files_upload_for_analysis,
  310. params={
  311. 'xcom_pull_task': 'run_scanpy_for_sc_5p',
  312. 'xcom_pull_files_key': 'cellbrowser_dirs',
  313. 'collection_name_task': 'load_cellranger_result_to_db',
  314. 'collection_name_key': 'sample_igf_id',
  315. 'collection_type': 'FTP_CELLBROWSER',
  316. 'collection_table': 'sample',
  317. 'collect_remote_file': True})
  318. ## PIPELINE
  319. decide_analysis_branch >> run_scanpy_for_sc_5p
  320. run_scanpy_for_sc_5p >> load_scanpy_report_for_sc_5p_to_db
  321. run_scanpy_for_sc_5p >> load_cellranger_gex_matrics_to_db
  322. load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_ftp
  323. load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_box
  324. run_scanpy_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
  325. ## TASK
  326. run_scirpy_for_vdj_b = \
  327. PythonOperator(
  328. task_id='run_scirpy_for_vdj_b',
  329. dag=dag,
  330. queue='hpc_4G',
  331. python_callable=run_singlecell_notebook_wrapper_func,
  332. params={
  333. 'cellranger_xcom_key': 'cellranger_output',
  334. 'cellranger_xcom_pull_task': 'run_cellranger',
  335. 'scanpy_timeout': 1200,
  336. 'allow_errors': False,
  337. 'kernel_name': 'python3',
  338. 'analysis_name': 'scirpy',
  339. 'vdj_dir': 'vdj_b',
  340. 'count_dir': 'count',
  341. 'output_notebook_key': 'scirpy_notebook',
  342. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  343. 'analysis_description_xcom_key': 'analysis_description'})
  344. load_cellranger_vdjB_matrics_to_db = \
  345. PythonOperator(
  346. task_id='load_cellranger_vdjB_matrics_to_db',
  347. dag=dag,
  348. queue='hpc_4G',
  349. python_callable=load_cellranger_metrices_to_collection,
  350. params={
  351. 'cellranger_xcom_key': 'cellranger_output',
  352. 'cellranger_xcom_pull_task': 'run_cellranger',
  353. 'collection_type': 'CELLRANGER_MULTI',
  354. 'collection_name_task': 'load_cellranger_result_to_db',
  355. 'collection_name_key': 'sample_igf_id',
  356. 'metrics_summary_file': 'metrics_summary.csv',
  357. 'attribute_prefix': 'CELLRANGER_VDJB'})
  358. load_scirpy_report_for_vdj_b_to_db = \
  359. PythonOperator(
  360. task_id='load_scirpy_report_for_vdj_b_to_db',
  361. dag=dag,
  362. queue='hpc_4G',
  363. python_callable=load_analysis_files_func,
  364. params={
  365. 'collection_name_task': 'load_cellranger_result_to_db',
  366. 'collection_name_key': 'sample_igf_id',
  367. 'file_name_task': 'run_scirpy_for_vdj_b',
  368. 'file_name_key': 'scirpy_notebook',
  369. 'analysis_name': 'scirpy_vdj_b',
  370. 'collection_type': 'SCIRPY_VDJ_B_HTML',
  371. 'collection_table': 'sample',
  372. 'output_files_key': 'output_db_files'})
  373. upload_scirpy_report_for_vdj_b_to_ftp = \
  374. PythonOperator(
  375. task_id='upload_scirpy_report_for_vdj_b_to_ftp',
  376. dag=dag,
  377. queue='hpc_4G',
  378. python_callable=ftp_files_upload_for_analysis,
  379. params={
  380. 'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
  381. 'xcom_pull_files_key': 'output_db_files',
  382. 'collection_name_task': 'load_cellranger_result_to_db',
  383. 'collection_name_key': 'sample_igf_id',
  384. 'collection_type': 'FTP_SCIRPY_VDJ_B_HTML',
  385. 'collection_table': 'sample',
  386. 'collect_remote_file': True})
  387. upload_scirpy_report_for_vdj_b_to_box = \
  388. PythonOperator(
  389. task_id='upload_scanpy_report_for_vdj_b_to_box',
  390. dag=dag,
  391. queue='hpc_4G',
  392. python_callable=upload_analysis_file_to_box,
  393. params={
  394. 'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
  395. 'xcom_pull_files_key': 'output_db_files',
  396. 'analysis_tag': 'scirpy_vdj_b_single_sample_report'})
  397. ## PIPELINE
  398. decide_analysis_branch >> run_scirpy_for_vdj_b
  399. run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db
  400. run_scirpy_for_vdj_b >> load_cellranger_vdjB_matrics_to_db
  401. load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_ftp
  402. load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box
  403. ## TASK
  404. run_scirpy_for_vdj_t = \
  405. PythonOperator(
  406. task_id='run_scirpy_for_vdj_t',
  407. dag=dag,
  408. queue='hpc_4G',
  409. python_callable=run_singlecell_notebook_wrapper_func,
  410. params={
  411. 'cellranger_xcom_key': 'cellranger_output',
  412. 'cellranger_xcom_pull_task': 'run_cellranger',
  413. 'scanpy_timeout': 1200,
  414. 'allow_errors': False,
  415. 'kernel_name': 'python3',
  416. 'analysis_name': 'scirpy',
  417. 'vdj_dir': 'vdj_t',
  418. 'count_dir': 'count',
  419. 'output_notebook_key': 'scirpy_notebook',
  420. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  421. 'analysis_description_xcom_key': 'analysis_description'})
  422. load_cellranger_vdjT_matrics_to_db = \
  423. PythonOperator(
  424. task_id='load_cellranger_vdjT_matrics_to_db',
  425. dag=dag,
  426. queue='hpc_4G',
  427. python_callable=load_cellranger_metrices_to_collection,
  428. params={
  429. 'cellranger_xcom_key': 'cellranger_output',
  430. 'cellranger_xcom_pull_task': 'run_cellranger',
  431. 'collection_type': 'CELLRANGER_MULTI',
  432. 'collection_name_task': 'load_cellranger_result_to_db',
  433. 'collection_name_key': 'sample_igf_id',
  434. 'metrics_summary_file': 'metrics_summary.csv',
  435. 'attribute_prefix': 'CELLRANGER_VDJT'})
  436. load_scirpy_report_for_vdj_t_to_db = \
  437. PythonOperator(
  438. task_id='load_scirpy_report_for_vdj_t_to_db',
  439. dag=dag,
  440. queue='hpc_4G',
  441. python_callable=load_analysis_files_func,
  442. params={
  443. 'collection_name_task': 'load_cellranger_result_to_db',
  444. 'collection_name_key': 'sample_igf_id',
  445. 'file_name_task': 'run_scirpy_for_vdj_t',
  446. 'file_name_key': 'scirpy_notebook',
  447. 'analysis_name': 'scirpy_vdj_t',
  448. 'collection_type': 'SCIRPY_VDJ_T_HTML',
  449. 'collection_table': 'sample',
  450. 'output_files_key': 'output_db_files'})
  451. upload_scirpy_report_for_vdj_t_to_ftp = \
  452. PythonOperator(
  453. task_id='upload_scirpy_report_for_vdj_t_to_ftp',
  454. dag=dag,
  455. queue='hpc_4G',
  456. python_callable=ftp_files_upload_for_analysis,
  457. params={
  458. 'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
  459. 'xcom_pull_files_key': 'output_db_files',
  460. 'collection_name_task': 'load_cellranger_result_to_db',
  461. 'collection_name_key': 'sample_igf_id',
  462. 'collection_type': 'FTP_SCIRPY_VDJ_T_HTML',
  463. 'collection_table': 'sample',
  464. 'collect_remote_file': True})
  465. upload_scirpy_report_for_vdj_t_to_box = \
  466. PythonOperator(
  467. task_id='upload_scirpy_report_for_vdj_t_to_box',
  468. dag=dag,
  469. queue='hpc_4G',
  470. python_callable=upload_analysis_file_to_box,
  471. params={
  472. 'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
  473. 'xcom_pull_files_key': 'output_db_files',
  474. 'analysis_tag': 'scirpy_vdj_t_single_sample_report'})
  475. ## PIPELINE
  476. decide_analysis_branch >> run_scirpy_for_vdj_t
  477. run_scirpy_for_vdj_t >> load_scirpy_report_for_vdj_t_to_db
  478. run_scirpy_for_vdj_t >> load_cellranger_vdjT_matrics_to_db
  479. load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_ftp
  480. load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_box
  481. ## TASK
  482. run_seurat_for_sc_5p = \
  483. PythonOperator(
  484. task_id='run_seurat_for_sc_5p',
  485. dag=dag,
  486. queue='hpc_4G',
  487. python_callable=run_singlecell_notebook_wrapper_func,
  488. params={
  489. 'cellranger_xcom_key': 'cellranger_output',
  490. 'cellranger_xcom_pull_task': 'run_cellranger',
  491. 'scanpy_timeout': 1200,
  492. 'allow_errors': False,
  493. 'kernel_name': 'ir',
  494. 'analysis_name': 'seurat',
  495. 'vdj_dir': 'vdj',
  496. 'count_dir': 'count',
  497. 'output_notebook_key': 'seurat_notebook',
  498. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  499. 'analysis_description_xcom_key': 'analysis_description'})
  500. load_seurat_report_for_sc_5p_db = \
  501. PythonOperator(
  502. task_id='load_seurat_report_for_sc_5p_db',
  503. dag=dag,
  504. queue='hpc_4G',
  505. python_callable=load_analysis_files_func,
  506. params={
  507. 'collection_name_task': 'load_cellranger_result_to_db',
  508. 'collection_name_key': 'sample_igf_id',
  509. 'file_name_task': 'run_seurat_for_sc_5p',
  510. 'file_name_key': 'seurat_notebook',
  511. 'analysis_name': 'seurat_5p',
  512. 'collection_type': 'SEURAT_HTML',
  513. 'collection_table': 'sample',
  514. 'output_files_key': 'output_db_files'})
  515. upload_seurat_report_for_sc_5p_ftp = \
  516. PythonOperator(
  517. task_id='upload_seurat_report_for_sc_5p_ftp',
  518. dag=dag,
  519. queue='hpc_4G',
  520. python_callable=ftp_files_upload_for_analysis,
  521. params={
  522. 'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
  523. 'xcom_pull_files_key': 'output_db_files',
  524. 'collection_name_task': 'load_cellranger_result_to_db',
  525. 'collection_name_key': 'sample_igf_id',
  526. 'collection_type': 'FTP_SEURAT_HTML',
  527. 'collection_table': 'sample',
  528. 'collect_remote_file': True})
  529. upload_seurat_report_for_sc_5p_to_box = \
  530. PythonOperator(
  531. task_id='upload_seurat_report_for_sc_5p_to_box',
  532. dag=dag,
  533. queue='hpc_4G',
  534. python_callable=upload_analysis_file_to_box,
  535. params={
  536. 'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
  537. 'xcom_pull_files_key': 'output_db_files',
  538. 'analysis_tag': 'seurat_single_sample_report'})
  539. ## PIPELINE
  540. decide_analysis_branch >> run_seurat_for_sc_5p
  541. run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db
  542. load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_ftp
  543. load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box
  544. ## TASK
  545. convert_cellranger_bam_to_cram = \
  546. PythonOperator(
  547. task_id='convert_cellranger_bam_to_cram',
  548. dag=dag,
  549. queue='hpc_4G4t',
  550. python_callable=convert_bam_to_cram_func,
  551. params={
  552. 'xcom_pull_files_key': 'cellranger_output',
  553. 'xcom_pull_task': 'run_cellranger',
  554. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  555. 'analysis_description_xcom_key': 'analysis_description',
  556. 'use_ephemeral_space': True,
  557. 'threads': 4,
  558. 'cellranger_bam_path': 'count/sample_alignments.bam',
  559. 'analysis_name': 'cellranger',
  560. 'collection_type': 'ANALYSIS_CRAM',
  561. 'collection_table': 'sample',
  562. 'cram_files_xcom_key': 'cram_files'})
  563. ## PIPELINE
  564. decide_analysis_branch >> convert_cellranger_bam_to_cram
  565. ## TASK
  566. generate_cell_sorted_bam = \
  567. PythonOperator(
  568. task_id='generate_cell_sorted_bam',
  569. dag=dag,
  570. queue='hpc_16G8t',
  571. python_callable=generate_cell_sorted_bam_func,
  572. params={
  573. 'xcom_pull_task': 'run_cellranger',
  574. 'xcom_pull_files_key': 'cellranger_output',
  575. 'cellranger_bam_path': 'count/sample_alignments.bam',
  576. 'cellsorted_bam_path': 'count/cellsorted_sample_alignments.bam',
  577. 'samtools_mem': '2G',
  578. 'threads': 7})
  579. run_velocyto = \
  580. PythonOperator(
  581. task_id='run_velocyto',
  582. queue='hpc_16G_long',
  583. python_callable=run_velocyto_func,
  584. params={
  585. 'xcom_pull_task': 'run_cellranger',
  586. 'xcom_pull_files_key': 'cellranger_output',
  587. 'cell_sorted_bam_name': 'count/cellsorted_sample_alignments.bam',
  588. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  589. 'analysis_description_xcom_key': 'analysis_description' })
  590. run_scvelo_for_sc_5p = \
  591. PythonOperator(
  592. task_id='run_scvelo_for_sc_5p',
  593. dag=dag,
  594. queue='hpc_32G16t',
  595. python_callable=run_scvelo_for_sc_5p_func,
  596. params={
  597. 'xcom_pull_task': 'run_cellranger',
  598. 'xcom_pull_files_key': 'cellranger_output',
  599. 'analysis_description_xcom_pull_task': 'fetch_analysis_info',
  600. 'analysis_description_xcom_key': 'analysis_description',
  601. 'loom_file_key': 'loom_output',
  602. 'loom_file_task': 'run_velocyto',
  603. 'scanpy_h5ad_task':'run_scanpy_for_sc_5p',
  604. 'scanpy_h5ad_key': 'scanpy_h5ad',
  605. 'timeout': 2400,
  606. 'allow_errors': False,
  607. 'cpu_threads': 14,
  608. 'output_notebook_key': 'scvelo_notebook'})
  609. load_loom_file_to_rds = \
  610. PythonOperator(
  611. task_id='load_loom_file_to_rds',
  612. dag=dag,
  613. queue='hpc_4G',
  614. python_callable=load_analysis_files_func,
  615. params={
  616. 'collection_name_task': 'load_cellranger_result_to_db',
  617. 'collection_name_key': 'sample_igf_id',
  618. 'file_name_task': 'run_velocyto',
  619. 'file_name_key': 'loom_output',
  620. 'analysis_name': 'velocyto_5p',
  621. 'collection_type': 'VELOCYTO_LOOM',
  622. 'collection_table': 'sample',
  623. 'output_files_key': 'output_db_files'})
  ## TASK
  # Runs irods_files_upload_for_analysis; params point it at the
  # 'output_db_files' xcom of load_loom_file_to_rds.
  upload_loom_file_to_irods = PythonOperator(
    task_id='upload_loom_file_to_irods',
    python_callable=irods_files_upload_for_analysis,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'load_loom_file_to_rds',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_key': 'sample_igf_id',
      'collection_name_task': 'load_cellranger_result_to_db',
      'analysis_name': 'velocyto_loom',
    },
  )
  ## TASK
  # Runs load_analysis_files_func for the 'scvelo_notebook' xcom of
  # run_scvelo_for_sc_5p; results are published under 'output_db_files'.
  load_scvelo_report_to_rds = PythonOperator(
    task_id='load_scvelo_report_to_rds',
    python_callable=load_analysis_files_func,
    queue='hpc_4G',
    dag=dag,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_scvelo_for_sc_5p',
      'file_name_key': 'scvelo_notebook',
      'analysis_name': 'scvelo_5p',
      'collection_type': 'SCVELO_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files',
    },
  )
  ## TASK
  # Runs ftp_files_upload_for_analysis on the files listed by
  # load_scvelo_report_to_rds, with collection type FTP_SCVELO_HTML.
  upload_scvelo_report_to_ftp = PythonOperator(
    task_id='upload_scvelo_report_to_ftp',
    python_callable=ftp_files_upload_for_analysis,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'load_scvelo_report_to_rds',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SCVELO_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True,
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the same 'output_db_files' xcom.
  upload_scvelo_report_to_box = PythonOperator(
    task_id='upload_scvelo_report_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'load_scvelo_report_to_rds',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'scvelo_single_sample_report',
    },
  )
  ## PIPELINE
  # cram conversion -> cell sorted bam -> velocyto
  convert_cellranger_bam_to_cram >> generate_cell_sorted_bam >> run_velocyto
  # velocyto loom file: load, then push to irods
  run_velocyto >> load_loom_file_to_rds >> upload_loom_file_to_irods
  # scvelo waits for both velocyto and scanpy
  [run_velocyto, run_scanpy_for_sc_5p] >> run_scvelo_for_sc_5p
  # scvelo report: load, then fan out to ftp and box
  run_scvelo_for_sc_5p >> load_scvelo_report_to_rds >> [
    upload_scvelo_report_to_ftp,
    upload_scvelo_report_to_box,
  ]
  ## TASK
  # Branch task: index_and_copy_bam_for_parallel_analysis receives the
  # cellranger bam location and the list of downstream task ids below.
  copy_bam_for_parallel_runs = BranchPythonOperator(
    task_id='copy_bam_for_parallel_runs',
    python_callable=index_and_copy_bam_for_parallel_analysis,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'cellranger_bam_path': 'count/sample_alignments.bam',
      'list_of_tasks': [
        'run_picard_alignment_summary',
        'run_picard_qual_summary',
        'run_picard_rna_summary',
        'run_picard_gc_summary',
        'run_picard_base_dist_summary',
        'run_samtools_stats',
      ],
    },
  )
  ## TASK
  # Runs irods_files_upload_for_analysis on the 'cram_files' xcom of
  # convert_cellranger_bam_to_cram.
  upload_cram_to_irods = PythonOperator(
    task_id='upload_cram_to_irods',
    python_callable=irods_files_upload_for_analysis,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'convert_cellranger_bam_to_cram',
      'xcom_pull_files_key': 'cram_files',
      'collection_name_key': 'sample_igf_id',
      'collection_name_task': 'load_cellranger_result_to_db',
      'analysis_name': 'cellranger_multi',
    },
  )
  ## PIPELINE
  # Earlier wiring, kept disabled for reference:
  #convert_cellranger_bam_to_cram >> copy_bam_for_parallel_runs # we need to load metrics to cram
  convert_cellranger_bam_to_cram >> upload_cram_to_irods
  generate_cell_sorted_bam >> copy_bam_for_parallel_runs
  ## TASK
  # Picard CollectAlignmentSummaryMetrics via run_picard_for_cellranger;
  # output is published under xcom key 'picard_alignment_summary'.
  run_picard_alignment_summary = PythonOperator(
    task_id='run_picard_alignment_summary',
    python_callable=run_picard_for_cellranger,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_alignment_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectAlignmentSummaryMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_alignment_summary',
      'bam_files_xcom_key': None,
    },
  )
  ## TASK
  # clean_up_files is pointed at the same xcom entry the picard step read.
  cleanup_picard_alignment_summary_input = PythonOperator(
    task_id='cleanup_picard_alignment_summary_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_alignment_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the picard output xcom.
  upload_picard_alignment_summary_to_box = PythonOperator(
    task_id='upload_picard_alignment_summary_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_picard_alignment_summary',
      'xcom_pull_files_key': 'picard_alignment_summary',
      'analysis_tag': 'Picard-CollectAlignmentSummaryMetrics',
    },
  )
  ## PIPELINE
  # bam copy -> picard -> (input cleanup, box upload)
  copy_bam_for_parallel_runs >> run_picard_alignment_summary >> [
    cleanup_picard_alignment_summary_input,
    upload_picard_alignment_summary_to_box,
  ]
  ## TASK
  # Picard QualityScoreDistribution via run_picard_for_cellranger;
  # output is published under xcom key 'picard_qual_summary'.
  run_picard_qual_summary = PythonOperator(
    task_id='run_picard_qual_summary',
    python_callable=run_picard_for_cellranger,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_qual_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'java_param': '-Xmx4g',
      'picard_command': 'QualityScoreDistribution',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_qual_summary',
      'bam_files_xcom_key': None,
    },
  )
  ## TASK
  # clean_up_files is pointed at the same xcom entry the picard step read.
  cleanup_picard_qual_summary_input = PythonOperator(
    task_id='cleanup_picard_qual_summary_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_qual_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the picard output xcom.
  upload_picard_qual_summary_to_box = PythonOperator(
    task_id='upload_picard_qual_summary_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_picard_qual_summary',
      'xcom_pull_files_key': 'picard_qual_summary',
      'analysis_tag': 'Picard-QualityScoreDistribution',
    },
  )
  ## PIPELINE
  # bam copy -> picard -> (input cleanup, box upload)
  copy_bam_for_parallel_runs >> run_picard_qual_summary >> [
    cleanup_picard_qual_summary_input,
    upload_picard_qual_summary_to_box,
  ]
  ## TASK
  # Picard CollectRnaSeqMetrics via run_picard_for_cellranger. Unlike the
  # other picard steps in this file it uses queue hpc_8G with -Xmx7g.
  run_picard_rna_summary = PythonOperator(
    task_id='run_picard_rna_summary',
    python_callable=run_picard_for_cellranger,
    queue='hpc_8G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_rna_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'java_param': '-Xmx7g',
      'picard_command': 'CollectRnaSeqMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_rna_summary',
      'bam_files_xcom_key': None,
    },
  )
  ## TASK
  # clean_up_files is pointed at the same xcom entry the picard step read.
  cleanup_picard_rna_summary_input = PythonOperator(
    task_id='cleanup_picard_rna_summary_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_rna_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the picard output xcom.
  upload_picard_rna_summary_to_box = PythonOperator(
    task_id='upload_picard_rna_summary_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_picard_rna_summary',
      'xcom_pull_files_key': 'picard_rna_summary',
      'analysis_tag': 'Picard-CollectRnaSeqMetrics',
    },
  )
  ## PIPELINE
  # bam copy -> picard -> (input cleanup, box upload)
  copy_bam_for_parallel_runs >> run_picard_rna_summary >> [
    cleanup_picard_rna_summary_input,
    upload_picard_rna_summary_to_box,
  ]
  ## TASK
  # Picard CollectGcBiasMetrics via run_picard_for_cellranger;
  # output is published under xcom key 'picard_gc_summary'.
  run_picard_gc_summary = PythonOperator(
    task_id='run_picard_gc_summary',
    python_callable=run_picard_for_cellranger,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_gc_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectGcBiasMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_gc_summary',
      'bam_files_xcom_key': None,
    },
  )
  ## TASK
  # clean_up_files is pointed at the same xcom entry the picard step read.
  cleanup_picard_gc_summary_input = PythonOperator(
    task_id='cleanup_picard_gc_summary_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_gc_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the picard output xcom.
  upload_picard_gc_summary_to_box = PythonOperator(
    task_id='upload_picard_gc_summary_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_picard_gc_summary',
      'xcom_pull_files_key': 'picard_gc_summary',
      'analysis_tag': 'Picard-CollectGcBiasMetrics',
    },
  )
  ## PIPELINE
  # bam copy -> picard -> (input cleanup, box upload)
  copy_bam_for_parallel_runs >> run_picard_gc_summary >> [
    cleanup_picard_gc_summary_input,
    upload_picard_gc_summary_to_box,
  ]
  ## TASK
  # Picard CollectBaseDistributionByCycle via run_picard_for_cellranger.
  # Note the output xcom key is 'picard_base_summary' (no '_dist').
  run_picard_base_dist_summary = PythonOperator(
    task_id='run_picard_base_dist_summary',
    python_callable=run_picard_for_cellranger,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_base_dist_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectBaseDistributionByCycle',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_base_summary',
      'bam_files_xcom_key': None,
    },
  )
  ## TASK
  # clean_up_files is pointed at the same xcom entry the picard step read.
  cleanup_picard_base_dist_summary_input = PythonOperator(
    task_id='cleanup_picard_base_dist_summary_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_picard_base_dist_summary',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the picard output xcom.
  upload_picard_base_dist_summary_to_box = PythonOperator(
    task_id='upload_picard_base_dist_summary_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_picard_base_dist_summary',
      'xcom_pull_files_key': 'picard_base_summary',
      'analysis_tag': 'Picard-CollectBaseDistributionByCycle',
    },
  )
  ## PIPELINE
  # bam copy -> picard -> (input cleanup, box upload)
  copy_bam_for_parallel_runs >> run_picard_base_dist_summary >> [
    cleanup_picard_base_dist_summary_input,
    upload_picard_base_dist_summary_to_box,
  ]
  ## TASK
  # samtools 'stats' via run_samtools_for_cellranger on the 4-thread queue;
  # output is published under xcom key 'samtools_stats'.
  run_samtools_stats = PythonOperator(
    task_id='run_samtools_stats',
    python_callable=run_samtools_for_cellranger,
    queue='hpc_4G4t',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'samtools_command': 'stats',
      'threads': 4,
      'analysis_files_xcom_key': 'samtools_stats',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the samtools stats output xcom.
  upload_samtools_stats_to_box = PythonOperator(
    task_id='upload_samtools_stats_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_samtools_stats',
      'xcom_pull_files_key': 'samtools_stats',
      'analysis_tag': 'Samtools-stats',
    },
  )
  ## TASK
  # samtools 'idxstats'. It reads the 'run_samtools_stats' xcom key of
  # copy_bam_for_parallel_runs (same input entry as run_samtools_stats),
  # and is wired to run after run_samtools_stats.
  run_samtools_idxstats = PythonOperator(
    task_id='run_samtools_idxstats',
    python_callable=run_samtools_for_cellranger,
    queue='hpc_4G4t',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'samtools_command': 'idxstats',
      'threads': 4,
      'analysis_files_xcom_key': 'samtools_idxstats',
    },
  )
  ## TASK
  # clean_up_files for the shared samtools input; scheduled after idxstats
  # because both samtools tasks read the same xcom entry.
  cleanup_samtools_stats_input = PythonOperator(
    task_id='cleanup_samtools_stats_input',
    python_callable=clean_up_files,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_files_key': 'run_samtools_stats',
      'xcom_pull_task': 'copy_bam_for_parallel_runs',
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the samtools idxstats output xcom.
  upload_samtools_idxstats_to_box = PythonOperator(
    task_id='upload_samtools_idxstats_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'run_samtools_idxstats',
      'xcom_pull_files_key': 'samtools_idxstats',
      'analysis_tag': 'Samtools-idxstats',
    },
  )
  ## PIPELINE
  # bam copy -> stats -> (box upload, idxstats)
  copy_bam_for_parallel_runs >> run_samtools_stats >> [
    upload_samtools_stats_to_box,
    run_samtools_idxstats,
  ]
  # idxstats -> (input cleanup, box upload)
  run_samtools_idxstats >> [
    cleanup_samtools_stats_input,
    upload_samtools_idxstats_to_box,
  ]
  ## TASK
  # Runs run_multiqc_for_cellranger over the xcom outputs listed in
  # 'list_of_analysis_xcoms_and_tasks' (task id -> xcom key). The
  # 'none_failed_or_skipped' trigger rule is set explicitly because the
  # upstream picard/samtools tasks hang off a branch operator.
  run_multiqc = PythonOperator(
    task_id='run_multiqc',
    python_callable=run_multiqc_for_cellranger,
    trigger_rule='none_failed_or_skipped',
    queue='hpc_4G',
    dag=dag,
    params={
      'list_of_analysis_xcoms_and_tasks': {
        'run_cellranger': 'cellranger_output',
        'run_picard_alignment_summary': 'picard_alignment_summary',
        'run_picard_qual_summary': 'picard_qual_summary',
        'run_picard_rna_summary': 'picard_rna_summary',
        'run_picard_gc_summary': 'picard_gc_summary',
        'run_picard_base_dist_summary': 'picard_base_summary',
        'run_samtools_stats': 'samtools_stats',
        'run_samtools_idxstats': 'samtools_idxstats',
      },
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'multiqc_html_file_xcom_key': 'multiqc_html',
      'multiqc_data_file_xcom_key': 'multiqc_data',
      # Fixed typo: was 'picad', which is not a MultiQC module name.
      # NOTE(review): presumably this list drives module ordering in the
      # report - confirm against run_multiqc_for_cellranger.
      'tool_order_list': ['picard', 'samtools'],
    },
  )
  ## PIPELINE
  # MultiQC waits for every picard step and for samtools idxstats (which
  # itself runs after samtools stats).
  [
    run_picard_alignment_summary,
    run_picard_qual_summary,
    run_picard_rna_summary,
    run_picard_gc_summary,
    run_picard_base_dist_summary,
    run_samtools_idxstats,
  ] >> run_multiqc
  ## TASK
  # Runs load_analysis_files_func for the 'multiqc_html' xcom of run_multiqc;
  # results are published under 'output_db_files'.
  load_multiqc_html = PythonOperator(
    task_id='load_multiqc_html',
    python_callable=load_analysis_files_func,
    queue='hpc_4G',
    dag=dag,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_multiqc',
      'file_name_key': 'multiqc_html',
      'analysis_name': 'multiqc',
      'collection_type': 'MULTIQC_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files',
    },
  )
  ## TASK
  # Runs ftp_files_upload_for_analysis on the files listed by
  # load_multiqc_html, with collection type FTP_MULTIQC_HTML.
  upload_multiqc_to_ftp = PythonOperator(
    task_id='upload_multiqc_to_ftp',
    python_callable=ftp_files_upload_for_analysis,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'load_multiqc_html',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_MULTIQC_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True,
    },
  )
  ## TASK
  # Runs upload_analysis_file_to_box on the same 'output_db_files' xcom.
  upload_multiqc_to_box = PythonOperator(
    task_id='upload_multiqc_to_box',
    python_callable=upload_analysis_file_to_box,
    queue='hpc_4G',
    dag=dag,
    params={
      'xcom_pull_task': 'load_multiqc_html',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'multiqc_report',
    },
  )
  ## PIPELINE
  # multiqc -> load html -> (ftp upload, box upload)
  run_multiqc >> load_multiqc_html >> [
    upload_multiqc_to_ftp,
    upload_multiqc_to_box,
  ]
  ## TASK
  # Runs change_pipeline_status with new_status FINISHED and
  # no_change_status SEEDED. The trigger rule is set to
  # 'none_failed_or_skipped' rather than left at the operator default.
  update_analysis_and_status = PythonOperator(
    task_id='update_analysis_and_status',
    python_callable=change_pipeline_status,
    trigger_rule='none_failed_or_skipped',
    queue='hpc_4G',
    dag=dag,
    params={
      'new_status': 'FINISHED',
      'no_change_status': 'SEEDED',
    },
  )
  ## PIPELINE
  # Every upload task listed here is a direct upstream of the status update.
  [
    upload_multiqc_to_ftp,
    upload_scanpy_report_for_sc_5p_to_ftp,
    upload_scanpy_report_for_sc_5p_to_box,
    upload_cellbrowser_for_sc_5p_to_ftp,
    #upload_scirpy_report_for_vdj_to_ftp, # no more ambiguous VDJ
    #upload_scirpy_report_for_vdj_to_box,
    upload_scirpy_report_for_vdj_b_to_ftp,
    upload_scirpy_report_for_vdj_b_to_box,
    upload_scirpy_report_for_vdj_t_to_ftp,
    upload_scirpy_report_for_vdj_t_to_box,
    upload_seurat_report_for_sc_5p_ftp,
    upload_seurat_report_for_sc_5p_to_box,
    upload_cellranger_results_to_irods,
    upload_cellranger_report_to_ftp,
    upload_cellranger_report_to_box,
    upload_cram_to_irods,
    upload_scvelo_report_to_box,
    upload_scvelo_report_to_ftp,
  ] >> update_analysis_and_status
  ## TASK
  # Runs create_and_update_qc_pages with the FTP collection types listed
  # below; scheduled after the pipeline status update.
  update_qc_pages = PythonOperator(
    task_id='update_qc_pages',
    python_callable=create_and_update_qc_pages,
    queue='hpc_4G',
    dag=dag,
    params={
      'use_ephemeral_space': True,
      'collection_type_list': [
        'FTP_MULTIQC_HTML',
        'FTP_SEURAT_HTML',
        'FTP_SCIRPY_VDJ_T_HTML',
        'FTP_SCIRPY_VDJ_B_HTML',
        'FTP_SCIRPY_VDJ_HTML',
        'FTP_CELLBROWSER',
        'FTP_SCANPY_HTML',
        'FTP_SCVELO_HTML',
        'FTP_CELLRANGER_HTML',
      ],
    },
  )
  ## PIPELINE
  update_analysis_and_status >> update_qc_pages