check_celery_queue.py 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import redis,json
  2. def fetch_queue_list_from_redis_server(url):
  3. try:
  4. queue_list = list()
  5. print(url)
  6. print(redis.__version__)
  7. r = redis.from_url(url)
  8. for i in r.keys():
  9. queue = i.decode()
  10. if not queue.startswith('_') and \
  11. not queue.startswith('unacked'):
  12. print(queue)
  13. q_len = r.llen(queue)
  14. queue_list.append({queue:q_len})
  15. return queue_list
  16. except Exception as e:
  17. raise ValueError('Failed to fetch from redis server, error: {0}'.format(e))
  18. def airflow_utils_for_redis(**kwargs):
  19. try:
  20. if 'redis_conf_file' not in kwargs:
  21. raise ValueError('redis_conf_file info is not present in the kwargs')
  22. redis_conf_file = kwargs.get('redis_conf_file')
  23. json_data = dict()
  24. with open(redis_conf_file,'r') as jp:
  25. json_data = json.load(jp)
  26. if 'redis_db' not in json_data:
  27. raise ValueError('redis_db key not present in the conf file')
  28. url = json_data.get('redis_db')
  29. queue_list = fetch_queue_list_from_redis_server(url=url)
  30. return queue_list
  31. except Exception as e:
  32. raise ValueError('Failed to run, error:{0}'.format(e))
  33. def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,max_total_workers=70):
  34. '''
  35. A function for calculating new worker size
  36. :param queue_list: A list dictionary containing all the queued jobs
  37. [{queue_name:job_count}]
  38. :param active_jobs_dict: A dictionary containing all job counts for each queues
  39. {queue_name:{job_state:job_count}}
  40. :param max_workers_per_queue: Max allowed worker per queue, default 10
  41. :param max_total_workers: Max total worker for the queue, default 70
  42. :returns: A dictionary containing all the target jobs
  43. {queue_name:target_job_counts}
  44. and a list of unique queue names
  45. [queue_name]
  46. '''
  47. try:
  48. worker_to_submit = dict()
  49. unique_queue_list = list()
  50. total_active_jobs = 0
  51. for _,job_data in active_jobs_dict.items():
  52. for job_state,job_count in job_data.items():
  53. if job_state in ('Q','R'):
  54. total_active_jobs += job_count
  55. if isinstance(queue_list,list) and \
  56. len(queue_list) > 0 and \
  57. total_active_jobs < max_total_workers:
  58. for entry in queue_list: # this list should be unique
  59. for queue_name,waiting_jobs in entry.items():
  60. if waiting_jobs > max_workers_per_queue:
  61. waiting_jobs = max_workers_per_queue
  62. active_job = active_jobs_dict.get(queue_name)
  63. total_running_for_queue = 0
  64. active_queued_job = 0
  65. if active_job is not None:
  66. for job_state,job_counts in active_job.items():
  67. if job_state in ('Q','R'):
  68. total_running_for_queue += job_counts
  69. if job_state == 'Q':
  70. active_queued_job += job_counts
  71. if active_queued_job < 1:
  72. if total_running_for_queue==0 and \
  73. (total_active_jobs + waiting_jobs) < max_total_workers:
  74. worker_to_submit.\
  75. update({queue_name : waiting_jobs})
  76. if total_running_for_queue > 0:
  77. if waiting_jobs > total_running_for_queue:
  78. waiting_jobs = waiting_jobs - total_running_for_queue
  79. if (total_active_jobs + waiting_jobs) < max_total_workers:
  80. worker_to_submit.\
  81. update({queue_name : waiting_jobs})
  82. total_active_jobs += waiting_jobs
  83. else:
  84. print('Not submitting new jobs for queue {0}'.format(queue_name))
  85. if len(worker_to_submit.keys()) > 0:
  86. unique_queue_list = list(worker_to_submit.keys())
  87. return worker_to_submit,unique_queue_list
  88. except Exception as e:
  89. raise ValueError('Failed to calculate airflow worker size, error: {0}'.format(e))