# base.py


# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
# USA
#
# Author: Clark Evans  (cce@clarkevans.com)
#
""" flow.base

    This module contains the core exceptions and base classes
    in the flow module.  See flow.flow for more detailed information.
"""

import twisted.python.compat
from twisted.internet import reactor
import time

#
# Exceptions used within flow
# 
class Unsupported(NotImplementedError):
    """ Indicates that the given stage does not know what to do
        with the flow instruction that was returned.

        The constructor takes the offending instruction and embeds
        its repr() in the exception message.
    """
    def __init__(self, inst):
        msg = "Unsupported flow instruction: %s " % repr(inst)
        # Initialise the class we actually derive from; calling the
        # TypeError initialiser with a non-TypeError instance fails
        # before the exception can even be constructed.
        NotImplementedError.__init__(self, msg)

class NotReadyError(RuntimeError):
    """ Signals that a stage was used before being passed to a yield

        This is a plain RuntimeError subclass; it adds no behaviour
        of its own and exists so callers can catch this case by type.
    """

#
# Abstract/Base Classes
#

class Instruction:
    """ Marker base class: instances carry special meaning when
        they are yielded within a flow.
    """
 
class Controller:
    """ Flow controller

        Every flow has a controller at its base.  The controller is
        what interprets the instructions produced by the flow, most
        notably the CallLater instructions.  This class is primarily
        a marker identifying the classes which consume Instruction
        events.  A controller which is unable to handle a particular
        instruction raises the Unsupported exception.
    """

class CallLater(Instruction):
    """ Instruction to support callbacks

        Stages such as _Deferred and Callback hand back this
        instruction when they are yielded.  The flow driver sitting
        underneath is expected to invoke 'callLater', passing the
        callable that should be executed after each callback.
    """
    def callLater(self, callable):
        """ Accept the callable to run later; the base version does
            nothing with it and returns None.
        """

class Cooperate(CallLater):
    """ Requests that processing be paused so other tasks can resume

        A generator should yield one of these whenever the current
        chain would otherwise block, or at intervals throughout a
        long-running computation.  The flow machinery treats it as a
        signal to suspend the current processing chain and pick it up
        again later, giving other delayed operations a chance to run.
        Typical usage:

               # within some generator wrapped by a Controller
               yield Cooperate(1)  # yield for a second or more

    """
    def __init__(self, timeout=0):
        """ Remember the delay ('timeout') to hand to the reactor. """
        self.timeout = timeout

    def callLater(self, callable):
        """ Ask the reactor to run 'callable' after the stored timeout. """
        delay = self.timeout
        reactor.callLater(delay, callable)

class Stage(Instruction):
    """ Abstract base defining protocol for iterator/generators in a flow

        This is the primary component in the flow system, it is an
        iterable object which must be passed to a yield statement 
        before each call to next().   Usage...

           iterable = DerivedStage( ... , SpamError, EggsError))
           yield iterable
           for result in iterable:
               // handle good result, or SpamError or EggsError
               yield iterable 

        Alternatively, when inside a generator, the next() method can be
        used directly.   In this case, if no results are available,
        StopIteration is raised, and if left uncaught, will nicely end
        the generator.   Of course, unexpected failures are raised.  This 
        technique is especially useful when pulling from more than 
        one stage at a time.

             def someGenerator():
                 iterable = SomeStage( ... , SpamError, EggsError)
                 while True:
                     yield iterable
                     result = iterable.next() 
                     // handle good result or SpamError or EggsError

        For many generators, the results become available in chunks
        of rows.  While the default value is to get one row at a time,
        there is a 'chunked' property which allows them to be 
        returned via the next() method as many rows rather than
        row by row.

             iterable = DerivedStage(...)
             iterable.chunked = True
             for results in iterable:
                 for result in results:
                      // handle good result
                 yield iterable

        For those wishing more control at the cost of a painful experience,
        the following member variables can be used to great effect:

            results  This is a list of results produced by the generator,
                     they can be fetched one by one using next() or in a
                     group together.  If no results were produced, then
                     this is an empty list.   These results should be 
                     removed from the list after they are read; or, after
                     reading all of the results set to an empty list

            stop     This is true if the underlying generator has finished 
                     execution (raised a StopIteration or returned).  Note
                     that several results may exist, and stop may be true.

            failure  If the generator produced an exception, then it is 
                     wrapped as a Failure object and put here.  Note that
                     several results may have been produced before the 
                     failure.  To ensure that the failure isn't accidently
                     reported twice, it is adviseable to set stop to True.

        The order in which these member variables is used is *critical* for
        proper adherance to the flow protocol.   First, all successful
        results should be handled.  Second, the iterable should be checked
        to see if it is finished.  Third, a failure should be checked; 
        while handling a failure, either the loop should be exited, or
        the iterable's stop member should be set. 

             iterable = SomeStage(...)
             while True:
                 yield iterable
                 if iterable.results:
                     for result in iterable.results:
                         // handle good result
                     iterable.results = []
                 if iterable.stop:
                     break
                 if iterable.failure:
                     iterable.stop = True
                     // handle iterable.failure
                     break
    """      
    def __init__(self, *trap):
        self._trap = trap
        self.stop = False
        self.failure = None
        self.results = []
        self.chunked = False
    
    def __iter__(self):
        return self

    def next(self):
        """ return current result

            This is the primary function to be called to retrieve
            the current result.  It complies with the iterator 
            protocol by raising StopIteration when the stage is
            complete.   It also raises an exception if it is 
            called before the stage is yielded. 
        """
        if self.results:
            if self.chunked:
                ret = self.results
                self.results = []
                return ret
            else:
                return self.results.pop(0)
        if self.stop:
            raise StopIteration()

        if self.failure:
            self.stop = True

            cr = self.failure.check(*self._trap)

            if cr:
                return cr

            if self.failure.tb:
                raise self.failure.value.__class__, \
                      self.failure.value, self.failure.tb

            raise self.failure.value

        raise NotReadyError("Must 'yield' this object before calling next()")

    def _yield(self):
        """ executed during a yield statement by previous stage

            This method is private within the scope of the flow module, 
            it is used by one stage in the flow to ask a subsequent
            stage to produce its value.  The result of the yield is 
            then stored in self.result and is an instance of Failure
            if a problem occurred.
        """
        raise NotImplementedError