from twisted.trial import unittest
from twisted.internet import reactor, protocol, error, app, abstract
from twisted.internet.defer import SUCCESS, FAILURE, Deferred, succeed, fail
from twisted.python import threadable, log
# Initialise twisted.python.threadable before any test runs.
# NOTE(review): presumably the argument 1 switches on thread support, which the
# threaded tests in this module rely on -- confirm against threadable's docs.
threadable.init(1)
import sys
import time
import threading
import types
class SystemEventTestCase(unittest.TestCase):
    """Exercise the reactor's system event trigger machinery.

    Every trigger registered through ``addTrigger`` is recorded in
    ``self.triggers`` so that ``tearDown`` can unregister whatever a test
    left behind.
    """

    def setUp(self):
        """Start each test with an empty record of registered triggers."""
        self.triggers = []

    def addTrigger(self, event, phase, func):
        """Register ``func`` with the reactor and remember the returned handle.

        The three arguments are forwarded positionally to
        ``reactor.addSystemEventTrigger``.

        NOTE(review): every caller in this class passes the phase
        ("before"/"during"/"after") first and the event name second, so the
        parameter names read backwards.  They are kept as-is to avoid
        changing the helper's signature -- confirm before renaming.

        Returns the handle produced by the reactor.
        """
        t = reactor.addSystemEventTrigger(event, phase, func)
        self.triggers.append(t)
        return t

    def removeTrigger(self, trigger):
        """Unregister ``trigger`` from the reactor and forget its handle."""
        reactor.removeSystemEventTrigger(trigger)
        self.triggers.remove(trigger)

    def tearDown(self):
        """Best-effort removal of every trigger still on record."""
        for t in self.triggers:
            try:
                reactor.removeSystemEventTrigger(t)
            except Exception:
                # Cleanup must not mask the test's own result; a handle the
                # reactor no longer knows about is simply skipped.
                pass

    def testTriggerSystemEvent1(self):
        """Triggers run per phase, wait on Deferreds, and can be removed."""
        l = []
        l2 = []
        d = Deferred()
        d2 = Deferred()

        def _returnDeferred(d=d):
            return d

        def _returnDeferred2(d2=d2):
            return d2

        def _appendToList(l=l):
            l.append(1)

        def _appendToList2(l2=l2):
            l2.append(1)

        r = reactor

        # One trigger in each phase: all three must have run after firing.
        self.addTrigger("before", "test", _appendToList)
        self.addTrigger("during", "test", _appendToList)
        self.addTrigger("after", "test", _appendToList)
        self.assertEquals(len(l), 0, "Nothing happened yet.")
        r.fireSystemEvent("test")
        r.iterate()
        self.assertEquals(len(l), 3, "Should have filled the list.")

        # "before" triggers returning Deferreds hold back the later phases
        # until every one of those Deferreds has fired.
        l[:] = []
        self.addTrigger("before", "defer", _returnDeferred)
        self.addTrigger("before", "defer", _returnDeferred2)
        self.addTrigger("during", "defer", _appendToList)
        self.addTrigger("after", "defer", _appendToList)
        r.fireSystemEvent("defer")
        self.assertEquals(len(l), 0, "Event should not have fired yet.")
        d.callback(None)
        self.assertEquals(len(l), 0, "Event still should not have fired yet.")
        d2.callback(None)
        self.assertEquals(len(l), 2)

        # A trigger removed before the event fires must not run.
        l[:] = []
        self.addTrigger("before", "remove", _appendToList)
        b = self.addTrigger("before", "remove", _appendToList2)
        self.removeTrigger(b)
        r.fireSystemEvent("remove")
        self.assertEquals(len(l), 1)
        self.assertEquals(len(l2), 0)

    def testTriggerSystemEvent2(self):
        """A "before" trigger may fire another trigger's Deferred itself."""
        l = []
        d = Deferred()
        d2 = Deferred()

        def _returnDeferred(d=d):
            return d

        def _fireDeferred(d=d):
            d.callback(None)

        def _returnDeferred2(d2=d2):
            return d2

        def _appendToList(l=l):
            l.append(1)

        r = reactor
        self.addTrigger("before", "defer2", _returnDeferred)
        self.addTrigger("before", "defer2", _fireDeferred)
        self.addTrigger("before", "defer2", _returnDeferred2)
        self.addTrigger("during", "defer2", _appendToList)
        self.addTrigger("after", "defer2", _appendToList)
        r.fireSystemEvent("defer2")
        # d was fired by _fireDeferred; only d2 is still outstanding.
        self.assertEquals(len(l), 0, "Event should not have fired yet.")
        d2.callback(None)
        self.assertEquals(len(l), 2)

    def testTriggerSystemEvent3(self):
        """Triggers removed while an event is pending never run."""
        l = []
        d = Deferred()

        def _returnDeferred(d=d):
            return d

        def _appendToList(l=l):
            l.append(1)

        def _ignore(failure):
            return None

        r = reactor
        b1 = self.addTrigger("before", "defer3", _returnDeferred)
        b2 = self.addTrigger("after", "defer3", _appendToList)
        r.fireSystemEvent("defer3")
        self.assertEquals(len(l), 0, "Event should not have fired yet.")
        self.removeTrigger(b1)
        self.removeTrigger(b2)
        try:
            # Firing the Deferred lets the rest of the event proceed; a
            # ValueError raised on the way is tolerated by this test.
            d.callback(None)
        except ValueError:
            pass
        self.assertEquals(len(l), 0)
        # Swallow any failure left on the Deferred so it is not reported later.
        d.addErrback(_ignore)

    def testTriggerSystemEvent4(self):
        """Two events blocked on separate Deferreds progress independently."""
        l = []
        l2 = []
        d = Deferred()
        d2 = Deferred()

        def _returnDeferred(d=d):
            return d

        def _returnDeferred2(d2=d2):
            return d2

        def _appendToList(l=l):
            l.append(1)

        def _appendToList2(l2=l2):
            l2.append(1)

        r = reactor
        self.addTrigger("before", "event1", _returnDeferred)
        self.addTrigger("after", "event1", _appendToList)
        self.addTrigger("before", "event2", _returnDeferred2)
        self.addTrigger("after", "event2", _appendToList2)
        r.fireSystemEvent("event1")
        r.fireSystemEvent("event2")
        self.assertEquals(len(l), 0, "Event should not have fired yet.")
        self.assertEquals(len(l2), 0, "Event should not have fired yet.")
        d.callback(None)
        self.assertEquals(len(l), 1)
        self.assertEquals(len(l2), 0)
        d2.callback(None)
        self.assertEquals(len(l), 1)
        self.assertEquals(len(l2), 1)

    def testTriggerSystemEvent5(self):
        """Removing an unknown or ``None`` trigger handle raises."""
        l = []

        def _appendToList(l=l):
            l.append(1)

        r = reactor
        b = self.addTrigger("after", "event1", _appendToList)
        self.removeTrigger(b)
        # Only integer handles can be offset into a bogus-but-well-typed one.
        if isinstance(b, int):
            bogus = b + 40
            self.failUnlessRaises(ValueError,
                                  r.removeSystemEventTrigger, bogus)
        self.failUnlessRaises(TypeError,
                              r.removeSystemEventTrigger, None)
