check_celery_queue.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import redis
  2. def fetch_queue_list_from_redis_server(url):
  3. try:
  4. queue_list = list()
  5. r = redis.from_url(url)
  6. for i in r.keys():
  7. queue = i.decode()
  8. if not queue.startswith('_'):
  9. q_len = r.llen(queue)
  10. queue_list.append({queue:q_len})
  11. return queue_list
  12. except Exception as e:
  13. raise ValueError('Failed to fetch from redis server, error: {0}'.format(e))
  14. def calculate_new_workers(queue_list,active_jobs_dict,max_workers_per_queue=10,max_total_workers=70):
  15. '''
  16. A function for calculating new worker size
  17. :param queue_list: A list dictionary containing all the queued jobs
  18. [{queue_name:job_count}]
  19. :param active_jobs_dict: A dictionary containing all job counts for each queues
  20. {queue_name:{job_state:job_count}}
  21. :param max_workers_per_queue: Max allowed worker per queue, default 10
  22. :param max_total_workers: Max total worker for the queue, default 70
  23. :returns: A list of dictionary containing all the target jobs
  24. [{queue_name:target_job_counts}]
  25. '''
  26. try:
  27. worker_to_submit = list()
  28. total_active_jobs = 0
  29. for _,job_data in active_jobs_dict.items():
  30. for job_state,job_count in job_data.items():
  31. if job_state in ('Q','R'):
  32. total_active_jobs += job_count
  33. if isinstance(queue_list,list) and \
  34. len(queue_list) > 0 and \
  35. total_active_jobs < max_total_workers:
  36. for entry in queue_list:
  37. for queue_name,waiting_jobs in entry.items():
  38. target_job = dict()
  39. if waiting_jobs > max_workers_per_queue:
  40. waiting_jobs = max_workers_per_queue
  41. active_job = active_jobs_dict.get(queue_name)
  42. total_running_for_queue = 0
  43. active_queued_job = 0
  44. if active_job is not None:
  45. for job_state,job_counts in active_job.items():
  46. if job_state in ('Q','R'):
  47. total_running_for_queue += job_counts
  48. if job_state == 'Q':
  49. active_queued_job += job_counts
  50. if active_queued_job < 1:
  51. if total_running_for_queue==0 and \
  52. (total_active_jobs + waiting_jobs) < max_total_workers:
  53. target_job.update({queue_name : waiting_jobs})
  54. if total_running_for_queue > 0:
  55. if waiting_jobs > total_running_for_queue:
  56. waiting_jobs = waiting_jobs - total_running_for_queue
  57. if (total_active_jobs + waiting_jobs) < max_total_workers:
  58. target_job.update({queue_name:waiting_jobs})
  59. total_active_jobs += waiting_jobs
  60. else:
  61. print('Not submitting new jobs for queue {0}'.format(queue_name))
  62. worker_to_submit.append(target_job)
  63. return worker_to_submit
  64. except Exception as e:
  65. raise ValueError('Failed to calculate airflow worker size, error: {0}'.format(e))