# dag14_crick_seqrun_transfer.py
  1. from datetime import timedelta
  2. from airflow.models import DAG,Variable
  3. from airflow.utils.dates import days_ago
  4. from airflow.contrib.operators.ssh_operator import SSHOperator
  5. from airflow.contrib.hooks.ssh_hook import SSHHook
  6. FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
  7. FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
  8. SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
  9. args = {
  10. 'owner': 'airflow',
  11. 'start_date': days_ago(2),
  12. 'retries': 1,
  13. 'retry_delay': timedelta(minutes=5),
  14. 'provide_context': True,
  15. 'email_on_failure': False,
  16. 'email_on_retry': False,
  17. 'catchup': False,
  18. 'max_active_runs': 1,
  19. }
  20. ## SSH HOOK
  21. orwell_ssh_hook = \
  22. SSHHook(
  23. key_file=Variable.get('hpc_ssh_key_file'),
  24. username=Variable.get('hpc_user'),
  25. remote_host=Variable.get('orwell_server_hostname'))
  26. dag = \
  27. DAG(
  28. dag_id='dag14_crick_seqrun_transfer',
  29. schedule_interval=None,
  30. default_args=args,
  31. tags=['ftp', 'hpc', 'orwell'])
  32. with dag:
  33. # TASK
  34. check_and_transfer_run = \
  35. SSHOperator(
  36. task_id='check_and_transfer_run',
  37. dag=dag,
  38. pool='crick_ftp_pool',
  39. ssh_hook=orwell_ssh_hook,
  40. do_xcom_push=False,
  41. queue='hpc_4G',
  42. params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  43. 'seqrun_base_path': SEQRUN_BASE_PATH,
  44. 'ftp_config_file': FTP_CONFIG_FILE},
  45. env={'seqrun_id': '{{ dag_run.conf["seqrun_id"] if dag_run else "" }}'},
  46. command="""
  47. source /home/igf/igf_code/airflow/env.sh;
  48. python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  49. -f {{ params.ftp_seqrun_server }} \
  50. -s \'$seqrun_id\' \
  51. -d {{ params.seqrun_base_path }} \
  52. -c {{ params.ftp_config_file }}
  53. """)
  54. ## PIPELINE
  55. check_and_transfer_run