dag14_crick_seqrun_transfer.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import argparse
  2. from datetime import timedelta
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.contrib.operators.ssh_operator import SSHOperator
  6. from airflow.contrib.hooks.ssh_hook import SSHHook
  7. FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
  8. FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
  9. SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
  10. args = {
  11. 'owner': 'airflow',
  12. 'start_date': days_ago(2),
  13. 'retries': 1,
  14. 'retry_delay': timedelta(minutes=5),
  15. 'provide_context': True,
  16. 'email_on_failure': False,
  17. 'email_on_retry': False,
  18. 'catchup': False,
  19. 'max_active_runs': 1,
  20. }
  21. ## SSH HOOK
  22. orwell_ssh_hook = \
  23. SSHHook(
  24. key_file=Variable.get('hpc_ssh_key_file'),
  25. username=Variable.get('hpc_user'),
  26. remote_host=Variable.get('orwell_server_hostname'))
  27. dag = \
  28. DAG(
  29. dag_id='dag14_crick_seqrun_transfer',
  30. schedule_interval=None,
  31. default_args=args,
  32. tags=['ftp', 'hpc', 'orwell'])
  33. with dag:
  34. ## TASK
  35. check_and_transfer_run = \
  36. SSHOperator(
  37. task_id='check_and_transfer_run',
  38. dag=dag,
  39. pool='crick_ftp_pool',
  40. ssh_hook=orwell_ssh_hook,
  41. do_xcom_push=False,
  42. queue='hpc_4G',
  43. params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  44. 'seqrun_base_path': SEQRUN_BASE_PATH,
  45. 'ftp_config_file': FTP_CONFIG_FILE},
  46. command="""
  47. source /home/igf/igf_code/airflow/env.sh;
  48. python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  49. -f {{ params.ftp_seqrun_server }} \
  50. -s {{ dag_run.conf["seqrun_id"] }} \
  51. -d {{ params.seqrun_base_path }} \
  52. -c {{ params.ftp_config_file }}
  53. """)
  54. ## TASK
  55. extract_tar_file = \
  56. SSHOperator(
  57. task_id='extract_tar_file',
  58. dag=dag,
  59. pool='orwell_exe_pool',
  60. ssh_hook=orwell_ssh_hook,
  61. do_xcom_push=False,
  62. queue='hpc_4G',
  63. params={'seqrun_base_path': SEQRUN_BASE_PATH},
  64. command="""
  65. cd {{ params.seqrun_base_path }};
  66. if [ -d {{ dag_run.conf["seqrun_id"] }} ];
  67. then
  68. echo "Seqrun dir exists";
  69. exit 1;
  70. else
  71. mkdir -p {{ dag_run.conf["seqrun_id"] }};
  72. tar -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
  73. fi
  74. """
  75. )
  76. ## PIPELINE
  77. check_and_transfer_run >> extract_tar_file