瀏覽代碼

Upload files to RDS, iRODS and Box

Avik Datta 3 年之前
父節點
當前提交
f36e2d8c35
共有 1 個文件被更改,包括 51 次插入22 次删除
  1. 51 22
      dags/dag10_nextflow_atacseq_pipeline.py

+ 51 - 22
dags/dag10_nextflow_atacseq_pipeline.py

@@ -8,10 +8,11 @@ from airflow.operators.dummy_operator import DummyOperator
 from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import fetch_nextflow_analysis_info_and_branch_func
 from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import prep_nf_run_func
 from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import run_nf_command_func
-from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_data_to_box_func
-from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_data_to_irods_func
-from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_nf_atacseq_branch_func
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_nf_data_to_box_func
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_nf_data_to_irods_func
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import nf_analysis_copy_branch_func
 from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import change_pipeline_status
+from igf_airflow.utils.dag10_nextflow_atacseq_pipeline import copy_nf_output_to_disk_func
 
 default_args = {
   'owner': 'airflow',
@@ -79,7 +80,7 @@ with dag:
     PythonOperator(
       task_id='run_nf_atacseq',
       dag=dag,
-      queue='hpc_4G',
+      queue='hpc_4G_long',
       pool_slots='nextflow_hpc',
       python_callable=run_nf_command_func,
       params={'nextflow_command_xcom_task':'prep_nf_atacseq_run',
@@ -87,31 +88,59 @@ with dag:
               'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
               'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
   ## TASK
-  copy_nf_atacseq_branch = \
+  copy_nf_output_to_disk = \
+    PythonOperator(
+      task_id='copy_nf_output_to_disk',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=copy_nf_output_to_disk_func,
+      params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
+              'nextflow_work_dir_xcom_key':'nextflow_work_dir',
+              'result_dirname':'results',
+              'data_dir_list':['bwa'],
+              'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
+              'dag_file_name':'dag.html'})
+  nf_analysis_copy_branch = \
     BranchPythonOperator(
-      task_id='copy_nf_atacseq_branch',
+      task_id='nf_analysis_copy_branch',
       dag=dag,
       queue='hpc_4G',
-      python_callable=copy_nf_atacseq_branch_func,
-      params={'data_files':'',
-              'html_files':''})
-  copy_data_to_irods = \
+      python_callable=nf_analysis_copy_branch_func,
+      params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
+              'nextflow_work_dir_xcom_key':'nextflow_work_dir',
+              'result_dirname':'results',
+              'data_dir_list':['bwa'],
+              'data_dir_xcom_key':'data_dir',
+              'report_file_dirs':['fastqc','igv','multiqc','pipeline_info','trim_galore'],
+              'report_file_xcom_key':'report_dir',
+              'dag_file_name':'dag.html',
+              'dag_file_xcom_key':'dag_file',
+              'data_file_copy_tasks':['copy_nf_data_to_irods'],
+              'report_file_copy_tasks':['copy_nf_data_to_box']})
+  copy_nf_data_to_irods = \
     PythonOperator(
-      task_id='copy_data_to_irods',
+      task_id='copy_nf_data_to_irods',
       dag=dag,
       queue='hpc_4G',
-      python_callable=copy_data_to_irods_func,
-      params={})
-  copy_data_to_box = \
+      python_callable=copy_nf_data_to_irods_func,
+      params={'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
+              'nextflow_work_dir_xcom_key':'nextflow_work_dir',
+              'data_dir_xcom_key':'data_dir',
+              'data_dir_xcom_task':'nf_analysis_copy_branch'})
+  copy_nf_data_to_box = \
     PythonOperator(
-      task_id='copy_data_to_box',
+      task_id='copy_nf_data_to_box',
       dag=dag,
       queue='hpc_4G',
-      python_callable=copy_data_to_irods_func,
-      params={})
+      python_callable=copy_nf_data_to_box_func,
+      params={'report_file_xcom_key':'report_dir',
+              'report_file_xcom_task':'nf_analysis_copy_branch',
+              'dag_file_xcom_key':'dag_file',
+              'dag_file_xcom_task':'nf_analysis_copy_branch'})
   ## PIPELINE
-  prep_nf_atacseq_run >> run_nf_atacseq >> copy_nf_atacseq_branch
-  copy_nf_atacseq_branch >> copy_data_to_irods
-  copy_nf_atacseq_branch >> copy_data_to_box
-  copy_data_to_irods >> update_analysis_and_status
-  copy_data_to_box >> update_analysis_and_status
+  prep_nf_atacseq_run >> run_nf_atacseq >> nf_analysis_copy_branch
+  run_nf_atacseq >> copy_nf_output_to_disk
+  nf_analysis_copy_branch >> copy_nf_data_to_irods
+  nf_analysis_copy_branch >> copy_nf_data_to_box
+  copy_nf_data_to_irods >> update_analysis_and_status
+  copy_nf_data_to_box >> update_analysis_and_status