import os
import threading
import time
import traceback
try:
import Queue as queue
except ImportError:
import queue
try:
import win32api
except ImportError:
win32api = None
try:
import multiprocessing
except ImportError:
multiprocessing = None
import lit.Test
class LockedValue(object):
def __init__(self, value):
self.lock = threading.Lock()
self._value = value
def _get_value(self):
self.lock.acquire()
try:
return self._value
finally:
self.lock.release()
def _set_value(self, value):
self.lock.acquire()
try:
self._value = value
finally:
self.lock.release()
value = property(_get_value, _set_value)
class TestProvider(object):
def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
self.canceled_flag = canceled_flag
self.queue = queue_impl()
for i in range(len(tests)):
self.queue.put(i)
for i in range(num_jobs):
self.queue.put(None)
def cancel(self):
self.canceled_flag.value = 1
def get(self):
if self.canceled_flag.value:
return None
return self.queue.get()
class Tester(object):
def __init__(self, run_instance, provider, consumer):
self.run_instance = run_instance
self.provider = provider
self.consumer = consumer
def run(self):
while True:
item = self.provider.get()
if item is None:
break
self.run_test(item)
self.consumer.task_finished()
def run_test(self, test_index):
test = self.run_instance.tests[test_index]
try:
self.run_instance.execute_test(test)
except KeyboardInterrupt:
print('\nCtrl-C detected, goodbye.')
os.kill(0,9)
self.consumer.update(test_index, test)
class ThreadResultsConsumer(object):
def __init__(self, display):
self.display = display
self.lock = threading.Lock()
def update(self, test_index, test):
self.lock.acquire()
try:
self.display.update(test)
finally:
self.lock.release()
def task_finished(self):
pass
def handle_results(self):
pass
class MultiprocessResultsConsumer(object):
def __init__(self, run, display, num_jobs):
self.run = run
self.display = display
self.num_jobs = num_jobs
self.queue = multiprocessing.Queue()
def update(self, test_index, test):
self.queue.put((test_index, test.result))
def task_finished(self):
self.queue.put(None)
def handle_results(self):
completed = 0
while completed != self.num_jobs:
item = self.queue.get()
if item is None:
completed += 1
continue
index,result = item
test = self.run.tests[index]
test.result = result
self.display.update(test)
def run_one_tester(run, provider, display):
tester = Tester(run, provider, display)
tester.run()
class Run(object):
"""
This class represents a concrete, configured testing run.
"""
def __init__(self, lit_config, tests):
self.lit_config = lit_config
self.tests = tests
def execute_test(self, test):
result = None
start_time = time.time()
try:
result = test.config.test_format.execute(test, self.lit_config)
if isinstance(result, tuple):
code, output = result
result = lit.Test.Result(code, output)
elif not isinstance(result, lit.Test.Result):
raise ValueError("unexpected result from test execution")
except KeyboardInterrupt:
raise
except:
if self.lit_config.debug:
raise
output = 'Exception during script execution:\n'
output += traceback.format_exc()
output += '\n'
result = lit.Test.Result(lit.Test.UNRESOLVED, output)
result.elapsed = time.time() - start_time
test.setResult(result)
def execute_tests(self, display, jobs, max_time=None,
use_processes=False):
"""
execute_tests(display, jobs, [max_time])
Execute each of the tests in the run, using up to jobs number of
parallel tasks, and inform the display of each individual result. The
provided tests should be a subset of the tests available in this run
object.
If max_time is non-None, it should be a time in seconds after which to
stop executing tests.
The display object will have its update method called with each test as
it is completed. The calls are guaranteed to be locked with respect to
one another, but are *not* guaranteed to be called on the same thread as
this method was invoked on.
Upon completion, each test in the run will have its result
computed. Tests which were not actually executed (for any reason) will
be given an UNRESOLVED result.
"""
consumer = None
if jobs != 1 and use_processes and multiprocessing:
try:
task_impl = multiprocessing.Process
queue_impl = multiprocessing.Queue
canceled_flag = multiprocessing.Value('i', 0)
consumer = MultiprocessResultsConsumer(self, display, jobs)
except:
self.lit_config.note('failed to initialize multiprocessing')
consumer = None
if not consumer:
task_impl = threading.Thread
queue_impl = queue.Queue
canceled_flag = LockedValue(0)
consumer = ThreadResultsConsumer(display)
provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
if win32api is not None:
def console_ctrl_handler(type):
provider.cancel()
return True
win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
if max_time is not None:
def timeout_handler():
provider.cancel()
timeout_timer = threading.Timer(max_time, timeout_handler)
timeout_timer.start()
if jobs == 1:
run_one_tester(self, provider, consumer)
else:
self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
if max_time is not None:
timeout_timer.cancel()
for test in self.tests:
if test.result is None:
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
tasks = [task_impl(target=run_one_tester,
args=(self, provider, consumer))
for i in range(jobs)]
for t in tasks:
t.start()
consumer.handle_results()
for t in tasks:
t.join()