Browse Source

added bam copy for parallel picard runs

Avik Datta 4 years ago
parent
commit
21ebd05927
1 changed files with 46 additions and 19 deletions
  1. 46 19
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 46 - 19
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -22,6 +22,8 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import conve
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_picard_for_cellranger
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_samtools_for_cellranger
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_multiqc_for_cellranger
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import index_and_copy_bam_for_parallel_analysis
+
 
 ## ARGS
 default_args = {
@@ -519,14 +521,32 @@ with dag:
   load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_ftp
   load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
+  copy_bam_for_parallel_runs = \
+    BranchPythonOperator(
+      task_id='copy_bam_for_parallel_runs',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=index_and_copy_bam_for_parallel_analysis,
+      params={'xcom_pull_files_key':'cellranger_output',
+              'xcom_pull_task':'run_cellranger',
+              'list_of_tasks':[
+                'convert_cellranger_bam_to_cram',
+                'run_picard_alignment_summary',
+                'run_picard_qual_summary',
+                'run_picard_rna_summary',
+                'run_picard_gc_summary',
+                'run_picard_base_dist_summary',
+                'run_samtools_stats']
+            })
+  ## TASK
   convert_bam_to_cram = \
     PythonOperator(
       task_id='convert_cellranger_bam_to_cram',
       dag=dag,
       queue='hpc_4G4t',
       python_callable=convert_bam_to_cram_func,
-      params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+      params={'xcom_pull_files_key':'convert_cellranger_bam_to_cram',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -546,7 +566,8 @@ with dag:
               'collection_name_key':'sample_igf_id',
               'analysis_name':'cellranger_multi'})
   ## PIPELINE
-  decide_analysis_branch >> convert_bam_to_cram
+  decide_analysis_branch >> copy_bam_for_parallel_runs
+  copy_bam_for_parallel_runs >> convert_bam_to_cram
   convert_bam_to_cram >> upload_cram_to_irods
   ## TASK
   run_picard_alignment_summary = \
@@ -556,7 +577,7 @@ with dag:
       queue='hpc_4G',
       python_callable=run_picard_for_cellranger,
       params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -567,7 +588,7 @@ with dag:
               'analysis_files_xcom_key':'picard_alignment_summary',
               'bam_files_xcom_key':None})
   ## PIPELINE
-  decide_analysis_branch >> run_picard_alignment_summary
+  copy_bam_for_parallel_runs >> run_picard_alignment_summary
   ## TASK
   run_picard_qual_summary = \
     PythonOperator(
@@ -576,7 +597,7 @@ with dag:
       queue='hpc_4G',
       python_callable=run_picard_for_cellranger,
       params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -587,7 +608,7 @@ with dag:
               'analysis_files_xcom_key':'picard_qual_summary',
               'bam_files_xcom_key':None})
   ## PIPELINE
-  run_picard_alignment_summary >> run_picard_qual_summary
+  copy_bam_for_parallel_runs >> run_picard_qual_summary
   ## TASK
   run_picard_rna_summary = \
     PythonOperator(
@@ -596,7 +617,7 @@ with dag:
       queue='hpc_4G',
       python_callable=run_picard_for_cellranger,
       params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -607,7 +628,7 @@ with dag:
               'analysis_files_xcom_key':'picard_rna_summary',
               'bam_files_xcom_key':None})
   ## PIPELINE
-  run_picard_qual_summary >> run_picard_rna_summary
+  copy_bam_for_parallel_runs >> run_picard_rna_summary
   ## TASK
   run_picard_gc_summary = \
     PythonOperator(
@@ -616,7 +637,7 @@ with dag:
       queue='hpc_4G',
       python_callable=run_picard_for_cellranger,
       params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -627,7 +648,7 @@ with dag:
               'analysis_files_xcom_key':'picard_gc_summary',
               'bam_files_xcom_key':None})
   ## PIPELINE
-  run_picard_rna_summary >> run_picard_gc_summary
+  copy_bam_for_parallel_runs >> run_picard_gc_summary
   ## TASK
   run_picard_base_dist_summary = \
     PythonOperator(
@@ -635,8 +656,8 @@ with dag:
       dag=dag,
       queue='hpc_4G',
       python_callable=run_picard_for_cellranger,
-      params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+      params={'xcom_pull_files_key':'run_picard_base_dist_summary',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -647,7 +668,7 @@ with dag:
               'analysis_files_xcom_key':'picard_base_summary',
               'bam_files_xcom_key':None})
   ## PIPELINE
-  run_picard_gc_summary >> run_picard_base_dist_summary
+  copy_bam_for_parallel_runs >> run_picard_base_dist_summary
   ## TASK
   run_samtools_stats = \
     PythonOperator(
@@ -655,8 +676,8 @@ with dag:
       dag=dag,
       queue='hpc_4G4t',
       python_callable=run_samtools_for_cellranger,
-      params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+      params={'xcom_pull_files_key':'run_samtools_stats',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -665,7 +686,7 @@ with dag:
               'threads':4,
               'analysis_files_xcom_key':'samtools_stats'})
   ## PIPELINE
-  run_picard_gc_summary >> run_samtools_stats
+  copy_bam_for_parallel_runs >> run_samtools_stats
   ## TASK
   run_samtools_idxstats = \
     PythonOperator(
@@ -673,8 +694,8 @@ with dag:
       dag=dag,
       queue='hpc_4G4t',
       python_callable=run_samtools_for_cellranger,
-      params={'xcom_pull_files_key':'cellranger_output',
-              'xcom_pull_task':'run_cellranger',
+      params={'xcom_pull_files_key':'run_samtools_stats',
+              'xcom_pull_task':'copy_bam_for_parallel_runs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info',
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
@@ -690,6 +711,7 @@ with dag:
       task_id='run_multiqc',
       dag=dag,
       queue='hpc_4G',
+      trigger_rule='none_failed_or_skipped',
       python_callable=run_multiqc_for_cellranger,
       params={
         'list_of_analysis_xcoms_and_tasks':{
@@ -708,6 +730,11 @@ with dag:
         'multiqc_data_file_xcom_key':'multiqc_data',
         'tool_order_list':['picad','samtools']})
   ## PIPELINE
+  run_picard_alignment_summary >> run_multiqc
+  run_picard_qual_summary >> run_multiqc
+  run_picard_rna_summary >> run_multiqc
+  run_picard_gc_summary >> run_multiqc
+  run_picard_base_dist_summary >> run_multiqc
   run_samtools_idxstats >> run_multiqc
   ## TASK
   upload_multiqc_to_ftp = \