Avik Datta 3 роки тому
батько
коміт
2ee8878cca
1 змінених файлів з 24 додано та 8 видалено
  1. 24 8
      dags/dag18_upload_and_trigger_analysis.py

+ 24 - 8
dags/dag18_upload_and_trigger_analysis.py

@@ -4,8 +4,11 @@ from airflow.models import DAG, Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.dagrun_operator import TriggerDagRunOperator
 #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
+from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
+from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
 
 args = {
     'owner': 'airflow',
@@ -36,17 +39,29 @@ with dag:
             task_id="find_analysis_designs",
             dag=dag,
             queue='hpc_4G',
-            params={},
-            python_callable=None)
+            params={
+                "load_analysis_task_prefix": "load_analysis_design",
+                "no_task_name":"no_task",
+                "load_task_limit": 20,
+                "load_design_xcom_key": "load_design"},
+            python_callable=find_analysis_designs_func)
+    ## TASK
+    no_task = \
+        DummyOperator(
+            task_id="no_task",
+            dag=dag)
     ## TASK
-    load_analysis_design_tasks = list()
-    for i in range(1, 21):
+    load_analysis_design_tasks = [no_task]
+    for i in range(0, 20):
         t = \
             PythonOperator(
                 task_id="load_analysis_design_{i}".format(i),
                 dag=dag,
-                params={},
-                python_callable=None)
+                params={
+                    "task_index": i,
+                    "load_design_xcom_key": "load_design",
+                    "load_design_xcom_task": "find_analysis_designs"},
+                python_callable=load_analysis_design_func)
         load_analysis_design_tasks.append(t)
     ## TASK
     find_analysis_to_trigger_dags = \
@@ -55,11 +70,12 @@ with dag:
             dag=dag,
             queue='hpc_4G',
             params={},
-            python_callable=None)
+            trigger_rule='none_failed_or_skipped',
+            python_callable=find_analysis_to_trigger_dags_func)
     ## TASK
     trigger_analysis_dag_tasks = list()
     for analysis_name in ANALYSIS_LIST.keys():
-        for i in (range(1, 21)):
+        for i in (range(0, 20)):
             t = \
                 TriggerDagRunOperator(
                     task_id="trigger_{0}_{0}".format(analysis_name, i),