# dag18_upload_and_trigger_analysis.py
  1. import os, logging
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from airflow.operators.dagrun_operator import TriggerDagRunOperator
  9. #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
  10. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
  11. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
  12. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
  13. from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import get_dag_conf_for_analysis
  14. from igf_airflow.logging.upload_log_msg import send_log_to_channels
  15. SLACK_CONF = \
  16. Variable.get('slack_conf', default_var=None)
  17. MS_TEAMS_CONF = \
  18. Variable.get('ms_teams_conf', default_var=None)
  19. args = {
  20. 'owner': 'airflow',
  21. 'start_date': days_ago(2),
  22. 'retries': 1,
  23. 'retry_delay': timedelta(minutes=5),
  24. 'provide_context': True,
  25. 'email_on_failure': False,
  26. 'email_on_retry': False,
  27. 'catchup': False,
  28. 'max_active_runs': 1}
  29. DAG_ID = \
  30. os.path.basename(__file__).\
  31. replace(".pyc", "").\
  32. replace(".py", "")
  33. dag = \
  34. DAG(
  35. dag_id=DAG_ID,
  36. schedule_interval=None,
  37. default_args=args,
  38. tags=['hpc'])
  39. ANALYSIS_LIST = \
  40. Variable.get("analysis_dag_list", default_var={})
  41. ## FUNCTION
  42. def trigger_dag_func(context, dag_run_obj):
  43. try:
  44. ti = context.get('ti')
  45. xcom_key = \
  46. context['params'].get('xcom_key')
  47. xcom_task = \
  48. context['params'].get('xcom_task')
  49. analysis_name = \
  50. context['params'].get('analysis_name')
  51. index = \
  52. context['params'].get('index')
  53. analysis_list = \
  54. ti.xcom_pull(
  55. task_ids=xcom_task,
  56. key=xcom_key)
  57. analysis_detail = \
  58. get_dag_conf_for_analysis(
  59. analysis_list=analysis_list,
  60. analysis_name=analysis_name,
  61. index=index)
  62. dag_run_obj.payload = analysis_detail
  63. return dag_run_obj
  64. ## FIX for v2
  65. # trigger_dag = \
  66. # TriggerDagRunOperator(
  67. # task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
  68. # trigger_dag_id=analysis_name,
  69. # conf=analysis_detail)
  70. #return trigger_dag.execute(context=context)
  71. except Exception as e:
  72. logging.error(e)
  73. message = \
  74. 'analysis input finding error: {0}'.\
  75. format(e)
  76. send_log_to_channels(
  77. slack_conf=SLACK_CONF,
  78. ms_teams_conf=MS_TEAMS_CONF,
  79. task_id=context['task'].task_id,
  80. dag_id=context['task'].dag_id,
  81. comment=message,
  82. reaction='fail')
  83. raise
  84. ## DAG
  85. with dag:
  86. ## TASK
  87. find_analysis_designs = \
  88. BranchPythonOperator(
  89. task_id="find_analysis_designs",
  90. dag=dag,
  91. queue='hpc_4G',
  92. params={
  93. "load_analysis_task_prefix": "load_analysis_design",
  94. "no_task_name":"no_task",
  95. "load_task_limit": 20,
  96. "load_design_xcom_key": "load_design"},
  97. python_callable=find_analysis_designs_func)
  98. ## TASK
  99. no_task = \
  100. DummyOperator(
  101. task_id="no_task",
  102. dag=dag)
  103. ## TASK
  104. load_analysis_design_tasks = [no_task]
  105. for i in range(0, 10):
  106. t = \
  107. PythonOperator(
  108. task_id="load_analysis_design_{0}".format(i),
  109. dag=dag,
  110. params={
  111. "task_index": i,
  112. "load_design_xcom_key": "load_design",
  113. "load_design_xcom_task": "find_analysis_designs"},
  114. python_callable=load_analysis_design_func)
  115. load_analysis_design_tasks.append(t)
  116. ## TASK
  117. find_analysis_to_trigger_dags = \
  118. BranchPythonOperator(
  119. task_id="find_analysis_to_trigger_dags",
  120. dag=dag,
  121. queue='hpc_4G',
  122. params={
  123. "no_trigger_task": "no_trigger",
  124. "analysis_limit": 40,
  125. "xcom_key": "analysis_list",
  126. "trigger_task_prefix": "trigger"},
  127. trigger_rule='none_failed_or_skipped',
  128. python_callable=find_analysis_to_trigger_dags_func)
  129. ## TASK
  130. no_trigger = \
  131. DummyOperator(
  132. task_id="no_trigger",
  133. dag=dag)
  134. ## TASK
  135. trigger_analysis_dag_tasks = [no_trigger]
  136. for analysis_name in ANALYSIS_LIST.keys():
  137. for i in range(0, 40):
  138. t = \
  139. TriggerDagRunOperator(
  140. task_id="trigger_{0}_{1}".format(analysis_name, i),
  141. trigger_dag_id=analysis_name,
  142. dag=dag,
  143. queue='hpc_4G',
  144. params={
  145. "xcom_key": "analysis_list",
  146. "xcom_task": "find_analysis_to_trigger_dags",
  147. "analysis_name": analysis_name,
  148. "index": i},
  149. python_callable=trigger_dag_func)
  150. trigger_analysis_dag_tasks.append(t)
  151. ## PIPELINE
  152. find_analysis_designs >> load_analysis_design_tasks
  153. load_analysis_design_tasks >> find_analysis_to_trigger_dags
  154. find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks