@@ -1,11 +1,12 @@
 import os, json, logging, requests
-from airflow.models import DAG,Variable
-from airflow.operators.bash_operator import BashOperator
-from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
-from airflow.contrib.operators.ssh_operator import SSHOperator
-from airflow.contrib.hooks.ssh_hook import SSHHook
-from airflow.utils.dates import days_ago
+from datetime import timedelta
 from requests.auth import HTTPBasicAuth
+from airflow.models import DAG, Variable
+from airflow.utils.dates import days_ago
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.contrib.operators.ssh_operator import SSHOperator
 from igf_airflow.celery.check_celery_queue import fetch_queue_list_from_redis_server
 from igf_airflow.celery.check_celery_queue import calculate_new_workers

@@ -14,6 +15,8 @@ CELERY_FLOWER_BASE_URL = Variable.get('celery_flower_base_url')
 args = {
     'owner':'airflow',
     'start_date':days_ago(2),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=2),
     'provide_context': True,
 }

@@ -23,7 +26,7 @@ dag = DAG(
     dag_id='dag1_calculate_hpc_worker',
     catchup=False,
     max_active_runs=1,
-    schedule_interval="5,10,15,20,25,30,35,40,45,50,55 * * * *",
+    schedule_interval="*/5 * * * *",
     default_args=args,
     tags=['igf-lims',]
 )