فهرست منبع

HPCC-10452 Add timeout handling to Regression Suite.

Add timeout handling for both suite and individual query runs to avoid a system
hang-up when a test case fails.

There are 3 levels of timeout handling:
- timeout value set up in Regression Suite config file (e.g.: regress.json)
- timeout value in command line parameter (override config value, both valid
  for all test cases)
- timeout value with //timeout tag in individual ECL file (override config
  and CLI value for current test case)

If a test case (workunit) is blocked in the execution pipeline (long compile,
blocked by server, etc.) and a timeout occurs, then RS resets the timeout value.
To avoid an endless loop there is a maxAttemptCount setting in the regress.json file.

Add more exception handling and logs.

Improve debug logging.

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 11 سال پیش
والد
کامیت
0dddc6ed71

+ 30 - 0
testing/regress/README.rst

@@ -14,6 +14,7 @@ Result:
 |
 |       usage: regress [-h] [--version] [--config [CONFIG]]
 |                       [--loglevel [{info,debug}]] [--suiteDir [SUITEDIR]]
+|                       [--timeout [TIMEOUT]]
 |                       {list,run,query} ...
 | 
 |       HPCC Platform Regression suite
@@ -32,6 +33,8 @@ Result:
 |                                  Set the log level.
 |            --suiteDir [SUITEDIR], -s [SUITEDIR]
 |                               suiteDir to use. Default value is the current directory and it can handle relative path.
+|            --timeout [TIMEOUT], -t [TIMEOUT]
+|                               timeout for query execution.
 
 	
 Steps to run Regression Suite
@@ -266,4 +269,31 @@ To skip (similar to exclusion)
 To build and publish testcase (e.g.:for libraries)
 //publish
 
+To set individual timeout for test case
+//timeout <timeout value in sec>
+
+7. Configuration setting in regress.json file:
+----------------------------------------------
+
+        "ip": "127.0.0.1",                              - ECl server address
+        "username": "regress",                          - Regression Suite dedicated username and pasword
+        "password": "regress",
+        "roxie": "127.0.0.1:9876",                      - Roxie server addres (not used)
+        "server": "127.0.0.1:8010",                     - EclWatch service server address
+        "suiteDir": "",                                 - default suite directory location - ""-> current directory
+        "eclDir": "ecl",                                - ECL test cases directory source
+        "setupDir": "ecl/setup",                        - ECL setup source directory
+        "keyDir": "ecl/key",                            - XML key files directory to check testcases result
+        "archiveDir": "archives",                       - Archive directory path for testcases generated XML results
+        "resultDir": "results",                         - Current testcases generated XML results
+        "regressionDir": "~/HPCCSystems-regression",    - Regression suite work and log file directory (in user private space)
+        "logDir": "~/HPCCSystems-regression/log",       - Regression suite run log directory
+        "Clusters": [                                   - List of known clusters name
+            "hthor",
+            "thor",
+            "roxie"
+        ],
+        "timeout":"600",                                - Default test case timeout in sec. Can be override by command line parameter or //timeout tag in ECL file
+        "maxAttemptCount":"3"                           - Max retry count to reset timeout if a testcase in any early stage (compiled, blocked) of execution pipeline.
+
 **Important! Actually regression suite compares the test case result with xml files stored in testing/regression/ecl/key independently from the cluster.**

+ 1 - 1
testing/regress/hpcc/common/config.py

@@ -60,7 +60,7 @@ class Config:
             rC = namedtuple("Regress", js.keys())
             return _dict(getattr(rC(**js), "Regress"))
         except IOError as e:
-            print(e)
+            raise(e)
 
     #implement writing out of a config.
     def writeConfig(self, file):

+ 39 - 3
testing/regress/hpcc/common/logger.py

@@ -21,6 +21,8 @@ import logging
 import sys
 import time
 
+#from ..regression.regress import Regression
+
 try:
     import curses
 except:
@@ -82,6 +84,8 @@ class Logger(object):
         isBuffer = False
         logBuffer=[]
         toSort = False
+        taskId = 0
+        taskIds = {}
 
         def close(self):
             if len(self.logBuffer):
@@ -96,26 +100,57 @@ class Logger(object):
             self.flush()
             self.stream.close()
 
+        def addTaskId(self,  taskId,  threadId,  timestamp):
+            if not self.taskIds.has_key(threadId):
+                self.taskIds[threadId] ={'taskId':taskId, 'timestamp':'timestamp'}
+            elif self.taskIds[threadId]['timestamp'] != timestamp:
+                self.taskIds[threadId] ={'taskId':taskId, 'timestamp':'timestamp'}
+
+
+        def getTaskId(self, threadId):
+            record = self.taskIds.get(threadId,  {'taskId':0, 'timestamp':'-'})
+            return record['taskId']
+
+
         def emit(self, record):
             try:
                 msg = self.format(record)
                 stream = self.stream
                 isBuffer = hasattr(record, 'filebuffer')
                 toSort = hasattr(record,  'filesort')
