
updated dags: moved check_celery_queue imports under igf_airflow.celery, switched error handling to log-and-reraise, reformatted ssh/qsub commands, and cleaned up task dependencies

Avik Datta committed 4 years ago · commit 02177fea6d

+ 47 - 27
dags/dag1_calculate_hpc_worker.py

@@ -1,12 +1,12 @@
-import json
+import json,logging
 from airflow.models import DAG,Variable
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator,BranchPythonOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.utils.dates import days_ago
-from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server
-from igf_airflow.check_celery_queue import calculate_new_workers
+from igf_airflow.celery.check_celery_queue import fetch_queue_list_from_redis_server
+from igf_airflow.celery.check_celery_queue import calculate_new_workers
 
 args = {
     'owner':'airflow',
@@ -44,7 +44,8 @@ def airflow_utils_for_redis(**kwargs):
     queue_list = fetch_queue_list_from_redis_server(url=url)
     return queue_list
   except Exception as e:
-    raise ValueError('Failed to run, error:{0}'.format(e))
+    logging.error('Failed to run, error:{0}'.format(e))
+    raise
 
 
 def get_new_workers(**kwargs):
@@ -68,50 +69,47 @@ def get_new_workers(**kwargs):
       [q for q in unique_queue_list if q.startswith('hpc')]
     return unique_queue_list
   except Exception as e:
-    raise ValueError('Failed to get new workers, error: {0}'.format(e))
-
+    logging.error('Failed to get new workers, error: {0}'.format(e))
+    raise
 
 
 with dag:
+  ## TASK
   fetch_queue_list_from_redis = \
     PythonOperator(
       task_id='fetch_queue_list_from_redis',
       dag=dag,
       python_callable=airflow_utils_for_redis,
       op_kwargs={"redis_conf_file":Variable.get('redis_conn_file')},
-      queue='igf-lims'
-    )
-
+      queue='igf-lims')
+  ## TASK
   check_hpc_queue = \
     SSHOperator(
       task_id='check_hpc_queue',
       ssh_hook=hpc_hook,
       dag=dag,
       command='source /etc/bashrc;qstat',
-      queue='igf-lims'
-    )
-
+      queue='igf-lims')
+  ## TASK
   fetch_active_jobs_from_hpc = \
     SSHOperator(
       task_id='fetch_active_jobs_from_hpc',
       ssh_hook=hpc_hook,
       dag=dag,
-      command='source /etc/bashrc;bash /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/hpc_job_count_runner.sh ',
+      command="""
+        source /etc/bashrc;\
+        bash /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/hpc_job_count_runner.sh """,
       do_xcom_push=True,
-      queue='igf-lims'
-    )
-
+      queue='igf-lims')
+  ## TASK
   calculate_new_worker_size_and_branch = \
     BranchPythonOperator(
       task_id='calculate_new_worker_size_and_branch',
       dag=dag,
       python_callable=get_new_workers,
-      queue='igf-lims',
-    )
-
-  check_hpc_queue >> fetch_active_jobs_from_hpc
-  calculate_new_worker_size_and_branch << [fetch_queue_list_from_redis,fetch_active_jobs_from_hpc]
-
+      queue='igf-lims')
+  ## TASK
+  queue_tasks = list()
   hpc_queue_list = Variable.get('hpc_queue_list', deserialize_json=True)
   for q,data in hpc_queue_list.items():
     pbs_resource = data.get('pbs_resource')
@@ -123,11 +121,33 @@ with dag:
       queue='igf-lims',
       command="""
       {% if ti.xcom_pull(key=params.job_name,task_ids="calculate_new_worker_size_and_branch" ) > 1 %}
-        source /etc/bashrc;qsub -o /dev/null -e /dev/null -k n -m n -N {{ params.job_name }} -J 1-{{ ti.xcom_pull(key=params.job_name,task_ids="calculate_new_worker_size_and_branch" ) }}  {{ params.pbs_resource }} -- /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}
+        source /etc/bashrc; \
+        qsub \
+          -o /dev/null \
+          -e /dev/null \
+          -k n -m n \
+          -N {{ params.job_name }} \
+          -J 1-{{ ti.xcom_pull(key=params.job_name,task_ids="calculate_new_worker_size_and_branch" ) }}  {{ params.pbs_resource }} -- \
+            /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}
       {% else %}
-        source /etc/bashrc;qsub -o /dev/null -e /dev/null -k n -m n -N {{ params.job_name }} {{ params.pbs_resource }} -- /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}
+        source /etc/bashrc;\
+        qsub \
+          -o /dev/null \
+          -e /dev/null \
+          -k n -m n \
+          -N {{ params.job_name }} {{ params.pbs_resource }} -- \
+            /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/airflow_worker.sh {{  params.airflow_queue }} {{ params.job_name }}
       {% endif %}
       """,
-      params={'pbs_resource':pbs_resource,'airflow_queue':airflow_queue,'job_name':q}
-    )
-    calculate_new_worker_size_and_branch >> t
+      params={'pbs_resource':pbs_resource,
+              'airflow_queue':airflow_queue,
+              'job_name':q})
+    queue_tasks.\
+      append(t)
+
+  ## PIPELINE
+  check_hpc_queue >> fetch_active_jobs_from_hpc
+  calculate_new_worker_size_and_branch << \
+    [fetch_queue_list_from_redis,
+     fetch_active_jobs_from_hpc]
+  calculate_new_worker_size_and_branch >> queue_tasks
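For context, here is a minimal sketch of the contract the templated qsub command above relies on (not part of this commit; the helper name and the calculate_new_workers signature are assumptions): the branch callable pushes one worker count per hpc queue to XCom under the queue name and returns the matching queue names, which double as the branch task ids followed by the BranchPythonOperator.

from igf_airflow.celery.check_celery_queue import calculate_new_workers

def get_new_workers_sketch(**kwargs):
  ti = kwargs['ti']
  # upstream XCom values: the Redis queue list and the active hpc job counts
  queue_list = ti.xcom_pull(task_ids='fetch_queue_list_from_redis')
  active_jobs = ti.xcom_pull(task_ids='fetch_active_jobs_from_hpc')
  # assumed return shape: {'hpc_4G': 2, 'hpc_8G': 0, ...}
  worker_to_scale = \
    calculate_new_workers(
      queue_list=queue_list,
      active_jobs_dict=active_jobs)
  for queue_name, worker_count in worker_to_scale.items():
    ti.xcom_push(key=queue_name, value=worker_count)   # read back by the qsub template
  return [q for q in worker_to_scale.keys() if q.startswith('hpc')]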

+ 21 - 19
dags/dag2_disk_usage.py

@@ -1,11 +1,11 @@
 from datetime import timedelta
-
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -16,6 +16,7 @@ default_args = {
     'retry_delay': timedelta(minutes=5)
 }
 
+## SSH HOOK
 orwell_ssh_hook = \
   SSHHook(
     key_file=Variable.get('hpc_ssh_key_file'),
@@ -40,6 +41,7 @@ igf_lims_ssh_hook = \
     username=Variable.get('hpc_user'),
     remote_host='igf-lims.cc.ic.ac.uk')
 
+## DAG
 dag = \
   DAG(
     dag_id='dag2_disk_usage',
@@ -50,59 +52,59 @@ dag = \
     default_args=default_args)
 
 with dag:
+  ## TASK
   check_orwell_disk = \
     SSHOperator(
       task_id = 'check_orwell_disk',
       dag = dag,
       ssh_hook = orwell_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/orwell_disk_usage.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/orwell_disk_usage.sh ')
+  ## TASK
   check_eliot_disk = \
     SSHOperator(
       task_id = 'check_eliot_disk',
       dag = dag,
       ssh_hook = eliot_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/eliot_disk_usage.sh '
-    )
-
+      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/eliot_disk_usage.sh ')
+  ## TASK
   check_woolf_disk = \
     SSHOperator(
       task_id = 'check_woolf_disk',
       dag = dag,
       ssh_hook = woolf_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/git_repos/IGF-cron-scripts/woolf/woolf_disk_usage.sh '
-    )
-
+      command = 'bash /home/igf/git_repos/IGF-cron-scripts/woolf/woolf_disk_usage.sh ')
+  ## TASK
   check_igf_lims_disk = \
     SSHOperator(
       task_id = 'check_igf_lims_disk',
       dag = dag,
       ssh_hook = igf_lims_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/github/IGF-cron-scripts/igf_lims/igf_lims_disk_usage.sh '
-    )
-
+      command = 'bash /home/igf/github/IGF-cron-scripts/igf_lims/igf_lims_disk_usage.sh ')
+  ## TASK
   merge_disk_usage = \
     SSHOperator(
       task_id = 'merge_disk_usage',
       dag = dag,
       ssh_hook = eliot_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/merge_disk_usage.sh '
-    )
-
+      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/merge_disk_usage.sh ')
+  ## TASK
   internal_usage = \
     SSHOperator(
       task_id = 'internal_usage',
       dag = dag,
       ssh_hook = eliot_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/internal_usage.sh '
-    )
+      command = 'bash /home/igf/git_repos/IGF-cron-scripts/eliot/internal_usage.sh ')
 
-  merge_disk_usage << [check_orwell_disk,check_eliot_disk,check_woolf_disk,check_igf_lims_disk]
+  ## PIPELINE
+  merge_disk_usage << \
+    [check_orwell_disk,
+     check_eliot_disk,
+     check_woolf_disk,
+     check_igf_lims_disk]
   merge_disk_usage >> internal_usage
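As an aside, the << fan-in above is equivalent to the more familiar >> form; assuming the same task objects, the dependencies could also be declared as:

  [check_orwell_disk,
   check_eliot_disk,
   check_woolf_disk,
   check_igf_lims_disk] >> merge_disk_usage
  merge_disk_usage >> internal_usage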

+ 7 - 5
dags/dag3_hpc_pipelines.py

@@ -4,6 +4,7 @@ from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -14,6 +15,7 @@ default_args = {
     'retry_delay': timedelta(minutes=2),
 }
 
+## DAG
 dag = \
   DAG(
     dag_id='dag3_hpc_pipelines',
@@ -24,20 +26,20 @@ dag = \
     default_args=default_args)
 
 with dag:
+  ## TASK
   run_demultiplexing_pipeline = \
     BashOperator(
       task_id='run_demultiplexing_pipeline',
       dag=dag,
       queue='hpc_4G',
-      bash_command='bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/run_demultiplexing_pipeline.sh '
-    )
-
+      bash_command='bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/run_demultiplexing_pipeline.sh ')
+  ## TASK
   run_primary_analysis_pipeline = \
     BashOperator(
       task_id='run_primary_analysis_pipeline',
       dag=dag,
       queue='hpc_4G',
-      bash_command='bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/run_primary_analysis_pipeline.sh '
-    )
+      bash_command='bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/run_primary_analysis_pipeline.sh ')
 
+  ## PIPELINE
   run_demultiplexing_pipeline >> run_primary_analysis_pipeline

+ 5 - 3
dags/dag4_lims_metadata.py

@@ -1,9 +1,9 @@
 from datetime import timedelta
-
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -14,6 +14,7 @@ default_args = {
     'retry_delay': timedelta(minutes=5)
 }
 
+## DAG
 dag = \
   DAG(
     dag_id='dag4_lims_metadata',
@@ -24,13 +25,14 @@ dag = \
     default_args=default_args)
 
 with dag:
+  ## TASK
   submit_metadata_fetch_job = \
     BashOperator(
       task_id = 'submit_metadata_fetch_job',
       dag = dag,
       xcom_push=True,
       queue='hpc_4G',
-      bash_command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/lims_metadata/fetch_lims_metadata_qsub.sh '
-    )
+      bash_command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/lims_metadata/fetch_lims_metadata_qsub.sh ')
 
+  ## PIPELINE
   submit_metadata_fetch_job

+ 10 - 9
dags/dag5_primary_analysis_and_qc_processing.py

@@ -1,11 +1,11 @@
 from datetime import timedelta
-
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -16,6 +16,7 @@ default_args = {
     'retry_delay': timedelta(minutes=5),
 }
 
+## SSH HOOK
 orwell_ssh_hook = \
   SSHHook(
     key_file=Variable.get('hpc_ssh_key_file'),
@@ -23,6 +24,7 @@ orwell_ssh_hook = \
     remote_host='orwell.hh.med.ic.ac.uk')
 hpc_hook = SSHHook(ssh_conn_id='hpc_conn')
 
+## DAG
 dag = \
   DAG(
     dag_id='dag5_primary_analysis_and_qc_processing',
@@ -33,30 +35,29 @@ dag = \
     default_args=default_args)
 
 with dag:
+  ## TASK
   update_exp_metadata = \
     BashOperator(
       task_id = 'update_exp_metadata',
       dag = dag,
       queue='hpc_4G',
-      bash_command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/update_exp_metadata.sh '
-    )
-
+      bash_command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/update_exp_metadata.sh ')
+  ## TASK
   find_new_exp_for_analysis = \
     SSHOperator(
       task_id = 'find_new_exp_for_analysis',
       dag = dag,
       ssh_hook = orwell_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/find_new_exp_for_analysis.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/find_new_exp_for_analysis.sh ')
+  ## TASK
   seed_analysis_pipeline = \
     SSHOperator(
       task_id = 'seed_analysis_pipeline',
       dag = dag,
       ssh_hook=hpc_hook,
       queue='hpc_4G',
-      command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/seed_analysis_pipeline.sh '
-    )
+      command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/seed_analysis_pipeline.sh ')
 
+  ## PIPELINE
   update_exp_metadata >> find_new_exp_for_analysis >> seed_analysis_pipeline

+ 16 - 18
dags/dag6_seqrun_processing.py

@@ -1,11 +1,11 @@
 from datetime import timedelta
-
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -16,6 +16,7 @@ default_args = {
     'retry_delay': timedelta(minutes=5),
 }
 
+## DAG
 dag = \
   DAG(
     dag_id='dag6_seqrun_processing',
@@ -25,6 +26,7 @@ dag = \
     tags=['hpc','orwell'],
     default_args=default_args)
 
+## SSH HOOK
 orwell_ssh_hook = \
   SSHHook(
     key_file=Variable.get('hpc_ssh_key_file'),
@@ -34,59 +36,55 @@ orwell_ssh_hook = \
 hpc_hook = SSHHook(ssh_conn_id='hpc_conn')
 
 with dag:
+  ## TASK
   switch_off_project_barcode = \
     SSHOperator(
       task_id = 'switch_off_project_barcode',
       dag = dag,
       ssh_hook = orwell_ssh_hook,
       queue='hpc_4G',
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/switch_off_project_barcode_check.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/switch_off_project_barcode_check.sh ')
+  ## TASK
   change_samplesheet_for_run = \
     SSHOperator(
       task_id = 'change_samplesheet_for_run',
       dag = dag,
       queue='hpc_4G',
       ssh_hook = orwell_ssh_hook,
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/change_samplesheet_for_seqrun.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/change_samplesheet_for_seqrun.sh ')
+  ## TASK
   restart_seqrun_processing = \
     SSHOperator(
       task_id = 'restart_seqrun_processing',
       dag = dag,
       queue='hpc_4G',
       ssh_hook = orwell_ssh_hook,
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/restart_seqrun_processing.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/restart_seqrun_processing.sh ')
+  ## TASK
   register_project_metadata = \
     SSHOperator(
       task_id = 'register_project_metadata',
       dag = dag,
       queue='hpc_4G',
       ssh_hook = orwell_ssh_hook,
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/register_metadata.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/register_metadata.sh ')
+  ## TASK
   find_new_seqrun = \
     SSHOperator(
       task_id = 'find_new_seqrun',
       dag = dag,
       queue='hpc_4G',
       ssh_hook = orwell_ssh_hook,
-      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/find_new_seqrun.sh '
-    )
-
+      command = 'bash /home/igf/igf_code/IGF-cron-scripts/orwell/find_new_seqrun.sh ')
+  ## TASK
   seed_demultiplexing_pipe = \
     SSHOperator(
       task_id = 'seed_demultiplexing_pipe',
       dag = dag,
       ssh_hook=hpc_hook,
       queue='hpc_4G',
-      command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/seed_demultiplexing_pipeline.sh '
-    )
+      command = 'bash /rds/general/user/igf/home/git_repo/IGF-cron-scripts/hpc/seed_demultiplexing_pipeline.sh ')
 
+  ## PIPELINE
   switch_off_project_barcode >> change_samplesheet_for_run >> restart_seqrun_processing
   restart_seqrun_processing >> register_project_metadata >> find_new_seqrun >> seed_demultiplexing_pipe

+ 7 - 3
dags/dag7_hpc_scheduler.py

@@ -1,11 +1,11 @@
 from datetime import timedelta
-
 from airflow.models import DAG,Variable
 from airflow.utils.dates import days_ago
 from airflow.operators.bash_operator import BashOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.contrib.hooks.ssh_hook import SSHHook
 
+## ARGS
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -16,6 +16,7 @@ default_args = {
     'retry_delay': timedelta(minutes=5),
 }
 
+## DAG
 dag = \
   DAG(
     dag_id='dag7_hpc_scheduler',
@@ -25,15 +26,18 @@ dag = \
     tags=['igf-lims'],
     default_args=default_args)
 
+## SSH HOOK
 hpc_hook = SSHHook(ssh_conn_id='hpc_conn')
 
 with dag:
+  ## TASK
   run_hpc_scheduler = \
     SSHOperator(
       task_id = 'run_hpc_scheduler',
       dag = dag,
       ssh_hook = hpc_hook,
       queue='igf-lims',
-      command = 'source /etc/bashrc;qsub /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/run_hpc_scheduler.sh '
-    )
+      command = 'source /etc/bashrc;qsub /project/tgu/data2/airflow_test/github/igf-airflow-hpc/scripts/hpc/run_hpc_scheduler.sh ')
+
+  ## PIPELINE
   run_hpc_scheduler

+ 13 - 2
dags/dag8_copy_ongoing_seqrun.py

@@ -217,20 +217,22 @@ def copy_seqrun_chunk(context):
       reaction='fail')
 
 
-## TASKS
 with dag:
+  ## TASK
   generate_seqrun_list = \
     BranchPythonOperator(
       task_id='generate_seqrun_list',
       dag=dag,
       queue='hpc_4G',
       python_callable=get_ongoing_seqrun_list)
+  ## TASK
   no_ongoing_seqrun = \
     DummyOperator(
       task_id='no_ongoing_seqrun',
       dag=dag,
       queue='hpc_4G',
       on_success_callback=log_sleep)
+  ## TASK
   tasks = list()
   for i in range(5):
     t1 = \
@@ -243,7 +245,14 @@ with dag:
         params={'source_task_id':'generate_seqrun_list',
                 'pull_key':'ongoing_seqruns',
                 'index_number':i},
-        command='python /path/script1.py --run_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}')
+        command="""
+          source /home/igf/igf_code/airflow/env.sh; \
+          python /home/igf/igf_code/airflow/data-management-python/scripts/seqrun_processing/create_file_list_for_ongoing_seqrun.py \
+            --seqrun_base_dir /home/igf/seqrun/illumina \
+            --output_path /home/igf/ongoing_run_tracking \
+            --seqrun_id {{ ti.xcom_pull(key=params.pull_key,task_ids=params.source_task_id)[ params.index_number ] }}
+          """)
+    ## TASK
     t2 = \
       PythonOperator(
         task_id='copy_seqrun_file_list_{0}'.format(i),
@@ -253,6 +262,7 @@ with dag:
         params={'xcom_pull_task_ids':'generate_seqrun_file_list_{0}'.format(i),
                 'seqrun_server':Variable.get('seqrun_server')},
         python_callable=copy_seqrun_manifest_file)
+    ## TASK
     t3 = \
       BranchPythonOperator(
         task_id='decide_copy_branch_{0}'.format(i),
@@ -263,6 +273,7 @@ with dag:
                 'seqrun_chunk_size_key':'seqrun_chunk_size',
                 'child_task_prefix':'copy_file_run_{0}_chunk_'.format(i)},
         python_callable=get_seqrun_chunks)
+    ## TASK
     t4 = list()
     for j in range(10):
       t4j = \