# dag18_upload_and_trigger_analysis.py
  1. import os, logging
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from airflow.operators.dagrun_operator import TriggerDagRunOperator
  9. #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
  10. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
  11. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
  12. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
  13. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import send_log_and_reset_trigger_file_func
  14. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import trigger_dag_func
  15. SLACK_CONF = \
  16. Variable.get('slack_conf', default_var=None)
  17. MS_TEAMS_CONF = \
  18. Variable.get('ms_teams_conf', default_var=None)
  19. args = {
  20. 'owner': 'airflow',
  21. 'start_date': days_ago(2),
  22. 'retries': 1,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. 'email_on_failure': False,
  26. 'email_on_retry': False,
  27. 'catchup': False,
  28. 'max_active_runs': 1}
  29. DAG_ID = \
  30. os.path.basename(__file__).\
  31. replace(".pyc", "").\
  32. replace(".py", "")
  33. dag = \
  34. DAG(
  35. dag_id=DAG_ID,
  36. schedule_interval=None,
  37. default_args=args,
  38. orientation='LR',
  39. tags=['hpc'])
  40. ANALYSIS_LIST = \
  41. Variable.get("analysis_dag_list", default_var={})
  42. ## DAG
  43. with dag:
  44. ## TASK
  45. find_analysis_designs = \
  46. BranchPythonOperator(
  47. task_id="find_analysis_designs",
  48. dag=dag,
  49. queue='hpc_4G',
  50. params={
  51. "load_analysis_task_prefix": "load_analysis_design",
  52. "no_task_name":"no_task",
  53. "load_task_limit": 20,
  54. "load_design_xcom_key": "load_design"},
  55. python_callable=find_analysis_designs_func)
  56. ## TASK
  57. no_task = \
  58. DummyOperator(
  59. task_id="no_task",
  60. dag=dag)
  61. ## TASK
  62. load_analysis_design_tasks = [no_task]
  63. for i in range(0, 10):
  64. t = \
  65. PythonOperator(
  66. task_id="load_analysis_design_{0}".format(i),
  67. dag=dag,
  68. params={
  69. "task_index": i,
  70. "load_design_xcom_key": "load_design",
  71. "load_design_xcom_task": "find_analysis_designs"},
  72. python_callable=load_analysis_design_func)
  73. load_analysis_design_tasks.append(t)
  74. ## TASK
  75. find_analysis_to_trigger_dags = \
  76. BranchPythonOperator(
  77. task_id="find_analysis_to_trigger_dags",
  78. dag=dag,
  79. queue='hpc_4G',
  80. params={
  81. "no_trigger_task": "no_trigger",
  82. "analysis_limit": 40,
  83. "xcom_key": "analysis_list",
  84. "trigger_task_prefix": "trigger"},
  85. trigger_rule='none_failed_or_skipped',
  86. python_callable=find_analysis_to_trigger_dags_func)
  87. ## TASK
  88. no_trigger = \
  89. DummyOperator(
  90. task_id="no_trigger",
  91. dag=dag)
  92. ## TASK
  93. trigger_analysis_dag_tasks = [no_trigger]
  94. for analysis_name in ANALYSIS_LIST.keys():
  95. j = \
  96. DummyOperator(
  97. task_id="finished_{0}".format(analysis_name),
  98. dag=dag)
  99. for i in range(0, 40):
  100. t = \
  101. TriggerDagRunOperator(
  102. task_id="trigger_{0}_{1}".format(analysis_name, i),
  103. trigger_dag_id=analysis_name,
  104. dag=dag,
  105. queue='hpc_4G',
  106. params={
  107. "xcom_key": "analysis_list",
  108. "xcom_task": "find_analysis_to_trigger_dags",
  109. "analysis_name": analysis_name,
  110. "index": i},
  111. python_callable=trigger_dag_func)
  112. ## PIPELINE
  113. find_analysis_to_trigger_dags >> t >> j
  114. trigger_analysis_dag_tasks.append(j)
  115. ## TASK
  116. send_log_and_reset_trigger_file = \
  117. PythonOperator(
  118. task_id="send_log_and_reset_trigger_file",
  119. dag=dag,
  120. queue='hpc_4G',
  121. params={
  122. "xcom_key": "analysis_list",
  123. "xcom_task": "find_analysis_to_trigger_dags"},
  124. trigger_rule='none_failed_or_skipped',
  125. python_callable=send_log_and_reset_trigger_file_func)
  126. ## PIPELINE
  127. find_analysis_designs >> load_analysis_design_tasks
  128. load_analysis_design_tasks >> find_analysis_to_trigger_dags
  129. trigger_analysis_dag_tasks >> send_log_and_reset_trigger_file