+                taskId = 0
+                if hasattr(record, 'taskId'):
+                    taskId = getattr(record,  'taskId')
+                    self.addTaskId(taskId,  record.thread,  record.asctime)
+                else:
+                    if record.threadName == "MainThread":
+                        taskId = 0
+                    else:
+                        taskId = self.getTaskId(record.thread)
+                if record.levelname == 'DEBUG':
+                    msg +=" [asctime:"+record.asctime+", process:"+str(record.process)+", processName:"+record.processName+", thread:"+str(record.thread)+", threadName:"+record.threadName+"]"
+                    msg = "{0:3d}".format(taskId) + ". Debug-[debug-"+record.asctime+"]: "+msg
+                if record.levelname == 'CRITICAL':
+                    msg += " [level: "+record.levelname+" ]"
+                    msg = "{0:3d}".format(taskId) +". " + msg
                 if toSort:
                     self.toSort = True
                 if isBuffer:
-                    #toggle buffer switcch
+                    #toggle buffer switch
                     self.isBuffer = not self.isBuffer
                 if self.isBuffer or isBuffer:
                     if len(msg):
-                         self.logBuffer.append(msg.replace(". Test:",". Case:"))
+                        msg.replace(". Test:",". Case:")
+                        msg = msg[0:4]+'-'+record.asctime+'-'+msg[4:]
+                        self.logBuffer.append(msg)
                 else:
                     if len(self.logBuffer):
                         if self.toSort:
                             self.logBuffer.sort()
                         for item in self.logBuffer:
+                            item = item[0:4] +item[23:]
                             item = item.replace(". Case:",". Test:")
+                            item = item.replace(". Debug-", ".  ")
                             stream.write(item)
                             stream.write(self.terminator)
                         self.logBuffer = []
@@ -126,7 +161,7 @@ class Logger(object):
                     self.flush()
             except (KeyboardInterrupt, SystemExit):
                 raise
-            except:
+            except Exception as ex:
                 self.handleError(record)
 
     def addHandler(self, fd, level='info'):
@@ -155,3 +190,4 @@ class Logger(object):
     def __init__(self, level='info'):
         self.setLevel(level)
         self.enable_pretty_logging()
+        self.taskId = 0;

+ 240 - 27
testing/regress/hpcc/regression/regress.py

@@ -22,6 +22,7 @@ import os
 import sys
 import time
 import thread
+import threading
 
 from ..common.config import Config
 from ..common.error import Error
@@ -31,13 +32,31 @@ from ..regression.suite import Suite
 from ..util.ecl.cc import ECLCC
 from ..util.ecl.command import ECLcmd
 from ..util.expandcheck import ExpandCheck
+from ..util.util import setConfig,  queryWuid,  abortWorkunit, getVersionNumbers
 
 
 class Regression:
-    def __init__(self, config="regress.json", level='info', suiteDir=None,  numOfThreads=1):
+
+    def timeoutHandler(self):
+        for th in range(self.maxthreads):
+            if self.timeouts[th] > 0:
+                self.timeouts[th] -= 1
+
+        if self.timeoutHandlerEnabled:
+            self.timeoutThread = threading.Timer(1.0,  self.timeoutHandler)
+            self.timeoutThread.start()
+
+    def __init__(self, config="regress.json", level='info', suiteDir=None,  timeout=-1,  numOfThreads=1):
         self.config = Config(config).configObj
+        setConfig(self.config)
         self.suites = {}
         self.log = Logger(level)
+        if timeout == '-1':
+            self.timeout = int(self.config.timeout);
+        else:
+            self.timeout = int(timeout)
+        if numOfThreads == 0:
+            numOfThreads = 1;
         if not suiteDir:
             self.suiteDir = self.config.suiteDir
             if not self.suiteDir:
@@ -63,17 +82,29 @@ class Regression:
         logging.debug("Archive Dir    : %s", self.dir_a)
 
         self.loggermutex = thread.allocate_lock()
+        self.numOfCpus = 2
+        ver = getVersionNumbers()
         if numOfThreads == -1:
