123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- import os, logging
- from datetime import timedelta
- from airflow.models import DAG, Variable
- from airflow.utils.dates import days_ago
- from airflow.operators.python_operator import PythonOperator
- from airflow.operators.python_operator import BranchPythonOperator
- from airflow.operators.dummy_operator import DummyOperator
- from airflow.operators.dagrun_operator import TriggerDagRunOperator
- #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
- from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
- from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
- from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
- from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import get_dag_conf_for_analysis
- from igf_airflow.logging.upload_log_msg import send_log_to_channels
- SLACK_CONF = \
- Variable.get('slack_conf', default_var=None)
- MS_TEAMS_CONF = \
- Variable.get('ms_teams_conf', default_var=None)
- args = {
- 'owner': 'airflow',
- 'start_date': days_ago(2),
- 'retries': 1,
- 'retry_delay': timedelta(minutes=5),
- 'provide_context': True,
- 'email_on_failure': False,
- 'email_on_retry': False,
- 'catchup': False,
- 'max_active_runs': 1}
- DAG_ID = \
- os.path.basename(__file__).\
- replace(".pyc", "").\
- replace(".py", "")
- dag = \
- DAG(
- dag_id=DAG_ID,
- schedule_interval=None,
- default_args=args,
- tags=['hpc'])
- ANALYSIS_LIST = \
- Variable.get("analysis_dag_list", default_var={})
- ## FUNCTION
- def trigger_dag_func(context, dag_run_obj):
- try:
- ti = context.get('ti')
- xcom_key = \
- context['params'].get('xcom_key')
- xcom_task = \
- context['params'].get('xcom_task')
- analysis_name = \
- context['params'].get('analysis_name')
- index = \
- context['params'].get('index')
- analysis_list = \
- ti.xcom_pull(
- task_ids=xcom_task,
- key=xcom_key)
- analysis_detail = \
- get_dag_conf_for_analysis(
- analysis_list=analysis_list,
- analysis_name=analysis_name,
- index=index)
- dag_run_obj.payload = analysis_detail
- return dag_run_obj
- ## FIX for v2
- # trigger_dag = \
- # TriggerDagRunOperator(
- # task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
- # trigger_dag_id=analysis_name,
- # conf=analysis_detail)
- #return trigger_dag.execute(context=context)
- except Exception as e:
- logging.error(e)
- message = \
- 'analysis input finding error: {0}'.\
- format(e)
- send_log_to_channels(
- slack_conf=SLACK_CONF,
- ms_teams_conf=MS_TEAMS_CONF,
- task_id=context['task'].task_id,
- dag_id=context['task'].dag_id,
- comment=message,
- reaction='fail')
- raise
- ## DAG
- with dag:
- ## TASK
- find_analysis_designs = \
- BranchPythonOperator(
- task_id="find_analysis_designs",
- dag=dag,
- queue='hpc_4G',
- params={
- "load_analysis_task_prefix": "load_analysis_design",
- "no_task_name":"no_task",
- "load_task_limit": 20,
- "load_design_xcom_key": "load_design"},
- python_callable=find_analysis_designs_func)
- ## TASK
- no_task = \
- DummyOperator(
- task_id="no_task",
- dag=dag)
- ## TASK
- load_analysis_design_tasks = [no_task]
- for i in range(0, 10):
- t = \
- PythonOperator(
- task_id="load_analysis_design_{0}".format(i),
- dag=dag,
- params={
- "task_index": i,
- "load_design_xcom_key": "load_design",
- "load_design_xcom_task": "find_analysis_designs"},
- python_callable=load_analysis_design_func)
- load_analysis_design_tasks.append(t)
- ## TASK
- find_analysis_to_trigger_dags = \
- BranchPythonOperator(
- task_id="find_analysis_to_trigger_dags",
- dag=dag,
- queue='hpc_4G',
- params={
- "no_trigger_task": "no_trigger",
- "analysis_limit": 40,
- "xcom_key": "analysis_list",
- "trigger_task_prefix": "trigger"},
- trigger_rule='none_failed_or_skipped',
- python_callable=find_analysis_to_trigger_dags_func)
- ## TASK
- no_trigger = \
- DummyOperator(
- task_id="no_trigger",
- dag=dag)
- ## TASK
- trigger_analysis_dag_tasks = [no_trigger]
- for analysis_name in ANALYSIS_LIST.keys():
- for i in range(0, 40):
- t = \
- TriggerDagRunOperator(
- task_id="trigger_{0}_{1}".format(analysis_name, i),
- trigger_dag_id=analysis_name,
- dag=dag,
- queue='hpc_4G',
- params={
- "xcom_key": "analysis_list",
- "xcom_task": "find_analysis_to_trigger_dags",
- "analysis_name": analysis_name,
- "index": i},
- python_callable=trigger_dag_func)
- trigger_analysis_dag_tasks.append(t)
- ## PIPELINE
- find_analysis_designs >> load_analysis_design_tasks
- load_analysis_design_tasks >> find_analysis_to_trigger_dags
- find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks
|