dag10_nextflow_atacseq_pipeline.py

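"""Airflow DAG for the Nextflow ATAC-seq analysis pipeline (dag10).

Task flow, as wired below:
  fetch_nextflow_analysis_info_and_branch
      -> prep_nf_atacseq_run -> run_nf_atacseq -> copy_nf_atacseq_branch
             -> copy_data_to_irods -> update_analysis_and_status
             -> copy_data_to_box   -> update_analysis_and_status
      -> no_analysis -> update_analysis_and_status
"""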
from datetime import timedelta
import os, json, logging, subprocess
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import fetch_nextflow_analysis_info_and_branch_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import prep_nf_run_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import run_nf_command_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_data_to_box_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_data_to_irods_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_nf_atacseq_branch_func
from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import change_pipeline_status
## task-level defaults; DAG-level settings (max_active_runs, catchup)
## belong on the DAG object itself, not in default_args
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 4,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True}

dag = \
    DAG(
        dag_id='dag10_nextflow_atacseq_pipeline',
        schedule_interval=None,
        max_active_runs=10,
        catchup=True,
        tags=['hpc', 'analysis', 'nextflow', 'atacseq'],
        default_args=default_args,
        orientation='LR')
with dag:
    ## TASK: branch to prep_nf_atacseq_run if an active analysis is found, otherwise to no_analysis
    fetch_nextflow_analysis_info_and_branch = \
        BranchPythonOperator(
            task_id='fetch_nextflow_analysis_info_and_branch',
            dag=dag,
            queue='hpc_4G',
            python_callable=fetch_nextflow_analysis_info_and_branch_func,
            params={
                'no_analysis_task': 'no_analysis',
                'active_tasks': ['prep_nf_atacseq_run'],
                'analysis_description_xcom_key': 'analysis_description'})
    ## TASK: prepare the Nextflow run and push the command and work dir to XCom
    prep_nf_atacseq_run = \
        PythonOperator(
            task_id='prep_nf_atacseq_run',
            dag=dag,
            queue='hpc_4G',
            python_callable=prep_nf_run_func,
            params={
                'analysis_description_xcom_key': 'analysis_description',
                'analysis_description_xcom_task': 'fetch_nextflow_analysis_info_and_branch',
                'nextflow_command_xcom_key': 'nextflow_command',
                'nextflow_work_dir_xcom_key': 'nextflow_work_dir'})
    ## TASK: dummy end point for the "no analysis" branch
    no_analysis = \
        DummyOperator(
            task_id='no_analysis',
            dag=dag)
    ## TASK: update the pipeline status to FINISHED (or leave it as SEEDED) once upstream branches finish
    update_analysis_and_status = \
        PythonOperator(
            task_id='update_analysis_and_status',
            dag=dag,
            queue='hpc_4G',
            python_callable=change_pipeline_status,
            trigger_rule='none_failed_or_skipped',
            params={
                'new_status': 'FINISHED',
                'no_change_status': 'SEEDED'})
    ## PIPELINE
    fetch_nextflow_analysis_info_and_branch >> prep_nf_atacseq_run
    fetch_nextflow_analysis_info_and_branch >> no_analysis
    no_analysis >> update_analysis_and_status
    ## TASK: run the Nextflow command prepared by prep_nf_atacseq_run
    run_nf_atacseq = \
        PythonOperator(
            task_id='run_nf_atacseq',
            dag=dag,
            queue='hpc_4G',
            pool='nextflow_hpc',
            python_callable=run_nf_command_func,
            params={
                'nextflow_command_xcom_task': 'prep_nf_atacseq_run',
                'nextflow_command_xcom_key': 'nextflow_command',
                'nextflow_work_dir_xcom_task': 'prep_nf_atacseq_run',
                'nextflow_work_dir_xcom_key': 'nextflow_work_dir'})
    ## TASK: branch to the copy tasks for data and report (html) files
    copy_nf_atacseq_branch = \
        BranchPythonOperator(
            task_id='copy_nf_atacseq_branch',
            dag=dag,
            queue='hpc_4G',
            python_callable=copy_nf_atacseq_branch_func,
            params={
                'data_files': '',
                'html_files': ''})
    ## TASK: copy result data files to iRODS
    copy_data_to_irods = \
        PythonOperator(
            task_id='copy_data_to_irods',
            dag=dag,
            queue='hpc_4G',
            python_callable=copy_data_to_irods_func,
            params={})
    ## TASK: copy report files to Box
    copy_data_to_box = \
        PythonOperator(
            task_id='copy_data_to_box',
            dag=dag,
            queue='hpc_4G',
            python_callable=copy_data_to_box_func,
            params={})
    ## PIPELINE
    prep_nf_atacseq_run >> run_nf_atacseq >> copy_nf_atacseq_branch
    copy_nf_atacseq_branch >> copy_data_to_irods
    copy_nf_atacseq_branch >> copy_data_to_box
    copy_data_to_irods >> update_analysis_and_status
    copy_data_to_box >> update_analysis_and_status