Browse Source

downloading tar files directly on hpc

Avik Datta 3 năm trước cách đây
mục cha
commit
fbf69ef354
2 tập tin đã thay đổi với 57 bổ sung35 xóa
  1. 1 1
      airflow_var/var.json
  2. 56 34
      dags/dag14_crick_seqrun_transfer.py

+ 1 - 1
airflow_var/var.json

@@ -8,7 +8,7 @@
   "hpc_max_workers_per_queue": 10,
   "hpc_max_total_workers": 40,
   "crick_ftp_seqrun_hostname": "hermes.crick.ac.uk",
-  "crick_ftp_config_file": "/home/igf/igf_code/airflow/secret/crick_ftp_config.json",
+  "crick_ftp_config_file": "/rds/general/user/igf/home/secret_keys/crick_ftp_config.json",
   "seqrun_base_path": "/home/igf/seqrun/illumina",
   "seqrun_server_user": "igf",
   "database_config_file": "/rds/general/user/igf/home/secret_keys/dbconfig.json",

+ 56 - 34
dags/dag14_crick_seqrun_transfer.py

@@ -4,10 +4,14 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.operators.python_operator import PythonOperator
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 
 FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
 FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
 SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
+HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
 
 args = {
     'owner': 'airflow',
@@ -39,49 +43,67 @@ dag = \
 with dag:
   ## TASK
   check_and_transfer_run = \
-    SSHOperator(
+    PythonOperator(
       task_id='check_and_transfer_run',
       dag=dag,
       pool='crick_ftp_pool',
-      ssh_hook=orwell_ssh_hook,
-      do_xcom_push=False,
       queue='hpc_4G',
       params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-              'seqrun_base_path': SEQRUN_BASE_PATH,
-              'ftp_config_file': FTP_CONFIG_FILE},
-      command="""
-        source /home/igf/igf_code/airflow/env.sh;
-        python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
-          -f {{ params.ftp_seqrun_server }} \
-          -s {{ dag_run.conf["seqrun_id"] }} \
-          -d {{ params.seqrun_base_path }} \
-          -c {{ params.ftp_config_file }}
-        """)
+             'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
+             'ftp_config_file': FTP_CONFIG_FILE},
+      python_callable=check_and_transfer_run_func)
   ## TASK
   extract_tar_file = \
-    SSHOperator(
+    PythonOperator(
       task_id='extract_tar_file',
       dag=dag,
-      pool='orwell_exe_pool',
-      ssh_hook=orwell_ssh_hook,
-      do_xcom_push=False,
       queue='hpc_4G',
-      params={'seqrun_base_path': SEQRUN_BASE_PATH},
-      command="""
-      cd {{ params.seqrun_base_path }};
-      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
-      then
-        echo "Seqrun dir exists";
-        exit 1;
-      else
-        mkdir -p {{ dag_run.conf["seqrun_id"] }};
-        tar \
-          --no-same-owner \
-          --no-same-permissions \
-          --owner=igf \
-          -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
-      fi
-      """
-    )
+      params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
+      python_callable=extract_tar_file_func)
+  #check_and_transfer_run = \
+  #  SSHOperator(
+  #    task_id='check_and_transfer_run',
+  #    dag=dag,
+  #    pool='crick_ftp_pool',
+  #    ssh_hook=orwell_ssh_hook,
+  #    do_xcom_push=False,
+  #    queue='hpc_4G',
+  #    params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
+  #            'seqrun_base_path': SEQRUN_BASE_PATH,
+  #            'ftp_config_file': FTP_CONFIG_FILE},
+  #    command="""
+  #      source /home/igf/igf_code/airflow/env.sh;
+  #      python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
+  #        -f {{ params.ftp_seqrun_server }} \
+  #        -s {{ dag_run.conf["seqrun_id"] }} \
+  #        -d {{ params.seqrun_base_path }} \
+  #        -c {{ params.ftp_config_file }}
+  #      """)
+  ## TASK
+  #extract_tar_file = \
+  #  SSHOperator(
+  #    task_id='extract_tar_file',
+  #    dag=dag,
+  #    pool='orwell_exe_pool',
+  #    ssh_hook=orwell_ssh_hook,
+  #    do_xcom_push=False,
+  #    queue='hpc_4G',
+  #    params={'seqrun_base_path': SEQRUN_BASE_PATH},
+  #    command="""
+  #    cd {{ params.seqrun_base_path }};
+  #    if [ -d {{ dag_run.conf["seqrun_id"] }} ];
+  #    then
+  #      echo "Seqrun dir exists";
+  #      exit 1;
+  #    else
+  #      mkdir -p {{ dag_run.conf["seqrun_id"] }};
+  #      tar \
+  #        --no-same-owner \
+  #        --no-same-permissions \
+  #        --owner=igf \
+  #        -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
+  #    fi
+  #    """
+  #  )
   ## PIPELINE
   check_and_transfer_run >> extract_tar_file