# dag14_crick_seqrun_transfer.py
# Airflow DAG: transfer a sequencing run tarball from the Crick FTP server
# and unpack it on HPC.
  1. from datetime import timedelta
  2. from airflow.models import DAG,Variable
  3. from airflow.utils.dates import days_ago
  4. from airflow.contrib.operators.ssh_operator import SSHOperator
  5. from airflow.contrib.hooks.ssh_hook import SSHHook
  6. from airflow.operators.bash_operator import BashOperator
  7. from airflow.operators.python_operator import PythonOperator
  8. from airflow.contrib.operators.sftp_operator import SFTPOperator
  9. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
  10. from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
  11. FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
  12. FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
  13. FTP_BASE_PATH = Variable.get('crick_ftp_base_path')
  14. SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
  15. HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
  16. args = {
  17. 'owner': 'airflow',
  18. 'start_date': days_ago(2),
  19. 'retries': 1,
  20. 'retry_delay': timedelta(minutes=5),
  21. 'provide_context': True,
  22. 'email_on_failure': False,
  23. 'email_on_retry': False,
  24. 'catchup': False,
  25. 'max_active_runs': 1,
  26. }
  27. ## SSH HOOK
  28. #orwell_ssh_hook = \
  29. # SSHHook(
  30. # key_file=Variable.get('hpc_ssh_key_file'),
  31. # username=Variable.get('hpc_user'),
  32. # remote_host=Variable.get('orwell_server_hostname'))
  33. dag = \
  34. DAG(
  35. dag_id='dag14_crick_seqrun_transfer',
  36. schedule_interval=None,
  37. default_args=args,
  38. tags=['ftp', 'hpc', 'orwell', 'wells'])
  39. with dag:
  40. ## TASK
  41. # not working on HPC
  42. check_and_transfer_run = \
  43. PythonOperator(
  44. task_id='check_and_transfer_run',
  45. dag=dag,
  46. pool='crick_ftp_pool',
  47. queue='hpc_4G',
  48. params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  49. 'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  50. 'ftp_config_file': FTP_CONFIG_FILE},
  51. python_callable=check_and_transfer_run_func)
  52. ## TASK
  53. extract_tar_file = \
  54. PythonOperator(
  55. task_id='extract_tar_file',
  56. dag=dag,
  57. queue='hpc_4G',
  58. python_callable=extract_tar_file_func)
  59. #check_and_transfer_run = \
  60. # BashOperator(
  61. # task_id='check_and_transfer_run',
  62. # dag=dag,
  63. # pool='crick_ftp_pool',
  64. # do_xcom_push=False,
  65. # queue='wells',
  66. # params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  67. # 'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  68. # 'ftp_config_file': FTP_CONFIG_FILE},
  69. # bash_command="""
  70. # python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  71. # -f {{ params.ftp_seqrun_server }} \
  72. # -s {{ dag_run.conf["seqrun_id"] }} \
  73. # -d {{ params.seqrun_base_path }} \
  74. # -c {{ params.ftp_config_file }}
  75. # """)
  76. #check_and_transfer_run = \
  77. # SFTPOperator(
  78. # task_id='check_and_transfer_run',
  79. # dag=dag,
  80. # pool='crick_ftp_pool',
  81. # queue='hpc_4G',
  82. # ssh_conn_id="crick_sftp_conn",
  83. # params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  84. # 'ftp_base_path': FTP_BASE_PATH},
  85. # local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
  86. # remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
  87. # operation='get'
  88. # )
  89. # TASK
  90. #extract_tar_file = \
  91. # BashOperator(
  92. # task_id='extract_tar_file',
  93. # dag=dag,
  94. # do_xcom_push=False,
  95. # queue='hpc_4G',
  96. # params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
  97. # bash_command="""
  98. # cd {{ params.seqrun_base_path }};
  99. # if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
  100. # then
  101. # echo "Seqrun dir exists";
  102. # exit 1;
  103. # else
  104. # mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
  105. # tar \
  106. # --no-same-owner \
  107. # --no-same-permissions \
  108. # --owner=igf \
  109. # -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
  110. # -C temp_{{ dag_run.conf["seqrun_id"] }};
  111. # find temp_{{ dag_run.conf["seqrun_id"] }} \
  112. # -type d \
  113. # -exec chmod 700 {} \;
  114. # chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
  115. # chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
  116. # fi
  117. # """
  118. # )
  119. ## TASK
  120. #move_seqrun_dir = \
  121. # BashOperator(
  122. # task_id='move_seqrun_dir',
  123. # dag=dag,
  124. # do_xcom_push=False,
  125. # queue='hpc_4G',
  126. # params={'seqrun_base_path': SEQRUN_BASE_PATH},
  127. # bash_command="""
  128. # cd {{ params.seqrun_base_path }};
  129. # if [ -d {{ dag_run.conf["seqrun_id"] }} ];
  130. # then
  131. # echo "Seqrun dir exists";
  132. # exit 1;
  133. # fi
  134. # ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
  135. # move temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
  136. # """
  137. # )
  138. ## PIPELINE
  139. check_and_transfer_run >> extract_tar_file