|
@@ -5,7 +5,7 @@ from airflow.operators.python_operator import PythonOperator,BranchPythonOperato
|
|
|
from airflow.contrib.operators.ssh_operator import SSHOperator
|
|
|
from airflow.contrib.hooks.ssh_hook import SSHHook
|
|
|
from airflow.utils.dates import days_ago
|
|
|
-from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server,airflow_utils_for_redis
|
|
|
+from igf_airflow.check_celery_queue import fetch_queue_list_from_redis_server
|
|
|
from igf_airflow.check_celery_queue import calculate_new_workers
|
|
|
|
|
|
args = {
|
|
@@ -26,6 +26,27 @@ dag = DAG(
|
|
|
)
|
|
|
|
|
|
|
|
|
+def airflow_utils_for_redis(**kwargs):
|
|
|
+ """
|
|
|
+ A function for dag1, TO DO
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ if 'redis_conf_file' not in kwargs:
|
|
|
+ raise ValueError('redis_conf_file info is not present in the kwargs')
|
|
|
+
|
|
|
+ redis_conf_file = kwargs.get('redis_conf_file')
|
|
|
+ json_data = dict()
|
|
|
+ with open(redis_conf_file,'r') as jp:
|
|
|
+ json_data = json.load(jp)
|
|
|
+ if 'redis_db' not in json_data:
|
|
|
+ raise ValueError('redis_db key not present in the conf file')
|
|
|
+ url = json_data.get('redis_db')
|
|
|
+ queue_list = fetch_queue_list_from_redis_server(url=url)
|
|
|
+ return queue_list
|
|
|
+ except Exception as e:
|
|
|
+ raise ValueError('Failed to run, error:{0}'.format(e))
|
|
|
+
|
|
|
+
|
|
|
def get_new_workers(**kwargs):
|
|
|
try:
|
|
|
if 'ti' not in kwargs:
|