Quellcode durchsuchen

updated dag9 utils for test run

Avik Datta vor 3 Jahren
Ursprung
Commit
963879ec74
1 geänderte Dateien mit 16 neuen und 4 gelöschten Zeilen
  1. 16 4
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 16 - 4
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -26,6 +26,8 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import chang
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import clean_up_files
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import create_and_update_qc_pages
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_metrices_to_collection
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import generate_cell_sorted_bam_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_velocyto_func
 
 ## ARGS
 default_args = {
@@ -532,13 +534,23 @@ with dag:
   decide_analysis_branch >> convert_cellranger_bam_to_cram
   ## TASK
   generate_cell_sorted_bam = \
-    DummyOperator(
+    PythonOperator(
       task_id='generate_cell_sorted_bam',
-      dag=dag)
+      dag=dag,
+      queue='hpc_8G8t',
+      python_callable=generate_cell_sorted_bam_func,
+      params={'xcom_pull_task': 'cellranger_output',
+              'xcom_pull_files_key': 'run_cellranger',
+              'threads': 8})
   run_velocyto = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_velocyto',
-      dag=dag)
+       queue='hpc_4G',
+      python_callable=run_velocyto_func,
+      params={'xcom_pull_task': 'cellranger_output',
+              'xcom_pull_files_key': 'run_cellranger',
+              'analysis_description_xcom_pull_task': 'fetch_analysis_info',
+              'analysis_description_xcom_key': 'analysis_description' })
   run_scvelo_for_sc_5p = \
     DummyOperator(
       task_id='run_scvelo_for_sc_5p',