cluster_script.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. #!/usr/bin/env python3
  2. '''
  3. /*#############################################################################
  4. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems(R).
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. ############################################################################ */
  15. '''
  16. import sys
  17. import os
  18. import os.path
  19. import getopt
  20. import queue
  21. import time
  22. import datetime
  23. import logging
  24. import configparser
  25. import signal
  def signal_handler(signal, frame):
      '''
      Abort the whole process when SIGINT, SIGQUIT or SIGTERM arrives.

      os._exit() ends the process immediately, presumably so it does not
      wait for the worker threads. Because os._exit() skips interpreter
      cleanup, including flushing stdio buffers, the message is flushed
      explicitly. Otherwise it is lost when stdout is piped or redirected.

      The parameter named ``signal`` shadows the signal module inside this
      function only. The name is kept for interface compatibility.
      '''
      print('\n\nctrl-c\n', flush=True)
      os._exit(1)


  # Route interrupt/quit/terminate signals to the handler above.
  signal.signal(signal.SIGINT, signal_handler)
  signal.signal(signal.SIGQUIT, signal_handler)
  signal.signal(signal.SIGTERM, signal_handler)
  32. from hpcc.cluster.host import Host
  33. from hpcc.cluster.task import ScriptTask
  34. from hpcc.cluster.thread import ThreadWithQueue
  class ScriptExecution(object):
      '''
      Run a script concurrently on a list of hosts.

      Hosts come either from a host list file (-h) or from the HPCC
      environment.xml located through environment.conf. One ScriptTask is
      created per host. Tasks are put, in batches, on a shared queue that is
      handed to a pool of ThreadWithQueue workers.
      Run ``cluster_script.py --help`` for the command-line usage.
      '''

      def __init__(self):
          '''
          Initialize every option and runtime attribute to its default.
          '''
          self.env_conf = '/etc/HPCCSystems/environment.conf'
          self.section = 'DEFAULT'
          self.hpcc_config = None      # ConfigParser, loaded lazily by get_config()
          self.host_list_file = None
          self.log_file = None
          self.script_file = None
          self.number_of_threads = 5
          self.exclude_local = False
          self.log_level = "INFO"
          self.chksum = None
          self.queue = None            # queue.Queue of (callable, host) pairs, created in setup()
          self.hosts = []
          self.tasks = []
          self.threads = []
          self.logger = None

      def get_config(self, key):
          '''
          Return ``key`` from the configured section of env_conf.

          The configuration file is read once, on first use. ConfigParser.read()
          silently skips a missing file, so a bad env_conf surfaces here as
          configparser.NoSectionError / NoOptionError.
          '''
          if self.hpcc_config is None:
              self.hpcc_config = configparser.ConfigParser()
              self.hpcc_config.read(self.env_conf)
          return self.hpcc_config.get(self.section, key)

      def set_logger(self):
          '''
          Attach a file handler for self.log_file to the "hpcc.cluster" logger.

          When no log file was given, it defaults to
          <log>/cluster/cc_<script name>_<yyyymmdd_hhmmss>.log, where <log> is
          the "log" setting of env_conf. Missing directories are created.
          Prints the usage and exits with status 1 on an unknown log level.
          '''
          numeric_level = getattr(logging, self.log_level.upper(), None)
          if not isinstance(numeric_level, int):
              # Without this check setLevel() would receive None (or some other
              # logging attribute) and fail with an obscure error.
              print("\nInvalid log level: " + self.log_level + "\n")
              self.usage()
              sys.exit(1)

          if not self.log_file:
              log_name = "cc_" + os.path.basename(self.script_file) + "_" + \
                  datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".log"
              self.log_file = self.get_config('log') + "/cluster/" + log_name
          log_directory = os.path.dirname(self.log_file)
          if log_directory:
              os.makedirs(log_directory, exist_ok=True)

          self.logger = logging.getLogger("hpcc.cluster")
          self.logger.setLevel(numeric_level)
          file_handler = logging.FileHandler(self.log_file)
          file_handler.setFormatter(logging.Formatter(
              '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
          self.logger.addHandler(file_handler)

      def setup(self):
          '''
          Create the work queue, load the hosts and queue the first tasks.

          Hosts come from host_list_file when it is set. Otherwise they come
          from the file <configs>/<environment> named in env_conf, together
          with its "path" setting. Exits with status 1 when no host is found.
          '''
          self.queue = queue.Queue()
          if self.host_list_file:
              self.hosts = Host.get_hosts_from_file(self.host_list_file, self.exclude_local)
          else:
              environment_xml = self.get_config('configs') + '/' + \
                  self.get_config('environment')
              self.hosts = Host.get_hosts_from_env(
                  environment_xml, self.get_config('path'), self.exclude_local)
          if not self.hosts:
              print("Could not get any host. At least one host is required.")
              print("Refer to the following log file for more information: ")
              print(self.log_file)
              sys.exit(1)
          # Prime the queue with two tasks per worker. update_queue() tops it
          # up while execute() is polling.
          self.addTasks(self.number_of_threads * 2)

      def addTasks(self, n):
          '''
          Queue up to ``n`` more tasks for hosts that do not have one yet.

          Hosts are scheduled in list order: task i runs on self.hosts[i].
          Does nothing once every host already has a task.
          '''
          first_index = len(self.tasks)
          count = min(n, len(self.hosts) - first_index)
          if count <= 0:
              return
          self.logger.info("add " + str(count) + " tasks to thread queue")
          for host_index in range(first_index, first_index + count):
              task = ScriptTask(host_index, self.script_file)
              if self.chksum:
                  task.checksum = self.chksum
              self.tasks.append(task)
              # Pair the task's run method with its host for a worker thread.
              self.queue.put((task.run, self.hosts[host_index]))

      def execute(self):
          '''
          Start the worker threads and poll every 2 seconds until all tasks are done.

          Each poll prints the progress and refills the queue if needed. At the
          end the final status is printed and every worker is asked to stop.
          '''
          for thread_id in range(self.number_of_threads):
              thread = ThreadWithQueue(thread_id, self.queue)
              self.threads.append(thread)
              thread.start()
          print("\nTotal hosts to process: %d\n" % (len(self.hosts)))
          while not self.is_done():
              self.report_status()
              self.update_queue()
              time.sleep(2)
          self.report_status()
          for thread in self.threads:
              thread.stop()

      def is_done(self):
          '''
          Return True once every host has a task and every task is DONE.
          '''
          if len(self.tasks) < len(self.hosts):
              return False
          if any(task.status != "DONE" for task in self.tasks):
              return False
          self.logger.info("script execution done.")
          return True

      def update_queue(self):
          '''
          Refill the queue when unscheduled hosts remain and it is nearly empty.

          "Nearly empty" means number_of_threads entries or fewer.
          '''
          if len(self.tasks) < len(self.hosts) and \
                  self.queue.qsize() <= self.number_of_threads:
              self.addTasks(self.number_of_threads * 2)

      def report_status(self):
          '''
          Print a one-line progress summary, overwriting the previous one.

          A task with status 'DONE' counts as succeeded when its result is
          'SUCCEED', otherwise as failed. Status 'RUNNING' counts as running;
          anything else counts as queued.
          '''
          done = succeeded = failed = running = queued = 0
          for task in self.tasks:
              if task.status == 'DONE':
                  done += 1
                  if task.result == 'SUCCEED':
                      succeeded += 1
                  else:
                      failed += 1
              elif task.status == 'RUNNING':
                  running += 1
              else:
                  queued += 1
          # Guard the division so an empty host list cannot crash the report.
          progress = (done * 100) // len(self.hosts) if self.hosts else 0
          sys.stdout.write("\rExecution progress: %d%%, running: %d, in queue: %d, succeed: %d, failed: %d"
                           % (progress, running, queued, succeeded, failed))
          sys.stdout.flush()

      def check_error(self):
          '''
          Print the overall outcome and return True if every task succeeded.
          '''
          no_error_found = all(task.result == 'SUCCEED' for task in self.tasks)
          script_name = os.path.basename(self.script_file)
          if no_error_found:
              print("\n\n" + script_name + " run successfully on all hosts in the cluster")
          else:
              print("\n\n\033[91mError found during " + script_name + " execution.\033[0m")
          print("Refer to the following log file for more information: ")
          print(self.log_file)
          print("\n")
          return no_error_found

      def usage(self):
          '''
          Print the command-line help.
          '''
          print("Usage cluster_script.py [option(s)]\n")
          print("  -?, --help               print help")
          print("  -c, --chksum             script file md5 checksum")
          print("  -e, --env_conf           environment.conf full path. The default is")
          print("                           /etc/HPCCSystems/environment.conf")
          print("  -f, --script_file        script file")
          print("  -h, --host_list          file with the list of hosts. By default hosts")
          print("                           will be retrieved from environment.xml")
          print("  -l, --log_level          WARNING, INFO, DEBUG. The default is INFO")
          print("  -n, --number_of_threads  number of working threads for concurrent execution")
          print("  -o, --log_file           log file. The default log file is")
          print("                           cc_<script name>_<yyyymmdd_hhmmss>.log")
          print("                           under <log_dir>/cluster directory")
          print("  -s, --section            environment.conf section. The default is DEFAULT.")
          print("  -x, --exclude_local      script will not run on local system")
          print("\n")

      def process_args(self):
          '''
          Parse sys.argv into the option attributes.

          Long options that take a value end with "=" in the getopt table, so
          both "--opt value" and "--opt=value" work. Exits with status 0 after
          printing the help for -?/--help. Exits with status 1 on an invalid
          or unknown option, or on a non-positive thread count.
          '''
          try:
              opts, _ = getopt.getopt(
                  sys.argv[1:], "?c:e:f:h:l:n:o:s:x",
                  ["help", "chksum=", "env_conf=", "script_file=", "host_list=",
                   "number_of_threads=", "section=", "log_file=", "log_level=",
                   "exclude_local"])
          except getopt.GetoptError as err:
              print(str(err))
              self.usage()
              sys.exit(1)
          for arg, value in opts:
              if arg in ("-?", "--help"):
                  self.usage()
                  sys.exit(0)
              elif arg in ("-c", "--chksum"):
                  self.chksum = value
              elif arg in ("-e", "--env_conf"):
                  self.env_conf = value
              elif arg in ("-h", "--host_list"):
                  self.host_list_file = value
              elif arg in ("-n", "--number_of_threads"):
                  try:
                      threads = int(value)
                  except ValueError:
                      threads = 0
                  # With no worker threads execute() would poll forever.
                  if threads < 1:
                      print("\nInvalid number of threads: " + value)
                      self.usage()
                      sys.exit(1)
                  self.number_of_threads = threads
              elif arg in ("-o", "--log_file"):
                  self.log_file = value
              elif arg in ("-f", "--script_file"):
                  self.script_file = value
              elif arg in ("-l", "--log_level"):
                  self.log_level = value
              elif arg in ("-s", "--section"):
                  self.section = value
              elif arg in ("-x", "--exclude_local"):
                  self.exclude_local = True
              else:
                  print("\nUnknown option: " + arg)
                  self.usage()
                  sys.exit(1)

      def validate_args(self):
          '''
          Exit with status 1 unless a script file was given and exists.
          '''
          if not self.script_file:
              print("\nMissing required script file\n")
              self.usage()
              sys.exit(1)
          if not os.path.isfile(self.script_file):
              print("\nFile " + self.script_file + " does not exist.\n")
              sys.exit(1)

      def log_input_parameters(self):
          '''
          Log the effective option values at INFO level.
          '''
          self.logger.info("Current parameters:")
          self.logger.info("%-20s: %s", "env_conf", self.env_conf)
          self.logger.info("%-20s: %s", "script_file", self.script_file)
          self.logger.info("%-20s: %s", "chksum", self.chksum)
          self.logger.info("%-20s: %s", "log_file", self.log_file)
          self.logger.info("%-20s: %s", "log_level", self.log_level)
          self.logger.info("%-20s: %s", "host_list_file", self.host_list_file)
          self.logger.info("%-20s: %d", "number_of_threads", self.number_of_threads)
          self.logger.info("%-20s: %s", "section", self.section)
          self.logger.info("%-20s: %d", "exclude_local", self.exclude_local)
  if __name__ == '__main__':
      # Parse and check the command line, configure logging, load the hosts,
      # then run the script on every host and report the overall result.
      se = ScriptExecution()
      se.process_args()
      se.validate_args()
      se.set_logger()
      se.log_input_parameters()
      se.setup()
      se.execute()
      # Exit status 0 only when the script succeeded on every host.
      # sys.exit() is used because the site-provided exit() builtin is meant
      # for the interactive shell and is absent when site is not loaded.
      sys.exit(0 if se.check_error() else 1)