"""Generic queue runner class.
"""
import time
import traceback
import weakref
from cStringIO import StringIO
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MailList
from Mailman import i18n
from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
try:
True, False
except NameError:
True = 1
False = 0
class Runner:
QDIR = None
SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
def __init__(self, slice=None, numslices=1):
self._kids = {}
self._switchboard = Switchboard(self.QDIR, slice, numslices)
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
self._stop = False
def stop(self):
self._stop = True
def run(self):
try:
try:
while True:
filecnt = self._oneloop()
self._doperiodic()
if self._stop:
break
self._snooze(filecnt)
except KeyboardInterrupt:
pass
finally:
self._cleanup()
def _oneloop(self):
files = self._switchboard.files()
for filebase in files:
msg, msgdata = self._switchboard.dequeue(filebase)
if msg is None or msgdata is None:
syslog('error', 'lost data files for filebase: %s', filebase)
else:
try:
self._onefile(msg, msgdata)
except Exception, e:
self._log(e)
msgdata['whichq'] = self._switchboard.whichq()
filebase = self._shunt.enqueue(msg, msgdata)
syslog('error', 'SHUNTING: %s', filebase)
Utils.reap(self._kids, once=True)
self._doperiodic()
if self._shortcircuit():
break
return len(files)
def _onefile(self, msg, msgdata):
listname = msgdata.get('listname')
if not listname:
listname = mm_cfg.MAILMAN_SITE_LIST
mlist = self._open_list(listname)
if not mlist:
syslog('error',
'Dequeuing message destined for missing list: %s',
listname)
self._shunt.enqueue(msg, msgdata)
return
otranslation = i18n.get_translation()
sender = msg.get_sender()
if mlist:
lang = mlist.getMemberLanguage(sender)
else:
lang = mm_cfg.DEFAULT_SERVER_LANGUAGE
i18n.set_language(lang)
msgdata['lang'] = lang
try:
keepqueued = self._dispose(mlist, msg, msgdata)
finally:
i18n.set_translation(otranslation)
kids = msgdata.get('_kids')
if kids:
self._kids.update(kids)
if keepqueued:
self._switchboard.enqueue(msg, msgdata)
_listcache = weakref.WeakValueDictionary()
def _open_list(self, listname):
mlist = self._listcache.get(listname)
if not mlist:
try:
mlist = MailList.MailList(listname, lock=False)
except Errors.MMListError, e:
syslog('error', 'error opening list: %s\n%s', listname, e)
return None
else:
self._listcache[listname] = mlist
return mlist
def _log(self, exc):
syslog('error', 'Uncaught runner exception: %s', exc)
s = StringIO()
traceback.print_exc(file=s)
syslog('error', s.getvalue())
def _cleanup(self):
"""Clean up upon exit from the main processing loop.
Called when the Runner's main loop is stopped, this should perform
any necessary resource deallocation. Its return value is irrelevant.
"""
Utils.reap(self._kids)
def _dispose(self, mlist, msg, msgdata):
"""Dispose of a single message destined for a mailing list.
Called for each message that the Runner is responsible for, this is
the primary overridable method for processing each message.
Subclasses, must provide implementation for this method.
mlist is the MailList instance this message is destined for.
msg is the Message object representing the message.
msgdata is a dictionary of message metadata.
"""
raise NotImplementedError
def _doperiodic(self):
"""Do some processing `every once in a while'.
Called every once in a while both from the Runner's main loop, and
from the Runner's hash slice processing loop. You can do whatever
special periodic processing you want here, and the return value is
irrelevant.
"""
pass
def _snooze(self, filecnt):
"""Sleep for a little while.
filecnt is the number of messages in the queue the last time through.
Sub-runners can decide to continue to do work, or sleep for a while
based on this value. By default, we only snooze if there was nothing
to do last time around.
"""
if filecnt or self.SLEEPTIME <= 0:
return
time.sleep(self.SLEEPTIME)
def _shortcircuit(self):
"""Return a true value if the individual file processing loop should
exit before it's finished processing each message in the current slice
of hash space. A false value tells _oneloop() to continue processing
until the current snapshot of hash space is exhausted.
You could, for example, implement a throttling algorithm here.
"""
return self._stop