ソースを参照

added task for log and input reset

Avik Datta 3 年 前
コミット
2abbd2d16f
1 ファイル変更21 行追加47 行削除
  1. 21 47
      dags/dag18_upload_and_trigger_analysis.py

+ 21 - 47
dags/dag18_upload_and_trigger_analysis.py

@@ -10,8 +10,8 @@ from airflow.operators.dagrun_operator import TriggerDagRunOperator
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
 from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
-from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import get_dag_conf_for_analysis
-from igf_airflow.logging.upload_log_msg import send_log_to_channels
+from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import send_log_and_reset_trigger_file_func
+from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import trigger_dag_func
 
 SLACK_CONF = \
   Variable.get('slack_conf', default_var=None)
@@ -41,49 +41,6 @@ dag = \
         tags=['hpc'])
 ANALYSIS_LIST = \
     Variable.get("analysis_dag_list", default_var={})
-## FUNCTION
-def trigger_dag_func(context, dag_run_obj):
-    try:
-        ti = context.get('ti')
-        xcom_key = \
-            context['params'].get('xcom_key')
-        xcom_task = \
-            context['params'].get('xcom_task')
-        analysis_name = \
-            context['params'].get('analysis_name')
-        index = \
-            context['params'].get('index')
-        analysis_list = \
-            ti.xcom_pull(
-                task_ids=xcom_task,
-                key=xcom_key)
-        analysis_detail = \
-           get_dag_conf_for_analysis(
-               analysis_list=analysis_list,
-               analysis_name=analysis_name,
-               index=index)
-        dag_run_obj.payload = analysis_detail
-        return dag_run_obj
-        ## FIX for v2
-        # trigger_dag = \
-        #    TriggerDagRunOperator(
-        #        task_id="trigger_dag_{0}_{1}".format(analysis_name, index),
-        #        trigger_dag_id=analysis_name,
-        #        conf=analysis_detail)
-        #return trigger_dag.execute(context=context)
-    except Exception as e:
-        logging.error(e)
-        message = \
-        'analysis input finding error: {0}'.\
-            format(e)
-        send_log_to_channels(
-            slack_conf=SLACK_CONF,
-            ms_teams_conf=MS_TEAMS_CONF,
-            task_id=context['task'].task_id,
-            dag_id=context['task'].dag_id,
-            comment=message,
-            reaction='fail')
-        raise
 ## DAG
 with dag:
     ## TASK
@@ -137,6 +94,10 @@ with dag:
     ## TASK
     trigger_analysis_dag_tasks = [no_trigger]
     for analysis_name in ANALYSIS_LIST.keys():
+        j = \
+            DummyOperator(
+                task_id="finished_{0}".format(analysis_name),
+                dag=dag)
         for i in range(0, 40):
             t = \
                 TriggerDagRunOperator(
@@ -150,8 +111,21 @@ with dag:
                         "analysis_name": analysis_name,
                         "index": i},
                     python_callable=trigger_dag_func)
-            trigger_analysis_dag_tasks.append(t)
+            ## PIPELINE
+            find_analysis_to_trigger_dags >> t >> j
+        trigger_analysis_dag_tasks.append(j)
+    ## TASK
+    send_log_and_reset_trigger_file = \
+        PythonOperator(
+            task_id="send_log_and_reset_trigger_file",
+            dag=dag,
+            queue='hpc_4G',
+            params={
+                "xcom_key": "analysis_list",
+                "xcom_task": "find_analysis_to_trigger_dags"},
+            python_callable=send_log_and_reset_trigger_file_func)
     ## PIPELINE
     find_analysis_designs >> load_analysis_design_tasks 
     load_analysis_design_tasks >> find_analysis_to_trigger_dags
-    find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks
+    find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks
+    trigger_analysis_dag_tasks >> send_log_and_reset_trigger_file