浏览代码

added seqrun progress checking

Avik Datta 4 年之前
父节点
当前提交
ca7e32314d
共有 2 个文件被更改,包括 125 次插入1 次删除
  1. 1 0
      airflow_var/var.json
  2. 124 1
      dags/dag8_copy_ongoing_seqrun.py

+ 1 - 0
airflow_var/var.json

@@ -13,6 +13,7 @@
   "asana_demult_project":"",
   "ms_teams_conf":"",
   "hpc_seqrun_path":"",
+  "interop_dumptext_exe":"",
   "hpc_queue_list":{
     "hpc_1G": {
       "pbs_resource":"-lselect=1:ncpus=1:mem=1gb -lwalltime=02:00:00",

+ 124 - 1
dags/dag8_copy_ongoing_seqrun.py

@@ -1,5 +1,5 @@
 from datetime import timedelta
-import os,json,logging
+import os,json,logging,subprocess
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
@@ -8,6 +8,7 @@ from airflow.operators.python_operator import PythonOperator,BranchPythonOperato
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.operators.dummy_operator import DummyOperator
 from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns,compare_existing_seqrun_files
+from igf_airflow.seqrun.ongoing_seqrun_processing import check_for_sequencing_progress
 from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
 from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
 
@@ -281,6 +282,94 @@ def copy_seqrun_chunk(**context):
     raise
 
 
def run_interop_dump(**context):
  """
  A function for generating InterOp dump for seqrun

  Reads from Airflow context params:
    :param seqrun_id_pull_key: XCom key for the ongoing seqrun id list
    :param seqrun_id_pull_task_ids: Task id that pushed the seqrun id list
    :param run_index_number: Index of the target seqrun in the pulled list
  :returns: Path of the generated InterOp dump csv file (pushed via XCom)
  :raises: Re-raises any failure after logging it to Slack / MS Teams
  """
  try:
    ti = context.get('ti')
    local_seqrun_path = Variable.get('hpc_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    run_index_number = context['params'].get('run_index_number')
    interop_dumptext_exe = Variable.get('interop_dumptext_exe')
    temp_dir = get_temp_dir(use_ephemeral_space=True)
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    seqrun_path = \
      os.path.join(
        local_seqrun_path,
        seqrun_id)
    dump_file = \
      os.path.join(
        temp_dir,
        '{0}_interop_dump.csv'.format(seqrun_id))
    check_file_path(interop_dumptext_exe)
    # run dumptext with an argument list (no shell) and capture stdout
    # directly in the dump file; avoids shell injection and unquoted-path
    # breakage that '>'-style string redirection would cause
    with open(dump_file,'wb') as fp:
      subprocess.\
        check_call(
          [interop_dumptext_exe,seqrun_path],
          stdout=fp)
    return dump_file
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
+
+
def check_progress_for_run_func(**context):
  """
  A function for checking sequencing progress of an ongoing seqrun

  Reads from Airflow context params:
    :param seqrun_id_pull_key: XCom key for the ongoing seqrun id list
    :param seqrun_id_pull_task_ids: Task id that pushed the seqrun id list
    :param run_index_number: Index of the target seqrun in the pulled list
    :param interop_dump_pull_task: Task id that pushed the InterOp dump path
  :raises: Re-raises any failure after logging it to Slack / MS Teams
  """
  try:
    ti = context.get('ti')
    local_seqrun_path = Variable.get('hpc_seqrun_path')
    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
    run_index_number = context['params'].get('run_index_number')
    interop_dump_pull_task = context['params'].get('interop_dump_pull_task')
    seqrun_id = \
      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
    runinfo_path = \
      os.path.join(
        local_seqrun_path,
        seqrun_id,
        'RunInfo.xml')
    interop_dump_path = \
      ti.xcom_pull(task_ids=interop_dump_pull_task)
    # XCom may return bytes; normalise to a clean string path
    if interop_dump_path is not None and \
       not isinstance(interop_dump_path,str):
      interop_dump_path = \
        interop_dump_path.decode().strip('\n')
    current_cycle,index_cycle_status,read_format = \
      check_for_sequencing_progress(
        interop_dump=interop_dump_path,
        runinfo_file=runinfo_path)
    # FIX: original call was format(current_cycle,index_cycle_statusread_format)
    # — a missing comma fused two names into an undefined identifier and
    # raised NameError at runtime
    comment = \
      'current cycle: {0}, index cycle: {1}, read format: {2}'.\
        format(current_cycle,index_cycle_status,read_format)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=comment,
      reaction='pass')
  except Exception as e:
    logging.error(e)
    send_log_to_channels(
      slack_conf=Variable.get('slack_conf'),
      ms_teams_conf=Variable.get('ms_teams_conf'),
      task_id=context['task'].task_id,
      dag_id=context['task'].dag_id,
      comment=e,
      reaction='fail')
    raise
+
+
 with dag:
   ## TASK
   generate_seqrun_list = \
@@ -375,9 +464,43 @@ with dag:
                   'local_seqrun_path':Variable.get('hpc_seqrun_path')},
           python_callable=copy_seqrun_chunk)
       copy_seqrun_files.append(copy_file_chunk)
+    ## PIPELINE
     generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
     decide_copy_branch >> no_copy_seqrun
     decide_copy_branch >> copy_seqrun_files
+    ## TASK
+    wait_for_copy_chunk = \
+      DummyOperator(
+        task_id='wait_for_copy_chunk_run_{0}'.format(i),
+        dag=dag,
+        trigger_rule='none_failed_or_skipped',
+        queue='hpc_4G')
+    ## PIPELINE
+    copy_seqrun_files >> wait_for_copy_chunk
+    ## TASK
+    create_interop_dump = \
+      PythonOperator(
+        task_id='create_interop_dump_run_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G',
+        params={'run_index_number':i,
+                'seqrun_id_pull_key':'ongoing_seqruns',
+                'seqrun_id_pull_task_ids':'generate_seqrun_list'},
+        python_callable=run_interop_dump)
+    ## PIPELINE
+    wait_for_copy_chunk >> create_interop_dump
+    ## TASK
+    check_progress_for_run = \
+      PythonOperator(
+        task_id='check_progress_for_run_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G',
+        params={'run_index_number':i,
+                'seqrun_id_pull_key':'ongoing_seqruns',
+                'seqrun_id_pull_task_ids':'generate_seqrun_list',
+                'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
+        python_callable=check_progress_for_run_func)
+
 
   ## PIPELINE
   generate_seqrun_list >> no_ongoing_seqrun