"""
lit - LLVM Integrated Tester.
See lit.pod for more information.
"""
import math, os, platform, random, re, sys, time, threading, traceback
import ProgressBar
import TestRunner
import Util
from TestingConfig import TestingConfig
import LitConfig
import Test
gConfigName = 'lit.cfg'
gSiteConfigName = 'lit.site.cfg'
kLocalConfigName = 'lit.local.cfg'
class TestingProgressDisplay:
def __init__(self, opts, numTests, progressBar=None):
self.opts = opts
self.numTests = numTests
self.current = None
self.lock = threading.Lock()
self.progressBar = progressBar
self.completed = 0
def update(self, test):
if self.opts.quiet and not test.result.isFailure:
self.completed += 1
return
self.lock.acquire()
try:
self.handleUpdate(test)
finally:
self.lock.release()
def finish(self):
if self.progressBar:
self.progressBar.clear()
elif self.opts.quiet:
pass
elif self.opts.succinct:
sys.stdout.write('\n')
def handleUpdate(self, test):
self.completed += 1
if self.progressBar:
self.progressBar.update(float(self.completed)/self.numTests,
test.getFullName())
if self.opts.succinct and not test.result.isFailure:
return
if self.progressBar:
self.progressBar.clear()
print '%s: %s (%d of %d)' % (test.result.name, test.getFullName(),
self.completed, self.numTests)
if test.result.isFailure and self.opts.showOutput:
print "%s TEST '%s' FAILED %s" % ('*'*20, test.getFullName(),
'*'*20)
print test.output
print "*" * 20
sys.stdout.flush()
class TestProvider:
def __init__(self, tests, maxTime):
self.maxTime = maxTime
self.iter = iter(tests)
self.lock = threading.Lock()
self.startTime = time.time()
def get(self):
if self.maxTime is not None:
if time.time() - self.startTime > self.maxTime:
return None
self.lock.acquire()
try:
item = self.iter.next()
except StopIteration:
item = None
self.lock.release()
return item
class Tester(threading.Thread):
def __init__(self, litConfig, provider, display):
threading.Thread.__init__(self)
self.litConfig = litConfig
self.provider = provider
self.display = display
def run(self):
while 1:
item = self.provider.get()
if item is None:
break
self.runTest(item)
def runTest(self, test):
result = None
startTime = time.time()
try:
result, output = test.config.test_format.execute(test,
self.litConfig)
except KeyboardInterrupt:
print '\nCtrl-C detected, goodbye.'
os.kill(0,9)
except:
if self.litConfig.debug:
raise
result = Test.UNRESOLVED
output = 'Exception during script execution:\n'
output += traceback.format_exc()
output += '\n'
elapsed = time.time() - startTime
test.setResult(result, output, elapsed)
self.display.update(test)
def dirContainsTestSuite(path):
cfgpath = os.path.join(path, gSiteConfigName)
if os.path.exists(cfgpath):
return cfgpath
cfgpath = os.path.join(path, gConfigName)
if os.path.exists(cfgpath):
return cfgpath
def getTestSuite(item, litConfig, cache):
"""getTestSuite(item, litConfig, cache) -> (suite, relative_path)
Find the test suite containing @arg item.
@retval (None, ...) - Indicates no test suite contains @arg item.
@retval (suite, relative_path) - The suite that @arg item is in, and its
relative path inside that suite.
"""
def search1(path):
cfgpath = dirContainsTestSuite(path)
if not cfgpath:
parent,base = os.path.split(path)
if parent == path:
return (None, ())
ts, relative = search(parent)
return (ts, relative + (base,))
if litConfig.debug:
litConfig.note('loading suite config %r' % cfgpath)
cfg = TestingConfig.frompath(cfgpath, None, litConfig, mustExist = True)
source_root = os.path.realpath(cfg.test_source_root or path)
exec_root = os.path.realpath(cfg.test_exec_root or path)
return Test.TestSuite(cfg.name, source_root, exec_root, cfg), ()
def search(path):
res = cache.get(path)
if res is None:
cache[path] = res = search1(path)
return res
item = os.path.realpath(item)
components = []
while not os.path.isdir(item):
parent,base = os.path.split(item)
if parent == item:
return (None, ())
components.append(base)
item = parent
components.reverse()
ts, relative = search(item)
return ts, tuple(relative + tuple(components))
def getLocalConfig(ts, path_in_suite, litConfig, cache):
def search1(path_in_suite):
if not path_in_suite:
parent = ts.config
else:
parent = search(path_in_suite[:-1])
source_path = ts.getSourcePath(path_in_suite)
cfgpath = os.path.join(source_path, kLocalConfigName)
if litConfig.debug:
litConfig.note('loading local config %r' % cfgpath)
return TestingConfig.frompath(cfgpath, parent, litConfig,
mustExist = False,
config = parent.clone(cfgpath))
def search(path_in_suite):
key = (ts, path_in_suite)
res = cache.get(key)
if res is None:
cache[key] = res = search1(path_in_suite)
return res
return search(path_in_suite)
def getTests(path, litConfig, testSuiteCache, localConfigCache):
ts,path_in_suite = getTestSuite(path, litConfig, testSuiteCache)
if ts is None:
litConfig.warning('unable to find test suite for %r' % path)
return (),()
if litConfig.debug:
litConfig.note('resolved input %r to %r::%r' % (path, ts.name,
path_in_suite))
return ts, getTestsInSuite(ts, path_in_suite, litConfig,
testSuiteCache, localConfigCache)
def getTestsInSuite(ts, path_in_suite, litConfig,
testSuiteCache, localConfigCache):
source_path = ts.getSourcePath(path_in_suite)
if not os.path.exists(source_path):
return
if not os.path.isdir(source_path):
lc = getLocalConfig(ts, path_in_suite[:-1], litConfig, localConfigCache)
yield Test.Test(ts, path_in_suite, lc)
return
lc = getLocalConfig(ts, path_in_suite, litConfig, localConfigCache)
if lc.test_format is not None:
for res in lc.test_format.getTestsInDirectory(ts, path_in_suite,
litConfig, lc):
yield res
for filename in os.listdir(source_path):
if filename in ('Output', '.svn') or filename in lc.excludes:
continue
file_sourcepath = os.path.join(source_path, filename)
if not os.path.isdir(file_sourcepath):
continue
file_execpath = ts.getExecPath(path_in_suite + (filename,))
if dirContainsTestSuite(file_execpath):
sub_ts, subiter = getTests(file_execpath, litConfig,
testSuiteCache, localConfigCache)
elif dirContainsTestSuite(file_sourcepath):
sub_ts, subiter = getTests(file_sourcepath, litConfig,
testSuiteCache, localConfigCache)
else:
subiter = getTestsInSuite(ts, path_in_suite + (filename,),
litConfig, testSuiteCache,
localConfigCache)
sub_ts = None
N = 0
for res in subiter:
N += 1
yield res
if sub_ts and not N:
litConfig.warning('test suite %r contained no tests' % sub_ts.name)
def runTests(numThreads, litConfig, provider, display):
if numThreads == 1:
t = Tester(litConfig, provider, display)
t.run()
return
testers = [Tester(litConfig, provider, display)
for i in range(numThreads)]
for t in testers:
t.start()
try:
for t in testers:
t.join()
except KeyboardInterrupt:
sys.exit(2)
def load_test_suite(inputs):
import unittest
litConfig = LitConfig.LitConfig(progname = 'lit',
path = [],
quiet = False,
useValgrind = False,
valgrindLeakCheck = False,
valgrindArgs = [],
useTclAsSh = False,
noExecute = False,
debug = False,
isWindows = (platform.system()=='Windows'),
params = {})
tests = []
testSuiteCache = {}
localConfigCache = {}
for input in inputs:
prev = len(tests)
tests.extend(getTests(input, litConfig,
testSuiteCache, localConfigCache)[1])
if prev == len(tests):
litConfig.warning('input %r contained no tests' % input)
if litConfig.numErrors:
print >>sys.stderr, '%d errors, exiting.' % litConfig.numErrors
sys.exit(2)
def get_test_fn(test):
return unittest.FunctionTestCase(
lambda: test.config.test_format.execute(
test, litConfig),
description = test.getFullName())
from LitTestCase import LitTestCase
return unittest.TestSuite([LitTestCase(test, litConfig) for test in tests])
def main(builtinParameters = {}): import sys
sys.setcheckinterval(1000)
global options
from optparse import OptionParser, OptionGroup
parser = OptionParser("usage: %prog [options] {file-or-path}")
parser.add_option("-j", "--threads", dest="numThreads", metavar="N",
help="Number of testing threads",
type=int, action="store", default=None)
parser.add_option("", "--config-prefix", dest="configPrefix",
metavar="NAME", help="Prefix for 'lit' config files",
action="store", default=None)
parser.add_option("", "--param", dest="userParameters",
metavar="NAME=VAL",
help="Add 'NAME' = 'VAL' to the user defined parameters",
type=str, action="append", default=[])
group = OptionGroup(parser, "Output Format")
group.add_option("-q", "--quiet", dest="quiet",
help="Suppress no error output",
action="store_true", default=False)
group.add_option("-s", "--succinct", dest="succinct",
help="Reduce amount of output",
action="store_true", default=False)
group.add_option("-v", "--verbose", dest="showOutput",
help="Show all test output",
action="store_true", default=False)
group.add_option("", "--no-progress-bar", dest="useProgressBar",
help="Do not use curses based progress bar",
action="store_false", default=True)
parser.add_option_group(group)
group = OptionGroup(parser, "Test Execution")
group.add_option("", "--path", dest="path",
help="Additional paths to add to testing environment",
action="append", type=str, default=[])
group.add_option("", "--vg", dest="useValgrind",
help="Run tests under valgrind",
action="store_true", default=False)
group.add_option("", "--vg-leak", dest="valgrindLeakCheck",
help="Check for memory leaks under valgrind",
action="store_true", default=False)
group.add_option("", "--vg-arg", dest="valgrindArgs", metavar="ARG",
help="Specify an extra argument for valgrind",
type=str, action="append", default=[])
group.add_option("", "--time-tests", dest="timeTests",
help="Track elapsed wall time for each test",
action="store_true", default=False)
group.add_option("", "--no-execute", dest="noExecute",
help="Don't execute any tests (assume PASS)",
action="store_true", default=False)
parser.add_option_group(group)
group = OptionGroup(parser, "Test Selection")
group.add_option("", "--max-tests", dest="maxTests", metavar="N",
help="Maximum number of tests to run",
action="store", type=int, default=None)
group.add_option("", "--max-time", dest="maxTime", metavar="N",
help="Maximum time to spend testing (in seconds)",
action="store", type=float, default=None)
group.add_option("", "--shuffle", dest="shuffle",
help="Run tests in random order",
action="store_true", default=False)
parser.add_option_group(group)
group = OptionGroup(parser, "Debug and Experimental Options")
group.add_option("", "--debug", dest="debug",
help="Enable debugging (for 'lit' development)",
action="store_true", default=False)
group.add_option("", "--show-suites", dest="showSuites",
help="Show discovered test suites",
action="store_true", default=False)
group.add_option("", "--no-tcl-as-sh", dest="useTclAsSh",
help="Don't run Tcl scripts using 'sh'",
action="store_false", default=True)
group.add_option("", "--repeat", dest="repeatTests", metavar="N",
help="Repeat tests N times (for timing)",
action="store", default=None, type=int)
parser.add_option_group(group)
(opts, args) = parser.parse_args()
if not args:
parser.error('No inputs specified')
if opts.configPrefix is not None:
global gConfigName, gSiteConfigName
gConfigName = '%s.cfg' % opts.configPrefix
gSiteConfigName = '%s.site.cfg' % opts.configPrefix
if opts.numThreads is None:
if sys.hexversion >= 0x2050200:
opts.numThreads = Util.detectCPUs()
else:
opts.numThreads = 1
inputs = args
userParams = dict(builtinParameters)
for entry in opts.userParameters:
if '=' not in entry:
name,val = entry,''
else:
name,val = entry.split('=', 1)
userParams[name] = val
litConfig = LitConfig.LitConfig(progname = os.path.basename(sys.argv[0]),
path = opts.path,
quiet = opts.quiet,
useValgrind = opts.useValgrind,
valgrindLeakCheck = opts.valgrindLeakCheck,
valgrindArgs = opts.valgrindArgs,
useTclAsSh = opts.useTclAsSh,
noExecute = opts.noExecute,
debug = opts.debug,
isWindows = (platform.system()=='Windows'),
params = userParams)
actual_inputs = []
for input in inputs:
if os.path.exists(input) or not input.startswith('@'):
actual_inputs.append(input)
else:
f = open(input[1:])
try:
for ln in f:
ln = ln.strip()
if ln:
actual_inputs.append(ln)
finally:
f.close()
tests = []
testSuiteCache = {}
localConfigCache = {}
for input in actual_inputs:
prev = len(tests)
tests.extend(getTests(input, litConfig,
testSuiteCache, localConfigCache)[1])
if prev == len(tests):
litConfig.warning('input %r contained no tests' % input)
if litConfig.numErrors:
print >>sys.stderr, '%d errors, exiting.' % litConfig.numErrors
sys.exit(2)
if opts.showSuites:
suitesAndTests = dict([(ts,[])
for ts,_ in testSuiteCache.values()
if ts])
for t in tests:
suitesAndTests[t.suite].append(t)
print '-- Test Suites --'
suitesAndTests = suitesAndTests.items()
suitesAndTests.sort(key = lambda (ts,_): ts.name)
for ts,ts_tests in suitesAndTests:
print ' %s - %d tests' %(ts.name, len(ts_tests))
print ' Source Root: %s' % ts.source_root
print ' Exec Root : %s' % ts.exec_root
numTotalTests = len(tests)
if opts.shuffle:
random.shuffle(tests)
else:
tests.sort(key = lambda t: t.getFullName())
if opts.maxTests is not None:
tests = tests[:opts.maxTests]
extra = ''
if len(tests) != numTotalTests:
extra = ' of %d' % numTotalTests
header = '-- Testing: %d%s tests, %d threads --'%(len(tests),extra,
opts.numThreads)
if opts.repeatTests:
tests = [t.copyWithIndex(i)
for t in tests
for i in range(opts.repeatTests)]
progressBar = None
if not opts.quiet:
if opts.succinct and opts.useProgressBar:
try:
tc = ProgressBar.TerminalController()
progressBar = ProgressBar.ProgressBar(tc, header)
except ValueError:
print header
progressBar = ProgressBar.SimpleProgressBar('Testing: ')
else:
print header
opts.numThreads = min(len(tests), opts.numThreads)
startTime = time.time()
display = TestingProgressDisplay(opts, len(tests), progressBar)
provider = TestProvider(tests, opts.maxTime)
runTests(opts.numThreads, litConfig, provider, display)
display.finish()
if not opts.quiet:
print 'Testing Time: %.2fs'%(time.time() - startTime)
for t in tests:
if t.result is None:
t.setResult(Test.UNRESOLVED, '', 0.0)
hasFailures = False
byCode = {}
for t in tests:
if t.result not in byCode:
byCode[t.result] = []
byCode[t.result].append(t)
if t.result.isFailure:
hasFailures = True
for title,code in (('Unexpected Passing Tests', Test.XPASS),
('Failing Tests', Test.FAIL)):
elts = byCode.get(code)
if not elts:
continue
print '*'*20
print '%s (%d):' % (title, len(elts))
for t in elts:
print ' %s' % t.getFullName()
print
if opts.timeTests:
times = {}
for t in tests:
key = t.getFullName()
times[key] = times.get(key, 0.) + t.elapsed
byTime = list(times.items())
byTime.sort(key = lambda (name,elapsed): elapsed)
if byTime:
Util.printHistogram(byTime, title='Tests')
for name,code in (('Expected Passes ', Test.PASS),
('Expected Failures ', Test.XFAIL),
('Unsupported Tests ', Test.UNSUPPORTED),
('Unresolved Tests ', Test.UNRESOLVED),
('Unexpected Passes ', Test.XPASS),
('Unexpected Failures', Test.FAIL),):
if opts.quiet and not code.isFailure:
continue
N = len(byCode.get(code,[]))
if N:
print ' %s: %d' % (name,N)
if litConfig.numErrors:
print >>sys.stderr, '\n%d error(s), exiting.' % litConfig.numErrors
sys.exit(2)
if litConfig.numWarnings:
print >>sys.stderr, '\n%d warning(s) in tests.' % litConfig.numWarnings
if hasFailures:
sys.exit(1)
sys.exit(0)
if __name__=='__main__':
main()