dag14_crick_seqrun_transfer.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from datetime import timedelta
  2. from airflow.models import DAG,Variable
  3. from airflow.utils.dates import days_ago
  4. from airflow.operators.python_operator import PythonOperator
  5. from airflow.contrib.operators.ssh_operator import SSHOperator
  6. from airflow.operators.bash_operator import BashOperator
  7. from airflow.contrib.hooks.ssh_hook import SSHHook
  # Settings read from Airflow Variables when this module is imported
  # (i.e. every time the DAG file is parsed). Variable.get is called without
  # a default, so a missing Variable presumably makes the import fail.
  #   FTP_SEQRUN_SERVER - ping target and `-f` argument of the transfer script
  #   FTP_CONFIG_FILE   - `-c` argument of the transfer script
  #   SEQRUN_BASE_PATH  - `-d` argument of the transfer script
  FTP_SEQRUN_SERVER, FTP_CONFIG_FILE, SEQRUN_BASE_PATH = (
      Variable.get(variable_name)
      for variable_name in (
          'crick_ftp_seqrun_hostname',
          'crick_ftp_config_file',
          'seqrun_base_path'))
  # Default arguments forwarded to every operator in this DAG.
  # NOTE: `catchup` and `max_active_runs` are DAG-level settings. Airflow's
  # DAG() does not read them from default_args, and operators ignore them,
  # so listing them here had no effect. Set them on the DAG(...) call itself.
  args = {
      'owner': 'airflow',
      'start_date': days_ago(2),
      'retries': 1,
      'retry_delay': timedelta(minutes=5),
      'provide_context': True,
      'email_on_failure': False,
      'email_on_retry': False,
  }
  ## SSH HOOK
  # SSH connection to the orwell server, used by the check_and_transfer_run
  # task. Key file, user name and host are all taken from Airflow Variables
  # (read in this order, at import time).
  orwell_ssh_hook = SSHHook(
      key_file=Variable.get('hpc_ssh_key_file'),
      username=Variable.get('hpc_user'),
      remote_host=Variable.get('orwell_server_hostname'),
  )
  # Manually triggered DAG (schedule_interval=None). The tasks below expect
  # the sequencing run id in the trigger conf under the key `seqrun_id`.
  # catchup / max_active_runs are DAG-level settings and must be passed here:
  # Airflow does not pick them up from default_args. max_active_runs=1 keeps
  # a second trigger from running a transfer concurrently with the first.
  dag = \
      DAG(
          dag_id='dag14_crick_seqrun_transfer',
          schedule_interval=None,
          default_args=args,
          catchup=False,
          max_active_runs=1,
          tags=['ftp', 'hpc', 'orwell'])
  with dag:
      # TASK
      # Check that the FTP host answers before attempting a transfer
      # (`ping -c5` sends five echo requests; a non-zero exit fails the task).
      ping_remote_host = \
          BashOperator(
              task_id='ping_remote_host',
              dag=dag,
              xcom_push=False,
              queue='hpc_4G',
              params={'FTP_SEQRUN_SERVER': FTP_SEQRUN_SERVER},
              bash_command='ping -c5 {{ params.FTP_SEQRUN_SERVER }}')
      # TASK
      # Run the Crick transfer script on the orwell host over SSH, limited by
      # the 'crick_ftp_pool' pool.
      # The run id is read from the trigger conf directly inside the templated
      # `command`. Values stored in `params` are NOT themselves rendered by
      # Jinja, so a "{{ dag_run.conf[...] }}" string placed in params would be
      # passed to the script verbatim instead of the actual run id.
      # NOTE(review): seqrun_id is user-supplied trigger conf interpolated
      # unescaped into a shell command. Validate it if untrusted users can
      # trigger this DAG.
      check_and_transfer_run = \
          SSHOperator(
              task_id='check_and_transfer_run',
              dag=dag,
              pool='crick_ftp_pool',
              ssh_hook=orwell_ssh_hook,
              do_xcom_push=False,
              queue='hpc_4G',
              params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
                      'seqrun_base_path': SEQRUN_BASE_PATH,
                      'ftp_config_file': FTP_CONFIG_FILE},
              command="""
  source /home/igf/igf_code/env.sh;
  python /home/igf/igf_code/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  -f {{ params.ftp_seqrun_server }} \
  -s {{ dag_run.conf['seqrun_id'] }} \
  -d {{ params.seqrun_base_path }} \
  -c {{ params.ftp_config_file }}
  """)
      ## PIPELINE
      # Only attempt the transfer once the FTP host has answered the ping.
      ping_remote_host >> check_and_transfer_run