# dag9_tenx_single_cell_immune_profiling.py

import os
import json
import logging
import subprocess
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

from igf_airflow.logging.upload_log_msg import send_log_to_channels, log_success, log_failure, log_sleep
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_sc_read_trimmming_func
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_cellranger_tool
# NOTE(review): decide_analysis_branch_func is used by this DAG but was never
# imported; assumed to live alongside the other dag9 helpers -- confirm.
from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import decide_analysis_branch_func
  13. ## ARGS
  14. default_args = {
  15. 'owner': 'airflow',
  16. 'depends_on_past': False,
  17. 'start_date': days_ago(2),
  18. 'email_on_failure': False,
  19. 'email_on_retry': False,
  20. 'retries': 4,
  21. 'max_active_runs':10,
  22. 'catchup':False,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. }
  26. FEATURE_TYPE_LIST = Variable.get('tenx_single_cell_immune_profiling_feature_types')
  27. ## DAG
  28. dag = \
  29. DAG(
  30. dag_id='dag9_tenx_single_cell_immune_profiling',
  31. schedule_interval=None,
  32. tags=['hpc','analysis','tenx','sc'],
  33. default_args=default_args,
  34. orientation='LR')
  35. with dag:
  36. ## TASK
  37. fetch_analysis_info_and_branch = \
  38. BranchPythonOperator(
  39. task_id='fetch_analysis_info',
  40. dag=dag,
  41. queue='hpc_4g',
  42. params={'no_analysis_task':'no_analysis',
  43. 'analysis_description_xcom_key':'analysis_description',
  44. 'analysis_info_xcom_key':'analysis_info'},
  45. python_callable=fetch_analysis_info_and_branch_func)
  46. ## TASK
  47. configure_cellranger_run = \
  48. PythonOperator(
  49. task_id='configure_cellranger_run',
  50. dag=dag,
  51. queue='hpc_4g',
  52. params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
  53. 'analysis_description_xcom_key':'analysis_description',
  54. 'analysis_info_xcom_key':'analysis_info',
  55. 'library_csv_xcom_key':'cellranger_library_csv'},
  56. python_callable=configure_cellranger_run_func)
  57. for analysis_name in FEATURE_TYPE_LIST:
  58. ## TASK
  59. task_branch = \
  60. DummyOperator(
  61. task_id=analysis_name,
  62. dag=dag)
  63. run_trim_list = list()
  64. for run_id in range(0,32):
  65. ## TASK
  66. t = \
  67. PythonOperator(
  68. task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
  69. dag=dag,
  70. params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
  71. 'analysis_info_xcom_key':'analysis_info',
  72. 'analysis_name':analysis_name,
  73. 'run_id':run_id,
  74. 'r1_length':26,
  75. 'r2_length':0,
  76. 'fastq_input_dir_tag':'fastq_dir',
  77. 'fastq_output_dir_tag':'output_path'},
  78. python_callable=run_sc_read_trimmming_func)
  79. run_trim_list.append(t)
  80. ## TASK
  81. collect_trimmed_files = \
  82. DummyOperator(
  83. task_id='collect_trimmed_files_{0}'.format(analysis_name),
  84. dag=dag)
  85. ## PIPELINE
  86. fetch_analysis_info_and_branch >> task_branch
  87. task_branch >> run_trim_list
  88. run_trim_list >> collect_trimmed_files
  89. collect_trimmed_files >> configure_cellranger_run
  90. ## TASK
  91. no_analysis = \
  92. DummyOperator(
  93. task_id='no_analysis',
  94. dag=dag)
  95. ## PIPELINE
  96. fetch_analysis_info_and_branch >> no_analysis
  97. ## TASK
  98. run_cellranger = \
  99. PythonOperator(
  100. task_id='run_cellranger',
  101. dag=dag,
  102. params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
  103. 'analysis_description_xcom_key':'analysis_description',
  104. 'library_csv_xcom_key':'cellranger_library_csv',
  105. 'library_csv_xcom_pull_task':'configure_cellranger_run',
  106. 'cellranger_xcom_key':'cellranger_output',
  107. 'cellranger_options':['--localcores 8','--localmem 64']},
  108. python_callable=run_cellranger_tool)
  109. ## PIPELINE
  110. configure_cellranger_run >> run_cellranger
  111. ## TASK
  112. decide_analysis_branch = \
  113. BranchPythonOperator(
  114. task_id='decide_analysis_branch',
  115. dag=dag,
  116. python_callable=decide_analysis_branch_func,
  117. params={'upload_report_to_ftp_task':'upload_report_to_ftp',
  118. 'upload_report_to_box_task':'upload_report_to_box',
  119. 'upload_results_to_irods_task':'upload_results_to_irods',
  120. 'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
  121. 'run_scirpy_for_vdj_task':'run_scirpy_for_vdj',
  122. 'run_scirpy_for_vdj_b_task':'run_scirpy_for_vdj_b',
  123. 'run_scirpy_vdj_t_task':'run_scirpy_vdj_t',
  124. 'run_seurat_for_sc_5p_task':'run_seurat_for_sc_5p',
  125. 'run_picard_alignment_summary_task':'run_picard_alignment_summary',
  126. 'library_csv_xcom_key':'cellranger_library_csv',
  127. 'library_csv_xcom_pull_task':'configure_cellranger_run'})
  128. ## PIPELINE
  129. run_cellranger >> decide_analysis_branch
  130. ## TASK
  131. upload_report_to_ftp = \
  132. DummyOperator(
  133. task_id='upload_report_to_ftp',
  134. dag=dag)
  135. ## PIPELINE
  136. decide_analysis_branch >> upload_report_to_ftp
  137. ## TASK
  138. upload_report_to_box = \
  139. DummyOperator(
  140. task_id='upload_report_to_box',
  141. dag=dag)
  142. ## PIPELINE
  143. decide_analysis_branch >> upload_report_to_box
  144. ## TASK
  145. upload_results_to_irods = \
  146. DummyOperator(
  147. task_id='upload_results_to_irods',
  148. dag=dag)
  149. ## PIPELINE
  150. decide_analysis_branch >> upload_results_to_irods
  151. ## TASK
  152. run_scanpy_for_sc_5p = \
  153. DummyOperator(
  154. task_id='run_scanpy_for_sc_5p',
  155. dag=dag)
  156. upload_scanpy_report_for_sc_5p_to_ftp = \
  157. DummyOperator(
  158. task_id='upload_scanpy_report_for_sc_5p_to_ftp',
  159. dag=dag)
  160. upload_scanpy_report_for_sc_5p_to_box = \
  161. DummyOperator(
  162. task_id='upload_scanpy_report_for_sc_5p_to_box',
  163. dag=dag)
  164. run_cellbrowser_for_sc_5p = \
  165. DummyOperator(
  166. task_id='run_cellbrowser_for_sc_5p',
  167. dag=dag)
  168. upload_cellbrowser_for_sc_5p_to_ftp = \
  169. DummyOperator(
  170. task_id='upload_cellbrowser_for_sc_5p_to_ftp',
  171. dag=dag)
  172. ## PIPELINE
  173. decide_analysis_branch >> run_scanpy_for_sc_5p
  174. run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_ftp
  175. run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_box
  176. run_scanpy_for_sc_5p >> run_cellbrowser_for_sc_5p
  177. run_cellbrowser_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
  178. ## TASK
  179. run_scirpy_for_vdj = \
  180. DummyOperator(
  181. task_id='run_scirpy_for_vdj',
  182. dag=dag)
  183. upload_scanpy_report_for_vdj_to_ftp = \
  184. DummyOperator(
  185. task_id='upload_scanpy_report_for_vdj_to_ftp',
  186. dag=dag)
  187. upload_scanpy_report_for_vdj_to_box = \
  188. DummyOperator(
  189. task_id='upload_scanpy_report_for_vdj_to_box',
  190. dag=dag)
  191. ## PIPELINE
  192. decide_analysis_branch >> run_scirpy_for_vdj
  193. run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_ftp
  194. run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_box
  195. ## TASK
  196. run_scirpy_for_vdj_b = \
  197. DummyOperator(
  198. task_id='run_scirpy_for_vdj_b',
  199. dag=dag)
  200. upload_scanpy_report_for_vdj_b_to_ftp = \
  201. DummyOperator(
  202. task_id='upload_scanpy_report_for_vdj_b_to_ftp',
  203. dag=dag)
  204. upload_scanpy_report_for_vdj_b_to_box = \
  205. DummyOperator(
  206. task_id='upload_scanpy_report_for_vdj_b_to_box',
  207. dag=dag)
  208. run_cellbrowser_for_vdj_b = \
  209. DummyOperator(
  210. task_id='run_cellbrowser_for_vdj_b',
  211. dag=dag)
  212. upload_cellbrowser_for_vdj_b_to_ftp = \
  213. DummyOperator(
  214. task_id='upload_cellbrowser_for_vdj_b_to_ftp',
  215. dag=dag)
  216. ## PIPELINE
  217. decide_analysis_branch >> run_scirpy_for_vdj_b
  218. run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_ftp
  219. run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_box
  220. run_scirpy_for_vdj_b >> run_cellbrowser_for_vdj_b
  221. run_cellbrowser_for_vdj_b >> upload_cellbrowser_for_vdj_b_to_ftp
  222. ## TASK
  223. run_scirpy_for_vdj_t = \
  224. DummyOperator(
  225. task_id='run_scirpy_for_vdj_t',
  226. dag=dag)
  227. upload_scanpy_report_for_vdj_t_to_ftp = \
  228. DummyOperator(
  229. task_id='upload_scanpy_report_for_vdj_t_to_ftp',
  230. dag=dag)
  231. upload_scanpy_report_for_vdj_t_to_box = \
  232. DummyOperator(
  233. task_id='upload_scanpy_report_for_vdj_t_to_box',
  234. dag=dag)
  235. ## PIPELINE
  236. decide_analysis_branch >> run_scirpy_for_vdj_t
  237. run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_ftp
  238. run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_box
  239. ## TASK
  240. run_seurat_for_sc_5p = \
  241. DummyOperator(
  242. task_id='run_seurat_for_sc_5p',
  243. dag=dag)
  244. upload_seurat_report_for_sc_5p_ftp = \
  245. DummyOperator(
  246. task_id='upload_seurat_report_for_sc_5p_ftp',
  247. dag=dag)
  248. upload_seurat_report_for_sc_5p_to_box = \
  249. DummyOperator(
  250. task_id='upload_seurat_report_for_sc_5p_to_box',
  251. dag=dag)
  252. ## PIPELINE
  253. decide_analysis_branch >> run_seurat_for_sc_5p
  254. run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_ftp
  255. run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_to_box
  256. ## TASK
  257. run_picard_alignment_summary = \
  258. DummyOperator(
  259. task_id='run_picard_alignment_summary',
  260. dag=dag)
  261. ## PIPELINE
  262. decide_analysis_branch >> run_picard_alignment_summary
  263. ## TASK
  264. run_picard_qual_summary = \
  265. DummyOperator(
  266. task_id='run_picard_qual_summary',
  267. dag=dag)
  268. ## PIPELINE
  269. run_picard_alignment_summary >> run_picard_qual_summary
  270. ## TASK
  271. run_picard_rna_summary = \
  272. DummyOperator(
  273. task_id='run_picard_rna_summary',
  274. dag=dag)
  275. ## PIPELINE
  276. run_picard_qual_summary >> run_picard_rna_summary
  277. ## TASK
  278. run_picard_gc_summary = \
  279. DummyOperator(
  280. task_id='run_picard_gc_summary',
  281. dag=dag)
  282. ## PIPELINE
  283. run_picard_rna_summary >> run_picard_gc_summary
  284. ## TASK
  285. run_samtools_stats = \
  286. DummyOperator(
  287. task_id='run_samtools_stats',
  288. dag=dag)
  289. ## PIPELINE
  290. run_picard_gc_summary >> run_samtools_stats
  291. ## TASK
  292. run_samtools_idxstats = \
  293. DummyOperator(
  294. task_id='run_samtools_idxstats',
  295. dag=dag)
  296. ## PIPELINE
  297. run_samtools_stats >> run_samtools_idxstats
  298. ## TASK
  299. run_multiqc = \
  300. DummyOperator(
  301. task_id='run_multiqc',
  302. dag=dag)
  303. ## PIPELINE
  304. run_samtools_idxstats >> run_multiqc
  305. ## TASK
  306. upload_multiqc_to_ftp = \
  307. DummyOperator(
  308. task_id='upload_multiqc_to_ftp',
  309. dag=dag)
  310. ## PIPELINE
  311. run_multiqc >> upload_multiqc_to_ftp
  312. ## TASK
  313. upload_multiqc_to_box = \
  314. DummyOperator(
  315. task_id='upload_multiqc_to_box',
  316. dag=dag)
  317. ## PIPELINE
  318. run_multiqc >> upload_multiqc_to_box
  319. ## TASK
  320. update_analysis_and_status = \
  321. DummyOperator(
  322. task_id='update_analysis_and_status',
  323. dag=dag)
  324. ## PIPELINE
  325. upload_multiqc_to_ftp >> update_analysis_and_status
  326. upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
  327. upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
  328. upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
  329. upload_scanpy_report_for_vdj_to_ftp >> update_analysis_and_status
  330. upload_scanpy_report_for_vdj_to_box >> update_analysis_and_status
  331. upload_scanpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
  332. upload_scanpy_report_for_vdj_b_to_box >> update_analysis_and_status
  333. upload_scanpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
  334. upload_scanpy_report_for_vdj_t_to_box >> update_analysis_and_status
  335. upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
  336. upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
  337. upload_results_to_irods >> update_analysis_and_status
  338. upload_report_to_ftp >> update_analysis_and_status
  339. upload_report_to_box >> update_analysis_and_status