
added tasks for dag trigger

Avik Datta 3 years ago
commit 5a2872a400
1 changed file with 69 additions and 8 deletions

+ 69 - 8
dags/dag18_upload_and_trigger_analysis.py

@@ -1,4 +1,4 @@
-import os
+import os, logging
 from datetime import timedelta
 from airflow.models import DAG, Variable
 from airflow.utils.dates import days_ago
@@ -10,6 +10,13 @@ from airflow.operators.dagrun_operator import TriggerDagRunOperator
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
+from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import get_dag_conf_for_analysis
+from igf_airflow.logging.upload_log_msg import send_log_to_channels
+
+SLACK_CONF = \
+  Variable.get('slack_conf', default_var=None)
+MS_TEAMS_CONF = \
+  Variable.get('ms_teams_conf', default_var=None)
 
 args = {
     'owner': 'airflow',
@@ -33,6 +40,50 @@ dag = \
         tags=['hpc'])
 ANALYSIS_LIST = \
     Variable.get("analysis_dag_list", default_var={})
+## FUNCTION
+def trigger_dag_func(context, dag_run_obj):
+    try:
+        ti = context.get('ti')
+        xcom_key = \
+            context['params'].get('xcom_key')
+        xcom_task = \
+            context['params'].get('xcom_task')
+        analysis_name = \
+            context['params'].get('analysis_name')
+        index = \
+            context['params'].get('index')
+        analysis_list = \
+            ti.xcom_pull(
+                task_ids=xcom_task,
+                key=xcom_key)
+        analysis_detail = \
+           get_dag_conf_for_analysis(
+               analysis_list=analysis_list,
+               analysis_name=analysis_name,
+               index=index)
+        dag_run_obj.payload = analysis_detail
+        return dag_run_obj
+        ## FIX for v2
+        # trigger_dag = \
+        #    TriggerDagRunOperator(
+        #        task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
+        #        trigger_dag_id=analysis_name,
+        #        conf=analysis_detail)
+        #return trigger_dag.execute(context=context)
+    except Exception as e:
+        logging.error(e)
+        message = \
+            'analysis input finding error: {0}'.\
+                format(e)
+        send_log_to_channels(
+            slack_conf=SLACK_CONF,
+            ms_teams_conf=MS_TEAMS_CONF,
+            task_id=context['task'].task_id,
+            dag_id=context['task'].dag_id,
+            comment=message,
+            reaction='fail')
+        raise
+## DAG
 with dag:
     ## TASK
     find_analysis_designs = \
@@ -53,7 +104,7 @@ with dag:
             dag=dag)
     ## TASK
     load_analysis_design_tasks = [no_task]
-    for i in range(0, 20):
+    for i in range(0, 10):
         t = \
             PythonOperator(
                 task_id="load_analysis_design_{i}".format(i),
@@ -72,22 +123,32 @@ with dag:
             queue='hpc_4G',
             params={
                 "no_trigger_task": "no_trigger",
-                "analysis_limit": 20,
+                "analysis_limit": 40,
+                "xcom_key": "analysis_list",
                 "trigger_task_prefix": "trigger"},
             trigger_rule='none_failed_or_skipped',
             python_callable=find_analysis_to_trigger_dags_func)
     ## TASK
-    trigger_analysis_dag_tasks = list()
+    no_trigger = \
+        DummyOperator(
+            task_id="no_trigger",
+            dag=dag)
+    ## TASK
+    trigger_analysis_dag_tasks = [no_trigger]
     for analysis_name in ANALYSIS_LIST.keys():
-        for i in (range(0, 20)):
+        for i in range(0, 40):
             t = \
                 TriggerDagRunOperator(
                     task_id="trigger_{0}_{0}".format(analysis_name, i),
-                    dag=dag,
                     trigger_dag_id=analysis_name,
+                    dag=dag,
                     queue='hpc_4G',
-                    params={},
-                    python_callable=None)
+                    params={
+                        "xcom_key": "analysis_list",
+                        "xcom_task": "find_analysis_to_trigger_dags",
+                        "analysis_name": analysis_name,
+                        "index": i},
+                    python_callable=trigger_dag_func)
             trigger_analysis_dag_tasks.append(t)
     ## PIPELINE
     find_analysis_designs >> load_analysis_design_tasks
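
For reference, the block commented as "## FIX for v2" inside trigger_dag_func points at the Airflow 2 way of doing this: the dag_run_obj payload callback no longer exists there, so the conf dict is handed to TriggerDagRunOperator directly and the operator is executed from inside the callable. Below is a minimal sketch of that pattern, assuming the Airflow 2 import path; the placeholder values stand in for what trigger_dag_func resolves via xcom_pull and get_dag_conf_for_analysis.

## Sketch only: Airflow 2 variant of the commented "## FIX for v2" block.
## Assumes the Airflow 2 import path for TriggerDagRunOperator (the 1.10 path
## airflow.operators.dagrun_operator used in this file was removed in 2.x).
## analysis_detail is a placeholder for the dict returned by
## get_dag_conf_for_analysis above.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def trigger_dag_func_v2(**context):
    analysis_name = context['params'].get('analysis_name')
    index = context['params'].get('index')
    analysis_detail = {}  # placeholder: looked up as in trigger_dag_func
    trigger_dag = \
        TriggerDagRunOperator(
            task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
            trigger_dag_id=analysis_name,
            conf=analysis_detail)
    return trigger_dag.execute(context=context)

This mirrors the commented block line for line; only the import path and passing the payload through conf differ from the 1.10 callback version used in this commit.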