# dag18_upload_and_trigger_analysis.py
  1. import os, logging
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from airflow.operators.dagrun_operator import TriggerDagRunOperator
  9. #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
  10. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
  11. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
  12. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
  13. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import get_dag_conf_for_analysis
  14. from igf_airflow.logging.upload_log_msg import send_log_to_channels
  15. SLACK_CONF = \
  16. Variable.get('slack_conf', default_var=None)
  17. MS_TEAMS_CONF = \
  18. Variable.get('ms_teams_conf', default_var=None)
  19. args = {
  20. 'owner': 'airflow',
  21. 'start_date': days_ago(2),
  22. 'retries': 1,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. 'email_on_failure': False,
  26. 'email_on_retry': False,
  27. 'catchup': False,
  28. 'max_active_runs': 1}
  29. DAG_ID = \
  30. os.path.basename(__file__).\
  31. replace(".pyc", "").\
  32. replace(".py", "")
  33. dag = \
  34. DAG(
  35. dag_id=DAG_ID,
  36. schedule_interval=None,
  37. default_args=args,
  38. orientation='LR',
  39. tags=['hpc'])
  40. ANALYSIS_LIST = \
  41. Variable.get("analysis_dag_list", default_var={})
  42. ## FUNCTION
  43. def trigger_dag_func(context, dag_run_obj):
  44. try:
  45. ti = context.get('ti')
  46. xcom_key = \
  47. context['params'].get('xcom_key')
  48. xcom_task = \
  49. context['params'].get('xcom_task')
  50. analysis_name = \
  51. context['params'].get('analysis_name')
  52. index = \
  53. context['params'].get('index')
  54. analysis_list = \
  55. ti.xcom_pull(
  56. task_ids=xcom_task,
  57. key=xcom_key)
  58. analysis_detail = \
  59. get_dag_conf_for_analysis(
  60. analysis_list=analysis_list,
  61. analysis_name=analysis_name,
  62. index=index)
  63. dag_run_obj.payload = analysis_detail
  64. return dag_run_obj
  65. ## FIX for v2
  66. # trigger_dag = \
  67. # TriggerDagRunOperator(
  68. # task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
  69. # trigger_dag_id=analysis_name,
  70. # conf=analysis_detail)
  71. #return trigger_dag.execute(context=context)
  72. except Exception as e:
  73. logging.error(e)
  74. message = \
  75. 'analysis input finding error: {0}'.\
  76. format(e)
  77. send_log_to_channels(
  78. slack_conf=SLACK_CONF,
  79. ms_teams_conf=MS_TEAMS_CONF,
  80. task_id=context['task'].task_id,
  81. dag_id=context['task'].dag_id,
  82. comment=message,
  83. reaction='fail')
  84. raise
  85. ## DAG
  86. with dag:
  87. ## TASK
  88. find_analysis_designs = \
  89. BranchPythonOperator(
  90. task_id="find_analysis_designs",
  91. dag=dag,
  92. queue='hpc_4G',
  93. params={
  94. "load_analysis_task_prefix": "load_analysis_design",
  95. "no_task_name":"no_task",
  96. "load_task_limit": 20,
  97. "load_design_xcom_key": "load_design"},
  98. python_callable=find_analysis_designs_func)
  99. ## TASK
  100. no_task = \
  101. DummyOperator(
  102. task_id="no_task",
  103. dag=dag)
  104. ## TASK
  105. load_analysis_design_tasks = [no_task]
  106. for i in range(0, 10):
  107. t = \
  108. PythonOperator(
  109. task_id="load_analysis_design_{0}".format(i),
  110. dag=dag,
  111. params={
  112. "task_index": i,
  113. "load_design_xcom_key": "load_design",
  114. "load_design_xcom_task": "find_analysis_designs"},
  115. python_callable=load_analysis_design_func)
  116. load_analysis_design_tasks.append(t)
  117. ## TASK
  118. find_analysis_to_trigger_dags = \
  119. BranchPythonOperator(
  120. task_id="find_analysis_to_trigger_dags",
  121. dag=dag,
  122. queue='hpc_4G',
  123. params={
  124. "no_trigger_task": "no_trigger",
  125. "analysis_limit": 40,
  126. "xcom_key": "analysis_list",
  127. "trigger_task_prefix": "trigger"},
  128. trigger_rule='none_failed_or_skipped',
  129. python_callable=find_analysis_to_trigger_dags_func)
  130. ## TASK
  131. no_trigger = \
  132. DummyOperator(
  133. task_id="no_trigger",
  134. dag=dag)
  135. ## TASK
  136. trigger_analysis_dag_tasks = [no_trigger]
  137. for analysis_name in ANALYSIS_LIST.keys():
  138. for i in range(0, 40):
  139. t = \
  140. TriggerDagRunOperator(
  141. task_id="trigger_{0}_{1}".format(analysis_name, i),
  142. trigger_dag_id=analysis_name,
  143. dag=dag,
  144. queue='hpc_4G',
  145. params={
  146. "xcom_key": "analysis_list",
  147. "xcom_task": "find_analysis_to_trigger_dags",
  148. "analysis_name": analysis_name,
  149. "index": i},
  150. python_callable=trigger_dag_func)
  151. trigger_analysis_dag_tasks.append(t)
  152. ## PIPELINE
  153. find_analysis_designs >> load_analysis_design_tasks
  154. load_analysis_design_tasks >> find_analysis_to_trigger_dags
  155. find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks