Browse Source

added tile demultiplexing and validation checking

Avik Datta 4 năm trước cách đây
mục cha
commit
7f676d5104
1 tập tin đã thay đổi với 78 bổ sung6 xóa
  1. 78 6
      dags/dag8_copy_ongoing_seqrun.py

+ 78 - 6
dags/dag8_copy_ongoing_seqrun.py

@@ -4,14 +4,24 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
-from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python_operator import BranchPythonOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.operators.dummy_operator import DummyOperator
-from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns,compare_existing_seqrun_files
+from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns
+from igf_airflow.seqrun.ongoing_seqrun_processing import compare_existing_seqrun_files
 from igf_airflow.seqrun.ongoing_seqrun_processing import check_for_sequencing_progress
 from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
 from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
-
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
+from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
 
 ## DEFAULT ARGS
 default_args = {
@@ -45,7 +55,7 @@ dag = \
     default_args=default_args,
     orientation='LR')
 
-
+'''
 ## FUNCTIONS
 def get_ongoing_seqrun_list(**context):
   """
@@ -331,6 +341,10 @@ def check_progress_for_run_func(**context):
     seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
     run_index_number = context['params'].get('run_index_number')
     interop_dump_pull_task = context['params'].get('interop_dump_pull_task')
+    no_job_prefix = context['params'].get('no_job_prefix')
+    next_job_prefix = context['params'].get('next_job_prefix')
+    job_list = \
+      ['{0}_{1}'.format(no_job_prefix,run_index_number)]
     seqrun_id = \
       ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
     runinfo_path = \
@@ -358,6 +372,10 @@ def check_progress_for_run_func(**context):
       dag_id=context['task'].dag_id,
       comment=comment,
       reaction='pass')
+    if index_cycle_status is 'complete':
+      job_list = \
+        ['{0}_{1}'.format(next_job_prefix,run_index_number)]
+    return job_list
   except Exception as e:
     logging.error(e)
     send_log_to_channels(
@@ -368,7 +386,7 @@ def check_progress_for_run_func(**context):
       comment=e,
       reaction='fail')
     raise
-
+'''
 
 with dag:
   ## TASK
@@ -491,17 +509,71 @@ with dag:
     wait_for_copy_chunk >> create_interop_dump
     ## TASK
     check_progress_for_run = \
-      PythonOperator(
+      BranchPythonOperator(
         task_id='check_progress_for_run_{0}'.format(i),
         dag=dag,
         queue='hpc_4G',
         params={'run_index_number':i,
                 'seqrun_id_pull_key':'ongoing_seqruns',
                 'seqrun_id_pull_task_ids':'generate_seqrun_list',
+                'samplesheet_validation_job_prefix':'samplesheet_validation',
+                'tile_demult_job_prefix':'tile_demultiplexing',
+                'no_job_prefix':'no_seqrun_checking',
+                'next_job_prefix':'samplesheet_validation',
+                'runParameters_xml_file_name':'runParameters.xml',
+                'samplesheet_file_name':'SampleSheet.csv',
                 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
         python_callable=check_progress_for_run_func)
     ## PIPELINE
     create_interop_dump >> check_progress_for_run
+    ## TASK
+    no_seqrun_checking = \
+      DummyOperator(
+        task_id='no_seqrun_checking_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G')
+    ## PIPELINE
+    check_progress_for_run >> no_seqrun_checking
+    ## TASK
+    samplesheet_validation_and_branch = \
+      BranchPythonOperator(
+        task_id='samplesheet_validation_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G',
+        params={'run_index_number':i,
+                'seqrun_id_pull_key':'ongoing_seqruns',
+                'seqrun_id_pull_task_ids':'generate_seqrun_list',
+                'samplesheet_file_name':'SampleSheet.csv',
+                'runParameters_xml_file_name':'runParameters.xml',
+                'no_job_prefix':'no_seqrun_checking',
+                'next_job_prefix':'tile_demultiplexing',
+                'next_job_range':[i for i in range(1,9)]},
+        python_callable=samplesheet_validation_and_branch_func)
+    ## PIPELINE
+    check_progress_for_run >> samplesheet_validation_and_branch
+    ## TASK
+    run_tile_demult_list = list()
+    for j in range(1,9):
+      run_tile_demult_per_lane = \
+        PythonOperator(
+          task_id='tile_demultiplexing_{0}_{1}'.format(i,j),
+          dag=dag,
+        queue='hpc_4G',
+        params={'run_index_number':i,
+                'lane_id':j,
+                'seqrun_id_pull_key':'ongoing_seqruns',
+                'seqrun_id_pull_task_ids':'generate_seqrun_list',
+                'samplesheet_file_name':'SampleSheet.csv',
+                'runinfo_xml_file_name':'RunInfo.xml',
+                'runParameters_xml_file_name':'runParameters.xml',
+                'tile_list':[1101,],
+                'threads':1},
+        python_callable=run_tile_demult_list_func)
+      run_tile_demult_list.\
+        append(run_tile_demult_per_lane)
+    ## PIPELINE
+    samplesheet_validation_and_branch >> run_tile_demult_list
+    samplesheet_validation_and_branch >> no_seqrun_checking
 
   ## PIPELINE
   generate_seqrun_list >> no_ongoing_seqrun