Przeglądaj źródła

moving file download scripts to wells worker

Avik Datta 3 lata temu
rodzic
commit
b9d3f407b9
2 zmienione pliki, 9 dodań i 11 usunięć
  1. + 2 - 1
      airflow_var/var.json
  2. + 7 - 10
      dags/dag14_crick_seqrun_transfer.py

+ 2 - 1
airflow_var/var.json

@@ -8,7 +8,8 @@
   "hpc_max_workers_per_queue": 10,
   "hpc_max_total_workers": 40,
   "crick_ftp_seqrun_hostname": "hermes.crick.ac.uk",
-  "crick_ftp_config_file": "/home/igf/igf_code/airflow/secret/crick_ftp_config.json",
+  "crick_ftp_config_file_orwell": "/home/igf/igf_code/airflow/secret/crick_ftp_config.json",
+  "crick_ftp_config_file_wells": "/rds/general/user/igf/home/data2/airflow_test/secrets/crick_ftp_config.json",
   "confluence_config": "/rds/general/user/igf/home/secret_keys/confluence_private_api_key.json",
   "wiki_publication_page_id": 293602542,
   "wiki_publication_page_title": "Publications",

+ 7 - 10
dags/dag14_crick_seqrun_transfer.py

@@ -8,7 +8,7 @@ from airflow.operators.python_operator import PythonOperator
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 
 FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
-FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
+FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
 SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
 HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
 
@@ -36,7 +36,7 @@ dag = \
     dag_id='dag14_crick_seqrun_transfer',
     schedule_interval=None,
     default_args=args,
-    tags=['ftp', 'hpc', 'orwell'])
+    tags=['ftp', 'hpc', 'orwell', 'wells'])
 
 
 with dag:
@@ -67,13 +67,12 @@ with dag:
       pool='crick_ftp_pool',
       ssh_hook=orwell_ssh_hook,
       do_xcom_push=False,
-      queue='hpc_4G',
+      queue='wells',
       params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-              'seqrun_base_path': SEQRUN_BASE_PATH,
+              'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
               'ftp_config_file': FTP_CONFIG_FILE},
       command="""
-        source /home/igf/igf_code/airflow/env.sh;
-        python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
+        python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
           -f {{ params.ftp_seqrun_server }} \
           -s {{ dag_run.conf["seqrun_id"] }} \
           -d {{ params.seqrun_base_path }} \
@@ -84,11 +83,9 @@ with dag:
     SSHOperator(
       task_id='extract_tar_file',
       dag=dag,
-      pool='orwell_exe_pool',
-      ssh_hook=orwell_ssh_hook,
       do_xcom_push=False,
-      queue='hpc_4G',
-      params={'seqrun_base_path': SEQRUN_BASE_PATH},
+      queue='wells',
+      params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
       command="""
       cd {{ params.seqrun_base_path }};
       if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];