class InterfaceTestCase(unittest.TestCase):
    """Tests for the reactor's callLater / delayed-call and wakeUp interfaces."""

    # Class-level default; _callback sets an instance attribute over it.
    _called = 0

    def _callback(self, x, **d):
        """Callback for testCallLater: check the arguments and note the time."""
        self.assertEquals(x, 1)
        self.assertEquals(d, {'a': 1})
        self._called = 1
        self._calledTime = time.time()

    def testCallLater(self):
        """callLater runs the call on time; cancel() obeys the call's state."""
        def bad():
            raise RuntimeError("this shouldn't have been called")

        # A cancelled call never runs and cannot be cancelled twice.
        i = reactor.callLater(0.1, bad)
        i.cancel()
        self.assertRaises(error.AlreadyCancelled, i.cancel)

        i = reactor.callLater(0.5, self._callback, 1, a=1)
        start = time.time()
        while time.time() - start < 0.6:
            reactor.iterate(0.01)
        self.assertEquals(self._called, 1)
        self.assertApproximates(self._calledTime, start + 0.5, 0.2)
        # Once it has run, the call can no longer be cancelled.
        self.assertRaises(error.AlreadyCalled, i.cancel)

        del self._called
        del self._calledTime

    def _resetcallback(self):
        """Record when the call that gets reset() actually ran."""
        self._resetcallbackTime = time.time()

    def _delaycallback(self):
        """Record when the call that gets delay()ed actually ran."""
        self._delaycallbackTime = time.time()

    def testCallLaterDelayAndReset(self):
        """reset() and delay() both push back a pending call's run time."""
        self._resetcallbackTime = None
        self._delaycallbackTime = None
        ireset = reactor.callLater(0.4, self._resetcallback)
        idelay = reactor.callLater(0.5, self._delaycallback)
        start = time.time()
        # Let some time pass before touching the pending calls.
        while time.time() - start < 0.2:
            reactor.iterate(0.01)
        # Roughly 0.2s in: the assertions below expect the reset call at
        # about 0.6s and the delayed call at about 0.8s after start.
        ireset.reset(0.4)
        idelay.delay(0.3)
        # Both calls should run at some point during this loop.
        while time.time() - start < 2:
            reactor.iterate(0.01)

        # Fail cleanly instead of raising TypeError on None arithmetic.
        self.failUnless(self._resetcallbackTime is not None,
                        "reset callback never ran")
        self.failUnless(self._delaycallbackTime is not None,
                        "delay callback never ran")

        ireset_elapsed = self._resetcallbackTime - start
        idelay_elapsed = self._delaycallbackTime - start
        self.failUnless(idelay_elapsed >= ireset_elapsed,
                        "got %f, %f" % (idelay_elapsed, ireset_elapsed))
        self.failUnless(ireset_elapsed >= (0.6-0.05),
                        "got %f (wanted 0.6)" % ireset_elapsed)
        self.failUnless(idelay_elapsed >= (0.8-0.05),
                        "got %f (wanted 0.8)" % idelay_elapsed)

        del self._resetcallbackTime
        del self._delaycallbackTime

    def testCallLaterTime(self):
        """getTime() reports a time about ten seconds ahead for a 10s call."""
        d = reactor.callLater(10, lambda: None)
        try:
            # Two-sided bound: a value far too small must fail as well.
            self.failUnless(abs(d.getTime() - (time.time() + 10)) < 1)
        finally:
            d.cancel()

    def testWakeUp(self):
        """reactor.wakeUp should terminate reactor.iterate(5)"""
        def wake(reactor=reactor):
            time.sleep(0.5)
            reactor.wakeUp()

        start = time.time()
        # Keep the Thread object: start() returns None, so chaining it would
        # leave nothing to join on.
        t = threading.Thread(target=wake)
        t.start()
        reactor.iterate(5)
        elapsed = time.time() - start
        # Join before asserting so the helper thread never outlives the test.
        t.join()
        self.failUnless(elapsed > 0 and elapsed < 1,
                        "woke up after %f, wanted 0..1" % elapsed)
