# dag18_upload_and_trigger_analysis.py
import os
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
#from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2

from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_designs_func
from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import load_analysis_design_func
from igf_airflow.utils.dag18_upload_and_trigger_analysis_utils import find_analysis_to_trigger_dags_func
  13. args = {
  14. 'owner': 'airflow',
  15. 'start_date': days_ago(2),
  16. 'retries': 1,
  17. 'retry_delay': timedelta(minutes=5),
  18. 'provide_context': True,
  19. 'email_on_failure': False,
  20. 'email_on_retry': False,
  21. 'catchup': False,
  22. 'max_active_runs': 1}
  23. DAG_ID = \
  24. os.path.basename(__file__).\
  25. replace(".pyc", "").\
  26. replace(".py", "")
  27. dag = \
  28. DAG(
  29. dag_id=DAG_ID,
  30. schedule_interval=None,
  31. default_args=args,
  32. tags=['hpc'])
  33. ANALYSIS_LIST = \
  34. Variable.get("analysis_dag_list", default_var={})
  35. with dag:
  36. ## TASK
  37. find_analysis_designs = \
  38. BranchPythonOperator(
  39. task_id="find_analysis_designs",
  40. dag=dag,
  41. queue='hpc_4G',
  42. params={
  43. "load_analysis_task_prefix": "load_analysis_design",
  44. "no_task_name":"no_task",
  45. "load_task_limit": 20,
  46. "load_design_xcom_key": "load_design"},
  47. python_callable=find_analysis_designs_func)
  48. ## TASK
  49. no_task = \
  50. DummyOperator(
  51. task_id="no_task",
  52. dag=dag)
  53. ## TASK
  54. load_analysis_design_tasks = [no_task]
  55. for i in range(0, 20):
  56. t = \
  57. PythonOperator(
  58. task_id="load_analysis_design_{i}".format(i),
  59. dag=dag,
  60. params={
  61. "task_index": i,
  62. "load_design_xcom_key": "load_design",
  63. "load_design_xcom_task": "find_analysis_designs"},
  64. python_callable=load_analysis_design_func)
  65. load_analysis_design_tasks.append(t)
  66. ## TASK
  67. find_analysis_to_trigger_dags = \
  68. BranchPythonOperator(
  69. task_id="find_analysis_to_trigger_dags",
  70. dag=dag,
  71. queue='hpc_4G',
  72. params={
  73. "no_trigger_task": "no_trigger",
  74. "analysis_limit": 20,
  75. "trigger_task_prefix": "trigger"},
  76. trigger_rule='none_failed_or_skipped',
  77. python_callable=find_analysis_to_trigger_dags_func)
  78. ## TASK
  79. trigger_analysis_dag_tasks = list()
  80. for analysis_name in ANALYSIS_LIST.keys():
  81. for i in (range(0, 20)):
  82. t = \
  83. TriggerDagRunOperator(
  84. task_id="trigger_{0}_{0}".format(analysis_name, i),
  85. dag=dag,
  86. trigger_dag_id=analysis_name,
  87. queue='hpc_4G',
  88. params={},
  89. python_callable=None)
  90. trigger_analysis_dag_tasks.append(t)
  91. ## PIPELINE
  92. find_analysis_designs >> load_analysis_design_tasks
  93. load_analysis_design_tasks >> find_analysis_to_trigger_dags
  94. find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks