Browse Source

allowing empty json for for seqrun

Avik Datta 4 years ago
parent
commit
5a5bea27a4
1 changed files with 29 additions and 21 deletions
  1. 29 21
      dags/dag8_copy_ongoing_seqrun.py

+ 29 - 21
dags/dag8_copy_ongoing_seqrun.py

@@ -150,19 +150,19 @@ def get_seqrun_chunks(**context):
               'Incorrect worker size: {0}'.\
                 format(worker_size))
     if len(file_data) == 0:
-      raise ValueError(
-              'No data present in seqrun list file {0}'.\
-                format(file_path))
-    if len(file_data) < int(5 * worker_size):
-      worker_size = 1                                                           # setting worker size to 1 for low input
-    if len(file_data) % worker_size == 0:
-      chunk_size = int(len(file_data) / worker_size)
+      worker_branchs = \
+        '{0}_{1}'.format(child_task_prefix,'no_work')
     else:
-      chunk_size = int(len(file_data) / worker_size)+1
-    ti.xcom_push(key=seqrun_chunk_size_key,value=chunk_size)
-    worker_branchs = \
-      ['{0}_{1}'.format(child_task_prefix,i) 
-         for i in range(worker_size)]
+      if len(file_data) < int(5 * worker_size):
+        worker_size = 1                                                           # setting worker size to 1 for low input
+      if len(file_data) % worker_size == 0:
+        chunk_size = int(len(file_data) / worker_size)
+      else:
+        chunk_size = int(len(file_data) / worker_size)+1
+      ti.xcom_push(key=seqrun_chunk_size_key,value=chunk_size)
+      worker_branchs = \
+        ['{0}_{1}'.format(child_task_prefix,i) 
+           for i in range(worker_size)]
     return worker_branchs
   except Exception as e:
     logging.error(e)
@@ -267,7 +267,7 @@ with dag:
   ## TASK
   tasks = list()
   for i in range(5):
-    t1 = \
+    generate_seqrun_file_list = \
       SSHOperator(
         task_id='generate_seqrun_file_list_{0}'.format(i),
         dag=dag,
@@ -286,7 +286,7 @@ with dag:
             --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
           """)
     ## TASK
-    t2 = \
+    copy_seqrun_file_list = \
       PythonOperator(
         task_id='copy_seqrun_file_list_{0}'.format(i),
         dag=dag,
@@ -295,7 +295,7 @@ with dag:
         params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i)},
         python_callable=copy_seqrun_manifest_file)
     ## TASK
-    t3 = \
+    decide_copy_branch = \
       BranchPythonOperator(
         task_id='decide_copy_branch_{0}'.format(i),
         dag=dag,
@@ -306,9 +306,16 @@ with dag:
                 'child_task_prefix':'copy_file_run_{0}_chunk'.format(i)},
         python_callable=get_seqrun_chunks)
     ## TASK
-    t4 = list()
+    no_copy_seqrun = \
+      DummyOperator(
+        task_id='copy_file_run_{0}_chunk_{1}'.format(i,'no_work'),
+        dag=dag,
+        queue='hpc_4G',
+        on_success_callback=log_sleep)
+    ## TASK
+    copy_seqrun_files = list()
     for j in range(10):
-      t4j = \
+      copy_seqrun_chunk = \
         PythonOperator(
           task_id='copy_file_run_{0}_chunk_{1}'.format(i,j),
           dag=dag,
@@ -323,10 +330,11 @@ with dag:
                   'seqrun_id_pull_task_ids':'generate_seqrun_list',
                   'local_seqrun_path':Variable.get('hpc_seqrun_path')},
           python_callable=copy_seqrun_chunk)
-      t4.append(t4j)
-    #tasks.append([ t1 >> t2 >> t3 >> t4 ])
-    generate_seqrun_list >> t1 >> t2 >> t3 >> t4
+      copy_seqrun_files.append(copy_seqrun_chunk)
+    generate_seqrun_list >> generate_seqrun_file_list >> copy_seqrun_file_list >> decide_copy_branch
+    decide_copy_branch >> no_copy_seqrun
+    decide_copy_branch >> copy_seqrun_files
 
   ## PIPELINE
   generate_seqrun_list >> no_ongoing_seqrun
-  #generate_seqrun_list >> tasks
+