|
@@ -1,9 +1,7 @@
|
|
|
from datetime import timedelta
|
|
|
from airflow.models import DAG,Variable
|
|
|
from airflow.utils.dates import days_ago
|
|
|
-from airflow.operators.python_operator import PythonOperator
|
|
|
from airflow.contrib.operators.ssh_operator import SSHOperator
|
|
|
-from airflow.operators.bash_operator import BashOperator
|
|
|
from airflow.contrib.hooks.ssh_hook import SSHHook
|
|
|
|
|
|
FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
|
|
@@ -41,13 +39,15 @@ with dag:
|
|
|
# TASK
|
|
|
# check if the FTP host is alive
|
|
|
ping_remote_host = \
|
|
|
- BashOperator(
|
|
|
+ SSHOperator(
|
|
|
task_id='ping_remote_host',
|
|
|
dag=dag,
|
|
|
- xcom_push=False,
|
|
|
+ ssh_hook=orwell_ssh_hook,
|
|
|
+ pool='orwell_exe_pool',
|
|
|
+ do_xcom_push=False,
|
|
|
queue='hpc_4G',
|
|
|
params={'FTP_SEQRUN_SERVER': FTP_SEQRUN_SERVER},
|
|
|
- bash_command='ping -c5 {{ params.FTP_SEQRUN_SERVER }}')
|
|
|
+ command='ping -c5 {{ params.FTP_SEQRUN_SERVER }}')
|
|
|
# TASK
|
|
|
check_and_transfer_run = \
|
|
|
SSHOperator(
|