"""Bounce queue runner."""
import os
import re
import time
import cPickle
from email.MIMEText import MIMEText
from email.MIMEMessage import MIMEMessage
from email.Utils import parseaddr
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import LockFile
from Mailman.Message import UserNotification
from Mailman.Bouncers import BouncerAPI
from Mailman.Queue.Runner import Runner
from Mailman.Queue.sbcache import get_switchboard
from Mailman.Logging.Syslog import syslog
from Mailman.i18n import _
COMMASPACE = ', '
try:
True, False
except NameError:
True = 1
False = 0
class BounceMixin:
def __init__(self):
self._bounce_events_file = os.path.join(
mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
self._bounce_events_fp = None
self._bouncecnt = 0
self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
def _queue_bounces(self, listname, addrs, msg):
today = time.localtime()[:3]
if self._bounce_events_fp is None:
self._bounce_events_fp = open(self._bounce_events_file, 'a+b')
for addr in addrs:
cPickle.dump((listname, addr, today, msg),
self._bounce_events_fp, 1)
self._bounce_events_fp.flush()
os.fsync(self._bounce_events_fp.fileno())
self._bouncecnt += len(addrs)
def _register_bounces(self):
syslog('bounce', '%s processing %s queued bounces',
self, self._bouncecnt)
events = {}
self._bounce_events_fp.seek(0)
while True:
try:
listname, addr, day, msg = cPickle.load(self._bounce_events_fp)
except ValueError, e:
syslog('bounce', 'Error reading bounce events: %s', e)
except EOFError:
break
events.setdefault(listname, []).append((addr, day, msg))
for listname in events.keys():
mlist = self._open_list(listname)
mlist.Lock()
try:
for addr, day, msg in events[listname]:
mlist.registerBounce(addr, msg, day=day)
mlist.Save()
finally:
mlist.Unlock()
self._bounce_events_fp.close()
self._bounce_events_fp = None
os.unlink(self._bounce_events_file)
self._bouncecnt = 0
def _cleanup(self):
if self._bouncecnt > 0:
self._register_bounces()
def _doperiodic(self):
now = time.time()
if self._nextaction > now or self._bouncecnt == 0:
return
self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
self._register_bounces()
def _probe_bounce(self, mlist, token):
locked = mlist.Locked()
if not locked:
mlist.Lock()
try:
op, addr, bmsg = mlist.pend_confirm(token)
info = mlist.getBounceInfo(addr)
mlist.disableBouncingMember(addr, info, bmsg)
if not locked:
mlist.Save()
finally:
if not locked:
mlist.Unlock()
class BounceRunner(Runner, BounceMixin):
QDIR = mm_cfg.BOUNCEQUEUE_DIR
def __init__(self, slice=None, numslices=1):
Runner.__init__(self, slice, numslices)
BounceMixin.__init__(self)
def _dispose(self, mlist, msg, msgdata):
mlist.Load()
outq = get_switchboard(mm_cfg.OUTQUEUE_DIR)
if msg.get('to', '') == Utils.get_site_email(extra='owner'):
outq.enqueue(msg, msgdata,
recips=[Utils.get_site_email()],
envsender=Utils.get_site_email(extra='loop'),
)
if not mlist.bounce_processing:
return
addrs = verp_bounce(mlist, msg)
if not addrs:
token = verp_probe(mlist, msg)
if token:
self._probe_bounce(mlist, token)
return
addrs = BouncerAPI.ScanMessages(mlist, msg)
if not addrs:
syslog('bounce', 'bounce message w/no discernable addresses: %s',
msg.get('message-id'))
maybe_forward(mlist, msg)
return
addrs = filter(None, addrs)
self._queue_bounces(mlist.internal_name(), addrs, msg)
_doperiodic = BounceMixin._doperiodic
def _cleanup(self):
BounceMixin._cleanup(self)
Runner._cleanup(self)
def verp_bounce(mlist, msg):
bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
vals = []
for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
vals.extend(msg.get_all(header, []))
for field in vals:
to = parseaddr(field)[1]
if not to:
continue mo = re.search(mm_cfg.VERP_REGEXP, to)
if not mo:
continue try:
if bmailbox <> mo.group('bounces'):
continue addr = '%s@%s' % mo.group('mailbox', 'host')
except IndexError:
syslog('error',
"VERP_REGEXP doesn't yield the right match groups: %s",
mm_cfg.VERP_REGEXP)
return []
return [addr]
def verp_probe(mlist, msg):
bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
vals = []
for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
vals.extend(msg.get_all(header, []))
for field in vals:
to = parseaddr(field)[1]
if not to:
continue mo = re.search(mm_cfg.VERP_PROBE_REGEXP, to)
if not mo:
continue try:
if bmailbox <> mo.group('bounces'):
continue token = mo.group('token')
data = mlist.pend_confirm(token, expunge=False)
if data is not None:
return token
except IndexError:
syslog(
'error',
"VERP_PROBE_REGEXP doesn't yield the right match groups: %s",
mm_cfg.VERP_PROBE_REGEXP)
return None
def maybe_forward(mlist, msg):
if mlist.bounce_unrecognized_goes_to_list_owner:
adminurl = mlist.GetScriptURL('admin', absolute=1) + '/bounce'
mlist.ForwardMessage(msg,
text=_("""\
The attached message was received as a bounce, but either the bounce format
was not recognized, or no member addresses could be extracted from it. This
mailing list has been configured to send all unrecognized bounce messages to
the list administrator(s).
For more information see:
%(adminurl)s
"""),
subject=_('Uncaught bounce notification'),
tomoderators=0)
syslog('bounce', 'forwarding unrecognized, message-id: %s',
msg.get('message-id', 'n/a'))
else:
syslog('bounce', 'discarding unrecognized, message-id: %s',
msg.get('message-id', 'n/a'))