Explorar o código

added merge report generation

Avik Datta %!s(int64=3) %!d(string=hai) anos
pai
achega
611fb1f286
Modificáronse 2 ficheiros con 16 adicións e 13 borrados
  1. 1 1
      airflow_var/var.json
  2. 15 12
      dags/dag16_test_illumina_demultiplexing.py

+ 1 - 1
airflow_var/var.json

@@ -27,7 +27,7 @@
   "analysis_ms_teams_conf": "/rds/general/user/igf/home/secret_keys/team_analysis.json",
   "hpc_seqrun_path": "/rds/general/project/genomics-facility-archive-2019/ephemeral/",
   "interop_dumptext_exe": "/rds/general/user/igf/home/anaconda3/envs/interop/bin/interop_dumptext",
-  "interop_notebook_image_path": "/project/tgu/resources/pipeline_resource/singularity_images/interop/interop-notebook-image_vlatest.sif",
+  "interop_notebook_image_path": "/project/tgu/resources/pipeline_resource/singularity_images/interop/interop-notebook-image_v3.sif",
   "interop_notebook_template": "/project/tgu/software/interop-notebook-image/templates/interop_report_v0.0.2.ipynb",
   "seqrun_ml_notebook_template": "/project/tgu/software/interop-notebook-image/templates/seqrun_ml_v0.0.1.ipynb",
   "seqrun_training_data_csv": "/project/tgu/resources/pipeline_resource/seqrun_ml/seqrun_training_data.csv",

+ 15 - 12
dags/dag16_test_illumina_demultiplexing.py

@@ -6,7 +6,7 @@ from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import BranchPythonOperator
 from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import get_samplesheet_and_decide_flow_func
 from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import  run_demultiplexing_func
-#from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import  prepare_merged_report_func
+from igf_airflow.utils.dag16_test_illumina_demultiplexing_utils import  prepare_merged_report_func
 
 
 args = {
@@ -44,16 +44,19 @@ with dag:
                 'samplesheet_xcom_key': 'formatted_samplesheets'},
             python_callable=get_samplesheet_and_decide_flow_func)
     ## TASK
-    # prepare_merged_report = \
-    #    PythonOperator(
-    #        task_id='prepare_merged_report',
-    #        dag=dag,
-    #        queue='hpc_4G',
-    #        params={
-    #            'samplesheet_xcom_task': 'get_samplesheet_and_decide_flow',
-    #            'samplesheet_xcom_key': 'formatted_samplesheets',
-    #            'output_path_xcom_key': 'temp_output_path'},
-    #        python_callable=prepare_merged_report_func)
+    prepare_merged_report = \
+       PythonOperator(
+           task_id='prepare_merged_report',
+           dag=dag,
+           queue='hpc_4G',
+           params={
+               'output_path_xcom_task': 'get_samplesheet_and_decide_flow',
+               'samplesheet_xcom_key': 'formatted_samplesheets',
+               'output_path_xcom_key': 'temp_output_path',
+               'script_path': '/project/tgu/software/demultiplexing_report/scripts/generate_illumina_report_spark.py',
+               'template_path': '/project/tgu/software/demultiplexing_report/template/illumina_report_v1.html',
+               'code_dir': '/project/tgu/software/demultiplexing_report/'},
+           python_callable=prepare_merged_report_func)
     ## TASK
     for i in range(1, 9):
         t = \
@@ -70,5 +73,5 @@ with dag:
                     'output_path_xcom_key': 'temp_output_path'},
                 python_callable=run_demultiplexing_func)
         ## PIPELINE
-        get_samplesheet_and_decide_flow >> t
+        get_samplesheet_and_decide_flow >> t >> prepare_merged_report