regress.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
'''
/*#############################################################################
HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems(R).

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################ */
'''
import logging
# Module-level logger; its level is adjusted from the CLI args in Regression.__init__().
logger = logging.getLogger('RegressionTestEngine')
import os
import sys
import time
try:
    import _thread
except Exception as e:
    # Tolerate (but trace) a missing low-level thread module.
    logger.debug("%s" % (repr(e)))
import threading
import inspect
from ..common.error import Error
from ..common.report import Report
from ..regression.suite import Suite
from ..util.ecl.cc import ECLCC
from ..util.ecl.command import ECLcmd
from ..util.expandcheck import ExpandCheck
from ..util.util import getConfig, queryWuid, abortWorkunit, getVersionNumbers, createZAP, getEclRunArgs, printException, getCodeInfo
  33. class Regression:
  34. def timeoutHandler(self):
  35. for th in range(self.maxthreads):
  36. if self.timeouts[th] > 0:
  37. self.timeouts[th] -= 1
  38. if self.timeoutHandlerEnabled:
  39. self.timeoutThread = threading.Timer(1.0, self.timeoutHandler)
  40. self.timeoutThread.start()
  41. def __del__(self):
  42. logger.debug("Regression destructor.")
  43. pass
    def close(self):
        """Shut the engine down: force-abort any test case still holding a
        worker slot, then stop the timeout watchdog thread."""
        logger.debug("Regression close().")
        for threadId in range(self.maxthreads):
            # A locked exit mutex means the worker thread is still running.
            if self.exitmutexes[threadId].locked():
                query = self.suiteItems[self.taskParam[threadId]['taskId']]
                logger.warning("Thread :%d is locked for %s, terminate it." % (threadId, query.ecl))
                # Emulate timeout to force abort
                self.timeouts[threadId] = 0
                self.retryCount = 0
                # Blocks until the worker releases its mutex (or is aborted).
                self.CheckTimeout(self.taskParam[threadId]['taskId']+1, threadId, query)
        self.StopTimeoutThread()
    def __init__(self, args):
        """Build engine state from CLI *args* plus the global config: resolve
        suite/result/log directories, choose the worker-thread count, and
        create the per-thread locks and timeout counters.

        Raises Error("2002") when no suite directory can be resolved and
        Error("2003") when no key directory can be resolved.
        """
        self.args = args
        self.config = getConfig()
        self.suites = {}
        # Use the existing logger instance
        self.log = self.config.log
        self.loglevel = args.loglevel
        if args.loglevel == 'info':
            logger.setLevel(logging.INFO)
        elif args.loglevel == 'debug':
            logger.setLevel(logging.DEBUG)
        # '0' on the command line means "use the configured default timeout".
        if args.timeout == '0':
            self.timeout = int(self.config.timeout);
        else:
            self.timeout = int(args.timeout)
        logger.debug("Suite timeout: %d sec / testcase", self.timeout)
        if not args.suiteDir:
            self.suiteDir = self.config.suiteDir
            if not self.suiteDir:
                raise Error("2002")
        else:
            self.suiteDir = args.suiteDir
        if args.keyDir == self.config.keyDir:
            self.keyDir = self.config.keyDir
            if not self.keyDir:
                raise Error("2003")
        else:
            self.keyDir = args.keyDir
            logger.debug("Try to use alternative key directory: %s", self.keyDir)
        # dirExists(..., True) expands the path and validates that it exists.
        self.suiteDir = ExpandCheck.dirExists(self.suiteDir, True)
        self.regressionDir = ExpandCheck.dirExists(self.config.regressionDir, True)
        self.logDir = ExpandCheck.dirExists(self.config.logDir, True)
        self.dir_ec = ExpandCheck.dirExists(os.path.join(self.suiteDir, self.config.eclDir), True)
        self.dir_ex = ExpandCheck.dirExists(os.path.join(self.suiteDir, self.keyDir), True)
        self.dir_a = os.path.join(self.regressionDir, self.config.archiveDir)
        self.dir_r = os.path.join(self.regressionDir, self.config.resultDir)
        self.dir_zap = os.path.join(self.regressionDir,self.config.zapDir)
        self.dir_inc = self.dir_ec
        self.setupDir = ExpandCheck.dirExists(os.path.join(self.suiteDir, self.config.setupDir), True)
        logger.debug("Suite Dir : %s", self.suiteDir)
        logger.debug("Setup Dir : %s", self.setupDir)
        logger.debug("Regression Dir : %s", self.regressionDir)
        logger.debug("Result Dir : %s", self.dir_r)
        logger.debug("Log Dir : %s", self.logDir)
        logger.debug("ECL Dir : %s", self.dir_ec)
        logger.debug("Key Dir : %s", self.dir_ex)
        logger.debug("Archive Dir : %s", self.dir_a)
        logger.debug("ZAP Dir : %s", self.dir_zap )
        logger.debug("INC Dir : %s", self.dir_inc )
        numOfThreads=1
        # --pq 0 means sequential; -1 means auto-size; otherwise the requested degree.
        if 'pq' in args:
            if args.pq == 0:
                numOfThreads = 1;
            else:
                numOfThreads = args.pq
        self.loggermutex = _thread.allocate_lock()
        self.numOfCpus = 2
        self.threadPerCpu = 2
        ver = getVersionNumbers()
        # Auto-detect: derive thread count from the CPU count (Linux /proc/cpuinfo only).
        if numOfThreads == -1:
            if (ver['main'] >= 2) and (ver['minor'] >= 7):
                if 'linux' in sys.platform :
                    command = "grep 'core\|processor' /proc/cpuinfo | awk '{print $3}' | sort -nru | head -1"
                    cpuInfo = os.popen(command).read()
                    if cpuInfo == "":
                        self.numOfCpus = 1
                    else:
                        # /proc/cpuinfo ids are 0-based, hence the +1.
                        self.numOfCpus = int(cpuInfo)+1
                numOfThreads = self.numOfCpus * self.threadPerCpu
            elif (ver['main'] <= 2) and (ver['minor'] < 7):
                numOfThreads = self.numOfCpus * self.threadPerCpu
        logger.debug("Number of CPUs:%d, NUmber of threads:%d", self.numOfCpus, numOfThreads )
        self.maxthreads = numOfThreads
        self.maxtasks = 0
        # One exit mutex and one countdown counter per worker slot (-1 = inactive).
        self.exitmutexes = [_thread.allocate_lock() for i in range(self.maxthreads)]
        self.timeouts = [(-1) for i in range(self.maxthreads)]
        self.timeoutHandlerEnabled = False;
        self.timeoutThread = threading.Timer(1.0, self.timeoutHandler)
  133. def setLogLevel(self, level):
  134. self.log.setLevel(level)
  135. def bootstrap(self, engine, cluster, args, fileList=None):
  136. self.createDirectory(self.regressionDir)
  137. self.createDirectory(self.dir_a)
  138. self.createDirectory(self.dir_r)
  139. self.createDirectory(self.logDir)
  140. self.createDirectory(self.dir_zap)
  141. self.suites[engine] = Suite(engine, cluster, self.dir_ec, self.dir_a, self.dir_ex, self.dir_r, self.logDir, self.dir_inc, args, False, fileList)
  142. self.maxtasks = len(self.suites[engine].getSuite())
  143. def createDirectory(self, dir_n):
  144. if not os.path.isdir(dir_n):
  145. os.makedirs(dir_n)
  146. def Setup(self, args):
  147. self.createDirectory(self.regressionDir)
  148. self.createDirectory(self.dir_a)
  149. self.createDirectory(self.dir_r)
  150. self.createDirectory(self.logDir)
  151. self.createDirectory(self.dir_zap)
  152. self.setupSuite = Suite(args.engine, args.cluster, self.setupDir, self.dir_a, self.dir_ex, self.dir_r, self.logDir, self.dir_inc, args, True, args.setup)
  153. self.maxtasks = len(self.setupSuite.getSuite())
  154. return self.setupSuite
  155. def buildLogging(self, name):
  156. report = Report(name)
  157. curTime = time.strftime("%y-%m-%d-%H-%M-%S")
  158. logName = name + "." + curTime + ".log"
  159. self.args.testId=curTime
  160. logHandler = os.path.join(self.logDir, logName)
  161. self.args.testFile=logHandler
  162. self.saveConfig()
  163. self.log.addHandler(logHandler, self.loglevel )
  164. return (report, logHandler)
  165. def closeLogging(self):
  166. self.log.removeHandler()
  167. def saveConfig(self):
  168. confLogName = 'environment-'+self.args.testId + ".conf"
  169. logFileName = os.path.join(self.logDir, confLogName)
  170. try:
  171. log = open(logFileName, "w");
  172. log.write("Environment info\n")
  173. log.write("Args:\n")
  174. for arg in self.args.__dict__:
  175. argStr = arg +'=\"'+str(self.args.__dict__[arg])+'\"'
  176. log.write(argStr+"\n")
  177. log.write("\nConfigs:\n")
  178. for conf in self.config.__dict__:
  179. if conf != '_dict__d':
  180. confStr = conf +'=\"'+str(self.config.__dict__[conf])+'\"'
  181. log.write(confStr+"\n")
  182. else:
  183. for subConf in self.config.__dict__[conf]:
  184. confStr = subConf +'=\"'+str(self.config.__dict__[conf][subConf])+'\"'
  185. log.write(confStr+"\n")
  186. log.close()
  187. except IOError:
  188. logger.error("Can't open %s file to write!" %(logFileName))
  189. @staticmethod
  190. def displayReport(report, elapsTime=0):
  191. report[0].display(report[1], elapsTime)
    def runSuiteP(self, name, cluster, suite):
        """Run every test case of *suite* in parallel on up to self.maxthreads
        worker threads, driving per-thread timeout counters and retrying or
        aborting stuck workunits.

        name    -- engine/suite display name (suffixed with ' (setup)' for setup suites)
        cluster -- target cluster the queries are submitted to
        suite   -- Suite object holding the test cases to execute
        Re-raises any exception caught during scheduling after cleanup.
        """
        engine = name
        logName = name
        if 'setup' in suite.getSuiteName():
            logName ='setup_'+name
            name = name + ' (setup)'
        report = self.buildLogging(logName)
        # One parameter record per worker-thread slot.
        self.taskParam = []
        self.taskParam = [{'taskId':0, 'jobName':'', 'timeoutValue':0, 'retryCount': 0} for i in range(self.maxthreads)]
        # Workunit states that mean "not started yet" — still worth waiting for.
        self.goodStates = ('compiling', 'blocked')
        logger.debug("runSuiteP(name:'%s', suite:'%s')" % (name, suite.getSuiteName()))
        logger.warn("Suite: %s ", name)
        logger.warn("Queries: %s" % repr(len(suite.getSuite())))
        logger.warn('%s','' , extra={'filebuffer':True, 'filesort':True})
        cnt = 0
        oldCnt = -1
        suite.setStarTime(time.time())
        self.suiteItems = suite.getSuite()
        exc=None
        try:
            self.StartTimeoutThread()
            # Loop until every test case has been handed to a worker thread.
            while cnt in range(self.maxtasks):
                if oldCnt != cnt:
                    # Prepare the next pending query exactly once per task index.
                    query = self.suiteItems[cnt]
                    query.setTaskId(cnt+1)
                    query.setIgnoreResult(self.args.ignoreResult)
                    query.setJobname(time.strftime("%y%m%d-%H%M%S"))
                    timeout = query.getTimeout()
                    logger.debug("%3d. Query timeout:%d", -1, timeout)
                    oldCnt = cnt
                started = False
                for threadId in range(self.maxthreads):
                    # First try to find a free slot to start the pending query on.
                    for startThreadId in range(self.maxthreads):
                        if not self.exitmutexes[startThreadId].locked():
                            # Start a new test case with a reused thread id
                            self.taskParam[startThreadId]['taskId']=cnt
                            cnt += 1
                            # A per-query timeout of 0 means "use the suite default".
                            if timeout != 0:
                                self.timeouts[startThreadId] = timeout
                            else:
                                self.timeouts[startThreadId] = self.timeout
                                timeout = self.timeout
                            self.taskParam[startThreadId]['timeoutValue'] = timeout
                            query = self.suiteItems[self.taskParam[startThreadId]['taskId']]
                            logger.debug("self.timeout:%d, self.timeouts[thread:%d]:%d", self.timeout, startThreadId, self.timeouts[startThreadId])
                            query.setTimeout(timeout)
                            self.taskParam[startThreadId]['jobName'] = query.getJobname()
                            self.taskParam[startThreadId]['retryCount'] = int(self.config.maxAttemptCount)
                            # The exit mutex stays locked until runQuery() finishes in its thread.
                            self.exitmutexes[startThreadId].acquire()
                            _thread.start_new_thread(self.runQuery, (engine, cluster, query, report, cnt, suite.testPublish(query.ecl), startThreadId))
                            started = True
                            break
                    if started:
                        break
                    # No free slot: inspect the running task on this slot for timeout.
                    if self.exitmutexes[threadId].locked():
                        if self.timeouts[threadId] % 10 == 0:
                            self.loggermutex.acquire()
                            logger.debug("%3d. timeout counter:%d" % (self.taskParam[threadId]['taskId']+1, self.timeouts[threadId]), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                            self.loggermutex.release()
                        if self.timeouts[threadId] == 0:
                            # Countdown expired: decide between retry and abort.
                            wuid = queryWuid(self.taskParam[threadId]['jobName'], self.taskParam[threadId]['taskId']+1)
                            if ("Not found" in wuid['wuid'] ) or (wuid['state'] in self.goodStates):
                                #Possible blocked, give it more time if it is possible
                                self.taskParam[threadId]['retryCount'] -= 1;
                                if self.taskParam[threadId]['retryCount'] > 0:
                                    self.timeouts[threadId] = self.taskParam[threadId]['timeoutValue']
                                    self.loggermutex.acquire()
                                    logger.warn("%3d. Has not started yet. Reset due to timeout after %d sec." % (self.taskParam[threadId]['taskId']+1, self.taskParam[threadId]['timeoutValue']), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                    logger.debug("%3d. Task parameters: thread id: %d, ecl:'%s',state:'%s', retry count:%d." % (self.taskParam[threadId]['taskId']+1, threadId, self.suiteItems[self.taskParam[threadId]['taskId']].ecl, wuid['state'], self.taskParam[threadId]['retryCount'] ), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                    self.loggermutex.release()
                                else:
                                    # retry counter exhausted, give up and abort this test case if exists
                                    if 'W' in wuid['wuid']:
                                        abortWorkunit(wuid['wuid'], self.taskParam[threadId]['taskId'], engine)
                                        self.loggermutex.acquire()
                                        query = self.suiteItems[self.taskParam[threadId]['taskId']]
                                        query.setAborReason('Timeout and retry count exhausted!')
                                        logger.info("%3d. Timeout occured and no more attempt left. Force to abort... " % (self.taskParam[threadId]['taskId']), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                        logger.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'], query.ecl), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                        self.loggermutex.release()
                                    else:
                                        # No workunit was ever created: free the slot ourselves.
                                        self.exitmutexes[threadId].release()
                                        self.loggermutex.acquire()
                                        query = self.suiteItems[self.taskParam[threadId]['taskId']]
                                        query.setAborReason('Timeout (has not started yet and retry count exhausted)')
                                        logger.info("%3d. Timeout occured and no more attempt left. Force to abort... " % (self.taskParam[threadId]['taskId']), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                        logger.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'], query.ecl), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                        self.loggermutex.release()
                                    # -1 disables further timeout handling on this slot.
                                    self.timeouts[threadId] = -1
                            elif wuid['state'] =='completed':
                                # It is done in HPCC System but need some more time to complete
                                self.timeouts[threadId] = 5 # sec extra time to finish
                                self.loggermutex.acquire()
                                logger.info("%3d. It is completed in HPCC Sytem, but not finised yet. Give it %d sec." % (self.taskParam[threadId]['taskId']+1, self.taskParam[threadId]['timeoutValue']), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                logger.debug("%3d. Task parameters: thread id: %d, ecl:'%s',state:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, self.suiteItems[self.taskParam[threadId]['taskId']].ecl, wuid['state']), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                self.loggermutex.release()
                            else:
                                # Something wrong with this test case, abort it.
                                abortWorkunit(wuid['wuid'], self.taskParam[threadId]['taskId']+1, engine)
                                self.loggermutex.acquire()
                                query = self.suiteItems[self.taskParam[threadId]['taskId']]
                                query.setAborReason('Timeout')
                                logger.info("%3d. Timeout occured. Force to abort... " % (self.taskParam[threadId]['taskId']+1), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                logger.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'], query.ecl), extra={'taskId':self.taskParam[threadId]['taskId']+1})
                                self.loggermutex.release()
                                self.timeouts[threadId] = -1
                # give some time to other threads
                if not started:
                    time.sleep(0.2)
            # All tasks are scheduled
        except Exception as e:
            exc = e
            pass
        except KeyboardInterrupt as e:
            logger.debug("%3d. Keyboard interrupt in %s." % (-1, getCodeInfo(inspect.currentframe()) ))
            logger.warning(repr(e))
            exc = e
            pass
        finally:
            #Some of them finished, others are not yet, but should check the still running tasks' timeout and retry state
            for threadId in range(self.maxthreads):
                if self.exitmutexes[threadId].locked():
                    query = self.suiteItems[self.taskParam[threadId]['taskId']]
                    if exc != None:
                        logger.warning("Thread :%d is locked for %s, terminate it." % (threadId, query.ecl))
                        # Emulate timeout to force abort
                        self.timeouts[threadId] = 0
                        self.retryCount = 0
                    else:
                        self.retryCount = int(self.config.maxAttemptCount)
                    # Block until the worker on this slot finishes or is aborted.
                    self.CheckTimeout(self.taskParam[threadId]['taskId']+1, threadId, query)
            self.StopTimeoutThread()
            logger.warn('%s','' , extra={'filebuffer':True, 'filesort':True})
            suite.setEndTime(time.time())
            Regression.displayReport(report, suite.getElapsTime())
            suite.close()
            self.closeLogging()
            if exc != None:
                logger.debug(str(exc)+"(line: "+str(inspect.stack()[0][2])+")")
                raise(exc)
  333. def StartTimeoutThread(self):
  334. self.timeoutThread.cancel()
  335. self.timeoutThread = threading.Timer(1.0, self.timeoutHandler)
  336. self.timeoutHandlerEnabled=True
  337. self.timeoutThread.start()
    def CheckTimeout(self, cnt, threadId, query):
        """Busy-wait until the worker on slot *threadId* releases its exit
        mutex; when the countdown hits zero, either reset it (while retries
        remain) or abort the workunit once the retry budget is exhausted.

        cnt      -- 1-based task id used in log messages
        threadId -- worker slot index to watch
        query    -- the test case running on that slot
        """
        defSleepTime=0.1
        while self.exitmutexes[threadId].locked():
            sleepTime = defSleepTime
            if self.timeouts[threadId] >= 0:
                self.loggermutex.acquire()
                logger.debug("%3d. timeout counter:%d (%d)" % (cnt, self.timeouts[threadId], self.retryCount), extra={'taskId':cnt})
                self.loggermutex.release()
                sleepTime = defSleepTime
            if self.timeouts[threadId] == 0:
                # Countdown expired: decide between another retry and a hard abort.
                wuid = queryWuid(query.getJobname(), query.getTaskId())
                self.retryCount -= 1;
                if self.retryCount> 0:
                    self.timeouts[threadId] = query.getTimeout()
                    self.loggermutex.acquire()
                    logger.warn("%3d. %s has not started yet. Reset due to timeout after %d sec (%d retry attempt(s) remain)." % (cnt, query.ecl, self.timeouts[threadId], self.retryCount), extra={'taskId':cnt})
                    logger.debug("%3d. Task parameters: thread id: %d, ecl:'%s',state:'%s', retry count:%d." % (cnt, threadId, query.ecl, wuid['state'], self.retryCount), extra={'taskId':cnt})
                    self.loggermutex.release()
                else:
                    # retry counter exhausted, give up and abort this test case if exists
                    logger.debug("%3d. Abort %s WUID:'%s'" % (cnt, query.ecl, str(wuid)), extra={'taskId':cnt})
                    query.setAborReason('Timeout and retry count exhausted!')
                    abortWorkunit(wuid['wuid'], cnt, self.args.engine)
                    self.loggermutex.acquire()
                    logger.error("%3d. Timeout occured for %s and no more attempt left. Force to abort... " % (cnt, query.ecl), extra={'taskId':cnt})
                    logger.debug("%3d. Task parameters: wuid:'%s', state:'%s', ecl:'%s'." % (cnt, wuid['wuid'], wuid['state'], query.ecl), extra={'taskId':cnt})
                    logger.debug("%3d. Waiting for abort..." % (cnt), extra={'taskId':cnt})
                    self.loggermutex.release()
                    # Disable the countdown and poll more slowly while the abort completes.
                    self.timeouts[threadId] = -1
                    sleepTime = 1.0
            time.sleep(sleepTime)
  369. def StopTimeoutThread(self):
  370. self.timeoutHandlerEnabled=False
  371. self.timeoutThread.cancel()
  372. time.sleep(2)
    def runSuite(self, engine, name, suite):
        """Run every test case of *suite* sequentially (one worker slot),
        waiting for each query via CheckTimeout() before starting the next.

        engine -- engine the queries run on
        name   -- suite/cluster display name ('setup' is redirected to hthor)
        suite  -- Suite object holding the test cases to execute
        Re-raises any Error/Exception/KeyboardInterrupt after closing the suite.
        """
        if name == "setup":
            cluster = 'hthor'
        else:
            cluster = name
        logName = name
        if 'setup' in suite.getSuiteName():
            logName ='setup_'+name
            name = name + ' (setup)'
        report = self.buildLogging(logName)
        logger.debug("runSuite(name:'%s', suite:'%s')" % (name, suite.getSuiteName()))
        logger.warning("Suite: %s" % name)
        logger.warning("Queries: %s" % repr(len(suite.getSuite())))
        suite.setStarTime(time.time())
        cnt = 1
        th = 0
        # Sequential mode: a single worker slot is used.
        self.maxthreads = 1
        self.suiteItems = suite.getSuite()
        self.taskParam = [{'taskId':0, 'jobName':'', 'timeoutValue':0, 'retryCount': 0}]
        try:
            self.StartTimeoutThread()
            for query in self.suiteItems:
                query.setJobname(time.strftime("%y%m%d-%H%M%S"))
                query.setTaskId(cnt)
                self.taskParam[th]['taskId']=cnt - 1
                self.taskParam[th]['jobName']=query.getJobname()
                query.setIgnoreResult(self.args.ignoreResult)
                #self.timeouts[th] = self.timeout
                # A per-query timeout of 0 means "use the suite default".
                timeout = query.getTimeout()
                if timeout != 0:
                    self.timeouts[th] = timeout
                else:
                    self.timeouts[th] = self.timeout
                self.retryCount = int(self.config.maxAttemptCount)
                query.setTimeout(self.timeouts[th])
                # The exit mutex stays locked until runQuery() finishes in its thread.
                self.exitmutexes[th].acquire()
                _thread.start_new_thread(self.runQuery, (engine, cluster, query, report, cnt, suite.testPublish(query.ecl), th))
                time.sleep(0.1)
                # Block until this query finishes (or is aborted) before the next one.
                self.CheckTimeout(cnt, th, query)
                cnt += 1
            self.StopTimeoutThread()
            suite.setEndTime(time.time())
            Regression.displayReport(report, suite.getElapsTime())
            suite.close()
            self.closeLogging()
        except Error as e:
            self.StopTimeoutThread()
            suite.close()
            raise(e)
        except Exception as e:
            self.StopTimeoutThread()
            suite.close()
            raise(e)
        except KeyboardInterrupt as e:
            logger.debug("%3d. Keyboard interrupt in %s." % (-1, getCodeInfo(inspect.currentframe()) ))
            logger.warning(repr(e))
            suite.close()
            raise(e)
    def runSuiteQ(self, engine, clusterName, eclfile):
        """Run a single standalone ECL file (*eclfile*) on *clusterName*
        ('setup' is redirected to hthor), waiting for it via CheckTimeout().

        Re-raises any Exception/KeyboardInterrupt after closing the eclfile.
        """
        report = self.buildLogging(clusterName)
        logger.debug("runSuiteQ( clusterName:'%s', eclfile:'%s')", clusterName, eclfile.ecl, extra={'taskId':0})
        if clusterName == "setup":
            cluster = 'hthor'
        else:
            cluster = clusterName
        cnt = 1
        eclfile.setTaskId(cnt)
        eclfile.setIgnoreResult(self.args.ignoreResult)
        threadId = 0
        logger.warning("Target: %s" % clusterName)
        logger.warning("Queries: %s" % 1)
        start = time.time()
        try:
            self.StartTimeoutThread()
            eclfile.setJobname(time.strftime("%y%m%d-%H%M%S"))
            # A per-query timeout of 0 means "use the suite default".
            self.timeouts[threadId] = self.timeout
            timeout = eclfile.getTimeout()
            if timeout != 0:
                self.timeouts[threadId] = timeout
            else:
                self.timeouts[threadId] = self.timeout
            self.retryCount = int(self.config.maxAttemptCount)
            # The exit mutex stays locked until runQuery() finishes in its thread.
            self.exitmutexes[threadId].acquire()
            _thread.start_new_thread(self.runQuery, (engine, cluster, eclfile, report, cnt, eclfile.testPublish(), threadId))
            time.sleep(0.1)
            # Block until the query finishes or is aborted.
            self.CheckTimeout(cnt, threadId, eclfile)
            self.StopTimeoutThread()
            Regression.displayReport(report, time.time()-start)
            eclfile.close()
            self.closeLogging()
        except Exception as e:
            self.StopTimeoutThread()
            eclfile.close()
            raise(e)
        except KeyboardInterrupt as e:
            logger.debug("%3d. Keyboard interrupt in %s." % (-1, getCodeInfo(inspect.currentframe()) ))
            logger.warning(repr(e))
            eclfile.close()
            raise(e)
    def runQuery(self, engine, cluster, query, report, cnt=1, publish=False, th = 0):
        """Worker-thread body: archive *query* with eclcc, submit it with
        ECLcmd (run or publish), record the outcome on report[0], and log a
        pass/fail line with the workunit URL.

        cnt     -- 1-based task id for log messages
        publish -- True to publish the query instead of running it
        th      -- worker slot index; its exit mutex is released on ALL exits
        """
        startTime = time.time()
        try:
            self.loggermutex.acquire()
            logger.debug("runQuery(engine: '%s', cluster: '%s', query: '%s', cnt: %d, publish: %s, thread id: %d" % ( engine, cluster, query.ecl, cnt, publish, th))
            logger.warning("%3d. Test: %s" % (cnt, query.getBaseEclRealName()), extra={'taskId':cnt})
            if 'createEclRunArg' in self.args and self.args.createEclRunArg:
                logger.warning("%3d. Cmd: %s %s/%s" % (cnt, getEclRunArgs(query, engine, cluster), query.dir_ec, query.getBaseEcl())),
            self.loggermutex.release()
        except Exception as e:
            printException(repr(e) + " runQuery()")
        res = 0
        wuid = None
        try:
            if ECLCC().makeArchive(query):
                if query.isEclccWarningChanged():
                    # Changed eclcc warnings count as a failure; no workunit is submitted.
                    logger.debug("Should check Eclcc Warning:'%s'" % (query.getEclccWarning()), extra={'taskId':cnt})
                    res = False
                    wuid = 'Not found'
                    query.setWuid(wuid)
                    query.diff = query.getEclccWarningChanges()
                    report[0].addResult(query)
                else:
                    eclCmd = ECLcmd()
                    try:
                        if publish:
                            res = eclCmd.runCmd("publish", engine, cluster, query, report[0],
                                        server=self.config.espIp,
                                        username=self.config.username,
                                        password=self.config.password,
                                        retryCount=int(self.config.maxAttemptCount))
                        else:
                            res = eclCmd.runCmd("run", engine, cluster, query, report[0],
                                        server=self.config.espIp,
                                        username=self.config.username,
                                        password=self.config.password,
                                        retryCount=int(self.config.maxAttemptCount))
                    except Error as e:
                        logger.debug("Exception raised:'%s' (line: %s )" % ( str(e), str(inspect.stack()[0][2]) ), extra={'taskId':cnt})
                        res = False
                        wuid = 'Not found'
                        query.setWuid(wuid)
                        query.diff = query.getBaseEcl()+"\n\t"+str(e)
                        report[0].addResult(query)
                        pass
                        if e.getErrorCode() == 9000:
                            # No space left on device
                            raise e
                    except Exception as e:
                        printException(repr(e) + " runQuery() end")
                        try:
                            printException(repr(e) + " Unexpected error() ")
                        except Exception as e:
                            printException(repr(e) + " runQuery() ")
                    wuid = query.getWuid()
                    if wuid == 'Not found':
                        res = False
                        wuid="No WUID"
                    # A test marked "fail" passes when it did fail.
                    if query.testFail():
                        logger.debug("Intentionally fails", extra={'taskId':cnt})
                        if res == False:
                            res = True
                    logger.debug("CMD result: '%s', wuid:'%s'" % ( res, wuid), extra={'taskId':cnt})
            else:
                # Archiving failed; the eclcc error itself is the result.
                wuid="N/A"
                if query.testFail():
                    res = True
                    report[0].addResult(query)
                else:
                    res = False
                    report[0].addResult(query)
            # Build the ECL Watch URL for a real workunit id ("W...").
            if wuid and wuid.startswith("W"):
                if self.config.useSsl.lower() == 'true':
                    url = "https://"
                else:
                    url = "http://"
                url += self.config.espIp + ":" + self.config.espSocket
                url += "/?Widget=WUDetailsWidget&Wuid="
                url += wuid
            elif query.testFail():
                url = "N/A"
                res = True
            else:
                url = "N/A"
                res = False
            self.loggermutex.acquire()
            elapsTime = time.time()-startTime
            if res:
                logger.info("%3d. Pass %s - %s (%d sec)" % (cnt, query.getBaseEclRealName(), wuid, elapsTime), extra={'taskId':cnt})
                if query.testFail():
                    logger.info("%3d. Intentionally fails" % (cnt), extra={'taskId':cnt})
                logger.info("%3d. URL %s" % (cnt,url))
            else:
                if not wuid or not wuid.startswith("W"):
                    logger.error("%3d. Fail No WUID for %s (%d sec)" % (cnt, query.getBaseEclRealName(), elapsTime), extra={'taskId':cnt})
                else:
                    # Real workunit failed: log it and capture a ZAP report for triage.
                    logger.error("%3d. Fail %s - %s (%d sec)" % (cnt, query.getBaseEclRealName(), wuid, elapsTime), extra={'taskId':cnt})
                    logger.error("%3d. URL %s" % (cnt,url), extra={'taskId':cnt})
                    zapRes = createZAP(wuid, cnt)
                    logger.error("%3d. Zipped Analysis Package: %s" % (cnt, zapRes), extra={'taskId':cnt})
            self.loggermutex.release()
            query.setElapsTime(elapsTime)
            # Free the worker slot so the scheduler can reuse it.
            self.exitmutexes[th].release()
        except Exception as e:
            printException(repr(e) + " runQuery()")
            logger.error("Unexpected error:'%s' (line: %s ) :%s " %( sys.exc_info()[0], str(inspect.stack()[0][2]), repr(e) ) , extra={'taskId':cnt})
            elapsTime = time.time()-startTime
            query.setElapsTime(elapsTime)
            # Always release the slot, even on unexpected errors.
            self.exitmutexes[th].release()
  581. def getConfig(self):
  582. return self.config
  583. @staticmethod
  584. def getTaskId(self):
  585. return self.taskId