test_break.py   [plain text]


import gc
import os
import weakref

from cStringIO import StringIO

try:
    import signal
except ImportError:
    signal = None

import unittest2


class TestBreak(unittest2.TestCase):
    
    def setUp(self):
        self._default_handler = signal.getsignal(signal.SIGINT)
        
    def tearDown(self):
        signal.signal(signal.SIGINT, self._default_handler)
        unittest2.signals._results = weakref.WeakKeyDictionary()
        unittest2.signals._interrupt_handler = None

        
    def testInstallHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
        
        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
            
        self.assertTrue(unittest2.signals._interrupt_handler.called)
    
    def testRegisterResult(self):
        result = unittest2.TestResult()
        unittest2.registerResult(result)
        
        for ref in unittest2.signals._results:
            if ref is result:
                break
            elif ref is not result:
                self.fail("odd object in result set")
        else:
            self.fail("result not found")
        
        
    def testInterruptCaught(self):
        default_handler = signal.getsignal(signal.SIGINT)
        
        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)
        
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
        
        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
        
        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        self.assertTrue(result.breakCaught)
    
    
    def testSecondInterrupt(self):
        result = unittest2.TestResult()
        unittest2.installHandler()
        unittest2.registerResult(result)
        
        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")
        
        try:
            test(result)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("Second KeyboardInterrupt not raised")
        self.assertTrue(result.breakCaught)

    
    def testTwoResults(self):
        unittest2.installHandler()
        
        result = unittest2.TestResult()
        unittest2.registerResult(result)
        new_handler = signal.getsignal(signal.SIGINT)
        
        result2 = unittest2.TestResult()
        unittest2.registerResult(result2)
        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
        
        result3 = unittest2.TestResult()
        
        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        
        try:
            test(result)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")
        
        self.assertTrue(result.shouldStop)
        self.assertTrue(result2.shouldStop)
        self.assertFalse(result3.shouldStop)
    
    
    def testHandlerReplacedButCalled(self):
        # If our handler has been replaced (is no longer installed) but is
        # called by the *new* handler, then it isn't safe to delay the
        # SIGINT and we should immediately delegate to the default handler
        unittest2.installHandler()
        
        handler = signal.getsignal(signal.SIGINT)
        def new_handler(frame, signum):
            handler(frame, signum)
        signal.signal(signal.SIGINT, new_handler)
        
        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            pass
        else:
            self.fail("replaced but delegated handler doesn't raise interrupt")
    
    def testRunner(self):
        # Creating a TextTestRunner with the appropriate argument should
        # register the TextTestResult it creates
        runner = unittest2.TextTestRunner(stream=StringIO())
        
        result = runner.run(unittest2.TestSuite())
        self.assertIn(result, unittest2.signals._results)
    
    def testWeakReferences(self):
        # Calling registerResult on a result should not keep it alive
        result = unittest2.TestResult()
        unittest2.registerResult(result)
        
        ref = weakref.ref(result)
        del result
        
        # For non-reference counting implementations
        gc.collect();gc.collect()
        self.assertIsNone(ref())
        
    
    def testRemoveResult(self):
        result = unittest2.TestResult()
        unittest2.registerResult(result)
        
        unittest2.installHandler()
        self.assertTrue(unittest2.removeResult(result))
        
        # Should this raise an error instead?
        self.assertFalse(unittest2.removeResult(unittest2.TestResult()))

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            pass
        
        self.assertFalse(result.shouldStop)
    
    def testMainInstallsHandler(self):
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            initArgs = []
            runArgs = []
            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))
            def run(self, test):
                self.runArgs.append(test)
                return result
        
        class Program(unittest2.TestProgram):
            def __init__(self, catchbreak): 
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.testRunner = FakeRunner
                self.test = test
                self.result = None
        
        p = Program(False)
        p.runTests()
        
        self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, 
                                                'failfast': failfast,
                                                'buffer': None})])
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)
        
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
        
        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()
        
        self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, 
                                                'failfast': failfast,
                                                'buffer': None})])
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)
        
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)


    def testRemoveHandler(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest2.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
    
    def testRemoveHandlerAsDecorator(self):
        default_handler = signal.getsignal(signal.SIGINT)
        unittest2.installHandler()
        
        @unittest2.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
        
        test()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
        

# Should also skip some tests on Jython
skipper = unittest2.skipUnless(hasattr(os, 'kill') and signal is not None, 
                               "test uses os.kill(...) and the signal module")
TestBreak = skipper(TestBreak)

if __name__ == '__main__':
    unittest2.main()