Skip to content
Snippets Groups Projects
Commit 50548dc9 authored by Julian's avatar Julian
Browse files

Incorporate django-auth-ldap-remoteuser and fix user mail address handling

Previously, users imported from LDAP had no verified mail address causing
many problems with Postorius and Hyperkitty.

Also prevents users from changing their username, first name or last name.

Fixes #3
parent 0a52d2b9
No related branches found
No related tags found
No related merge requests found
import logging
from django.contrib.auth import login
from django.contrib.auth.backends import RemoteUserBackend
from postorius_ldap_membership_management.utils import populate_user
logger = logging.getLogger(__name__)
class LdapRemoteUserBackend(RemoteUserBackend):
def authenticate(self, request, remote_user=None):
logging.info('auth as %s', remote_user)
user = populate_user(remote_user)
if user is not None:
login(request, user, backend='django_auth_ldap.backend.LDAPBackend')
logging.debug('loaded data for %s', remote_user)
return user
import logging import logging
import ldap import ldap
from urllib.error import HTTPError
from django.contrib.auth import get_user_model, load_backend import django.conf
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from django_auth_ldap.config import LDAPSearch from django_auth_ldap.config import LDAPSearch
import django.conf from django_mailman3.lib.mailman import get_mailman_user, get_mailman_client
from urllib.error import HTTPError from postorius_ldap_membership_management.utils import get_ldap_connection, execute_ldap_search_without_hiding_errors
from postorius.models import List, MailmanUser
from postorius.utils import get_mailman_client
from allauth.account.models import EmailAddress
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def execute_ldap_search_without_hiding_errors(search, connection, filterargs=(), escape=True):
'''If an LDAPError occurs during search.execute, it logs the error and returns
an empty list of results. This behavor is indistinguishable from a query
with no results.'''
if not isinstance(search, LDAPSearch):
raise NotImplementedError(f'{type(search)} is not supported by postorius_ldap_membership_management')
# This is a copy of django_auth_ldap.config.LDAPSearch.execute without the
# ldap.LDAPError eating try-except block
if escape:
filterargs = search._escape_filterargs(filterargs)
filterstr = search.filterstr % filterargs
results = connection.search_s(search.base_dn, search.scope, filterstr, search.attrlist)
return search._process_results(results)
class Command(BaseCommand): class Command(BaseCommand):
can_import_settings = True can_import_settings = True
help = 'Synchronize mailing list memberships from a LDAP server' help = 'Synchronize mailing list memberships from a LDAP server'
def handle(self, *args, **options): def handle(self, *args, **options):
ldap_backend = load_backend('django_auth_ldap.backend.LDAPBackend') mm_client = get_mailman_client()
conn = ldap_backend.ldap.initialize(ldap_backend.settings.SERVER_URI, bytes_mode=False) ldap_conn = get_ldap_connection()
for opt, value in ldap_backend.settings.CONNECTION_OPTIONS.items():
conn.set_option(opt, value)
if ldap_backend.settings.START_TLS:
conn.start_tls_s()
conn.simple_bind_s(ldap_backend.settings.BIND_DN, ldap_backend.settings.BIND_PASSWORD)
client = get_mailman_client()
django_users = get_user_model().objects.filter(is_active=True)
backref_mapping = {'member': 'members', 'moderator': 'moderators', 'owner': 'owners'} backref_mapping = {'member': 'members', 'moderator': 'moderators', 'owner': 'owners'}
mm_users = {} mm_users = {}
membership_settings = getattr(django.conf.settings, 'LDAP_MEMBERSHIP_SYNC', {}) membership_settings = getattr(django.conf.settings, 'LDAP_MEMBERSHIP_SYNC', {})
# create all django user in mailman django_users = get_user_model().objects.filter(is_active=True)
for user in django_users: mm_users = {user.username: get_mailman_user(user) for user in django_users}
mm_users[user.username] = MailmanUser.objects.get_or_create_from_django(user)
if mm_users[user.username].display_name != user.get_full_name():
logger.warning("update display_name on {} to {}".format(user.username, user.get_full_name()))
mm_users[user.username].display_name = user.get_full_name()
mm_users[user.username].save()
# set user mail adresses to verified if they match those in ldap
user_emails = EmailAddress.objects.filter(user=user)
for mail in user_emails:
if mail.email == user.email and not mail.verified:
logger.warning("update email.verified on user {} for address {} to True".format(user.username, mail.email))
mail.verified = True
mail.save()
mailman_id2username = {mm_users[i].user_id: i for i in mm_users} mailman_id2username = {mm_users[i].user_id: i for i in mm_users}
for list_name in membership_settings: for list_name in membership_settings:
...@@ -75,11 +37,11 @@ class Command(BaseCommand): ...@@ -75,11 +37,11 @@ class Command(BaseCommand):
ldap_members = execute_ldap_search_without_hiding_errors(LDAPSearch( ldap_members = execute_ldap_search_without_hiding_errors(LDAPSearch(
ldap_setting[membership_type]['dn'], ldap.SCOPE_SUBTREE, ldap_setting[membership_type]['dn'], ldap.SCOPE_SUBTREE,
ldap_setting[membership_type]['filter'], ldap_setting[membership_type]['filter'],
[ldap_setting[membership_type]['username_attr']]), conn) [ldap_setting[membership_type]['username_attr']]), ldap_conn)
ldap_member_names = [list(attr.values())[0][0] for dn, attr in ldap_members] ldap_member_names = [list(attr.values())[0][0] for dn, attr in ldap_members]
# we refetch the mm_list each time because of wired caching problems # we refetch the mm_list each time because of wired caching problems
mm_list = client.get_list(list_name) mm_list = mm_client.get_list(list_name)
mm_members = getattr(mm_list, backref_mapping[membership_type], []) mm_members = getattr(mm_list, backref_mapping[membership_type], [])
for mm_member in mm_members: for mm_member in mm_members:
......
import logging
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
import django.conf
from django_mailman3.lib.mailman import get_mailman_user
from allauth.account.models import EmailAddress
from postorius_ldap_membership_management.utils import get_ldap_connection, execute_ldap_search_without_hiding_errors, populate_user
logger = logging.getLogger(__name__)
class Command(BaseCommand):
can_import_settings = True
help = 'Synchronize users from a LDAP server'
def handle(self, *args, **options):
ldap_conn = get_ldap_connection()
results = execute_ldap_search_without_hiding_errors(django.conf.settings.AUTH_LDAP_USER_SEARCH_ALL_NAME, ldap_conn)
ldap_usernames = [list(attr.values())[0][0] for dn, attr in results]
for username in ldap_usernames:
logger.warning(f'creating or updating {username}')
# populate_user ignores all errors
populate_user(username)
users = get_user_model().objects.filter(is_active=True)
for user in users:
if not user.username in ldap_usernames:
logger.warning(f'deactivating {user.username}')
user.is_active = False
user.save()
# Based on https://github.com/labd/django-session-timeout
# Copyright (c) 2017 Michael van Tellingen
import time
from django.conf import settings
from django.contrib.auth.views import redirect_to_login
from django.shortcuts import redirect
try:
from django.utils.deprecation import MiddlewareMixin
except ImportError:
MiddlewareMixin = object
SESSION_TIMEOUT_KEY = "_session_init_timestamp_"
class SessionTimeoutMiddleware(MiddlewareMixin):
def process_request(self, request):
if not hasattr(request, "session") or request.session.is_empty():
return
init_time = request.session.setdefault(SESSION_TIMEOUT_KEY, time.time())
expire_seconds = getattr(
settings, "SESSION_EXPIRE_SECONDS", settings.SESSION_COOKIE_AGE
)
session_is_expired = time.time() - init_time > expire_seconds
if session_is_expired:
request.session.flush()
redirect_url = getattr(settings, "SESSION_TIMEOUT_REDIRECT", None)
if redirect_url:
return redirect(redirect_url)
else:
return redirect_to_login(next=request.path)
expire_since_last_activity = getattr(
settings, "SESSION_EXPIRE_AFTER_LAST_ACTIVITY", False
)
grace_period = getattr(
settings, "SESSION_EXPIRE_AFTER_LAST_ACTIVITY_GRACE_PERIOD", 1
)
if expire_since_last_activity and time.time() - init_time > grace_period:
request.session[SESSION_TIMEOUT_KEY] = time.time()
import django_mailman3.forms
# We cannot allow users to change their username, because we use the username
# to connect LDAP users do Django users. Since there are no other options, we
# monkey-patch the UserProfileForm.
# With disabled set to True, client-side changes are ignored. So we don't
# need any more validation.
django_mailman3.forms.UserProfileForm.declared_fields['username'].disabled = True
django_mailman3.forms.UserProfileForm.declared_fields['first_name'].disabled = True
django_mailman3.forms.UserProfileForm.declared_fields['first_name'].required = False
django_mailman3.forms.UserProfileForm.declared_fields['last_name'].disabled = True
django_mailman3.forms.UserProfileForm.declared_fields['last_name'].required = False
import logging
import django.conf
from django.contrib.auth import load_backend
from allauth.account.models import EmailAddress
from django_auth_ldap.config import LDAPSearch
from django_mailman3.lib.mailman import get_mailman_user, sync_email_addresses, update_preferred_address
logger = logging.getLogger(__name__)
ldap_backend = load_backend('django_auth_ldap.backend.LDAPBackend')
def get_ldap_connection():
conn = ldap_backend.ldap.initialize(ldap_backend.settings.SERVER_URI, bytes_mode=False)
for opt, value in ldap_backend.settings.CONNECTION_OPTIONS.items():
conn.set_option(opt, value)
if ldap_backend.settings.START_TLS:
conn.start_tls_s()
conn.simple_bind_s(ldap_backend.settings.BIND_DN, ldap_backend.settings.BIND_PASSWORD)
return conn
def execute_ldap_search_without_hiding_errors(search, connection, filterargs=(), escape=True):
'''If an LDAPError occurs during search.execute, it logs the error and returns
an empty list of results. This behavor is indistinguishable from a query
with no results.'''
if not isinstance(search, LDAPSearch):
raise NotImplementedError(f'{type(search)} is not supported by postorius_ldap_membership_management')
# This is a copy of django_auth_ldap.config.LDAPSearch.execute without the
# ldap.LDAPError eating try-except block
if escape:
filterargs = search._escape_filterargs(filterargs)
filterstr = search.filterstr % filterargs
results = connection.search_s(search.base_dn, search.scope, filterstr, search.attrlist)
return search._process_results(results)
def populate_user(username):
'''Create or update user from LDAP, including it's django-allauth email
addresses and Mailman backend user'''
# Caution: If a user has no verified primary email address (an EmailAddress
# object with verified=True and primary=True), django-allauth allows the user
# to set his primary email address and also User.email to any unverified
# address. This is a security problem, because Postorius and Hyperkitty use
# User.email to lookup the corresponding Mailman backend user and thus for
# permission checking. This causes a race-condition in the following code
# that we can't do anything about. ldap_backend.populate_user creates (and
# saves) a new user. We create the EmailAddress objects afterwards. If this
# fails or does not happen for some reason, we end up with a user without a
# primary email address.
user = ldap_backend.populate_user(username)
if user is None:
# ldap_backend.populate_user hides LDAP errors and returns None if
# they occur. It also returns None if the user is not found in the LDAP
# directory.
return None
if 'last_name' not in django.conf.settings.AUTH_LDAP_USER_ATTR_MAP:
user.last_name = ''
primary_changed = False
user_email_objs = EmailAddress.objects.filter(user=user)
primary_email_obj = ([obj for obj in user_email_objs if obj.primary] or [None])[0]
ldap_email = user.ldap_user.attrs[django.conf.settings.AUTH_LDAP_USER_ATTR_MAP['email']][0]
ldap_email_obj = ([obj for obj in user_email_objs if obj.email == ldap_email] or [None])[0]
if ldap_email_obj is None:
ldap_email_obj = EmailAddress(user=user, email=ldap_email, verified=False, primary=False)
ldap_email_obj.verified = True
if primary_email_obj is None:
ldap_email_obj.primary = True
primary_email_obj = ldap_email_obj
primary_changed = True
ldap_email_obj.save()
# ldap_backend.populate_user overwrites user.email with the address from LDAP
user.email = primary_email_obj.email
user.save()
sync_email_addresses(user)
if primary_changed:
update_preferred_address(user, primary_email_obj)
mm_user = get_mailman_user(user)
if mm_user.display_name != user.get_full_name():
mm_user.display_name = user.get_full_name()
mm_user.save()
return user
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment