Selaa lähdekoodia

added function for analysis file load to db

Avik Datta 4 vuotta sitten
vanhempi
commit
054fadc6a9
2 muutettua tiedostoa jossa 114 lisäystä ja 21 poistoa
  1. 5 1
      airflow_var/var.json
  2. 109 20
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 5 - 1
airflow_var/var.json

@@ -35,12 +35,16 @@
     "crisper_guide_capture"],
   "scanpy_single_sample_template":"",
   "scanpy_notebook_image":"",
+  "scirpy_single_sample_template":"",
+  "scirpy_notebook_image":"",
+  "seurat_single_sample_template":"",
+  "seurat_notebook_image":"",
   "hpc_queue_list":{
   "irods_exe_dir":"",
   "ftp_hostname":"",
   "ftp_username":"",
   "ftp_project_path":"",
-  "cell_marker_list":"",
+  "all_cell_marker_list":"",
     "hpc_1G": {
       "pbs_resource":"-lselect=1:ncpus=1:mem=1gb -lwalltime=02:00:00",
       "airflow_queue":"hpc_1G"},

+ 109 - 20
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -15,7 +15,8 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import ftp_files_upload_for_analysis
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods_files_upload_for_analysis
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
-from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scirpy_for_vdj_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
+from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
 ## ARGS
 default_args = {
     'owner': 'airflow',
@@ -198,6 +199,19 @@ with dag:
               'output_cellbrowser_key':'cellbrowser_dirs',
               'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
               'analysis_description_xcom_key':'analysis_description'})
+  load_scanpy_report_for_sc_5p_to_db = \
+    PythonOperator(
+      task_id='load_scanpy_report_for_sc_5p_to_db',
+      dag=dag,
+      python_callable=load_analysis_files_func,
+      params={'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'file_name_task':'run_scanpy_for_sc_5p',
+              'file_name_key':'scanpy_notebook',
+              'analysis_name':'scanpy_5p',
+              'collection_type':'SCANPY_HTML',
+              'collection_table':'sample',
+              'output_files_key':'output_db_files'})
   upload_scanpy_report_for_sc_5p_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
@@ -212,23 +226,42 @@ with dag:
       dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_scanpy_for_sc_5p
-  run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_ftp
-  run_scanpy_for_sc_5p >> upload_scanpy_report_for_sc_5p_to_box
+  run_scanpy_for_sc_5p >> load_scanpy_report_for_sc_5p_to_db
+  load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_ftp
+  load_scanpy_report_for_sc_5p_to_db >> upload_scanpy_report_for_sc_5p_to_box
   run_scanpy_for_sc_5p >> upload_cellbrowser_for_sc_5p_to_ftp
   ## TASK
   run_scirpy_for_vdj = \
     PythonOperator(
       task_id='run_scirpy_for_vdj',
       dag=dag,
-      python_callable=run_scirpy_for_vdj_func,
+      python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'scanpy_timeout':1200,
               'allow_errors':False,
+              'kernel_name':'python3',
               'vdj_dir':'vdj',
+              'count_dir':'count',
+              'cell_marker_list':Variable.get('all_cell_marker_list'),
               'output_notebook_key':'scirpy_notebook',
               'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
-              'analysis_description_xcom_key':'analysis_description'})
+              'analysis_description_xcom_key':'analysis_description',
+              'template_ipynb_path':Variable.get('scirpy_single_sample_template'),
+              'singularity_image_path':Variable.get('scirpy_notebook_image')})
+  load_scanpy_report_for_vdj_to_db = \
+    PythonOperator(
+      task_id='load_scanpy_report_for_vdj_to_db',
+      dag=dag,
+      python_callable=load_analysis_files_func,
+      params={'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'file_name_task':'run_scirpy_for_vdj',
+              'file_name_key':'scirpy_notebook',
+              'analysis_name':'scirpy_vdj',
+              'collection_type':'SCIRPY_VDJ_HTML',
+              'collection_table':'sample',
+              'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_to_ftp',
@@ -239,22 +272,41 @@ with dag:
       dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj
-  run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_ftp
-  run_scirpy_for_vdj >> upload_scanpy_report_for_vdj_to_box
+  run_scirpy_for_vdj >> load_scanpy_report_for_vdj_to_db
+  load_scanpy_report_for_vdj_to_db >> upload_scanpy_report_for_vdj_to_ftp
+  load_scanpy_report_for_vdj_to_db >> upload_scanpy_report_for_vdj_to_box
   ## TASK
   run_scirpy_for_vdj_b = \
     DummyOperator(
       task_id='run_scirpy_for_vdj_b',
       dag=dag,
-      python_callable=run_scirpy_for_vdj_func,
+      python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'scanpy_timeout':1200,
               'allow_errors':False,
+              'kernel_name':'python3',
               'vdj_dir':'vdj_b',
+              'count_dir':'count',
+              'cell_marker_list':Variable.get('all_cell_marker_list'),
               'output_notebook_key':'scirpy_notebook',
               'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
-              'analysis_description_xcom_key':'analysis_description'})
+              'analysis_description_xcom_key':'analysis_description',
+              'template_ipynb_path':Variable.get('scirpy_single_sample_template'),
+              'singularity_image_path':Variable.get('scirpy_notebook_image')})
+  load_scanpy_report_for_vdj_b_to_db = \
+    PythonOperator(
+      task_id='load_scanpy_report_for_vdj_b_to_db',
+      dag=dag,
+      python_callable=load_analysis_files_func,
+      params={'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'file_name_task':'run_scirpy_for_vdj_b',
+              'file_name_key':'scirpy_notebook',
+              'analysis_name':'scirpy_vdj_b',
+              'collection_type':'SCIRPY_VDJ_B_HTML',
+              'collection_table':'sample',
+              'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_b_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
@@ -265,22 +317,41 @@ with dag:
       dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj_b
-  run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_ftp
-  run_scirpy_for_vdj_b >> upload_scanpy_report_for_vdj_b_to_box
+  run_scirpy_for_vdj_b >> load_scanpy_report_for_vdj_b_to_db
+  load_scanpy_report_for_vdj_b_to_db >> upload_scanpy_report_for_vdj_b_to_ftp
+  load_scanpy_report_for_vdj_b_to_db >> upload_scanpy_report_for_vdj_b_to_box
   ## TASK
   run_scirpy_for_vdj_t = \
     DummyOperator(
       task_id='run_scirpy_for_vdj_t',
       dag=dag,
-      python_callable=run_scirpy_for_vdj_func,
+      python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
-              'timeout':1200,
+              'scanpy_timeout':1200,
               'allow_errors':False,
+              'kernel_name':'python3',
               'vdj_dir':'vdj_t',
+              'count_dir':'count',
+              'cell_marker_list':Variable.get('all_cell_marker_list'),
               'output_notebook_key':'scirpy_notebook',
               'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
-              'analysis_description_xcom_key':'analysis_description'})
+              'analysis_description_xcom_key':'analysis_description',
+              'template_ipynb_path':Variable.get('scirpy_single_sample_template'),
+              'singularity_image_path':Variable.get('scirpy_notebook_image')})
+  load_scanpy_report_for_vdj_t_to_db = \
+    PythonOperator(
+      task_id='load_scanpy_report_for_vdj_t_to_db',
+      dag=dag,
+      python_callable=load_analysis_files_func,
+      params={'collection_name_task':'load_cellranger_result_to_db',
+              'collection_name_key':'sample_igf_id',
+              'file_name_task':'run_scirpy_for_vdj_t',
+              'file_name_key':'scirpy_notebook',
+              'analysis_name':'scirpy_vdj_t',
+              'collection_type':'SCIRPY_VDJ_T_HTML',
+              'collection_table':'sample',
+              'output_files_key':'output_db_files'})
   upload_scanpy_report_for_vdj_t_to_ftp = \
     DummyOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',
