Browse Source

switching to python operator for tar extraction

Avik Datta 3 years ago
parent
commit
cc4e7d2b6e
1 changed file with 56 additions and 56 deletions
  1. 56 56
      dags/dag14_crick_seqrun_transfer.py

+ 56 - 56
dags/dag14_crick_seqrun_transfer.py

@@ -7,7 +7,7 @@ from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.contrib.operators.sftp_operator import SFTPOperator
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
-#from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 
 FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
 FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
@@ -56,13 +56,13 @@ with dag:
              'ftp_config_file': FTP_CONFIG_FILE},
       python_callable=check_and_transfer_run_func)
   ## TASK
-  #extract_tar_file = \
-  #  PythonOperator(
-  #    task_id='extract_tar_file',
-  #    dag=dag,
-  #    queue='hpc_4G',
-  #    params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
-  #    python_callable=extract_tar_file_func)
+  extract_tar_file = \
+    PythonOperator(
+      task_id='extract_tar_file',
+      dag=dag,
+      queue='hpc_4G',
+      params={'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH},
+      python_callable=extract_tar_file_func)
   #check_and_transfer_run = \
   #  BashOperator(
   #    task_id='check_and_transfer_run',
@@ -94,53 +94,53 @@ with dag:
   #    operation='get'
   #  )
   # TASK
-  extract_tar_file = \
-    BashOperator(
-      task_id='extract_tar_file',
-      dag=dag,
-      do_xcom_push=False,
-      queue='hpc_4G',
-      params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
-      bash_command="""
-      cd {{ params.seqrun_base_path }};
-      if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
-      then
-        echo "Seqrun dir exists";
-        exit 1;
-      else
-        mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
-        tar \
-          --no-same-owner \
-          --no-same-permissions \
-          --owner=igf \
-          -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
-          -C temp_{{ dag_run.conf["seqrun_id"] }};
-        find temp_{{ dag_run.conf["seqrun_id"] }} \
-          -type d \
-          -exec chmod 700 {} \;
-        chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
-        chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
-      fi
-      """
-    )
+  #extract_tar_file = \
+  #  BashOperator(
+  #    task_id='extract_tar_file',
+  #    dag=dag,
+  #    do_xcom_push=False,
+  #    queue='hpc_4G',
+  #    params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
+  #    bash_command="""
+  #    cd {{ params.seqrun_base_path }};
+  #    if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
+  #    then
+  #      echo "Seqrun dir exists";
+  #      exit 1;
+  #    else
+  #      mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
+  #      tar \
+  #        --no-same-owner \
+  #        --no-same-permissions \
+  #        --owner=igf \
+  #        -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
+  #        -C temp_{{ dag_run.conf["seqrun_id"] }};
+  #      find temp_{{ dag_run.conf["seqrun_id"] }} \
+  #        -type d \
+  #        -exec chmod 700 {} \;
+  #      chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
+  #      chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
+  #    fi
+  #    """
+  #  )
   ## TASK
-  move_seqrun_dir = \
-    BashOperator(
-      task_id='move_seqrun_dir',
-      dag=dag,
-      do_xcom_push=False,
-      queue='hpc_4G',
-      params={'seqrun_base_path': SEQRUN_BASE_PATH},
-      bash_command="""
-        cd {{ params.seqrun_base_path }};
-        if [ -d {{ dag_run.conf["seqrun_id"] }} ];
-        then
-          echo "Seqrun dir exists";
-          exit 1;
-        fi
-        ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
-        move temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
-      """
-    )
+  #move_seqrun_dir = \
+  #  BashOperator(
+  #    task_id='move_seqrun_dir',
+  #    dag=dag,
+  #    do_xcom_push=False,
+  #    queue='hpc_4G',
+  #    params={'seqrun_base_path': SEQRUN_BASE_PATH},
+  #    bash_command="""
+  #      cd {{ params.seqrun_base_path }};
+  #      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
+  #      then
+  #        echo "Seqrun dir exists";
+  #        exit 1;
+  #      fi
+  #      ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
+  #      move temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
+  #    """
+  #  )
   ## PIPELINE
-  check_and_transfer_run >> extract_tar_file >> move_seqrun_dir
+  check_and_transfer_run >> extract_tar_file