class ReactorCoreTestCase(unittest.TestCase):
    """Tests for reactor.run, reactor.iterate and reactor.crash.

    Triggers and timers created through the ``add*`` helpers are recorded so
    that ``tearDown`` can clean up anything a test left pending.
    """

    def setUp(self):
        """Start each test with no recorded triggers or timers."""
        self.triggers = []
        self.timers = []

    def addTrigger(self, event, phase, func):
        """Register ``func`` with the reactor and remember the returned handle.

        The arguments are forwarded positionally to
        ``reactor.addSystemEventTrigger``.

        NOTE(review): the caller in this class passes the phase first
        ("before") and the event name second ("shutdown"), so the parameter
        names read backwards -- confirm before renaming.
        """
        t = reactor.addSystemEventTrigger(event, phase, func)
        self.triggers.append(t)
        return t

    def removeTrigger(self, trigger):
        """Unregister ``trigger`` from the reactor and forget its handle."""
        reactor.removeSystemEventTrigger(trigger)
        self.triggers.remove(trigger)

    def addTimer(self, when, func):
        """Schedule ``func`` after ``when`` seconds and remember the call."""
        t = reactor.callLater(when, func)
        self.timers.append(t)
        return t

    def removeTimer(self, timer):
        """Cancel ``timer`` (ignoring one that already ran) and forget it."""
        try:
            timer.cancel()
        except error.AlreadyCalled:
            pass
        self.timers.remove(timer)

    def tearDown(self):
        """Best-effort removal of leftover triggers and pending timers."""
        for t in self.triggers:
            try:
                reactor.removeSystemEventTrigger(t)
            except Exception:
                # Cleanup must not mask the test's own result.
                pass
        # A test that failed part-way can leave timers pending (for example
        # testCrash's 5 second timeout); cancel them so they cannot fire
        # during a later test.
        for timer in self.timers:
            try:
                timer.cancel()
            except (error.AlreadyCalled, error.AlreadyCancelled):
                pass

    def crash(self):
        """Timer callback: crash the reactor."""
        reactor.crash()

    def stop(self):
        """Timer callback: stop the reactor."""
        reactor.stop()

    def testRun(self):
        """Test that reactor.crash terminates reactor.run"""
        # Done twice: each run() must return once crash() has been called.
        reactor.callLater(0.1, self.crash)
        reactor.run()
        reactor.callLater(0.1, self.crash)
        reactor.run()

    def testIterate(self):
        """Test that reactor.iterate(0) doesn't block"""
        start = time.time()
        # A call far in the future: iterate(0) must not wait around for it.
        t = reactor.callLater(10, self.crash)
        reactor.iterate(0)
        stop = time.time()
        elapsed = stop - start
        self.failUnless(elapsed < 8)
        t.cancel()

    def timeout(self):
        """Timer callback used as a watchdog: flag the problem and fail."""
        # Parenthesised single argument prints the same on Python 2 and 3.
        print("test timed out")
        self.problem = 1
        self.fail("test timed out")

    def count(self):
        """Trigger callback: count how many times it was invoked."""
        self.counter += 1

    def testCrash(self):
        """reactor.crash should NOT fire shutdown triggers"""
        self.counter = 0
        self.problem = 0
        self.addTrigger("before", "shutdown", self.count)
        self.addTimer(0.1, self.crash)
        t = self.addTimer(5, self.timeout)
        reactor.run()
        self.failUnless(self.counter == 0,
                        "reactor.crash invoked shutdown triggers, "
                        "but it isn't supposed to")
        self.failIf(self.problem, "the test timed out")
        self.removeTimer(t)
