|
@@ -60,7 +60,7 @@ with dag:
|
|
|
dag=dag,
|
|
|
python_callable=airflow_utils_for_redis,
|
|
|
op_kwargs={"redis_conf_file":Variable.get('redis_conf_file')},
|
|
|
- queue='igf_lims_queue'
|
|
|
+ queue='igf-lims'
|
|
|
)
|
|
|
|
|
|
check_hpc_queue = \
|
|
@@ -69,7 +69,7 @@ with dag:
|
|
|
ssh_hook=hpc_hook,
|
|
|
dag=dag,
|
|
|
command='qstat',
|
|
|
- queue='igf_lims_queue'
|
|
|
+ queue='igf-lims'
|
|
|
)
|
|
|
|
|
|
fetch_active_jobs_from_hpc = \
|
|
@@ -79,7 +79,7 @@ with dag:
|
|
|
dag=dag,
|
|
|
command='python /path/count_job_script.py',
|
|
|
do_xcom_push=True,
|
|
|
- queue='igf_lims_queue'
|
|
|
+ queue='igf-lims'
|
|
|
)
|
|
|
|
|
|
calculate_new_worker_size_and_branch = \
|
|
@@ -87,14 +87,14 @@ with dag:
|
|
|
task_id='calculate_new_worker_size_and_branch',
|
|
|
dag=dag,
|
|
|
python_callable=get_new_workers,
|
|
|
- queue='igf_lims_queue',
|
|
|
+ queue='igf-lims',
|
|
|
)
|
|
|
-
|
|
|
+ """
|
|
|
1Gb_worker = \
|
|
|
BashOperator(
|
|
|
task_id='1Gb',
|
|
|
dag=dag,
|
|
|
- queue='igf_lims_queue',
|
|
|
+ queue='igf-lims',
|
|
|
bash_command='echo "{{ ti.xcom_pull(key="1Gb",task_ids="calculate_new_worker_size_and_branch") }}"'
|
|
|
)
|
|
|
|
|
@@ -233,9 +233,10 @@ with dag:
|
|
|
queue='igf_lims_queue',
|
|
|
bash_command='echo "{{ ti.xcom_pull(key="128Gb16t72hr",task_ids="calculate_new_worker_size_and_branch") }}"'
|
|
|
)
|
|
|
-
|
|
|
+ """
|
|
|
check_hpc_queue >> fetch_active_jobs_from_hpc
|
|
|
calculate_new_worker_size_and_branch << [fetch_queue_list_from_redis,fetch_active_jobs_from_hpc]
|
|
|
+ """
|
|
|
fetch_active_jobs_from_hpc >> [1Gb_worker,
|
|
|
1Gb4t_worker,
|
|
|
2Gb4t_worker,
|
|
@@ -254,4 +255,5 @@ with dag:
|
|
|
42Gb16t_worker,
|
|
|
64Gb16t48hr_worker,
|
|
|
128Gb16t72hr_worker
|
|
|
- ]
|
|
|
+ ]
|
|
|
+ """
|