Ver código fonte

added functions for scanpy and scirpy notebook runner

Avik Datta 4 anos atrás
pai
commit
d4bf855f9c
2 arquivos alterados com 44 adições e 13 exclusões
  1. 7 0
      airflow_var/var.json
  2. 37 13
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 7 - 0
airflow_var/var.json

@@ -33,7 +33,14 @@
     "antibody_capture",
     "antigen_capture",
     "crisper_guide_capture"],
+  "scanpy_single_sample_template":"",
+  "scanpy_notebook_image":"",
   "hpc_queue_list":{
+  "irods_exe_dir":"",
+  "ftp_hostname":"",
+  "ftp_username":"",
+  "ftp_project_path":"",
+  "cell_marker_list":"",
     "hpc_1G": {
       "pbs_resource":"-lselect=1:ncpus=1:mem=1gb -lwalltime=02:00:00",
       "airflow_queue":"hpc_1G"},

+ 37 - 13
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -14,7 +14,8 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import decid
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_cellranger_result_to_db_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
-
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scirpy_for_vdj_func
 ## ARGS
 default_args = {
     'owner': 'airflow',
@@ -185,11 +186,18 @@ with dag:
   load_cellranger_result_to_db >> upload_results_to_irods
   ## TASK
   run_scanpy_for_sc_5p = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_scanpy_for_sc_5p',
       dag=dag,
+      python_callable=run_scanpy_for_sc_5p_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              'cellranger_xcom_pull_task':'run_cellranger'})
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'scanpy_timeout':1200,
+              'allow_errors':False,
+              'output_notebook_key':'scanpy_notebook',
+              'output_cellbrowser_key':'cellbrowser_dirs',
+              'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description'})
   upload_scanpy_report_for_sc_5p_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
@@ -198,10 +206,6 @@ with dag:
     DummyOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_box',
       dag=dag)
-  run_cellbrowser_for_sc_5p = \
-    DummyOperator(
-      task_id='run_cellbrowser_for_sc_5p',
-      dag=dag)
   upload_cellbrowser_for_sc_5p_to_ftp = \
     DummyOperator(
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
@@ -210,15 +214,21 @@ with dag:
   decide_analysis_branch >> run_scanpy_for_sc_5p
   run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_ftp
   run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_box
-  run_scanpy_for_sc_5p >> run_cellbrowser_for_sc_5p
-  run_cellbrowser_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
+  run_scanpy_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
   ## TASK
   run_scirpy_for_vdj = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_scirpy_for_vdj',
       dag=dag,
+      python_callable=run_scirpy_for_vdj_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              'cellranger_xcom_pull_task':'run_cellranger'})
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'scanpy_timeout':1200,
+              'allow_errors':False,
+              'vdj_dir':'vdj',
+              'output_notebook_key':'scirpy_notebook',
+              'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description'})
   upload_scanpy_report_for_vdj_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_to_ftp',
@@ -236,8 +246,15 @@ with dag:
     DummyOperator(
       task_id='run_scirpy_for_vdj_b',
       dag=dag,
+      python_callable=run_scirpy_for_vdj_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              'cellranger_xcom_pull_task':'run_cellranger'})
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'scanpy_timeout':1200,
+              'allow_errors':False,
+              'vdj_dir':'vdj_b',
+              'output_notebook_key':'scirpy_notebook',
+              'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description'})
   upload_scanpy_report_for_vdj_b_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
@@ -255,8 +272,15 @@ with dag:
     DummyOperator(
       task_id='run_scirpy_for_vdj_t',
       dag=dag,
+      python_callable=run_scirpy_for_vdj_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              'cellranger_xcom_pull_task':'run_cellranger'})
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'timeout':1200,
+              'allow_errors':False,
+              'vdj_dir':'vdj_t',
+              'output_notebook_key':'scirpy_notebook',
+              'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description'})
   upload_scanpy_report_for_vdj_t_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',