<?xml version="1.0"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Cooperative Data Flows (via generators)</title>
</head>
<body>
<h1>Cooperative Data Flows (via generators)</h1>
<h2>Background</h2>
<h3>Handling more than one Request</h3>
<p>When creating services to handle simultaneous requests, processing state must be managed. A common approach, threading, applies the method used for single-user operations: processing state is kept in local variables and function arguments, and the operating environment handles variable allocation and cleanup. Unfortunately, threading requires a good deal of preemption and locking machinery to keep each thread's execution independent while still allowing data to be shared between threads. This machinery can lead to subtle programming errors and, in the case of Python, to slowness, since every change to a mutable object must be synchronized through the global interpreter lock.</p>
<p>An alternative is an event-driven approach, where each operation is broken down into distinct steps and each step is scheduled in a delayed execution queue. In this approach, an object is often used to maintain state between steps, so that other operations can run in between. Often each step is a member function, queued within Twisted using <code>reactor.callLater</code>.</p>
<pre class="python">
from twisted.internet import reactor

class Request:
    def __init__(self, name):
        self.name = name
        reactor.callLater(0, self.step_one)
    def step_one(self):
        print self.name, ": Step One"
        reactor.callLater(0, self.step_two)
    def step_two(self):
        print self.name, ": Step Two"
        reactor.callLater(0, self.step_done)
    def step_done(self):
        # cleanup, and don't call reactor again.
        pass

Request("James")
Request("Wendy")

# start and shut down the event loop
reactor.callLater(1, reactor.stop)
reactor.run()
</pre>
<p>In the output of this example, one can see that execution alternates between the steps of James's and Wendy's requests. In effect, the requests are handled in a parallel, cooperative manner. While this is a good start, it isn't perfect, as the event loop is stopped by a hard-coded timeout. It would be better for each Request object to signal when it has completed, so that the event loop can be shut down as soon as all requests are done.</p>
<h3>Deferred Callbacks</h3>
<p>In Twisted, communication between operations is often accomplished with <code>defer.Deferred</code>. The Deferred object decouples an asynchronous operation from its caller by providing a temporary storage location for its result. Instead of calling the operation and waiting for a return value, an operation schedules itself to be executed with <code>reactor.callLater</code> and then returns a <code>defer.Deferred</code> object. At this point, the caller can register a callback function to be executed when the operation has completed. In the example below, <code>defer.DeferredList</code> is used to merge the result of each request into a single notification, which is used to stop the main event loop.
</p>
<pre class="python">
from twisted.internet import reactor, defer

class Request:
    def __init__(self, name):
        self.name = name
        self.d = defer.Deferred()
        reactor.callLater(0, self.step_one)
    def step_one(self):
        print self.name, ": Step One"
        reactor.callLater(0, self.step_two)
    def step_two(self):
        print self.name, ": Step Two"
        reactor.callLater(0, self.step_done)
    def step_done(self):
        self.d.callback(None)  # notify callback that we are done

james = Request("James")
wendy = Request("Wendy")

# start and shut down the event loop
d = defer.DeferredList([james.d, wendy.d])
d.addCallback(lambda _: reactor.stop())
reactor.run()
</pre>
<p>While this <code>Deferred</code> approach is very good, it can quickly get complicated, especially if the request is not a simple linear sequence of steps, or when results must flow between steps incrementally. The flow module addresses these shortcomings using Python generators.</p>
<h2>Iterators and generators</h2>
<p>An iterator is an object which produces a sequence of values. Python's iterators are simply objects with an <code>__iter__()</code> member function which returns an object (usually itself) that has a <code>next()</code> member function. The <code>next()</code> method is then invoked until it raises a <code>StopIteration</code> exception.</p>
<pre class="python">
from twisted.python.compat import iter, StopIteration

class Counter:
    def __init__(self, count):
        self.count = count
    def __iter__(self):
        return self
    def next(self):
        ret = self.count
        self.count -= 1
        if ret:
            return ret
        raise StopIteration

import sys
if sys.version_info &lt; (2,2):
    # fallback list() for interpreters without native iterator support
    def list(it):
        ret = []
        it = iter(it)
        try:
            while 1:
                ret.append(it.next())
        except StopIteration:
            pass
        return ret

print list(Counter(3))
# prints: [3, 2, 1]
</pre>
<h3>State pattern</h3>
<p>It is often useful for an iterator to change state during its production of values.
This can be done nicely with the 'state' pattern.</p>
<pre class="python">
class States:
    def __iter__(self):
        self.next = self.state_one
        return self
    def state_one(self):
        self.next = self.state_two
        return "one"
    def state_two(self):
        self.next = self.state_stop
        return "two"
    def state_stop(self):
        raise StopIteration

print list(States())
# prints: ['one', 'two']
</pre>
<h3>Generators</h3>
<p>With Python 2.2, there is a wonderful piece of syntactic sugar for creating iterators: generators. When a generator function is first called, an iterator is returned. From then on, each invocation of <code>next()</code> gives the subsequent value produced by a <code>yield</code> statement. With generators, the two iterators above become very easy to express.</p>
<pre class="python">
from __future__ import generators

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def States():
    yield "one"
    yield "two"

print list(Counter(3))
print list(States())
# prints:
# [3, 2, 1]
# ['one', 'two']
</pre>
<p>One technical difference between iterators and generators is that raising an exception from a generator permanently halts the generator, while raising an exception from an iterator's <code>next()</code> method does not invalidate the iterator; that is, one could call the <code>next()</code> method again and possibly get results. From here on, we use the generator syntax for building iterators.</p>
<h3>Chaining Generators</h3>
<p>It is often useful to view an operation as a flow of information between stages, where each stage may have several states or steps. This can be coded so that the output of one generator is consumed by another.
In this view, the last generator in the chain 'pulls' data from previous stages.</p>
<pre class="python">
from __future__ import generators

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def Consumer():
    producer = Counter(3)
    for result in producer:
        if 2 != result:
            yield result

print list(Consumer())
# prints: [3, 1]
</pre>
<p>While this is a very clean syntax for creating a multi-stage operation, a chain written this way would block all other operations until it has run to completion. Therefore, some mechanism for pausing the generator and resuming it later is required.</p>
<h2>Introducing Flow</h2>
<p>The flow module provides this ability to cooperate with other tasks. This is accomplished by wrapping iterables in a flow stage object and following an alternating yield pattern. That is, before each value is pulled from the stage, the operation must yield the wrapper object. During this yield, bookkeeping is done to prepare the next value or, if the next value is not yet available, to re-schedule the operation to be executed later.</p>
<pre class="python">
from __future__ import generators
from twisted.flow import flow

def Counter(count):
    while count > 0:
        yield count
        count -= 1

def Consumer():
    producer = flow.wrap(Counter(3))
    yield producer
    for result in producer:
        if 2 != result:
            yield result
        yield producer

print list(flow.Block(Consumer))
# prints: [3, 1]
</pre>
<h3>Equivalent Forms</h3>
<p>In the above code, <code>producer.next()</code> is called implicitly by the <code>for</code> loop, and thus the generator above is equivalent to:</p>
<pre class="python">
from __future__ import generators

def Consumer():
    producer = flow.wrap(Counter(3))
    while True:
        yield producer
        result = producer.next()
        if 2 != result:
            yield result
</pre>
<p>The <code>next()</code> method of the wrapper object does several things. First, it checks whether there are results ready; if so, it returns the next one. If not, it looks for a failure and raises it. Finally, it checks whether the end of the input has been reached. More concretely:
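</p>
<p>The wrapper side of these checks can be pictured with a toy class. This is only a sketch under stated assumptions: <code>ToyWrapper</code> is a hypothetical stand-in written for illustration, not the flow module's implementation, and it borrows just the <code>results</code>, <code>failure</code> and <code>stop</code> attribute names from the description above. It runs on its own, without Twisted.</p>
<pre class="python">
class ToyWrapper:
    """Hypothetical stand-in for a flow stage wrapper (illustration only)."""
    def __init__(self, values):
        self.results = list(values)  # values that are ready to be consumed
        self.failure = None          # an exception waiting to be delivered
        self.stop = 0                # set once the input is exhausted

    def next(self):
        # 1. hand out a ready result, if there is one
        if self.results:
            return self.results.pop(0)
        # 2. otherwise deliver a pending failure
        if self.failure is not None:
            self.stop = 1
            raise self.failure
        # 3. otherwise the end of the input has been reached
        self.stop = 1
        raise StopIteration

# drain a wrapper that only holds ready results
wrapper = ToyWrapper([3, 1])
collected = []
try:
    while True:
        collected.append(wrapper.next())
except StopIteration:
    pass
print(collected)

# a wrapper with no results but a pending failure raises it
failing = ToyWrapper([])
failing.failure = IOError("trap")
try:
    failing.next()
except IOError as err:
    caught = str(err)
print(caught)
</pre>
<p>With the real wrapper, the same three checks can also be written out inline by the consumer: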
</p>
<pre class="python">
from __future__ import generators

def Consumer():
    producer = flow.wrap(Counter(3))
    while True:
        yield producer
        if producer.results:
            result = producer.results.pop(0)
            yield result
            continue
        if producer.failure:
            producer.stop = 1
            producer.failure.trap()
        if producer.stop:
            break
</pre>
<h3>Handling failures</h3>
<p>Another difference between plain old iterables and ones wrapped with the flow module is that exceptions caught are wrapped in a <code>twisted.python.failure.Failure</code> object for later delivery. There are two basic ways to recover from exceptions. One way is to list expected exceptions in the call to <code>flow.wrap</code>. Alternatively, a try/except block can be used, catching <code>flow.Failure</code> objects.</p>
<pre class="python">
from __future__ import generators
from twisted.flow import flow

def Producer(throw):
    yield 1
    yield 2
    raise throw
    yield 3

def Consumer(producer):
    producer = flow.wrap(producer, IOError)
    yield producer
    try:
        for result in producer:
            if result is IOError:
                # handle trapped error
                yield "trapped"
            else:
                yield result
            yield producer
    except AssertionError, err:
        # handle assertion error
        yield str(err)

print list(flow.Block(Consumer(Producer(IOError("trap")))))
print list(flow.Block(Consumer(Producer(AssertionError("notrap")))))
# prints: [1, 2, 'trapped']
# prints: [1, 2, 'notrap']
</pre>
<h3>Cooperate</h3>
<p>Wrapping each iterator and then altering the calling sequence seems like quite an effort. Why do it? The answer is that it allows a <code>flow.Cooperate</code> object to be returned. When this happens, the entire call chain can be paused so that other flows can use the call stack.
For <code>flow.Block</code>, the implementation of Cooperate simply puts the call chain to sleep.</p>
<pre class="python">
from __future__ import generators
from twisted.flow import flow

def gen():
    yield 'immediate'
    yield flow.Cooperate(2)
    yield 'delayed'

for x in flow.Block(gen):
    print x
# prints:
# immediate
# delayed
</pre>
<h3>Merge and Zip</h3>
<p>Cooperate can be demonstrated with the <code>flow.Merge</code> and <code>flow.Zip</code> components. These two stages join two or more generators into a single stream. The Merge operation does this by rotating between whichever of its input streams are available. The Zip operation, on the other hand, waits for a result from each stream before it produces a result.</p>
<pre class="python">
from __future__ import generators
from twisted.flow import flow

def Right():
    yield "one"
    yield "two"
    yield flow.Cooperate()
    yield "three"

def Left():
    yield 1
    yield 2
    yield 3

print "Zip", list(flow.Block(flow.Zip(Right,Left)))
print "Merge", list(flow.Block(flow.Merge(Right,Left)))
# Zip [('one', 1), ('two', 2), ('three', 3)]
# Merge ['one', 1, 'two', 2, 3, 'three']
</pre>
<h2>Integrating with Twisted</h2>
<p>While <code>flow.Block</code> is useful for understanding how flow works, it undermines the whole purpose by sleeping during Cooperate and blocking other operations.
Following are several examples of how to integrate flow with the Twisted framework in a non-blocking manner.</p>
<h3>Deferred Flow</h3>
<p>For starters, the long example of Wendy and James, with its numerous calls to <code>reactor.callLater</code> to schedule each step of the operation, can be rewritten using <code>flow.Deferred</code>.</p>
<pre class="python">
from __future__ import generators
from twisted.internet import reactor, defer
from twisted.flow import flow

def request(name):
    print name, ": Step One"
    yield flow.Cooperate()
    print name, ": Step Two"

james = flow.Deferred(request("James"))
wendy = flow.Deferred(request("Wendy"))

# start and shut down the event loop
d = defer.DeferredList([wendy, james])
d.addCallback(lambda _: reactor.stop())
reactor.run()
</pre>
<p>Under the hood, when <code>flow.Deferred</code> encounters a <code>flow.Cooperate</code> event, it reschedules itself to be resumed at a later time, allowing other asynchronous operations to proceed. Once again, <code>defer.DeferredList</code> is only used here to stop the reactor after all operations are completed.</p>
<h3>Flow Resources</h3>
<p>By using <code>flow.Deferred</code> it is easy to build a web resource which is long-running yet can serve more than one client at a time. Run the example below and view the web page with two browsers.
Notice that both web pages are being created at the same time.</p>
<pre class="python">
from __future__ import generators
from twisted.internet import reactor
from twisted.web import server, resource
from twisted.flow import flow

def cooperative(count):
    """ simulate a cooperative resource, one that does not block """
    from random import random
    idx = 0
    while idx &lt; count:
        val = random()
        yield flow.Cooperate(val)
        yield str(val)[-5:]
        idx += 1

def flowRender(req):
    count = int(req.args.get("count",["30"])[0])
    req.write("&lt;html&gt;&lt;body&gt;")
    req.write(" %s Random numbers: &lt;ol&gt;\n" % count)
    source = flow.wrap(cooperative(count))
    yield source
    for itm in source:
        req.write("&lt;li&gt;%s&lt;/li&gt;\n" % itm)
        yield source
    req.write("&lt;/ol&gt;&lt;/body&gt;&lt;/html&gt;\n")

class FlowResource(resource.Resource):
    isLeaf = True   # this resource has no children
    def __init__(self, gen):
        resource.Resource.__init__(self)
        self.gen = gen
    def render(self, req):
        self.d = flow.Deferred(self.gen(req))
        self.d.addCallback(lambda _: req.finish())
        return server.NOT_DONE_YET

print "visit http://localhost:8081/ to view the example"
root = FlowResource(flowRender)
site = server.Site(root)
reactor.listenTCP(8081,site)
reactor.run()
</pre>
<h3>Flow Protocols</h3>
<p>The flow module can also be used to construct protocols easily. Following is an echo client and server. For each protocol, one must yield the connection before reading from it.
When the generator finishes, the connection is automatically closed.</p>
<pre class="python">
from __future__ import generators
from twisted.flow import flow
from twisted.internet import protocol, reactor
PORT = 8392

def echoServer(conn):
    yield conn
    for data in conn:
        conn.write(data)
        yield conn
    reactor.callLater(0,reactor.stop)

server = protocol.ServerFactory()
server.protocol = flow.makeProtocol(echoServer)
reactor.listenTCP(PORT,server)

def echoClient(conn):
    conn.write("Hello World")
    yield conn
    print conn.next()
    conn.write("Another Line")
    yield conn
    print conn.next()

client = protocol.ClientFactory()
client.protocol = flow.makeProtocol(echoClient)
reactor.connectTCP("localhost", PORT, client)
reactor.run()
</pre>
<h2>Threading</h2>
<p>While the flow module allows multiple cooperative tasks to work in a single thread, sometimes it is necessary to have the output of another thread consumed within a flow. This can be done with <code>twisted.flow.threads.Threaded</code>, which takes an iterable object and executes it in another thread. Following is a sample iterable, <code>countSleep</code>, which simulates a blocking producer that must be put into a thread. To show that <code>twisted.flow.threads.Threaded</code> does not block other operations, a similar, cooperative count is included.
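</p>
<p>The general idea of consuming a blocking iterable from another thread can be sketched with the standard library alone. The helper <code>drain_in_thread</code>, the <code>blocking_count</code> producer and the <code>_DONE</code> sentinel are hypothetical names chosen for this illustration. This is not how <code>Threaded</code> is implemented; it is only one plausible way to move values from a worker thread to a consumer through a queue.</p>
<pre class="python">
import threading
from time import sleep
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2

_DONE = object()  # sentinel marking the end of the iterable

def drain_in_thread(iterable):
    """Run a blocking iterable in a worker thread; return the queue it fills."""
    out = queue.Queue()
    def worker():
        for item in iterable:
            out.put(item)
        out.put(_DONE)
    threading.Thread(target=worker).start()
    return out

def blocking_count(count):
    for index in range(count):
        sleep(0.01)  # stands in for a blocking call
        yield index

out = drain_in_thread(blocking_count(3))
received = []
while True:
    item = out.get()  # blocks here only to keep the demonstration short
    if item is _DONE:
        break
    received.append(item)
print(received)
</pre>
<p>The loop above blocks on <code>out.get()</code> for simplicity; a cooperative consumer would instead check the queue without blocking and give other tasks a turn whenever it is empty. The example below uses <code>Threaded</code> itself, merged with a cooperative counter.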
</p>
<pre class="python">
from __future__ import generators
from twisted.internet import reactor, defer
from twisted.flow import flow
from twisted.flow.threads import Threaded

def countSleep(count):
    from time import sleep
    for index in range(count):
        sleep(.3)
        print "sleep", index
        yield index

def countCooperate(count):
    for index in range(count):
        yield flow.Cooperate(.1)
        print "cooperate", index
        yield "coop %s" % index

d = flow.Deferred( flow.Merge(
        Threaded(countSleep(5)),
        countCooperate(5)))

# # alternatively
# d1 = flow.Deferred(Threaded(countSleep(5)))
# d2 = flow.Deferred(countCooperate(10))
# d = defer.DeferredList([d1,d2])

def prn(x):
    print x
    reactor.stop()
d.addCallback(prn)
reactor.run()
</pre>
<h3>Using database connections</h3>
<p>Since most standard database drivers are thread based, the flow module builds on <code>twisted.flow.threads.Threaded</code> by providing a <code>QueryIterator</code>, which takes an SQL query and a <code>ConnectionPool</code>.</p>
<pre class="python">
from __future__ import generators
from twisted.enterprise import adbapi
from twisted.internet import reactor
from twisted.flow import flow
from twisted.flow.threads import QueryIterator, Threaded

dbpool = adbapi.ConnectionPool("SomeDriver",host='localhost',
             db='Database',user='User',passwd='Password')

# # I test with...
# from pyPgSQL import PgSQL
# dbpool = PgSQL

sql = """
  (SELECT 'one')
UNION ALL
  (SELECT 'two')
UNION ALL
  (SELECT 'three')
"""

def consumer():
    print "executing"
    query = Threaded(QueryIterator(dbpool, sql))
    print "yielding"
    yield query
    print "done yielding"
    for row in query:
        print "Processed result : ", row
        yield query

def finish(result):
    print "Deferred Complete : ", result
    reactor.stop()

f = flow.Deferred(consumer())
f.addBoth(finish)
reactor.run()
</pre>
</body>
</html>