from __future__ import generators
from twisted.protocols import basic
from twisted.internet import reactor
from twisted.internet import protocol
from twisted.internet import defer
import Queue
import math
import sys
def withoutFlow():
    """
    An example protocol pair which does not use the flow module.

    The server computes factors via reactor.callInThread, passes them back
    through a Queue, and polls that queue with reactor.callLater.  The client
    sends one number and gathers the '<n>: <factor>' lines until it receives
    '<n>: DONE'.

    Returns a (FactoringClient, FactoringServer) tuple of protocol classes.
    """
    def factor(n, q):
        """
        Put every factor of n on q, then a None end-of-results marker.

        Factors are produced in pairs (i, n // i) for increasing i.
        """
        i = 1
        # Compare i * i against n using integers only.  The previous bound,
        # math.ceil(n ** 0.5), stopped before the square root itself (so 16
        # never reported 4 and 1 reported nothing) and raised for negative n,
        # which left the poller waiting forever for the None marker.
        while i * i <= n:
            if n % i == 0:
                q.put(i)
                cofactor = n // i
                # For a perfect square the root pairs with itself; report it
                # only once.
                if cofactor != i:
                    q.put(cofactor)
            i += 1
        q.put(None)

    class FactoringServer(basic.LineReceiver):
        """Reply to each numeric line with its factors, one per line."""

        def lineReceived(self, line):
            """Start factoring the received number, or reply 'ERROR'."""
            try:
                value = long(line)
            except ValueError:
                self.sendLine('ERROR')
            else:
                q = Queue.Queue()
                reactor.callInThread(factor, value, q)
                reactor.callLater(0.1, self.pollQueue, value, q)

        def pollQueue(self, value, q):
            """
            Send every factor currently queued for value.

            Stops after sending '<value>: DONE' when the None marker is
            seen; otherwise reschedules itself in 0.1 seconds.
            """
            while not q.empty():
                # Non-blocking get: the queue was just checked for content.
                found = q.get(0)
                if found is None:
                    self.sendLine('%d: DONE' % (value,))
                    return
                self.sendLine('%d: %d' % (value, found))
            reactor.callLater(0.1, self.pollQueue, value, q)

    def printFactorList(lst, orig):
        """Print orig followed by its comma-separated factors."""
        print('%d: %s' % (orig, ', '.join(map(str, lst))))

    class FactoringClient(basic.LineReceiver):
        """Ask the server to factor self.factory.value and print the answer."""

        def __init__(self):
            # Maps each outstanding number to (factors so far, Deferred).
            self.live = {}

        def connectionMade(self):
            """Request the factors, print them, then drop the connection."""
            d = self.findFactors(self.factory.value)
            d.addCallback(printFactorList, self.factory.value)
            d.addCallback(lambda x: self.transport.loseConnection())

        def connectionLost(self, reason):
            """Stop the reactor once the connection is gone."""
            reactor.stop()

        def findFactors(self, number):
            """
            Send number to the server.

            Returns a Deferred that fires with the list of factors.
            Raises RuntimeError if number is already being factored.
            """
            if number in self.live:
                raise RuntimeError("You already asked for that.")
            d = defer.Deferred()
            self.live[number] = ([], d)
            self.sendLine(str(number))
            return d

        def lineReceived(self, line):
            """Record one '<n>: <factor>' line or finish on '<n>: DONE'."""
            parts = line.split(': ')
            value = long(parts[0])
            if parts[1] == 'DONE':
                factors, d = self.live[value]
                d.callback(factors)
                del self.live[value]
            else:
                self.live[value][0].append(long(parts[1]))

    return (FactoringClient, FactoringServer)
