123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- from datetime import timedelta
- import os,json,logging,subprocess
- from airflow.models import DAG,Variable
- from airflow.utils.dates import days_ago
- from airflow.operators.python_operator import PythonOperator
- from airflow.operators.python_operator import BranchPythonOperator
- from airflow.operators.dummy_operator import DummyOperator
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import fetch_nextflow_analysis_info_and_branch_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import prep_nf_run_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import run_nf_command_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_box_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_irods_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import nf_analysis_copy_branch_func
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import change_pipeline_status
- from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_output_to_disk_func
# Baseline settings for this DAG; the dict is handed to DAG(...) through its
# `default_args` parameter.
# NOTE(review): 'max_active_runs' and 'catchup' are normally DAG-level
# arguments in Airflow; as default_args entries they are presumably not
# applied to the DAG itself -- confirm, and pass them to DAG(...) directly if
# these limits are actually wanted.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=days_ago(2),
    email_on_failure=False,
    email_on_retry=False,
    retries=4,
    max_active_runs=10,
    catchup=True,
    retry_delay=timedelta(minutes=5),
    provide_context=True,
)
# DAG object for the chipseq Nextflow pipeline. It has no schedule
# (schedule_interval=None) and is tagged for the UI; per-task defaults come
# from `default_args`.
dag = DAG(
    dag_id='dag12_nextflow_chipseq_pipeline',
    default_args=default_args,
    schedule_interval=None,
    orientation='LR',
    tags=['hpc', 'analysis', 'nextflow', 'chipseq'],
)
with dag:
    # Task graph of dag12_nextflow_chipseq_pipeline.
    #
    #   fetch_nextflow_analysis_info_and_branch
    #     |-> no_analysis ------------------------------> update_analysis_and_status
    #     `-> prep_nf_chipseq_run -> run_nf_chipseq
    #            |-> copy_nf_output_to_disk ------------> update_analysis_and_status
    #            `-> nf_analysis_copy_branch
    #                  |-> copy_nf_data_to_irods -------> update_analysis_and_status
    #                  `-> copy_nf_data_to_box ---------> update_analysis_and_status
    #
    # All callables are imported from
    # igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils; what they do
    # with the params below is defined there, not in this file.

    ## TASK
    # Branch operator at the head of the DAG. Its params name the two
    # candidate downstream tasks ('no_analysis' and 'prep_nf_chipseq_run')
    # and the XCom key for the analysis description.
    fetch_nextflow_analysis_info_and_branch = BranchPythonOperator(
        task_id='fetch_nextflow_analysis_info_and_branch',
        dag=dag,
        queue='hpc_4G',
        python_callable=fetch_nextflow_analysis_info_and_branch_func,
        params={
            'no_analysis_task': 'no_analysis',
            'active_tasks': ['prep_nf_chipseq_run'],
            'analysis_description_xcom_key': 'analysis_description',
        },
    )

    ## TASK
    # Prepares the Nextflow run. Reads the analysis description XCom from the
    # branch task above; the command / work dir XCom keys configured here are
    # the same ones the downstream tasks read from 'prep_nf_chipseq_run'.
    prep_nf_chipseq_run = PythonOperator(
        task_id='prep_nf_chipseq_run',
        dag=dag,
        queue='hpc_4G',
        python_callable=prep_nf_run_func,
        params={
            'analysis_description_xcom_key': 'analysis_description',
            'analysis_description_xcom_task': 'fetch_nextflow_analysis_info_and_branch',
            # NOTE: 'nexflow_command' (sic) must stay in sync with the key
            # used by run_nf_chipseq below.
            'nextflow_command_xcom_key': 'nexflow_command',
            'nextflow_work_dir_xcom_key': 'nextflow_work_dir',
        },
    )

    ## TASK
    # Placeholder target for the "nothing to run" branch.
    no_analysis = DummyOperator(
        task_id='no_analysis',
        dag=dag,
    )

    ## TASK
    # Final status update. This task joins both branches of the DAG
    # (no_analysis and the copy tasks), hence the non-default trigger rule.
    update_analysis_and_status = PythonOperator(
        task_id='update_analysis_and_status',
        dag=dag,
        queue='hpc_4G',
        python_callable=change_pipeline_status,
        trigger_rule='none_failed_or_skipped',
        params={
            'new_status': 'FINISHED',
            'no_change_status': 'SEEDED',
        },
    )

    ## TASK
    # Runs the Nextflow command on the long-running queue, limited by the
    # 'nextflow_hpc' pool. Command and work dir come from prep_nf_chipseq_run.
    run_nf_chipseq = PythonOperator(
        task_id='run_nf_chipseq',
        dag=dag,
        queue='hpc_4G_long',
        pool='nextflow_hpc',
        python_callable=run_nf_command_func,
        params={
            'nextflow_command_xcom_task': 'prep_nf_chipseq_run',
            'nextflow_command_xcom_key': 'nexflow_command',
            'nextflow_work_dir_xcom_task': 'prep_nf_chipseq_run',
            'nextflow_work_dir_xcom_key': 'nextflow_work_dir',
        },
    )

    ## TASK
    # Copies Nextflow output to disk.
    # FIX: the work dir XCom task was 'prep_nf_atacseq_run' (left over from
    # the atacseq DAG); no task with that id exists in this DAG, so it now
    # points at 'prep_nf_chipseq_run', matching run_nf_chipseq.
    copy_nf_output_to_disk = PythonOperator(
        task_id='copy_nf_output_to_disk',
        dag=dag,
        queue='hpc_4G',
        python_callable=copy_nf_output_to_disk_func,
        params={
            'nextflow_work_dir_xcom_task': 'prep_nf_chipseq_run',
            'nextflow_work_dir_xcom_key': 'nextflow_work_dir',
            'result_dirname': 'results',
            'data_dir_list': ['bwa'],
            'report_file_dirs': [
                'fastqc', 'igv', 'multiqc', 'pipeline_info', 'trim_galore'],
            'dag_file_name': 'dag.html',
        },
    )

    ## TASK
    # Branch operator selecting the copy tasks. Its params name
    # 'copy_nf_data_to_irods' for data files and 'copy_nf_data_to_box' for
    # report files, plus the XCom keys those tasks read back from this task.
    # FIX: work dir XCom task corrected to 'prep_nf_chipseq_run' (see above).
    nf_analysis_copy_branch = BranchPythonOperator(
        task_id='nf_analysis_copy_branch',
        dag=dag,
        queue='hpc_4G',
        python_callable=nf_analysis_copy_branch_func,
        params={
            'nextflow_work_dir_xcom_task': 'prep_nf_chipseq_run',
            'nextflow_work_dir_xcom_key': 'nextflow_work_dir',
            'result_dirname': 'results',
            'data_dir_list': ['bwa'],
            'data_dir_xcom_key': 'data_dir',
            'report_file_dirs': [
                'fastqc', 'igv', 'multiqc', 'pipeline_info', 'trim_galore'],
            'report_file_xcom_key': 'report_dir',
            'dag_file_name': 'dag.html',
            'dag_file_xcom_key': 'dag_file',
            'data_file_copy_tasks': ['copy_nf_data_to_irods'],
            'report_file_copy_tasks': ['copy_nf_data_to_box'],
        },
    )

    ## TASK
    # iRODS copy task; reads 'data_dir' from nf_analysis_copy_branch.
    # FIX: work dir XCom task corrected to 'prep_nf_chipseq_run' (see above).
    copy_nf_data_to_irods = PythonOperator(
        task_id='copy_nf_data_to_irods',
        dag=dag,
        queue='hpc_4G',
        python_callable=copy_nf_data_to_irods_func,
        params={
            'nextflow_work_dir_xcom_task': 'prep_nf_chipseq_run',
            'nextflow_work_dir_xcom_key': 'nextflow_work_dir',
            'result_dirname': 'results',
            'data_dir_xcom_key': 'data_dir',
            'data_dir_xcom_task': 'nf_analysis_copy_branch',
        },
    )

    ## TASK
    # Box copy task; reads 'report_dir' and 'dag_file' from
    # nf_analysis_copy_branch.
    copy_nf_data_to_box = PythonOperator(
        task_id='copy_nf_data_to_box',
        dag=dag,
        queue='hpc_4G',
        python_callable=copy_nf_data_to_box_func,
        params={
            'report_file_xcom_key': 'report_dir',
            'report_file_xcom_task': 'nf_analysis_copy_branch',
            'dag_file_xcom_key': 'dag_file',
            'dag_file_xcom_task': 'nf_analysis_copy_branch',
        },
    )

    ## PIPELINE
    # "Nothing to do" branch.
    fetch_nextflow_analysis_info_and_branch >> no_analysis
    no_analysis >> update_analysis_and_status
    # Analysis branch.
    fetch_nextflow_analysis_info_and_branch >> prep_nf_chipseq_run
    prep_nf_chipseq_run >> run_nf_chipseq >> nf_analysis_copy_branch
    run_nf_chipseq >> copy_nf_output_to_disk
    nf_analysis_copy_branch >> copy_nf_data_to_irods
    nf_analysis_copy_branch >> copy_nf_data_to_box
    # Every copy task feeds the final status update.
    copy_nf_output_to_disk >> update_analysis_and_status
    copy_nf_data_to_irods >> update_analysis_and_status
    copy_nf_data_to_box >> update_analysis_and_status
|