class DelayedTestCase(unittest.TestCase):
    """Check reactor.getDelayedCalls() against a locally kept set of timers.

    ``self.timers`` maps an integer tag to the delayed call scheduled under
    that tag; ``checkTimers`` compares it with what the reactor reports.
    """

    def setUp(self):
        """Reset bookkeeping and cancel delayed calls already pending."""
        self.finished = 0
        self.counter = 0
        self.timers = {}
        if hasattr(reactor, "getDelayedCalls"):
            for pending in reactor.getDelayedCalls():
                pending.cancel()
            # One pass through the reactor after cancelling everything.
            reactor.iterate()

    def tearDown(self):
        """Cancel every timer this test still has on record."""
        for pending in self.timers.values():
            pending.cancel()

    def checkTimers(self):
        """Assert that the reactor's delayed calls match ``self.timers``.

        Both collections are sorted before comparison.  On a mismatch
        ``self.finished`` is set first so that the polling loop in
        testGetDelayedCalls terminates.
        """
        expected = list(self.timers.values())
        expected.sort()
        actual = list(reactor.getDelayedCalls())
        actual.sort()
        if expected != actual:
            self.finished = 1
        self.assertEquals(expected, actual)

    def callback(self, tag):
        """Timer callback: drop ``tag`` from the record and re-check."""
        del self.timers[tag]
        self.checkTimers()

    def addCallback(self, tag):
        """Timer callback that also schedules one more timer."""
        self.callback(tag)
        self.addTimer(15, self.callback)

    def done(self, tag):
        """Timer callback that ends the polling loop."""
        self.finished = 1
        self.callback(tag)

    def failsafe(self, tag):
        """Watchdog callback: end the polling loop and fail the test."""
        self.finished = 1
        self.fail("timeout")

    def addTimer(self, when, callback):
        """Schedule ``callback(tag)`` after ``when`` hundredths of a second.

        The tag is the current value of ``self.counter``, which is then
        incremented.
        """
        tag = self.counter
        self.timers[tag] = reactor.callLater(when * 0.01, callback, tag)
        self.counter = tag + 1
        self.checkTimers()

    def testGetDelayedCalls(self):
        """getDelayedCalls tracks calls being added, cancelled and run."""
        if not hasattr(reactor, "getDelayedCalls"):
            return
        self.checkTimers()
        self.addTimer(35, self.done)
        self.addTimer(20, self.callback)
        self.addTimer(30, self.callback)
        # Tag of the timer added next; it is cancelled again further down.
        cancelled_tag = self.counter
        self.addTimer(29, self.callback)
        self.addTimer(25, self.addCallback)
        self.addTimer(26, self.callback)
        self.addTimer(50, self.failsafe)

        self.timers[cancelled_tag].cancel()
        del self.timers[cancelled_tag]
        self.checkTimers()

        while not self.finished:
            reactor.iterate(0.01)
        self.checkTimers()

    def testActive(self):
        """active() is true while a call is pending and false after it ran."""
        pending = reactor.callLater(0, lambda: None)
        self.assertEquals(pending.active(), 1)
        reactor.iterate()
        self.assertEquals(pending.active(), 0)
