cfreactor.py   [plain text]


# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""
This module provides support for Twisted to interact with CoreFoundation
CFRunLoops.  This includes Cocoa's NSRunLoop.

In order to use this support, simply do the following::

    |  from twisted.internet import cfreactor
    |  cfreactor.install()

Then use the twisted.internet APIs as usual.  The other methods here are not
intended to be called directly under normal use.  However, install can take
a runLoop kwarg, and run will take a withRunLoop arg if you need to explicitly
pass a CFRunLoop for some reason.  Otherwise it will make a pretty good guess
as to which runLoop you want (the current NSRunLoop if PyObjC is imported,
otherwise the current CFRunLoop.  Either way, if one doesn't exist, it will
be created).

API Stability: stable

Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
"""

__all__ = ['install']

import sys
import cfsupport as cf

from twisted.python import log, threadable, failure
from twisted.internet import main, default, error
from weakref import WeakKeyDictionary

# cache two extremely common "failures" without traceback info
_faildict = {
    error.ConnectionDone: failure.Failure(error.ConnectionDone()),
    error.ConnectionLost: failure.Failure(error.ConnectionLost()),
}

class SelectableSocketWrapper(object):
    _objCache = WeakKeyDictionary()

    cf = None
    def socketWrapperForReactorAndObject(klass, reactor, obj):
        _objCache = klass._objCache
        if obj in _objCache:
            return _objCache[obj]
        v = _objCache[obj] = klass(reactor, obj)
        return v
    socketWrapperForReactorAndObject = classmethod(socketWrapperForReactorAndObject)
        
    def __init__(self, reactor, obj):
        if self.cf:
            raise ValueError, "This socket wrapper is already initialized"
        self.reactor = reactor
        self.obj = obj
        obj._orig_ssw_connectionLost = obj.connectionLost
        obj.connectionLost = self.objConnectionLost
        self.fd = obj.fileno()
        self.writing = False
        self.reading = False
        self.wouldRead = False
        self.wouldWrite = False
        self.cf = cf.PyCFSocket(obj.fileno(), self.doRead, self.doWrite, self.doConnect)
        self.cf.stopWriting()
        reactor.getRunLoop().addSocket(self.cf)
       
    def __repr__(self):
        return 'SSW(fd=%r r=%r w=%r x=%08x o=%08x)' % (self.fd, int(self.reading), int(self.writing), id(self), id(self.obj))

    def objConnectionLost(self, *args, **kwargs):
        obj = self.obj
        self.reactor.removeReader(obj)
        self.reactor.removeWriter(obj)
        obj.connectionLost = obj._orig_ssw_connectionLost
        obj.connectionLost(*args, **kwargs)
        try:
            del self._objCache[obj]
        except:
            pass
        self.obj = None
        self.cf = None

    def doConnect(self, why):
        pass

    def startReading(self):
        self.cf.startReading()
        self.reading = True
        if self.wouldRead:
            if not self.reactor.running:
                self.reactor.callLater(0, self.doRead)
            else:
                self.doRead()
            self.wouldRead = False
        return self

    def stopReading(self):
        self.cf.stopReading()
        self.reading = False
        self.wouldRead = False
        return self

    def startWriting(self):
        self.cf.startWriting()
        self.writing = True
        if self.wouldWrite:
            if not self.reactor.running:
                self.reactor.callLater(0, self.doWrite)
            else:
                self.doWrite()
            self.wouldWrite = False
        return self

    def stopWriting(self):
        self.cf.stopWriting()
        self.writing = False
        self.wouldWrite = False
    
    def _finishReadOrWrite(self, fn, faildict=_faildict):
        try:
            why = fn()
        except:
            why = sys.exc_info()[1]
            log.err()
        if why:
            try:
                f = faildict.get(why.__class__) or failure.Failure(why)
                self.objConnectionLost(f)
            except:
                log.err()
        if self.reactor.running:
            self.reactor.simulate()

    def doRead(self):
        obj = self.obj
        if not obj:
            return
        if not self.reading:
            self.wouldRead = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doRead)

    def doWrite(self):
        obj = self.obj
        if not obj:
            return
        if not self.writing:
            self.wouldWrite = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doWrite)
 
    def __hash__(self):
        return hash(self.fd)

class CFReactor(default.PosixReactorBase):
    # how long to poll if we're don't care about signals
    longIntervalOfTime = 60.0 

    # how long we should poll if we do care about signals
    shortIntervalOfTime = 1.0

    # don't set this
    pollInterval = longIntervalOfTime

    def __init__(self, runLoop=None):
        self.readers = {}
        self.writers = {}
        self.running = 0
        self.crashing = False
        self._doRunUntilCurrent = True
        self.timer = None
        self.runLoop = None
        self.inheritedRunLoop = runLoop is not None 
        if self.inheritedRunLoop:
            self.getRunLoop(runLoop)
        default.PosixReactorBase.__init__(self)

    def installWaker(self):
        # I don't know why, but the waker causes 100% CPU
        # so for now we don't install one, ever.
        return
    
    def getRunLoop(self, runLoop=None):
        if self.runLoop is None:
            # If Foundation is loaded, assume they want the current
            # NSRunLoop, not the base CFRunLoop.
            # If None or an NSRunLoop instance is given, then we assume
            # the user has caused it to begin running.  In reality, 
            # NSApplication probably started it for them.
            #
            # If this is a wrong guess, the user can make the runloop go
            # on their own after reactor.run().  It's a pretty good guess,
            # though.
            if 'Foundation' in sys.modules:
                from Foundation import NSRunLoop
                nsRunLoop = runLoop or NSRunLoop.currentRunLoop()
                if isinstance(nsRunLoop, NSRunLoop):
                    runLoop = nsRunLoop.getCFRunLoop()
                    self.inheritedRunLoop = True
            self.runLoop = cf.PyCFRunLoop(runLoop)
        return self.runLoop
    
    def addReader(self, reader):
        self.readers[reader] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, reader).startReading()

    def addWriter(self, writer):
        self.writers[writer] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, writer).startWriting()

    def removeReader(self, reader):
        wrapped = self.readers.get(reader, None)
        if wrapped is not None:
            del self.readers[reader]
            wrapped.stopReading()

    def removeWriter(self, writer):
        wrapped = self.writers.get(writer, None)
        if wrapped is not None:
            del self.writers[writer]
            wrapped.stopWriting()

    def removeAll(self):
        r = self.readers.keys()
        for s in self.readers.itervalues():
            s.stopReading()
        for s in self.writers.itervalues():
            s.stopWriting()
        self.readers.clear()
        self.writers.clear()
        return r
        
    def run(self, installSignalHandlers=1, withRunLoop=None):
        if self.running:
            raise ValueError, "Reactor already running"
        if installSignalHandlers:
            self.pollInterval = self.shortIntervalOfTime
        runLoop = self.getRunLoop(withRunLoop)
        self._startup()
       
        self.startRunning(installSignalHandlers=installSignalHandlers)

        self.running = True
        if not self.inheritedRunLoop:
            # Inherited runLoops are assumed to be running already,
            # but we created this one so we have to start it.
            runLoop.run()
            self.crashing = False

    def callLater(self, howlong, *args, **kwargs):
        rval = default.PosixReactorBase.callLater(self, howlong, *args, **kwargs)
        if self.timer:
            timeout = self.timeout()
            if timeout is None:
                timeout = howlong
            sleepUntil = cf.now() + min(timeout, howlong)
            if sleepUntil < self.timer.getNextFireDate():
                self.timer.setNextFireDate(sleepUntil)
        else:
            pass
        return rval
        
    def iterate(self, howlong=0.0):
        if self.running:
            raise ValueError, "Can't iterate a running reactor"
        self.runUntilCurrent()
        self.doIteration(howlong)
        
    def doIteration(self, howlong):
        if self.running:
            raise ValueError, "Can't iterate a running reactor"
        howlong = howlong or 0.01
        pi = self.pollInterval
        self.pollInterval = howlong
        self._doRunUntilCurrent = False
        self.run()
        self._doRunUntilCurrent = True
        self.pollInterval = pi

    def simulate(self):
        if self.crashing:
            return
        if not self.running:
            raise ValueError, "You can't simulate a stopped reactor"
        if self._doRunUntilCurrent:
            self.runUntilCurrent()
        if self.crashing:
            return
        if self.timer is None:
            return
        nap = self.timeout()
        if nap is None:
            nap = self.pollInterval
        else:
            nap = min(self.pollInterval, nap)
        if self.running:
            self.timer.setNextFireDate(cf.now() + nap)
        if not self._doRunUntilCurrent:
            self.crash()
        
    def _startup(self):
        if self.running:
            raise ValueError, "Can't bootstrap a running reactor"
        self.timer = cf.PyCFRunLoopTimer(cf.now(), self.pollInterval, self.simulate)
        self.runLoop.addTimer(self.timer)

    def cleanup(self):
        pass

    def sigInt(self, *args):
        self.callLater(0.0, self.stop)

    def crash(self):
        if not self.running:
            raise ValueError, "Can't crash a stopped reactor"
        self.running = False
        self.crashing = True
        if self.timer is not None:
            self.runLoop.removeTimer(self.timer)
            self.timer = None
        if not self.inheritedRunLoop:
            self.runLoop.stop()

    def stop(self):
        if not self.running:
            raise ValueError, "Can't stop a stopped reactor"
        default.PosixReactorBase.stop(self)

def install(runLoop=None):
    """Configure the twisted mainloop to be run inside CFRunLoop.
    """
    reactor = CFReactor(runLoop=runLoop)
    reactor.addSystemEventTrigger('after', 'shutdown', reactor.cleanup)
    from twisted.internet.main import installReactor
    installReactor(reactor)
    return reactor