# test_mail.py   [plain text]



# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
# 
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import os
import md5
import shutil
import smtplib
import pickle
import StringIO
import rfc822

from twisted.trial import unittest
from twisted.protocols import smtp
from twisted.protocols import pop3
from twisted.protocols import dns
from twisted.protocols import basic
from twisted.internet import protocol
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import interfaces
from twisted.internet.error import DNSLookupError, CannotListenError
from twisted.python import components
from twisted.python import failure
from twisted.python import util

from twisted import mail
import twisted.mail.mail
import twisted.mail.maildir
import twisted.mail.relay
import twisted.mail.relaymanager
import twisted.mail.protocols
import twisted.mail.alias

from twisted import cred
import twisted.cred.credentials
import twisted.cred.checkers
import twisted.cred.portal

# Since we run a couple processes, we need SignalMixin from test_process
import test_process

from proto_helpers import LineSendingProtocol

class DomainWithDefaultsTestCase(unittest.TestCase):
    """
    Exercise the dictionary interface of mail.mail.DomainWithDefaultDict.
    """

    def testMethods(self):
        """
        Walk one wrapped dictionary through iteration, lookup, deletion,
        update, clear, setdefault and popitem.
        """
        expectedItems = [(k, k + 10) for k in range(10)]
        expectedValues = range(10, 20)
        d = mail.mail.DomainWithDefaultDict(dict(expectedItems), 'Default')

        self.assertEquals(len(d), 10)
        self.assertEquals(list(iter(d)), range(10))
        self.assertEquals(list(d.iterkeys()), list(iter(d)))

        # The iterator-returning and the list-returning accessors must
        # expose the same contents.
        accessorPairs = (
            (d.iteritems, d.itervalues),
            (d.items, d.values),
        )
        for (getItems, getValues) in accessorPairs:
            gotItems = list(getItems())
            gotItems.sort()
            self.assertEquals(gotItems, expectedItems)

            gotValues = list(getValues())
            gotValues.sort()
            self.assertEquals(gotValues, expectedValues)

        for key in range(10):
            self.assertEquals(d[key], key + 10)
            self.assertEquals(d.get(key), key + 10)
            self.failUnless(key in d)
            self.failUnless(d.has_key(key))

        # Keys that are removed fall back to the default on lookup.
        del d[2], d[4], d[6]

        self.assertEquals(len(d), 7)
        for removed in (2, 4, 6):
            self.assertEquals(d[removed], 'Default')

        d.update({'a': None, 'b': (), 'c': '*'})
        self.assertEquals(len(d), 10)
        for (key, value) in (('a', None), ('b', ()), ('c', '*')):
            self.assertEquals(d[key], value)

        d.clear()
        self.assertEquals(len(d), 0)

        stored = d.setdefault('key', 'value')
        self.assertEquals(stored, 'value')
        self.assertEquals(d['key'], 'value')

        popped = d.popitem()
        self.assertEquals(popped, ('key', 'value'))
        self.assertEquals(len(d), 0)

class BounceTestCase(unittest.TestCase):
    """
    Tests for mail.mail.BounceDomain.
    """

    def setUp(self):
        """
        Create the BounceDomain instance every test works on.
        """
        self.domain = mail.mail.BounceDomain()

    def testExists(self):
        """
        exists() raises smtp.AddressError for an arbitrary user.
        """
        exists = self.domain.exists
        self.assertRaises(smtp.AddressError, exists, "any user")

    def testRelay(self):
        """
        willRelay() answers False.
        """
        answer = self.domain.willRelay("random q emailer", "protocol")
        self.assertEquals(answer, False)

    def testMessage(self):
        """
        startMessage() raises AssertionError.
        """
        startMessage = self.domain.startMessage
        self.assertRaises(AssertionError, startMessage, "whomever")

    def testAddUser(self):
        """
        After addUser(), exists() still raises (here smtp.SMTPBadRcpt)
        for that user.
        """
        self.domain.addUser("bob", "password")
        exists = self.domain.exists
        self.assertRaises(smtp.SMTPBadRcpt, exists, "bob")

class FileMessageTestCase(unittest.TestCase):
    """
    Tests for mail.mail.FileMessage, which is handed an open file, the
    name of that file and a final name.
    """

    def setUp(self):
        """
        Open a scratch file for writing and wrap it in a FileMessage.
        """
        self.name = "fileMessage.testFile"
        self.final = "final.fileMessage.testFile"
        self.f = file(self.name, 'w')
        self.fp = mail.mail.FileMessage(self.f, self.name, self.final)

    def tearDown(self):
        """
        Best-effort cleanup: close the scratch file and delete whichever of
        the two file names still exists.

        Only I/O errors are ignored, so an unrelated failure during cleanup
        is no longer silently swallowed.
        """
        try:
            self.f.close()
        except (IOError, OSError):
            pass
        for path in (self.name, self.final):
            try:
                os.remove(path)
            except OSError:
                # Expected whenever the test already moved or removed it.
                pass

    def testFinalName(self):
        """
        eomReceived() fires with the final name, closes the file and leaves
        nothing behind under the temporary name.
        """
        self.assertEquals(unittest.deferredResult(self.fp.eomReceived()), self.final)
        self.failUnless(self.f.closed)
        self.failIf(os.path.exists(self.name))

    def testContents(self):
        """
        Lines passed to lineReceived() end up, newline terminated, in the
        file stored under the final name.
        """
        contents = "first line\nsecond line\nthird line\n"
        for line in contents.splitlines():
            self.fp.lineReceived(line)
        self.fp.eomReceived()

        # Close the handle explicitly so it cannot outlive the test and
        # get in the way of tearDown's os.remove().
        written = file(self.final)
        try:
            self.assertEquals(written.read(), contents)
        finally:
            written.close()

    def testInterrupted(self):
        """
        connectionLost() before the end of the message leaves neither the
        temporary nor the final file on disk.
        """
        contents = "first line\nsecond line\n"
        for line in contents.splitlines():
            self.fp.lineReceived(line)
        self.fp.connectionLost()
        self.failIf(os.path.exists(self.name))
        self.failIf(os.path.exists(self.final))

