# dag6_seqrun_processing.py 2.6 KB

  1. from datetime import timedelta
  2. from airflow.models import DAG,Variable
  3. from airflow.utils.dates import days_ago
  4. from airflow.operators.bash_operator import BashOperator
  5. from airflow.contrib.operators.ssh_operator import SSHOperator
  6. from airflow.contrib.hooks.ssh_hook import SSHHook
  7. default_args = {
  8. 'owner': 'airflow',
  9. 'depends_on_past': False,
  10. 'start_date': days_ago(2),
  11. 'email_on_failure': False,
  12. 'email_on_retry': False,
  13. 'retries': 1,
  14. 'retry_delay': timedelta(minutes=5),
  15. }
  16. dag = \
  17. DAG(
  18. dag_id='dag6_seqrun_processing',
  19. catchup=False,
  20. schedule_interval="@hourly",
  21. max_active_runs=1,
  22. tags=['hpc','orwell'],
  23. default_args=default_args)
  24. orwell_ssh_hook = \
  25. SSHHook(
  26. key_file=Variable.get('hpc_ssh_key_file'),
  27. username=Variable.get('hpc_user'),
  28. remote_host='orwell.hh.med.ic.ac.uk')
  29. hpc_hook = SSHHook(ssh_conn_id='hpc_conn')
  30. with dag:
  31. switch_off_project_barcode = \
  32. SSHOperator(
  33. task_id = 'switch_off_project_barcode',
  34. dag = dag,
  35. ssh_hook = orwell_ssh_hook,
  36. queue='hpc_4G',
  37. command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/switch_off_project_barcode_check.sh '
  38. )
  39. change_samplesheet_for_run = \
  40. SSHOperator(
  41. task_id = 'change_samplesheet_for_run',
  42. dag = dag,
  43. queue='hpc_4G',
  44. ssh_hook = orwell_ssh_hook,
  45. command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/change_samplesheet_for_seqrun.sh '
  46. )
  47. restart_seqrun_processing = \
  48. SSHOperator(
  49. task_id = 'restart_seqrun_processing',
  50. dag = dag,
  51. queue='hpc_4G',
  52. ssh_hook = orwell_ssh_hook,
  53. command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/restart_seqrun_processing.sh '
  54. )
  55. register_project_metadata = \
  56. SSHOperator(
  57. task_id = 'register_project_metadata',
  58. dag = dag,
  59. queue='hpc_4G',
  60. ssh_hook = orwell_ssh_hook,
  61. command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/register_metadata.sh '
  62. )
  63. find_new_seqrun = \
  64. SSHOperator(
  65. task_id = 'find_new_seqrun',
  66. dag = dag,
  67. queue='hpc_4G',
  68. ssh_hook = orwell_ssh_hook,
  69. command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/find_new_seqrun.sh '
  70. )
  71. seed_demultiplexing_pipe = \
  72. SSHOperator(
  73. task_id = 'seed_demultiplexing_pipe',
  74. dag = dag,
  75. ssh_hook=hpc_hook,
  76. queue='hpc_4G',
  77. command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/seed_demultiplexing_pipeline.sh '
  78. )
  79. switch_off_project_barcode >> change_samplesheet_for_run >> restart_seqrun_processing
  80. restart_seqrun_processing >> register_project_metadata >> find_new_seqrun >> seed_demultiplexing_pipe