# dag18_upload_and_trigger_analysis.py

  1. import os
  2. from datetime import timedelta
  3. from airflow.models import DAG, Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dagrun_operator import TriggerDagRunOperator
  8. #from airflow.operators.trigger_dagrun import TriggerDagRunOperator # FIX for v2
  # Per-operator defaults, handed to the DAG through ``default_args``.
  # NOTE(review): 'catchup' and 'max_active_runs' are normally DAG-level
  # settings; confirm they have any effect when supplied via default_args.
  args = dict(
      owner='airflow',
      start_date=days_ago(2),
      retries=1,
      retry_delay=timedelta(minutes=5),
      provide_context=True,
      email_on_failure=False,
      email_on_retry=False,
      catchup=False,
      max_active_runs=1,
  )
  # The DAG id is derived from this file's name so the two cannot drift apart.
  # splitext() removes only the trailing extension (".py" / ".pyc"); chained
  # str.replace() calls would also strip ".py" from the middle of a file name.
  DAG_ID = os.path.splitext(os.path.basename(__file__))[0]
  # DAG object for this module.
  # schedule_interval=None means there is no time-based schedule.
  # catchup and max_active_runs are passed here explicitly because they are
  # DAG constructor arguments; listing them only inside default_args (which is
  # forwarded to operators) does not configure the DAG itself.
  dag = DAG(
      dag_id=DAG_ID,
      schedule_interval=None,
      default_args=args,
      catchup=False,
      max_active_runs=1,
      tags=['hpc'],
  )
  # Mapping whose keys are the ids of the analysis DAGs this DAG can trigger.
  # Airflow Variables are stored as strings, so deserialize_json=True is
  # required to get a dict back; without it a configured Variable comes back
  # as str and cannot be used as a mapping. Falls back to an empty dict when
  # the Variable is not defined.
  ANALYSIS_LIST = Variable.get(
      "analysis_dag_list",
      default_var={},
      deserialize_json=True,
  )
  with dag:
      # NOTE(review): every python_callable below is still a None placeholder.
      # PythonOperator / BranchPythonOperator are expected to reject a
      # non-callable, so real functions must be wired in before this DAG can
      # be imported — confirm against the Airflow version in use.

      ## TASK: branching entry point, upstream of all load_analysis_design_*
      find_analysis_designs = BranchPythonOperator(
          task_id="find_analysis_designs",
          dag=dag,
          queue='hpc_4G',
          params={},
          python_callable=None)

      ## TASK: numbered loader tasks load_analysis_design_1 .. _20
      load_analysis_design_tasks = [
          PythonOperator(
              # "{i}" is a named field, so the value must be passed by keyword;
              # the positional .format(i) raised KeyError at import time.
              task_id="load_analysis_design_{i}".format(i=i),
              dag=dag,
              params={},
              python_callable=None)
          for i in range(1, 21)]

      ## TASK: second branch, downstream of every loader task
      # NOTE(review): this task follows a branch; with the default trigger
      # rule it may be skipped when some loader tasks are skipped — confirm
      # whether a trigger_rule override is needed.
      find_analysis_to_trigger_dags = BranchPythonOperator(
          task_id="find_analysis_to_trigger_dags",
          dag=dag,
          queue='hpc_4G',
          params={},
          python_callable=None)

      ## TASK: 20 trigger tasks per configured analysis DAG
      trigger_analysis_dag_tasks = []
      # .keys() (rather than bare iteration) fails fast if ANALYSIS_LIST is
      # not a mapping, instead of silently iterating over something else.
      for analysis_name in ANALYSIS_LIST.keys():
          for i in range(1, 21):
              trigger_analysis_dag_tasks.append(
                  TriggerDagRunOperator(
                      # Second field must be {1}: "{0}_{0}" repeated the DAG
                      # name and ignored i, giving all 20 tasks the same id.
                      task_id="trigger_{0}_{1}".format(analysis_name, i),
                      dag=dag,
                      trigger_dag_id=analysis_name,
                      queue='hpc_4G',
                      params={},
                      python_callable=None))

      ## PIPELINE
      find_analysis_designs >> load_analysis_design_tasks
      load_analysis_design_tasks >> find_analysis_to_trigger_dags
      find_analysis_to_trigger_dags >> trigger_analysis_dag_tasks