-            self.numOfCpus = 1
-            if 'linux' in sys.platform :
-                command = 'grep cores /proc/cpuinfo | sort -u'
-                cpuInfo = os.popen(command).read()
-                self.numOfCpus = int(cpuInfo.split()[3])
+            if (ver['main'] >= 2) and (ver['minor'] >= 7):
+                if 'linux' in sys.platform :
+                    command = 'grep cores /proc/cpuinfo | sort -u'
+                    cpuInfo = os.popen(command).read()
+                    if cpuInfo == "":
+                        self.numOfCpus = 1
+                    else:
+                        self.numOfCpus = int(cpuInfo.split()[3])
             numOfThreads = self.numOfCpus  * 2
+        else:
+            if (ver['main'] <= 2) and (ver['minor'] < 7):
+                numOfThreads = self.numOfCpus  * 2
+        logging.debug("Number of CPUs:%d, NUmber of threads:%d", self.numOfCpus, numOfThreads  )
 
         self.maxthreads = numOfThreads
         self.maxtasks = 0
         self.exitmutexes = [thread.allocate_lock() for i in range(self.maxthreads)]
+        self.timeouts = [(-1) for i in range(self.maxthreads)]
+        self.timeoutHandlerEnabled = False;
+        self.timeoutThread = threading.Timer(1.0,  self.timeoutHandler)
 
     def setLogLevel(self, level):
         self.log.setLevel(level)
@@ -115,35 +146,157 @@ class Regression:
         report[0].display(report[1],  elapsTime)
 
     def runSuiteP(self, name, suite):
+        print "runSuiteP"
         report = self.buildLogging(name)
         if name == "setup":
             cluster = 'hthor'
         else:
             cluster = name
 
+        self.taskParam = []
+        self.taskParam = [{'taskId':0,  'jobName':'',  'timeoutValue':0,  'retryCount': 0} for i in range(self.maxthreads)]
+        self.goodStates = ('compiling', 'blocked')
+
         logging.warn("Suite: %s ",  name)
         logging.warn("Queries: %s" % repr(len(suite.getSuite())))
         logging.warn('%s','' , extra={'filebuffer':True,  'filesort':True})
         cnt = 0
+        oldCnt = -1
         suite.setStarTime(time.time())
         suiteItems = suite.getSuite()
-        while cnt in range(self.maxtasks):
-            query = suiteItems[cnt]
-            for th in range(self.maxthreads):
-                if not self.exitmutexes[th].locked():
-                    cnt += 1
-                    thread.start_new_thread(self.runQuery, (cluster, query, report, cnt, suite.testPublish(query.ecl),  th))
-                    break
-            time.sleep(0.1)
+        try:
+            self.StartTimeoutThread()
+            while cnt in range(self.maxtasks):
+                if oldCnt != cnt:
+                    query = suiteItems[cnt]
+                    query.setTaskId(cnt+1)
+                    query.setJobname(time.strftime("%y%m%d-%H%M%S"))
+                    timeout = query.getTimeout()
+                    oldCnt = cnt
 
-        for mutex in self.exitmutexes:
-            while mutex.locked(): pass
+                for threadId in range(self.maxthreads):
+                    if not self.exitmutexes[threadId].locked():
+                        # Start a new test case with a reused thread id
+                        self.taskParam[threadId]['taskId']=cnt
+                        cnt += 1
+                        if timeout != -1:
+                            self.timeouts[threadId] = timeout
+                        else:
+                            self.timeouts[threadId] = self.timeout
 
