Newer
Older
from postorius.models import List, MailmanUser
from postorius.utils import get_mailman_client
class Command(BaseCommand):
    """Synchronize Mailman list roles with the results of LDAP searches.

    The ``LDAP_MEMBERSHIP_SYNC`` Django setting is read as a mapping of the
    form ``{list_name: {'ldap': {membership_type: {...}}}}`` where
    ``membership_type`` is one of ``'member'``, ``'moderator'`` or ``'owner'``
    and the inner dict provides the keys ``'enabled'``, ``'dn'``, ``'filter'``
    and ``'username_attr'``.  For every enabled entry, Django users whose
    username is returned by the LDAP search are given that role on the list,
    and current role holders that are not returned by the search are removed.
    """

    can_import_settings = True
    help = 'Synchronize mailing list memberships from a LDAP server'

    # Maps a configured membership type to the mailing-list attribute that
    # holds the current holders of that role.
    _ROLE_COLLECTIONS = {'member': 'members', 'moderator': 'moderators', 'owner': 'owners'}

    def handle(self, *args, **options):
        """Bind to the LDAP server, run the synchronization, then unbind.

        The connection parameters (server URI, connection options, StartTLS,
        bind DN and password) are taken from the settings object of the
        ``django_auth_ldap`` backend.
        """
        ldap_backend = load_backend('django_auth_ldap.backend.LDAPBackend')
        conn = ldap_backend.ldap.initialize(ldap_backend.settings.SERVER_URI, bytes_mode=False)
        try:
            for opt, value in ldap_backend.settings.CONNECTION_OPTIONS.items():
                conn.set_option(opt, value)
            if ldap_backend.settings.START_TLS:
                conn.start_tls_s()
            conn.simple_bind_s(ldap_backend.settings.BIND_DN, ldap_backend.settings.BIND_PASSWORD)
            self._sync_memberships(conn)
        finally:
            # Always release the LDAP connection, even when the sync fails.
            conn.unbind_s()

    def _sync_memberships(self, conn):
        """Apply every enabled ``LDAP_MEMBERSHIP_SYNC`` entry using ``conn``."""
        client = get_mailman_client()
        membership_settings = getattr(django.conf.settings, 'LDAP_MEMBERSHIP_SYNC', {})
        mm_users = self._ensure_mailman_users()
        mailman_id2username = {
            mm_user.user_id: username for username, mm_user in mm_users.items()
        }

        for list_name, list_setting in membership_settings.items():
            ldap_setting = list_setting.get('ldap', {})
            for membership_type, type_setting in ldap_setting.items():
                if not type_setting.get('enabled'):
                    continue
                # Fails fast with KeyError on an unsupported membership type.
                collection = self._ROLE_COLLECTIONS[membership_type]
                ldap_members = LDAPSearch(
                    type_setting['dn'], ldap.SCOPE_SUBTREE,
                    type_setting['filter'],
                    [type_setting['username_attr']]).execute(conn)
                ldap_member_names = self._first_attr_values(ldap_members)
                # we refetch the mm_list each time because of weird caching problems
                mm_list = client.get_list(list_name)
                self._remove_stale(
                    getattr(mm_list, collection, []), mailman_id2username,
                    ldap_member_names, membership_type, list_name)
                self._add_missing(
                    mm_list, mm_users, ldap_member_names, membership_type, list_name)

    @staticmethod
    def _ensure_mailman_users():
        """Return ``{username: mailman user}`` for every Django user.

        Each Django user is looked up (or created) in Mailman, and the Mailman
        display name is updated when it differs from the Django full name.
        """
        mm_users = {}
        # create all django user in mailman
        for user in get_user_model().objects.all():
            mm_user = MailmanUser.objects.get_or_create_from_django(user)
            mm_users[user.username] = mm_user
            full_name = user.get_full_name()
            if mm_user.display_name != full_name:
                logger.warning("update display_name on {} to {}".format(user.username, full_name))
                mm_user.display_name = full_name
                mm_user.save()
        return mm_users

    @staticmethod
    def _first_attr_values(results):
        """Return the set of first values of the first attribute of each result.

        ``results`` is an iterable of ``(dn, attrs)`` pairs.  Entries that
        carry no attribute value are skipped instead of raising IndexError.
        """
        names = set()
        for _dn, attrs in results:
            values = next(iter(attrs.values()), None)
            if values:
                names.add(values[0])
        return names

    @staticmethod
    def _remove_stale(mm_members, mailman_id2username, ldap_member_names,
                      membership_type, list_name):
        """Unsubscribe role holders that are unknown or not in the LDAP result.

        Failures are logged per member so one broken entry does not stop the
        rest of the synchronization.
        """
        for mm_member in mm_members:
            try:
                username = mailman_id2username.get(mm_member.user.user_id)
                if not username or username not in ldap_member_names:
                    # user should not be subscribed -> remove
                    logger.warning("remove {} ( {} ) as {} on {}".format(
                        username, mm_member.user, membership_type, list_name))
                    mm_member.unsubscribe()
            except Exception as e:
                logger.exception(e)

    @staticmethod
    def _add_missing(mm_list, mm_users, ldap_member_names, membership_type, list_name):
        """Give ``membership_type`` on ``mm_list`` to LDAP users lacking it.

        Failures are logged per user so one broken entry does not stop the
        rest of the synchronization.
        """
        list_id = mm_list.list_id
        for username, mm_user in mm_users.items():
            if username not in ldap_member_names:
                continue
            try:
                displayname = mm_user.display_name or username
                # The role must be held on *this* list; holding the same role
                # on another list does not count as being subscribed here.
                already_subscribed = any(
                    subscription.role == membership_type
                    and subscription.list_id == list_id
                    for subscription in mm_user.subscriptions
                )
                if already_subscribed:
                    continue
                # user is not subscribed but should be subscribed -> subscribe
                user_mail = mm_user.addresses[0].email
                logger.warning("subscribe {} ( {} ) as {} on {}".format(
                    username, user_mail, membership_type, list_name))
                if membership_type == 'member':
                    mm_list.subscribe(user_mail,
                                      display_name=displayname,
                                      pre_verified=True,
                                      pre_confirmed=True,
                                      pre_approved=True)
                elif membership_type == 'moderator':
                    mm_list.add_moderator(user_mail,
                                          display_name=displayname)
                elif membership_type == 'owner':
                    mm_list.add_owner(user_mail,
                                      display_name=displayname)
            except Exception as e:
                logger.exception(e)