Sfoglia il codice sorgente

added tasks for remote file copy

Avik Datta 3 anni fa
parent
commit
2fcbf6dad5
1 file modificato, con 38 aggiunte e 10 eliminazioni
  1. 38 10
      dags/dag14_crick_seqrun_transfer.py

+ 38 - 10
dags/dag14_crick_seqrun_transfer.py

@@ -9,6 +9,8 @@ from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_transf
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import extract_tar_file_func
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import find_and_split_md5_func
 from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import validate_md5_chunk_func
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import check_and_divide_run_for_remote_copy_func
+from igf_airflow.utils.dag14_crick_seqrun_transfer_utils import copy_run_file_to_remote_func
 
 args = {
     'owner': 'airflow',
@@ -22,24 +24,16 @@ args = {
     'max_active_runs': 5,
 }
 
-## SSH HOOK
-orwell_ssh_hook = \
-  SSHHook(
-    key_file=Variable.get('hpc_ssh_key_file'),
-    username=Variable.get('hpc_user'),
-    remote_host=Variable.get('orwell_server_hostname'))
-
 dag = \
   DAG(
     dag_id='dag14_crick_seqrun_transfer',
     schedule_interval=None,
     default_args=args,
-    tags=['ftp', 'hpc', 'orwell', 'wells'])
-
+    orientation='LR',
+    tags=['ftp', 'hpc'])
 
 with dag:
   ## TASK
-  # not working on HPC
   check_and_transfer_run = \
     PythonOperator(
       task_id='check_and_transfer_run',
@@ -76,3 +70,37 @@ with dag:
         python_callable=validate_md5_chunk_func)
     ## PIPELINE
     find_and_split_md5 >> t
+  ## TASK
+  check_and_divide_run_for_remote_copy = \
+    PythonOperator(
+      task_id='check_and_divide_run_for_remote_copy',
+      dag=dag,
+      queue='hpc_4G',
+      python_callable=check_and_divide_run_for_remote_copy_func)
+  ## PIPELINE
+  extract_tar_file >> check_and_divide_run_for_remote_copy
+  ## TASK
+  for i in range(1, 9):
+    t = \
+      PythonOperator(
+        task_id='copy_bcl_to_remote_for_lane{0}'.format(i),
+        dag=dag,
+        pool='orwell_scp_pool',
+        queue='hpc_4G',
+        params={'lane_id': i,
+                'xcom_task': 'check_and_divide_run_for_remote_copy',
+                'xcom_key': 'bcl_files'},
+        python_callable=copy_run_file_to_remote_func)
+    ## PIPELINE
+    check_and_divide_run_for_remote_copy >> t
+  copy_additional_file_to_remote = \
+    PythonOperator(
+      task_id='copy_additional_file_to_remote',
+      dag=dag,
+      pool='orwell_scp_pool',
+      queue='hpc_4G',
+      params={'xcom_task': 'check_and_divide_run_for_remote_copy',
+              'xcom_key': 'additional_files'},
+      python_callable=copy_run_file_to_remote_func)
+  ## PIPELINE
+  check_and_divide_run_for_remote_copy >> copy_additional_file_to_remote