# test_persisted.py   [plain text]



# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
# 
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

# System Imports
from twisted.trial import unittest

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

# Twisted Imports
from twisted.persisted import styles, marmalade, aot


class VersionTestCase(unittest.TestCase):
    """
    Tests for the upgrade machinery of L{styles.Versioned} and
    L{styles.doUpgrade}.

    Each test declares its fixture classes C{global} so that they are bound
    at module level, where pickle can look them up by name when loading.
    """

    def testNullVersionUpgrade(self):
        """
        An instance pickled from a plain class is upgraded to version 1 when
        it is loaded after the class has become a L{styles.Versioned}.
        """
        global NullVersioned
        # First incarnation: a plain class with no versioning information.
        class NullVersioned:
            ok = 0
        pkcl = pickle.dumps(NullVersioned())
        # Second incarnation, bound to the same global name, so the pickle
        # made above is loaded as an instance of this class.
        class NullVersioned(styles.Versioned):
            persistenceVersion = 1
            def upgradeToVersion1(self):
                self.ok = 1
        mnv = pickle.loads(pkcl)
        styles.doUpgrade()
        self.failUnless(mnv.ok, "initial upgrade not run!")

    def testVersionUpgrade(self):
        """
        Loading a version 2 pickle into a class that is now at version 4 runs
        the version 3 and version 4 upgraders exactly once each, and a second
        pickle round trip does not run them again.
        """
        global MyVersioned
        class MyVersioned(styles.Versioned):
            persistenceVersion = 2
            persistenceForgets = ['garbagedata']
            v3 = 0
            v4 = 0

            def __init__(self):
                self.somedata = 'xxx'
                # A lambda cannot be pickled; this attribute is named in
                # persistenceForgets above.
                self.garbagedata = lambda q: 'cant persist'

            def upgradeToVersion3(self):
                self.v3 += 1

            def upgradeToVersion4(self):
                self.v4 += 1
        mv = MyVersioned()
        self.failUnless(not (mv.v3 or mv.v4), "hasn't been upgraded yet")
        pickl = pickle.dumps(mv)
        # Bump the class version after pickling so that loading needs upgrades.
        MyVersioned.persistenceVersion = 4
        obj = pickle.loads(pickl)
        styles.doUpgrade()
        self.failUnless(obj.v3, "didn't do version 3 upgrade")
        self.failUnless(obj.v4, "didn't do version 4 upgrade")
        # Round trip the upgraded object: the counters must stay at 1.
        pickl = pickle.dumps(obj)
        obj = pickle.loads(pickl)
        styles.doUpgrade()
        self.assertEquals(obj.v3, 1, "upgraded unnecessarily")
        self.assertEquals(obj.v4, 1, "upgraded unnecessarily")

    def testNonIdentityHash(self):
        """
        Two distinct instances that return the same C{__hash__} value are
        both upgraded and keep their own state.
        """
        global ClassWithCustomHash
        class ClassWithCustomHash(styles.Versioned):
            def __init__(self, unique, hash):
                self.unique = unique
                self.hash = hash
            def __hash__(self):
                return self.hash

        # Same hash value (0) on purpose; only 'unique' tells them apart.
        v1 = ClassWithCustomHash('v1', 0)
        v2 = ClassWithCustomHash('v2', 0)

        pkl = pickle.dumps((v1, v2))
        del v1, v2
        ClassWithCustomHash.persistenceVersion = 1
        ClassWithCustomHash.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)
        v1, v2 = pickle.loads(pkl)
        styles.doUpgrade()
        self.assertEquals(v1.unique, 'v1')
        self.assertEquals(v2.unique, 'v2')
        self.failUnless(v1.upgraded)
        self.failUnless(v2.upgraded)

    def testUpgradeDeserializesObjectsRequiringUpgrade(self):
        """
        An upgrader may itself unpickle another object that needs upgrading
        and call L{styles.doUpgrade} again; the nested object ends up
        upgraded too.
        """
        global ToyClassA, ToyClassB
        class ToyClassA(styles.Versioned):
            pass
        class ToyClassB(styles.Versioned):
            pass
        x = ToyClassA()
        y = ToyClassB()
        pklA, pklB = pickle.dumps(x), pickle.dumps(y)
        del x, y
        ToyClassA.persistenceVersion = 1
        def upgradeToVersion1(self):
            # Unpickle a ToyClassB while ToyClassA is being upgraded, then
            # re-enter doUpgrade().
            self.y = pickle.loads(pklB)
            styles.doUpgrade()
        ToyClassA.upgradeToVersion1 = upgradeToVersion1
        ToyClassB.persistenceVersion = 1
        ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)

        x = pickle.loads(pklA)
        styles.doUpgrade()
        self.failUnless(x.y.upgraded)

