# dag9_tenx_single_cell_immune_profiling.py (41 KB)
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
  10. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
  11. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
  12. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
  13. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
  14. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
  15. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
  16. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
  17. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
  18. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function
  19. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import upload_analysis_file_to_box
  20. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import convert_bam_to_cram_func
  21. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger
  22. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_samtools_for_cellranger
  23. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_multiqc_for_cellranger
  24. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import index_and_copy_bam_for_parallel_analysis
  25. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import change_pipeline_status
  26. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import clean_up_files
  27. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import create_and_update_qc_pages
  28. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_metrices_to_collection
  ## ARGS
  # Defaults passed to the DAG as ``default_args``; Airflow uses them as
  # keyword defaults when each operator is instantiated.
  #
  # 'max_active_runs' and 'catchup' used to be listed here. Both are arguments
  # of the DAG constructor, not of operators, so inside default_args they had
  # no effect (and 'max_active_runs': 10 disagreed with the max_active_runs=20
  # that is set on the DAG itself). They were removed to avoid misleading
  # readers; DAG-level limits belong on the DAG(...) call.
  default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': days_ago(2),
      'email_on_failure': False,
      'email_on_retry': False,
      'retries': 4,
      'retry_delay': timedelta(minutes=5),
      # every python_callable in this DAG receives the task context
      'provide_context': True,
  }
  # Feature-type configuration read from the Airflow Variable
  # 'tenx_single_cell_immune_profiling_feature_types'. The DAG below calls
  # .keys() on this value, so it has to be a dict. Without
  # deserialize_json=True, Variable.get returns the stored value as a plain
  # string whenever the Variable exists, and the .keys() call then fails while
  # the DAG file is being parsed. When the Variable is not set, the empty-dict
  # default is returned unchanged and no per-feature tasks are generated.
  # NOTE(review): this lookup runs on every parse of the DAG file.
  FEATURE_TYPE_LIST = Variable.get(
      'tenx_single_cell_immune_profiling_feature_types',
      default_var={},
      deserialize_json=True)
  ## DAG
  # No schedule is attached (schedule_interval=None), so runs are created only
  # when the DAG is triggered. Up to 20 runs may be active at once and up to
  # 100 task instances may run concurrently; the graph view is laid out
  # left-to-right.
  dag = DAG(
      dag_id='dag9_tenx_single_cell_immune_profiling',
      default_args=default_args,
      schedule_interval=None,
      max_active_runs=20,
      concurrency=100,
      orientation='LR',
      tags=['hpc', 'analysis', 'tenx', 'sc'])
  54. with dag:
  ## TASK
  # Branching task with id 'fetch_analysis_info'. As a BranchPythonOperator it
  # follows whichever downstream task id(s) its callable returns. The params
  # hand the callable the id of the no-op branch ('no_analysis') and the two
  # xcom key names it is given to work with.
  fetch_analysis_info_and_branch = BranchPythonOperator(
      task_id='fetch_analysis_info',
      python_callable=fetch_analysis_info_and_branch_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'no_analysis_task': 'no_analysis',
          'analysis_description_xcom_key': 'analysis_description',
          'analysis_info_xcom_key': 'analysis_info',
      })
  ## TASK
  # Sits downstream of the per-feature 'collect_trimmed_files_*' tasks.
  # The non-default trigger rule is needed because some of those upstream
  # branches may be skipped: the task still runs provided no upstream task
  # failed (see the Airflow trigger-rule docs for the exact semantics of
  # 'none_failed_or_skipped' in the installed version).
  configure_cellranger_run = PythonOperator(
      task_id='configure_cellranger_run',
      python_callable=configure_cellranger_run_func,
      queue='hpc_4G',
      trigger_rule='none_failed_or_skipped',
      dag=dag,
      params={
          'xcom_pull_task_id': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'analysis_info_xcom_key': 'analysis_info',
          'library_csv_xcom_key': 'cellranger_library_csv',
      })
  # One sub-graph per configured feature type:
  #   fetch_analysis_info -> <feature> (branch) -> run_trim_<feature>_0..9
  #     -> collect_trimmed_files_<feature> -> configure_cellranger_run
  for analysis_name in FEATURE_TYPE_LIST.keys():
    ## TASK
    # Branch task named after the feature type; its callable returns the
    # trim task id(s) to follow ('task_prefix' is handed over as 'run_trim').
    task_branch = BranchPythonOperator(
        task_id=analysis_name,
        python_callable=task_branch_function,
        queue='hpc_4G',
        dag=dag,
        params={
            'xcom_pull_task_id': 'fetch_analysis_info',
            'analysis_info_xcom_key': 'analysis_info',
            'analysis_name': analysis_name,
            'task_prefix': 'run_trim',
        })
    # A fixed pool of ten trim tasks (run_id 0-9) is declared per feature type.
    run_trim_list = []
    for run_id in range(10):
      ## TASK
      t = PythonOperator(
          task_id='run_trim_{0}_{1}'.format(analysis_name, run_id),
          python_callable=run_sc_read_trimmming_func,
          queue='hpc_4G',
          dag=dag,
          params={
              'xcom_pull_task_id': 'fetch_analysis_info',
              'analysis_info_xcom_key': 'analysis_info',
              'analysis_description_xcom_key': 'analysis_description',
              'analysis_name': analysis_name,
              'run_id': run_id,
              'r1-length': 0,
              'r2-length': 0,
              'fastq_input_dir_tag': 'fastq_dir',
              'use_ephemeral_space': True,
              'fastq_output_dir_tag': 'output_path',
          })
      run_trim_list.append(t)
    ## TASK
    # Join point for the trim tasks; the trigger rule tolerates the trim
    # tasks that the branch above left skipped.
    collect_trimmed_files = DummyOperator(
        task_id='collect_trimmed_files_{0}'.format(analysis_name),
        trigger_rule='none_failed_or_skipped',
        dag=dag)
    ## PIPELINE
    fetch_analysis_info_and_branch >> task_branch
    task_branch >> run_trim_list
    run_trim_list >> collect_trimmed_files
    collect_trimmed_files >> configure_cellranger_run
  ## TASK
  # No-op terminal branch; its id is the value passed to fetch_analysis_info
  # as 'no_analysis_task'.
  no_analysis = DummyOperator(task_id='no_analysis', dag=dag)
  ## PIPELINE
  fetch_analysis_info_and_branch >> no_analysis
  ## TASK
  # Cellranger run on the large 'hpc_64G16t24hr' queue. The core and memory
  # options given to cellranger (16 / 64) mirror the figures in the queue name.
  run_cellranger = PythonOperator(
      task_id='run_cellranger',
      python_callable=run_cellranger_tool,
      queue='hpc_64G16t24hr',
      dag=dag,
      params={
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'library_csv_xcom_key': 'cellranger_library_csv',
          'library_csv_xcom_pull_task': 'configure_cellranger_run',
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_options': ['--localcores 16', '--localmem 64'],
      })
  ## PIPELINE
  configure_cellranger_run >> run_cellranger
  ## TASK
  # Branch taken after cellranger finishes. The params give the callable the
  # ids of the candidate downstream tasks, plus where the library csv xcom
  # is published.
  # NOTE(review): 'run_scirpy_for_vdj' is not defined in the part of the file
  # reviewed here - confirm the task exists further down.
  decide_analysis_branch = BranchPythonOperator(
      task_id='decide_analysis_branch',
      python_callable=decide_analysis_branch_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'load_cellranger_result_to_db_task': 'load_cellranger_result_to_db',
          'run_scanpy_for_sc_5p_task': 'run_scanpy_for_sc_5p',
          'run_scirpy_for_vdj_task': 'run_scirpy_for_vdj',
          'run_scirpy_for_vdj_b_task': 'run_scirpy_for_vdj_b',
          'run_scirpy_vdj_t_task': 'run_scirpy_for_vdj_t',
          'run_seurat_for_sc_5p_task': 'run_seurat_for_sc_5p',
          'convert_cellranger_bam_to_cram_task': 'convert_cellranger_bam_to_cram',
          'library_csv_xcom_key': 'cellranger_library_csv',
          'library_csv_xcom_pull_task': 'configure_cellranger_run',
      })
  ## PIPELINE
  run_cellranger >> decide_analysis_branch
  ## TASK
  # Registers the cellranger output, then fans out to three uploads (ftp, box,
  # irods). The xcom keys published here ('loaded_output_files',
  # 'html_report_file', 'sample_igf_id') are the ones the upload tasks are
  # told to read.
  load_cellranger_result_to_db = PythonOperator(
      task_id='load_cellranger_result_to_db',
      python_callable=load_cellranger_result_to_db_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'collection_type': 'CELLRANGER_MULTI',
          'html_collection_type': 'CELLRANGER_HTML',
          'collection_table': 'sample',
          'xcom_collection_name_key': 'sample_igf_id',
          'genome_column': 'genome_build',
          'analysis_name': 'cellranger_multi',
          'output_xcom_key': 'loaded_output_files',
          'html_xcom_key': 'html_report_file',
          'html_report_file_name': 'web_summary.html',
      })
  # html report -> ftp
  upload_cellranger_report_to_ftp = PythonOperator(
      task_id='upload_cellranger_report_to_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_cellranger_result_to_db',
          'xcom_pull_files_key': 'html_report_file',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_CELLRANGER_HTML',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  # html report -> box
  upload_cellranger_report_to_box = PythonOperator(
      task_id='upload_cellranger_report_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_cellranger_result_to_db',
          'xcom_pull_files_key': 'html_report_file',
          'analysis_tag': 'cellranger_multi',
      })
  # loaded output files -> irods
  upload_cellranger_results_to_irods = PythonOperator(
      task_id='upload_cellranger_results_to_irods',
      python_callable=irods_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_cellranger_result_to_db',
          'xcom_pull_files_key': 'loaded_output_files',
          'collection_name_key': 'sample_igf_id',
          'collection_name_task': 'load_cellranger_result_to_db',
          'analysis_name': 'cellranger_multi',
      })
  ## PIPELINE
  decide_analysis_branch >> load_cellranger_result_to_db
  load_cellranger_result_to_db >> [
      upload_cellranger_report_to_ftp,
      upload_cellranger_report_to_box,
      upload_cellranger_results_to_irods]
  ## TASK
  # Scanpy branch: run the notebook on the cellranger 'count' output, then
  # load the GEX metrics, register the notebook report and publish the report
  # (ftp + box) and the cellbrowser directories (ftp).
  # NOTE(review): several tasks below are pointed at
  # 'load_cellranger_result_to_db' for the 'sample_igf_id' value, yet that task
  # is a sibling branch of decide_analysis_branch rather than an upstream of
  # this section - confirm the ordering is guaranteed.
  run_scanpy_for_sc_5p = PythonOperator(
      task_id='run_scanpy_for_sc_5p',
      python_callable=run_singlecell_notebook_wrapper_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'scanpy_timeout': 1200,
          'allow_errors': False,
          'kernel_name': 'python3',
          'count_dir': 'count',
          'analysis_name': 'scanpy',
          'output_notebook_key': 'scanpy_notebook',
          'output_cellbrowser_key': 'cellbrowser_dirs',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
      })
  # metrics from count/metrics_summary.csv, stored under the
  # CELLRANGER_COUNT attribute prefix
  load_cellranger_gex_matrics_to_db = PythonOperator(
      task_id='load_cellranger_gex_matrics_to_db',
      python_callable=load_cellranger_metrices_to_collection,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'collection_type': 'CELLRANGER_MULTI',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'metrics_summary_file': 'count/metrics_summary.csv',
          'attribute_prefix': 'CELLRANGER_COUNT',
      })
  # notebook report -> db
  load_scanpy_report_for_sc_5p_to_db = PythonOperator(
      task_id='load_scanpy_report_for_sc_5p_to_db',
      python_callable=load_analysis_files_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'file_name_task': 'run_scanpy_for_sc_5p',
          'file_name_key': 'scanpy_notebook',
          'analysis_name': 'scanpy_5p',
          'collection_type': 'SCANPY_HTML',
          'collection_table': 'sample',
          'output_files_key': 'output_db_files',
      })
  # registered report -> ftp
  upload_scanpy_report_for_sc_5p_to_ftp = PythonOperator(
      task_id='upload_scanpy_report_for_sc_5p_to_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_SCANPY_HTML',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  # registered report -> box
  upload_scanpy_report_for_sc_5p_to_box = PythonOperator(
      task_id='upload_scanpy_report_for_sc_5p_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scanpy_report_for_sc_5p_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'analysis_tag': 'scanpy_single_sample_report',
      })
  # cellbrowser directories -> ftp (taken straight from the notebook task)
  upload_cellbrowser_for_sc_5p_to_ftp = PythonOperator(
      task_id='upload_cellbrowser_for_sc_5p_to_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'run_scanpy_for_sc_5p',
          'xcom_pull_files_key': 'cellbrowser_dirs',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_CELLBROWSER',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  ## PIPELINE
  decide_analysis_branch >> run_scanpy_for_sc_5p
  run_scanpy_for_sc_5p >> [
      load_scanpy_report_for_sc_5p_to_db,
      load_cellranger_gex_matrics_to_db,
      upload_cellbrowser_for_sc_5p_to_ftp]
  load_scanpy_report_for_sc_5p_to_db >> [
      upload_scanpy_report_for_sc_5p_to_ftp,
      upload_scanpy_report_for_sc_5p_to_box]
  ## TASK
  # Scirpy branch for the 'vdj_b' output: run the notebook, load the VDJ-B
  # metrics, register the notebook report and publish it to ftp and box.
  run_scirpy_for_vdj_b = PythonOperator(
      task_id='run_scirpy_for_vdj_b',
      python_callable=run_singlecell_notebook_wrapper_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'scanpy_timeout': 1200,
          'allow_errors': False,
          'kernel_name': 'python3',
          'analysis_name': 'scirpy',
          'vdj_dir': 'vdj_b',
          'count_dir': 'count',
          'output_notebook_key': 'scirpy_notebook',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
      })
  # metrics from vdj_b/metrics_summary.csv, stored under the
  # CELLRANGER_VDJB attribute prefix
  load_cellranger_vdjB_matrics_to_db = PythonOperator(
      task_id='load_cellranger_vdjB_matrics_to_db',
      python_callable=load_cellranger_metrices_to_collection,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'collection_type': 'CELLRANGER_MULTI',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'metrics_summary_file': 'vdj_b/metrics_summary.csv',
          'attribute_prefix': 'CELLRANGER_VDJB',
      })
  # notebook report -> db
  load_scirpy_report_for_vdj_b_to_db = PythonOperator(
      task_id='load_scirpy_report_for_vdj_b_to_db',
      python_callable=load_analysis_files_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'file_name_task': 'run_scirpy_for_vdj_b',
          'file_name_key': 'scirpy_notebook',
          'analysis_name': 'scirpy_vdj_b',
          'collection_type': 'SCIRPY_VDJ_B_HTML',
          'collection_table': 'sample',
          'output_files_key': 'output_db_files',
      })
  # registered report -> ftp
  upload_scirpy_report_for_vdj_b_to_ftp = PythonOperator(
      task_id='upload_scirpy_report_for_vdj_b_to_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_SCIRPY_VDJ_B_HTML',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  # registered report -> box
  # The task_id previously read 'upload_scanpy_report_for_vdj_b_to_box', a
  # copy-paste slip: this task uploads the scirpy report. It now matches the
  # variable name and the vdj_t counterpart. Existing runs keep their history
  # under the old id.
  upload_scirpy_report_for_vdj_b_to_box = PythonOperator(
      task_id='upload_scirpy_report_for_vdj_b_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scirpy_report_for_vdj_b_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'analysis_tag': 'scirpy_vdj_b_single_sample_report',
      })
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_b
  run_scirpy_for_vdj_b >> load_scirpy_report_for_vdj_b_to_db
  run_scirpy_for_vdj_b >> load_cellranger_vdjB_matrics_to_db
  load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_ftp
  load_scirpy_report_for_vdj_b_to_db >> upload_scirpy_report_for_vdj_b_to_box
  ## TASK
  # Scirpy branch for the 'vdj_t' output: run the notebook, load the VDJ-T
  # metrics, register the notebook report and publish it to ftp and box.
  run_scirpy_for_vdj_t = PythonOperator(
      task_id='run_scirpy_for_vdj_t',
      python_callable=run_singlecell_notebook_wrapper_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'scanpy_timeout': 1200,
          'allow_errors': False,
          'kernel_name': 'python3',
          'analysis_name': 'scirpy',
          'vdj_dir': 'vdj_t',
          'count_dir': 'count',
          'output_notebook_key': 'scirpy_notebook',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
      })
  # metrics from vdj_t/metrics_summary.csv, stored under the
  # CELLRANGER_VDJT attribute prefix
  load_cellranger_vdjT_matrics_to_db = PythonOperator(
      task_id='load_cellranger_vdjT_matrics_to_db',
      python_callable=load_cellranger_metrices_to_collection,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'collection_type': 'CELLRANGER_MULTI',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'metrics_summary_file': 'vdj_t/metrics_summary.csv',
          'attribute_prefix': 'CELLRANGER_VDJT',
      })
  # notebook report -> db
  load_scirpy_report_for_vdj_t_to_db = PythonOperator(
      task_id='load_scirpy_report_for_vdj_t_to_db',
      python_callable=load_analysis_files_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'file_name_task': 'run_scirpy_for_vdj_t',
          'file_name_key': 'scirpy_notebook',
          'analysis_name': 'scirpy_vdj_t',
          'collection_type': 'SCIRPY_VDJ_T_HTML',
          'collection_table': 'sample',
          'output_files_key': 'output_db_files',
      })
  # registered report -> ftp
  upload_scirpy_report_for_vdj_t_to_ftp = PythonOperator(
      task_id='upload_scirpy_report_for_vdj_t_to_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_SCIRPY_VDJ_T_HTML',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  # registered report -> box
  upload_scirpy_report_for_vdj_t_to_box = PythonOperator(
      task_id='upload_scirpy_report_for_vdj_t_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_scirpy_report_for_vdj_t_to_db',
          'xcom_pull_files_key': 'output_db_files',
          'analysis_tag': 'scirpy_vdj_t_single_sample_report',
      })
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_t
  run_scirpy_for_vdj_t >> [
      load_scirpy_report_for_vdj_t_to_db,
      load_cellranger_vdjT_matrics_to_db]
  load_scirpy_report_for_vdj_t_to_db >> [
      upload_scirpy_report_for_vdj_t_to_ftp,
      upload_scirpy_report_for_vdj_t_to_box]
  ## TASK
  # Seurat branch: same notebook wrapper as the scanpy/scirpy branches but
  # with the 'ir' kernel; the report is registered and then published to ftp
  # and box.
  run_seurat_for_sc_5p = PythonOperator(
      task_id='run_seurat_for_sc_5p',
      python_callable=run_singlecell_notebook_wrapper_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'cellranger_xcom_key': 'cellranger_output',
          'cellranger_xcom_pull_task': 'run_cellranger',
          'scanpy_timeout': 1200,
          'allow_errors': False,
          'kernel_name': 'ir',
          'analysis_name': 'seurat',
          'vdj_dir': 'vdj',
          'count_dir': 'count',
          'output_notebook_key': 'seurat_notebook',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
      })
  # notebook report -> db
  load_seurat_report_for_sc_5p_db = PythonOperator(
      task_id='load_seurat_report_for_sc_5p_db',
      python_callable=load_analysis_files_func,
      queue='hpc_4G',
      dag=dag,
      params={
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'file_name_task': 'run_seurat_for_sc_5p',
          'file_name_key': 'seurat_notebook',
          'analysis_name': 'seurat_5p',
          'collection_type': 'SEURAT_HTML',
          'collection_table': 'sample',
          'output_files_key': 'output_db_files',
      })
  # registered report -> ftp
  upload_seurat_report_for_sc_5p_ftp = PythonOperator(
      task_id='upload_seurat_report_for_sc_5p_ftp',
      python_callable=ftp_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
          'xcom_pull_files_key': 'output_db_files',
          'collection_name_task': 'load_cellranger_result_to_db',
          'collection_name_key': 'sample_igf_id',
          'collection_type': 'FTP_SEURAT_HTML',
          'collection_table': 'sample',
          'collect_remote_file': True,
      })
  # registered report -> box
  upload_seurat_report_for_sc_5p_to_box = PythonOperator(
      task_id='upload_seurat_report_for_sc_5p_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'load_seurat_report_for_sc_5p_db',
          'xcom_pull_files_key': 'output_db_files',
          'analysis_tag': 'seurat_single_sample_report',
      })
  ## PIPELINE
  decide_analysis_branch >> run_seurat_for_sc_5p
  run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db
  load_seurat_report_for_sc_5p_db >> [
      upload_seurat_report_for_sc_5p_ftp,
      upload_seurat_report_for_sc_5p_to_box]
  ## TASK
  # Converts the cellranger bam to cram with 4 threads on the 'hpc_4G4t'
  # queue; the resulting files are published under the 'cram_files' xcom key.
  convert_cellranger_bam_to_cram = PythonOperator(
      task_id='convert_cellranger_bam_to_cram',
      python_callable=convert_bam_to_cram_func,
      queue='hpc_4G4t',
      dag=dag,
      params={
          'xcom_pull_files_key': 'cellranger_output',
          'xcom_pull_task': 'run_cellranger',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'use_ephemeral_space': True,
          'threads': 4,
          'analysis_name': 'cellranger',
          'collection_type': 'ANALYSIS_CRAM',
          'collection_table': 'sample',
          'cram_files_xcom_key': 'cram_files',
      })
  ## TASK
  # Branch operator placed ahead of the QC tools. 'list_of_tasks' names the
  # downstream tasks; each run_picard_* task defined in this file later pulls
  # from this task using its own task id as the xcom key.
  copy_bam_for_parallel_runs = BranchPythonOperator(
      task_id='copy_bam_for_parallel_runs',
      python_callable=index_and_copy_bam_for_parallel_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'cellranger_output',
          'xcom_pull_task': 'run_cellranger',
          'list_of_tasks': [
              'run_picard_alignment_summary',
              'run_picard_qual_summary',
              'run_picard_rna_summary',
              'run_picard_gc_summary',
              'run_picard_base_dist_summary',
              'run_samtools_stats',
          ],
      })
  ## TASK
  # cram files -> irods
  upload_cram_to_irods = PythonOperator(
      task_id='upload_cram_to_irods',
      python_callable=irods_files_upload_for_analysis,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'convert_cellranger_bam_to_cram',
          'xcom_pull_files_key': 'cram_files',
          'collection_name_key': 'sample_igf_id',
          'collection_name_task': 'load_cellranger_result_to_db',
          'analysis_name': 'cellranger_multi',
      })
  ## PIPELINE
  decide_analysis_branch >> convert_cellranger_bam_to_cram
  # we need to load metrics to cram, so the QC fan-out waits for the conversion
  convert_cellranger_bam_to_cram >> [
      copy_bam_for_parallel_runs,
      upload_cram_to_irods]
  ## TASK
  # Picard CollectAlignmentSummaryMetrics. Its input is pulled from
  # copy_bam_for_parallel_runs under the key 'run_picard_alignment_summary'.
  # NOTE(review): -Xmx4g equals the nominal size of the 'hpc_4G' queue,
  # whereas the rna summary task leaves 1G of headroom - confirm intended.
  run_picard_alignment_summary = PythonOperator(
      task_id='run_picard_alignment_summary',
      python_callable=run_picard_for_cellranger,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_alignment_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'use_ephemeral_space': True,
          'load_metrics_to_cram': True,
          'java_param': '-Xmx4g',
          'picard_command': 'CollectAlignmentSummaryMetrics',
          'picard_option': {},
          'analysis_files_xcom_key': 'picard_alignment_summary',
          'bam_files_xcom_key': None,
      })
  ## PIPELINE
  copy_bam_for_parallel_runs >> run_picard_alignment_summary
  ## TASK
  # Clean-up of the input registered for this tool; runs after the picard task.
  cleanup_picard_alignment_summary_input = PythonOperator(
      task_id='cleanup_picard_alignment_summary_input',
      python_callable=clean_up_files,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_alignment_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
      })
  ## TASK
  # picard output -> box
  upload_picard_alignment_summary_to_box = PythonOperator(
      task_id='upload_picard_alignment_summary_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'run_picard_alignment_summary',
          'xcom_pull_files_key': 'picard_alignment_summary',
          'analysis_tag': 'Picard-CollectAlignmentSummaryMetrics',
      })
  ## PIPELINE
  run_picard_alignment_summary >> [
      cleanup_picard_alignment_summary_input,
      upload_picard_alignment_summary_to_box]
  ## TASK
  # Picard QualityScoreDistribution. Its input is pulled from
  # copy_bam_for_parallel_runs under the key 'run_picard_qual_summary'.
  run_picard_qual_summary = PythonOperator(
      task_id='run_picard_qual_summary',
      python_callable=run_picard_for_cellranger,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_qual_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'use_ephemeral_space': True,
          'load_metrics_to_cram': True,
          'java_param': '-Xmx4g',
          'picard_command': 'QualityScoreDistribution',
          'picard_option': {},
          'analysis_files_xcom_key': 'picard_qual_summary',
          'bam_files_xcom_key': None,
      })
  ## PIPELINE
  copy_bam_for_parallel_runs >> run_picard_qual_summary
  ## TASK
  # Clean-up of the input registered for this tool; runs after the picard task.
  cleanup_picard_qual_summary_input = PythonOperator(
      task_id='cleanup_picard_qual_summary_input',
      python_callable=clean_up_files,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_qual_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
      })
  ## TASK
  # picard output -> box
  upload_picard_qual_summary_to_box = PythonOperator(
      task_id='upload_picard_qual_summary_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'run_picard_qual_summary',
          'xcom_pull_files_key': 'picard_qual_summary',
          'analysis_tag': 'Picard-QualityScoreDistribution',
      })
  ## PIPELINE
  run_picard_qual_summary >> [
      cleanup_picard_qual_summary_input,
      upload_picard_qual_summary_to_box]
  ## TASK
  # Picard CollectRnaSeqMetrics. Unlike the other picard tasks in this file it
  # runs on the 'hpc_8G' queue with a 7g java heap. Its input is pulled from
  # copy_bam_for_parallel_runs under the key 'run_picard_rna_summary'.
  run_picard_rna_summary = PythonOperator(
      task_id='run_picard_rna_summary',
      python_callable=run_picard_for_cellranger,
      queue='hpc_8G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_rna_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
          'analysis_description_xcom_pull_task': 'fetch_analysis_info',
          'analysis_description_xcom_key': 'analysis_description',
          'use_ephemeral_space': True,
          'load_metrics_to_cram': True,
          'java_param': '-Xmx7g',
          'picard_command': 'CollectRnaSeqMetrics',
          'picard_option': {},
          'analysis_files_xcom_key': 'picard_rna_summary',
          'bam_files_xcom_key': None,
      })
  ## PIPELINE
  copy_bam_for_parallel_runs >> run_picard_rna_summary
  ## TASK
  # Clean-up of the input registered for this tool; runs after the picard task.
  cleanup_picard_rna_summary_input = PythonOperator(
      task_id='cleanup_picard_rna_summary_input',
      python_callable=clean_up_files,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_files_key': 'run_picard_rna_summary',
          'xcom_pull_task': 'copy_bam_for_parallel_runs',
      })
  ## TASK
  # picard output -> box
  upload_picard_rna_summary_to_box = PythonOperator(
      task_id='upload_picard_rna_summary_to_box',
      python_callable=upload_analysis_file_to_box,
      queue='hpc_4G',
      dag=dag,
      params={
          'xcom_pull_task': 'run_picard_rna_summary',
          'xcom_pull_files_key': 'picard_rna_summary',
          'analysis_tag': 'Picard-CollectRnaSeqMetrics',
      })
  ## PIPELINE
  run_picard_rna_summary >> [
      cleanup_picard_rna_summary_input,
      upload_picard_rna_summary_to_box]
  688. ## TASK
  689. run_picard_gc_summary = \
  690. PythonOperator(
  691. task_id='run_picard_gc_summary',
  692. dag=dag,
  693. queue='hpc_4G',
  694. python_callable=run_picard_for_cellranger,
  695. params={'xcom_pull_files_key':'run_picard_gc_summary',
  696. 'xcom_pull_task':'copy_bam_for_parallel_runs',
  697. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  698. 'analysis_description_xcom_key':'analysis_description',
  699. 'use_ephemeral_space':True,
  700. 'load_metrics_to_cram':True,
  701. 'java_param':'-Xmx4g',
  702. 'picard_command':'CollectGcBiasMetrics',
  703. 'picard_option':{},
  704. 'analysis_files_xcom_key':'picard_gc_summary',
  705. 'bam_files_xcom_key':None})
  706. ## PIPELINE
  707. copy_bam_for_parallel_runs >> run_picard_gc_summary
  708. ## TASK
  709. cleanup_picard_gc_summary_input = \
  710. PythonOperator(
  711. task_id='cleanup_picard_gc_summary_input',
  712. dag=dag,
  713. queue='hpc_4G',
  714. python_callable=clean_up_files,
  715. params={'xcom_pull_files_key':'run_picard_gc_summary',
  716. 'xcom_pull_task':'copy_bam_for_parallel_runs'})
  717. ## PIPELINE
  718. run_picard_gc_summary >> cleanup_picard_gc_summary_input
  719. ## TASK
  720. upload_picard_gc_summary_to_box = \
  721. PythonOperator(
  722. task_id='upload_picard_gc_summary_to_box',
  723. dag=dag,
  724. queue='hpc_4G',
  725. python_callable=upload_analysis_file_to_box,
  726. params={'xcom_pull_task':'run_picard_gc_summary',
  727. 'xcom_pull_files_key':'picard_gc_summary',
  728. 'analysis_tag':'Picard-CollectGcBiasMetrics'})
  729. ## PIPELINE
  730. run_picard_gc_summary >> upload_picard_gc_summary_to_box
  731. ## TASK
  732. run_picard_base_dist_summary = \
  733. PythonOperator(
  734. task_id='run_picard_base_dist_summary',
  735. dag=dag,
  736. queue='hpc_4G',
  737. python_callable=run_picard_for_cellranger,
  738. params={'xcom_pull_files_key':'run_picard_base_dist_summary',
  739. 'xcom_pull_task':'copy_bam_for_parallel_runs',
  740. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  741. 'analysis_description_xcom_key':'analysis_description',
  742. 'use_ephemeral_space':True,
  743. 'load_metrics_to_cram':True,
  744. 'java_param':'-Xmx4g',
  745. 'picard_command':'CollectBaseDistributionByCycle',
  746. 'picard_option':{},
  747. 'analysis_files_xcom_key':'picard_base_summary',
  748. 'bam_files_xcom_key':None})
  749. ## PIPELINE
  750. copy_bam_for_parallel_runs >> run_picard_base_dist_summary
  751. ## TASK
  752. cleanup_picard_base_dist_summary_input = \
  753. PythonOperator(
  754. task_id='cleanup_picard_base_dist_summary_input',
  755. dag=dag,
  756. queue='hpc_4G',
  757. python_callable=clean_up_files,
  758. params={'xcom_pull_files_key':'run_picard_base_dist_summary',
  759. 'xcom_pull_task':'copy_bam_for_parallel_runs'})
  760. ## PIPELINE
  761. run_picard_base_dist_summary >> cleanup_picard_base_dist_summary_input
  762. ## TASK
  763. upload_picard_base_dist_summary_to_box = \
  764. PythonOperator(
  765. task_id='upload_picard_base_dist_summary_to_box',
  766. dag=dag,
  767. queue='hpc_4G',
  768. python_callable=upload_analysis_file_to_box,
  769. params={'xcom_pull_task':'run_picard_base_dist_summary',
  770. 'xcom_pull_files_key':'picard_base_summary',
  771. 'analysis_tag':'Picard-CollectBaseDistributionByCycle'})
  772. ## PIPELINE
  773. run_picard_base_dist_summary >> upload_picard_base_dist_summary_to_box
  774. ## TASK
  775. run_samtools_stats = \
  776. PythonOperator(
  777. task_id='run_samtools_stats',
  778. dag=dag,
  779. queue='hpc_4G4t',
  780. python_callable=run_samtools_for_cellranger,
  781. params={'xcom_pull_files_key':'run_samtools_stats',
  782. 'xcom_pull_task':'copy_bam_for_parallel_runs',
  783. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  784. 'analysis_description_xcom_key':'analysis_description',
  785. 'use_ephemeral_space':True,
  786. 'load_metrics_to_cram':True,
  787. 'samtools_command':'stats',
  788. 'threads':4,
  789. 'analysis_files_xcom_key':'samtools_stats'})
  790. ## PIPELINE
  791. copy_bam_for_parallel_runs >> run_samtools_stats
  792. ## TASK
  793. upload_samtools_stats_to_box = \
  794. PythonOperator(
  795. task_id='upload_samtools_stats_to_box',
  796. dag=dag,
  797. queue='hpc_4G',
  798. python_callable=upload_analysis_file_to_box,
  799. params={'xcom_pull_task':'run_samtools_stats',
  800. 'xcom_pull_files_key':'samtools_stats',
  801. 'analysis_tag':'Samtools-stats'})
  802. ## PIPELINE
  803. run_samtools_stats >> upload_samtools_stats_to_box
  804. ## TASK
  805. run_samtools_idxstats = \
  806. PythonOperator(
  807. task_id='run_samtools_idxstats',
  808. dag=dag,
  809. queue='hpc_4G4t',
  810. python_callable=run_samtools_for_cellranger,
  811. params={'xcom_pull_files_key':'run_samtools_stats',
  812. 'xcom_pull_task':'copy_bam_for_parallel_runs',
  813. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  814. 'analysis_description_xcom_key':'analysis_description',
  815. 'use_ephemeral_space':True,
  816. 'load_metrics_to_cram':True,
  817. 'samtools_command':'idxstats',
  818. 'threads':4,
  819. 'analysis_files_xcom_key':'samtools_idxstats'})
  820. ## PIPELINE
  821. run_samtools_stats >> run_samtools_idxstats
  822. ## TASK
  823. cleanup_samtools_stats_input = \
  824. PythonOperator(
  825. task_id='cleanup_samtools_stats_input',
  826. dag=dag,
  827. queue='hpc_4G',
  828. python_callable=clean_up_files,
  829. params={'xcom_pull_files_key':'run_samtools_stats',
  830. 'xcom_pull_task':'copy_bam_for_parallel_runs'})
  831. ## PIPELINE
  832. run_samtools_idxstats >> cleanup_samtools_stats_input
  833. ## TASK
  834. upload_samtools_idxstats_to_box = \
  835. PythonOperator(
  836. task_id='upload_samtools_idxstats_to_box',
  837. dag=dag,
  838. queue='hpc_4G',
  839. python_callable=upload_analysis_file_to_box,
  840. params={'xcom_pull_task':'run_samtools_idxstats',
  841. 'xcom_pull_files_key':'samtools_idxstats',
  842. 'analysis_tag':'Samtools-idxstats'})
  843. ## PIPELINE
  844. run_samtools_idxstats >> upload_samtools_idxstats_to_box
  845. ## TASK
  846. run_multiqc = \
  847. PythonOperator(
  848. task_id='run_multiqc',
  849. dag=dag,
  850. queue='hpc_4G',
  851. trigger_rule='none_failed_or_skipped',
  852. python_callable=run_multiqc_for_cellranger,
  853. params={
  854. 'list_of_analysis_xcoms_and_tasks':{
  855. 'run_cellranger':'cellranger_output',
  856. 'run_picard_alignment_summary':'picard_alignment_summary',
  857. 'run_picard_qual_summary':'picard_qual_summary',
  858. 'run_picard_rna_summary':'picard_rna_summary',
  859. 'run_picard_gc_summary':'picard_gc_summary',
  860. 'run_picard_base_dist_summary':'picard_base_summary',
  861. 'run_samtools_stats':'samtools_stats',
  862. 'run_samtools_idxstats':'samtools_idxstats'},
  863. 'analysis_description_xcom_pull_task':'fetch_analysis_info',
  864. 'analysis_description_xcom_key':'analysis_description',
  865. 'use_ephemeral_space':True,
  866. 'multiqc_html_file_xcom_key':'multiqc_html',
  867. 'multiqc_data_file_xcom_key':'multiqc_data',
  868. 'tool_order_list':['picad','samtools']})
  869. ## PIPELINE
  870. run_picard_alignment_summary >> run_multiqc
  871. run_picard_qual_summary >> run_multiqc
  872. run_picard_rna_summary >> run_multiqc
  873. run_picard_gc_summary >> run_multiqc
  874. run_picard_base_dist_summary >> run_multiqc
  875. run_samtools_idxstats >> run_multiqc
  876. ## TASK
  877. load_multiqc_html = \
  878. PythonOperator(
  879. task_id='load_multiqc_html',
  880. dag=dag,
  881. queue='hpc_4G',
  882. python_callable=load_analysis_files_func,
  883. params={'collection_name_task':'load_cellranger_result_to_db',
  884. 'collection_name_key':'sample_igf_id',
  885. 'file_name_task':'run_multiqc',
  886. 'file_name_key':'multiqc_html',
  887. 'analysis_name':'multiqc',
  888. 'collection_type':'MULTIQC_HTML',
  889. 'collection_table':'sample',
  890. 'output_files_key':'output_db_files'})
  891. ## PIPELINE
  892. run_multiqc >> load_multiqc_html
  893. ## TASK
  894. upload_multiqc_to_ftp = \
  895. PythonOperator(
  896. task_id='upload_multiqc_to_ftp',
  897. dag=dag,
  898. queue='hpc_4G',
  899. python_callable=ftp_files_upload_for_analysis,
  900. params={'xcom_pull_task':'load_multiqc_html',
  901. 'xcom_pull_files_key':'output_db_files',
  902. 'collection_name_task':'load_cellranger_result_to_db',
  903. 'collection_name_key':'sample_igf_id',
  904. 'collection_type':'FTP_MULTIQC_HTML',
  905. 'collection_table':'sample',
  906. 'collect_remote_file':True})
  907. ## PIPELINE
  908. load_multiqc_html >> upload_multiqc_to_ftp
  909. ## TASK
  910. upload_multiqc_to_box = \
  911. PythonOperator(
  912. task_id='upload_multiqc_to_box',
  913. dag=dag,
  914. queue='hpc_4G',
  915. python_callable=upload_analysis_file_to_box,
  916. params={'xcom_pull_task':'load_multiqc_html',
  917. 'xcom_pull_files_key':'output_db_files',
  918. 'analysis_tag':'multiqc_report'})
  919. ## PIPELINE
  920. load_multiqc_html >> upload_multiqc_to_box
  921. ## TASK
  922. update_analysis_and_status = \
  923. PythonOperator(
  924. task_id='update_analysis_and_status',
  925. dag=dag,
  926. queue='hpc_4G',
  927. python_callable=change_pipeline_status,
  928. trigger_rule='none_failed_or_skipped',
  929. params={'new_status':'FINISHED',
  930. 'no_change_status':'SEEDED'})
  931. ## PIPELINE
  932. upload_multiqc_to_ftp >> update_analysis_and_status
  933. upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
  934. upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
  935. upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
  936. #upload_scirpy_report_for_vdj_to_ftp >> update_analysis_and_status # no more ambiguous VDJ
  937. #upload_scirpy_report_for_vdj_to_box >> update_analysis_and_status
  938. upload_scirpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
  939. upload_scirpy_report_for_vdj_b_to_box >> update_analysis_and_status
  940. upload_scirpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
  941. upload_scirpy_report_for_vdj_t_to_box >> update_analysis_and_status
  942. upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
  943. upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
  944. upload_cellranger_results_to_irods >> update_analysis_and_status
  945. upload_cellranger_report_to_ftp >> update_analysis_and_status
  946. upload_cellranger_report_to_box >> update_analysis_and_status
  947. upload_cram_to_irods >> update_analysis_and_status
  948. ## TASK
  949. update_qc_pages = \
  950. PythonOperator(
  951. task_id='update_qc_pages',
  952. dag=dag,
  953. queue='hpc_4G',
  954. python_callable=create_and_update_qc_pages,
  955. params={'use_ephemeral_space':True,
  956. 'collection_type_list':[
  957. 'FTP_MULTIQC_HTML',
  958. 'FTP_SEURAT_HTML',
  959. 'FTP_SCIRPY_VDJ_T_HTML',
  960. 'FTP_SCIRPY_VDJ_B_HTML',
  961. 'FTP_SCIRPY_VDJ_HTML',
  962. 'FTP_CELLBROWSER',
  963. 'FTP_SCANPY_HTML',
  964. 'FTP_CELLRANGER_HTML']})
  965. ## PIPELINE
  966. update_analysis_and_status >> update_qc_pages