class MailServiceTestCase(unittest.TestCase):
    """
    Tests for the factories and portal lookups of mail.mail.MailService.
    """

    def setUp(self):
        """
        Create a fresh MailService for each test.
        """
        self.service = mail.mail.MailService()

    def testFactories(self):
        """
        Each factory getter returns a ServerFactory whose buildProtocol()
        produces an instance of the matching protocol class.
        """
        addr = ('127.0.0.1', 12345)
        expectations = (
            (self.service.getPOP3Factory, pop3.POP3),
            (self.service.getSMTPFactory, smtp.SMTP),
            (self.service.getESMTPFactory, smtp.ESMTP),
        )
        for (getFactory, protocolClass) in expectations:
            f = getFactory()
            self.failUnless(isinstance(f, protocol.ServerFactory))
            # failUnless(obj, cls) would treat cls as the failure message
            # and only test obj's truthiness, so check the type explicitly.
            self.failUnless(isinstance(f.buildProtocol(addr), protocolClass))

    def testPortals(self):
        """
        lookupPortal() returns the portal stored under a domain name and
        defaultPortal() the one stored under the empty name.
        """
        o1 = object()
        o2 = object()
        self.service.portals['domain'] = o1
        self.service.portals[''] = o2

        self.failUnless(self.service.lookupPortal('domain') is o1)
        self.failUnless(self.service.defaultPortal() is o2)

class MaildirTestCase(unittest.TestCase):
    """
    Tests for maildir creation and for mail.maildir.MaildirMailbox.
    """

    def setUp(self):
        """
        Initialize a maildir at a fresh temporary path.
        """
        self.d = self.mktemp()
        mail.maildir.initializeMaildir(self.d)

    def tearDown(self):
        """
        Remove the maildir tree.
        """
        shutil.rmtree(self.d)

    def testInitializer(self):
        """
        initializeMaildir() creates new/cur/tmp directories both in the
        maildir itself and in its .Trash folder.
        """
        root = self.d
        self.failUnless(os.path.exists(root) and os.path.isdir(root))

        for base in (root, os.path.join(root, '.Trash')):
            subdirs = [os.path.join(base, sub) for sub in ('new', 'cur', 'tmp')]
            for path in subdirs:
                self.failUnless(os.path.exists(path))
            for path in subdirs:
                self.failUnless(os.path.isdir(path))

    def testMailbox(self):
        """
        Listing, reading, UIDL generation, deletion and undeletion of ten
        messages dropped into cur/ and new/.
        """
        join = os.path.join
        makeName = mail.maildir._generateMaildirName
        msgs = [join(sub, makeName()) for sub in ('cur', 'new') for n in range(5)]

        # Toss a few files into the mailbox; the k-th file (counting from
        # one) holds k bytes.
        size = 0
        for relPath in msgs:
            size += 1
            out = file(join(self.d, relPath), 'w')
            out.write('x' * size)
            out.close()

        mb = mail.maildir.MaildirMailbox(self.d)
        self.assertEquals(mb.listMessages(), range(1, 11))
        for (index, expectedSize) in ((1, 2), (5, 6)):
            self.assertEquals(mb.listMessages(index), expectedSize)

        for (index, expectedSize) in ((6, 7), (1, 2)):
            self.assertEquals(mb.getMessage(index).read(), 'x' * expectedSize)

        # Every message must get a distinct UIDL.
        seen = {}
        for index in range(10):
            uid = mb.getUidl(index)
            self.failIf(uid in seen)
            seen[uid] = None

        victim = msgs[5]
        victimName = os.path.basename(victim)
        trashed = join(self.d, '.Trash', 'cur', victimName)
        original = join(self.d, victim)

        mb.deleteMessage(5)
        self.assertEquals(mb.listMessages(5), 0)
        self.failUnless(os.path.exists(trashed))
        self.failIf(os.path.exists(original))

        mb.undeleteMessages()
        self.assertEquals(mb.listMessages(5), 6)
        self.failIf(os.path.exists(trashed))
        self.failUnless(os.path.exists(original))

class MaildirDirdbmDomainTestCase(unittest.TestCase):
    """
    Tests for mail.maildir.MaildirDirdbmDomain.
    """

    def setUp(self):
        """
        Build a domain rooted at a fresh temporary path.
        """
        self.P = self.mktemp()
        self.S = mail.mail.MailService()
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)

    def tearDown(self):
        """
        Remove the domain's directory tree.
        """
        shutil.rmtree(self.P)

    def testAddUser(self):
        """
        addUser() records the password in the domain's dbm and creates a
        directory named after the user.
        """
        accounts = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
        for (name, password) in accounts:
            self.D.addUser(name, password)

        for (name, password) in accounts:
            self.failUnless(name in self.D.dbm)
            self.assertEquals(self.D.dbm[name], password)
            userDir = os.path.join(self.P, name)
            self.failUnless(os.path.exists(userDir))

    def testCredentials(self):
        """
        The domain offers exactly one credentials checker, and it accepts
        IUsernamePassword credentials.
        """
        checkers = self.D.getCredentialsCheckers()
        self.assertEquals(len(checkers), 1)

        checker = checkers[0]
        self.failUnless(components.implements(checker, cred.checkers.ICredentialsChecker))
        self.failUnless(cred.credentials.IUsernamePassword in checker.credentialInterfaces)

    def testRequestAvatar(self):
        """
        requestAvatar() rejects unknown interfaces with NotImplementedError
        and returns an (interface, avatar, logout) triple for pop3.IMailbox.
        """
        class ISomething(components.Interface):
            pass

        self.D.addUser('user', 'password')
        requestAvatar = self.D.requestAvatar
        self.assertRaises(NotImplementedError, requestAvatar, 'user', None, ISomething)

        triple = self.D.requestAvatar('user', None, pop3.IMailbox)
        self.assertEquals(len(triple), 3)
        self.failUnless(triple[0] is pop3.IMailbox)
        self.failUnless(components.implements(triple[1], pop3.IMailbox))

        # The third element must be callable.
        triple[2]()

    def testRequestAvatarId(self):
        """
        The domain's checker raises UnauthorizedLogin for a wrong password
        and returns the user name for the right one.
        """
        self.D.addUser('user', 'password')
        database = self.D.getCredentialsCheckers()[0]

        badCreds = cred.credentials.UsernamePassword('user', 'wrong password')
        self.assertRaises(
            cred.error.UnauthorizedLogin,
            database.requestAvatarId, badCreds
        )

        goodCreds = cred.credentials.UsernamePassword('user', 'password')
        avatarId = database.requestAvatarId(goodCreds)
        self.assertEquals(avatarId, 'user')

