from datetime import timedelta
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.sftp_operator import SFTPOperator
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func

FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
FTP_BASE_PATH = Variable.get('crick_ftp_base_path')
SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
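
# The values above are Airflow Variables and must exist in the Airflow
# metadata database before this DAG is parsed; Variable.get() raises KeyError
# for any missing key. A variable can be registered from the CLI, e.g.
# (hostname below is illustrative only; use `airflow variables set` on 2.x):
#   airflow variables -s crick_ftp_seqrun_hostname ftp.crick.example.ac.uk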

args = {
  'owner': 'airflow',
  'start_date': days_ago(2),
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'provide_context': True,
  'email_on_failure': False,
  'email_on_retry': False}

## SSH HOOK
#orwell_ssh_hook = \
#  SSHHook(
#    key_file=Variable.get('hpc_ssh_key_file'),
#    username=Variable.get('hpc_user'),
#    remote_host=Variable.get('orwell_server_hostname'))

dag = \
  DAG(
    dag_id='dag14_crick_seqrun_transfer',
    schedule_interval=None,
    default_args=args,
    catchup=False,
    max_active_runs=1,
    tags=['ftp', 'hpc', 'orwell', 'wells'])
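
# With schedule_interval=None, runs are created by manual or API triggers
# that pass the sequencing run id through dag_run.conf (as used by the
# disabled task variants below and, presumably, by the python callables),
# e.g. with the Airflow 1.10 CLI (run id below is illustrative only):
#   airflow trigger_dag -c '{"seqrun_id": "210731_K00102_0001_AHXXXXXXX"}' \
#     dag14_crick_seqrun_transfer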

with dag:
  ## TASK
  # not working on HPC
  check_and_transfer_run = \
    PythonOperator(
      task_id='check_and_transfer_run',
      dag=dag,
      pool='crick_ftp_pool',
      queue='hpc_4G',
      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
              'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
              'ftp_config_file': FTP_CONFIG_FILE},
      python_callable=check_and_transfer_run_func)
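
  # check_and_transfer_run_func is defined in
  # igf_airflow.utils.dag14_crick_seqrun_transfer_utils. Because
  # 'provide_context' is set in the default args, the callable receives the
  # task context, so a callable of roughly this shape is expected (a sketch
  # of the assumed signature only, not the real implementation):
  #
  #   def check_and_transfer_run_func(**context):
  #     params = context['params']                            # FTP server, config file, base path
  #     seqrun_id = context['dag_run'].conf.get('seqrun_id')  # run id from the trigger conf
  #     ...                                                   # fetch <seqrun_id>.tar.gz from the Crick FTP server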

  ## TASK
  extract_tar_file = \
    PythonOperator(
      task_id='extract_tar_file',
      dag=dag,
      queue='hpc_4G',
      python_callable=extract_tar_file_func)
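
  # extract_tar_file_func is also defined in
  # igf_airflow.utils.dag14_crick_seqrun_transfer_utils; the disabled
  # BashOperator further down shows the equivalent shell steps (unpack
  # <seqrun_id>.tar.gz into a temp_<seqrun_id> directory and fix permissions).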

  #check_and_transfer_run = \
  #  BashOperator(
  #    task_id='check_and_transfer_run',
  #    dag=dag,
  #    pool='crick_ftp_pool',
  #    do_xcom_push=False,
  #    queue='wells',
  #    params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
  #            'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  #            'ftp_config_file': FTP_CONFIG_FILE},
  #    bash_command="""
  #      python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
  #        -f {{ params.ftp_seqrun_server }} \
  #        -s {{ dag_run.conf["seqrun_id"] }} \
  #        -d {{ params.seqrun_base_path }} \
  #        -c {{ params.ftp_config_file }}
  #      """)
  #check_and_transfer_run = \
  #  SFTPOperator(
  #    task_id='check_and_transfer_run',
  #    dag=dag,
  #    pool='crick_ftp_pool',
  #    queue='hpc_4G',
  #    ssh_conn_id="crick_sftp_conn",
  #    params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
  #            'ftp_base_path': FTP_BASE_PATH},
  #    local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
  #    remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
  #    operation='get')

  ## TASK
  #extract_tar_file = \
  #  BashOperator(
  #    task_id='extract_tar_file',
  #    dag=dag,
  #    do_xcom_push=False,
  #    queue='hpc_4G',
  #    params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
  #    bash_command="""
  #      cd {{ params.seqrun_base_path }};
  #      if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
  #      then
  #        echo "Seqrun dir exists";
  #        exit 1;
  #      else
  #        mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
  #        tar \
  #          --no-same-owner \
  #          --no-same-permissions \
  #          --owner=igf \
  #          -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
  #          -C temp_{{ dag_run.conf["seqrun_id"] }};
  #        find temp_{{ dag_run.conf["seqrun_id"] }} \
  #          -type d \
  #          -exec chmod 700 {} \;
  #        chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
  #        chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
  #      fi
  #      """)

  ## TASK
  #move_seqrun_dir = \
  #  BashOperator(
  #    task_id='move_seqrun_dir',
  #    dag=dag,
  #    do_xcom_push=False,
  #    queue='hpc_4G',
  #    params={'seqrun_base_path': SEQRUN_BASE_PATH},
  #    bash_command="""
  #      cd {{ params.seqrun_base_path }};
  #      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
  #      then
  #        echo "Seqrun dir exists";
  #        exit 1;
  #      fi
  #      ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
  #      mv temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
  #      """)
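
  # The Bash, SFTP and move_seqrun_dir variants above are kept only as
  # disabled references; the two PythonOperator tasks declared earlier are
  # the active implementation, and the final move into SEQRUN_BASE_PATH is
  # currently not wired into the pipeline.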

  ## PIPELINE
  check_and_transfer_run >> extract_tar_file