|
@@ -11,6 +11,7 @@ from airflow.contrib.operators.sftp_operator import SFTPOperator
|
|
|
|
|
|
FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
|
|
|
FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
|
|
|
+FTP_BASE_PATH = Variable.get('crick_ftp_base_path')
|
|
|
SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
|
|
|
HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
|
|
|
|
|
@@ -86,9 +87,10 @@ with dag:
|
|
|
pool='crick_ftp_pool',
|
|
|
queue='hpc_4G',
|
|
|
ssh_conn_id="crick_sftp_conn",
|
|
|
- params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
|
|
|
- local_filepath="/rds/general/user/igf/ephemeral/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
- remote_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
+ params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
|
|
|
+ 'ftp_base_path': FTP_BASE_PATH},
|
|
|
+ local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
+ remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
operation='get'
|
|
|
)
|
|
|
# TASK
|