Browse Source

added step for extracting run

Avik Datta 3 years ago
parent
commit
e45674f72a
1 changed files with 25 additions and 3 deletions
  1. 25 3
      dags/dag14_crick_seqrun_transfer.py

+ 25 - 3
dags/dag14_crick_seqrun_transfer.py

@@ -1,3 +1,4 @@
+import argparse
 from datetime import timedelta
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
@@ -36,7 +37,7 @@ dag = \
 
 
 with dag:
-  # TASK
+  ## TASK
   check_and_transfer_run = \
     SSHOperator(
       task_id='check_and_transfer_run',
@@ -48,7 +49,6 @@ with dag:
       params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
               'seqrun_base_path': SEQRUN_BASE_PATH,
               'ftp_config_file': FTP_CONFIG_FILE},
-      env={'seqrun_id': '{{ dag_run.conf["seqrun_id"] if dag_run else "" }}'},
       command="""
         source /home/igf/igf_code/airflow/env.sh;
         python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
@@ -57,5 +57,27 @@ with dag:
           -d {{ params.seqrun_base_path }} \
           -c {{ params.ftp_config_file }}
         """)
+  ## TASK
+  extract_tar_file = \
+    SSHOperator(
+      task_id='extract_tar_file',
+      dag=dag,
+      pool='orwell_exe_pool',
+      ssh_hook=orwell_ssh_hook,
+      do_xcom_push=False,
+      queue='hpc_4G',
+      params={'seqrun_base_path': SEQRUN_BASE_PATH},
+      command="""
+      cd {{ params.seqrun_base_path }};
+      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
+      then
+        echo "Seqrun dir exists";
+        exit 1;
+      else
+        mkdir -p {{ dag_run.conf["seqrun_id"] }};
+        tar -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
+      fi
+      """
+    )
   ## PIPELINE
-  check_and_transfer_run
+  check_and_transfer_run >> extract_tar_file