|
@@ -6,7 +6,7 @@ from airflow.contrib.hooks.ssh_hook import SSHHook
|
|
|
from airflow.operators.bash_operator import BashOperator
|
|
|
from airflow.operators.python_operator import PythonOperator
|
|
|
from airflow.contrib.operators.sftp_operator import SFTPOperator
|
|
|
-#from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
|
|
|
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
|
|
|
#from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
|
|
|
|
|
|
FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
|
|
@@ -45,16 +45,16 @@ dag = \
|
|
|
with dag:
|
|
|
## TASK
|
|
|
# not working on HPC
|
|
|
- #check_and_transfer_run = \
|
|
|
- # PythonOperator(
|
|
|
- # task_id='check_and_transfer_run',
|
|
|
- # dag=dag,
|
|
|
- # pool='crick_ftp_pool',
|
|
|
- # queue='hpc_4G',
|
|
|
- # params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
|
|
|
- # 'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
|
|
|
- # 'ftp_config_file': FTP_CONFIG_FILE},
|
|
|
- # python_callable=check_and_transfer_run_func)
|
|
|
+ check_and_transfer_run = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='check_and_transfer_run',
|
|
|
+ dag=dag,
|
|
|
+ pool='crick_ftp_pool',
|
|
|
+ queue='hpc_4G',
|
|
|
+ params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
|
|
|
+ 'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
|
|
|
+ 'ftp_config_file': FTP_CONFIG_FILE},
|
|
|
+ python_callable=check_and_transfer_run_func)
|
|
|
## TASK
|
|
|
#extract_tar_file = \
|
|
|
# PythonOperator(
|
|
@@ -80,19 +80,19 @@ with dag:
|
|
|
# -d {{ params.seqrun_base_path }} \
|
|
|
# -c {{ params.ftp_config_file }}
|
|
|
# """)
|
|
|
- check_and_transfer_run = \
|
|
|
- SFTPOperator(
|
|
|
- task_id='check_and_transfer_run',
|
|
|
- dag=dag,
|
|
|
- pool='crick_ftp_pool',
|
|
|
- queue='hpc_4G',
|
|
|
- ssh_conn_id="crick_sftp_conn",
|
|
|
- params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
|
|
|
- 'ftp_base_path': FTP_BASE_PATH},
|
|
|
- local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
- remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
- operation='get'
|
|
|
- )
|
|
|
+ #check_and_transfer_run = \
|
|
|
+ # SFTPOperator(
|
|
|
+ # task_id='check_and_transfer_run',
|
|
|
+ # dag=dag,
|
|
|
+ # pool='crick_ftp_pool',
|
|
|
+ # queue='hpc_4G',
|
|
|
+ # ssh_conn_id="crick_sftp_conn",
|
|
|
+ # params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
|
|
|
+ # 'ftp_base_path': FTP_BASE_PATH},
|
|
|
+ # local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
+ # remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
|
|
|
+ # operation='get'
|
|
|
+ # )
|
|
|
# TASK
|
|
|
extract_tar_file = \
|
|
|
BashOperator(
|