-        logging.warn('%s','' , extra={'filebuffer':True,  'filesort':True})
-        suite.setEndTime(time.time())
-        Regression.displayReport(report, suite.getelapsTime())
+                        self.taskParam[threadId]['timeoutValue'] = self.timeouts[threadId]
+                        query = suiteItems[self.taskParam[threadId]['taskId']]
+                        #logging.debug("self.timeout[%d]:%d", threadId, self.timeouts[threadId])
+                        sysThreadId = thread.start_new_thread(self.runQuery, (cluster, query, report, cnt, suite.testPublish(query.ecl),  threadId))
+                        time.sleep(0.4)
+                        self.taskParam[threadId]['jobName'] = query.getJobname()
+                        self.taskParam[threadId]['retryCount'] = int(self.config.maxAttemptCount)
+                        break
+                    else:
+                        if self.timeouts[threadId] % 10 == 0:
+                            self.loggermutex.acquire()
+                            logging.debug("%3d. timeout counter:%d" % (self.taskParam[threadId]['taskId']+1, self.timeouts[threadId]))
+                            self.loggermutex.release()
+                        if self.timeouts[threadId] == 0:
+                            # time out occured
+                            wuid =  queryWuid(self.taskParam[threadId]['jobName'])
+                            if ("Not found" in wuid['wuid'] ) or (wuid['state'] in self.goodStates):
+                                #Possible blocked, give it more time if it is possible
+                                self.taskParam[threadId]['retryCount'] -= 1;
+                                if self.taskParam[threadId]['retryCount'] > 0:
+                                    self.timeouts[threadId] =  self.taskParam[threadId]['timeoutValue']
+                                    self.loggermutex.acquire()
+                                    logging.info("%3d. Does not started yet. Reset timeout to %d sec." % (self.taskParam[threadId]['taskId']+1, self.taskParam[threadId]['timeoutValue']))
+                                    logging.debug("%3d. Task parameters: thread id: %d, ecl:'%s',state:'%s', retry count:%d." % (self.taskParam[threadId]['taskId']+1, threadId,  suiteItems[self.taskParam[threadId]['taskId']].ecl,   wuid['state'],  self.taskParam[threadId]['retryCount'] ),  extra={'taskId':self.taskParam[threadId]['taskId']+1})
+                                    self.loggermutex.release()
+                                else:
+                                    # retry counter exhausted, give up and abort this test case if exists
+                                    if 'W' in wuid['wuid']:
+                                        abortWorkunit(wuid['wuid'])
+                                        self.loggermutex.acquire()
+                                        query = suiteItems[self.taskParam[threadId]['taskId']]
+                                        query.setAborReason('Timeout and retry count exhausted!')
+                                        logging.info("%3d. Timeout occured and no more attempt. Force to abort... " % (self.taskParam[threadId]['taskId']))
+                                        logging.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'],  query.ecl),  extra={'taskId':self.taskParam[threadId]['taskId']+1})
+                                        self.loggermutex.release()
+                                    else:
+                                        self.exitmutexes[threadId].release()
+                                        self.loggermutex.acquire()
+                                        query = suiteItems[self.taskParam[threadId]['taskId']]
+                                        query.setAborReason('Timeout (does not started yet and retry count exhausted)')
+                                        logging.info("%3d. Timeout occured and no more attempt. Force to abort... " % (self.taskParam[threadId]['taskId']))
+                                        logging.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'],  query.ecl),  extra={'taskId':self.taskParam[threadId]['taskId']+1})
+                                        self.loggermutex.release()
+
+                                    self.timeouts[threadId] = -1
+
+
+                            elif   wuid['state'] =='completed':
+                                # It is done in HPCC System but need some more time to complete
+                                self.timeouts[threadId] =  5 # sec extra time to finish
+                                self.loggermutex.acquire()
+                                logging.info("%3d. It is completed in HPCC Sytem, but not finised yet. Give it %d sec." % (self.taskParam[threadId]['taskId']+1, self.taskParam[threadId]['timeoutValue']))
+                                logging.debug("%3d. Task parameters: thread id: %d, ecl:'%s',state:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId,  suiteItems[self.taskParam[threadId]['taskId']].ecl, wuid['state']),  extra={'taskId':self.taskParam[threadId]['taskId']+1})
+                                self.loggermutex.release()
+                            else:
+                                # Something wrong with this test case, abort it.
+                                abortWorkunit(wuid['wuid'])
+                                self.loggermutex.acquire()
+                                query = suiteItems[self.taskParam[threadId]['taskId']]
+                                query.setAborReason('Timeout')
+                                logging.info("%3d. Timeout occured. Force to abort... " % (self.taskParam[threadId]['taskId']))
+                                logging.debug("%3d. Task parameters: thread id:%d, wuid:'%s', state:'%s', ecl:'%s'." % (self.taskParam[threadId]['taskId']+1, threadId, wuid['wuid'], wuid['state'],  query.ecl),  extra={'taskId':self.taskParam[threadId]['taskId']+1})
+                                self.loggermutex.release()
+                                self.timeouts[threadId] = -1
+
+                # give some time to other threads
+                time.sleep(0.4)
+
+            for mutex in self.exitmutexes:
+                while mutex.locked(): pass
+
+            self.StopTimeoutThread()
+            logging.warn('%s','' , extra={'filebuffer':True,  'filesort':True})
+            suite.setEndTime(time.time())
+            Regression.displayReport(report, suite.getelapsTime())
+
+        except Exception as e:
+            self.StopTimeoutThread()
+            raise(e)
+
+
+    def StartTimeoutThread(self):
+        self.timeoutThread.cancel()
+        self.timeoutThread = threading.Timer(1.0,  self.timeoutHandler)
+        self.timeoutHandlerEnabled=True
+        self.timeoutThread.start()
+
+    def CheckTimeout(self, cnt,  thread,  query):
+        while  self.exitmutexes[thread].locked():
+            if self.timeouts[thread] >= 0:
+                self.loggermutex.acquire()
+                logging.debug("%3d. timeout counter:%d" % (cnt, self.timeouts[thread]))
+                self.loggermutex.release()
+                sleepTime = 1.0
+            if self.timeouts[thread] == 0:
+                # time out, abort tast
+                wuid =  queryWuid(query.getJobname())
+                abortWorkunit(wuid['wuid'])
+                query.setAborReason('Timeout')
+                self.loggermutex.acquire()
+                logging.debug("%3d. Waiting for abort..." % (cnt))
+                self.loggermutex.release()
+                self.timeouts[thread] = -1
+                sleepTime = 1.0
+            time.sleep(sleepTime)
+
+    def StopTimeoutThread(self):
+        self.timeoutHandlerEnabled=False
+        self.timeoutThread.cancel()
+        time.sleep(2)
 
     def runSuite(self, name, suite):
+        print "runSuite"
         report = self.buildLogging(name)
         if name == "setup":
             cluster = 'hthor'
@@ -154,18 +307,71 @@ class Regression:
         logging.warn("Queries: %s" % repr(len(suite.getSuite())))
         suite.setStarTime(time.time())
         cnt = 1
-        for query in suite.getSuite():
-            self.runQuery(cluster, query, report, cnt, suite.testPublish(query.ecl))
-            cnt += 1
-        suite.setEndTime(time.time())
-        Regression.displayReport(report, suite.getelapsTime())
+        th = 0
+        try:
+            self.StartTimeoutThread()
+            for query in suite.getSuite():
+                query.setJobname(time.strftime("%y%m%d-%H%M%S"))
+                self.timeouts[th] = self.timeout
+                timeout = query.getTimeout()
+                if timeout != -1:
+                   self.timeouts[th] = timeout
+
+                thread.start_new_thread(self.runQuery, (cluster, query, report, cnt, suite.testPublish(query.ecl),  th))
+                time.sleep(0.1)
+                self.CheckTimeout(cnt, th,  query)
+                cnt += 1
+
+            self.StopTimeoutThread()
+
+            suite.setEndTime(time.time())
+            Regression.displayReport(report, suite.getelapsTime())
+
+        except Exception as e:
+            self.StopTimeoutThread()
+            raise(e)
+
+
+    def runSuiteQ(self, clusterName, eclfile):
+        #print "runSuiteQ"
+        report = self.buildLogging(clusterName)
+        logging.debug("runSuiteQ( clusterName:'%s', eclfile:'%s')",  clusterName,  eclfile.ecl,  extra={'taskId':0})
+
+        if clusterName == "setup":
+            cluster = 'hthor'
+        else:
+            cluster = clusterName
+
+        cnt = 1
+        eclfile.setTaskId(cnt)
+        threadId = 0
+        logging.warn("Target: %s" % clusterName)
+        logging.warn("Queries: %s" % 1)
+        start = time.time()
+        try:
+            self.StartTimeoutThread()
+            eclfile.setJobname(time.strftime("%y%m%d-%H%M%S"))
+            self.timeouts[threadId] = self.timeout
+            timeout = eclfile.getTimeout()
+            if timeout != -1:
+               self.timeouts[threadId] = timeout
+            sysThreadId = thread.start_new_thread(self.runQuery, (cluster, eclfile, report, cnt, eclfile.testPublish(),  threadId))
+            time.sleep(0.1)
+            self.CheckTimeout(cnt, threadId,  eclfile)
+
+            self.StopTimeoutThread()
+            Regression.displayReport(report,  time.time()-start)
+        except Exception as e:
+            self.StopTimeoutThread()
+            raise(e)
 
     def runQuery(self, cluster, query, report, cnt=1, publish=False,  th = 0):
         startTime = time.time()
-        self.exitmutexes[th].acquire()
         self.loggermutex.acquire()
-        logging.debug("runQuery(cluster:", cluster, ", query:", query, ", report:", report, ", cnt:", cnt, ", publish:", publish, ")")
-        logging.warn("%3d. Test: %s" % (cnt, query.ecl))
+
+        self.exitmutexes[th].acquire()
+        logging.debug("runQuery(cluster:'%s', query:'%s', task id:%d, publish: %d, thread id:%d)", cluster, query.ecl,  cnt, publish,  th,  extra={'taskId':cnt})
+        logging.warn("%3d. Test: %s " % (cnt, query.ecl))
         ECLCC().makeArchive(query)
         eclCmd = ECLcmd()
         self.loggermutex.release()
@@ -201,3 +407,10 @@ class Regression:
         self.loggermutex.release()
         query.setElapsTime(elapsTime)
         self.exitmutexes[th].release()
+
+    def getConfig(self):
+        return self.config
+
+    @staticmethod
+    def getTaskId(self):
+        return self.taskId

+ 10 - 4
testing/regress/hpcc/util/ecl/command.py

@@ -22,7 +22,7 @@ import logging
 from ...common.shell import Shell
 from ...util.ecl.file import ECLFile
 from ...common.error import Error
-
+from ...util.util import queryWuid
 
 class ECLcmd(Shell):
     def __init__(self):
@@ -48,7 +48,8 @@ class ECLcmd(Shell):
             if server:
                 args.append('--server=' + server)
             if not name:
-                name = eclfile.ecl
+                #name = eclfile.ecl
+                name = eclfile.getJobname()
             if username:
                 args.append("--username=" + username)
             if password:
@@ -71,6 +72,8 @@ class ECLcmd(Shell):
                     wuid = i.split()[1]
                 if "state:" in i:
                     state = i.split()[1]
+                if "aborted" in i:
+                    state = "aborted"
                 if cnt > 4:
                     result += i + "\n"
                 cnt += 1
@@ -89,9 +92,12 @@ class ECLcmd(Shell):
                 else:
                     test = False
                     eclfile.diff = 'Error'
-                
             else:
-                test = eclfile.testResults()
+                if queryWuid(eclfile.getJobname())['state'] == 'aborted':
+                    eclfile.diff = eclfile.ecl+'\n\t'+'Aborted ( reason: '+eclfile.getAbortReason()+' )'
+                    test = False
+                else:
+                    test = eclfile.testResults()
             report.addResult(eclfile)
             if not test:
                 return False

+ 65 - 14
testing/regress/hpcc/util/ecl/file.py

@@ -20,7 +20,9 @@
 import difflib
 import logging
 import os
+import traceback
 
+from ...util.util import isPositiveIntNum
 
 class ECLFile:
     ecl = None
@@ -35,6 +37,9 @@ class ECLFile:
     diff = ''
     wuid = None
     elapsTime = 0
+    jobname = ''
+    abortReason = ''
+    taskId = -1;
 
     def __init__(self, ecl, dir_a, dir_ex, dir_r):
         self.dir_ec = os.path.dirname(ecl)
@@ -42,11 +47,13 @@ class ECLFile:
         self.dir_r = dir_r
         self.dir_a = dir_a
         baseEcl = os.path.basename(ecl)
-        baseXml = os.path.splitext(baseEcl)[0] + '.xml'
+        self.basename = os.path.splitext(baseEcl)[0]
+        baseXml = self.basename + '.xml'
         self.ecl = baseEcl
         self.xml_e = baseXml
         self.xml_r = baseXml
         self.xml_a = 'archive_' + baseXml
+        self.jobname = self.basename
 
     def getExpected(self):
         return os.path.join(self.dir_ex, self.xml_e)
@@ -105,30 +112,53 @@ class ECLFile:
         # Standard string has a problem with unicode characters
         # use byte arrays and binary file open instead
         tag = b'//no' + target.encode()
-        logging.debug("testExclusion (ecl:", self.ecl,", target:", target,", tag: ", tag, ")")
+        logging.debug("testExclusion (ecl:'%s', target: '%s', tag:'%s'", self.ecl, target, tag)
         eclText = open(self.getEcl(), 'rb')
+        retVal = False
         for line in eclText:
             if tag in line:
-                return True
-        return False
+                retVal = True
+                break
+        logging.debug("Exclude %s",  retVal)
+        return retVal
 
     def testPublish(self):
         # Standard string has a problem with unicode characters
         # use byte arrays and binary file open instead
         tag = b'//publish'
-        logging.debug("testPublish (ecl:", self.ecl,", tag: ", tag, ")")
+        logging.debug("%3d. testPublish (ecl:'%s', tag:'%s'", self.taskId, self.ecl,  tag)
         eclText = open(self.getEcl(), 'rb')
+        retVal = False
         for line in eclText:
             if tag in line:
-                return True
-        return False
+                retVal = True
+                break
 
+        logging.debug("%3d. Publish is %s",  self.taskId,  retVal)
+        return retVal
+
+    def getTimeout(self):
+        timeout = -1
+        # Standard string has a problem with unicode characters
+        # use byte arrays and binary file open instead
+        tag = b'//timeout'
+        logging.debug("%3d. getTimeout (ecl:'%s', tag:'%s')", self.taskId,  self.ecl, tag)
+        eclText = open(self.getEcl(), 'rb')
+        for line in eclText:
+            if tag in line:
+                timeoutParts = line.split()
+                if len(timeoutParts) == 2:
+                    if isPositiveIntNum(timeoutParts[1]):
+                        timeout = int(timeoutParts[1])
+                break
+        logging.debug("%3d. Timeout is :%d sec",  self.taskId,  timeout)
+        return timeout
 
     def testResults(self):
         d = difflib.Differ()
         try:
-            logging.debug("EXP: " + self.getExpected())
-            logging.debug("REC: " + self.getResults())
+            logging.debug("%3d. EXP: " + self.getExpected(),  self.taskId )
+            logging.debug("%3d. REC: " + self.getResults(),  self.taskId )
             if not os.path.isfile(self.getExpected()):
                 self.diff += "KEY FILE NOT FOUND. " + self.getExpected()
                 raise IOError("KEY FILE NOT FOUND. " + self.getExpected())
@@ -143,12 +173,33 @@ class ECLFile:
                                              tofile=self.xml_r):
                 self.diff += line
         except Exception as e:
-            logging.critical(e)
+            logging.critical( e, extra={'taskId':self.taskId})
+            logging.critical("%s",  traceback.format_exc().replace("\n","\n\t\t"),  extra={'taskId':self.taskId} )
+            logging.critical("EXP: %s",  self.getExpected(),  extra={'taskId':self.taskId})
+            logging.critical("REC: %s",  self.getResults(),  extra={'taskId':self.taskId})
+            return False
+        finally:
+            if not self.diff:
+                return True
             return False
-
-        if not self.diff:
-            return True
-        return False
 
     def setElapsTime(self,  time):
         self.elapsTime = time
+
+    def setJobname(self,  timestamp):
+        self.jobname = self.basename +"-"+timestamp
+
+    def getJobname(self):
+        return self.jobname
+
+    def setAborReason(self,  reason):
+        self.abortReason = reason
+
+    def getAbortReason(self):
+        return self.abortReason
+
+    def setTaskId(self, taskId):
+        self.taskId = taskId
+
+    def getTaskId(self):
+        return self.taskId

+ 1 - 1
testing/regress/hpcc/util/expandcheck.py

@@ -24,7 +24,7 @@ class ExpandCheck:
 
     @staticmethod
     def dir_exists(path, require=False):
-        logging.debug("dir_exists(path: %s, require: %d", path, require)
+        logging.debug("dir_exists(path: %s, require: %s)", path, require)
         if '~' in path:
             path = os.path.expanduser(path)
         else:

