@@ -7,7 +7,7 @@ from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.operators.dummy_operator import DummyOperator
-from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns
+from igf_airflow.seqrun.ongoing_seqrun_processing import fetch_ongoing_seqruns,compare_existing_seqrun_files
 from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
 from igf_data.utils.fileutils import get_temp_dir,copy_remote_file,check_file_path,read_json_data
@@ -97,7 +97,6 @@ def copy_seqrun_manifest_file(**context):
     seqrun_server = Variable.get('seqrun_server')
     seqrun_server_user = Variable.get('seqrun_server_user')
     xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
-
     ti = context.get('ti')
     remote_file_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
     if remote_file_path is not None and \
@@ -127,6 +126,39 @@ def copy_seqrun_manifest_file(**context):
     raise


+def check_and_reset_manifest_file(**context):
+  """
+  A function for checking existing files and resetting the manifest json with new files
+  """
+  try:
+    xcom_pull_task_ids = context['params'].get('xcom_pull_task_ids')
+    local_seqrun_path = context['params'].get('local_seqrun_path')
+    seqrun_id_pull_key = context['params'].get('seqrun_id_pull_key')
+    seqrun_id_pull_task_ids = context['params'].get('seqrun_id_pull_task_ids')
+    run_index_number = context['params'].get('run_index_number')
+    ti = context.get('ti')
+    json_path = ti.xcom_pull(task_ids=xcom_pull_task_ids)
+    if json_path is not None and \
+       not isinstance(json_path,str):
+      json_path = json_path.decode().strip('\n')
+    seqrun_id = \
+      ti.xcom_pull(key=seqrun_id_pull_key,task_ids=seqrun_id_pull_task_ids)[run_index_number]
+    compare_existing_seqrun_files(
+      json_path=json_path,
+      seqrun_id=seqrun_id,
+      seqrun_base_path=local_seqrun_path)
+  except Exception as e:
+    logging.error(e)
+    send_log_to_channels(
+      slack_conf=Variable.get('slack_conf'),
+      ms_teams_conf=Variable.get('ms_teams_conf'),
+      task_id=context['task'].task_id,
+      dag_id=context['task'].dag_id,
+      comment=e,
+      reaction='fail')
+    raise
+
+
 def get_seqrun_chunks(**context):
   """
   A function for setting file chunk size for seqrun files copy
@@ -295,6 +327,18 @@ with dag:
         params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
         python_callable=copy_seqrun_manifest_file)
     ## TASK
+    compare_existing_seqrun_files = \
+      PythonOperator(
+        task_id='compare_existing_seqrun_files_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G',
+        params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
+                'seqrun_id_pull_key':'ongoing_seqruns',
+                'run_index_number':i,
+                'seqrun_id_pull_task_ids':'generate_seqrun_list',
+                'local_seqrun_path':Variable.get('hpc_seqrun_path')},
+        python_callable=check_and_reset_manifest_file) ## TODO
+    ## TASK
     decide_copy_branch = \
       BranchPythonOperator(
         task_id='decide_copy_branch_{0}'.format(i),
@@ -331,7 +375,7 @@ with dag:
             'local_seqrun_path':Variable.get('hpc_seqrun_path')},
          python_callable=copy_seqrun_chunk)
      copy_seqrun_files.append(copy_seqrun_chunk)
-    generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> decide_copy_branch
+    generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_existing_seqrun_files >> decide_copy_branch
     decide_copy_branch >> no_copy_seqrun
     decide_copy_branch >> copy_seqrun_files
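
The helper compare_existing_seqrun_files that this diff imports from igf_airflow.seqrun.ongoing_seqrun_processing is not shown here; only its call site (json_path, seqrun_id, seqrun_base_path) and the task docstring are. A minimal sketch of what such a helper could do follows, assuming each manifest entry carries hypothetical 'file_path' and 'file_size' keys; it illustrates the compare-and-reset idea and is not the library's actual implementation.

import json
import os

def compare_existing_seqrun_files(json_path, seqrun_id, seqrun_base_path):
  """
  Sketch only: drop manifest entries whose files already exist locally with
  the expected size, then rewrite the manifest json so only the remaining
  files are copied. The 'file_path'/'file_size' schema is an assumption.
  """
  with open(json_path, 'r') as fp:
    manifest = json.load(fp)          # list of file records from the manifest
  pending = list()
  for entry in manifest:
    local_path = \
      os.path.join(seqrun_base_path, seqrun_id, entry['file_path'])
    if os.path.exists(local_path) and \
       os.path.getsize(local_path) == int(entry['file_size']):
      continue                        # already copied with matching size, skip
    pending.append(entry)             # still needs copying
  with open(json_path, 'w') as fp:
    json.dump(pending, fp)            # reset manifest with the remaining files

With something like this in place, the new check_and_reset_manifest_file task, wired between copy_seqrun_file_list and decide_copy_branch, trims the manifest before the branch operator decides between the chunked copy tasks and no_copy_seqrun.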