|
@@ -1,8 +1,6 @@
|
|
|
from datetime import timedelta
|
|
|
-from airflow.models import DAG,Variable
|
|
|
+from airflow.models import DAG
|
|
|
from airflow.utils.dates import days_ago
|
|
|
-from airflow.contrib.operators.ssh_operator import SSHOperator
|
|
|
-from airflow.contrib.hooks.ssh_hook import SSHHook
|
|
|
from airflow.operators.python_operator import PythonOperator
|
|
|
from airflow.operators.python_operator import BranchPythonOperator
|
|
|
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
|
|
@@ -11,6 +9,8 @@ from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import find_and_split_m
|
|
|
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import validate_md5_chunk_func
|
|
|
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_divide_run_for_remote_copy_func
|
|
|
from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import copy_run_file_to_remote_func
|
|
|
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import run_interop_dump_func
|
|
|
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import generate_interop_report_func
|
|
|
|
|
|
args = {
|
|
|
'owner': 'airflow',
|
|
@@ -104,3 +104,21 @@ with dag:
|
|
|
python_callable=copy_run_file_to_remote_func)
|
|
|
## PIPELINE
|
|
|
check_and_divide_run_for_remote_copy >> copy_additional_file_to_remote
|
|
|
+ ## TASK
|
|
|
+ generate_interop_dump = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='generate_interop_dump',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ python_callable=run_interop_dump_func)
|
|
|
+ generate_interop_report = \
|
|
|
+ PythonOperator(
|
|
|
+ task_id='generate_interop_report',
|
|
|
+ dag=dag,
|
|
|
+ queue='hpc_4G',
|
|
|
+ params={'interop_dump_xcom_task': 'generate_interop_dump',
|
|
|
+ 'timeout': 1200,
|
|
|
+ 'kernel_name': 'python3'},
|
|
|
+ python_callable=generate_interop_report_func)
|
|
|
+ ## PIPELINE
|
|
|
+ extract_tar_file >> generate_interop_dump >> generate_interop_report
|