Browse Source

added temp file cleanup and Picard metrics upload to Box

Avik Datta 4 years ago
parent
commit
f3dcc5287f
1 changed files with 153 additions and 2 deletions
  1. 153 2
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 153 - 2
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -23,6 +23,7 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_s
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_multiqc_for_cellranger
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import index_and_copy_bam_for_parallel_analysis
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import change_pipeline_status
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import clean_up_files
 
 ## ARGS
 default_args = {
@@ -593,6 +594,29 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_picard_alignment_summary
   ## TASK
+  cleanup_picard_alignment_summary_input = \
+    PythonOperator(
+      task_id='cleanup_picard_alignment_summary_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,  # presumably removes the temp input files found under the XCom key below -- confirm in utils module
+      params={'xcom_pull_files_key':'run_picard_alignment_summary',
+              'xcom_pull_task':'copy_bam_for_parallel_runs'})
+  ## PIPELINE: cleanup is downstream of run_picard_alignment_summary
+  run_picard_alignment_summary >> cleanup_picard_alignment_summary_input
+  ## TASK
+  upload_picard_alignment_summary_to_box = \
+    PythonOperator(
+      task_id='upload_picard_alignment_summary_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,  # NOTE(review): import is not visible in this diff -- confirm it is already imported
+      params={'xcom_pull_task':'run_picard_alignment_summary',
+              'xcom_pull_files_key':'picard_alignment_summary',
+              'analysis_tag':'Picard-CollectAlignmentSummaryMetrics'})
+  ## PIPELINE: upload is downstream of run_picard_alignment_summary
+  run_picard_alignment_summary >> upload_picard_alignment_summary_to_box
+  ## TASK
   run_picard_qual_summary = \
     PythonOperator(
       task_id='run_picard_qual_summary',
@@ -613,11 +637,34 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_picard_qual_summary
   ## TASK
+  cleanup_picard_qual_summary_input = \
+    PythonOperator(
+      task_id='cleanup_picard_qual_summary_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,  # presumably removes the temp input files found under the XCom key below -- confirm in utils module
+      params={'xcom_pull_files_key':'run_picard_qual_summary',
+              'xcom_pull_task':'copy_bam_for_parallel_runs'})
+  ## PIPELINE: cleanup is downstream of run_picard_qual_summary
+  run_picard_qual_summary >> cleanup_picard_qual_summary_input
+  ## TASK
+  upload_picard_qual_summary_to_box = \
+    PythonOperator(
+      task_id='upload_picard_qual_summary_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,  # NOTE(review): import is not visible in this diff -- confirm it is already imported
+      params={'xcom_pull_task':'run_picard_qual_summary',
+              'xcom_pull_files_key':'picard_qual_summary',
+              'analysis_tag':'Picard-QualityScoreDistribution'})
+  ## PIPELINE: upload is downstream of run_picard_qual_summary
+  run_picard_qual_summary >> upload_picard_qual_summary_to_box
+  ## TASK
   run_picard_rna_summary = \
     PythonOperator(
       task_id='run_picard_rna_summary',
       dag=dag,
-      queue='hpc_4G',
+      queue='hpc_8G',
       python_callable=run_picard_for_cellranger,
       params={'xcom_pull_files_key':'run_picard_rna_summary',
               'xcom_pull_task':'copy_bam_for_parallel_runs',
@@ -625,7 +672,7 @@ with dag:
               'analysis_description_xcom_key':'analysis_description',
               'use_ephemeral_space':True,
               'load_metrics_to_cram':True,
-              'java_param':'-Xmx4g',
+              'java_param':'-Xmx7g',
               'picard_command':'CollectRnaSeqMetrics',
               'picard_option':{},
               'analysis_files_xcom_key':'picard_rna_summary',
@@ -633,6 +680,29 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_picard_rna_summary
   ## TASK
+  cleanup_picard_rna_summary_input = \
+    PythonOperator(
+      task_id='cleanup_picard_rna_summary_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,
+      params={'xcom_pull_files_key':'run_picard_rna_summary',
+              'xcom_pull_task':'picard_rna_summary'})
+  ## PIPELINE
+  run_picard_rna_summary >> cleanup_picard_rna_summary_input
+  ## TASK
+  upload_picard_rna_summary_to_box = \
+    PythonOperator(
+      task_id='upload_picard_rna_summary_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,  # NOTE(review): import is not visible in this diff -- confirm it is already imported
+      params={'xcom_pull_task':'run_picard_rna_summary',
+              'xcom_pull_files_key':'picard_rna_summary',  # matches the analysis_files_xcom_key set on run_picard_rna_summary
+              'analysis_tag':'Picard-CollectRnaSeqMetrics'})
+  ## PIPELINE: upload is downstream of run_picard_rna_summary
+  run_picard_rna_summary >> upload_picard_rna_summary_to_box
+  ## TASK
   run_picard_gc_summary = \
     PythonOperator(
       task_id='run_picard_gc_summary',
@@ -653,6 +723,29 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_picard_gc_summary
   ## TASK
+  cleanup_picard_gc_summary_input = \
+    PythonOperator(
+      task_id='cleanup_picard_gc_summary_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,  # presumably removes the temp input files found under the XCom key below -- confirm in utils module
+      params={'xcom_pull_files_key':'run_picard_gc_summary',
+              'xcom_pull_task':'copy_bam_for_parallel_runs'})
+  ## PIPELINE: cleanup is downstream of run_picard_gc_summary
+  run_picard_gc_summary >> cleanup_picard_gc_summary_input
+  ## TASK
+  upload_picard_gc_summary_to_box = \
+    PythonOperator(
+      task_id='upload_picard_alignment_summary_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,
+      params={'xcom_pull_task':'run_picard_gc_summary',
+              'xcom_pull_files_key':'picard_gc_summary',
+              'analysis_tag':'Picard-CollectGcBiasMetrics'})
+  ## PIPELINE
+  run_picard_gc_summary >> upload_picard_gc_summary_to_box
+  ## TASK
   run_picard_base_dist_summary = \
     PythonOperator(
       task_id='run_picard_base_dist_summary',
@@ -673,6 +766,29 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_picard_base_dist_summary
   ## TASK
+  cleanup_picard_base_dist_summary_input = \
+    PythonOperator(
+      task_id='cleanup_picard_base_dist_summary_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,  # presumably removes the temp input files found under the XCom key below -- confirm in utils module
+      params={'xcom_pull_files_key':'run_picard_base_dist_summary',
+              'xcom_pull_task':'copy_bam_for_parallel_runs'})
+  ## PIPELINE: cleanup is downstream of run_picard_base_dist_summary
+  run_picard_base_dist_summary >> cleanup_picard_base_dist_summary_input
+  ## TASK
+  upload_picard_base_dist_summary_to_box = \
+    PythonOperator(
+      task_id='upload_picard_base_dist_summary_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,
+      params={'xcom_pull_task':'run_picard_base_dist_summary',
+              'xcom_pull_files_key':'picard_base_summary',  # NOTE(review): key lacks the '_dist' part of the task name -- confirm it matches the producer's analysis_files_xcom_key
+              'analysis_tag':'Picard-CollectBaseDistributionByCycle'})
+  ## PIPELINE: upload is downstream of run_picard_base_dist_summary
+  run_picard_base_dist_summary >> upload_picard_base_dist_summary_to_box
+  ## TASK
   run_samtools_stats = \
     PythonOperator(
       task_id='run_samtools_stats',
@@ -691,6 +807,18 @@ with dag:
   ## PIPELINE
   copy_bam_for_parallel_runs >> run_samtools_stats
   ## TASK
+  upload_samtools_stats_to_box = \
+    PythonOperator(
+      task_id='upload_samtools_stats_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,  # NOTE(review): import is not visible in this diff -- confirm it is already imported
+      params={'xcom_pull_task':'run_samtools_stats',
+              'xcom_pull_files_key':'samtools_stats',
+              'analysis_tag':'Samtools-stats'})
+  ## PIPELINE: upload is downstream of run_samtools_stats
+  run_samtools_stats >> upload_samtools_stats_to_box
+  ## TASK
   run_samtools_idxstats = \
     PythonOperator(
       task_id='run_samtools_idxstats',
@@ -709,6 +837,29 @@ with dag:
   ## PIPELINE
   run_samtools_stats >> run_samtools_idxstats
   ## TASK
+  cleanup_samtools_stats_input = \
+    PythonOperator(
+      task_id='cleanup_samtools_stats_input',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=clean_up_files,  # presumably removes the temp input files found under the XCom key below -- confirm in utils module
+      params={'xcom_pull_files_key':'run_samtools_stats',  # presumably the same input is shared by run_samtools_idxstats -- confirm
+              'xcom_pull_task':'copy_bam_for_parallel_runs'})
+  ## PIPELINE: cleanup is downstream of run_samtools_idxstats, which itself follows run_samtools_stats
+  run_samtools_idxstats >> cleanup_samtools_stats_input
+  ## TASK
+  upload_samtools_idxstats_to_box = \
+    PythonOperator(
+      task_id='upload_samtools_idxstats_to_box',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=upload_analysis_file_to_box,  # NOTE(review): import is not visible in this diff -- confirm it is already imported
+      params={'xcom_pull_task':'run_samtools_idxstats',
+              'xcom_pull_files_key':'samtools_idxstats',
+              'analysis_tag':'Samtools-idxstats'})
+  ## PIPELINE: upload is downstream of run_samtools_idxstats
+  run_samtools_idxstats >> upload_samtools_idxstats_to_box
+  ## TASK
   run_multiqc = \
     PythonOperator(
       task_id='run_multiqc',