Browse Source

added function for loading cellranger tar

Avik Datta 4 years ago
parent
commit
222b5aa030
1 changed file with 68 additions and 36 deletions
  1. 68 36
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 68 - 36
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -6,11 +6,12 @@ from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from airflow.operators.dummy_operator import DummyOperator
 from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import fetch_analysis_info_and_branch_func
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import configure_cellranger_run_func
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_sc_read_trimmming_func
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling import run_cellranger_tool
-
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import fetch_analysis_info_and_branch_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import configure_cellranger_run_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_sc_read_trimmming_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_cellranger_tool
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decide_analysis_branch_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
 ## ARGS
 default_args = {
     'owner': 'airflow',
@@ -118,45 +119,61 @@ with dag:
       task_id='decide_analysis_branch',
       dag=dag,
       python_callable=decide_analysis_branch_func,
-      params={'upload_report_to_ftp_task':'upload_report_to_ftp',
-              'upload_report_to_box_task':'upload_report_to_box',
-              'upload_results_to_irods_task':'upload_results_to_irods',
+      params={'load_cellranger_result_to_db_task':'load_cellranger_result_to_db',
               'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
               'run_scirpy_for_vdj_task':'run_scirpy_for_vdj',
               'run_scirpy_for_vdj_b_task':'run_scirpy_for_vdj_b',
               'run_scirpy_vdj_t_task':'run_scirpy_vdj_t',
               'run_seurat_for_sc_5p_task':'run_seurat_for_sc_5p',
               'run_picard_alignment_summary_task':'run_picard_alignment_summary',
+              'convert_bam_to_cram_task':'convert_bam_to_cram',
               'library_csv_xcom_key':'cellranger_library_csv',
               'library_csv_xcom_pull_task':'configure_cellranger_run'})
   ## PIPELINE
   run_cellranger >> decide_analysis_branch
   ## TASK
+  load_cellranger_result_to_db = \
+    PythonOperator(
+      task_id='load_cellranger_result_to_db',
+      dag=dag,
+      python_callable=load_cellranger_result_to_db_func,
+      params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description',
+              'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'collection_type':'CELLRANGER_MULTI',
+              'collection_table':'sample',
+              'genome_column':'genome_build',
+              'analysis_name':'cellranger_multi',
+              'output_xcom_key':'loaded_output_files'})
   upload_report_to_ftp = \
-    DummyOperator(
+    PythonOperator(
       task_id='upload_report_to_ftp',
-      dag=dag)
-  ## PIPELINE
-  decide_analysis_branch >> upload_report_to_ftp
-  ## TASK
+      dag=dag,
+      python_callable=None,
+      params={})
   upload_report_to_box = \
     DummyOperator(
       task_id='upload_report_to_box',
-      dag=dag)
-  ## PIPELINE
-  decide_analysis_branch >> upload_report_to_box
-  ## TASK
+      dag=dag,
+      params=None)
   upload_results_to_irods = \
     DummyOperator(
       task_id='upload_results_to_irods',
-      dag=dag)
+      dag=dag,
+      params=None)
   ## PIPELINE
-  decide_analysis_branch >> upload_results_to_irods
+  decide_analysis_branch >> load_cellranger_result_to_db
+  load_cellranger_result_to_db >> upload_report_to_ftp
+  load_cellranger_result_to_db >> upload_report_to_box
+  load_cellranger_result_to_db >> upload_results_to_irods
   ## TASK
   run_scanpy_for_sc_5p = \
     DummyOperator(
       task_id='run_scanpy_for_sc_5p',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   upload_scanpy_report_for_sc_5p_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
@@ -183,7 +200,9 @@ with dag:
   run_scirpy_for_vdj = \
     DummyOperator(
       task_id='run_scirpy_for_vdj',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   upload_scanpy_report_for_vdj_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_to_ftp',
@@ -200,7 +219,9 @@ with dag:
   run_scirpy_for_vdj_b = \
     DummyOperator(
       task_id='run_scirpy_for_vdj_b',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   upload_scanpy_report_for_vdj_b_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
@@ -209,25 +230,17 @@ with dag:
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_box',
       dag=dag)
-  run_cellbrowser_for_vdj_b = \
-    DummyOperator(
-      task_id='run_cellbrowser_for_vdj_b',
-      dag=dag)
-  upload_cellbrowser_for_vdj_b_to_ftp = \
-    DummyOperator(
-      task_id='upload_cellbrowser_for_vdj_b_to_ftp',
-      dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj_b
   run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_ftp
   run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_box
-  run_scirpy_for_vdj_b >> run_cellbrowser_for_vdj_b
-  run_cellbrowser_for_vdj_b >> upload_cellbrowser_for_vdj_b_to_ftp
   ## TASK
   run_scirpy_for_vdj_t = \
     DummyOperator(
       task_id='run_scirpy_for_vdj_t',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   upload_scanpy_report_for_vdj_t_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',
@@ -244,7 +257,9 @@ with dag:
   run_seurat_for_sc_5p = \
     DummyOperator(
       task_id='run_seurat_for_sc_5p',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   upload_seurat_report_for_sc_5p_ftp = \
     DummyOperator(
       task_id='upload_seurat_report_for_sc_5p_ftp',
@@ -258,10 +273,26 @@ with dag:
   run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_ftp
   run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
+  convert_bam_to_cram = \
+    DummyOperator(
+      task_id='convert_bam_to_cram',
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
+  upload_cram_to_irods = \
+    DummyOperator(
+      task_id='upload_cram_to_irods',
+      dag=dag)
+  ## PIPELINE
+  decide_analysis_branch >> convert_bam_to_cram
+  convert_bam_to_cram >> upload_cram_to_irods
+  ## TASK
   run_picard_alignment_summary = \
     DummyOperator(
       task_id='run_picard_alignment_summary',
-      dag=dag)
+      dag=dag,
+      params={'cellranger_xcom_key':'cellranger_output',
+              'cellranger_xcom_pull_task':'run_cellranger'})
   ## PIPELINE
   decide_analysis_branch >> run_picard_alignment_summary
   ## TASK
@@ -340,4 +371,5 @@ with dag:
   upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
   upload_results_to_irods >> update_analysis_and_status
   upload_report_to_ftp >> update_analysis_and_status
-  upload_report_to_box >> update_analysis_and_status
+  upload_report_to_box >> update_analysis_and_status
+  upload_cram_to_irods >> update_analysis_and_status