Avik Datta 3 år sedan
förälder
incheckning
cdeaccf32b
1 ändrade filer med 10 tillägg och 4 borttagningar
  1. 10 4
      dags/dag10_nextflow_atacseq_pipeline.py

+ 10 - 4
dags/dag10_nextflow_atacseq_pipeline.py

@@ -44,7 +44,10 @@ with dag:
       dag=dag,
       queue='hpc_4G',
       python_callable=prep_nf_atacseq_run_func,
-      params={})
+      params={'analysis_description_xcom_key':'analysis_description',
+              'analysis_description_xcom_task':'fetch_nextflow_analysis_info_and_branch',
+              'nextflow_command_xcom_key':'nexflow_command',
+              'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
   ## TASK
   no_analysis = \
     DummyOperator(
@@ -71,8 +74,11 @@ with dag:
       dag=dag,
       queue='hpc_4G',
       pool_slots='nextflow_hpc',
-      python_callable=run_nf_atacseq_func,
-      params={})
+      python_callable=run_nf_command_func,
+      params={'nextflow_command_xcom_task':'prep_nf_atacseq_run',
+              'nextflow_command_xcom_key':'nexflow_command',
+              'nextflow_work_dir_xcom_task':'prep_nf_atacseq_run',
+              'nextflow_work_dir_xcom_key':'nextflow_work_dir'})
   ## TASK
   copy_nf_atacseq_branch = \
     BranchPythonOperator(
@@ -94,7 +100,7 @@ with dag:
       task_id='copy_data_to_box',
       dag=dag,
       queue='hpc_4G',
-      python_callable=copy _data_to_irods_func,
+      python_callable=copy_data_to_irods_func,
       params={})
   ## PIPELINE
   prep_nf_atacseq_run >> run_nf_atacseq >> copy_nf_atacseq_branch