class ServiceDomainTestCase(unittest.TestCase):
    """
    Tests for mail.protocols.DomainDeliveryBase backed by a MailService
    that knows a single domain, 'test.domain', with one user.
    """

    def setUp(self):
        """
        Create the service, the delivery object under test and a maildir
        domain holding the account 'user'.
        """
        self.S = mail.mail.MailService()

        delivery = mail.protocols.DomainDeliveryBase(self.S, None)
        delivery.service = self.S
        delivery.protocolName = 'TEST'
        delivery.host = 'hostname'
        self.D = delivery

        self.tmpdir = self.mktemp()
        domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
        domain.addUser('user', 'password')
        self.S.domains['test.domain'] = domain

    def tearDown(self):
        """
        Remove the maildir domain's directory tree.
        """
        shutil.rmtree(self.tmpdir)

    def testReceivedHeader(self):
        """
        receivedHeader() yields text that parses as exactly one header,
        named Received.
        """
        helo = ('remotehost', '123.232.101.234')
        origin = smtp.Address('<someguy@somplace>')
        hdr = self.D.receivedHeader(helo, origin, ['user@host.name'])

        parsed = rfc822.Message(StringIO.StringIO(hdr))
        self.assertEquals(len(parsed.items()), 1)
        self.failUnless(parsed.has_key('Received'))

    def _badRcptResult(self, address):
        """
        Validate a recipient at the given address and return the result of
        the Deferred once SMTPBadRcpt failures have been trapped.
        """
        user = smtp.User(address, 'helo', None, 'wherever@whatever')
        d = self.D.validateTo(user)
        d.addErrback(lambda f: f.trap(smtp.SMTPBadRcpt))
        return unittest.deferredResult(d)

    def testValidateTo(self):
        """
        validateTo() produces a callable for a known user and fails with
        SMTPBadRcpt for an unknown user or an unknown domain.
        """
        user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
        accepted = unittest.deferredResult(
            defer.maybeDeferred(self.D.validateTo, user)
        )
        self.failUnless(callable(accepted))

        for address in ('resu@test.domain', 'user@domain.test'):
            self.assertEquals(self._badRcptResult(address), smtp.SMTPBadRcpt)

    def testValidateFrom(self):
        """
        validateFrom() returns the very origin object it was given when a
        helo tuple is present and raises SMTPBadSender when helo is None.
        """
        for peerAddress in ('127.0.0.1', '1.2.3.4'):
            helo = ('hostname', peerAddress)
            origin = smtp.Address('<user@hostname>')
            self.failUnless(self.D.validateFrom(helo, origin) is origin)

        self.assertRaises(
            smtp.SMTPBadSender,
            self.D.validateFrom, None, origin
        )
    
class VirtualPOP3TestCase(unittest.TestCase):
    """
    Tests for the APOP and PASS authentication of
    mail.protocols.VirtualPOP3.
    """

    def setUp(self):
        """
        Build a service with one maildir domain ('test.domain', account
        user/password), register its portal under that name and under the
        empty name, and attach a VirtualPOP3 with a fixed magic string.
        """
        self.tmpdir = self.mktemp()
        self.S = mail.mail.MailService()
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
        self.D.addUser('user', 'password')
        self.S.domains['test.domain'] = self.D

        portal = cred.portal.Portal(self.D)
        for checker in self.D.getCredentialsCheckers():
            portal.registerChecker(checker)
        self.S.portals[''] = portal
        self.S.portals['test.domain'] = portal

        self.P = mail.protocols.VirtualPOP3()
        self.P.service = self.S
        self.P.magic = '<unit test magic>'

    def tearDown(self):
        """
        Remove the maildir domain's directory tree.
        """
        shutil.rmtree(self.tmpdir)

    def _assertMailboxLogin(self, d):
        """
        Assert that the Deferred yields an (IMailbox, mailbox, logout)
        triple, then call the logout callable.
        """
        result = unittest.deferredResult(d)
        self.assertEquals(len(result), 3)
        self.assertEquals(result[0], pop3.IMailbox)
        self.failUnless(components.implements(result[1], pop3.IMailbox))
        result[2]()

    def _assertUnauthorized(self, d):
        """
        Assert that the Deferred fails with cred.error.UnauthorizedLogin.
        """
        d.addErrback(lambda f: f.trap(cred.error.UnauthorizedLogin))
        self.assertEquals(
            unittest.deferredResult(d), cred.error.UnauthorizedLogin
        )

    def testAuthenticateAPOP(self):
        """
        APOP succeeds with the digest of magic + password and is refused
        for an unknown user or a wrong digest.
        """
        goodDigest = md5.new(self.P.magic + 'password').hexdigest()
        badDigest = md5.new('wrong digest').hexdigest()
        login = self.P.authenticateUserAPOP

        self._assertMailboxLogin(login('user', goodDigest))
        self._assertUnauthorized(login('resu', goodDigest))
        self._assertUnauthorized(login('user', badDigest))

    def testAuthenticatePASS(self):
        """
        PASS succeeds with the right password and is refused for an unknown
        user or a wrong password.
        """
        login = self.P.authenticateUserPASS

        self._assertMailboxLogin(login('user', 'password'))
        self._assertUnauthorized(login('resu', 'password'))
        self._assertUnauthorized(login('user', 'wrong password'))

