Explorar o código

using ftpoperator

Avik Datta hai 3 anos
pai
achega
5c5a9638e4
Modificouse 1 ficheiro con 24 adicións e 12 borrados
  1. 24 12
      dags/dag14_crick_seqrun_transfer.py

+ 24 - 12
dags/dag14_crick_seqrun_transfer.py

@@ -5,6 +5,7 @@ from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import PythonOperator
+from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 
 
@@ -61,23 +62,34 @@ with dag:
   #    queue='hpc_4G',
   #    queue='hpc_4G',
   #    params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
   #    params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
   #    python_callable=extract_tar_file_func)
   #    python_callable=extract_tar_file_func)
+  #check_and_transfer_run = \
+  #  BashOperator(
+  #    task_id='check_and_transfer_run',
+  #    dag=dag,
+  #    pool='crick_ftp_pool',
+  #    do_xcom_push=False,
+  #    queue='wells',
+  #    params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
+  #            'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
+  #            'ftp_config_file': FTP_CONFIG_FILE},
+  #    bash_command="""
+  #      python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
+  #        -f {{ params.ftp_seqrun_server }} \
+  #        -s {{ dag_run.conf["seqrun_id"] }} \
+  #        -d {{ params.seqrun_base_path }} \
+  #        -c {{ params.ftp_config_file }}
+  #      """)
   check_and_transfer_run = \
   check_and_transfer_run = \
-    BashOperator(
+    SFTPOperator(
       task_id='check_and_transfer_run',
       task_id='check_and_transfer_run',
       dag=dag,
       dag=dag,
       pool='crick_ftp_pool',
       pool='crick_ftp_pool',
-      do_xcom_push=False,
       queue='wells',
       queue='wells',
-      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-              'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
-              'ftp_config_file': FTP_CONFIG_FILE},
-      bash_command="""
-        python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
-          -f {{ params.ftp_seqrun_server }} \
-          -s {{ dag_run.conf["seqrun_id"] }} \
-          -d {{ params.seqrun_base_path }} \
-          -c {{ params.ftp_config_file }}
-        """)
+      ssh_conn_id="crick_sftp_conn",
+      local_filepath="/rds/general/project/genomics-facility-archive-2019/ephemeral/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
+      remote_filepath="/users/dattaa/runs/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
+      operation=SFTPOperation.GET
+    )
   # TASK
   # TASK
   extract_tar_file = \
   extract_tar_file = \
     BashOperator(
     BashOperator(