class MyEphemeral(styles.Ephemeral):
    """
    Minimal L{styles.Ephemeral} subclass holding one attribute, C{x}.

    Used by L{EphemeralTestCase} to check what survives a pickle round trip.
    """

    def __init__(self, x):
        """
        @param x: arbitrary value stored on the instance as C{self.x}.
        """
        self.x = x


class EphemeralTestCase(unittest.TestCase):
    """
    Tests for the pickling behaviour of L{styles.Ephemeral} subclasses.
    """

    def testEphemeral(self):
        """
        A L{MyEphemeral} comes back from a pickle round trip as a bare
        L{styles.Ephemeral} without its C{x} attribute.
        """
        original = MyEphemeral(3)
        # Sanity check on the object before it is pickled.
        self.assertEquals(original.__class__, MyEphemeral)
        self.assertEquals(original.x, 3)

        restored = pickle.loads(pickle.dumps(original))

        self.assertEquals(restored.__class__, styles.Ephemeral)
        self.failUnless(not hasattr(restored, 'x'))


class Pickleable:
    """
    Simple value holder whose C{getX} method is pickled by
    L{PicklingTestCase}, both unbound and bound.
    """

    def __init__(self, x):
        """
        @param x: value stored on the instance and returned by L{getX}.
        """
        self.x = x

    def getX(self):
        """
        Return the value given to the constructor.
        """
        return self.x

class A:
    """
    Dummy class; the method-identity tests attach one of L{B}'s bound
    methods to an instance of it.
    """
    def amethod(self):
        """
        Do nothing.
        """

class B:
    """
    Dummy class; its bound C{bmethod} is stored on an L{A} instance by the
    method-identity tests.
    """
    def bmethod(self):
        """
        Do nothing.
        """