class empty(smtp.User):
    """
    An smtp.User subclass whose constructor takes no arguments and sets
    nothing; tests attach whatever attributes they need afterwards.
    """

    def __init__(self):
        """
        Deliberately do nothing (the base class constructor is not called).
        """

class RelayTestCase(unittest.TestCase):
    """
    Tests for the peer-based relaying decision of mail.relay.DomainQueuer.
    """

    def _userFrom(self, peer, orig, dest):
        """
        Build a bare user object whose protocol.transport.getPeer() returns
        the given peer tuple.
        """
        user = empty()
        user.orig = orig
        user.dest = dest
        user.protocol = empty()
        user.protocol.transport = empty()
        user.protocol.transport.getPeer = lambda: peer
        return user

    def testExists(self):
        """
        exists() returns a callable for the UNIX and 127.0.0.1 peers and
        raises SMTPBadRcpt for the other TCP peers.
        """
        service = mail.mail.MailService()
        domain = mail.relay.DomainQueuer(service)

        accepted = [
            ('UNIX', '/var/run/mail-relay'),
            ('TCP', '127.0.0.1', 12345),
        ]
        refused = [
            ('TCP', '192.168.2.1', 62),
            ('TCP', '1.2.3.4', 1943),
        ]

        for peer in accepted:
            user = self._userFrom(peer, 'user@host', 'tsoh@resu')
            self.failUnless(callable(domain.exists(user)))

        for peer in refused:
            user = self._userFrom(peer, 'some@place', 'who@cares')
            self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)

class RelayerTestCase(unittest.TestCase):
    """
    Tests for mail.relay.RelayerMixin working through ten queued messages.
    """

    def setUp(self):
        """
        Write ten message file pairs into a temporary directory and load
        them into a RelayerMixin.

        For each message 'body-N', the '-H' file holds a pickled
        ['from-N', 'to-N'] list and the '-D' file holds the message's own
        path as its body.
        """
        self.tmpdir = self.mktemp()
        os.mkdir(self.tmpdir)
        self.messageFiles = []
        for i in range(10):
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
            f = file(name + '-H', 'w')
            try:
                pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
            finally:
                f.close()

            # Close the body file as well; leaving it open leaked one
            # handle per message and kept files open inside the directory
            # tearDown removes.
            f = file(name + '-D', 'w')
            try:
                f.write(name)
            finally:
                f.close()
            self.messageFiles.append(name)

        self.R = mail.relay.RelayerMixin()
        self.R.loadMessages(self.messageFiles)

    def tearDown(self):
        """
        Remove the temporary message directory.
        """
        shutil.rmtree(self.tmpdir)

    def testMailFrom(self):
        """
        getMailFrom() yields each sender in turn and None once every
        message has been reported as sent.
        """
        for i in range(10):
            self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
            self.R.sentMail(250, None, None, None, None)
        self.assertEquals(self.R.getMailFrom(), None)

    def testMailTo(self):
        """
        getMailTo() yields each recipient list in turn and None once every
        message has been reported as sent.
        """
        for i in range(10):
            self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
            self.R.sentMail(250, None, None, None, None)
        self.assertEquals(self.R.getMailTo(), None)

    def testMailData(self):
        """
        getMailData() yields a readable object with each body in turn and
        None once every message has been reported as sent.
        """
        for i in range(10):
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
            self.assertEquals(self.R.getMailData().read(), name)
            self.R.sentMail(250, None, None, None, None)
        self.assertEquals(self.R.getMailData(), None)

class Manager:
    """
    Recording stand-in for a relay manager: every notification is appended
    to the matching list so tests can inspect it afterwards.
    """

    def __init__(self):
        # One independent list per kind of notification.
        self.success, self.failure, self.done = [], [], []

    def notifySuccess(self, factory, message):
        """
        Record a (factory, message) pair as successfully relayed.
        """
        entry = (factory, message)
        self.success.append(entry)

    def notifyFailure(self, factory, message):
        """
        Record a (factory, message) pair as failed.
        """
        entry = (factory, message)
        self.failure.append(entry)

    def notifyDone(self, factory):
        """
        Record that the given factory is finished.
        """
        self.done.append(factory)

class ManagedRelayerTestCase(unittest.TestCase):
    """
    Tests that mail.relaymanager.ManagedRelayerMixin reports back to its
    manager.
    """

    def setUp(self):
        """
        Attach a ManagedRelayerMixin to a recording Manager and give it ten
        placeholder messages.
        """
        self.manager = Manager()
        self.messages = range(0, 20, 2)
        self.factory = object()

        relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
        relay.messages = self.messages[:]
        relay.names = self.messages[:]
        relay.factory = self.factory
        self.relay = relay

    def _reportAll(self, code):
        """
        Call sentMail() with the given code once per queued message.
        """
        for unused in self.messages:
            self.relay.sentMail(code, None, None, None, None)

    def _allNotifications(self):
        """
        Return the (factory, message) pairs expected for every message.
        """
        return [(self.factory, message) for message in self.messages]

    def testSuccessfulSentMail(self):
        """
        A 250 result for each message is reported via notifySuccess.
        """
        self._reportAll(250)
        self.assertEquals(self.manager.success, self._allNotifications())

    def testFailedSentMail(self):
        """
        A 550 result for each message is reported via notifyFailure.
        """
        self._reportAll(550)
        self.assertEquals(self.manager.failure, self._allNotifications())

    def testConnectionLost(self):
        """
        connectionLost() reports the factory via notifyDone.
        """
        reason = failure.Failure(Exception())
        self.relay.connectionLost(reason)
        self.assertEquals(self.manager.done, [self.factory])

