Avik Datta 3 éve
szülő
commit
45c57daa27
1 módosított fájl, 73 hozzáadás és 0 törlés
  1. 73 0
      dags/dag14_crick_seqrun_transfer.py

+ 73 - 0
dags/dag14_crick_seqrun_transfer.py

@@ -0,0 +1,73 @@
+from datetime import timedelta
+from airflow.models import DAG,Variable
+from airflow.utils.dates import days_ago
+from airflow.operators.python_operator import PythonOperator
+from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.contrib.hooks.ssh_hook import SSHHook
+
+FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
+FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
+SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    'provide_context': True,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'catchup': False,
+    'max_active_runs': 1,
+}
+
+## SSH HOOK
+orwell_ssh_hook = \
+  SSHHook(
+    key_file=Variable.get('hpc_ssh_key_file'),
+    username=Variable.get('hpc_user'),
+    remote_host=Variable.get('orwell_server_hostname'))
+
+dag = \
+  DAG(
+    dag_id='dag14_crick_seqrun_transfer',
+    schedule_interval=None,
+    default_args=args,
+    tags=['ftp', 'hpc', 'orwell'])
+
+
+with dag:
+  # TASK
+  # check if the FTP host is alive
+  ping_remote_host = \
+    BashOperator(
+      task_id='ping_remote_host',
+      dag=dag,
+      xcom_push=False,
+      queue='hpc_4G',
+      params={'FTP_SEQRUN_SERVER': FTP_SEQRUN_SERVER},
+      bash_command='ping -c5 {{ params.FTP_SEQRUN_SERVER }}')
+  # TASK
+  check_and_transfer_run = \
+    SSHOperator(
+      task_id='check_and_transfer_run',
+      dag=dag,
+      pool='crick_ftp_pool',
+      ssh_hook=orwell_ssh_hook,
+      do_xcom_push=False,
+      queue='hpc_4G',
+      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
+              'seqrun_base_path': SEQRUN_BASE_PATH,
+              'ftp_config_file': FTP_CONFIG_FILE,
+              'seqrun_id': "{{ dag_run.conf['seqrun_id'] }}" },
+      command="""
+        source /home/igf/igf_code/env.sh;
+        python /home/igf/igf_code/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
+          -f {{ params.ftp_seqrun_server }} \
+          -s {{ params.seqrun_id }} \
+          -d {{ params.seqrun_base_path }} \
+          -c {{ params.ftp_config_file }}
+        """)
+  ## PIPELINE
+  ping_remote_host >> check_and_transfer_run