task.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. '''
  2. /*#############################################################################
  3. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems(R).
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ############################################################################ */
  14. '''
  15. import time
  16. import subprocess
  17. import hpcc.cluster.host
  18. import logging
  19. import hashlib
  20. class Task(object):
  21. '''
  22. An abstract class to represent a task which will be executed for all hosts
  23. of cluster
  24. '''
  25. def __init__(self, id):
  26. '''
  27. Constructor
  28. '''
  29. self._id = id
  30. self._status = 'INIT' #INIT,RUNNING,DONE
  31. self._start_time = None
  32. self._end_time = None
  33. self._duration = None
  34. self._message = ""
  35. self._result = "UNKNOWN" #UNKNOWN, SUCCEED,FAILED
  36. self.logger = logging.getLogger("hpcc.cluster.Task." + str(id))
  37. @property
  38. def id(self):
  39. return self._id
  40. @property
  41. def duration(self):
  42. return self._duration
  43. @property
  44. def status(self):
  45. return self._status
  46. @property
  47. def result(self):
  48. return self._result
  49. @property
  50. def message(self):
  51. return self._message
  52. def pretask(self):
  53. self._start_time = time.time()
  54. def posttask(self):
  55. self._end_time = time.time()
  56. self._duration = self._end_time - self._start_time
  57. self._status = "DONE"
  58. def run (self, host):
  59. self.pretask()
  60. self._status = "RUNNING"
  61. self.worker(host)
  62. self.posttask()
  63. def worker(self, host):
  64. pass
  65. class ScriptTask(Task):
  66. '''
  67. A subclass of Task to implement shell script task.
  68. '''
  69. def __init__(self, id, script_file=None):
  70. '''
  71. Constructor
  72. '''
  73. Task.__init__(self, id)
  74. self.script_file = script_file
  75. self.logger = logging.getLogger("hpcc.cluster.ScriptTask." + str(id))
  76. self._checksum = None
  77. @property
  78. def checksum(self):
  79. return self._checksum
  80. @checksum.setter
  81. def checksum(self, chksum):
  82. self._checksum = chksum
  83. def validateScriptFile(self):
  84. if not self._checksum:
  85. return True
  86. with open (self.script_file) as f:
  87. file_md5 = hashlib.md5(f.read()).hexdigest()
  88. if self._checksum == file_md5:
  89. return True;
  90. else:
  91. return False
  92. def worker(self, host):
  93. '''
  94. worker: execute provided script with subprocess module.
  95. Python documentation discourage 'shell=True' option to
  96. avoid shell injection attack. Since we need general shell
  97. execution environment we need this option. To add security
  98. we add md5 check if the checksum is provided from command0-line
  99. option. Also user should ensure the script permission to
  100. protect the script from malicious modification.
  101. '''
  102. cmd = self.script_file + " " + host.ip.decode('utf-8')
  103. self.logger.info(cmd)
  104. if not self.validateScriptFile():
  105. self.logger.error("Script file check sum does not match")
  106. self._result = "FAILED"
  107. return
  108. try:
  109. # subprocess.check_output is more convenvient but only available
  110. # on Python 2.7+
  111. process = subprocess.Popen(cmd, shell=True,
  112. stdout=subprocess.PIPE,
  113. stderr=subprocess.STDOUT)
  114. process.wait()
  115. errcode = process.returncode
  116. output = process.communicate()[0]
  117. self._message = output
  118. if (errcode == 0):
  119. self.logger.debug(self._message)
  120. self._result = "SUCCEED"
  121. else:
  122. self.logger.error(self._message)
  123. self._result = "FAILED"
  124. except Exception as e:
  125. self._message = "Catch Exception: \n" + e.output
  126. self.logger.error(self._message)
  127. self._result = "FAILED"
  128. self.logger.info("result: " + self._result)