class DirectoryQueueTestCase(unittest.TestCase):
    """
    Tests for mail.relaymanager.Queue preloaded with 25 messages.
    """

    def setUp(self):
        """
        Create a queue in a temporary directory, add 25 messages whose
        envelope is ['header', N], then have the queue read its directory.
        """
        # This is almost a test case itself.
        self.tmpdir = self.mktemp()
        os.mkdir(self.tmpdir)
        self.queue = mail.relaymanager.Queue(self.tmpdir)
        for number in range(25):
            headerFile, message = self.queue.createNewMessage()
            pickle.dump(['header', number], headerFile)
            headerFile.close()
            message.lineReceived('body: %d' % (number,))
            message.eomReceived()
        self.queue.readDirectory()

    def tearDown(self):
        """
        Remove the queue directory.
        """
        shutil.rmtree(self.tmpdir)

    def _waitingCount(self):
        """
        Return how many messages the queue reports as waiting.
        """
        return len(self.queue.getWaiting())

    def _relayedCount(self):
        """
        Return how many messages the queue reports as relayed.
        """
        return len(self.queue.getRelayed())

    def testWaiting(self):
        """
        All messages start out waiting; setRelaying() removes one from the
        waiting set and setWaiting() puts it back.
        """
        self.failUnless(self.queue.hasWaiting())
        self.assertEquals(self._waitingCount(), 25)

        first = self.queue.getWaiting()[0]
        self.queue.setRelaying(first)
        self.assertEquals(self._waitingCount(), 24)

        self.queue.setWaiting(first)
        self.assertEquals(self._waitingCount(), 25)

    def testRelaying(self):
        """
        Waiting and relayed counts always add up to 25 while messages are
        moved over one by one, and a relayed message can be moved back.
        """
        for message in self.queue.getWaiting():
            self.queue.setRelaying(message)
            self.assertEquals(
                self._relayedCount(),
                25 - self._waitingCount()
            )

        self.failIf(self.queue.hasWaiting())

        first = self.queue.getRelayed()[0]
        self.queue.setWaiting(first)
        self.assertEquals(self._waitingCount(), 1)
        self.assertEquals(self._relayedCount(), 24)

    def testDone(self):
        """
        done() removes a relaying message from the queue entirely.
        """
        message = self.queue.getWaiting()[0]
        self.queue.setRelaying(message)
        self.queue.done(message)

        self.assertEquals(self._waitingCount(), 24)
        self.assertEquals(self._relayedCount(), 0)

        self.failIf(message in self.queue.getWaiting())
        self.failIf(message in self.queue.getRelayed())

    def testEnvelope(self):
        """
        getEnvelope() returns the pickled ['header', N] list of each
        message.
        """
        envelopes = [
            self.queue.getEnvelope(message)
            for message in self.queue.getWaiting()
        ]
        envelopes.sort()

        for number in range(25):
            self.assertEquals(envelopes[number], ['header', number])

from twisted.names import server
from twisted.names import client
from twisted.names import common

class TestAuthority(common.ResolverBase):
    """
    In-memory resolver for the tests: answers MX queries for the names
    found in self.addresses and fails every other lookup.
    """

    def __init__(self):
        common.ResolverBase.__init__(self)
        # Maps a domain name to the list of mail exchange names to return.
        self.addresses = {}

    def _lookup(self, name, cls, type, timeout = None):
        """
        Return a Deferred firing with (answers, [], []) holding one MX
        record header per configured exchange, or failing with
        dns.DomainError when the name is unknown or the query is not MX.
        """
        if name not in self.addresses or type != dns.MX:
            return defer.fail(failure.Failure(dns.DomainError(name)))

        answers = [
            dns.RRHeader(name, dns.MX, dns.IN, 60, dns.Record_MX(0, exchange))
            for exchange in self.addresses[name]
        ]
        return defer.succeed((answers, [], []))

def setUpDNS(self):
    """
    Start a DNS server backed by a TestAuthority on 127.0.0.1, listening on
    the same port number over TCP and UDP, and point a client resolver at
    it.

    Sets self.auth, self.port, self.udpPort and self.resolver.
    """
    self.auth = TestAuthority()
    factory = server.DNSServerFactory([self.auth])
    datagramProtocol = dns.DNSDatagramProtocol(factory)

    # Let the reactor pick a TCP port, then try to claim the same number
    # for UDP; if that is taken, drop the TCP port and start over.
    bound = False
    while not bound:
        self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
        portNumber = self.port.getHost()[2]

        try:
            self.udpPort = reactor.listenUDP(
                portNumber, datagramProtocol, interface='127.0.0.1'
            )
        except CannotListenError:
            self.port.stopListening()
        else:
            bound = True

    self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])

def tearDownDNS(self):
    """
    Stop the TCP and UDP listeners stored on the test case and, on a
    best-effort basis, cancel the resolver's pending _parseCall.
    """
    for attribute in ('port', 'udpPort'):
        getattr(self, attribute).stopListening()

    # The resolver may have no pending call (or none that can still be
    # cancelled); any error from the attempt is deliberately ignored.
    try:
        pending = self.resolver._parseCall
        pending.cancel()
    except:
        pass

