# Source file: dag19_fastq_screen_for_seqrun.py (3.9 KB)

  1. import os
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. args = {
  9. 'owner': 'airflow',
  10. 'start_date': days_ago(2),
  11. 'retries': 1,
  12. 'retry_delay': timedelta(minutes=5),
  13. 'provide_context': True,
  14. 'email_on_failure': False,
  15. 'email_on_retry': False,
  16. 'catchup': False,
  17. 'max_active_runs': 1}
  18. DAG_ID = \
  19. os.path.basename(__file__).\
  20. replace(".pyc", "").\
  21. replace(".py", "")
  22. dag = \
  23. DAG(
  24. dag_id=DAG_ID,
  25. schedule_interval=None,
  26. default_args=args,
  27. orientation='LR',
  28. tags=['hpc'])
  29. ## DAG
  30. with dag:
  31. ## TASK
  32. check_finish_status_for_seqrun = \
  33. DummyOperator(
  34. task_id="check_finish_status_for_seqrun",
  35. dag=dag)
  36. ## TASK
  37. lane_level_start_tasks = list()
  38. lane_level_end_tasks = list()
  39. for lane_id in range(1,9):
  40. ## TASK - lane level
  41. fetch_projects_for_lane = \
  42. DummyOperator(
  43. task_id="fetch_projects_for_lane_{0}".format(lane_id),
  44. dag=dag)
  45. lane_level_start_tasks.\
  46. append(fetch_projects_for_lane)
  47. project_level_start_tasks = list()
  48. project_level_end_tasks = list()
  49. for project_id in range(1,4):
  50. ## TASK - project level
  51. fetch_samples_for_lane_project = \
  52. DummyOperator(
  53. task_id="fetch_fastqs_for_lane_{0}_project_{1}".format(lane_id, project_id),
  54. dag=dag)
  55. project_level_start_tasks.\
  56. append(fetch_samples_for_lane_project)
  57. sample_level_tasks = list()
  58. for sample_count in range(1,97):
  59. ## TASK - sample level
  60. run_fastq_screen_for_sample = \
  61. DummyOperator(
  62. task_id="run_fastq_screen_for_lane_{0}_project_{1}_sample_{2}".format(lane_id, project_id, sample_count),
  63. dag=dag)
  64. sample_level_tasks.\
  65. append(run_fastq_screen_for_sample)
  66. ## TASK - collect all samples for project
  67. collect_all_samples_for_project = \
  68. DummyOperator(
  69. task_id="collect_output_for_lane_{0}_project_{1}".format(lane_id, project_id),
  70. dag=dag)
  71. project_level_end_tasks.\
  72. append(collect_all_samples_for_project)
  73. ## PIPELINE
  74. fetch_samples_for_lane_project >> sample_level_tasks
  75. sample_level_tasks >> collect_all_samples_for_project
  76. ## TASK - collect all project for lane
  77. collect_output_for_all_project_for_lane = \
  78. DummyOperator(
  79. task_id="collect_output_for_lane_{0}".format(lane_id),
  80. dag=dag)
  81. lane_level_end_tasks.\
  82. append(collect_output_for_all_project_for_lane)
  83. ## PIPELINE
  84. fetch_projects_for_lane >> project_level_start_tasks
  85. project_level_end_tasks >> collect_output_for_all_project_for_lane
  86. ## TASK - collect all lane for flowcell
  87. collect_all_output_for_flowcell = \
  88. DummyOperator(
  89. task_id="collect_all_output_for_flowcell",
  90. dag=dag)
  91. ## PIPELINE
  92. check_finish_status_for_seqrun >> lane_level_start_tasks
  93. lane_level_end_tasks >> collect_all_output_for_flowcell
  94. ## TASK
  95. merge_all_output_and_create_report = \
  96. DummyOperator(
  97. task_id="merge_all_output_and_create_report",
  98. dag=dag)
  99. ## TASK
  100. upload_report_to_box = \
  101. DummyOperator(
  102. task_id="upload_report_to_box",
  103. dag=dag)
  104. ## PIPELINE
  105. collect_all_output_for_flowcell >> merge_all_output_and_create_report
  106. merge_all_output_and_create_report >> upload_report_to_box