# dag18_upload_and_trigger_analysis.py
  1. import os, logging
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from airflow.operators.dagrun_operator import TriggerDagRunOperator
  9. #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
  10. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
  11. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
  12. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
  13. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import send_log_and_reset_trigger_file_func
  14. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import trigger_dag_func
  15. SLACK_CONF = \
  16. Variable.get('slack_conf', default_var=None)
  17. MS_TEAMS_CONF = \
  18. Variable.get('ms_teams_conf', default_var=None)
  19. args = {
  20. 'owner': 'airflow',
  21. 'start_date': days_ago(2),
  22. 'retries': 1,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. 'email_on_failure': False,
  26. 'email_on_retry': False,
  27. 'catchup': False,
  28. 'max_active_runs': 1}
  29. DAG_ID = \
  30. os.path.basename(__file__).\
  31. replace(".pyc", "").\
  32. replace(".py", "")
  33. dag = \
  34. DAG(
  35. dag_id=DAG_ID,
  36. schedule_interval=None,
  37. default_args=args,
  38. orientation='LR',
  39. tags=['hpc'])
  40. ANALYSIS_LIST = \
  41. Variable.get("analysis_dag_list", default_var={})
  42. ## DAG
  43. with dag:
  44. ## TASK
  45. find_analysis_designs = \
  46. BranchPythonOperator(
  47. task_id="find_analysis_designs",
  48. dag=dag,
  49. queue='hpc_4G',
  50. params={
  51. "load_analysis_task_prefix": "load_analysis_design",
  52. "no_task_name":"no_task",
  53. "load_task_limit": 20,
  54. "load_design_xcom_key": "load_design"},
  55. python_callable=find_analysis_designs_func)
  56. ## TASK
  57. no_task = \
  58. DummyOperator(
  59. task_id="no_task",
  60. dag=dag)
  61. ## TASK
  62. load_analysis_design_tasks = [no_task]
  63. for i in range(0, 10):
  64. t = \
  65. PythonOperator(
  66. task_id="load_analysis_design_{0}".format(i),
  67. dag=dag,
  68. queue='hpc_4G',
  69. params={
  70. "task_index": i,
  71. "load_design_xcom_key": "load_design",
  72. "load_design_xcom_task": "find_analysis_designs"},
  73. python_callable=load_analysis_design_func)
  74. load_analysis_design_tasks.append(t)
  75. ## TASK
  76. find_analysis_to_trigger_dags = \
  77. BranchPythonOperator(
  78. task_id="find_analysis_to_trigger_dags",
  79. dag=dag,
  80. queue='hpc_4G',
  81. params={
  82. "no_trigger_task": "no_trigger",
  83. "analysis_limit": 40,
  84. "xcom_key": "analysis_list",
  85. "trigger_task_prefix": "trigger"},
  86. trigger_rule='none_failed_or_skipped',
  87. python_callable=find_analysis_to_trigger_dags_func)
  88. ## TASK
  89. no_trigger = \
  90. DummyOperator(
  91. task_id="no_trigger",
  92. dag=dag)
  93. ## TASK
  94. trigger_analysis_dag_tasks = [no_trigger]
  95. for analysis_name in ANALYSIS_LIST.keys():
  96. j = \
  97. DummyOperator(
  98. task_id="finished_{0}".format(analysis_name),
  99. dag=dag)
  100. for i in range(0, 40):
  101. t = \
  102. TriggerDagRunOperator(
  103. task_id="trigger_{0}_{1}".format(analysis_name, i),
  104. trigger_dag_id=analysis_name,
  105. dag=dag,
  106. queue='hpc_4G',
  107. params={
  108. "xcom_key": "analysis_list",
  109. "xcom_task": "find_analysis_to_trigger_dags",
  110. "analysis_name": analysis_name,
  111. "index": i},
  112. python_callable=trigger_dag_func)
  113. ## PIPELINE
  114. find_analysis_to_trigger_dags >> t >> j
  115. trigger_analysis_dag_tasks.append(j)
  116. ## TASK
  117. send_log_and_reset_trigger_file = \
  118. PythonOperator(
  119. task_id="send_log_and_reset_trigger_file",
  120. dag=dag,
  121. queue='hpc_4G',
  122. params={
  123. "xcom_key": "analysis_list",
  124. "xcom_task": "find_analysis_to_trigger_dags"},
  125. trigger_rule='none_failed_or_skipped',
  126. python_callable=send_log_and_reset_trigger_file_func)
  127. ## PIPELINE
  128. find_analysis_designs >> load_analysis_design_tasks
  129. load_analysis_design_tasks >> find_analysis_to_trigger_dags
  130. find_analysis_to_trigger_dags >> no_trigger
  131. trigger_analysis_dag_tasks >> send_log_and_reset_trigger_file