+ 46 - 0
testing/regress/hpcc/util/util.py

@@ -18,6 +18,7 @@
 '''
 
 import argparse
+import platform
 
 def isPositiveIntNum(string):
     for i in range(0,  len(string)):
@@ -34,3 +35,48 @@ def checkPqParam(string):
         raise argparse.ArgumentTypeError(msg)
 
     return value
+
+def getVersionNumbers():
+    version = platform.python_version_tuple()
+    verNum = {'main':0,  'minor':0,  'patch':0}
+    if isPositiveIntNum(version[0]):
+        verNum['main'] = int(version[0])
+    if isPositiveIntNum(version[1]):
+        verNum['minor'] = int(version[1])
+    if isPositiveIntNum(version[2]):
+        verNum['patch'] = int(version[2])
+    return(verNum);
+
+import json
+import urllib2
+
+gConfig = None
+
+def setConfig(config):
+    global gConfig
+    gConfig = config
+
+def queryWuid(jobname):
+    server = gConfig.server
+    host = "http://"+server+"/WsWorkunits/WUQuery.json?Jobname="+jobname
+    wuid="Not found"
+    try:
+        response_stream = urllib2.urlopen(host)
+        json_response = response_stream.read()
+        resp = json.loads(json_response)
+        if resp['WUQueryResponse']['NumWUs'] > 0:
+            wuid= resp['WUQueryResponse']['Workunits']['ECLWorkunit'][0]['Wuid']
+            state =resp['WUQueryResponse']['Workunits']['ECLWorkunit'][0]['State']
+        else:
+            state = jobname+' not found'
+    except KeyError as ke:
+        state = "Key error:"+ke.str()
+    except Exception as ex:
+        state = "Unable to query "+ ex.reason.strerror
+    return {'wuid':wuid, 'state':state}
+
+def abortWorkunit(wuid):
+    host = "http://"+gConfig.server+"/WsWorkunits/WUAbort?Wuids="+wuid
+    response_stream = urllib2.urlopen(host)
+    json_response = response_stream.read()
+    #print(json_response)

+ 22 - 16
testing/regress/regress

@@ -22,18 +22,23 @@
 import argparse
 import logging
 import os
+import platform
 import atexit
+import traceback
 
 from hpcc.regression.regress import Regression
 from hpcc.util.ecl.file import ECLFile
-from hpcc.util.util import checkPqParam
+from hpcc.util.util import checkPqParam,  getVersionNumbers
 
 if __name__ == "__main__":
+    ver = getVersionNumbers()
+    if (ver['main'] >= 2) and (ver['minor'] >= 7):
+        atexit.register(logging.shutdown)
     prog = "regress"
     description = 'HPCC Platform Regression suite'
     parser = argparse.ArgumentParser(prog=prog, description=description)
     parser.add_argument('--version', '-v', action='version',
-                        version='%(prog)s 0.0.2')
+                        version='%(prog)s 0.0.3')
     parser.add_argument('--config', help="Config file to use.",
                         nargs='?', default="regress.json")
     parser.add_argument('--loglevel', help="Set the log level.",
@@ -41,6 +46,8 @@ if __name__ == "__main__":
                         choices=['info', 'debug'])
     parser.add_argument('--suiteDir', '-s', help="suiteDir to use.",
                         nargs='?', default=".")
+    parser.add_argument('--timeout', '-t', help="timeout for query execution.",
+                        nargs='?', default="-1")
 
     subparsers = parser.add_subparsers(help='sub-command help')
     parser_list = subparsers.add_parser('list', help='list help')
@@ -51,7 +58,7 @@ if __name__ == "__main__":
     parser_run.add_argument('cluster', help="Run the cluster suite.",
                             nargs='?', type=str,  default='setup')
     parser_run.add_argument('--pq', help="Parallel query execution with threadNumber threads. ('-1' can be use to calculate usable thread count on a single node system)",
-                            type=checkPqParam,  default = 1,   metavar="threadNumber")
+                            type=checkPqParam,  default = 0,   metavar="threadNumber")
 
     parser_query = subparsers.add_parser('query', help='query help')
     parser_query.add_argument('query', help="Name of a single ECL query (mandatory).",
@@ -66,6 +73,10 @@ if __name__ == "__main__":
     suiteDir = ""
     if 'suiteDir' in args:
         suiteDir = args.suiteDir
+    timeout = -1
+    if 'timeout' in args:
+        timeout = args.timeout
+    regress = None
     try:
         if 'clusters' in args:
             Clusters = ['setup']
@@ -80,7 +91,7 @@ if __name__ == "__main__":
                 print "\nMissing ECL query file!\n"
                 parser_query.print_help()
                 exit()
-            regress = Regression(args.config, args.loglevel, suiteDir)
+            regress = Regression(args.config, args.loglevel, suiteDir,  timeout)
             ecl = os.path.join(regress.dir_ec, args.query)
             eclfile = ECLFile(ecl, regress.dir_a, regress.dir_ex,
                               regress.dir_r)
@@ -91,29 +102,19 @@ if __name__ == "__main__":
             else:
                 targetClusters.append(args.cluster)
             for cluster in targetClusters:
-                logging.warn("Target: %s" % cluster)
-                logging.warn("Queries: %s" % 1)
                 try:
                     if not eclfile.testSkip(cluster)['skip']:
                         if not eclfile.testExclusion(cluster):
-                            report = regress.buildLogging(args.query)
-                            if args.publish or eclfile.testPublish():
-                                regress.runQuery(cluster, eclfile, report, 1, True)
-                            else:
-                                regress.runQuery(cluster, eclfile, report)
-                            Regression.displayReport(report)
+                            regress.runSuiteQ(cluster, eclfile)
                         else:
-                            #print args.query+" excluded on "+cluster+" cluster."
                             logging.warn("%s. %s excluded on this cluster." % (1, args.query))
                     else:
-                        #print args.query+" skipped on "+cluster+" cluster."
                         logging.warn("%s. %s skipped on this cluster." % (1, args.query))
                 except IOError:
-                    #print "Query "+args.query+ " does not exist!"
                     logging.error("%s. Query %s does not exist!" % (1,  args.query))
                     exit()
         elif 'cluster' in args:
-            regress = Regression(args.config, args.loglevel, suiteDir,  args.pq)
+            regress = Regression(args.config, args.loglevel, suiteDir,  timeout,  args.pq)
             regress.bootstrap(args.cluster)
             if 'setup' in args.cluster:
                 regress.runSuite('setup', regress.setup)
@@ -124,5 +125,10 @@ if __name__ == "__main__":
                     regress.runSuite(args.cluster, regress.suites[args.cluster])
     except Exception as e:
         logging.critical(e)
+        logging.critical(traceback.format_exc())
+        print "Abend."
     except KeyboardInterrupt:
         logging.critical("Keyboard Interrupt Caught.")
+        regress.StopTimeoutThread()
+
+    exit()

+ 3 - 1
testing/regress/regress.json

@@ -17,6 +17,8 @@
             "hthor",
             "thor",
             "roxie"
-        ]
+        ],
+        "timeout":"600",
+        "maxAttemptCount":"3"
     }
 }