dag9_tenx_single_cell_immune_profiling.py 30 KB


  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
  10. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
  11. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
  12. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
  13. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
  14. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
  15. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
  16. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
  17. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
  18. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
  19. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function
  20. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import upload_analysis_file_to_box
  21. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import convert_bam_to_cram_func
  22. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger
  23. ## ARGS
  24. default_args = {
  25. 'owner': 'airflow',
  26. 'depends_on_past': False,
  27. 'start_date': days_ago(2),
  28. 'email_on_failure': False,
  29. 'email_on_retry': False,
  30. 'retries': 4,
  31. 'max_active_runs':10,
  32. 'catchup':False,
  33. 'retry_delay': timedelta(minutes=5),
  34. 'provide_context': True,
  35. }
  36. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types').split(',')
  37. ## DAG
  38. dag = \
  39. DAG(
  40. dag_id='dag9_tenx_single_cell_immune_profiling',
  41. schedule_interval=None,
  42. tags=['hpc','analysis','tenx','sc'],
  43. default_args=default_args,
  44. orientation='LR')
  45. with dag:
  46. ## TASK
  47. fetch_analysis_info_and_branch = \
  48. BranchPythonOperator(
  49. task_id='fetch_analysis_info',
  50. dag=dag,
  51. queue='hpc_4G',
  52. params={'no_analysis_task':'no_analysis',
  53. 'analysis_description_xcom_key':'analysis_description',
  54. 'analysis_info_xcom_key':'analysis_info'},
  55. python_callable=fetch_analysis_info_and_branch_func)
  ## TASK
  # Join point after the per-feature trimming branches. The trigger rule lets
  # it run even though some of the upstream branches were skipped.
  configure_cellranger_run = PythonOperator(
    task_id='configure_cellranger_run',
    dag=dag,
    queue='hpc_4G',
    trigger_rule='none_failed_or_skipped',
    python_callable=configure_cellranger_run_func,
    params={
      'xcom_pull_task_id': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'analysis_info_xcom_key': 'analysis_info',
      'library_csv_xcom_key': 'cellranger_library_csv'})
  # One sub-graph per feature type:
  #   branch task -> ten run_trim_<feature>_<n> tasks -> collector -> configure
  for analysis_name in FEATURE_TYPE_LIST:
    ## TASK
    # The branch task is named after the feature type itself.
    task_branch = BranchPythonOperator(
      task_id=analysis_name,
      dag=dag,
      queue='hpc_4G',
      python_callable=task_branch_function,
      params={
        'xcom_pull_task_id': 'fetch_analysis_info',
        'analysis_info_xcom_key': 'analysis_info',
        'analysis_name': analysis_name,
        'task_prefix': 'run_trim'})
    ## TASK
    # Fixed pool of ten trimming slots (run ids 0..9) per feature type.
    run_trim_list = [
      PythonOperator(
        task_id='run_trim_{0}_{1}'.format(analysis_name, run_id),
        dag=dag,
        queue='hpc_4G',
        python_callable=run_sc_read_trimmming_func,
        params={
          'xcom_pull_task_id': 'fetch_analysis_info',
          'analysis_info_xcom_key': 'analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'analysis_name': analysis_name,
          'run_id': run_id,
          'r1_length': 0,
          'r2_length': 0,
          'fastq_input_dir_tag': 'fastq_dir',
          'fastq_output_dir_tag': 'output_path'})
      for run_id in range(0, 10)]
    ## TASK
    # Fan-in marker; tolerates skipped trimming slots.
    collect_trimmed_files = DummyOperator(
      task_id='collect_trimmed_files_{0}'.format(analysis_name),
      trigger_rule='none_failed_or_skipped',
      dag=dag)
    ## PIPELINE
    fetch_analysis_info_and_branch.set_downstream(task_branch)
    task_branch.set_downstream(run_trim_list)
    collect_trimmed_files.set_upstream(run_trim_list)
    collect_trimmed_files.set_downstream(configure_cellranger_run)
  ## TASK
  # Terminal no-op task; its id matches the 'no_analysis_task' value given to
  # the fetch_analysis_info branch task.
  no_analysis = DummyOperator(task_id='no_analysis', dag=dag)
  ## PIPELINE
  fetch_analysis_info_and_branch.set_downstream(no_analysis)
  ## TASK
  # Runs on the large HPC queue; the cellranger core/memory options mirror
  # the queue name (16 cores, 64 GB).
  run_cellranger = PythonOperator(
    task_id='run_cellranger',
    dag=dag,
    queue='hpc_64G16t24hr',
    python_callable=run_cellranger_tool,
    params={
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'library_csv_xcom_key': 'cellranger_library_csv',
      'library_csv_xcom_pull_task': 'configure_cellranger_run',
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_options': ['--localcores 16', '--localmem 64']})
  ## PIPELINE
  configure_cellranger_run.set_downstream(run_cellranger)
  ## TASK
  # Branch task run after cellranger. Each '*_task' param value is the task_id
  # of a downstream task, so every value has to match a task registered in
  # this DAG.
  # NOTE(review): the key 'run_scirpy_vdj_t_task' lacks the '_for_' used by its
  # siblings; it is left untouched because the callable reads it by name --
  # confirm against decide_analysis_branch_func.
  decide_analysis_branch = BranchPythonOperator(
    task_id='decide_analysis_branch',
    dag=dag,
    queue='hpc_4G',
    python_callable=decide_analysis_branch_func,
    params={
      'load_cellranger_result_to_db_task': 'load_cellranger_result_to_db',
      'run_scanpy_for_sc_5p_task': 'run_scanpy_for_sc_5p',
      'run_scirpy_for_vdj_task': 'run_scirpy_for_vdj',
      'run_scirpy_for_vdj_b_task': 'run_scirpy_for_vdj_b',
      'run_scirpy_vdj_t_task': 'run_scirpy_for_vdj_t',
      'run_seurat_for_sc_5p_task': 'run_seurat_for_sc_5p',
      'run_picard_alignment_summary_task': 'run_picard_alignment_summary',
      # The CRAM conversion task is registered as
      # 'convert_cellranger_bam_to_cram'; the previous value
      # 'convert_bam_to_cram' did not match any task_id in the DAG.
      'convert_bam_to_cram_task': 'convert_cellranger_bam_to_cram',
      'library_csv_xcom_key': 'cellranger_library_csv',
      'library_csv_xcom_pull_task': 'configure_cellranger_run'})
  ## PIPELINE
  run_cellranger >> decide_analysis_branch
  ## TASK
  # Registers the cellranger output; publishes the loaded file list and the
  # HTML report path under the XCom keys named in params.
  load_cellranger_result_to_db = PythonOperator(
    task_id='load_cellranger_result_to_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_cellranger_result_to_db_func,
    params={
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'collection_type': 'CELLRANGER_MULTI',
      'collection_table': 'sample',
      'xcom_collection_name_key': 'sample_igf_id',
      'genome_column': 'genome_build',
      'analysis_name': 'cellranger_multi',
      'output_xcom_key': 'loaded_output_files',
      'html_xcom_key': 'html_report_file',
      'html_report_file_name': 'web_summary.html'})
  upload_cellranger_report_to_ftp = PythonOperator(
    task_id='upload_cellranger_report_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_cellranger_result_to_db',
      'xcom_pull_files_key': 'html_report_file',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_CELLRANGER_MULTI',
      'collection_table': 'sample',
      'collect_remote_file': True})
  # Placeholder: PythonOperator rejects python_callable=None when the DAG file
  # is parsed, so the not-yet-implemented upload is a no-op DummyOperator.
  # TODO: switch back to PythonOperator once the callable (presumably
  # upload_analysis_file_to_box plus an 'analysis_tag' param) is decided.
  upload_cellranger_report_to_box = DummyOperator(
    task_id='upload_cellranger_report_to_box',
    dag=dag,
    queue='hpc_4G',
    params={
      'xcom_pull_task': 'load_cellranger_result_to_db',
      'xcom_pull_files_key': 'html_report_file'})
  upload_cellranger_results_to_irods = PythonOperator(
    task_id='upload_cellranger_results_to_irods',
    dag=dag,
    queue='hpc_4G',
    python_callable=irods_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_cellranger_result_to_db',
      'xcom_pull_files_key': 'loaded_output_files',
      'collection_name_key': 'sample_igf_id',
      'analysis_name': 'cellranger_multi'})
  ## PIPELINE
  decide_analysis_branch >> load_cellranger_result_to_db
  load_cellranger_result_to_db >> upload_cellranger_report_to_ftp
  load_cellranger_result_to_db >> upload_cellranger_report_to_box
  load_cellranger_result_to_db >> upload_cellranger_results_to_irods
  ## TASK
  # Scanpy notebook run on the cellranger output; publishes a notebook and the
  # cellbrowser directories under the XCom keys named in params.
  run_scanpy_for_sc_5p = PythonOperator(
    task_id='run_scanpy_for_sc_5p',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_singlecell_notebook_wrapper_func,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'scanpy_timeout': 1200,
      'allow_errors': False,
      'kernel_name': 'python3',
      'count_dir': 'count',
      'analysis_name': 'scanpy',
      'output_notebook_key': 'scanpy_notebook',
      'output_cellbrowser_key': 'cellbrowser_dirs',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description'})
  # Loads the notebook produced above; downstream uploads read its
  # 'output_db_files' XCom.
  load_scanpy_report_for_sc_5p_to_db = PythonOperator(
    task_id='load_scanpy_report_for_sc_5p_to_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_analysis_files_func,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_scanpy_for_sc_5p',
      'file_name_key': 'scanpy_notebook',
      'analysis_name': 'scanpy_5p',
      'collection_type': 'SCANPY_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files'})
  upload_scanpy_report_for_sc_5p_to_ftp = PythonOperator(
    task_id='upload_scanpy_report_for_sc_5p_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SCANPY_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True})
  upload_scanpy_report_for_sc_5p_to_box = PythonOperator(
    task_id='upload_scanpy_report_for_sc_5p_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'scanpy_single_sample_report'})
  # The cellbrowser upload reads directly from the scanpy run, not from the
  # DB-load task.
  upload_cellbrowser_for_sc_5p_to_ftp = PythonOperator(
    task_id='upload_cellbrowser_for_sc_5p_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'run_scanpy_for_sc_5p',
      'xcom_pull_files_key': 'cellbrowser_dirs',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_CELLBROWSER',
      'collection_table': 'sample',
      'collect_remote_file': True})
  ## PIPELINE
  decide_analysis_branch.set_downstream(run_scanpy_for_sc_5p)
  run_scanpy_for_sc_5p.set_downstream(
    [load_scanpy_report_for_sc_5p_to_db,
     upload_cellbrowser_for_sc_5p_to_ftp])
  load_scanpy_report_for_sc_5p_to_db.set_downstream(
    [upload_scanpy_report_for_sc_5p_to_ftp,
     upload_scanpy_report_for_sc_5p_to_box])
  ## TASK
  # Scirpy notebook run for the combined VDJ output ('vdj' directory).
  run_scirpy_for_vdj = PythonOperator(
    task_id='run_scirpy_for_vdj',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_singlecell_notebook_wrapper_func,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'scanpy_timeout': 1200,
      'allow_errors': False,
      'kernel_name': 'python3',
      'analysis_name': 'scirpy',
      'vdj_dir': 'vdj',
      'count_dir': 'count',
      'output_notebook_key': 'scirpy_notebook',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description'})
  load_scirpy_report_for_vdj_to_db = PythonOperator(
    task_id='load_scirpy_report_for_vdj_to_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_analysis_files_func,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_scirpy_for_vdj',
      'file_name_key': 'scirpy_notebook',
      'analysis_name': 'scirpy_vdj',
      'collection_type': 'SCIRPY_VDJ_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files'})
  # Both uploads read the XCom written by the DB-load task above, so
  # 'xcom_pull_task' must be that task's id. It previously named
  # 'load_scanpy_report_for_vdj_to_db', which is not a task in this DAG.
  upload_scirpy_report_for_vdj_to_ftp = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SCIRPY_VDJ_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True})
  upload_scirpy_report_for_vdj_to_box = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'scirpy_vdj_single_sample_report'})
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj
  run_scirpy_for_vdj >> load_scirpy_report_for_vdj_to_db
  load_scirpy_report_for_vdj_to_db >> upload_scirpy_report_for_vdj_to_ftp
  load_scirpy_report_for_vdj_to_db >> upload_scirpy_report_for_vdj_to_box
  ## TASK
  # Scirpy notebook run for the B-cell VDJ output ('vdj_b' directory).
  run_scirpy_for_vdj_b = PythonOperator(
    task_id='run_scirpy_for_vdj_b',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_singlecell_notebook_wrapper_func,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'scanpy_timeout': 1200,
      'allow_errors': False,
      'kernel_name': 'python3',
      'analysis_name': 'scirpy',
      'vdj_dir': 'vdj_b',
      'count_dir': 'count',
      'output_notebook_key': 'scirpy_notebook',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description'})
  load_scirpy_report_for_vdj_b_to_db = PythonOperator(
    task_id='load_scirpy_report_for_vdj_b_to_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_analysis_files_func,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_scirpy_for_vdj_b',
      'file_name_key': 'scirpy_notebook',
      'analysis_name': 'scirpy_vdj_b',
      'collection_type': 'SCIRPY_VDJ_B_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files'})
  # Both uploads read the XCom written by the DB-load task above, so
  # 'xcom_pull_task' must be that task's id. It previously named
  # 'load_scanpy_report_for_vdj_b_to_db', which is not a task in this DAG.
  upload_scirpy_report_for_vdj_b_to_ftp = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_b_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SCIRPY_VDJ_B_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True})
  # task_id now says 'scirpy' like the variable and the vdj / vdj_t siblings
  # (it was 'upload_scanpy_report_for_vdj_b_to_box').
  upload_scirpy_report_for_vdj_b_to_box = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_b_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'scirpy_vdj_b_single_sample_report'})
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_b
  run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db
  load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_ftp
  load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box
  ## TASK
  # Scirpy notebook run for the T-cell VDJ output ('vdj_t' directory).
  run_scirpy_for_vdj_t = PythonOperator(
    task_id='run_scirpy_for_vdj_t',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_singlecell_notebook_wrapper_func,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'scanpy_timeout': 1200,
      'allow_errors': False,
      'kernel_name': 'python3',
      'analysis_name': 'scirpy',
      'vdj_dir': 'vdj_t',
      'count_dir': 'count',
      'output_notebook_key': 'scirpy_notebook',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description'})
  load_scirpy_report_for_vdj_t_to_db = PythonOperator(
    task_id='load_scirpy_report_for_vdj_t_to_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_analysis_files_func,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_scirpy_for_vdj_t',
      'file_name_key': 'scirpy_notebook',
      'analysis_name': 'scirpy_vdj_t',
      'collection_type': 'SCIRPY_VDJ_T_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files'})
  # Both uploads read the XCom written by the DB-load task above, so
  # 'xcom_pull_task' must be that task's id. It previously named
  # 'load_scanpy_report_for_vdj_t_to_db', which is not a task in this DAG.
  upload_scirpy_report_for_vdj_t_to_ftp = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_t_to_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SCIRPY_VDJ_T_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True})
  upload_scirpy_report_for_vdj_t_to_box = PythonOperator(
    task_id='upload_scirpy_report_for_vdj_t_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'scirpy_vdj_t_single_sample_report'})
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_t
  run_scirpy_for_vdj_t >> load_scirpy_report_for_vdj_t_to_db
  load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_ftp
  load_scirpy_report_for_vdj_t_to_db >> upload_scirpy_report_for_vdj_t_to_box
  ## TASK
  # Seurat notebook run; same wrapper callable as the scanpy/scirpy tasks but
  # with the R kernel.
  run_seurat_for_sc_5p = PythonOperator(
    task_id='run_seurat_for_sc_5p',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_singlecell_notebook_wrapper_func,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'scanpy_timeout': 1200,
      'allow_errors': False,
      'kernel_name': 'R',
      'analysis_name': 'seurat',
      'vdj_dir': 'vdj',
      'count_dir': 'count',
      'output_notebook_key': 'seurat_notebook',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description'})
  # Loads the notebook produced above; downstream uploads read its
  # 'output_db_files' XCom.
  load_seurat_report_for_sc_5p_db = PythonOperator(
    task_id='load_seurat_report_for_sc_5p_db',
    dag=dag,
    queue='hpc_4G',
    python_callable=load_analysis_files_func,
    params={
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'file_name_task': 'run_seurat_for_sc_5p',
      'file_name_key': 'seurat_notebook',
      'analysis_name': 'seurat_5p',
      'collection_type': 'SEURAT_HTML',
      'collection_table': 'sample',
      'output_files_key': 'output_db_files'})
  upload_seurat_report_for_sc_5p_ftp = PythonOperator(
    task_id='upload_seurat_report_for_sc_5p_ftp',
    dag=dag,
    queue='hpc_4G',
    python_callable=ftp_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
      'xcom_pull_files_key': 'output_db_files',
      'collection_name_task': 'load_cellranger_result_to_db',
      'collection_name_key': 'sample_igf_id',
      'collection_type': 'FTP_SEURAT_HTML',
      'collection_table': 'sample',
      'collect_remote_file': True})
  upload_seurat_report_for_sc_5p_to_box = PythonOperator(
    task_id='upload_seurat_report_for_sc_5p_to_box',
    dag=dag,
    queue='hpc_4G',
    python_callable=upload_analysis_file_to_box,
    params={
      'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
      'xcom_pull_files_key': 'output_db_files',
      'analysis_tag': 'seurat_single_sample_report'})
  ## PIPELINE
  decide_analysis_branch.set_downstream(run_seurat_for_sc_5p)
  run_seurat_for_sc_5p.set_downstream(load_seurat_report_for_sc_5p_db)
  load_seurat_report_for_sc_5p_db.set_downstream(
    [upload_seurat_report_for_sc_5p_ftp,
     upload_seurat_report_for_sc_5p_to_box])
  ## TASK
  # Note the task_id ('convert_cellranger_bam_to_cram') differs from the
  # Python variable name; other tasks must refer to the task_id.
  convert_bam_to_cram = PythonOperator(
    task_id='convert_cellranger_bam_to_cram',
    dag=dag,
    queue='hpc_4G4t',
    python_callable=convert_bam_to_cram_func,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'threads': 4,
      'analysis_name': 'cellranger',
      'collection_type': 'ALIGNED_CRAM',
      'collection_table': 'sample',
      'cram_files_xcom_key': 'cram_files'})
  # Reads the 'cram_files' XCom published by the conversion task.
  upload_cram_to_irods = PythonOperator(
    task_id='upload_cram_to_irods',
    dag=dag,
    queue='hpc_4G',
    python_callable=irods_files_upload_for_analysis,
    params={
      'xcom_pull_task': 'convert_cellranger_bam_to_cram',
      'xcom_pull_files_key': 'cram_files',
      'collection_name_key': 'sample_igf_id',
      'analysis_name': 'cellranger_multi'})
  ## PIPELINE
  decide_analysis_branch.set_downstream(convert_bam_to_cram)
  convert_bam_to_cram.set_downstream(upload_cram_to_irods)
  ## TASK
  # First task of the sequential Picard chain (CollectAlignmentSummaryMetrics).
  run_picard_alignment_summary = PythonOperator(
    task_id='run_picard_alignment_summary',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_picard_for_cellranger,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'output_prefix': None,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectAlignmentSummaryMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_alignment_summary',
      'bam_files_xcom_key': None})
  ## PIPELINE
  decide_analysis_branch.set_downstream(run_picard_alignment_summary)
  ## TASK
  # Picard QualityScoreDistribution; runs after the alignment summary.
  run_picard_qual_summary = PythonOperator(
    task_id='run_picard_qual_summary',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_picard_for_cellranger,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'output_prefix': None,
      'java_param': '-Xmx4g',
      'picard_command': 'QualityScoreDistribution',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_qual_summary',
      'bam_files_xcom_key': None})
  ## PIPELINE
  run_picard_alignment_summary.set_downstream(run_picard_qual_summary)
  ## TASK
  # Picard CollectRnaSeqMetrics; runs after the quality-score summary.
  run_picard_rna_summary = PythonOperator(
    task_id='run_picard_rna_summary',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_picard_for_cellranger,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'output_prefix': None,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectRnaSeqMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_rna_summary',
      'bam_files_xcom_key': None})
  ## PIPELINE
  run_picard_qual_summary.set_downstream(run_picard_rna_summary)
  ## TASK
  # Picard CollectGcBiasMetrics; runs after the RNA-seq summary.
  run_picard_gc_summary = PythonOperator(
    task_id='run_picard_gc_summary',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_picard_for_cellranger,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'output_prefix': None,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectGcBiasMetrics',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_gc_summary',
      'bam_files_xcom_key': None})
  ## PIPELINE
  run_picard_rna_summary.set_downstream(run_picard_gc_summary)
  ## TASK
  # Picard CollectBaseDistributionByCycle; runs after the GC-bias summary.
  # The task_id was a copy of the previous task's ('run_picard_gc_summary');
  # task ids must be unique within a DAG, so it now has its own id.
  picard_base_dist_summary = PythonOperator(
    task_id='run_picard_base_dist_summary',
    dag=dag,
    queue='hpc_4G',
    python_callable=run_picard_for_cellranger,
    params={
      'xcom_pull_files_key': 'cellranger_output',
      'xcom_pull_task': 'run_cellranger',
      'analysis_description_xcom_pull_task': 'fetch_analysis_info',
      'analysis_description_xcom_key': 'analysis_description',
      'use_ephemeral_space': True,
      'load_metrics_to_cram': True,
      'output_prefix': None,
      'java_param': '-Xmx4g',
      'picard_command': 'CollectBaseDistributionByCycle',
      'picard_option': {},
      'analysis_files_xcom_key': 'picard_base_summary',
      'bam_files_xcom_key': None})
  ## PIPELINE
  run_picard_gc_summary >> picard_base_dist_summary
  # The five tasks below are not implemented yet. They were declared as
  # PythonOperator(python_callable=None), which PythonOperator rejects while
  # the DAG file is parsed; DummyOperator keeps the same task ids, queue and
  # wiring as no-op placeholders.
  # TODO: replace each with a PythonOperator once its callable exists.
  ## TASK
  run_samtools_stats = DummyOperator(
    task_id='run_samtools_stats',
    dag=dag,
    queue='hpc_4G')
  ## PIPELINE
  run_picard_gc_summary >> run_samtools_stats
  ## TASK
  run_samtools_idxstats = DummyOperator(
    task_id='run_samtools_idxstats',
    dag=dag,
    queue='hpc_4G')
  ## PIPELINE
  run_samtools_stats >> run_samtools_idxstats
  ## TASK
  run_multiqc = DummyOperator(
    task_id='run_multiqc',
    dag=dag,
    queue='hpc_4G')
  ## PIPELINE
  run_samtools_idxstats >> run_multiqc
  ## TASK
  upload_multiqc_to_ftp = DummyOperator(
    task_id='upload_multiqc_to_ftp',
    dag=dag,
    queue='hpc_4G')
  ## PIPELINE
  run_multiqc >> upload_multiqc_to_ftp
  ## TASK
  upload_multiqc_to_box = DummyOperator(
    task_id='upload_multiqc_to_box',
    dag=dag,
    queue='hpc_4G')
  ## PIPELINE
  run_multiqc >> upload_multiqc_to_box
  ## TASK
  # Final fan-in task. Not implemented yet: it was declared as
  # PythonOperator(python_callable=None), which PythonOperator rejects while
  # the DAG file is parsed, so it is a no-op DummyOperator for now.
  # The trigger rule lets it run when some analysis branches were skipped.
  # TODO: replace with a PythonOperator once the status-update callable exists.
  update_analysis_and_status = DummyOperator(
    task_id='update_analysis_and_status',
    dag=dag,
    queue='hpc_4G',
    trigger_rule='none_failed_or_skipped')
  ## PIPELINE
  # NOTE(review): upload_multiqc_to_box is not wired in here although every
  # other box upload is -- confirm whether that is intentional.
  upload_multiqc_to_ftp >> update_analysis_and_status
  upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
  upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
  upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
  upload_scirpy_report_for_vdj_to_ftp >> update_analysis_and_status
  upload_scirpy_report_for_vdj_to_box >> update_analysis_and_status
  upload_scirpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
  upload_scirpy_report_for_vdj_b_to_box >> update_analysis_and_status
  upload_scirpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
  upload_scirpy_report_for_vdj_t_to_box >> update_analysis_and_status
  upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
  upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
  upload_cellranger_results_to_irods >> update_analysis_and_status
  upload_cellranger_report_to_ftp >> update_analysis_and_status
  upload_cellranger_report_to_box >> update_analysis_and_status
  upload_cram_to_irods >> update_analysis_and_status