Browse Source

branching to selected tasks

Avik Datta 4 years ago
parent
commit
f8e019ff53
1 changed files with 9 additions and 3 deletions
  1. 9 3
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 9 - 3
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -16,7 +16,7 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
-
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import task_branch_function
 ## ARGS
 default_args = {
     'owner': 'airflow',
@@ -67,9 +67,15 @@ with dag:
   for analysis_name in FEATURE_TYPE_LIST:
     ## TASK
     task_branch = \
-      DummyOperator(
+      BranchPythonOperator(
         task_id=analysis_name,
-        dag=dag)
+        dag=dag,
+        queue='hpc_4G',
+        params={'xcom_pull_task_id':'fetch_analysis_info',
+                'analysis_info_xcom_key':'analysis_info',
+                'analysis_name':analysis_name,
+                'task_prefix':'run_trim'},
+        python_callable=task_branch_function)
     run_trim_list = list()
     for run_id in range(0,10):
       ## TASK