class MXTestCase(unittest.TestCase):
    """
    Tests for mail.relaymanager.MXCalculator against the local test DNS
    server started by setUpDNS().
    """

    # setUpDNS() listens on UDP, so skip on reactors without UDP support
    # instead of erroring out in setUp.
    if interfaces.IReactorUDP(reactor, default=None) is None:
        skip = "UDP support is required to determine MX records"

    def setUp(self):
        """
        Start the test DNS server and create an MXCalculator using its
        resolver.
        """
        setUpDNS(self)
        self.mx = mail.relaymanager.MXCalculator(self.resolver)

    def tearDown(self):
        """
        Shut the test DNS server down.
        """
        tearDownDNS(self)

    def testSimpleSuccess(self):
        """
        A domain with one MX record resolves to that exchange with
        preference 0.
        """
        self.auth.addresses['test.domain'] = ['the.email.test.domain']

        mx = unittest.deferredResult(self.mx.getMX('test.domain'))
        self.assertEquals(mx.preference, 0)
        self.assertEquals(str(mx.exchange), 'the.email.test.domain')

    def testSimpleFailure(self):
        """
        With fallbackToDomain disabled, an unknown domain fails with
        IOError.
        """
        self.mx.fallbackToDomain = False
        self.assertEquals(
            unittest.deferredError(self.mx.getMX('test.domain')).type,
            IOError
        )

    def testSimpleFailureWithFallback(self):
        """
        With fallbackToDomain left at its default, an unknown domain fails
        with DNSLookupError.
        """
        self.assertEquals(
            unittest.deferredError(self.mx.getMX('test.domain')).type,
            DNSLookupError
        )

    def testManyRecords(self):
        """
        Exchanges marked bad are not handed out again while another one is
        available, and one marked good again is returned.
        """
        self.auth.addresses['test.domain'] = [
            'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
        ]

        mx = unittest.deferredResult(self.mx.getMX('test.domain'))
        self.failUnless(str(mx.exchange).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))

        self.mx.markBad(mx)

        nextMX = unittest.deferredResult(self.mx.getMX('test.domain'))
        self.assertNotEqual(str(mx.exchange), str(nextMX.exchange))

        self.mx.markBad(nextMX)

        lastMX = unittest.deferredResult(self.mx.getMX('test.domain'))
        self.assertNotEqual(str(mx.exchange), str(lastMX.exchange))
        self.assertNotEqual(str(nextMX.exchange), str(lastMX.exchange))

        # With all three marked bad, re-enabling one makes it the answer.
        self.mx.markBad(lastMX)
        self.mx.markGood(nextMX)

        againMX = unittest.deferredResult(self.mx.getMX('test.domain'))
        self.assertEqual(str(againMX.exchange), str(nextMX.exchange))

class LiveFireExercise(unittest.TestCase):
    """
    End-to-end tests that push a message through real SMTP servers
    listening on 127.0.0.1 and check that it lands in a maildir mailbox.
    """
    if interfaces.IReactorUDP(reactor, default=None) is None:
        skip = "UDP support is required to determining MX records"

    def setUp(self):
        """
        Start the test DNS server and prepare the bookkeeping used for
        cleanup.
        """
        setUpDNS(self)
        self.tmpdirs = [
            'domainDir', 'insertionDomain', 'insertionQueue',
            'destinationDomain', 'destinationQueue'
        ]
        # Ports and services registered here are shut down in tearDown, so
        # they are released even when a test fails half way through.
        self.listeningPorts = []
        self.runningServices = []

    def tearDown(self):
        """
        Stop every registered port and service, shut the DNS server down
        and remove the directories the tests may have created.
        """
        for port in self.listeningPorts:
            port.stopListening()
        for svc in self.runningServices:
            svc.stopService()
        tearDownDNS(self)
        for d in self.tmpdirs:
            if os.path.exists(d):
                shutil.rmtree(d)

    def testLocalDelivery(self):
        """
        A message sent over SMTP to a user of a locally hosted domain shows
        up in that user's mailbox.
        """
        service = mail.mail.MailService()
        service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
        domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
        domain.addUser('user', 'password')
        service.domains['test.domain'] = domain
        service.portals['test.domain'] = cred.portal.Portal(domain)
        service.portals[''] = service.portals['test.domain']
        for checker in domain.getCredentialsCheckers():
            service.portals[''].registerChecker(checker)

        service.setQueue(mail.relay.DomainQueuer(service))
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
        # Constructed as in the relay test, but never started here.
        helper = mail.relaymanager.RelayStateHelper(manager, 1)

        f = service.getSMTPFactory()

        self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
        self.listeningPorts.append(self.smtpServer)

        client = LineSendingProtocol([
            'HELO meson',
            'MAIL FROM: <user@hostname>',
            'RCPT TO: <user@test.domain>',
            'DATA',
            'This is the message',
            '.',
            'QUIT'
        ])

        done = []
        f = protocol.ClientFactory()
        f.protocol = lambda: client
        f.clientConnectionLost = lambda *args: done.append(None)
        reactor.connectTCP('127.0.0.1', self.smtpServer.getHost()[2], f)

        # Spin the reactor until the client connection is gone, giving up
        # after 1000 iterations.
        i = 0
        while len(done) == 0 and i < 1000:
            reactor.iterate(0.01)
            i += 1

        self.failUnless(done)

        mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
        msg = mbox.getMessage(0).read()
        self.failIfEqual(msg.find('This is the message'), -1)

    def testRelayDelivery(self):
        """
        A message handed to one server for a domain it does not host is
        queued, relayed to a second server found through MX lookup and
        delivered to the mailbox there.
        """
        # Here is the service we will connect to and send mail from
        insServ = mail.mail.MailService()
        insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
        domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
        insServ.domains['insertion.domain'] = domain
        insServ.portals['insertion.domain'] = cred.portal.Portal(domain)
        os.mkdir('insertionQueue')
        insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
        insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
        manager.fArgs += ('test.identity.hostname',)
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
        # Yoink!  Now the internet obeys OUR every whim!
        manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
        # And this is our whim.
        self.auth.addresses['destination.domain'] = ['localhost']

        f = insServ.getSMTPFactory()
        self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
        self.listeningPorts.append(self.insServer)

        # Here is the service the previous one will connect to for final
        # delivery
        destServ = mail.mail.MailService()
        destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
        domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
        domain.addUser('user', 'password')
        destServ.domains['destination.domain'] = domain
        destServ.portals['destination.domain'] = cred.portal.Portal(domain)
        os.mkdir('destinationQueue')
        destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
        manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
        # NOTE(review): this helper wraps `manager`, not `manager2`, and
        # replaces the helper created above - kept as is, confirm intent.
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
        helper.startService()
        self.runningServices.append(helper)

        f = destServ.getSMTPFactory()
        self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
        self.listeningPorts.append(self.destServer)

        # Update the port number the *first* relay will connect to, because we can't use
        # port 25
        manager.PORT = self.destServer.getHost()[2]

        client = LineSendingProtocol([
            'HELO meson',
            'MAIL FROM: <user@wherever>',
            'RCPT TO: <user@destination.domain>',
            'DATA',
            'This is the message',
            '.',
            'QUIT'
        ])

        done = []
        f = protocol.ClientFactory()
        f.protocol = lambda: client
        f.clientConnectionLost = lambda *args: done.append(None)
        reactor.connectTCP('127.0.0.1', self.insServer.getHost()[2], f)

        i = 0
        while len(done) == 0 and i < 1000:
            reactor.iterate(0.01)
            i += 1

        self.failUnless(done)

        # First part of the delivery is done.  Poke the queue manually now
        # so we don't have to wait for the queue to be flushed.
        manager.checkState()

        for i in range(1000):
            reactor.iterate(0.01)

        mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
        msg = mbox.getMessage(0).read()
        self.failIfEqual(msg.find('This is the message'), -1)