class Counter:
    """Tally of how many times ``add`` has been called."""

    # Class-level starting value; add() stores the new total on the instance.
    index = 0

    def add(self):
        """Increase the tally by one."""
        self.index += 1
class Order:
    """Three-step state machine that requires a(), b(), c() in that order.

    Each step raises RuntimeError unless ``stage`` holds the value left by
    the preceding step.
    """

    stage = 0

    def a(self):
        """First step: allowed only from stage 0; moves to stage 1."""
        if self.stage == 0:
            self.stage = 1
        else:
            raise RuntimeError

    def b(self):
        """Second step: allowed only from stage 1; moves to stage 2."""
        if self.stage == 1:
            self.stage = 2
        else:
            raise RuntimeError

    def c(self):
        """Third step: allowed only from stage 2; moves to stage 3."""
        if self.stage == 2:
            self.stage = 3
        else:
            raise RuntimeError
class ThreadOrder(threading.Thread, Order):
    """Thread whose run() schedules the three Order steps in sequence.

    NOTE(review): ``schedule`` is not defined on this class or on ``Order``
    in the visible source; presumably a subclass or mixin supplies it --
    confirm before using this class.
    """

    def run(self):
        """Hand a, b and c, in that order, to ``self.schedule``."""
        for step in (self.a, self.b, self.c):
            self.schedule(step)
class callFromThreadTestCase(unittest.TestCase):
    """Task scheduling from threads tests."""

    def schedule(self, *args, **kwargs):
        """Override in subclasses."""
        reactor.callFromThread(*args, **kwargs)

    def testScheduling(self):
        """A hundred scheduled calls have all run after a hundred iterations."""
        rounds = 100
        tally = Counter()
        for _ in range(rounds):
            self.schedule(tally.add)
        for _ in range(rounds):
            reactor.iterate()
        self.assertEquals(tally.index, rounds)

    def testCorrectOrder(self):
        """Scheduled calls run in the order in which they were scheduled."""
        sequence = Order()
        for step in (sequence.a, sequence.b, sequence.c):
            self.schedule(step)
        for _ in range(3):
            reactor.iterate()
        self.assertEquals(sequence.stage, 3)

    def testNotRunAtOnce(self):
        """A scheduled call waits for the reactor instead of running inline."""
        tally = Counter()
        self.schedule(tally.add)
        # Nothing has run yet: scheduling alone must not invoke the call.
        self.assertEquals(tally.index, 0)
        reactor.iterate()
        self.assertEquals(tally.index, 1)
class MyProtocol(protocol.Protocol):
    """Bare Protocol subclass; MyFactory builds instances of it."""
class MyFactory(protocol.Factory):
    """Bare Factory subclass configured to build MyProtocol instances."""

    # Class the factory instantiates; ProtocolTestCase checks against it.
    protocol = MyProtocol
class ProtocolTestCase(unittest.TestCase):
    """Check what Factory.buildProtocol hands back."""

    def testFactory(self):
        """The built protocol is of the factory's class and points back at it."""
        factory = MyFactory()
        # Named "built" so the imported ``protocol`` module is not shadowed.
        built = factory.buildProtocol(None)
        self.assertEquals(built.factory, factory)
        self.assert_(isinstance(built, factory.protocol))
