Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Showing
with 645 additions and 713 deletions
import click
from flask.cli import with_appcontext
from uffd.tasks import cleanup_task
from uffd.database import db
@click.command('cleanup', help='Cleanup expired data')
@with_appcontext
def cleanup_command():
cleanup_task.run()
db.session.commit()
import os
import click
from werkzeug.serving import make_ssl_devcert
@click.command("gendevcert", help='Generates a self-signed TLS certificate for development')
def gendevcert_command(): #pylint: disable=unused-variable
if os.path.exists('devcert.crt') or os.path.exists('devcert.key'):
print('Refusing to overwrite existing "devcert.crt"/"devcert.key" file!')
return
make_ssl_devcert('devcert')
print('Certificate written to "devcert.crt", private key to "devcert.key".')
print('Run `flask run --cert devcert.crt --key devcert.key` to use it.')
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import Group
group_command = AppGroup('group', help='Manage groups')
@group_command.command(help='List names of all groups')
def list():
with current_app.test_request_context():
for group in Group.query:
click.echo(group.name)
@group_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
click.echo(f'Name: {group.name}')
click.echo(f'Unix GID: {group.unix_gid}')
click.echo(f'Description: {group.description}')
click.echo(f'Members: {", ".join([user.loginname for user in group.members])}')
@group_command.command(help='Create new group')
@click.argument('name')
@click.option('--description', default='', help='Set description text. Empty per default.')
def create(name, description):
with current_app.test_request_context():
group = Group(description=description)
if not group.set_name(name):
raise click.ClickException('Invalid name')
try:
db.session.add(group)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A group with name "{name}" already exists')
@group_command.command(help='Update group attributes')
@click.argument('name')
@click.option('--description', help='Set description text.')
def update(name, description):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
if description is not None:
group.description = description
db.session.commit()
@group_command.command(help='Delete group')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
db.session.delete(group)
db.session.commit()
import os
from flask import current_app
from flask.cli import with_appcontext
import click
try:
from werkzeug.middleware.profiler import ProfilerMiddleware
except ImportError:
from werkzeug.contrib.profiler import ProfilerMiddleware
@click.command("profile", help='Runs app with profiler')
@with_appcontext
def profile_command(): #pylint: disable=unused-variable
# app.run() is silently ignored if executed from commands. We really want
# to do this, so we overwrite the check by overwriting the environment
# variable.
os.environ['FLASK_RUN_FROM_CLI'] = 'false'
current_app.wsgi_app = ProfilerMiddleware(current_app.wsgi_app, restrictions=[30])
current_app.run(debug=True)
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import Group, Role, RoleGroup
role_command = AppGroup('role', help='Manage roles')
# pylint: disable=too-many-arguments,too-many-locals
def update_attrs(role, description=None, default=None,
moderator_group=None, clear_moderator_group=False,
clear_groups=False, add_group=tuple(), remove_group=tuple(),
clear_roles=False, add_role=tuple(), remove_role=tuple()):
if description is not None:
role.description = description
if default is not None:
role.is_default = default
if clear_moderator_group:
role.moderator_group = None
elif moderator_group is not None:
group = Group.query.filter_by(name=moderator_group).one_or_none()
if group is None:
raise click.ClickException(f'Moderaor group {moderator_group} not found')
role.moderator_group = group
if clear_groups:
role.groups.clear()
for group_name in add_group:
group = Group.query.filter_by(name=group_name).one_or_none()
if group is None:
raise click.ClickException(f'Group {group_name} not found')
role.groups[group] = RoleGroup(group=group)
for group_name in remove_group:
group = Group.query.filter_by(name=group_name).one_or_none()
if group is None:
raise click.ClickException(f'Group {group_name} not found')
del role.groups[group]
if clear_roles:
role.included_roles.clear()
for role_name in add_role:
_role = Role.query.filter_by(name=role_name).one_or_none()
if _role is None:
raise click.ClickException(f'Role {role_name} not found')
role.included_roles.append(_role)
for role_name in remove_role:
_role = Role.query.filter_by(name=role_name).one_or_none()
if _role is None:
raise click.ClickException(f'Role {role_name} not found')
role.included_roles.remove(_role)
@role_command.command(help='List names of all roles')
def list():
with current_app.test_request_context():
for role in Role.query:
click.echo(role.name)
@role_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
click.echo(f'Name: {role.name}')
click.echo(f'Description: {role.description}')
click.echo(f'Default: {role.is_default}')
click.echo(f'Moderator group: {role.moderator_group.name if role.moderator_group else None}')
click.echo(f'Direct groups: {", ".join(sorted([group.name for group in role.groups]))}')
click.echo(f'Effective groups: {", ".join(sorted([group.name for group in role.groups_effective]))}')
click.echo(f'Included roles: {", ".join(sorted([irole.name for irole in role.included_roles]))}')
click.echo(f'Direct members: {", ".join(sorted([user.loginname for user in role.members]))}')
click.echo(f'Effective members: {", ".join(sorted([user.loginname for user in role.members_effective]))}')
@role_command.command(help='Create new role')
@click.argument('name')
@click.option('--description', default='', help='Set description text.')
@click.option('--default/--no-default', default=False, help='Mark role as default or not. Non-service users are auto-added to default roles.')
@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group. No moderator group if unset.')
@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.')
@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.')
def create(name, description, default, moderator_group, add_group, add_role):
with current_app.test_request_context():
try:
role = Role(name=name)
update_attrs(role, description, default, moderator_group,
add_group=add_group, add_role=add_role)
db.session.add(role)
role.update_member_groups()
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A role with name "{name}" already exists')
@role_command.command(help='Update role attributes')
@click.argument('name')
@click.option('--description', help='Set description text.')
@click.option('--default/--no-default', default=None, help='Mark role as default or not. Non-service users are auto-added to default roles.')
@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group.')
@click.option('--no-moderator-group', is_flag=True, flag_value=True, default=False, help='Clear moderator group setting.')
@click.option('--clear-groups', is_flag=True, flag_value=True, default=False, help='Remove all groups granted to role members. Executed before --add-group.')
@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.')
@click.option('--remove-group', multiple=True, metavar='GROUP_NAME', help='Remove group granted to role members. Repeat to remove multiple groups.')
@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all included roles. Executed before --add-role.')
@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.')
@click.option('--remove-role', multiple=True, metavar='ROLE_NAME', help='Remove included role. Repeat to remove multiple roles.')
def update(name, description, default, moderator_group, no_moderator_group,
clear_groups, add_group, remove_group, clear_roles, add_role, remove_role):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
old_members = set(role.members_effective)
update_attrs(role, description, default, moderator_group,
no_moderator_group, clear_groups, add_group, remove_group,
clear_roles, add_role, remove_role)
for user in old_members:
user.update_groups()
role.update_member_groups()
db.session.commit()
@role_command.command(help='Delete role')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
old_members = set(role.members_effective)
db.session.delete(role)
for user in old_members:
user.update_groups()
db.session.commit()
import sys
from flask import current_app
from flask.cli import with_appcontext
import click
from uffd.database import db
from uffd.models import User
@click.command('roles-update-all', help='Update group memberships for all users based on their roles')
@click.option('--check-only', is_flag=True)
@with_appcontext
def roles_update_all_command(check_only): #pylint: disable=unused-variable
consistent = True
with current_app.test_request_context():
for user in User.query.all():
groups_added, groups_removed = user.update_groups()
if groups_added:
consistent = False
print('Adding groups [%s] to user %s'%(', '.join([group.name for group in groups_added]), user.loginname))
if groups_removed:
consistent = False
print('Removing groups [%s] from user %s'%(', '.join([group.name for group in groups_removed]), user.loginname))
if not check_only:
db.session.commit()
if check_only and not consistent:
print('No changes were made because --check-only is set')
print()
print('Error: Groups are not consistent with roles in database')
sys.exit(1)
import click
from flask.cli import with_appcontext
from sqlalchemy.exc import IntegrityError
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
@click.group('unique-email-addresses', help='Enable/disable e-mail address uniqueness checks')
def unique_email_addresses_command():
pass
@unique_email_addresses_command.command('enable')
@with_appcontext
def enable_unique_email_addresses_command():
if FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already enabled')
query = db.select([UserEmail.address_normalized, UserEmail.user_id])\
.group_by(UserEmail.address_normalized, UserEmail.user_id)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, user_id in db.session.execute(query).fetchall():
user = User.query.get(user_id)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, user_id=user_id)
click.echo(f'User "{user.loginname}" has the same e-mail address multiple times:', err=True)
for user_email in user_emails:
if user_email.verified:
click.echo(f'- {user_email.address}', err=True)
else:
click.echo(f'- {user_email.address} (unverified)', err=True)
click.echo()
query = db.select([UserEmail.address_normalized, UserEmail.address])\
.where(UserEmail.verified)\
.group_by(UserEmail.address_normalized)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, address in db.session.execute(query).fetchall():
click.echo(f'E-mail address "{address}" is used by multiple users:', err=True)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, verified=True)
for user_email in user_emails:
if user_email.address != address:
click.echo(f'- {user_email.user.loginname} ({user_email.address})', err=True)
else:
click.echo(f'- {user_email.user.loginname}', err=True)
click.echo()
try:
FeatureFlag.unique_email_addresses.enable()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('''Some existing e-mail addresses violate uniqueness checks
You need to fix this manually in the admin interface. Then run this command
again to continue.''')
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses enabled')
@unique_email_addresses_command.command('disable')
@with_appcontext
def disable_unique_email_addresses_command():
if not FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already disabled')
click.echo('''Please note that the option to disable email address uniqueness checks will
be remove in uffd v3.
''', err=True)
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses disabled')
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import User, Role
user_command = AppGroup('user', help='Manage users')
# pylint: disable=too-many-arguments
def update_attrs(user, mail=None, displayname=None, password=None,
prompt_password=False, clear_roles=False,
add_role=tuple(), remove_role=tuple(), deactivate=None):
if password is None and prompt_password:
password = click.prompt('Password', hide_input=True, confirmation_prompt='Confirm password')
if mail is not None and not user.set_primary_email_address(mail):
raise click.ClickException('Invalid mail address')
if displayname is not None and not user.set_displayname(displayname):
raise click.ClickException('Invalid displayname')
if password is not None and not user.set_password(password):
raise click.ClickException('Invalid password')
if deactivate is not None:
user.is_deactivated = deactivate
if clear_roles:
user.roles.clear()
for role_name in add_role:
role = Role.query.filter_by(name=role_name).one_or_none()
if role is None:
raise click.ClickException(f'Role {role_name} not found')
role.members.append(user)
for role_name in remove_role:
role = Role.query.filter_by(name=role_name).one_or_none()
if role is None:
raise click.ClickException(f'Role {role_name} not found')
role.members.remove(user)
user.update_groups()
@user_command.command(help='List login names of all users')
def list():
with current_app.test_request_context():
for user in User.query:
click.echo(user.loginname)
@user_command.command(help='Show details of user')
@click.argument('loginname')
def show(loginname):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
click.echo(f'Loginname: {user.loginname}')
click.echo(f'Deactivated: {user.is_deactivated}')
click.echo(f'Displayname: {user.displayname}')
click.echo(f'Mail: {user.primary_email.address}')
click.echo(f'Service User: {user.is_service_user}')
click.echo(f'Roles: {", ".join([role.name for role in user.roles])}')
click.echo(f'Groups: {", ".join([group.name for group in user.groups])}')
@user_command.command(help='Create new user')
@click.argument('loginname')
@click.option('--mail', required=True, metavar='EMAIL_ADDRESS', help='E-Mail address')
@click.option('--displayname', help='Set display name. Defaults to login name.')
@click.option('--service/--no-service', default=False, help='Create service or regular (default) user. '+\
'Regular users automatically have roles marked as default. '+\
'Service users do not.')
@click.option('--password', help='Password for SSO login. Login disabled if unset.')
@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Read password interactively from terminal.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.', metavar='ROLE_NAME')
@click.option('--deactivate', is_flag=True, flag_value=True, default=None, help='Deactivate account.')
def create(loginname, mail, displayname, service, password, prompt_password, add_role, deactivate):
with current_app.test_request_context():
if displayname is None:
displayname = loginname
user = User(is_service_user=service)
if not user.set_loginname(loginname, ignore_blocklist=True):
raise click.ClickException('Invalid loginname')
try:
db.session.add(user)
update_attrs(user, mail, displayname, password, prompt_password, add_role=add_role, deactivate=deactivate)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('Login name or e-mail address is already in use')
@user_command.command(help='Update user attributes and roles')
@click.argument('loginname')
@click.option('--mail', metavar='EMAIL_ADDRESS', help='Set e-mail address.')
@click.option('--displayname', help='Set display name.')
@click.option('--password', help='Set password for SSO login.')
@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Set password by reading it interactivly from terminal.')
@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all roles from user. Executed before --add-role.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.')
@click.option('--remove-role', multiple=True, help='Remove role from user. Repeat to remove multiple roles.')
@click.option('--deactivate/--activate', default=None, help='Deactivate or reactivate account.')
def update(loginname, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
try:
update_attrs(user, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('E-mail address is already in use')
@user_command.command(help='Delete user')
@click.argument('loginname')
def delete(loginname):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
db.session.delete(user)
db.session.commit()
......@@ -4,9 +4,7 @@ from flask import Blueprint, request, session
bp = Blueprint("csrf", __name__)
# pylint: disable=invalid-name
csrfEndpoints = []
# pylint: enable=invalid-name
csrf_endpoints = []
def csrf_protect(blueprint=None, endpoint=None):
def wraper(func):
......@@ -15,7 +13,7 @@ def csrf_protect(blueprint=None, endpoint=None):
urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
else:
urlendpoint = func.__name__
csrfEndpoints.append(urlendpoint)
csrf_endpoints.append(urlendpoint)
@wraps(func)
def decorator(*args, **kwargs):
if '_csrf_token' in request.values:
......@@ -32,6 +30,6 @@ def csrf_protect(blueprint=None, endpoint=None):
@bp.app_url_defaults
def csrf_inject(endpoint, values):
if endpoint not in csrfEndpoints or not session.get('_csrf_token'):
if endpoint not in csrf_endpoints or not session.get('_csrf_token'):
return
values['_csrf_token'] = session['_csrf_token']
from .csrf import bp as csrf_bp, csrf_protect
bp = [csrf_bp]
from collections import OrderedDict
from sqlalchemy import MetaData, event
from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.ext.mutable import MutableList
from flask_sqlalchemy import SQLAlchemy
from flask.json import JSONEncoder
# pylint: disable=C0103
db = SQLAlchemy()
# pylint: enable=C0103
class SQLAlchemyJSON(JSONEncoder):
def default(self, o):
if isinstance(o, db.Model):
result = OrderedDict()
for key in o.__mapper__.c.keys():
result[key] = getattr(o, key)
return result
return JSONEncoder.default(self, o)
convention = {
'ix': 'ix_%(column_0_label)s',
'uq': 'uq_%(table_name)s_%(column_0_name)s',
'ck': 'ck_%(table_name)s_%(column_0_name)s',
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
'pk': 'pk_%(table_name)s'
}
metadata = MetaData(naming_convention=convention)
db = SQLAlchemy(metadata=metadata)
def enable_sqlite_foreign_key_support(dbapi_connection, connection_record):
# pylint: disable=unused-argument
cursor = dbapi_connection.cursor()
cursor.execute('PRAGMA foreign_keys=ON')
cursor.close()
# We want to enable SQLite foreign key support for app and test code, but not
# for migrations.
# The common way to add the handler to the Engine class (so it applies to all
# instances) would also affect the migrations. With flask_sqlalchemy v2.4 and
# newer we could overwrite SQLAlchemy.create_engine and add our handler there.
# However Debian Buster and Bullseye ship v2.1, so we do this here and call
# this function in create_app.
def customize_db_engine(engine):
if engine.name == 'sqlite':
event.listen(engine, 'connect', enable_sqlite_foreign_key_support)
elif engine.name in ('mysql', 'mariadb'):
@event.listens_for(engine, 'connect')
def receive_connect(dbapi_connection, connection_record): # pylint: disable=unused-argument
cursor = dbapi_connection.cursor()
cursor.execute('SHOW VARIABLES LIKE "character_set_connection"')
character_set_connection = cursor.fetchone()[1]
if character_set_connection != 'utf8mb4':
raise Exception(f'Unsupported connection charset "{character_set_connection}". Make sure to add "?charset=utf8mb4" to SQLALCHEMY_DATABASE_URI!')
cursor.execute('SHOW VARIABLES LIKE "collation_database"')
collation_database = cursor.fetchone()[1]
if collation_database != 'utf8mb4_nopad_bin':
raise Exception(f'Unsupported database collation "{collation_database}". Create the database with "CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin"!')
cursor.execute('SET NAMES utf8mb4 COLLATE utf8mb4_nopad_bin')
cursor.close()
class CommaSeparatedList(TypeDecorator):
# For some reason TypeDecorator.process_literal_param and
# TypeEngine.python_type are abstract but not actually required
# pylint: disable=abstract-method
impl = Text
cache_ok = True
def process_bind_param(self, value, dialect):
if value is None:
return None
for item in value:
if ',' in item:
raise ValueError('Items of comma-separated list must not contain commas')
return ','.join(value)
def process_result_value(self, value, dialect):
if value is None:
return None
return MutableList(value.split(','))
LDAP_BASE_USER="ou=users,dc=example,dc=com"
LDAP_BASE_GROUPS="ou=groups,dc=example,dc=com"
LDAP_BASE_MAIL="ou=postfix,dc=example,dc=com"
USER_GID=20001
LDAP_SERVICE_BIND_DN=""
LDAP_SERVICE_BIND_PASSWORD=""
LDAP_SERVICE_URL="ldapi:///"
# Service and non-service users must either have the same UID range or must not overlap
USER_MIN_UID=10000
USER_MAX_UID=18999
USER_SERVICE_MIN_UID=19000
USER_SERVICE_MAX_UID=19999
LDAP_USER_OBJECTCLASSES=["top", "inetOrgPerson", "organizationalPerson", "person", "posixAccount"]
LDAP_USER_GID=20001
LDAP_USER_MIN_UID=10000
LDAP_USER_MAX_UID=18999
GROUP_MIN_GID=20000
GROUP_MAX_GID=49999
# The period of time that a login lasts for.
SESSION_LIFETIME_SECONDS=3600
# The period of time that the session cookie lasts for. This is refreshed on each page load.
PERMANENT_SESSION_LIFETIME=2678400
# CSRF protection
SESSION_COOKIE_SECURE=True
SESSION_COOKIE_HTTPONLY=True
SESSION_COOKIE_SAMESITE='Strict'
LANGUAGES={
# Language identifier (see Accept-Language HTTP header) -> Display Name
"en": "EN",
"de": "DE",
}
ACL_ADMIN_GROUP="uffd_admin"
# Group required to access selfservice functions (view selfservice, change profile/password/roles)
ACL_SELFSERVICE_GROUP="uffd_access"
# Group required to login
ACL_ACCESS_GROUP="uffd_access"
# Members can create invite links for signup
ACL_SIGNUP_GROUP="uffd_signup"
MAIL_SERVER='' # e.g. example.com
MAIL_PORT=465
MAIL_USERNAME='yourId@example.com'
MAIL_USERNAME='yourId@example.com' # set to empty string to disable authentication
MAIL_PASSWORD='*****'
MAIL_USE_STARTTLS=True
MAIL_FROM_ADDRESS='foo@bar.com'
MAIL_LDAP_OBJECTCLASSES=["top", "postfixVirtual"]
# Set to a domain name (e.g. "remailer.example.com") to enable remailer.
# Requires special mail server configuration (see uffd-socketmapd). Can be
# enabled/disabled per-service in the service settings. If enabled, services
# no longer get real user mail addresses but instead special autogenerated
# addresses that are replaced with the real mail addresses by the mail server.
REMAILER_DOMAIN = ''
REMAILER_OLD_DOMAINS = []
# Secret used for construction and verification of remailer addresses.
# If None, the value of SECRET_KEY is used.
REMAILER_SECRET_KEY = None
# Set to list of user loginnames to limit remailer to a small list of users.
# Useful for debugging. If None remailer is active for all users (if
# configured and enabled for a service). This option is deprecated. Use the
# per-service setting in the web interface instead.
REMAILER_LIMIT_TO_USERS = None
# Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False
INVITE_MAX_VALID_DAYS=21
LOGINNAME_BLOCKLIST=['^admin$', '^root$']
#MFA_ICON_URL = 'https://example.com/logo.png'
#MFA_RP_ID = 'example.com' # If unset, hostname from current request is used
MFA_RP_NAME = 'Uffd Test Service' # Service name passed to U2F/FIDO2 authenticators
ROLES_BASEROLES=['base']
SQLALCHEMY_TRACK_MODIFICATIONS=False
FOOTER_LINKS=[{"url": "https://example.com", "title": "example"}]
OAUTH2_CLIENTS={
#'test_client_id' : {'client_secret': 'random_secret', 'redirect_uris': ['https://example.com/oauth']},
# You can optionally restrict access to users with a certain group. Set 'required_group' to the name of an LDAP group name or a list of groups.
# ... 'required_group': 'test_access_group' ... only allows users with group "test_access_group" access
# ... 'required_group': ['groupa', ['groupb', 'groupc']] ... allows users with group "groupa" as well as users with both "groupb" and "groupc" access
}
# The default page after login or clicking the top left home button is the self-service
# page. If you would like it to be the services list instead, set this to True.
DEFAULT_PAGE_SERVICES=False
# Service overview page (disabled if empty)
SERVICES=[
......@@ -77,7 +107,12 @@ SERVICES=[
# # Infos are small/medium amounts of information displayed in a modal
# # dialog. All matching items are visible.
# 'infos': [
# {'title': 'Documentation', 'html': '<p>Some information about the service as html</p>', 'required_group': 'users'},
# {
# 'title': 'uffd',
# 'button_text': 'Documentation', # Defaults to the title if not set
# 'html': '<p>Some information about the service as html</p>',
# 'required_group': 'users',
# },
# ],
# # Links to external sites, all matching items are visible
# 'links': [
......@@ -85,17 +120,34 @@ SERVICES=[
# ]
# },
]
# A banner text that will be displayed above the services list
SERVICES_BANNER=''
# If the banner should be shown to users who are not logged in
SERVICES_BANNER_PUBLIC=True
# Enable the service overview page for users who are not logged in
SERVICES_PUBLIC=True
# An optional banner that will be displayed above the login form
#LOGIN_BANNER='Always check the URL. Never enter your SSO password on any other site.'
BRANDING_LOGO_URL='/static/empty.png'
SITE_TITLE='uffd'
# Name and contact mail address are displayed to users in a few places (plain text only!)
ORGANISATION_NAME='Example Organisation'
ORGANISATION_CONTACT='contact@example.com'
# Optional text included in account registration mails (plain text only!)
WELCOME_TEXT='See https://docs.example.com/ for further information.'
# do NOT set in production
#TEMPLATES_AUTO_RELOAD=True
#SQLALCHEMY_ECHO=True
#FLASK_ENV=development
#LDAP_SERVICE_MOCK=True
# DO set in production
......
# pylint: skip-file
from flask_babel import gettext as _
from warnings import warn
from flask import request, current_app
import urllib.parse
# WebAuthn support is optional because fido2 has a pretty unstable
# interface and might be difficult to install with the correct version
try:
import fido2 as __fido2
if __fido2.__version__.startswith('0.5.'):
from fido2.client import ClientData
from fido2.server import Fido2Server, RelyingParty as __PublicKeyCredentialRpEntity
from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2 import cbor
cbor.encode = cbor.dumps
cbor.decode = lambda arg: cbor.loads(arg)[0]
class PublicKeyCredentialRpEntity(__PublicKeyCredentialRpEntity):
def __init__(self, name, id):
super().__init__(id, name)
elif __fido2.__version__.startswith('0.9.'):
from fido2.client import ClientData
from fido2.webauthn import PublicKeyCredentialRpEntity
from fido2.server import Fido2Server
from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2 import cbor
elif __fido2.__version__.startswith('1.'):
from fido2.webauthn import PublicKeyCredentialRpEntity, CollectedClientData as ClientData, AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2.server import Fido2Server
from fido2 import cbor
else:
raise ImportError(f'Unsupported fido2 version: {__fido2.__version__}')
def get_webauthn_server():
hostname = urllib.parse.urlsplit(request.url).hostname
return Fido2Server(PublicKeyCredentialRpEntity(id=current_app.config.get('MFA_RP_ID', hostname),
name=current_app.config['MFA_RP_NAME']))
WEBAUTHN_SUPPORTED = True
except ImportError as err:
warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err)
WEBAUTHN_SUPPORTED = False
from .ldap import bp as ldap_bp
from .ldap import get_conn, user_conn, escape_filter_chars, uid_to_dn
from .ldap import loginname_to_dn, mail_to_dn, get_next_uid, loginname_is_safe, mailname_is_safe
from .ldap import get_ldap_array_attribute_safe, get_ldap_attribute_safe
bp = [ldap_bp]
import string
from flask import Blueprint, current_app
from ldap3.utils.conv import escape_filter_chars
from ldap3.core.exceptions import LDAPBindError, LDAPCursorError, LDAPPasswordIsMandatoryError
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, MOCK_SYNC
bp = Blueprint("ldap", __name__)
def fix_connection(conn):
old_search = conn.search
def search(*args, **kwargs):
kwargs.update({'attributes': [ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]})
return old_search(*args, **kwargs)
conn.search = search
return conn
def get_mock_conn():
if not current_app.debug:
raise Exception('LDAP_SERVICE_MOCK cannot be enabled on production instances')
# Entries are stored in-memory in the mocked `Connection` object. To make
# changes persistent across requests we reuse the same `Connection` object
# for all calls to `service_conn()` and `user_conn()`.
if not hasattr(current_app, 'ldap_mock'):
server = Server.from_definition('ldap_mock', 'ldap_server_info.json', 'ldap_server_schema.json')
current_app.ldap_mock = fix_connection(Connection(server, client_strategy=MOCK_SYNC))
current_app.ldap_mock.strategy.entries_from_json('ldap_server_entries.json')
current_app.ldap_mock.bind()
return current_app.ldap_mock
def service_conn():
if current_app.config.get('LDAP_SERVICE_MOCK', False):
return get_mock_conn()
server = Server(current_app.config["LDAP_SERVICE_URL"], get_info=ALL)
return fix_connection(Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=True))
def user_conn(loginname, password):
if not loginname_is_safe(loginname):
return False
if current_app.config.get('LDAP_SERVICE_MOCK', False):
conn = get_mock_conn()
# Since we reuse the same conn for all calls to `user_conn()` we
# simulate the password check by rebinding. Note that ldap3's mocking
# implementation just compares the string in the objects's userPassword
# field with the password, no support for hashing or OpenLDAP-style
# password-prefixes ("{PLAIN}..." or "{ssha512}...").
try:
if not conn.rebind(loginname_to_dn(loginname), password):
return False
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return False
return get_mock_conn()
server = Server(current_app.config["LDAP_SERVICE_URL"], get_info=ALL)
try:
return fix_connection(Connection(server, loginname_to_dn(loginname), password, auto_bind=True))
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return False
def get_conn():
return service_conn()
def uid_to_dn(uid):
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_USER"], '(&(objectclass=person)(uidNumber={}))'.format(escape_filter_chars(uid)))
if not len(conn.entries) == 1:
return None
return conn.entries[0].entry_dn
def loginname_to_dn(loginname):
if loginname_is_safe(loginname):
return 'uid={},{}'.format(loginname, current_app.config["LDAP_BASE_USER"])
raise ValueError('unsafe login name')
def mail_to_dn(uid):
if mailname_is_safe(uid):
return 'uid={},{}'.format(uid, current_app.config["LDAP_BASE_MAIL"])
raise Exception('unsafe mail name')
def loginname_is_safe(value):
if len(value) > 32 or len(value) < 1:
return False
for char in value:
if not char in string.ascii_lowercase + string.digits + '_-':
return False
return True
def mailname_is_safe(value):
return loginname_is_safe(value)
def get_next_uid():
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_USER"], '(objectclass=person)')
max_uid = current_app.config["LDAP_USER_MIN_UID"]
for i in conn.entries:
# skip out of range entries
if i['uidNumber'].value > current_app.config["LDAP_USER_MAX_UID"]:
continue
if i['uidNumber'].value < current_app.config["LDAP_USER_MIN_UID"]:
continue
max_uid = max(i['uidNumber'].value, max_uid)
next_uid = max_uid + 1
if uid_to_dn(next_uid):
raise Exception('No free uid found')
return next_uid
def get_ldap_attribute_safe(ldapobject, attribute):
try:
result = ldapobject[attribute].value if attribute in ldapobject else None
# we have to catch LDAPCursorError here, because ldap3 in older versions has a broken __contains__ function
# see https://github.com/cannatag/ldap3/issues/493
# fixed in version 2.5
# debian buster ships 2.4.1
except LDAPCursorError:
result = None
return result
def get_ldap_array_attribute_safe(ldapobject, attribute):
# if the aray is empty, the attribute does not exist.
# if there is only one elemtent, ldap returns a string and not an array with one element
# we sanitize this to always be an array
result = get_ldap_attribute_safe(ldapobject, attribute)
if not result:
result = []
if isinstance(result, str):
result = [result]
return result
from copy import deepcopy
from collections.abc import MutableSet
from flask import current_app, request
from ldap3.utils.conv import escape_filter_chars
from ldap3.core.exceptions import LDAPBindError, LDAPCursorError, LDAPPasswordIsMandatoryError
from ldap3 import MODIFY_REPLACE, MODIFY_DELETE, MODIFY_ADD, HASHED_SALTED_SHA512
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, MOCK_SYNC
def fix_connection(conn):
old_search = conn.search
def search(*args, **kwargs):
kwargs.update({'attributes': [ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]})
return old_search(*args, **kwargs)
conn.search = search
return conn
def get_mock_conn():
if not current_app.debug:
raise Exception('LDAP_SERVICE_MOCK cannot be enabled on production instances')
# Entries are stored in-memory in the mocked `Connection` object. To make
# changes persistent across requests we reuse the same `Connection` object
# for all calls to `service_conn()` and `user_conn()`.
if not hasattr(current_app, 'ldap_mock'):
server = Server.from_definition('ldap_mock', 'ldap_server_info.json', 'ldap_server_schema.json')
current_app.ldap_mock = fix_connection(Connection(server, client_strategy=MOCK_SYNC))
current_app.ldap_mock.strategy.entries_from_json('ldap_server_entries.json')
current_app.ldap_mock.bind()
return current_app.ldap_mock
def get_conn():
if current_app.config.get('LDAP_SERVICE_MOCK', False):
return get_mock_conn()
server = Server(current_app.config["LDAP_SERVICE_URL"], get_info=ALL)
return fix_connection(Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=True))
class LDAPSession:
def __init__(self):
self.__objects = {} # dn -> instance
self.__to_delete = []
self.__relations = {} # (srccls, srcattr, dn) -> {srcobj, ...}
def lookup(self, dn):
return self.__objects.get(dn)
def register(self, obj):
if obj.dn in self.__objects and self.__objects[obj.dn] != obj:
raise Exception()
self.__objects[obj.dn] = obj
return obj
def lookup_relations(self, srccls, srcattr, dn):
key = (srccls, srcattr, dn)
return self.__relations.get(key, set())
def update_relations(self, srcobj, srcattr, delete_dns=None, add_dns=None):
for dn in (delete_dns or []):
key = (type(srcobj), srcattr, dn)
self.__relations[key] = self.__relations.get(key, set())
self.__relations[key].discard(srcobj)
for dn in (add_dns or []):
key = (type(srcobj), srcattr, dn)
self.__relations[key] = self.__relations.get(key, set())
self.__relations[key].add(srcobj)
def add(self, obj):
if obj.ldap_created:
raise Exception()
self.register(obj)
def delete(self, obj):
if obj.dn in self.__objects:
del self.__objects[obj.dn]
self.__to_delete.append(obj)
def commit(self):
while self.__to_delete:
self.__to_delete.pop(0).ldap_delete()
for obj in self.__objects.values():
if not obj.ldap_created:
obj.ldap_create()
elif obj.ldap_dirty:
obj.ldap_modify()
def rollback(self):
self.__to_delete.clear()
self.__objects = {dn: obj for dn, obj in self.__objects.items() if obj.ldap_created}
for obj in self.__objects.values():
if obj.ldap_dirty:
obj.ldap_reset()
class FlaskLDAPMapper:
@property
def session(self):
if not hasattr(request, 'ldap_session'):
request.ldap_session = LDAPSession()
return request.ldap_session
ldap = FlaskLDAPMapper()
class LDAPSet(MutableSet):
def __init__(self, getitems, additem, delitem, encode=None, decode=None):
self.__getitems = getitems
self.__additem = additem
self.__delitem = delitem
self.__encode = encode or (lambda x: x)
self.__decode = decode or (lambda x: x)
def __repr__(self):
return repr(set(self))
def __contains__(self, value):
return self.__encode(value) in self.__getitems()
def __iter__(self):
return iter(map(self.__decode, self.__getitems()))
def __len__(self):
return len(self.__getitems())
def add(self, value):
if value not in self:
self.__additem(self.__encode(value))
def discard(self, value):
self.__delitem(self.__encode(value))
class LDAPAttribute:
def __init__(self, name, multi=False, default=None, encode=None, decode=None):
self.name = name
self.multi = multi
self.encode = encode or (lambda x: x)
self.decode = decode or (lambda x: x)
def default_wrapper():
values = default() if callable(default) else default
if not isinstance(values, list):
values = [values]
return [self.encode(value) for value in values]
self.default = default_wrapper
def __set_name__(self, cls, name):
if self.default is None:
return
if not cls.ldap_defaults:
cls.ldap_defaults = {}
cls.ldap_defaults[self.name] = self.default
def __get__(self, obj, objtype=None):
if obj is None:
return self
if self.multi:
return LDAPSet(getitems=lambda: obj.ldap_getattr(self.name),
additem=lambda value: obj.ldap_attradd(self.name, value),
delitem=lambda value: obj.ldap_attrdel(self.name, value),
encode=self.encode, decode=self.decode)
return self.decode((obj.ldap_getattr(self.name) or [None])[0])
def __set__(self, obj, values):
if not self.multi:
values = [values]
obj.ldap_setattr(self.name, [self.encode(value) for value in values])
class LDAPBackref:
def __init__(self, srccls, srcattr):
self.srccls = srccls
self.srcattr = srcattr
if srccls.ldap_relations is None:
srccls.ldap_relations = set()
srccls.ldap_relations.add(srcattr)
def init(self, obj):
if self.srcattr in obj.ldap_relation_data:
return
# The query instanciates all related objects that in turn add their relations to session
self.srccls.ldap_filter_by(**{self.srcattr: obj.dn})
obj.ldap_relation_data.add(self.srcattr)
def __get__(self, obj, objtype=None):
if obj is None:
return self
self.init(obj)
return LDAPSet(getitems=lambda: ldap.session.lookup_relations(self.srccls, self.srcattr, obj.dn),
additem=lambda value: value.ldap_attradd(self.srcattr, obj.dn),
delitem=lambda value: value.ldap_attrdel(self.srcattr, obj.dn))
def __set__(self, obj, values):
current = self.__get__(obj)
current.clear()
for value in values:
current.add(value)
class LDAPRelation(LDAPAttribute):
def __init__(self, name, dest, backref=None):
super().__init__(name, multi=True, encode=lambda value: value.dn,
decode=lambda value: dest.ldap_get(value))
self.name = name
self.dest = dest
self.backref = backref
def __set_name__(self, cls, name):
if self.backref is not None:
setattr(self.dest, self.backref, LDAPBackref(cls, self.name))
class LDAPModel:
ldap_dn_attribute = None
ldap_dn_base = None
ldap_base = None
ldap_object_classes = None
ldap_filter = None
ldap_defaults = None # Populated by LDAPAttribute
ldap_relations = None # Populated by LDAPBackref
def __init__(self, _ldap_dn=None, _ldap_attributes=None, **kwargs):
self.ldap_relation_data = set()
self.__ldap_dn = _ldap_dn
self.__ldap_attributes = {}
for key, values in (_ldap_attributes or {}).items():
if isinstance(values, list):
self.__ldap_attributes[key] = values
else:
self.__ldap_attributes[key] = [values]
self.__attributes = deepcopy(self.__ldap_attributes)
self.__changes = {}
for key, value, in kwargs.items():
if not hasattr(self, key):
raise Exception()
setattr(self, key, value)
for name in (self.ldap_relations or []):
self.__update_relations(name, add_dns=self.__attributes.get(name, []))
def __update_relations(self, name, delete_dns=None, add_dns=None):
if name in (self.ldap_relations or []):
ldap.session.update_relations(self, name, delete_dns, add_dns)
def ldap_getattr(self, name):
return self.__attributes.get(name, [])
def ldap_setattr(self, name, values):
self.__update_relations(name, delete_dns=self.__attributes.get(name, []))
self.__changes[name] = [(MODIFY_REPLACE, values)]
self.__attributes[name] = values
self.__update_relations(name, add_dns=values)
def ldap_attradd(self, name, value):
self.__changes[name] = self.__changes.get(name, []) + [(MODIFY_ADD, [value])]
self.__attributes[name].append(value)
self.__update_relations(name, add_dns=[value])
def ldap_attrdel(self, name, value):
self.__changes[name] = self.__changes.get(name, []) + [(MODIFY_DELETE, [value])]
if value in self.__attributes.get(name, []):
self.__attributes[name].remove(value)
self.__update_relations(name, delete_dns=[value])
def __repr__(self):
name = '%s.%s'%(type(self).__module__, type(self).__name__)
if self.__ldap_dn is None:
return '<%s>'%name
return '<%s %s>'%(name, self.__ldap_dn)
def build_dn(self):
return '%s=%s,%s'%(self.ldap_dn_attribute, self.__attributes[self.ldap_dn_attribute][0], self.ldap_dn_base)
@property
def dn(self):
if self.__ldap_dn is not None:
return self.__ldap_dn
return self.build_dn()
@classmethod
def ldap_get(cls, dn):
obj = ldap.session.lookup(dn)
if obj is None:
conn = get_conn()
conn.search(dn, cls.ldap_filter)
if not conn.response:
return None
if len(conn.response) != 1:
raise Exception()
obj = ldap.session.register(cls(_ldap_dn=conn.response[0]['dn'], _ldap_attributes=conn.response[0]['attributes']))
return obj
@classmethod
def ldap_all(cls):
conn = get_conn()
conn.search(cls.ldap_base, cls.ldap_filter)
res = []
for entry in conn.response:
obj = ldap.session.lookup(entry['dn'])
if obj is None:
obj = ldap.session.register(cls(_ldap_dn=entry['dn'], _ldap_attributes=entry['attributes']))
res.append(obj)
return res
@classmethod
def ldap_filter_by(cls, **kwargs):
filters = [cls.ldap_filter]
for key, value in kwargs.items():
filters.append('(%s=%s)'%(key, escape_filter_chars(value)))
conn = get_conn()
conn.search(cls.ldap_base, '(&%s)'%(''.join(filters)))
res = []
for entry in conn.response:
obj = ldap.session.lookup(entry['dn'])
if obj is None:
obj = ldap.session.register(cls(_ldap_dn=entry['dn'], _ldap_attributes=entry['attributes']))
res.append(obj)
return res
def ldap_reset(self):
for name in (self.ldap_relations or []):
self.__update_relations(name, delete_dns=self.__attributes.get(name, []))
self.__changes = {}
self.__attributes = deepcopy(self.__ldap_attributes)
for name in (self.ldap_relations or {}):
self.__update_relations(name, add_dns=self.__attributes.get(name, []))
@property
def ldap_dirty(self):
return bool(self.__changes)
@property
def ldap_created(self):
return bool(self.__ldap_attributes)
def ldap_modify(self):
if not self.ldap_created:
raise Exception()
if not self.ldap_dirty:
return
conn = get_conn()
success = conn.modify(self.dn, self.__changes)
if not success:
raise Exception()
self.__changes = {}
self.__ldap_attributes = deepcopy(self.__attributes)
def ldap_create(self):
if self.ldap_created:
raise Exception()
conn = get_conn()
for key, func in (self.ldap_defaults or {}).items():
if key not in self.__attributes:
values = func()
self.__attributes[key] = values
self.__changes[key] = [(MODIFY_REPLACE, values)]
success = conn.add(self.dn, self.ldap_object_classes, self.__attributes)
if not success:
raise Exception()
self.__changes = {}
self.__ldap_attributes = deepcopy(self.__attributes)
def ldap_delete(self):
conn = get_conn()
success = conn.delete(self.dn)
if not success:
raise Exception()
self.__ldap_attributes = {}
from ldap3.utils.hashed import hashed
import secrets
class User(LDAPModel):
ldap_base = 'ou=users,dc=example,dc=com'
ldap_dn_attribute = 'uid'
ldap_dn_base = 'ou=users,dc=example,dc=com'
ldap_filter = '(objectClass=person)'
ldap_object_classes = ['top', 'inetOrgPerson', 'organizationalPerson', 'person', 'posixAccount']
uid = LDAPAttribute('uidNumber')
loginname = LDAPAttribute('uid')
displayname = LDAPAttribute('cn')
mail = LDAPAttribute('mail')
pwhash = LDAPAttribute('userPassword', default=lambda: hashed(HASHED_SALTED_SHA512, secrets.token_hex(128)))
def password(self, value):
self.pwhash = hashed(HASHED_SALTED_SHA512, value)
password = property(fset=password)
class Group(LDAPModel):
ldap_base = 'ou=groups,dc=example,dc=com'
ldap_filter = '(objectClass=groupOfUniqueNames)'
gid = LDAPAttribute('gidNumber')
name = LDAPAttribute('cn')
description = LDAPAttribute('description', default='')
member_dns= LDAPAttribute('uniqueMember', multi=True)
members = LDAPRelation('uniqueMember', User, backref='groups')
class Mail(LDAPModel):
ldap_base = 'ou=postfix,dc=example,dc=com'
ldap_dn_attribute = 'uid'
ldap_dn_base = 'ou=postfix,dc=example,dc=com'
ldap_filter = '(objectClass=postfixVirtual)'
ldap_object_classes = ['top', 'postfixVirtual']
from .views import bp as bp_ui
bp = [bp_ui]
from ldap3 import MODIFY_REPLACE
from flask import current_app
from uffd import ldap
class Mail():
def __init__(self, uid=None, destinations=None, receivers=None, dn=None):
self.uid = uid
self.receivers = receivers if receivers else []
self.destinations = destinations if destinations else []
self.dn = dn
@classmethod
def from_ldap(cls, ldapobject):
return Mail(
uid=ldapobject['uid'].value,
receivers=ldap.get_ldap_array_attribute_safe(ldapobject, 'mailacceptinggeneralid'),
destinations=ldap.get_ldap_array_attribute_safe(ldapobject, 'maildrop'),
dn=ldapobject.entry_dn,
)
@classmethod
def from_ldap_dn(cls, dn):
conn = ldap.get_conn()
conn.search(dn, '(objectClass=postfixVirtual)')
if not len(conn.entries) == 1:
return None
return Mail.from_ldap(conn.entries[0])
def to_ldap(self, new=False):
conn = ldap.get_conn()
if new:
attributes = {
'uid': self.uid,
# same as for update
'mailacceptinggeneralid': self.receivers,
'maildrop': self.destinations,
}
self.dn = ldap.mail_to_dn(self.uid)
result = conn.add(self.dn, current_app.config['MAIL_LDAP_OBJECTCLASSES'], attributes)
else:
attributes = {
'mailacceptinggeneralid': [(MODIFY_REPLACE, self.receivers)],
'maildrop': [(MODIFY_REPLACE, self.destinations)],
}
result = conn.modify(self.dn, attributes)
return result
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.ldap import get_conn, escape_filter_chars
from uffd.session import login_required, is_valid_session, get_current_user
from uffd.mail.models import Mail
bp = Blueprint("mail", __name__, template_folder='templates', url_prefix='/mail/')
@bp.before_request
@login_required()
def mail_acl(): #pylint: disable=inconsistent-return-statements
if not mail_acl_check():
flash('Access denied')
return redirect(url_for('index'))
def mail_acl_check():
return is_valid_session() and get_current_user().is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.route("/")
@register_navbar('Mail', icon='envelope', blueprint=bp, visible=mail_acl_check)
def index():
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_MAIL"], '(objectclass=postfixVirtual)')
mails = []
for i in conn.entries:
mails.append(Mail.from_ldap(i))
return render_template('mail_list.html', mails=mails)
@bp.route("/<uid>")
@bp.route("/new")
def show(uid=None):
if not uid:
mail = Mail()
else:
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_MAIL"], '(&(objectclass=postfixVirtual)(uid={}))'.format((escape_filter_chars(uid))))
assert len(conn.entries) == 1
mail = Mail.from_ldap(conn.entries[0])
return render_template('mail.html', mail=mail)
@bp.route("/<uid>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(uid=False):
conn = get_conn()
is_newmail = bool(not uid)
if is_newmail:
mail = Mail()
else:
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_MAIL"], '(&(objectclass=postfixVirtual)(uid={}))'.format((escape_filter_chars(uid))))
assert len(conn.entries) == 1
mail = Mail.from_ldap(conn.entries[0])
if is_newmail:
mail.uid = request.form.get('mail-uid')
mail.receivers = request.form.get('mail-receivers', '').splitlines()
mail.destinations = request.form.get('mail-destinations', '').splitlines()
if mail.to_ldap(new=is_newmail):
flash('Mail mapping updated.')
else:
flash('Error updating mail mapping: {}'.format(conn.result['message']))
if is_newmail:
return redirect(url_for('mail.index'))
return redirect(url_for('mail.show', uid=mail.uid))
@bp.route("/<uid>/del")
@csrf_protect(blueprint=bp)
def delete(uid):
conn = get_conn()
conn.search(current_app.config["LDAP_BASE_MAIL"], '(&(objectclass=postfixVirtual)(uid={}))'.format((escape_filter_chars(uid))))
assert len(conn.entries) == 1
mail = conn.entries[0]
if conn.delete(mail.entry_dn):
flash('Deleted mail mapping.')
else:
flash('Could not delete mail mapping: {}'.format(conn.result['message']))
return redirect(url_for('mail.index'))
from .views import bp as _bp
bp = [_bp]