# dag14_crick_seqrun_transfer.py
# Airflow DAG: transfer a sequencing run tarball from the Crick FTP server
# to the HPC and unpack it.
  1. import argparse
  2. from datetime import timedelta
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.contrib.operators.ssh_operator import SSHOperator
  6. from airflow.contrib.hooks.ssh_hook import SSHHook
  7. from airflow.operators.python_operator import PythonOperator
  8. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
  9. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
  10. FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
  11. FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file')
  12. SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
  13. HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
  14. args = {
  15. 'owner': 'airflow',
  16. 'start_date': days_ago(2),
  17. 'retries': 1,
  18. 'retry_delay': timedelta(minutes=5),
  19. 'provide_context': True,
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'catchup': False,
  23. 'max_active_runs': 1,
  24. }
  25. ## SSH HOOK
  26. orwell_ssh_hook = \
  27. SSHHook(
  28. key_file=Variable.get('hpc_ssh_key_file'),
  29. username=Variable.get('hpc_user'),
  30. remote_host=Variable.get('orwell_server_hostname'))
  31. dag = \
  32. DAG(
  33. dag_id='dag14_crick_seqrun_transfer',
  34. schedule_interval=None,
  35. default_args=args,
  36. tags=['ftp', 'hpc', 'orwell'])
  37. with dag:
  38. ## TASK
  39. check_and_transfer_run = \
  40. PythonOperator(
  41. task_id='check_and_transfer_run',
  42. dag=dag,
  43. pool='crick_ftp_pool',
  44. queue='hpc_4G',
  45. params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  46. 'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  47. 'ftp_config_file': FTP_CONFIG_FILE},
  48. python_callable=check_and_transfer_run_func)
  49. ## TASK
  50. extract_tar_file = \
  51. PythonOperator(
  52. task_id='extract_tar_file',
  53. dag=dag,
  54. queue='hpc_4G',
  55. params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
  56. python_callable=extract_tar_file_func)
  57. #check_and_transfer_run = \
  58. # SSHOperator(
  59. # task_id='check_and_transfer_run',
  60. # dag=dag,
  61. # pool='crick_ftp_pool',
  62. # ssh_hook=orwell_ssh_hook,
  63. # do_xcom_push=False,
  64. # queue='hpc_4G',
  65. # params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  66. # 'seqrun_base_path': SEQRUN_BASE_PATH,
  67. # 'ftp_config_file': FTP_CONFIG_FILE},
  68. # command="""
  69. # source /home/igf/igf_code/airflow/env.sh;
  70. # python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  71. # -f {{ params.ftp_seqrun_server }} \
  72. # -s {{ dag_run.conf["seqrun_id"] }} \
  73. # -d {{ params.seqrun_base_path }} \
  74. # -c {{ params.ftp_config_file }}
  75. # """)
  76. ## TASK
  77. #extract_tar_file = \
  78. # SSHOperator(
  79. # task_id='extract_tar_file',
  80. # dag=dag,
  81. # pool='orwell_exe_pool',
  82. # ssh_hook=orwell_ssh_hook,
  83. # do_xcom_push=False,
  84. # queue='hpc_4G',
  85. # params={'seqrun_base_path': SEQRUN_BASE_PATH},
  86. # command="""
  87. # cd {{ params.seqrun_base_path }};
  88. # if [ -d {{ dag_run.conf["seqrun_id"] }} ];
  89. # then
  90. # echo "Seqrun dir exists";
  91. # exit 1;
  92. # else
  93. # mkdir -p {{ dag_run.conf["seqrun_id"] }};
  94. # tar \
  95. # --no-same-owner \
  96. # --no-same-permissions \
  97. # --owner=igf \
  98. # -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
  99. # fi
  100. # """
  101. # )
  102. ## PIPELINE
  103. check_and_transfer_run >> extract_tar_file