from datetime import timedelta import os,json,logging,subprocess from airflow.models import DAG,Variable from airflow.utils.dates import days_ago from airflow.operators.bash_operator import BashOperator from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.operators.dummy_operator import DummyOperator from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import generate_interop_report_func from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func ## DEFAULT ARGS default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), 'provide_context': True, } ## SSH HOOKS orwell_ssh_hook = \ SSHHook( key_file=Variable.get('hpc_ssh_key_file'), username=Variable.get('hpc_user'), remote_host=Variable.get('orwell_server_hostname')) ## DAG dag = \ DAG( dag_id='dag8_copy_ongoing_seqrun', catchup=False, schedule_interval="0 */2 * * *", max_active_runs=1, tags=['hpc'], default_args=default_args, orientation='LR') with dag: ## TASK generate_seqrun_list = \ BranchPythonOperator( task_id='generate_seqrun_list', dag=dag, queue='hpc_4G', python_callable=get_ongoing_seqrun_list) ## TASK no_ongoing_seqrun = \ DummyOperator( task_id='no_ongoing_seqrun', dag=dag, queue='hpc_4G', on_success_callback=log_sleep) ## TASK tasks = list() for i in range(5): generate_seqrun_file_list = \ SSHOperator( task_id='generate_seqrun_file_list_{0}'.format(i), dag=dag, pool='orwell_exe_pool', ssh_hook=orwell_ssh_hook, do_xcom_push=True, queue='hpc_4G', params={'source_task_id':'generate_seqrun_list', 'pull_key':'ongoing_seqruns', 'index_number':i}, command=""" source /home/igf/igf_code/airflow/env.sh; \ python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \ --seqrun_base_dir /home/igf/seqrun/illumina \ --output_path /home/igf/ongoing_run_tracking \ --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }} """) ## TASK copy_seqrun_file_list = \ PythonOperator( task_id='copy_seqrun_file_list_{0}'.format(i), dag=dag, pool='orwell_scp_pool', queue='hpc_4G', params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)}, python_callable=copy_seqrun_manifest_file) ## TASK compare_seqrun_files = \ PythonOperator( task_id='compare_seqrun_files_{0}'.format(i), dag=dag, queue='hpc_4G', params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i), 'seqrun_id_pull_key':'ongoing_seqruns', 'run_index_number':i, 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'local_seqrun_path':Variable.get('hpc_seqrun_path')}, python_callable=reset_manifest_file) ## TASK decide_copy_branch = \ BranchPythonOperator( task_id='decide_copy_branch_{0}'.format(i), dag=dag, queue='hpc_4G', params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i), 'worker_size':10, 'seqrun_chunk_size_key':'seqrun_chunk_size', 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)}, python_callable=get_seqrun_chunks) ## TASK no_copy_seqrun = \ DummyOperator( task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'), dag=dag, queue='hpc_4G', on_success_callback=log_sleep) ## TASK copy_seqrun_files = list() for j in range(10): copy_file_chunk = \ PythonOperator( task_id='copy_file_run_{0}_chunk_{1}'.format(i,j), dag=dag, queue='hpc_4G', pool='orwell_scp_pool', params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i), 'seqrun_chunk_size_key':'seqrun_chunk_size', 'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i), 'run_index_number':i, 'chunk_index_number':j, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'local_seqrun_path':Variable.get('hpc_seqrun_path')}, python_callable=copy_seqrun_chunk) copy_seqrun_files.append(copy_file_chunk) ## PIPELINE generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch decide_copy_branch >> no_copy_seqrun decide_copy_branch >> copy_seqrun_files ## TASK wait_for_copy_chunk = \ DummyOperator( task_id='wait_for_copy_chunk_run_{0}'.format(i), dag=dag, trigger_rule='none_failed_or_skipped', queue='hpc_4G') ## PIPELINE copy_seqrun_files >> wait_for_copy_chunk ## TASK create_interop_dump = \ PythonOperator( task_id='create_interop_dump_run_{0}'.format(i), dag=dag, queue='hpc_4G', params={'run_index_number':i, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list'}, python_callable=run_interop_dump) ## PIPELINE wait_for_copy_chunk >> create_interop_dump ## TASK generate_interop_report = \ PythonOperator( task_id='generate_interop_report_run_{0}'.format(i), dag=dag, queue='hpc_4G', params={'run_index_number':i, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'runInfo_xml_file_name':'RunInfo.xml', 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i), 'timeout':1200, 'kernel_name':'python3', 'output_notebook_key':'interop_notebook'}, python_callable=generate_interop_report_func) ## PIPELINE create_interop_dump >> generate_interop_report ## TASK check_progress_for_run = \ BranchPythonOperator( task_id='check_progress_for_run_{0}'.format(i), dag=dag, queue='hpc_4G', params={'run_index_number':i, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'samplesheet_validation_job_prefix':'samplesheet_validation', 'tile_demult_job_prefix':'tile_demultiplexing', 'no_job_prefix':'no_seqrun_checking', 'next_job_prefix':'samplesheet_validation', 'runParameters_xml_file_name':'runParameters.xml', 'samplesheet_file_name':'SampleSheet.csv', 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)}, python_callable=check_progress_for_run_func) ## PIPELINE create_interop_dump >> check_progress_for_run ## TASK no_seqrun_checking = \ DummyOperator( task_id='no_seqrun_checking_{0}'.format(i), dag=dag, queue='hpc_4G') ## PIPELINE check_progress_for_run >> no_seqrun_checking ## TASK samplesheet_validation_and_branch = \ BranchPythonOperator( task_id='samplesheet_validation_{0}'.format(i), dag=dag, queue='hpc_4G', params={'run_index_number':i, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'samplesheet_file_name':'SampleSheet.csv', 'runParameters_xml_file_name':'runParameters.xml', 'no_job_prefix':'no_seqrun_checking', 'next_job_prefix':'tile_demultiplexing', 'next_job_range':[i for i in range(1,9)]}, python_callable=samplesheet_validation_and_branch_func) ## PIPELINE check_progress_for_run >> samplesheet_validation_and_branch ## TASK run_tile_demult_list = list() for j in range(1,9): run_tile_demult_per_lane = \ PythonOperator( task_id='tile_demultiplexing_{0}_{1}'.format(i,j), dag=dag, queue='hpc_4G', params={'run_index_number':i, 'lane_id':j, 'seqrun_id_pull_key':'ongoing_seqruns', 'seqrun_id_pull_task_ids':'generate_seqrun_list', 'samplesheet_file_name':'SampleSheet.csv', 'runinfo_xml_file_name':'RunInfo.xml', 'runParameters_xml_file_name':'runParameters.xml', 'tile_list':[1101,], 'threads':1}, python_callable=run_tile_demult_list_func) run_tile_demult_list.\ append(run_tile_demult_per_lane) ## PIPELINE samplesheet_validation_and_branch >> run_tile_demult_list samplesheet_validation_and_branch >> no_seqrun_checking ## PIPELINE generate_seqrun_list >> no_ongoing_seqrun