dag9_tenx_single_cell_immune_profiling.py 13 KB


  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
  9. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
  10. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
  11. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
  12. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
  13. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
  14. from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
  15. ## ARGS
  16. default_args = {
  17. 'owner': 'airflow',
  18. 'depends_on_past': False,
  19. 'start_date': days_ago(2),
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'retries': 4,
  23. 'max_active_runs':10,
  24. 'catchup':False,
  25. 'retry_delay': timedelta(minutes=5),
  26. 'provide_context': True,
  27. }
  28. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  29. ## DAG
  30. dag = \
  31. DAG(
  32. dag_id='dag9_tenx_single_cell_immune_profiling',
  33. schedule_interval=None,
  34. tags=['hpc','analysis','tenx','sc'],
  35. default_args=default_args,
  36. orientation='LR')
  37. with dag:
  ## TASK
  # Branching entry point of the DAG.
  # The task id now matches the variable name: every other task in this file
  # names 'fetch_analysis_info_and_branch' as the task to pull XCom values
  # from (`xcom_pull_task_id` / `analysis_description_xcom_pull_task`
  # params), and no task with that id existed while this one was registered
  # as 'fetch_analysis_info'.
  # NOTE(review): assumes the callables use those params as XCom task ids.
  fetch_analysis_info_and_branch = BranchPythonOperator(
    task_id='fetch_analysis_info_and_branch',
    dag=dag,
    queue='hpc_4g',
    params={
      'no_analysis_task': 'no_analysis',
      'analysis_description_xcom_key': 'analysis_description',
      'analysis_info_xcom_key': 'analysis_info',
    },
    python_callable=fetch_analysis_info_and_branch_func,
  )
  ## TASK
  # Runs `configure_cellranger_run_func` on the 'hpc_4g' queue.
  # NOTE(review): this task is wired downstream of every per-feature-type
  # branch; if the branching task skips some of them, the default trigger
  # rule may skip this task too - confirm whether a trigger_rule is needed.
  configure_cellranger_run = PythonOperator(
    task_id='configure_cellranger_run',
    dag=dag,
    queue='hpc_4g',
    python_callable=configure_cellranger_run_func,
    params={
      'xcom_pull_task_id': 'fetch_analysis_info_and_branch',
      'analysis_description_xcom_key': 'analysis_description',
      'analysis_info_xcom_key': 'analysis_info',
      'library_csv_xcom_key': 'cellranger_library_csv',
    },
  )
  ## TASK GROUP
  # One branch per feature type: an entry node, 32 read trimming tasks
  # (run ids 0-31) and a collector node that joins them again.
  for analysis_name in FEATURE_TYPE_LIST:
    # entry node; its task id is the feature type name itself
    task_branch = DummyOperator(task_id=analysis_name, dag=dag)
    run_trim_list = [
      PythonOperator(
        task_id='run_trim_{0}_{1}'.format(analysis_name, run_id),
        dag=dag,
        python_callable=run_sc_read_trimmming_func,
        params={
          'xcom_pull_task_id': 'fetch_analysis_info_and_branch',
          'analysis_info_xcom_key': 'analysis_info',
          'analysis_name': analysis_name,
          'run_id': run_id,
          'r1_length': 26,
          'r2_length': 0,
          'fastq_input_dir_tag': 'fastq_dir',
          'fastq_output_dir_tag': 'output_path',
        },
      )
      for run_id in range(32)
    ]
    collect_trimmed_files = DummyOperator(
      task_id='collect_trimmed_files_{0}'.format(analysis_name),
      dag=dag,
    )
    ## PIPELINE
    fetch_analysis_info_and_branch >> task_branch
    task_branch >> run_trim_list
    run_trim_list >> collect_trimmed_files
    collect_trimmed_files >> configure_cellranger_run
  ## TASK
  # Placeholder branch target; its id is the value passed to the branching
  # task as 'no_analysis_task'.
  no_analysis = DummyOperator(task_id='no_analysis', dag=dag)
  ## PIPELINE
  fetch_analysis_info_and_branch.set_downstream(no_analysis)
  ## TASK
  # Runs `run_cellranger_tool` once the cellranger run has been configured.
  run_cellranger = PythonOperator(
    task_id='run_cellranger',
    dag=dag,
    python_callable=run_cellranger_tool,
    params={
      'analysis_description_xcom_pull_task': 'fetch_analysis_info_and_branch',
      'analysis_description_xcom_key': 'analysis_description',
      'library_csv_xcom_key': 'cellranger_library_csv',
      'library_csv_xcom_pull_task': 'configure_cellranger_run',
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_options': ['--localcores 8', '--localmem 64'],
    },
  )
  ## PIPELINE
  configure_cellranger_run.set_downstream(run_cellranger)
  ## TASK
  # Second branching point, executed after cellranger has finished.
  # Each '*_task' param carries the id of a task defined further down in
  # this DAG, so every value has to match an existing task id.
  decide_analysis_branch = BranchPythonOperator(
    task_id='decide_analysis_branch',
    dag=dag,
    python_callable=decide_analysis_branch_func,
    params={
      'load_cellranger_result_to_db_task': 'load_cellranger_result_to_db',
      'run_scanpy_for_sc_5p_task': 'run_scanpy_for_sc_5p',
      'run_scirpy_for_vdj_task': 'run_scirpy_for_vdj',
      'run_scirpy_for_vdj_b_task': 'run_scirpy_for_vdj_b',
      # The value used to be 'run_scirpy_vdj_t', which is not a task of this
      # DAG; the task is registered as 'run_scirpy_for_vdj_t'. The key
      # spelling is left untouched because the callable that reads it lives
      # outside this file.
      'run_scirpy_vdj_t_task': 'run_scirpy_for_vdj_t',
      'run_seurat_for_sc_5p_task': 'run_seurat_for_sc_5p',
      'run_picard_alignment_summary_task': 'run_picard_alignment_summary',
      'convert_bam_to_cram_task': 'convert_bam_to_cram',
      'library_csv_xcom_key': 'cellranger_library_csv',
      'library_csv_xcom_pull_task': 'configure_cellranger_run',
    },
  )
  ## PIPELINE
  run_cellranger >> decide_analysis_branch
  ## TASK
  # Runs `load_cellranger_result_to_db_func` with the params below.
  load_cellranger_result_to_db = PythonOperator(
    task_id='load_cellranger_result_to_db',
    dag=dag,
    python_callable=load_cellranger_result_to_db_func,
    params={
      'analysis_description_xcom_pull_task': 'fetch_analysis_info_and_branch',
      'analysis_description_xcom_key': 'analysis_description',
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
      'collection_type': 'CELLRANGER_MULTI',
      'collection_table': 'sample',
      'genome_column': 'genome_build',
      'analysis_name': 'cellranger_multi',
      'output_xcom_key': 'loaded_output_files',
    },
  )
  # Placeholder: PythonOperator refuses a non-callable `python_callable` when
  # it is constructed, which would stop the whole DAG file from loading.
  # Until an upload callable exists this step is a DummyOperator, like the
  # other upload placeholders below.
  upload_report_to_ftp = DummyOperator(
    task_id='upload_report_to_ftp',
    dag=dag,
  )
  upload_report_to_box = DummyOperator(
    task_id='upload_report_to_box',
    dag=dag,
    params=None,
  )
  upload_results_to_irods = DummyOperator(
    task_id='upload_results_to_irods',
    dag=dag,
    params=None,
  )
  ## PIPELINE
  decide_analysis_branch >> load_cellranger_result_to_db
  load_cellranger_result_to_db >> upload_report_to_ftp
  load_cellranger_result_to_db >> upload_report_to_box
  load_cellranger_result_to_db >> upload_results_to_irods
  ## TASK
  # Placeholder tasks for the scanpy branch and its report uploads.
  run_scanpy_for_sc_5p = DummyOperator(
    task_id='run_scanpy_for_sc_5p',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_scanpy_report_for_sc_5p_to_ftp = DummyOperator(
    task_id='upload_scanpy_report_for_sc_5p_to_ftp', dag=dag)
  upload_scanpy_report_for_sc_5p_to_box = DummyOperator(
    task_id='upload_scanpy_report_for_sc_5p_to_box', dag=dag)
  run_cellbrowser_for_sc_5p = DummyOperator(
    task_id='run_cellbrowser_for_sc_5p', dag=dag)
  upload_cellbrowser_for_sc_5p_to_ftp = DummyOperator(
    task_id='upload_cellbrowser_for_sc_5p_to_ftp', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> run_scanpy_for_sc_5p
  run_scanpy_for_sc_5p >> [
    upload_scanpy_report_for_sc_5p_to_ftp,
    upload_scanpy_report_for_sc_5p_to_box,
    run_cellbrowser_for_sc_5p,
  ]
  run_cellbrowser_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
  ## TASK
  # Placeholder tasks for the vdj branch and its report uploads.
  run_scirpy_for_vdj = DummyOperator(
    task_id='run_scirpy_for_vdj',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_scanpy_report_for_vdj_to_ftp = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_to_ftp', dag=dag)
  upload_scanpy_report_for_vdj_to_box = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_to_box', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj
  run_scirpy_for_vdj >> [
    upload_scanpy_report_for_vdj_to_ftp,
    upload_scanpy_report_for_vdj_to_box,
  ]
  ## TASK
  # Placeholder tasks for the vdj_b branch and its report uploads.
  run_scirpy_for_vdj_b = DummyOperator(
    task_id='run_scirpy_for_vdj_b',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_scanpy_report_for_vdj_b_to_ftp = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_b_to_ftp', dag=dag)
  upload_scanpy_report_for_vdj_b_to_box = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_b_to_box', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_b
  run_scirpy_for_vdj_b >> [
    upload_scanpy_report_for_vdj_b_to_ftp,
    upload_scanpy_report_for_vdj_b_to_box,
  ]
  ## TASK
  # Placeholder tasks for the vdj_t branch and its report uploads.
  run_scirpy_for_vdj_t = DummyOperator(
    task_id='run_scirpy_for_vdj_t',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_scanpy_report_for_vdj_t_to_ftp = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_t_to_ftp', dag=dag)
  upload_scanpy_report_for_vdj_t_to_box = DummyOperator(
    task_id='upload_scanpy_report_for_vdj_t_to_box', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> run_scirpy_for_vdj_t
  run_scirpy_for_vdj_t >> [
    upload_scanpy_report_for_vdj_t_to_ftp,
    upload_scanpy_report_for_vdj_t_to_box,
  ]
  ## TASK
  # Placeholder tasks for the seurat branch and its report uploads.
  run_seurat_for_sc_5p = DummyOperator(
    task_id='run_seurat_for_sc_5p',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_seurat_report_for_sc_5p_ftp = DummyOperator(
    task_id='upload_seurat_report_for_sc_5p_ftp', dag=dag)
  upload_seurat_report_for_sc_5p_to_box = DummyOperator(
    task_id='upload_seurat_report_for_sc_5p_to_box', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> run_seurat_for_sc_5p
  run_seurat_for_sc_5p >> [
    upload_seurat_report_for_sc_5p_ftp,
    upload_seurat_report_for_sc_5p_to_box,
  ]
  ## TASK
  # Placeholder tasks for the cram conversion branch and its iRODS upload.
  convert_bam_to_cram = DummyOperator(
    task_id='convert_bam_to_cram',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  upload_cram_to_irods = DummyOperator(
    task_id='upload_cram_to_irods', dag=dag)
  ## PIPELINE
  decide_analysis_branch >> convert_bam_to_cram >> upload_cram_to_irods
  ## TASK
  # Placeholder tasks of the QC branch. They are linked as one linear chain
  # that ends in multiqc, whose output has two upload targets.
  run_picard_alignment_summary = DummyOperator(
    task_id='run_picard_alignment_summary',
    dag=dag,
    params={
      'cellranger_xcom_key': 'cellranger_output',
      'cellranger_xcom_pull_task': 'run_cellranger',
    },
  )
  run_picard_qual_summary = DummyOperator(
    task_id='run_picard_qual_summary', dag=dag)
  run_picard_rna_summary = DummyOperator(
    task_id='run_picard_rna_summary', dag=dag)
  run_picard_gc_summary = DummyOperator(
    task_id='run_picard_gc_summary', dag=dag)
  run_samtools_stats = DummyOperator(
    task_id='run_samtools_stats', dag=dag)
  run_samtools_idxstats = DummyOperator(
    task_id='run_samtools_idxstats', dag=dag)
  run_multiqc = DummyOperator(
    task_id='run_multiqc', dag=dag)
  upload_multiqc_to_ftp = DummyOperator(
    task_id='upload_multiqc_to_ftp', dag=dag)
  upload_multiqc_to_box = DummyOperator(
    task_id='upload_multiqc_to_box', dag=dag)
  ## PIPELINE
  decide_analysis_branch \
    >> run_picard_alignment_summary \
    >> run_picard_qual_summary \
    >> run_picard_rna_summary \
    >> run_picard_gc_summary \
    >> run_samtools_stats \
    >> run_samtools_idxstats \
    >> run_multiqc
  run_multiqc >> upload_multiqc_to_ftp
  run_multiqc >> upload_multiqc_to_box
  ## TASK
  # Final placeholder node; every upload task listed below feeds into it.
  # NOTE(review): the upstream tasks sit on different branches of
  # `decide_analysis_branch`; confirm whether the default trigger rule is
  # intended for this join.
  update_analysis_and_status = DummyOperator(
    task_id='update_analysis_and_status',
    dag=dag,
  )
  ## PIPELINE
  [
    upload_multiqc_to_ftp,
    upload_scanpy_report_for_sc_5p_to_ftp,
    upload_scanpy_report_for_sc_5p_to_box,
    upload_cellbrowser_for_sc_5p_to_ftp,
    upload_scanpy_report_for_vdj_to_ftp,
    upload_scanpy_report_for_vdj_to_box,
    upload_scanpy_report_for_vdj_b_to_ftp,
    upload_scanpy_report_for_vdj_b_to_box,
    upload_scanpy_report_for_vdj_t_to_ftp,
    upload_scanpy_report_for_vdj_t_to_box,
    upload_seurat_report_for_sc_5p_ftp,
    upload_seurat_report_for_sc_5p_to_box,
    upload_results_to_irods,
    upload_report_to_ftp,
    upload_report_to_box,
    upload_cram_to_irods,
  ] >> update_analysis_and_status