Browse Source

updated dag9

Avik Datta 4 years ago
parent
commit
f639099209
2 changed files with 35 additions and 5 deletions
  1. 9 3
      airflow_var/var.json
  2. 26 2
      dags/dag9_tenx_single_cell_immune_profiling.py

+ 9 - 3
airflow_var/var.json

@@ -41,8 +41,8 @@
   "seurat_notebook_image":"",
   "seurat_notebook_image":"",
   "hpc_queue_list":{
   "hpc_queue_list":{
   "irods_exe_dir":"",
   "irods_exe_dir":"",
-  "ftp_hostname":"",
-  "ftp_username":"",
+  "ftp_hostname":"eliot.med.ic.ac.uk",
+  "ftp_username":"igf",
   "ftp_project_path":"",
   "ftp_project_path":"",
   "all_cell_marker_list":"",
   "all_cell_marker_list":"",
     "hpc_1G": {
     "hpc_1G": {
@@ -92,6 +92,12 @@
       "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16t"},
       "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16t"},
     "hpc_42G16t":{
     "hpc_42G16t":{
       "pbs_resource":"-lselect=1:ncpus=16:mem=42gb -lwalltime=02:00:00",
       "pbs_resource":"-lselect=1:ncpus=16:mem=42gb -lwalltime=02:00:00",
-      "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16,hpc_42G16t"}
+      "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16,hpc_42G16t"},
+    "hpc_64G16t24hr":{
+      "pbs_resource":"-lselect=1:ncpus=16:mem=64gb -lwalltime=24:00:00",
+      "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16,hpc_42G16t,hpc_64G16t24hr"},
+    "hpc_64G16t72hr":{
+      "pbs_resource":"-lselect=1:ncpus=16:mem=64gb -lwalltime=72:00:00",
+      "airflow_queue":"hpc_1G,hpc_1G4t,hpc_4G,hpc_4G4t,hpc_4G8t,hpc_4G16t,hpc_8G,hpc_8G4t,hpc_8G8t,hpc_8G16,hpc_16G4t,hpc_16G8t,hpc_16G16t,hpc_32G16,hpc_42G16t,hpc_64G16t24hr,hpc_64G16t72hr"}
    }
    }
 }
 }

+ 26 - 2
dags/dag9_tenx_single_cell_immune_profiling.py

@@ -17,6 +17,7 @@ from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import irods
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_scanpy_for_sc_5p_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import run_singlecell_notebook_wrapper_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
 from igf_airflow.utils.dag9_tenx_single_cell_immune_profiling_utils import load_analysis_files_func
