Browse Source

added cellranger run

Avik Datta 4 years ago
parent
commit
b85dc4883a
1 changed files with 47 additions and 5 deletions
  1. 47 5
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 47 - 5
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -7,6 +7,9 @@ from airflow.operators.python_operator import BranchPythonOperator
 from airflow.operators.dummy_operator import DummyOperator
 from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_sc_read_trimmming_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_cellranger_tool
 
 ## ARGS
 default_args = {
@@ -65,9 +68,18 @@ with dag:
     for run_id in range(0,32):
       ## TASK
       t = \
-        DummyOperator(
+        PythonOperator(
           task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
-          dag=dag)
+          dag=dag,
+          params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
+                  'analysis_info_xcom_key':'analysis_info',
+                  'analysis_name':analysis_name,
+                  'run_id':run_id,
+                  'r1_length':26,
+                  'r2_length':0,
+                  'fastq_input_dir_tag':'fastq_dir',
+                  'fastq_output_dir_tag':'output_path'},
+          python_callable=run_sc_read_trimmming_func)
       run_trim_list.append(t)
     ## TASK
     collect_trimmed_files = \
@@ -88,9 +100,15 @@ with dag:
   fetch_analysis_info_and_branch >> no_analysis
   ## TASK
   run_cellranger = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_cellranger',
-      dag=dag)
+      dag=dag,
+      params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description',
+              'library_csv_xcom_key':'cellranger_library_csv',
+              'library_csv_xcom_pull_task':'configure_cellranger_run',
+              'cellranger_xcom_key':'cellranger_output'},
+      python_callable=run_cellranger_tool)
   ## PIPELINE
   configure_cellranger_run >> run_cellranger
   ## TASK
@@ -99,6 +117,7 @@ with dag:
       task_id='decide_analysis_branch',
       dag=dag,
       python_callable=lambda: ['upload_report_to_ftp',
+                               'upload_report_to_box',
                                'upload_results_to_irods',
                                'run_scanpy_report',
                                'run_picard_alignment_summary'])
@@ -112,6 +131,13 @@ with dag:
   ## PIPELINE
   decide_analysis_branch >> upload_report_to_ftp
   ## TASK
+  upload_report_to_box = \
+    DummyOperator(
+      task_id='upload_report_to_box',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> upload_report_to_box
+  ## TASK
   upload_results_to_irods = \
     DummyOperator(
       task_id='upload_results_to_irods',
@@ -133,6 +159,13 @@ with dag:
   ## PIPELINE
   run_scanpy_report >> upload_scanpy_report_to_ftp
   ## TASK
+  upload_scanpy_report_to_box = \
+    DummyOperator(
+      task_id='upload_scanpy_report_to_box',
+      dag=dag)
+  ## PIPELINE
+  run_scanpy_report >> upload_scanpy_report_to_box
+  ## TASK
   run_picard_alignment_summary = \
     DummyOperator(
       task_id='run_picard_alignment_summary',
@@ -189,6 +222,13 @@ with dag:
   ## PIPELINE
   run_multiqc >> upload_multiqc_to_ftp
   ## TASK
+  upload_multiqc_to_box = \
+    DummyOperator(
+      task_id='upload_multiqc_to_box',
+      dag=dag)
+  ## PIPELINE
+  run_multiqc >> upload_multiqc_to_box
+  ## TASK
   update_analysis_and_status = \
     DummyOperator(
       task_id='update_analysis_and_status',
@@ -196,5 +236,7 @@ with dag:
   ## PIPELINE
   upload_multiqc_to_ftp >> update_analysis_and_status
   upload_scanpy_report_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_to_box >> update_analysis_and_status
   upload_results_to_irods >> update_analysis_and_status
-  upload_report_to_ftp >> update_analysis_and_status
+  upload_report_to_ftp >> update_analysis_and_status
+  upload_report_to_box >> update_analysis_and_status