Browse Source

switching to old shell scripts

Avik Datta 3 years ago
parent
commit
69b5bb635a
1 changed files with 53 additions and 52 deletions
  1. 53 52
      dags/dag14_crick_seqrun_transfer.py

+ 53 - 52
dags/dag14_crick_seqrun_transfer.py

@@ -42,68 +42,69 @@ dag = \
 
 with dag:
   ## TASK
-  check_and_transfer_run = \
-    PythonOperator(
-      task_id='check_and_transfer_run',
-      dag=dag,
-      pool='crick_ftp_pool',
-      queue='hpc_4G',
-      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-             'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
-             'ftp_config_file': FTP_CONFIG_FILE},
-      python_callable=check_and_transfer_run_func)
-  ## TASK
-  extract_tar_file = \
-    PythonOperator(
-      task_id='extract_tar_file',
-      dag=dag,
-      queue='hpc_4G',
-      params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
-      python_callable=extract_tar_file_func)
+  # not working on HPC
   #check_and_transfer_run = \
-  #  SSHOperator(
+  #  PythonOperator(
   #    task_id='check_and_transfer_run',
   #    dag=dag,
   #    pool='crick_ftp_pool',
-  #    ssh_hook=orwell_ssh_hook,
-  #    do_xcom_push=False,
   #    queue='hpc_4G',
   #    params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-  #            'seqrun_base_path': SEQRUN_BASE_PATH,
-  #            'ftp_config_file': FTP_CONFIG_FILE},
-  #    command="""
-  #      source /home/igf/igf_code/airflow/env.sh;
-  #      python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
-  #        -f {{ params.ftp_seqrun_server }} \
-  #        -s {{ dag_run.conf["seqrun_id"] }} \
-  #        -d {{ params.seqrun_base_path }} \
-  #        -c {{ params.ftp_config_file }}
-  #      """)
+  #           'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
+  #           'ftp_config_file': FTP_CONFIG_FILE},
+  #    python_callable=check_and_transfer_run_func)
   ## TASK
   #extract_tar_file = \
-  #  SSHOperator(
+  #  PythonOperator(
   #    task_id='extract_tar_file',
   #    dag=dag,
-  #    pool='orwell_exe_pool',
-  #    ssh_hook=orwell_ssh_hook,
-  #    do_xcom_push=False,
   #    queue='hpc_4G',
-  #    params={'seqrun_base_path': SEQRUN_BASE_PATH},
-  #    command="""
-  #    cd {{ params.seqrun_base_path }};
-  #    if [ -d {{ dag_run.conf["seqrun_id"] }} ];
-  #    then
-  #      echo "Seqrun dir exists";
-  #      exit 1;
-  #    else
-  #      mkdir -p {{ dag_run.conf["seqrun_id"] }};
-  #      tar \
-  #        --no-same-owner \
-  #        --no-same-permissions \
-  #        --owner=igf \
-  #        -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C {{ dag_run.conf["seqrun_id"] }}
-  #    fi
-  #    """
-  #  )
+  #    params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
+  #    python_callable=extract_tar_file_func)
+  check_and_transfer_run = \
+    SSHOperator(
+      task_id='check_and_transfer_run',
+      dag=dag,
+      pool='crick_ftp_pool',
+      ssh_hook=orwell_ssh_hook,
+      do_xcom_push=False,
+      queue='hpc_4G',
+      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
+              'seqrun_base_path': SEQRUN_BASE_PATH,
+              'ftp_config_file': FTP_CONFIG_FILE},
+      command="""
+        source /home/igf/igf_code/airflow/env.sh;
+        python /home/igf/igf_code/airflow/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
+          -f {{ params.ftp_seqrun_server }} \
+          -s {{ dag_run.conf["seqrun_id"] }} \
+          -d {{ params.seqrun_base_path }} \
+          -c {{ params.ftp_config_file }}
+        """)
+  # TASK
+  extract_tar_file = \
+    SSHOperator(
+      task_id='extract_tar_file',
+      dag=dag,
+      pool='orwell_exe_pool',
+      ssh_hook=orwell_ssh_hook,
+      do_xcom_push=False,
+      queue='hpc_4G',
+      params={'seqrun_base_path': SEQRUN_BASE_PATH},
+      command="""
+      cd {{ params.seqrun_base_path }};
+      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
+      then
+        echo "Seqrun dir exists";
+        exit 1;
+      else
+        mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
+        tar \
+          --no-same-owner \
+          --no-same-permissions \
+          --owner=igf \
+          -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz -C temp_{{ dag_run.conf["seqrun_id"] }}
+      fi
+      """
+    )
   ## PIPELINE
   check_and_transfer_run >> extract_tar_file