class Marmaladeable(marmalade.DOMJellyable):
    """
    Fixture with two hand-written DOM persistence formats.

    Format 1 stores C{integer}, C{name} and the qualified class name of
    C{instance} as element attributes and drops C{sequence}.  Format 2
    stores C{instance} and every item of C{sequence} as child nodes.

    NOTE(review): presumably L{marmalade.DOMJellyable} picks the
    C{jellyToDOM_N}/C{unjellyFromDOM_N} pair from C{jellyDOMVersion} --
    confirm against marmalade itself.
    """

    jellyDOMVersion = 1

    def __init__(self, integer, instance, name, sequence):
        """
        Store the four constructor arguments on the instance unchanged.
        """
        self.integer = integer
        self.instance = instance
        self.name = name
        self.sequence = sequence

    def jellyToDOM_1(self, jellier, element):
        """
        Format 1 writer: record everything as attributes of C{element}.
        """
        from twisted.python.reflect import qual
        integerText = str(self.integer)
        element.setAttribute("integer", integerText)
        # Only the class name of the instance is kept, not its state.
        instanceClassName = qual(self.instance.__class__)
        element.setAttribute("instance", instanceClassName)
        nameText = str(self.name)
        element.setAttribute("name", nameText)
        # self.sequence is not written in this format.

    def unjellyFromDOM_1(self, unjellier, element):
        """
        Format 1 reader: rebuild state from the attributes of C{element}.
        """
        from twisted.python.reflect import namedClass
        self.integer = int(element.getAttribute("integer"))
        # Make a fresh, argument-less instance of the recorded class.
        instanceClass = namedClass(element.getAttribute("instance"))
        self.instance = instanceClass()
        self.name = element.getAttribute("name")
        # The sequence was never saved, so substitute two references to the
        # re-created instance.
        self.sequence = [self.instance, self.instance]

    def jellyToDOM_2(self, jellier, element):
        """
        Format 2 writer: attributes for the scalars, child nodes tagged with
        a C{parent:role} attribute for the instance and sequence items.
        """
        element.setAttribute("integer", str(self.integer))
        element.setAttribute("name", str(self.name))
        instanceNode = jellier.jellyToNode(self.instance)
        instanceNode.setAttribute("parent:role", "instance")
        element.appendChild(instanceNode)
        position = 0
        for item in self.sequence:
            itemNode = jellier.jellyToNode(item)
            itemNode.setAttribute("parent:role", "sequence:%d" % position)
            element.appendChild(itemNode)
            position += 1

    def unjellyFromDOM_2(self, unjellier, element):
        """
        Format 2 reader: restore the scalars from attributes, then the
        instance and the sequence items from the child elements.
        """
        self.integer = int(element.getAttribute("integer"))
        self.name = element.getAttribute("name")

        # Note to people reading this as an example: using "unjellyFromNode"
        # instead of "unjellyInto" will appear to work, but gives surprising
        # results when the application has references: _Dereference
        # instances show up where references to back-referenced data were
        # expected.  Simplifying this is being worked on.
        from twisted.web.microdom import Element
        self.sequence = []
        for node in element.childNodes:
            if not isinstance(node, Element):
                continue
            if node.getAttribute("parent:role") == 'instance':
                unjellier.unjellyAttribute(self, "instance", node)
                continue
            # Reserve the next slot now; gotSequenceItem fills it in when
            # the item has been unjellied.
            slot = len(self.sequence)
            self.sequence.append(None)
            unjellier.unjellyLater(node).addCallback(
                self.gotSequenceItem, slot)

    def gotSequenceItem(self, seqitem, num):
        """
        Callback: store the unjellied C{seqitem} at index C{num} of
        C{self.sequence}.
        """
        self.sequence[num] = seqitem


def funktion():
    """
    Do nothing; a module-level function placed inside the structures that
    the identity tests serialize.
    """

class MarmaladeTestCase(unittest.TestCase):

    def testMarmaladeable(self):
        m = Marmaladeable(1, B(), "testing", [1, 2, 3])
        s = marmalade.jellyToXML(m)
        u = marmalade.unjellyFromXML(s)
        assert u.sequence == [u.instance, u.instance]
        u.sequence.append(u.instance)
        u.jellyDOMVersion = 2
        s2 = marmalade.jellyToXML(u)
        u2 = marmalade.unjellyFromXML(s2)
        self.assertEquals( u2.sequence,  [u2.instance, u2.instance, u2.instance])

    def testCopyReg(self):
        s = "foo_bar"
        sio = StringIO.StringIO()
        sio.write(s)
        assert marmalade.unjellyFromXML(marmalade.jellyToXML({1:sio}))[1].getvalue() == s

    def testMethodSelfIdentity(self):
        a = A()
        b = B()
        a.bmethod = b.bmethod
        b.a = a
        im_ = marmalade.unjellyFromXML(marmalade.jellyToXML(b)).a.bmethod
        self.assertEquals(im_.im_class, im_.im_self.__class__)

    def testBasicIdentity(self):
        # Anyone wanting to make this datastructure more complex, and thus this
        # test more comprehensive, is welcome to do so.
        dj = marmalade.DOMJellier().jellyToNode
        d = {'hello': 'world', "method": dj}
        l = [1, 2, 3,
             "he\tllo\n\n\"x world!",
             u"goodbye \n\t\u1010 world!",
             1, 1.0, 100 ** 100l, unittest, marmalade.DOMJellier, d,
             funktion
             ]
        t = tuple(l)
        l.append(l)
        l.append(t)
        l.append(t)
        uj = marmalade.unjellyFromXML(marmalade.jellyToXML([l, l]))
        assert uj[0] is uj[1]
        assert uj[1][0:5] == l[0:5]