class StopError(Exception):
    """Failure raised by StoppingService when told not to succeed."""
class StoppingService(app.ApplicationService):
    """Service whose stopService() result is chosen at construction time."""

    def __init__(self, name, succeed):
        """Store the outcome flag after the base-class initialisation.

        ``succeed`` is kept on the instance; a true value makes
        stopService() return a fired Deferred, a false one a failed Deferred.
        """
        app.ApplicationService.__init__(self, name)
        self.succeed = succeed

    def stopService(self):
        """Return a Deferred already fired with "yay!" or failed with StopError."""
        if not self.succeed:
            return fail(StopError('boo'))
        # Here ``succeed`` is the module-level helper, not the instance flag.
        return succeed("yay!")
class StoppingServiceII(app.ApplicationService):
    """Service whose stopService() returns plain None, not a Deferred."""

    def stopService(self):
        """Return None explicitly."""
        return None
class MultiServiceTestCase(unittest.TestCase):
    """Tests for the Deferred returned by app.MultiService.stopService().

    Each test attaches a callback that sets ``self.callbackRan``; tearDown
    fails the test if that callback never ran.
    """

    def setUp(self):
        """Clear the flag that the result callbacks set."""
        self.callbackRan = 0

    def testDeferredStopService(self):
        """stopService gathers one failing and one succeeding child result."""
        ms = app.MultiService("MultiService")
        self.s1 = StoppingService("testService", 0)
        self.s2 = StoppingService("testService2", 1)
        ms.addService(self.s1)
        ms.addService(self.s2)
        ms.stopService().addCallback(self.woohoo)

    def woohoo(self, res):
        """Check per-service status flags: s1 failed, s2 succeeded."""
        self.callbackRan = 1
        # Use the named constants, matching cb_nonetest, rather than 0 and 1.
        self.assertEqual(res[self.s1][0], FAILURE)
        self.assertEqual(res[self.s2][0], SUCCESS)

    def testStopServiceNone(self):
        """MultiService.stopService returns Deferred when service returns None.
        """
        ms = app.MultiService("MultiService")
        self.s1 = StoppingServiceII("testService")
        ms.addService(self.s1)
        d = ms.stopService()
        d.addCallback(self.cb_nonetest)

    def cb_nonetest(self, res):
        """A service returning None is reported as (SUCCESS, None)."""
        self.callbackRan = 1
        self.assertEqual((SUCCESS, None), res[self.s1])

    def testEmptyStopService(self):
        """MultiService.stopService returns Deferred when empty."""
        ms = app.MultiService("MultiService")
        d = ms.stopService()
        d.addCallback(self.cb_emptytest)

    def cb_emptytest(self, res):
        """With no child services the result is empty."""
        self.callbackRan = 1
        self.assertEqual(len(res), 0)

    def tearDown(self):
        """Flush logged StopErrors, then require that a callback ran."""
        log.flushErrors(StopError)
        self.failUnless(self.callbackRan, "Callback was never run.")
class DummyProducer:
    """Producer stand-in that counts resume and stop requests."""

    # Class-level starting values; the methods store totals on the instance.
    resumed = 0
    stopped = 0

    def resumeProducing(self):
        """Count one resume request."""
        self.resumed = self.resumed + 1

    def stopProducing(self):
        """Count one stop request."""
        self.stopped = self.stopped + 1
class TestProducer(unittest.TestCase):
    """Producer registration behaviour of abstract.FileDescriptor."""

    def testDoubleProducer(self):
        """Registering a second producer on one descriptor raises RuntimeError."""
        descriptor = abstract.FileDescriptor()
        descriptor.connected = 1
        first = DummyProducer()
        descriptor.registerProducer(first, 0)
        # The first registration is expected to resume the producer once.
        self.assertEquals(first.resumed, 1)
        second = DummyProducer()
        self.assertRaises(RuntimeError,
                          descriptor.registerProducer, second, 0)

    def testUnconnectedFileDescriptor(self):
        """Registering on a disconnected descriptor stops the producer."""
        descriptor = abstract.FileDescriptor()
        descriptor.disconnected = 1
        producer = DummyProducer()
        descriptor.registerProducer(producer, 0)
        self.assertEquals(producer.stopped, 1)
if __name__ == "__main__":
    # Executed as a script: hand control to the test runner's main().
    unittest.main()