# dag12_nextflow_chipseq_pipeline.py (5.9 KB)
  1. from datetime import timedelta
  2. import os,json,logging,subprocess
  3. from airflow.models import DAG,Variable
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.python_operator import PythonOperator
  6. from airflow.operators.python_operator import BranchPythonOperator
  7. from airflow.operators.dummy_operator import DummyOperator
  8. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import fetch_nextflow_analysis_info_and_branch_func
  9. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import prep_nf_run_func
  10. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import run_nf_command_func
  11. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_box_func
  12. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_irods_func
  13. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import nf_analysis_copy_branch_func
  14. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import change_pipeline_status
  15. from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_output_to_disk_func
  16. default_args = {
  17. 'owner': 'airflow',
  18. 'depends_on_past': False,
  19. 'start_date': days_ago(2),
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'retries': 4,
  23. 'max_active_runs':10,
  24. 'catchup':True,
  25. 'retry_delay': timedelta(minutes=5),
  26. 'provide_context': True}
  27. dag = \
  28. DAG(
  29. dag_id='dag12_nextflow_chipseq_pipeline',
  30. schedule_interval=None,
  31. tags=['hpc','analysis','nextflow','chipseq'],
  32. default_args=default_args,
  33. orientation='LR')
  34. with dag:
  35. ## TASK
  36. fetch_nextflow_analysis_info_and_branch = \
  37. BranchPythonOperator(
  38. task_id='fetch_nextflow_analysis_info_and_branch',
  39. dag=dag,
  40. queue='hpc_4G',
  41. python_callable=fetch_nextflow_analysis_info_and_branch_func,
  42. params={'no_analysis_task':'no_analysis',
  43. 'active_tasks':['prep_nf_chipseq_run'],
  44. 'analysis_description_xcom_key':'analysis_description'})
  45. ## TASK
  46. prep_nf_chipseq_run = \
  47. PythonOperator(
  48. task_id='prep_nf_chipseq_run',
  49. dag=dag,
  50. queue='hpc_4G',
  51. python_callable=prep_nf_run_func,
  52. params={'analysis_description_xcom_key':'analysis_description',
  53. 'analysis_description_xcom_task':'fetch_nextflow_analysis_info_and_branch',
  54. 'nextflow_command_xcom_key':'nexflow_command',
  55. 'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
  56. ## TASK
  57. no_analysis = \
  58. DummyOperator(
  59. task_id='no_analysis',
  60. dag=dag)
  61. ## TASK
  62. update_analysis_and_status = \
  63. PythonOperator(
  64. task_id='update_analysis_and_status',
  65. dag=dag,
  66. queue='hpc_4G',
  67. python_callable=change_pipeline_status,
  68. trigger_rule='none_failed_or_skipped',
  69. params={'new_status':'FINISHED',
  70. 'no_change_status':'SEEDED'})
  71. ## PIPELINE
  72. fetch_nextflow_analysis_info_and_branch >> prep_nf_chipseq_run
  73. fetch_nextflow_analysis_info_and_branch >> no_analysis
  74. no_analysis >> update_analysis_and_status
  75. ## TASK
  76. run_nf_chipseq = \
  77. PythonOperator(
  78. task_id='run_nf_chipseq',
  79. dag=dag,
  80. queue='hpc_4G_long',
  81. pool='nextflow_hpc',
  82. python_callable=run_nf_command_func,
  83. params={'nextflow_command_xcom_task':'prep_nf_chipseq_run',
  84. 'nextflow_command_xcom_key':'nexflow_command',
  85. 'nextflow_work_dir_xcom_task':'prep_nf_chipseq_run',
  86. 'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
  87. ## TASK
  88. copy_nf_output_to_disk = \
  89. PythonOperator(
  90. task_id='copy_nf_output_to_disk',
  91. dag=dag,
  92. queue='hpc_4G',
  93. python_callable=copy_nf_output_to_disk_func,
  94. params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
  95. 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
  96. 'result_dirname':'results',
  97. 'data_dir_list':['bwa'],
  98. 'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
  99. 'dag_file_name':'dag.html'})
  100. ## TASK
  101. nf_analysis_copy_branch = \
  102. BranchPythonOperator(
  103. task_id='nf_analysis_copy_branch',
  104. dag=dag,
  105. queue='hpc_4G',
  106. python_callable=nf_analysis_copy_branch_func,
  107. params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
  108. 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
  109. 'result_dirname':'results',
  110. 'data_dir_list':['bwa'],
  111. 'data_dir_xcom_key':'data_dir',
  112. 'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
  113. 'report_file_xcom_key':'report_dir',
  114. 'dag_file_name':'dag.html',
  115. 'dag_file_xcom_key':'dag_file',
  116. 'data_file_copy_tasks':['copy_nf_data_to_irods'],
  117. 'report_file_copy_tasks':['copy_nf_data_to_box']})
  118. ## TASK
  119. copy_nf_data_to_irods = \
  120. PythonOperator(
  121. task_id='copy_nf_data_to_irods',
  122. dag=dag,
  123. queue='hpc_4G',
  124. python_callable=copy_nf_data_to_irods_func,
  125. params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
  126. 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
  127. 'result_dirname':'results',
  128. 'data_dir_xcom_key':'data_dir',
  129. 'data_dir_xcom_task':'nf_analysis_copy_branch'})
  130. ## TASK
  131. copy_nf_data_to_box = \
  132. PythonOperator(
  133. task_id='copy_nf_data_to_box',
  134. dag=dag,
  135. queue='hpc_4G',
  136. python_callable=copy_nf_data_to_box_func,
  137. params={'report_file_xcom_key':'report_dir',
  138. 'report_file_xcom_task':'nf_analysis_copy_branch',
  139. 'dag_file_xcom_key':'dag_file',
  140. 'dag_file_xcom_task':'nf_analysis_copy_branch'})
  141. ## PIPELINE
  142. prep_nf_chipseq_run >> run_nf_chipseq >> nf_analysis_copy_branch
  143. run_nf_chipseq >> copy_nf_output_to_disk
  144. nf_analysis_copy_branch >> copy_nf_data_to_irods
  145. nf_analysis_copy_branch >> copy_nf_data_to_box
  146. copy_nf_output_to_disk >> update_analysis_and_status
  147. copy_nf_data_to_irods >> update_analysis_and_status
  148. copy_nf_data_to_box >> update_analysis_and_status