Parcourir la source

checking md5 in chunks

Avik Datta il y a 3 ans
Parent
commit
e554ff6021
1 fichiers modifiés avec 29 ajouts et 97 suppressions
  1. 29 97
      dags/dag14_crick_seqrun_transfer.py

+ 29 - 97
dags/dag14_crick_seqrun_transfer.py

@@ -3,36 +3,31 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
-from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
-from airflow.contrib.operators.sftp_operator import SFTPOperator
+from airflow.operators.python_operator import BranchPythonOperator
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transfer_run_func
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
-
-FTP_SEQRUN_SERVER = Variable.get('crick_ftp_seqrun_hostname')
-FTP_CONFIG_FILE = Variable.get('crick_ftp_config_file_wells')
-FTP_BASE_PATH = Variable.get('crick_ftp_base_path')
-SEQRUN_BASE_PATH = Variable.get('seqrun_base_path')
-HPC_SEQRUN_BASE_PATH = Variable.get('hpc_seqrun_path')
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import find_and_split_md5_func
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import validate_md5_chunk_func
 
 args = {
     'owner': 'airflow',
     'start_date': days_ago(2),
-    'retries': 1,
+    'retries': 4,
     'retry_delay': timedelta(minutes=5),
     'provide_context': True,
     'email_on_failure': False,
     'email_on_retry': False,
     'catchup': False,
-    'max_active_runs': 1,
+    'max_active_runs': 5,
 }
 
 ## SSH HOOK
-#orwell_ssh_hook = \
-#  SSHHook(
-#    key_file=Variable.get('hpc_ssh_key_file'),
-#    username=Variable.get('hpc_user'),
-#    remote_host=Variable.get('orwell_server_hostname'))
+orwell_ssh_hook = \
+  SSHHook(
+    key_file=Variable.get('hpc_ssh_key_file'),
+    username=Variable.get('hpc_user'),
+    remote_host=Variable.get('orwell_server_hostname'))
 
 dag = \
   DAG(
@@ -51,9 +46,6 @@ with dag:
       dag=dag,
       pool='crick_ftp_pool',
       queue='hpc_4G',
-      params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-             'hpc_seqrun_base_path': HPC_SEQRUN_BASE_PATH,
-             'ftp_config_file': FTP_CONFIG_FILE},
       python_callable=check_and_transfer_run_func)
   ## TASK
   extract_tar_file = \
@@ -62,84 +54,24 @@ with dag:
       dag=dag,
       queue='hpc_4G',
       python_callable=extract_tar_file_func)
-  #check_and_transfer_run = \
-  #  BashOperator(
-  #    task_id='check_and_transfer_run',
-  #    dag=dag,
-  #    pool='crick_ftp_pool',
-  #    do_xcom_push=False,
-  #    queue='wells',
-  #    params={'ftp_seqrun_server': FTP_SEQRUN_SERVER,
-  #            'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
-  #            'ftp_config_file': FTP_CONFIG_FILE},
-  #    bash_command="""
-  #      python /rds/general/user/igf/home/data2/airflow_test/github/data-management-python/scripts/ftp_seqrun_transfer/transfer_seqrun_from_crick.py \
-  #        -f {{ params.ftp_seqrun_server }} \
-  #        -s {{ dag_run.conf["seqrun_id"] }} \
-  #        -d {{ params.seqrun_base_path }} \
-  #        -c {{ params.ftp_config_file }}
-  #      """)
-  #check_and_transfer_run = \
-  #  SFTPOperator(
-  #    task_id='check_and_transfer_run',
-  #    dag=dag,
-  #    pool='crick_ftp_pool',
-  #    queue='hpc_4G',
-  #    ssh_conn_id="crick_sftp_conn",
-  #    params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH,
-  #            'ftp_base_path': FTP_BASE_PATH},
-  #    local_filepath="{{ params.seqrun_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
-  #    remote_filepath="{{ params.ftp_base_path }}/{{ dag_run.conf['seqrun_id'] }}.tar.gz",
-  #    operation='get'
-  #  )
-  # TASK
-  #extract_tar_file = \
-  #  BashOperator(
-  #    task_id='extract_tar_file',
-  #    dag=dag,
-  #    do_xcom_push=False,
-  #    queue='hpc_4G',
-  #    params={'seqrun_base_path': HPC_SEQRUN_BASE_PATH},
-  #    bash_command="""
-  #    cd {{ params.seqrun_base_path }};
-  #    if [ -d temp_{{ dag_run.conf["seqrun_id"] }} ];
-  #    then
-  #      echo "Seqrun dir exists";
-  #      exit 1;
-  #    else
-  #      mkdir -p temp_{{ dag_run.conf["seqrun_id"] }};
-  #      tar \
-  #        --no-same-owner \
-  #        --no-same-permissions \
-  #        --owner=igf \
-  #        -xzf {{ dag_run.conf["seqrun_id"] }}.tar.gz \
-  #        -C temp_{{ dag_run.conf["seqrun_id"] }};
-  #      find temp_{{ dag_run.conf["seqrun_id"] }} \
-  #        -type d \
-  #        -exec chmod 700 {} \;
-  #      chmod -R u+r temp_{{ dag_run.conf["seqrun_id"] }};
-  #      chmod -R u+w temp_{{ dag_run.conf["seqrun_id"] }};
-  #    fi
-  #    """
-  #  )
   ## TASK
-  #move_seqrun_dir = \
-  #  BashOperator(
-  #    task_id='move_seqrun_dir',
-  #    dag=dag,
-  #    do_xcom_push=False,
-  #    queue='hpc_4G',
-  #    params={'seqrun_base_path': SEQRUN_BASE_PATH},
-  #    bash_command="""
-  #      cd {{ params.seqrun_base_path }};
-  #      if [ -d {{ dag_run.conf["seqrun_id"] }} ];
-  #      then
-  #        echo "Seqrun dir exists";
-  #        exit 1;
-  #      fi
-  #      ls temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }};
-  #      move temp_{{ dag_run.conf["seqrun_id"] }}/camp/stp/sequencing/inputs/instruments/sequencers/{{ dag_run.conf["seqrun_id"] }} {{ params.seqrun_base_path }}/{{ dag_run.conf["seqrun_id"] }};
-  #    """
-  #  )
+  find_and_split_md5 = \
+    BranchPythonOperator(
+      task_id='find_and_split_md5',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=find_and_split_md5_func)
   ## PIPELINE
-  check_and_transfer_run >> extract_tar_file
+  check_and_transfer_run >> extract_tar_file >> find_and_split_md5
+  for chunk_id in range(0, 21):
+    t = \
+      PythonOperator(
+        task_id='md5_validate_chunk_{0}'.format(i),
+        dag=dag,
+        queue='hpc_4G',
+        params={'chunk_id': chunk_id,
+                'xcom_task': 'find_and_split_md5',
+                'xcom_key': 'md5_file_chunk'},
+        python_callable=validate_md5_chunk_func)
+    ## PIPELINE
+    find_and_split_md5 >> t