+
 ## ARGS
 ## ARGS
 default_args = {
 default_args = {
     'owner': 'airflow',
     'owner': 'airflow',
@@ -48,7 +49,7 @@ with dag:
     BranchPythonOperator(
     BranchPythonOperator(
       task_id='fetch_analysis_info',
       task_id='fetch_analysis_info',
       dag=dag,
       dag=dag,
-      queue='hpc_4g',
+      queue='hpc_4G',
       params={'no_analysis_task':'no_analysis',
       params={'no_analysis_task':'no_analysis',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_info_xcom_key':'analysis_info'},
               'analysis_info_xcom_key':'analysis_info'},
@@ -58,7 +59,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='configure_cellranger_run',
       task_id='configure_cellranger_run',
       dag=dag,
       dag=dag,
-      queue='hpc_4g',
+      queue='hpc_4G',
       params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
       params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_info_xcom_key':'analysis_info',
               'analysis_info_xcom_key':'analysis_info',
@@ -77,6 +78,7 @@ with dag:
         PythonOperator(
         PythonOperator(
           task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
           task_id='run_trim_{0}_{1}'.format(analysis_name,run_id),
           dag=dag,
           dag=dag,
+          queue='hpc_4G',
           params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
           params={'xcom_pull_task_id':'fetch_analysis_info_and_branch',
                   'analysis_info_xcom_key':'analysis_info',
                   'analysis_info_xcom_key':'analysis_info',
                   'analysis_name':analysis_name,
                   'analysis_name':analysis_name,
@@ -109,6 +111,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='run_cellranger',
       task_id='run_cellranger',
       dag=dag,
       dag=dag,
+      queue='hpc_64G16t24hr',
       params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
       params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_description_xcom_key':'analysis_description',
               'library_csv_xcom_key':'cellranger_library_csv',
               'library_csv_xcom_key':'cellranger_library_csv',
@@ -123,6 +126,7 @@ with dag:
     BranchPythonOperator(
     BranchPythonOperator(
       task_id='decide_analysis_branch',
       task_id='decide_analysis_branch',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=decide_analysis_branch_func,
       python_callable=decide_analysis_branch_func,
       params={'load_cellranger_result_to_db_task':'load_cellranger_result_to_db',
       params={'load_cellranger_result_to_db_task':'load_cellranger_result_to_db',
               'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
               'run_scanpy_for_sc_5p_task':'run_scanpy_for_sc_5p',
@@ -141,6 +145,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_cellranger_result_to_db',
       task_id='load_cellranger_result_to_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_cellranger_result_to_db_func,
       python_callable=load_cellranger_result_to_db_func,
       params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
       params={'analysis_description_xcom_pull_task':'fetch_analysis_info_and_branch',
               'analysis_description_xcom_key':'analysis_description',
               'analysis_description_xcom_key':'analysis_description',
@@ -158,6 +163,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='upload_report_to_ftp',
       task_id='upload_report_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_cellranger_result_to_db',
       params={'xcom_pull_task':'load_cellranger_result_to_db',
               'xcom_pull_files_key':'html_report_file',
               'xcom_pull_files_key':'html_report_file',
@@ -170,12 +176,14 @@ with dag:
     DummyOperator(
     DummyOperator(
       task_id='upload_report_to_box',
       task_id='upload_report_to_box',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       params={'xcom_pull_task':'load_cellranger_result_to_db',
       params={'xcom_pull_task':'load_cellranger_result_to_db',
               'xcom_pull_files_key':'html_report_file'})
               'xcom_pull_files_key':'html_report_file'})
   upload_results_to_irods = \
   upload_results_to_irods = \
     PythonOperator(
     PythonOperator(
       task_id='upload_results_to_irods',
       task_id='upload_results_to_irods',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=irods_files_upload_for_analysis,
       python_callable=irods_files_upload_for_analysis,
       params={'xcom_pull_task':'load_cellranger_result_to_db',
       params={'xcom_pull_task':'load_cellranger_result_to_db',
               'xcom_pull_files_key':'loaded_output_files',
               'xcom_pull_files_key':'loaded_output_files',
@@ -191,6 +199,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='run_scanpy_for_sc_5p',
       task_id='run_scanpy_for_sc_5p',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=run_scanpy_for_sc_5p_func,
       python_callable=run_scanpy_for_sc_5p_func,
       params={'cellranger_xcom_key':'cellranger_output',
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'cellranger_xcom_pull_task':'run_cellranger',
@@ -204,6 +213,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_scanpy_report_for_sc_5p_to_db',
       task_id='load_scanpy_report_for_sc_5p_to_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_analysis_files_func,
       python_callable=load_analysis_files_func,
       params={'collection_name_task':'load_cellranger_result_to_db',
       params={'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_name_key':'sample_igf_id',
@@ -217,6 +227,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
       task_id='upload_scanpy_report_for_sc_5p_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_scanpy_report_for_sc_5p_to_db',
       params={'xcom_pull_task':'load_scanpy_report_for_sc_5p_to_db',
               'xcom_pull_files_key':'output_db_files',
               'xcom_pull_files_key':'output_db_files',
@@ -233,6 +244,7 @@ with dag:
     DummyOperator(
     DummyOperator(
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
       task_id='upload_cellbrowser_for_sc_5p_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'run_scanpy_for_sc_5p',
       params={'xcom_pull_task':'run_scanpy_for_sc_5p',
               'xcom_pull_files_key':'cellbrowser_dirs',
               'xcom_pull_files_key':'cellbrowser_dirs',
@@ -252,6 +264,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='run_scirpy_for_vdj',
       task_id='run_scirpy_for_vdj',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=run_singlecell_notebook_wrapper_func,
       python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'cellranger_xcom_pull_task':'run_cellranger',
@@ -270,6 +283,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_scanpy_report_for_vdj_to_db',
       task_id='load_scanpy_report_for_vdj_to_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_analysis_files_func,
       python_callable=load_analysis_files_func,
       params={'collection_name_task':'load_cellranger_result_to_db',
       params={'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_name_key':'sample_igf_id',
@@ -283,6 +297,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='upload_scanpy_report_for_vdj_to_ftp',
       task_id='upload_scanpy_report_for_vdj_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_to_db',
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_to_db',
               'xcom_pull_files_key':'output_db_files',
               'xcom_pull_files_key':'output_db_files',
@@ -305,6 +320,7 @@ with dag:
     DummyOperator(
     DummyOperator(
       task_id='run_scirpy_for_vdj_b',
       task_id='run_scirpy_for_vdj_b',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=run_singlecell_notebook_wrapper_func,
       python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'cellranger_xcom_pull_task':'run_cellranger',
@@ -323,6 +339,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_scanpy_report_for_vdj_b_to_db',
       task_id='load_scanpy_report_for_vdj_b_to_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_analysis_files_func,
       python_callable=load_analysis_files_func,
       params={'collection_name_task':'load_cellranger_result_to_db',
       params={'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_name_key':'sample_igf_id',
@@ -336,6 +353,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
       task_id='upload_scanpy_report_for_vdj_b_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_b_to_db',
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_b_to_db',
               'xcom_pull_files_key':'output_db_files',
               'xcom_pull_files_key':'output_db_files',
@@ -358,6 +376,7 @@ with dag:
     DummyOperator(
     DummyOperator(
       task_id='run_scirpy_for_vdj_t',
       task_id='run_scirpy_for_vdj_t',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=run_singlecell_notebook_wrapper_func,
       python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'cellranger_xcom_pull_task':'run_cellranger',
@@ -376,6 +395,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_scanpy_report_for_vdj_t_to_db',
       task_id='load_scanpy_report_for_vdj_t_to_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_analysis_files_func,
       python_callable=load_analysis_files_func,
       params={'collection_name_task':'load_cellranger_result_to_db',
       params={'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_name_key':'sample_igf_id',
@@ -389,6 +409,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',
       task_id='upload_scanpy_report_for_vdj_t_to_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_t_to_db',
       params={'xcom_pull_task':'load_scanpy_report_for_vdj_t_to_db',
               'xcom_pull_files_key':'output_db_files',
               'xcom_pull_files_key':'output_db_files',
@@ -411,6 +432,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='run_seurat_for_sc_5p',
       task_id='run_seurat_for_sc_5p',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=run_singlecell_notebook_wrapper_func,
       python_callable=run_singlecell_notebook_wrapper_func,
       params={'cellranger_xcom_key':'cellranger_output',
       params={'cellranger_xcom_key':'cellranger_output',
               'cellranger_xcom_pull_task':'run_cellranger',
               'cellranger_xcom_pull_task':'run_cellranger',
@@ -429,6 +451,7 @@ with dag:
     PythonOperator(
     PythonOperator(
       task_id='load_seurat_report_for_sc_5p_db',
       task_id='load_seurat_report_for_sc_5p_db',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=load_analysis_files_func,
       python_callable=load_analysis_files_func,
       params={'collection_name_task':'load_cellranger_result_to_db',
       params={'collection_name_task':'load_cellranger_result_to_db',
               'collection_name_key':'sample_igf_id',
               'collection_name_key':'sample_igf_id',
@@ -442,6 +465,7 @@ with dag:
     DummyOperator(
     DummyOperator(
       task_id='upload_seurat_report_for_sc_5p_ftp',
       task_id='upload_seurat_report_for_sc_5p_ftp',
       dag=dag,
       dag=dag,
+      queue='hpc_4G',
       python_callable=ftp_files_upload_for_analysis,
       python_callable=ftp_files_upload_for_analysis,
       params={'xcom_pull_task':'load_seurat_report_for_sc_5p_db',
       params={'xcom_pull_task':'load_seurat_report_for_sc_5p_db',
               'xcom_pull_files_key':'output_db_files',
               'xcom_pull_files_key':'output_db_files',