dag14_crick_seqrun_transfer.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from datetime import timedelta
  2. from airflow.models import DAG,Variable
  3. from airflow.utils.dates import days_ago
  4. from airflow.contrib.operators.ssh_operator import SSHOperator
  5. from airflow.contrib.hooks.ssh_hook import SSHHook
  6. from airflow.operators.python_operator import PythonOperator
  7. #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
  8. #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
  9. FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
  10. FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
  11. SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
  12. HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
  13. args = {
  14. 'owner': 'airflow',
  15. 'start_date': days_ago(2),
  16. 'retries': 1,
  17. 'retry_delay': timedelta(minutes=5),
  18. 'provide_context': True,
  19. 'email_on_failure': False,
  20. 'email_on_retry': False,
  21. 'catchup': False,
  22. 'max_active_runs': 1,
  23. }
  24. ## SSH HOOK
  25. orwell_ssh_hook = \
  26. SSHHook(
  27. key_file=Variable.get('hpc_ssh_key_file'),
  28. username=Variable.get('hpc_user'),
  29. remote_host=Variable.get('orwell_server_hostname'))
  30. dag = \
  31. DAG(
  32. dag_id='dag14_crick_seqrun_transfer',
  33. schedule_interval=None,
  34. default_args=args,
  35. tags=['ftp', 'hpc', 'orwell', 'wells'])
  36. with dag:
  37. ## TASK
  38. # not working on HPC
  39. #check_and_transfer_run = \
  40. # PythonOperator(
  41. # task_id='check_and_transfer_run',
  42. # dag=dag,
  43. # pool='crick_ftp_pool',
  44. # queue='hpc_4G',
  45. # params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  46. # 'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  47. # 'ftp_config_file': FTP_CONFIG_FILE},
  48. # python_callable=check_and_transfer_run_func)
  49. ## TASK
  50. #extract_tar_file = \
  51. # PythonOperator(
  52. # task_id='extract_tar_file',
  53. # dag=dag,
  54. # queue='hpc_4G',
  55. # params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
  56. # python_callable=extract_tar_file_func)
  57. check_and_transfer_run = \
  58. SSHOperator(
  59. task_id='check_and_transfer_run',
  60. dag=dag,
  61. pool='crick_ftp_pool',
  62. ssh_hook=orwell_ssh_hook,
  63. do_xcom_push=False,
  64. queue='wells',
  65. params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  66. 'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  67. 'ftp_config_file': FTP_CONFIG_FILE},
  68. command="""
  69. python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  70. -f {{ params.ftp_seqrun_server }} \
  71. -s {{ dag_run.conf["seqrun_id"] }} \
  72. -d {{ params.seqrun_base_path }} \
  73. -c {{ params.ftp_config_file }}
  74. """)
  75. # TASK
  76. extract_tar_file = \
  77. SSHOperator(
  78. task_id='extract_tar_file',
  79. dag=dag,
  80. do_xcom_push=False,
  81. queue='wells',
  82. params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
  83. command="""
  84. cd {{ params.seqrun_base_path }};
  85. if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
  86. then
  87. echo "Seqrun dir exists";
  88. exit 1;
  89. else
  90. mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
  91. tar \
  92. --no-same-owner \
  93. --no-same-permissions \
  94. --owner=igf \
  95. -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
  96. -C temp_{{ dag_run.conf["seqrun_id"] }};
  97. find temp_{{ dag_run.conf["seqrun_id"] }} \
  98. -type d \
  99. -exec chmod 700 {} \;
  100. chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
  101. chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
  102. fi
  103. """
  104. )
  105. ## TASK
  106. move_seqrun_dir = \
  107. SSHOperator(
  108. task_id='move_seqrun_dir',
  109. dag=dag,
  110. pool='orwell_exe_pool',
  111. ssh_hook=orwell_ssh_hook,
  112. do_xcom_push=False,
  113. queue='hpc_4G',
  114. params={'seqrun_base_path': SEQRUN_BASE_PATH},
  115. command="""
  116. cd {{ params.seqrun_base_path }};
  117. if [ -d {{ dag_run.conf["seqrun_id"] }} ];
  118. then
  119. echo "Seqrun dir exists";
  120. exit 1;
  121. fi
  122. ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
  123. move temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
  124. """
  125. )
  126. ## PIPELINE
  127. check_and_transfer_run >> extract_tar_file >> move_seqrun_dir