Browse Source

dag14 to use bash

Avik Datta 3 years ago
parent
commit
dee6f5bc47
1 changed files with 12 additions and 14 deletions
  1. 12 14
      dags/dag14_crick_seqrun_transfer.py

+ 12 - 14
dags/dag14_crick_seqrun_transfer.py

@@ -3,6 +3,7 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
 #from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
@@ -25,11 +26,11 @@ args = {
 }
 
 ## SSH HOOK
-orwell_ssh_hook = \
-  SSHHook(
-    key_file=Variable.get('hpc_ssh_key_file'),
-    username=Variable.get('hpc_user'),
-    remote_host=Variable.get('orwell_server_hostname'))
+#orwell_ssh_hook = \
+#  SSHHook(
+#    key_file=Variable.get('hpc_ssh_key_file'),
+#    username=Variable.get('hpc_user'),
+#    remote_host=Variable.get('orwell_server_hostname'))
 
 dag = \
   DAG(
@@ -61,17 +62,16 @@ with dag:
   #    params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
   #    python_callable=extract_tar_file_func)
   check_and_transfer_run = \
-    SSHOperator(
+    BashOperator(
       task_id='check_and_transfer_run',
       dag=dag,
       pool='crick_ftp_pool',
-      ssh_hook=orwell_ssh_hook,
       do_xcom_push=False,
       queue='wells',
       params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
               'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
               'ftp_config_file': FTP_CONFIG_FILE},
-      command="""
+      bash_command="""
         python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
           -f {{ params.ftp_seqrun_server }} \
           -s {{ dag_run.conf["seqrun_id"] }} \
@@ -80,13 +80,13 @@ with dag:
         """)
   # TASK
   extract_tar_file = \
-    SSHOperator(
+    BashOperator(
       task_id='extract_tar_file',
       dag=dag,
       do_xcom_push=False,
       queue='wells',
       params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
-      command="""
+      bash_command="""
       cd {{ params.seqrun_base_path }};
       if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
       then
@@ -110,15 +110,13 @@ with dag:
     )
   ## TASK
   move_seqrun_dir = \
-    SSHOperator(
+    BashOperator(
       task_id='move_seqrun_dir',
       dag=dag,
-      pool='orwell_exe_pool',
-      ssh_hook=orwell_ssh_hook,
       do_xcom_push=False,
       queue='hpc_4G',
       params={'seqrun_base_path': SEQRUN_BASE_PATH},
-      command="""
+      bash_command="""
         cd {{ params.seqrun_base_path }};
         if [ -d {{ dag_run.conf["seqrun_id"] }} ];
         then