# Sample alias file shared by the alias tests: AliasTestCase.setUp rewinds
# it before each test and testFileLoader passes it to
# mail.alias.loadAliasFile.  It holds comment lines, a continuation line, a
# blank line, and a mix of plain addresses, '|' entries, '/' entries and a
# ':' entry.
aliasFile = StringIO.StringIO("""\
# Here's a comment
   # woop another one
testuser:                   address1,address2, address3,
    continuation@address, |/bin/process/this

usertwo:thisaddress,thataddress, lastaddress
lastuser:       :/includable, /filename, |/program, address
""")

class LineBufferMessage:
    """
    Message stand-in that remembers the lines it was given and whether the
    end of the message or a lost connection was signalled.
    """

    def __init__(self):
        self.lines = []
        # Flipped to True by eomReceived() and connectionLost().
        self.eom = self.lost = False

    def lineReceived(self, line):
        """
        Remember one more line of the message.
        """
        self.lines += [line]

    def eomReceived(self):
        """
        Note the end of the message and return an already fired Deferred
        whose result is '<Whatever>'.
        """
        self.eom = True
        return defer.succeed('<Whatever>')

    def connectionLost(self):
        """
        Note that the connection was lost.
        """
        self.lost = True

class AliasTestCase(unittest.TestCase):
    """
    Tests for alias line handling, alias file loading, and the
    MultiWrapper and FileAlias message receivers from mail.alias.
    """

    # Message body fed line by line to the receivers under test; it
    # deliberately includes an empty line.
    lines = [
        'First line',
        'Next line',
        '',
        'After a blank line',
        'Last line'
    ]

    def setUp(self):
        # aliasFile is a module-level buffer shared between tests, so rewind
        # it to make sure each test reads it from the beginning.
        aliasFile.seek(0)

    def testHandle(self):
        """
        Feed individual alias lines to mail.alias.handle and check the
        accumulated mapping: repeated names are merged and comma separated
        targets are split and stripped.
        """
        result = {}
        lines = [
            'user:  another@host\n',
            'nextuser:  |/bin/program\n',
            'user:  me@again\n',
            'moreusers: :/etc/include/filename\n',
            'multiuser: first@host, second@host,last@anotherhost',
        ]

        for line in lines:
            mail.alias.handle(result, line, 'TestCase', None)

        self.assertEquals(result['user'], ['another@host', 'me@again'])
        self.assertEquals(result['nextuser'], ['|/bin/program'])
        self.assertEquals(result['moreusers'], [':/etc/include/filename'])
        self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])

    def testFileLoader(self):
        """
        Load the shared aliasFile fixture and check that each of its three
        entries yields a group with the expected members.
        """
        domains = {'': object()}
        result = mail.alias.loadAliasFile(domains, fp=aliasFile)

        self.assertEquals(len(result), 3)

        group = result['testuser']
        s = str(group)
        for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
            self.failIfEqual(s.find(a), -1)
        self.assertEquals(len(group), 5)

        group = result['usertwo']
        s = str(group)
        for a in ('thisaddress', 'thataddress', 'lastaddress'):
            self.failIfEqual(s.find(a), -1)
        self.assertEquals(len(group), 3)

        group = result['lastuser']
        s = str(group)
        # The ":/includable" target must not show up in the loaded group,
        # leaving only the other three targets of that entry.
        self.failUnlessEqual(s.find('/includable'), -1)
        for a in ('/filename', 'program', 'address'):
            self.failIfEqual(s.find(a), -1, '%s not found' % a)
        self.assertEquals(len(group), 3)

    def testMultiWrapper(self):
        """
        A MultiWrapper must relay every line and the end-of-message
        notification to each wrapped receiver, without reporting a lost
        connection to any of them.
        """
        msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
        msg = mail.alias.MultiWrapper(msgs)

        for L in self.lines:
            msg.lineReceived(L)
        unittest.deferredResult(msg.eomReceived())

        for m in msgs:
            self.failUnless(m.eom)
            self.failIf(m.lost)
            self.assertEquals(self.lines, m.lines)

    def testFileAlias(self):
        """
        A FileAlias receiver must write the delivered lines, one per line,
        to the file it was created with.
        """
        tmpfile = self.mktemp()
        a = mail.alias.FileAlias(tmpfile, None, None)
        m = a.createMessageReceiver()

        for l in self.lines:
            m.lineReceived(l)
        unittest.deferredResult(m.eomReceived())

        # Close the handle explicitly instead of relying on garbage
        # collection to release it.
        fObj = open(tmpfile)
        try:
            written = fObj.readlines()
        finally:
            fObj.close()
        # Drop the trailing newline of each line before comparing.
        self.assertEquals([L[:-1] for L in written], self.lines)

