Ver código fonte

added branch function

Avik Datta 4 anos atrás
pai
commit
abd240224c
1 arquivos alterados com 19 adições e 46 exclusões
  1. 19 46
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 19 - 46
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -107,7 +107,8 @@ with dag:
               'analysis_description_xcom_key':'analysis_description',
               'library_csv_xcom_key':'cellranger_library_csv',
               'library_csv_xcom_pull_task':'configure_cellranger_run',
-              'cellranger_xcom_key':'cellranger_output'},
+              'cellranger_xcom_key':'cellranger_output',
+              'cellranger_options':['--localcores 8','--localmem 64']},
       python_callable=run_cellranger_tool)
   ## PIPELINE
   configure_cellranger_run >> run_cellranger
@@ -116,11 +117,18 @@ with dag:
     BranchPythonOperator(
       task_id='decide_analysis_branch',
       dag=dag,
-      python_callable=lambda: ['upload_report_to_ftp',
-                               'upload_report_to_box',
-                               'upload_results_to_irods',
-                               'notebook_report_branch',
-                               'run_picard_alignment_summary'])
+      python_callable=decide_analysis_branch_func,
+      params={'upload_report_to_ftp_task':'upload_report_to_ftp',
+              'upload_report_to_box_task':'upload_report_to_box',
+              'upload_results_to_irods_task':'upload_results_to_irods',
+              'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
+              'run_scirpy_for_vdj_task':'run_scirpy_for_vdj',
+              'run_scirpy_for_vdj_b_task':'run_scirpy_for_vdj_b',
+              'run_scirpy_vdj_t_task':'run_scirpy_vdj_t',
+              'run_seurat_for_sc_5p_task':'run_seurat_for_sc_5p',
+              'run_picard_alignment_summary_task':'run_picard_alignment_summary',
+              'library_csv_xcom_key':'cellranger_library_csv',
+              'library_csv_xcom_pull_task':'configure_cellranger_run'})
   ## PIPELINE
   run_cellranger >> decide_analysis_branch
   ## TASK
@@ -145,18 +153,6 @@ with dag:
   ## PIPELINE
   decide_analysis_branch >> upload_results_to_irods
   ## TASK
-  notebook_report_branch = \
-    BranchPythonOperator(
-      task_id='notebook_report_branch',
-      dag=dag,
-      python_callable=lambda: ['run_scanpy_for_sc_5p',
-                               'run_scirpy_for_vdj',
-                               'run_scirpy_for_vdj_b',
-                               'run_scirpy_vdj_t',
-                               'run_seurat_for_sc_5p'])
-  ## PIPELINE
-  decide_analysis_branch >> notebook_report_branch
-  ## TASK
   run_scanpy_for_sc_5p = \
     DummyOperator(
       task_id='run_scanpy_for_sc_5p',
@@ -178,7 +174,7 @@ with dag:
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
       dag=dag)
   ## PIPELINE
-  notebook_report_branch >> run_scanpy_for_sc_5p
+  decide_analysis_branch >> run_scanpy_for_sc_5p
   run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_ftp
   run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_box
   run_scanpy_for_sc_5p >> run_cellbrowser_for_sc_5p
@@ -196,20 +192,10 @@ with dag:
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_to_box',
       dag=dag)
-  run_cellbrowser_for_vdj = \
-    DummyOperator(
-      task_id='run_cellbrowser_for_vdj',
-      dag=dag)
-  upload_cellbrowser_for_vdj_to_ftp = \
-    DummyOperator(
-      task_id='upload_cellbrowser_for_vdj_to_ftp',
-      dag=dag)
   ## PIPELINE
-  notebook_report_branch >> run_scirpy_for_vdj
+  decide_analysis_branch >> run_scirpy_for_vdj
   run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_ftp
   run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_box
-  run_scirpy_for_vdj >> run_cellbrowser_for_vdj
-  run_cellbrowser_for_vdj >> upload_cellbrowser_for_vdj_to_ftp
   ## TASK
   run_scirpy_for_vdj_b = \
     DummyOperator(
@@ -232,7 +218,7 @@ with dag:
       task_id='upload_cellbrowser_for_vdj_b_to_ftp',
       dag=dag)
   ## PIPELINE
-  notebook_report_branch >> run_scirpy_for_vdj_b
+  decide_analysis_branch >> run_scirpy_for_vdj_b
   run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_ftp
   run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_box
   run_scirpy_for_vdj_b >> run_cellbrowser_for_vdj_b
@@ -250,20 +236,10 @@ with dag:
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_box',
       dag=dag)
-  run_cellbrowser_for_vdj_t = \
-    DummyOperator(
-      task_id='run_cellbrowser_for_vdj_t',
-      dag=dag)
-  upload_cellbrowser_for_vdj_t_to_ftp = \
-    DummyOperator(
-      task_id='upload_cellbrowser_for_vdj_t_to_ftp',
-      dag=dag)
   ## PIPELINE
-  notebook_report_branch >> run_scirpy_for_vdj_t
+  decide_analysis_branch >> run_scirpy_for_vdj_t
   run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_ftp
   run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_box
-  run_scirpy_for_vdj_t >> run_cellbrowser_for_vdj_t
-  run_cellbrowser_for_vdj_t >> upload_cellbrowser_for_vdj_t_to_ftp
   ## TASK
   run_seurat_for_sc_5p = \
     DummyOperator(
@@ -278,7 +254,7 @@ with dag:
       task_id='upload_seurat_report_for_sc_5p_to_box',
       dag=dag)
   ## PIPELINE
-  notebook_report_branch >> run_seurat_for_sc_5p
+  decide_analysis_branch >> run_seurat_for_sc_5p
   run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_ftp
   run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
@@ -356,13 +332,10 @@ with dag:
   upload_cellbrowser_for_sc_5p_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_to_box >> update_analysis_and_status
-  upload_cellbrowser_for_vdj_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_b_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_b_to_box >> update_analysis_and_status
-  upload_cellbrowser_for_vdj_b_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_t_to_ftp >> update_analysis_and_status
   upload_scanpy_report_for_vdj_t_to_box >> update_analysis_and_status
-  upload_cellbrowser_for_vdj_t_to_ftp >> update_analysis_and_status
   upload_seurat_report_for_sc_5p_ftp >> update_analysis_and_status
   upload_seurat_report_for_sc_5p_to_box >> update_analysis_and_status
   upload_results_to_irods >> update_analysis_and_status