|
- from datetime import timedelta
- import os,json,logging,subprocess
- from airflow.models import DAG,Variable
- from airflow.utils.dates import days_ago
- from airflow.operators.bash_operator import BashOperator
- from airflow.contrib.operators.ssh_operator import SSHOperator
- from airflow.operators.python_operator import PythonOperator
- from airflow.operators.python_operator import BranchPythonOperator
- from airflow.contrib.hooks.ssh_hook import SSHHook
- from airflow.operators.dummy_operator import DummyOperator
- from igf_airflow.logging.upload_log_msg import send_log_to_channels,log_success,log_failure,log_sleep
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_ongoing_seqrun_list
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_manifest_file
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import reset_manifest_file
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import get_seqrun_chunks
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import copy_seqrun_chunk
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_interop_dump
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import generate_interop_report_func
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import check_progress_for_run_func
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import samplesheet_validation_and_branch_func
- from igf_airflow.utils.dag8_copy_ongoing_seqrun_utils import run_tile_demult_list_func
- ## DEFAULT ARGS
- default_args = {
- 'owner': 'airflow',
- 'depends_on_past': False,
- 'start_date': days_ago(2),
- 'email_on_failure': False,
- 'email_on_retry': False,
- 'retries': 1,
- 'retry_delay': timedelta(minutes=5),
- 'provide_context': True,
- }
- ## SSH HOOKS
- orwell_ssh_hook = \
- SSHHook(
- key_file=Variable.get('hpc_ssh_key_file'),
- username=Variable.get('hpc_user'),
- remote_host=Variable.get('orwell_server_hostname'))
- ## DAG
- dag = \
- DAG(
- dag_id='dag8_copy_ongoing_seqrun',
- catchup=False,
- schedule_interval="0 */2 * * *",
- max_active_runs=1,
- tags=['hpc'],
- default_args=default_args,
- orientation='LR')
- with dag:
- ## TASK
- generate_seqrun_list = \
- BranchPythonOperator(
- task_id='generate_seqrun_list',
- dag=dag,
- queue='hpc_4G',
- python_callable=get_ongoing_seqrun_list)
- ## TASK
- no_ongoing_seqrun = \
- DummyOperator(
- task_id='no_ongoing_seqrun',
- dag=dag,
- queue='hpc_4G',
- on_success_callback=log_sleep)
- ## TASK
- tasks = list()
- for i in range(5):
- generate_seqrun_file_list = \
- SSHOperator(
- task_id='generate_seqrun_file_list_{0}'.format(i),
- dag=dag,
- pool='orwell_exe_pool',
- ssh_hook=orwell_ssh_hook,
- do_xcom_push=True,
- queue='hpc_4G',
- params={'source_task_id':'generate_seqrun_list',
- 'pull_key':'ongoing_seqruns',
- 'index_number':i},
- command="""
- source /home/igf/igf_code/airflow/env.sh; \
- python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
- --seqrun_base_dir /home/igf/seqrun/illumina \
- --output_path /home/igf/ongoing_run_tracking \
- --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
- """)
- ## TASK
- copy_seqrun_file_list = \
- PythonOperator(
- task_id='copy_seqrun_file_list_{0}'.format(i),
- dag=dag,
- pool='orwell_scp_pool',
- queue='hpc_4G',
- params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
- python_callable=copy_seqrun_manifest_file)
- ## TASK
- compare_seqrun_files = \
- PythonOperator(
- task_id='compare_seqrun_files_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'run_index_number':i,
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
- python_callable=reset_manifest_file)
- ## TASK
- decide_copy_branch = \
- BranchPythonOperator(
- task_id='decide_copy_branch_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'xcom_pull_task_ids':'copy_seqrun_file_list_{0}'.format(i),
- 'worker_size':10,
- 'seqrun_chunk_size_key':'seqrun_chunk_size',
- 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
- python_callable=get_seqrun_chunks)
- ## TASK
- no_copy_seqrun = \
- DummyOperator(
- task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
- dag=dag,
- queue='hpc_4G',
- on_success_callback=log_sleep)
- ## TASK
- copy_seqrun_files = list()
- for j in range(10):
- copy_file_chunk = \
- PythonOperator(
- task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
- dag=dag,
- queue='hpc_4G',
- pool='orwell_scp_pool',
- params={'file_path_task_ids':'copy_seqrun_file_list_{0}'.format(i),
- 'seqrun_chunk_size_key':'seqrun_chunk_size',
- 'seqrun_chunk_size_task_ids':'decide_copy_branch_{0}'.format(i),
- 'run_index_number':i,
- 'chunk_index_number':j,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'local_seqrun_path':Variable.get('hpc_seqrun_path')},
- python_callable=copy_seqrun_chunk)
- copy_seqrun_files.append(copy_file_chunk)
- ## PIPELINE
- generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> compare_seqrun_files >> decide_copy_branch
- decide_copy_branch >> no_copy_seqrun
- decide_copy_branch >> copy_seqrun_files
- ## TASK
- wait_for_copy_chunk = \
- DummyOperator(
- task_id='wait_for_copy_chunk_run_{0}'.format(i),
- dag=dag,
- trigger_rule='none_failed_or_skipped',
- queue='hpc_4G')
- ## PIPELINE
- copy_seqrun_files >> wait_for_copy_chunk
- ## TASK
- create_interop_dump = \
- PythonOperator(
- task_id='create_interop_dump_run_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'run_index_number':i,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list'},
- python_callable=run_interop_dump)
- ## PIPELINE
- wait_for_copy_chunk >> create_interop_dump
- ## TASK
- generate_interop_report = \
- PythonOperator(
- task_id='generate_interop_report_run_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'run_index_number':i,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'runInfo_xml_file_name':'RunInfo.xml',
- 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i),
- 'timeout':1200,
- 'kernel_name':'python3',
- 'output_notebook_key':'interop_notebook'},
- python_callable=generate_interop_report_func)
- ## PIPELINE
- create_interop_dump >> generate_interop_report
- ## TASK
- check_progress_for_run = \
- BranchPythonOperator(
- task_id='check_progress_for_run_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'run_index_number':i,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'samplesheet_validation_job_prefix':'samplesheet_validation',
- 'tile_demult_job_prefix':'tile_demultiplexing',
- 'no_job_prefix':'no_seqrun_checking',
- 'next_job_prefix':'samplesheet_validation',
- 'runParameters_xml_file_name':'runParameters.xml',
- 'samplesheet_file_name':'SampleSheet.csv',
- 'interop_dump_pull_task':'create_interop_dump_run_{0}'.format(i)},
- python_callable=check_progress_for_run_func)
- ## PIPELINE
- create_interop_dump >> check_progress_for_run
- ## TASK
- no_seqrun_checking = \
- DummyOperator(
- task_id='no_seqrun_checking_{0}'.format(i),
- dag=dag,
- queue='hpc_4G')
- ## PIPELINE
- check_progress_for_run >> no_seqrun_checking
- ## TASK
- samplesheet_validation_and_branch = \
- BranchPythonOperator(
- task_id='samplesheet_validation_{0}'.format(i),
- dag=dag,
- queue='hpc_4G',
- params={'run_index_number':i,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'samplesheet_file_name':'SampleSheet.csv',
- 'runParameters_xml_file_name':'runParameters.xml',
- 'no_job_prefix':'no_seqrun_checking',
- 'next_job_prefix':'tile_demultiplexing',
- 'next_job_range':[i for i in range(1,9)]},
- python_callable=samplesheet_validation_and_branch_func)
- ## PIPELINE
- check_progress_for_run >> samplesheet_validation_and_branch
- ## TASK
- run_tile_demult_list = list()
- for j in range(1,9):
- run_tile_demult_per_lane = \
- PythonOperator(
- task_id='tile_demultiplexing_{0}_{1}'.format(i,j),
- dag=dag,
- queue='hpc_4G',
- params={'run_index_number':i,
- 'lane_id':j,
- 'seqrun_id_pull_key':'ongoing_seqruns',
- 'seqrun_id_pull_task_ids':'generate_seqrun_list',
- 'samplesheet_file_name':'SampleSheet.csv',
- 'runinfo_xml_file_name':'RunInfo.xml',
- 'runParameters_xml_file_name':'runParameters.xml',
- 'tile_list':[1101,],
- 'threads':1},
- python_callable=run_tile_demult_list_func)
- run_tile_demult_list.\
- append(run_tile_demult_per_lane)
- ## PIPELINE
- samplesheet_validation_and_branch >> run_tile_demult_list
- samplesheet_validation_and_branch >> no_seqrun_checking
- ## PIPELINE
- generate_seqrun_list >> no_ongoing_seqrun
|