util.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. '''
  2. /*#############################################################################
  3. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems(R).
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ############################################################################ */
  14. '''
  15. from . import argparse
  16. import platform
  17. import logging
  18. import os
  19. import subprocess
  20. import sys
  21. import traceback
  22. import linecache
  23. import inspect
  24. logger = logging.getLogger('RegressionTestEngine')
  25. from ..common.error import Error
  26. from ..common.shell import Shell
  27. def isPositiveIntNum(numString):
  28. logger.debug("%3d. isPositiveIntNum() result is: '%s'", -1, numString)
  29. for i in range(0, len(numString)):
  30. if (numString[i] < '0') or (numString[i] > '9'):
  31. return False
  32. return True
  33. def checkPqParam(string):
  34. if isPositiveIntNum(string) or (string == '-1'):
  35. value = int(string)
  36. else:
  37. msg = "Wrong value of threadNumber parameter: '"+string+"' !"
  38. raise argparse.ArgumentTypeError(msg)
  39. return value
  40. def checkXParam(string):
  41. param=str(string)
  42. if len(param):
  43. if ('=' in param) or ('None' == param):
  44. value = param
  45. else:
  46. raise Error("5000", err="But got argument:'%s'" % (param) )
  47. else:
  48. msg = "Missing argument of -X parameter!"
  49. raise argparse.ArgumentTypeError(msg)
  50. return value
  51. def getVersionNumbers():
  52. version = platform.python_version_tuple()
  53. verNum = {'main':0, 'minor':0, 'patch':0}
  54. if isPositiveIntNum(version[0]):
  55. verNum['main'] = int(version[0])
  56. if isPositiveIntNum(version[1]):
  57. verNum['minor'] = int(version[1])
  58. if isPositiveIntNum(version[2]):
  59. verNum['patch'] = int(version[2])
  60. return(verNum);
  61. def parentPath(osPath):
  62. # remove current dir
  63. [osPath, sep, curDir] = osPath.rpartition(os.sep)
  64. return osPath
  65. def convertPath(osPath):
  66. hpccPath = ''
  67. osPath = osPath.lstrip(os.sep)
  68. osPath = osPath.replace(os.sep, '::')
  69. for i in range(0, len(osPath)):
  70. if osPath[i] >= 'A' and osPath[i] <= 'Z':
  71. hpccPath = hpccPath +'^'
  72. hpccPath = hpccPath +osPath[i]
  73. return hpccPath
  74. gConfig = None
  75. def setConfig(config):
  76. global gConfig
  77. gConfig = config
  78. def getConfig():
  79. return gConfig
  80. def getEclRunArgs(test, engine, cluster):
  81. retString=''
  82. test.setJobname("")
  83. retString += "ecl run -fpickBestEngine=false --target=%s --cluster=%s --port=%s " % (engine, cluster, gConfig.espSocket)
  84. retString += "--exception-level=warning --noroot --name=\"%s\" " % (test.getJobname())
  85. retString += "%s " % (" ".join(test.getFParameters()))
  86. retString += "%s " % (" ".join(test.getDParameters()))
  87. retString += "%s " % (" ".join(test.getStoredInputParameters()))
  88. args = []
  89. addCommonEclArgs(args)
  90. retString += "%s " % (" ".join(args))
  91. return retString
  92. def addCommonEclArgs(args):
  93. args.append('--server=' + gConfig.espIp)
  94. args.append('--username=' + gConfig.username)
  95. args.append('--password=' + gConfig.password)
  96. args.append('--port=' + gConfig.espSocket)
  97. if gConfig.useSsl.lower() == 'true':
  98. args.append('--ssl')
  99. def queryWuid(jobname, taskId):
  100. shell = Shell()
  101. cmd = 'ecl'
  102. defaults = []
  103. args = []
  104. args.append('status')
  105. args.append('-v')
  106. args.append('-n=' + jobname)
  107. args.append('--wait-read='+ gConfig.wuStatusTimeout ) # Timeout while reading from socket (in seconds)
  108. args.append('--wait-connect=%d' % (int(gConfig.wuStatusTimeout) * 1000) ) # Timeout while reading from socket (in milliseconds)
  109. addCommonEclArgs(args)
  110. res, stderr = shell.command(cmd, *defaults)(*args)
  111. logger.debug("%3d. queryWuid(%s, cmd :'%s') result is: '%s'", taskId, jobname, cmd, res)
  112. wuid = "Not found"
  113. state = 'N/A'
  114. result = 'Fail'
  115. if len(res):
  116. resultItems = res.split(',')
  117. if len(resultItems) == 3:
  118. result = 'OK'
  119. for resultItem in resultItems:
  120. resultItem = resultItem.strip()
  121. [key, val] = resultItem.split(':')
  122. if key == 'ID':
  123. wuid = val
  124. if key == 'state':
  125. state = val
  126. return {'wuid':wuid, 'state':state, 'result':result}
  127. def queryEngineProcess(engine, taskId):
  128. retVal = []
  129. myProc = subprocess.Popen(["ps aux | egrep '"+engine+"' | egrep -v 'grep'"], shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  130. result = myProc.stdout.read() + myProc.stderr.read()
  131. results = result.decode("utf-8").split('\n')
  132. logger.debug("%3d. queryEngineProcess(engine: %s): process(es) :'%s'", taskId, engine, results)
  133. for line in results:
  134. line = line.replace('\n','')
  135. logger.debug("%3d. queryEngineProcess(engine: %s): line:'%s'", taskId, engine, line)
  136. if len(line):
  137. items = line.split()
  138. if len(items) >= 12:
  139. if engine in items[10]:
  140. myProc2 = subprocess.Popen(["sudo readlink -f /proc/" + items[1] + "/exe"], shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  141. result2 = myProc2.stdout.read().decode("utf-8").replace ('\n', '')
  142. binPath = os.path.dirname(result2)
  143. logger.debug("%3d. queryEngineProcess(engine: %s): binary: '%s', binPath:'%s'", taskId, engine, result2, binPath)
  144. if 'slavenum' in line:
  145. ind = [items.index(i) for i in items if i.startswith('--slavenum')]
  146. try:
  147. slaveNum = items[ind[0]].split('=')[1]
  148. retVal.append({ 'process' : result2, 'name' : os.path.basename(items[10]), 'slaveNum' : slaveNum, 'pid' : items[1], 'binPath': binPath})
  149. except Exception as e:
  150. logger.error("%3d. queryEngineProcess(engine: %s): slave number query failed:'%s'", taskId, engine, repr(e))
  151. else:
  152. retVal.append({ 'process' : result2, 'name' : os.path.basename(items[10]), 'slaveNum' : '', 'pid' : items[1], 'binPath': binPath})
  153. return retVal
  154. def createStackTrace(wuid, proc, taskId, logDir = ""):
  155. # Execute this function from CLI:
  156. # ~/MyPython/RegressionSuite$ python -c 'import hpcc.util.util as util; p = util.queryEngineProcess("thormaster"); p+= util.queryEngineProcess("thorslave"); print p; [ util.createStackTrace("na", pp, -1, "~/HPCCSystems-regression/log") for pp in p]; '
  157. binPath = proc['process']
  158. pid = proc['pid']
  159. if logDir == "":
  160. outFile = os.path.expanduser(gConfig.logDir) + '/' + wuid +'-' + proc['name'] + proc['slaveNum'] + '.trace'
  161. else:
  162. outFile = os.path.expanduser(logDir) + '/' + wuid +'-' + proc['name'] + proc['slaveNum'] + '-' + pid + '.trace'
  163. logger.error("%3d. Create Stack Trace for %s%s (pid:%s) into '%s'" % (taskId, proc['name'], proc['slaveNum'], pid, outFile), extra={'taskId':taskId})
  164. cmd = 'sudo gdb --batch --quiet -ex "set interactive-mode off" '
  165. cmd += '-ex "echo \nBacktrace for all threads\n==========================" -ex "thread apply all bt" '
  166. cmd += '-ex "echo \nRegisters:\n==========================\n" -ex "info reg" '
  167. cmd += '-ex "echo \nDisassembler:\n==========================\n" -ex "disas" '
  168. cmd += '-ex "quit" ' + binPath + ' ' + pid + ' > ' + outFile + ' 2>&1'
  169. myProc = subprocess.Popen([ cmd ], shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  170. result = myProc.stdout.read() + myProc.stderr.read()
  171. logger.debug("%3d. Create Stack Trace result:'%s'", taskId, result)
  172. def abortWorkunit(wuid, taskId = -1, engine = None):
  173. state=False
  174. wuid = wuid.strip()
  175. if wuid.upper().startswith('W'):
  176. logger.debug("%3d. abortWorkunit(wuid:'%s', engine: '%s')", taskId, wuid, str(engine))
  177. logger.debug("%3d. config: generateStackTrace: '%s'", taskId, str(gConfig.generateStackTrace))
  178. if (gConfig.generateStackTrace and (engine != None)):
  179. if isSudoer():
  180. if engine.startswith('thor'):
  181. hpccProcesses = queryEngineProcess("thormaster", taskId)
  182. hpccProcesses += queryEngineProcess("thorslave", taskId)
  183. elif engine.startswith('hthor'):
  184. hpccProcesses = queryEngineProcess("eclagent", taskId)
  185. hpccProcesses += queryEngineProcess("hthor", taskId)
  186. elif engine.startswith('roxie'):
  187. hpccProcesses = queryEngineProcess("roxie", taskId)
  188. if len(hpccProcesses) > 0:
  189. for p in hpccProcesses:
  190. createStackTrace(wuid, p, taskId)
  191. else:
  192. logger.error("%3d. abortWorkunit(wuid:'%s', engine:'%s') related process to generate stack trace not found.", taskId, wuid, str(engine))
  193. pass
  194. else:
  195. err = Error("7100")
  196. logger.error("%s. generateStackTrace error:%s" % (taskId, err))
  197. logger.error(traceback.format_exc())
  198. raise Error(err)
  199. pass
  200. if gConfig.preAbort != None:
  201. try:
  202. logger.error("%3d. Execute pre abort script '%s'", taskId, str(gConfig.preAbort), extra={'taskId':taskId})
  203. outFile = os.path.expanduser(gConfig.logDir) + '/' + wuid +'-preAbort.log'
  204. command=gConfig.preAbort + " > " + outFile + " 2>&1"
  205. myProc = subprocess.Popen([ command ], shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  206. result = myProc.stdout.read() + myProc.stderr.read()
  207. logger.debug("%3d. Pre abort script result '%s'", taskId, wuid, str(result))
  208. logger.error("%3d. Pre abort script result stored into '%s'", taskId, outFile, extra={'taskId':taskId})
  209. except Exception as e:
  210. printException("preAbort scrip:" + repr(e), True)
  211. logger.error("%3d. Exception in executing pre abort script: '%s'", taskId, repr(e), extra={'taskId':taskId})
  212. pass
  213. shell = Shell()
  214. cmd = 'ecl'
  215. defaults=[]
  216. args = []
  217. args.append('abort')
  218. args.append('-wu=' + wuid)
  219. args.append('--wait-read=' + gConfig.wuAbortTimeout ) # Timeout while reading from socket (in seconds)
  220. addCommonEclArgs(args)
  221. state=shell.command(cmd, *defaults)(*args)
  222. else:
  223. logger.error("%3d. abortWorkunit(wuid:'%s', engine:'%s') invalid WUID.", taskId, wuid, str(engine))
  224. return state
  225. def createZAP(wuid, taskId, reason=''):
  226. retVal = 'Error in create ZAP'
  227. zapFilePath = os.path.join(os.path.expanduser(gConfig.regressionDir), gConfig.zapDir)
  228. shell = Shell()
  229. cmd = 'ecl'
  230. defaults=[]
  231. args = []
  232. args.append('zapgen')
  233. args.append(wuid)
  234. args.append('--path=' + zapFilePath)
  235. if reason != '':
  236. args.append('--description=' + reason)
  237. else:
  238. args.append('--description="Failed in OBT"')
  239. args.append('--inc-thor-slave-logs')
  240. addCommonEclArgs(args)
  241. try:
  242. state=shell.command(cmd, *defaults)(*args)
  243. logger.debug("%3d. createZAP(state:%s)", taskId, str(state))
  244. if state[1] != '':
  245. retVal = state[1]
  246. else:
  247. retVal = state[0]
  248. except Exception as ex:
  249. state = "Unable to query "+ str(ex)
  250. logger.debug("%3d. %s in createZAP(%s)", taskId, state, wuid)
  251. retVal += " (" + str(ex). replace('\n',' ') + ")"
  252. return retVal
  253. def getRealIPAddress():
  254. ipAddress = '127.0.0.1'
  255. found = False
  256. try:
  257. proc = subprocess.Popen(['ip', '-o', '-4', 'addr', 'show'], shell=False, bufsize=8192, stdout=subprocess.PIPE, stderr=None)
  258. result = proc.communicate()[0]
  259. results = result.decode("utf-8").split('\n')
  260. for line in results:
  261. if 'scope global' in line:
  262. items = line.split()
  263. ipAddress = items[3].split('/')[0]
  264. found = True
  265. break;
  266. if not found:
  267. for line in results:
  268. items = line.split()
  269. ipAddress = items[3].split('/')[0]
  270. break;
  271. except OSError:
  272. pass
  273. finally:
  274. pass
  275. return ipAddress
  276. def checkClusters(clusters, targetSet):
  277. targetEngines =[]
  278. if 'all' in clusters:
  279. for engine in gConfig.Engines:
  280. targetEngines.append(str(engine))
  281. else:
  282. for engine in clusters:
  283. engine = engine.strip()
  284. if engine in gConfig.Engines:
  285. targetEngines.append(engine)
  286. else:
  287. logger.error("%s. Unknown engine:'%s' in %s:'%s'!" % (1, engine, targetSet, clusters))
  288. raise Error("4000")
  289. return targetEngines
  290. def isLocalIP(ip):
  291. retVal=False
  292. if '127.0.0.1' == ip:
  293. retVal = True
  294. elif ip == getRealIPAddress():
  295. retVal = True
  296. return retVal
  297. def checkHpccStatus():
  298. # Check HPCC Systems status on local/remote target
  299. isLocal = False
  300. isIpChecked={}
  301. config = getConfig()
  302. ip = config.espIp
  303. isIpChecked[ip] = False
  304. isLocal = isLocalIP(ip)
  305. try:
  306. if isLocal:
  307. # There is no remote version (yet)
  308. myProc = subprocess.Popen(["ecl --version"], shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  309. result = myProc.stdout.read() + myProc.stderr.read()
  310. results = result.decode("utf-8").split('\n')
  311. for line in results:
  312. if 'not found' in line:
  313. err = Error("6000")
  314. logger.error("%s. %s:'%s'" % (1, err, line))
  315. raise err
  316. break
  317. else:
  318. # Maybe use SSH to run "ecl --version" on a remote node
  319. pass
  320. args = []
  321. addCommonEclArgs(args)
  322. myProc = subprocess.Popen("ecl getname --wuid 'W*' --limit=5 " + " ".join(args), shell=True, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  323. result = myProc.stdout.read() + myProc.stderr.read()
  324. results = result.decode("utf-8").split('\n')
  325. for line in results:
  326. if "Error connecting" in line:
  327. if isLocal:
  328. err = Error("6001")
  329. logger.error("%s. %s:'%s local target!'" % (1, err, line))
  330. raise (err)
  331. else:
  332. err = Error("6004")
  333. logger.error("%s. %s:'%s remote target!'" % (1, err, line))
  334. raise (err)
  335. break
  336. if "command not found" in line:
  337. err = Error("6002")
  338. logger.error("%s. %s:'%s'" % (1, err, line))
  339. raise (err)
  340. break
  341. isIpChecked[ip] = True
  342. except OSError:
  343. err = Error("6002")
  344. logger.error("%s. checkHpccStatus error:%s!" % (1, err))
  345. raise Error(err)
  346. except ValueError:
  347. err = Error("6003")
  348. logger.error("%s. checkHpccStatus error:%s!" % (1, err))
  349. raise Error(err)
  350. finally:
  351. pass
  352. def isSudoer(testId = -1):
  353. retVal = False
  354. if 'linux' in sys.platform :
  355. tryCount = 5
  356. cmd = "timeout -k 2 2 sudo id && echo Access granted || echo Access denied"
  357. while tryCount > 0:
  358. tryCount -= 1
  359. myProc = subprocess.Popen([cmd], shell=True, bufsize=8192, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  360. (myStdout, myStderr) = myProc.communicate()
  361. myStdout = myStdout.decode("utf-8")
  362. myStderr = myStderr.decode("utf-8")
  363. result = "returncode:" + str(myProc.returncode) + ", stdout:\n'" + myStdout + "', stderr:\n'" + myStderr + "'."
  364. logger.debug("%3d. isSudoer() result is: '%s' (try count is:%d)", testId, result, tryCount)
  365. if 'timeout: invalid option' in myStderr:
  366. logger.debug("%3d. isSudoer() result is: '%s'", testId, result)
  367. cmd = "timeout 2 sudo id && echo Access granted || echo Access denied"
  368. logger.debug("%3d. try is without '-k 2' parameter: '%s'", testId, cmd)
  369. continue
  370. if 'Access denied' not in myStdout:
  371. retVal = True
  372. break
  373. if retVal == False:
  374. logger.debug("%3d. isSudoer() result is: '%s'", testId, result)
  375. return retVal
  376. def clearOSCache(testId = -1):
  377. if 'linux' in sys.platform :
  378. if isSudoer(testId):
  379. myProc = subprocess.Popen(["free; sudo -S sync; echo 3 | sudo tee /proc/sys/vm/drop_caches; free"], shell=True, bufsize=8192, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  380. (myStdout, myStderr) = myProc.communicate()
  381. myStdout = myStdout.decode("utf-8")
  382. myStderr = myStderr.decode("utf-8")
  383. result = "returncode:" + str(myProc.returncode) + ", stdout:\n'" + myStdout + "', stderr:\n'" + myStderr + "'."
  384. logger.debug("%3d. clearOSCache() result is: '%s'", testId, result)
  385. else:
  386. err = Error("7000")
  387. logger.error("%s. clearOSCache error:%s" % (testId, err))
  388. logger.error(traceback.format_exc())
  389. raise Error(err)
  390. else:
  391. logger.debug("%3d. clearOSCache() not supported on %s.", testId, sys.platform)
  392. pass
  393. def printException(msg = '', debug = False):
  394. exc_type, exc_obj, tb = sys.exc_info()
  395. f = tb.tb_frame
  396. lineno = tb.tb_lineno
  397. filename = f.f_code.co_filename
  398. linecache.checkcache(filename)
  399. line = linecache.getline(filename, lineno, f.f_globals)
  400. if debug:
  401. logger.debug('EXCEPTION IN (%s, LINE %s CODE:"%s"): %s' % ( filename, lineno, line.strip(), msg))
  402. logger.debug(traceback.format_exc())
  403. else:
  404. print ('EXCEPTION IN (%s, LINE %s CODE:"%s"): %s' % ( filename, lineno, line.strip(), msg))
  405. print(traceback.format_exc())
  406. def getCodeInfo(currentFrame= None):
  407. retVal = "'' (No frame info)"
  408. if currentFrame != None:
  409. retVal = "%s:%s" % (os.path.basename(os.path.abspath(inspect.getfile(currentFrame))), currentFrame.f_lineno)
  410. return(retVal)