class PicklingTestCase(unittest.TestCase):
    """
    Test pickling of extra object types.

    NOTE(review): presumably the pickle support for these types is
    registered when L{twisted.persisted.styles} is imported -- confirm
    there.
    """

    def testModule(self):
        """
        A module object survives a pickle round trip.
        """
        pickl = pickle.dumps(styles)
        o = pickle.loads(pickl)
        self.assertEquals(o, styles)

    def testClassMethod(self):
        """
        A method looked up on the class survives a pickle round trip.
        """
        pickl = pickle.dumps(Pickleable.getX)
        o = pickle.loads(pickl)
        self.assertEquals(o, Pickleable.getX)

    def testInstanceMethod(self):
        """
        A bound method is still callable after a pickle round trip, returns
        the same value and has the same type.
        """
        obj = Pickleable(4)
        pickl = pickle.dumps(obj.getX)
        o = pickle.loads(pickl)
        self.assertEquals(o(), 4)
        self.assertEquals(type(o), type(obj.getX))

    def testStringIO(self):
        """
        A StringIO object keeps its type and contents across a pickle round
        trip.
        """
        f = StringIO.StringIO()
        f.write("abc")
        pickl = pickle.dumps(f)
        o = pickle.loads(pickl)
        self.assertEquals(type(o), type(f))
        # Check the unpickled object; checking f would pass even if the
        # contents were lost in the round trip.
        self.assertEquals(o.getvalue(), "abc")


class EvilSourceror:
    """
    Object that refers to itself through its C{a} and C{b} attributes and
    holds the constructor argument as C{c}.
    """
    def __init__(self, x):
        """
        @param x: value stored as C{self.c}.
        """
        # a and b both point back at this very object.
        self.a = self
        self.b = self
        self.c = x

class NonDictState:
    """
    Object whose persisted state is the value of its C{state} attribute
    rather than its instance dictionary.
    """
    def __getstate__(self):
        """
        Return C{self.state} as the state to persist.
        """
        return self.state

    def __setstate__(self, state):
        """
        Store the restored C{state} back on C{self.state}.
        """
        self.state = state

class AOTTestCase(unittest.TestCase):
    def testMethodSelfIdentity(self):
        a = A()
        b = B()
        a.bmethod = b.bmethod
        b.a = a
        im_ = aot.unjellyFromSource(aot.jellyToSource(b)).a.bmethod
        self.assertEquals(im_.im_class, im_.im_self.__class__)

    def testBasicIdentity(self):
        # Anyone wanting to make this datastructure more complex, and thus this
        # test more comprehensive, is welcome to do so.
        aj = aot.AOTJellier().jellyToAO
        d = {'hello': 'world', "method": aj}
        l = [1, 2, 3,
             "he\tllo\n\n\"x world!",
             u"goodbye \n\t\u1010 world!",
             1, 1.0, 100 ** 100l, unittest, aot.AOTJellier, d,
             funktion
             ]
        t = tuple(l)
        l.append(l)
        l.append(t)
        l.append(t)
        uj = aot.unjellyFromSource(aot.jellyToSource([l, l]))
        assert uj[0] is uj[1]
        assert uj[1][0:5] == l[0:5]


    def testNonDictState(self):
        a = NonDictState()
        a.state = "meringue!"
        assert aot.unjellyFromSource(aot.jellyToSource(a)).state == a.state

    def testCopyReg(self):
        s = "foo_bar"
        sio = StringIO.StringIO()
        sio.write(s)
        uj = aot.unjellyFromSource(aot.jellyToSource(sio))
        # print repr(uj.__dict__)
        assert uj.getvalue() == s

    def testFunkyReferences(self):
        o = EvilSourceror(EvilSourceror([]))
        j1 = aot.jellyToAOT(o)
        oj = aot.unjellyFromAOT(j1)

        assert oj.a is oj
        assert oj.a.b is oj.b
        assert oj.c is not oj.c.c

# Explicit list of the test case classes exported by this module.
# NOTE(review): MarmaladeTestCase and AOTTestCase are defined in this file
# but are not listed here -- confirm whether the omission is intentional.
testCases = [
    VersionTestCase,
    EphemeralTestCase,
    PicklingTestCase,
]