"""A java implementation of a ``select'' loop.
This implementation is very incomplete, and is rather buggy. This may be
inherent in the fact that it uses threads - a java.nio reactor will
someday hopefully provide better support.
API Stability: stable
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
"""
from twisted.internet import protocol
from twisted.persisted import styles
from twisted.python import timeoutqueue, log, failure
from java.net import Socket, ServerSocket, SocketException, InetAddress
import java.io.InterruptedIOException
import java.io.IOException
from java.lang import System
import jarray
import threading, Queue, time
import abstract
import interfaces
from twisted.internet.base import ReactorBase
class JConnection(abstract.FileDescriptor,
styles.Ephemeral):
"""A java connection class."""
writeBlocker = None
def __init__(self, skt, protocol, jport):
self.skt = skt
self.protocol = protocol
self.istream = skt.getInputStream()
self.ostream = skt.getOutputStream()
self.writeQ = Queue.Queue()
self.jport = jport
def write(self, data):
self.writeQ.put(data)
def registerProducer(self, producer, streaming):
abstract.FileDescriptor.registerProducer(self, producer, streaming)
self.writeQ.put(BEGIN_CONSUMING)
def unregisterProducer(self):
abstract.FileDescriptor.unregisterProducer(self)
self.writeQ.put(END_CONSUMING)
def produceMore(self, x):
if self.producer:
self.producer.resumeProducing()
factory = None
def connectionLost(self, arg=None):
if not self.disconnected:
self.disconnected = 1
self.skt.shutdownInput()
self.skt.shutdownOutput()
self.skt.close()
self.protocol.connectionLost(arg)
if self.factory:
self.factory.clientConnectionLost(self.jport, arg)
else:
self.protocol.connectionLost(arg)
abstract.FileDescriptor.connectionLost(self, arg)
def loseConnection(self):
self.disconnecting = 1
self.writeQ.put(None)
def getHost(self):
return ('INET', InetAddress.getLocalHost().getHostAddress(), self.jport.port)
def getPeer(self):
addr = self.skt.getInetAddress()
return ('INET', addr.getHostAddress(), self.skt.getPort())
def getTcpNoDelay(self):
return self.skt.getTcpNoDelay()
def setTcpNoDelay(self, enabled):
self.skt.setTcpNoDelay(enabled)
class Blocker(threading.Thread):
stopped = 0
def __init__(self, q):
threading.Thread.__init__(self)
self.q = q
def block(self):
raise 'hello'
def blockForever(self):
while not self.stopped:
obj = self.block()
if obj:
self.q.put(obj)
self.q.put((self.blockerFinished, None))
def blockerFinished(self, ignore):
"""Called in main thread when blocker finishes."""
pass
def run(self):
self.blockForever()
def stop(self):
self.stopped = 1
BEGIN_CONSUMING = 1
END_CONSUMING = 2
class WriteBlocker(Blocker):
def __init__(self, fdes, q):
Blocker.__init__(self, q)
self.fdes = fdes
self.consuming = 0
def block(self):
if self.consuming:
try:
data = self.fdes.writeQ.get_nowait()
except Queue.Empty:
self.producing = 0
self.q.put((self.fdes.produceMore, 1))
data = self.fdes.writeQ.get()
else:
data = self.fdes.writeQ.get()
if data is None:
self.stop()
return (self.fdes.connectionLost, failure.Failure())
elif data == BEGIN_CONSUMING:
self.consuming = 1
elif data == END_CONSUMING:
self.consuming = 0
else:
try:
self.fdes.ostream.write(data)
self.fdes.ostream.flush()
except SocketException:
self.stop()
return (self.fdes.connectionLost, failure.Failure())
class ReadBlocker(Blocker):
def __init__(self, fdes, q):
Blocker.__init__(self, q)
self.fdes = fdes
def block(self):
bytes = jarray.zeros(8192, 'b')
try:
l = self.fdes.istream.read(bytes)
except SocketException:
self.stop()
return (self.fdes.connectionLost, failure.Failure())
if l == -1:
self.stop()
return (self.fdes.connectionLost, failure.Failure())
return (self.fdes.protocol.dataReceived, bytes[:l].tostring())
class AcceptBlocker(Blocker):
def __init__(self, svr, q):
Blocker.__init__(self, q)
self.svr = svr
def block(self):
while 1:
try:
skt = self.svr.sskt.accept()
return (self.svr.gotSocket, skt)
except (java.io.InterruptedIOException, java.io.IOException):
if not self.svr.isListening:
self.svr.sskt.close()
return
def blockerFinished(self, ignore):
self.svr.blockerFinished(ignore)
class JReactor(ReactorBase):
"""Fakes multiplexing using multiple threads and an action queue."""
def __init__(self):
ReactorBase.__init__(self)
self.q = timeoutqueue.TimeoutQueue()
def installWaker(self):
pass
def removeAll(self):
return []
def wakeUp(self):
self.q.put(lambda x: x, None)
def run(self, **kwargs):
import main
main.running = 1
while 1:
self.runUntilCurrent()
timeout = self.timeout()
if timeout is None:
timeout = 1.0
self.doIteration(timeout)
def doIteration(self, timeout):
log.msg(channel='system', event='iteration', reactor=self)
try:
self.q.wait(timeout)
except timeoutqueue.TimedOut:
pass
for i in range(self.q.qsize()):
meth, arg = self.q.get()
meth(arg)
def listenTCP(self, port, factory, backlog=5, interface=''):
jp = JavaPort(self, port, factory, backlog)
jp.startListening()
return jp
def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
jc = JavaConnector(self, host, port, factory)
jc.connect()
return jc
def wakeUp(self):
self.q.put((doNothing, None))
def doNothing(arg):
pass
class ConnectBlocker(Blocker):
def __init__(self, svr, q):
Blocker.__init__(self, q)
self.svr = svr
self.blockedyet = 0
def block(self):
if self.blockedyet:
return
self.blockedyet = 1
while 1:
try:
skt = Socket(self.svr.host, self.svr.port)
return (self.svr.gotSocket, skt)
except (java.io.InterruptedIOException, java.io.IOException):
if not self.svr.isConnecting:
return
def blockerFinished(self, ignore):
self.svr.blockerFinished(ignore)
class JavaConnector:
__implements__ = interfaces.IConnector
def __init__(self, reactor, host, port, factory):
self.reactor = reactor
self.host = host
self.port = port
self.factory = factory
self.currentTransport = None
def connect(self):
self.isConnecting = 1
self.factory.doStart()
ConnectBlocker(self, self.reactor.q).start()
log.msg("%s starting on %s"%(self.factory.__class__, self.port))
def stopConnecting(self):
self.isConnecting = 0
def blockerFinished(self, ignore):
"""Called when ConnectBlocker is finished."""
print 'connect blocker finished'
def disconnect(self):
if not self.currentTransport:
self.stopConnecting()
else:
self.currentTransport.loseConnection()
def getDestination(self):
return ("INET", self.host, self.port)
def gotSocket(self, skt):
protocol = self.factory.buildProtocol(None)
transport = JConnection(skt, protocol, self)
transport.factory = self.factory
protocol.makeConnection(transport)
wb = WriteBlocker(transport, self.reactor.q)
wb.start()
transport.writeBlocker = wb
ReadBlocker(transport, self.reactor.q).start()
class JavaPort:
__implements__ = interfaces.IListeningPort
def __init__(self, reactor, port, factory, backlog):
self.reactor = reactor
self.factory = factory
self.port = port
self.backlog = backlog
self.isListening = 1
def startListening(self):
self.factory.doStart()
sskt = ServerSocket(self.port, self.backlog)
sskt.setSoTimeout(100)
self.sskt = sskt
AcceptBlocker(self, self.reactor.q).start()
log.msg("%s starting on %s"%(self.factory.__class__, self.port))
def stopListening(self):
self.isListening = 0
def blockerFinished(self, ignore):
"""Called when AcceptBlocker is finished."""
self.factory.doStop()
def gotSocket(self, skt):
protocol = self.factory.buildProtocol(None)
transport = JConnection(skt, protocol, self)
protocol.makeConnection(transport)
wb = WriteBlocker(transport, self.reactor.q)
wb.start()
transport.writeBlocker = wb
ReadBlocker(transport, self.reactor.q).start()
class Errno:
"""A class which will pretend to be a module."""
def __getattr__(self, name):
return 1
def install():
import sys
sys.modules["errno"] = Errno()
reactor = JReactor()
import main
main.installReactor(reactor)
return reactor
__all__ = ["install"]