class ProcessAliasTestCase(test_process.SignalMixin, unittest.TestCase):
    """
    Tests for aliases that deliver to a child process, and for resolving
    alias groups (including cyclic ones) against a TestDomain.
    """

    # Message body fed line by line to the receiver under test; it
    # deliberately includes an empty line.
    lines = [
        'First line',
        'Next line',
        '',
        'After a blank line',
        'Last line'
    ]

    def setUpClass(self):
        # Blank out smtp.DNSNAME while these tests run, remembering the
        # original value so tearDownClass can restore it.
        self.DNSNAME = smtp.DNSNAME
        smtp.DNSNAME = ''

    def tearDownClass(self):
        smtp.DNSNAME = self.DNSNAME

    def tearDown(self):
        # Turn the reactor a few times after every test.
        # NOTE(review): presumably this lets pending child-process events be
        # handled before the next test starts -- confirm.
        for _ in range(3):
            reactor.iterate()

    def testProcessAlias(self):
        """
        Deliver a message through a ProcessAlias built on the sibling
        'process.alias.sh' script and compare 'process.alias.out' with the
        lines that were sent.
        """
        path = util.sibpath(__file__, 'process.alias.sh')
        a = mail.alias.ProcessAlias(path, None, None)
        m = a.createMessageReceiver()

        for l in self.lines:
            m.lineReceived(l)
        unittest.deferredResult(m.eomReceived())

        # Close the handle explicitly instead of relying on garbage
        # collection to release it.
        fObj = open('process.alias.out')
        try:
            written = fObj.readlines()
        finally:
            fObj.close()
        # Drop the trailing newline of each line before comparing.
        self.assertEquals([L[:-1] for L in written], self.lines)

    def testAliasResolution(self):
        """
        Resolve two alias groups and an address alias pointing at the first
        group, comparing the sorted string forms of the resolved objects
        with the sorted string forms of the expected ones.
        """
        aliases = {}
        domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
        A1 = mail.alias.AliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
        A2 = mail.alias.AliasGroup(['user2', 'user3'], domain, 'alias2')
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
        aliases.update({
            'alias1': A1,
            'alias2': A2,
            'alias3': A3,
        })

        r1 = map(str, A1.resolve(aliases).objs)
        r1.sort()
        # This process is only used as the first argument of the expected
        # MessageWrapper objects built below.
        p = reactor.spawnProcess(protocol.ProcessProtocol(), "process_reader.py")
        expected = map(str, [
            mail.alias.AddressAlias('user1', None, None),
            mail.alias.MessageWrapper(p, 'echo'),
            mail.alias.FileWrapper('/file'),
        ])
        expected.sort()
        self.assertEquals(r1, expected)

        r2 = map(str, A2.resolve(aliases).objs)
        r2.sort()
        expected = map(str, [
            mail.alias.AddressAlias('user2', None, None),
            mail.alias.AddressAlias('user3', None, None)
        ])
        expected.sort()
        self.assertEquals(r2, expected)

        # alias3 points at alias1, so it must resolve to the same objects.
        r3 = map(str, A3.resolve(aliases).objs)
        r3.sort()
        expected = map(str, [
            mail.alias.AddressAlias('user1', None, None),
            mail.alias.MessageWrapper(p, 'echo'),
            mail.alias.FileWrapper('/file'),
        ])
        expected.sort()
        self.assertEquals(r3, expected)

    def testCyclicAlias(self):
        """
        Three aliases that point at each other in a cycle must each resolve
        to None, and a group containing one of them plus a '|echo' target
        must resolve to the '|echo' target only.
        """
        aliases = {}
        domain = {'': TestDomain(aliases, [])}
        A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
        A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
        aliases.update({
            'alias1': A1,
            'alias2': A2,
            'alias3': A3
        })

        self.assertEquals(aliases['alias1'].resolve(aliases), None)
        self.assertEquals(aliases['alias2'].resolve(aliases), None)
        self.assertEquals(aliases['alias3'].resolve(aliases), None)

        A4 = mail.alias.AliasGroup(['|echo', 'alias1'], domain, 'alias4')
        aliases['alias4'] = A4

        # This process is only used as the first argument of the expected
        # MessageWrapper built below.
        p = reactor.spawnProcess(protocol.ProcessProtocol(), "process_reader.py")
        r = map(str, A4.resolve(aliases).objs)
        r.sort()
        expected = map(str, [
            mail.alias.MessageWrapper(p, 'echo')
        ])
        self.assertEquals(r, expected)
        for _ in range(3):
            reactor.iterate()

# When the reactor does not implement IReactorProcess the process-spawning
# tests above cannot run, so the test case class is rebound to a string
# naming the reason.
# NOTE(review): presumably the trial runner in use reports a string found in
# place of a test class as a skip message -- confirm.
if not components.implements(reactor, interfaces.IReactorProcess):
    ProcessAliasTestCase = "IReactorProcess not supported"

class TestDomain:
    """
    Stand-in domain object used by the alias resolution tests.

    It knows a collection of local user names and a mapping of alias names
    to alias objects (anything providing C{resolve(aliases, memo)}).
    """

    def __init__(self, aliases, users):
        # aliases: mapping of alias name -> alias object.
        # users: collection of local user names handled directly.
        self.aliases = aliases
        self.users = users

    def exists(self, user, memo=None):
        """
        Look up the local part of C{user} (read from C{user.dest.local}).

        @param user: object whose C{dest.local} attribute names the
            recipient.
        @param memo: passed through unchanged to the alias's C{resolve}.
        @return: a no-argument callable producing either an AddressAlias
            for a known user or the resolved alias for a known alias name.
        @raise smtp.SMTPBadRcpt: if the name is neither a known user nor a
            known alias, or if the alias resolves to something false.
        """
        user = user.dest.local
        if user in self.users:
            return lambda: mail.alias.AddressAlias(user, None, None)
        try:
            a = self.aliases[user]
        except KeyError:
            # Only a missing alias means "no such recipient"; any other
            # error is a real failure and must not be masked.
            raise smtp.SMTPBadRcpt(user)
        aliases = a.resolve(self.aliases, memo)
        if aliases:
            return lambda: aliases
        raise smtp.SMTPBadRcpt(user)


# On any platform other than posix, tag every TestCase subclass bound at
# module level with a skip message.
from twisted.python.runtime import platformType
import types
if platformType != "posix":
    # At module level locals() is the module namespace, so this walks every
    # top-level name defined or imported above.
    for o in locals().values():
        # Accept both classic classes (types.ClassType) and new-style ones;
        # non-class values, such as a test case replaced by a string, are
        # ignored by the isinstance check.
        if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
            o.skip = "twisted.mail only works on posix"