OldStyleMemberships.py [plain text]
"""Old style Mailman membership adaptor.
This adaptor gets and sets member information on the MailList object given to
the constructor. It also equates member keys and lower-cased email addresses,
i.e. KEY is LCE.
This is the adaptor used by default in Mailman 2.1.
"""
import time
from types import StringType
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MemberAdaptor
ISREGULAR = 1
ISDIGEST = 2
class OldStyleMemberships(MemberAdaptor.MemberAdaptor):
def __init__(self, mlist):
self.__mlist = mlist
def getMembers(self):
return self.__mlist.members.keys() + self.__mlist.digest_members.keys()
def getRegularMemberKeys(self):
return self.__mlist.members.keys()
def getDigestMemberKeys(self):
return self.__mlist.digest_members.keys()
def __get_cp_member(self, member):
lcmember = member.lower()
missing = []
val = self.__mlist.members.get(lcmember, missing)
if val is not missing:
if isinstance(val, StringType):
return val, ISREGULAR
else:
return lcmember, ISREGULAR
val = self.__mlist.digest_members.get(lcmember, missing)
if val is not missing:
if isinstance(val, StringType):
return val, ISDIGEST
else:
return lcmember, ISDIGEST
return None, None
def isMember(self, member):
cpaddr, where = self.__get_cp_member(member)
if cpaddr is not None:
return 1
return 0
def getMemberKey(self, member):
cpaddr, where = self.__get_cp_member(member)
if cpaddr is None:
raise Errors.NotAMemberError, member
return member.lower()
def getMemberCPAddress(self, member):
cpaddr, where = self.__get_cp_member(member)
if cpaddr is None:
raise Errors.NotAMemberError, member
return cpaddr
def getMemberCPAddresses(self, members):
return [self.__get_cp_member(member)[0] for member in members]
def getMemberPassword(self, member):
secret = self.__mlist.passwords.get(member.lower())
if secret is None:
raise Errors.NotAMemberError, member
return secret
def authenticateMember(self, member, response):
secret = self.getMemberPassword(member)
if secret == response:
return secret
return 0
def __assertIsMember(self, member):
if not self.isMember(member):
raise Errors.NotAMemberError, member
def getMemberLanguage(self, member):
lang = self.__mlist.language.get(
member.lower(), self.__mlist.preferred_language)
if lang in self.__mlist.GetAvailableLanguages():
return lang
return self.__mlist.preferred_language
def getMemberOption(self, member, flag):
self.__assertIsMember(member)
if flag == mm_cfg.Digests:
cpaddr, where = self.__get_cp_member(member)
return where == ISDIGEST
option = self.__mlist.user_options.get(member.lower(), 0)
return not not (option & flag)
def getMemberName(self, member):
self.__assertIsMember(member)
return self.__mlist.usernames.get(member.lower())
def getMemberTopics(self, member):
self.__assertIsMember(member)
return self.__mlist.topics_userinterest.get(member.lower(), [])
def getDeliveryStatus(self, member):
self.__assertIsMember(member)
return self.__mlist.delivery_status.get(
member.lower(),
(MemberAdaptor.ENABLED, 0))[0]
def getDeliveryStatusChangeTime(self, member):
self.__assertIsMember(member)
return self.__mlist.delivery_status.get(
member.lower(),
(MemberAdaptor.ENABLED, 0))[1]
def getDeliveryStatusMembers(self, status=(MemberAdaptor.UNKNOWN,
MemberAdaptor.BYUSER,
MemberAdaptor.BYADMIN,
MemberAdaptor.BYBOUNCE)):
return [member for member in self.getMembers()
if self.getDeliveryStatus(member) in status]
def getBouncingMembers(self):
return [member.lower() for member in self.__mlist.bounce_info.keys()]
def getBounceInfo(self, member):
self.__assertIsMember(member)
return self.__mlist.bounce_info.get(member.lower())
def addNewMember(self, member, **kws):
assert self.__mlist.Locked()
if self.isMember(member):
raise Errors.MMAlreadyAMember, member
digest = 0
password = Utils.MakeRandomPassword()
language = self.__mlist.preferred_language
realname = None
if kws.has_key('digest'):
digest = kws['digest']
del kws['digest']
if kws.has_key('password'):
password = kws['password']
del kws['password']
if kws.has_key('language'):
language = kws['language']
del kws['language']
if kws.has_key('realname'):
realname = kws['realname']
del kws['realname']
if kws:
raise ValueError, kws.keys()
if Utils.LCDomain(member) == member.lower():
value = 0
else:
value = member
member = member.lower()
if digest:
self.__mlist.digest_members[member] = value
else:
self.__mlist.members[member] = value
self.setMemberPassword(member, password)
self.setMemberLanguage(member, language)
if realname:
self.setMemberName(member, realname)
if self.__mlist.new_member_options:
self.__mlist.user_options[member] = self.__mlist.new_member_options
def removeMember(self, member):
assert self.__mlist.Locked()
self.__assertIsMember(member)
memberkey = member.lower()
for attr in ('passwords', 'user_options', 'members', 'digest_members',
'language', 'topics_userinterest', 'usernames',
'bounce_info', 'delivery_status',
):
dict = getattr(self.__mlist, attr)
if dict.has_key(memberkey):
del dict[memberkey]
def changeMemberAddress(self, member, newaddress, nodelete=0):
assert self.__mlist.Locked()
self.__assertIsMember(member)
memberkey = member.lower()
fullname = self.getMemberName(memberkey)
flags = self.__mlist.user_options.get(memberkey, 0)
digestsp = self.getMemberOption(memberkey, mm_cfg.Digests)
password = self.__mlist.passwords.get(memberkey,
Utils.MakeRandomPassword())
lang = self.getMemberLanguage(memberkey)
delivery = self.__mlist.delivery_status.get(member.lower(),
(MemberAdaptor.ENABLED,0))
if not nodelete:
self.removeMember(memberkey)
self.addNewMember(newaddress, realname=fullname, digest=digestsp,
password=password, language=lang)
if flags:
self.__mlist.user_options[newaddress.lower()] = flags
if delivery[0] in (MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN)\
and not nodelete:
self.__mlist.delivery_status[newaddress.lower()] = delivery
def setMemberPassword(self, memberkey, password):
assert self.__mlist.Locked()
self.__assertIsMember(memberkey)
self.__mlist.passwords[memberkey.lower()] = password
def setMemberLanguage(self, memberkey, language):
assert self.__mlist.Locked()
self.__assertIsMember(memberkey)
self.__mlist.language[memberkey.lower()] = language
def setMemberOption(self, member, flag, value):
assert self.__mlist.Locked()
self.__assertIsMember(member)
memberkey = member.lower()
if flag == mm_cfg.Digests:
if value:
if not self.__mlist.digestable:
raise Errors.CantDigestError
if self.__mlist.digest_members.has_key(memberkey):
raise Errors.AlreadyReceivingDigests, member
cpuser = self.__mlist.members.get(memberkey)
if cpuser is None:
raise Errors.NotAMemberError, member
del self.__mlist.members[memberkey]
self.__mlist.digest_members[memberkey] = cpuser
if self.__mlist.one_last_digest.has_key(memberkey):
del self.__mlist.one_last_digest[memberkey]
else:
if not self.__mlist.nondigestable:
raise Errors.MustDigestError
if self.__mlist.members.has_key(memberkey):
raise Errors.AlreadyReceivingRegularDeliveries, member
cpuser = self.__mlist.digest_members.get(memberkey)
if cpuser is None:
raise Errors.NotAMemberError, member
del self.__mlist.digest_members[memberkey]
self.__mlist.members[memberkey] = cpuser
self.__mlist.one_last_digest[memberkey] = cpuser
return
self.__mlist.user_options.setdefault(memberkey, 0)
if value:
self.__mlist.user_options[memberkey] |= flag
else:
self.__mlist.user_options[memberkey] &= ~flag
if not self.__mlist.user_options[memberkey]:
del self.__mlist.user_options[memberkey]
def setMemberName(self, member, realname):
assert self.__mlist.Locked()
self.__assertIsMember(member)
self.__mlist.usernames[member.lower()] = realname
def setMemberTopics(self, member, topics):
assert self.__mlist.Locked()
self.__assertIsMember(member)
memberkey = member.lower()
if topics:
self.__mlist.topics_userinterest[memberkey] = topics
elif self.__mlist.topics_userinterest.has_key(memberkey):
del self.__mlist.topics_userinterest[memberkey]
def setDeliveryStatus(self, member, status):
assert status in (MemberAdaptor.ENABLED, MemberAdaptor.UNKNOWN,
MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN,
MemberAdaptor.BYBOUNCE)
assert self.__mlist.Locked()
self.__assertIsMember(member)
member = member.lower()
if status == MemberAdaptor.ENABLED:
self.setBounceInfo(member, None)
else:
self.__mlist.delivery_status[member] = (status, time.time())
def setBounceInfo(self, member, info):
assert self.__mlist.Locked()
self.__assertIsMember(member)
member = member.lower()
if info is None:
if self.__mlist.bounce_info.has_key(member):
del self.__mlist.bounce_info[member]
if self.__mlist.delivery_status.has_key(member):
del self.__mlist.delivery_status[member]
else:
self.__mlist.bounce_info[member] = info