|
@@ -0,0 +1,151 @@
|
|
|
+from datetime import timedelta
|
|
|
+import os,json,logging,subprocess
|
|
|
+from airflow.models import DAG,Variable
|
|
|
+from airflow.utils.dates import days_ago
|
|
|
+from airflow.operators.python_operator import PythonOperator
|
|
|
+from airflow.operators.python_operator import BranchPythonOperator
|
|
|
+from airflow.operators.dummy_operator import DummyOperator
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import fetch_nextflow_analysis_info_and_branch_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import prep_nf_run_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import run_nf_command_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_box_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_data_to_irods_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import nf_analysis_copy_branch_func
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import change_pipeline_status
|
|
|
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline_utils import copy_nf_output_to_disk_func
|
|
|
+
|
|
|
+default_args = {
|
|
|
+ 'owner': 'airflow',
|
|
|
+ 'depends_on_past': False,
|
|
|
+ 'start_date': days_ago(2),
|
|
|
+ 'email_on_failure': False,
|
|
|
+ 'email_on_retry': False,
|
|
|
+ 'retries': 4,
|
|
|
+ 'max_active_runs':10,
|
|
|
+ 'catchup':True,
|
|
|
+ 'retry_delay': timedelta(minutes=5),
|
|
|
+ 'provide_context': True}
|
|
|
+
|
|
|
+dag = \
|
|
|
+ DAG(
|
|
|
+ dag_id='dag10_nextflow_chipseq_pipeline',
|
|
|
+ schedule_interval=None,
|
|
|
+ tags=['hpc','analysis','nextflow','chipseq'],
|
|
|
+ default_args=default_args,
|
|
|
+ orientation='LR')
|
|
|
+
|
|
|
+with dag:
|
|
|
+ ## TASK
|
|
|
+ fetch_nextflow_analysis_info_and_branch = \
|
|
|
+ BranchPythonOperator(
|
|
|
+ task_id='fetch_nextflow_analysis_info_and_branch',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=fetch_nextflow_analysis_info_and_branch_func,
|
|
|
+ params={'no_analysis_task':'no_analysis',
|
|
|
+ 'active_tasks':['prep_nf_atacseq_run'],
|
|
|
+ 'analysis_description_xcom_key':'analysis_description'})
|
|
|
+ ## TASK
|
|
|
+ prep_nf_chipseq_run = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='prep_nf_chipseq_run',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=prep_nf_run_func,
|
|
|
+ params={'analysis_description_xcom_key':'analysis_description',
|
|
|
+ 'analysis_description_xcom_task':'fetch_nextflow_analysis_info_and_branch',
|
|
|
+ 'nextflow_command_xcom_key':'nexflow_command',
|
|
|
+ 'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
|
|
|
+ ## TASK
|
|
|
+ no_analysis = \
|
|
|
+ DummyOperator(
|
|
|
+ task_id='no_analysis',
|
|
|
+ dag=dag)
|
|
|
+ ## TASK
|
|
|
+ update_analysis_and_status = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='update_analysis_and_status',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=change_pipeline_status,
|
|
|
+ trigger_rule='none_failed_or_skipped',
|
|
|
+ params={'new_status':'FINISHED',
|
|
|
+ 'no_change_status':'SEEDED'})
|
|
|
+ ## PIPELINE
|
|
|
+ fetch_nextflow_analysis_info_and_branch >> prep_nf_chipseq_run
|
|
|
+ fetch_nextflow_analysis_info_and_branch >> no_analysis
|
|
|
+ no_analysis >> update_analysis_and_status
|
|
|
+ ## TASK
|
|
|
+ run_nf_chipseq = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='run_nf_chipseq',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G_long',
|
|
|
+ pool='nextflow_hpc',
|
|
|
+ python_callable=run_nf_command_func,
|
|
|
+ params={'nextflow_command_xcom_task':'prep_nf_chipseq_run',
|
|
|
+ 'nextflow_command_xcom_key':'nexflow_command',
|
|
|
+ 'nextflow_work_dir_xcom_task':'prep_nf_chipseq_run',
|
|
|
+ 'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
|
|
|
+ ## TASK
|
|
|
+ copy_nf_output_to_disk = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='copy_nf_output_to_disk',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=copy_nf_output_to_disk_func,
|
|
|
+ params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
|
|
|
+ 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
|
|
|
+ 'result_dirname':'results',
|
|
|
+ 'data_dir_list':['bwa'],
|
|
|
+ 'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
|
|
|
+ 'dag_file_name':'dag.html'})
|
|
|
+ ## TASK
|
|
|
+ nf_analysis_copy_branch = \
|
|
|
+ BranchPythonOperator(
|
|
|
+ task_id='nf_analysis_copy_branch',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=nf_analysis_copy_branch_func,
|
|
|
+ params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
|
|
|
+ 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
|
|
|
+ 'result_dirname':'results',
|
|
|
+ 'data_dir_list':['bwa'],
|
|
|
+ 'data_dir_xcom_key':'data_dir',
|
|
|
+ 'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
|
|
|
+ 'report_file_xcom_key':'report_dir',
|
|
|
+ 'dag_file_name':'dag.html',
|
|
|
+ 'dag_file_xcom_key':'dag_file',
|
|
|
+ 'data_file_copy_tasks':['copy_nf_data_to_irods'],
|
|
|
+ 'report_file_copy_tasks':['copy_nf_data_to_box']})
|
|
|
+ ## TASK
|
|
|
+ copy_nf_data_to_irods = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='copy_nf_data_to_irods',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=copy_nf_data_to_irods_func,
|
|
|
+ params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
|
|
|
+ 'nextflow_work_dir_xcom_key':'nextflow_work_dir',
|
|
|
+ 'result_dirname':'results',
|
|
|
+ 'data_dir_xcom_key':'data_dir',
|
|
|
+ 'data_dir_xcom_task':'nf_analysis_copy_branch'})
|
|
|
+ ## TASK
|
|
|
+ copy_nf_data_to_box = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='copy_nf_data_to_box',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=copy_nf_data_to_box_func,
|
|
|
+ params={'report_file_xcom_key':'report_dir',
|
|
|
+ 'report_file_xcom_task':'nf_analysis_copy_branch',
|
|
|
+ 'dag_file_xcom_key':'dag_file',
|
|
|
+ 'dag_file_xcom_task':'nf_analysis_copy_branch'})
|
|
|
+ ## PIPELINE
|
|
|
+ prep_nf_chipseq_run >> run_nf_chipseq >> nf_analysis_copy_branch
|
|
|
+ run_nf_chipseq >> copy_nf_output_to_disk
|
|
|
+ nf_analysis_copy_branch >> copy_nf_data_to_irods
|
|
|
+ nf_analysis_copy_branch >> copy_nf_data_to_box
|
|
|
+ copy_nf_output_to_disk >> update_analysis_and_status
|
|
|
+ copy_nf_data_to_irods >> update_analysis_and_status
|
|
|
+ copy_nf_data_to_box >> update_analysis_and_status
|