@@ -291,15 +362,32 @@ with dag:
       dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_scirpy_for_vdj_t
-  run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_ftp
-  run_scirpy_for_vdj_t >> upload_scanpy_report_for_vdj_t_to_box
+  run_scirpy_for_vdj_t >> load_scanpy_report_for_vdj_t_to_db
+  load_scanpy_report_for_vdj_t_to_db >> upload_scanpy_report_for_vdj_t_to_ftp
+  load_scanpy_report_for_vdj_t_to_db >> upload_scanpy_report_for_vdj_t_to_box
   ## TASK
   run_seurat_for_sc_5p = \
-    DummyOperator(
+    PythonOperator(
       task_id='run_seurat_for_sc_5p',
       dag=dag,
+      python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
-              'cellranger_xcom_pull_task':'run_cellranger'})
+              'cellranger_xcom_pull_task':'run_cellranger',
+              'scanpy_timeout':1200,
+              'allow_errors':False,
+              'kernel_name':'R',
+              'vdj_dir':'vdj',
+              'count_dir':'count',
+              'cell_marker_list':Variable.get('all_cell_marker_list'),
+              'output_notebook_key':'seurat_notebook',
+              'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
+              'analysis_description_xcom_key':'analysis_description',
+              'template_ipynb_path':Variable.get('seurat_single_sample_template'),
+              'singularity_image_path':Variable.get('seurat_notebook_image')})
+  load_seurat_report_for_sc_5p_db = \
+    DummyOperator(
+      task_id='load_seurat_report_for_sc_5p_db',
+      dag=dag)
   upload_seurat_report_for_sc_5p_ftp = \
     DummyOperator(
       task_id='upload_seurat_report_for_sc_5p_ftp',
@@ -310,8 +398,9 @@ with dag:
       dag=dag)
   ## PIPELINE
   decide_analysis_branch >> run_seurat_for_sc_5p
-  run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_ftp
-  run_seurat_for_sc_5p >> upload_seurat_report_for_sc_5p_to_box
+  run_seurat_for_sc_5p >> load_seurat_report_for_sc_5p_db
+  load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_ftp
+  load_seurat_report_for_sc_5p_db >> upload_seurat_report_for_sc_5p_to_box
   ## TASK
   convert_bam_to_cram = \
     DummyOperator(