def withFlow():
    """
    The same example refactored to use generators/flow.

    Returns a (FactoringClient, FactoringServer) tuple of protocol classes.
    """
    from twisted.flow import flow

    def factor(n):
        """
        Generate the factors of n in pairs (i, n // i) for increasing i.

        Also yields flow.Cooperate() once every 1000 candidates.
        """
        i = 1
        # Compare i * i against n using integers only.  The previous bound,
        # math.ceil(n ** 0.5), stopped before the square root itself (so 16
        # never reported 4 and 1 reported nothing) and raised for negative n.
        while i * i <= n:
            if n % i == 0:
                yield i
                cofactor = n // i
                # For a perfect square the root pairs with itself; report it
                # only once.
                if cofactor != i:
                    yield cofactor
            i += 1
            if not i % 1000:
                yield flow.Cooperate()

    def writefactor(n, write):
        """
        Pass each factor of n to write as '<n>: <factor>', then '<n>: DONE'.

        The wrapped stage is yielded before the loop and again after every
        item, as the original code does.
        """
        factors = flow.wrap(factor(n))
        yield factors
        for fac in factors:
            write('%d: %d' % (n, fac))
            yield factors
        write('%d: DONE' % (n,))

    class FactoringServer(basic.LineReceiver):
        """Reply to each numeric line with its factors, one per line."""

        def lineReceived(self, line):
            """Start a flow that writes the factors, or reply 'ERROR'."""
            try:
                value = long(line)
            except ValueError:
                self.sendLine('ERROR')
            else:
                flow.Deferred(writefactor(value, self.sendLine))

    def receiveLines(lines, live):
        """
        Turn response lines into (value, factors) results.

        lines is the stage producing received lines; live maps each
        outstanding number to the list of factors collected so far.  A
        result is yielded for every '<n>: DONE' line, and the generator
        ends once live is empty.
        """
        yield lines
        for line in lines:
            parts = line.split(': ')
            value = long(parts[0])
            factors = live[value]
            if parts[1] == 'DONE':
                del live[value]
                yield (value, factors)
                if not live:
                    return
            else:
                factors.append(long(parts[1]))
            yield lines

    def printResults(lines, live):
        """Print every (value, factors) result produced by receiveLines."""
        results = flow.wrap(receiveLines(lines, live))
        yield results
        for (value, factors) in results:
            print('%d: %s' % (value, ', '.join(map(str, factors))))
            yield results

    class FactoringClient(basic.LineReceiver):
        """Ask the server to factor self.factory.value and print the answer."""

        def __init__(self):
            # Maps each outstanding number to the factors received so far.
            self.live = {}

        def connectionLost(self, reason):
            """Stop the reactor once the connection is gone."""
            reactor.stop()

        def connectionMade(self):
            """Route received lines into a flow and send the request."""
            cb = flow.Callback()
            # Received lines are handed directly to the flow callback.
            self.lineReceived = cb.result
            self.findFactors(self.factory.value)
            d = flow.Deferred(printResults(cb, self.live))
            d.addCallback(lambda _: self.transport.loseConnection())

        def findFactors(self, number):
            """
            Send number to the server and start collecting its factors.

            Raises RuntimeError if number is already being factored.
            """
            if number in self.live:
                raise RuntimeError("You already asked for that.")
            self.live[number] = []
            self.sendLine(str(number))

    return (FactoringClient, FactoringServer)
def main(tpl):
    """
    Run one client/server exchange with the given protocol classes.

    tpl is a (FactoringClient, FactoringServer) tuple.  The number to factor
    is read from sys.argv[1]; 10 is used when that argument is missing or is
    not a number.  Both ends use TCP port 6543 on localhost, and the call
    returns when reactor.run() returns.
    """
    FactoringClient, FactoringServer = tpl

    def server(port=6543):
        """Listen on port with a factory that builds FactoringServer."""
        f = protocol.ServerFactory()
        f.protocol = FactoringServer
        return reactor.listenTCP(port, f)

    def client(n, port=6543):
        """Connect to localhost:port with a client that will factor n."""
        f = protocol.ClientFactory()
        f.protocol = FactoringClient
        f.value = n
        return reactor.connectTCP('localhost', port, f)

    try:
        value = long(sys.argv[1])
    except (IndexError, ValueError):
        # Only a missing or non-numeric argument falls back to the default;
        # anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        value = 10
    server()
    client(value)
    reactor.run()
if __name__ == '__main__':
    # Run the flow-based variant first, then the thread/queue variant.
    # NOTE(review): each main() call listens on port 6543 and calls
    # reactor.run(); confirm the installed Twisted allows a second run.
    for build in (withFlow, withoutFlow):
        main(build())