Jelajahi Sumber

updated flow

Avik Datta 4 tahun lalu
induk
melakukan
b67ded8a88
1 mengubah file dengan 140 tambahan dan 12 penghapusan
  1. 140 12
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 140 - 12
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -119,7 +119,7 @@ with dag:
       python_callable=lambda: ['upload_report_to_ftp',
                                'upload_report_to_box',
                                'upload_results_to_irods',
-                               'run_scanpy_report',
+                               'notebook_report_branch',
                                'run_picard_alignment_summary'])
   ## PIPELINE
   run_cellranger >> decide_analysis_branch
@@ -145,26 +145,142 @@ with dag:
   ## PIPELINE
   decide_analysis_branch >> upload_results_to_irods
   ## TASK
-  run_scanpy_report = \
+  notebook_report_branch = \
+    BranchPythonOperator(
+      task_id='notebook_report_branch',
+      dag=dag,
+      python_callable=lambda: ['run_scanpy_for_sc_5p',
+                               'run_scirpy_for_vdj',
+                               'run_scirpy_for_vdj_b',
+                               'run_scirpy_vdj_t',
+                               'run_seurat_for_sc_5p'])
+  ## PIPELINE
+  decide_analysis_branch >> notebook_report_branch
+  ## TASK
+  run_scanpy_for_sc_5p = \
+    DummyOperator(
+      task_id='run_scanpy_for_sc_5p',
+      dag=dag)
+  upload_scanpy_report_for_sc_5p_to_ftp = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_sc_5p_to_ftp',
+      dag=dag)
+  upload_scanpy_report_for_sc_5p_to_box = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_sc_5p_to_box',
+      dag=dag)
+  run_cellbrowser_for_sc_5p = \
+    DummyOperator(
+      task_id='run_cellbrowser_for_sc_5p',
+      dag=dag)
+  upload_cellbrowser_for_sc_5p_to_ftp = \
+    DummyOperator(
+      task_id='upload_cellbrowser_for_sc_5p_to_ftp',
+      dag=dag)
+  ## PIPELINE
+  notebook_report_branch >> run_scanpy_for_sc_5p
+  run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_ftp
+  run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_box
+  run_scanpy_for_sc_5p >> run_cellbrowser_for_sc_5p
+  run_cellbrowser_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
+  ## TASK
+  run_scirpy_for_vdj = \
+    DummyOperator(
+      task_id='run_scirpy_for_vdj',
+      dag=dag)
+  upload_scanpy_report_for_vdj_to_ftp = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_to_ftp',
+      dag=dag)
+  upload_scanpy_report_for_vdj_to_box = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_to_box',
+      dag=dag)
+  run_cellbrowser_for_vdj = \
+    DummyOperator(
+      task_id='run_cellbrowser_for_vdj',
+      dag=dag)
+  upload_cellbrowser_for_vdj_to_ftp = \
     DummyOperator(
-      task_id='run_scanpy_report',
+      task_id='upload_cellbrowser_for_vdj_to_ftp',
       dag=dag)
   ## PIPELINE
-  decide_analysis_branch >> run_scanpy_report
+  notebook_report_branch >> run_scirpy_for_vdj
+  run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_ftp
+  run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_box
+  run_scirpy_for_vdj >> run_cellbrowser_for_vdj
+  run_cellbrowser_for_vdj >> upload_cellbrowser_for_vdj_to_ftp
   ## TASK
-  upload_scanpy_report_to_ftp = \
+  run_scirpy_for_vdj_b = \
     DummyOperator(
-      task_id='upload_scanpy_report_to_ftp',
+      task_id='run_scirpy_for_vdj_b',
+      dag=dag)
+  upload_scanpy_report_for_vdj_b_to_ftp = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_b_to_ftp',
+      dag=dag)
+  upload_scanpy_report_for_vdj_b_to_box = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_b_to_box',
+      dag=dag)
+  run_cellbrowser_for_vdj_b = \
+    DummyOperator(
+      task_id='run_cellbrowser_for_vdj_b',
+      dag=dag)
+  upload_cellbrowser_for_vdj_b_to_ftp = \
+    DummyOperator(
+      task_id='upload_cellbrowser_for_vdj_b_to_ftp',
+      dag=dag)
+  ## PIPELINE
+  notebook_report_branch >> run_scirpy_for_vdj_b
+  run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_ftp
+  run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_box
+  run_scirpy_for_vdj_b >> run_cellbrowser_for_vdj_b
+  run_cellbrowser_for_vdj_b >> upload_cellbrowser_for_vdj_b_to_ftp
+  ## TASK
+  run_scirpy_for_vdj_t = \
+    DummyOperator(
+      task_id='run_scirpy_for_vdj_t',
+      dag=dag)
+  upload_scanpy_report_for_vdj_t_to_ftp = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_t_to_ftp',
+      dag=dag)
+  upload_scanpy_report_for_vdj_t_to_box = \
+    DummyOperator(
+      task_id='upload_scanpy_report_for_vdj_t_to_box',
+      dag=dag)
+  run_cellbrowser_for_vdj_t = \
+    DummyOperator(
+      task_id='run_cellbrowser_for_vdj_t',
+      dag=dag)
+  upload_cellbrowser_for_vdj_t_to_ftp = \
+    DummyOperator(
+      task_id='upload_cellbrowser_for_vdj_t_to_ftp',
       dag=dag)
   ## PIPELINE
-  run_scanpy_report >> upload_scanpy_report_to_ftp
+  notebook_report_branch >> run_scirpy_for_vdj_t
+  run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_ftp
+  run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_box
+  run_scirpy_for_vdj_t >> run_cellbrowser_for_vdj_t
+  run_cellbrowser_for_vdj_t >> upload_cellbrowser_for_vdj_t_to_ftp
   ## TASK
-  upload_scanpy_report_to_box = \
+  run_seurat_for_sc_5p = \
+    DummyOperator(
+      task_id='run_seurat_for_sc_5p',
+      dag=dag)
+  upload_seurat_report_for_sc_5p_ftp = \
+    DummyOperator(
+      task_id='upload_seurat_report_for_sc_5p_ftp',
+      dag=dag)
+  upload_seurat_report_for_sc_5p_to_box = \
     DummyOperator(
-      task_id='upload_scanpy_report_to_box',
+      task_id='upload_seurat_report_for_sc_5p_to_box',
       dag=dag)
   ## PIPELINE
-  run_scanpy_report >> upload_scanpy_report_to_box
+  notebook_report_branch >> run_seurat_for_sc_5p
+  run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_ftp
+  run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
   run_picard_alignment_summary = \
     DummyOperator(
@@ -235,8 +351,20 @@ with dag:
       dag=dag)
   ## PIPELINE
   upload_multiqc_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_to_ftp >> update_analysis_and_status
-  upload_scanpy_report_to_box >> update_analysis_and_status
+  upload_scanpy_report_for_sc_5p_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_sc_5p_to_box >> update_analysis_and_status
+  upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_to_box >> update_analysis_and_status
+  upload_cellbrowser_for_vdj_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_b_to_box >> update_analysis_and_status
+  upload_cellbrowser_for_vdj_b_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
+  upload_scanpy_report_for_vdj_t_to_box >> update_analysis_and_status
+  upload_cellbrowser_for_vdj_t_to_ftp >> update_analysis_and_status
+  upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
+  upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
   upload_results_to_irods >> update_analysis_and_status
   upload_report_to_ftp >> update_analysis_and_status
   upload_report_to_box >> update_analysis_and_status