Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Showing
with 504 additions and 355 deletions
from .user import user_command
from .group import group_command
from .role import role_command
from .profile import profile_command
from .gendevcert import gendevcert_command
from .cleanup import cleanup_command
from .roles_update_all import roles_update_all_command
from .unique_email_addresses import unique_email_addresses_command
def init_app(app):
app.cli.add_command(user_command)
app.cli.add_command(group_command)
app.cli.add_command(role_command)
app.cli.add_command(gendevcert_command)
app.cli.add_command(profile_command)
app.cli.add_command(cleanup_command)
app.cli.add_command(roles_update_all_command)
app.cli.add_command(unique_email_addresses_command)
import click
from flask.cli import with_appcontext
from uffd.tasks import cleanup_task
from uffd.database import db
@click.command('cleanup', help='Cleanup expired data')
@with_appcontext
def cleanup_command():
cleanup_task.run()
db.session.commit()
import os
import click
from werkzeug.serving import make_ssl_devcert
@click.command("gendevcert", help='Generates a self-signed TLS certificate for development')
def gendevcert_command(): #pylint: disable=unused-variable
if os.path.exists('devcert.crt') or os.path.exists('devcert.key'):
print('Refusing to overwrite existing "devcert.crt"/"devcert.key" file!')
return
make_ssl_devcert('devcert')
print('Certificate written to "devcert.crt", private key to "devcert.key".')
print('Run `flask run --cert devcert.crt --key devcert.key` to use it.')
from flask import Blueprint, current_app
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from .models import Group
from uffd.models import Group
bp = Blueprint('group_cli', __name__)
group_cli = AppGroup('group', help='Manage groups')
group_command = AppGroup('group', help='Manage groups')
@bp.record
def add_cli_commands(state):
state.app.cli.add_command(group_cli)
@group_cli.command(help='List names of all groups')
@group_command.command(help='List names of all groups')
def list():
with current_app.test_request_context():
for group in Group.query:
click.echo(group.name)
@group_cli.command(help='Show details of group')
@group_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
......@@ -32,20 +27,24 @@ def show(name):
click.echo(f'Description: {group.description}')
click.echo(f'Members: {", ".join([user.loginname for user in group.members])}')
@group_cli.command(help='Create new group')
@group_command.command(help='Create new group')
@click.argument('name')
@click.option('--description', default='', help='Set description text. Empty per default.')
def create(name, description):
with current_app.test_request_context():
group = Group(description=description)
if not group.set_name(name):
raise click.ClickException('Invalid name')
try:
db.session.add(Group(name=name, description=description))
db.session.add(group)
db.session.commit()
except IntegrityError as ex:
raise click.ClickException(f'Group creation failed: {ex}')
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A group with name "{name}" already exists')
@group_cli.command(help='Update group attributes')
@group_command.command(help='Update group attributes')
@click.argument('name')
@click.option('--description', default='', help='Set description text.')
@click.option('--description', help='Set description text.')
def update(name, description):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
......@@ -55,7 +54,7 @@ def update(name, description):
group.description = description
db.session.commit()
@group_cli.command(help='Delete group')
@group_command.command(help='Delete group')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
......
import os
from flask import current_app
from flask.cli import with_appcontext
import click
try:
from werkzeug.middleware.profiler import ProfilerMiddleware
except ImportError:
from werkzeug.contrib.profiler import ProfilerMiddleware
@click.command("profile", help='Runs app with profiler')
@with_appcontext
def profile_command(): #pylint: disable=unused-variable
# app.run() is silently ignored if executed from commands. We really want
# to do this, so we overwrite the check by overwriting the environment
# variable.
os.environ['FLASK_RUN_FROM_CLI'] = 'false'
current_app.wsgi_app = ProfilerMiddleware(current_app.wsgi_app, restrictions=[30])
current_app.run(debug=True)
from flask import Blueprint, current_app
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.user.models import Group
from uffd.models import Group, Role, RoleGroup
from .models import Role, RoleGroup
bp = Blueprint('role_cli', __name__)
role_cli = AppGroup('role', help='Manage roles')
@bp.record
def add_cli_commands(state):
state.app.cli.add_command(role_cli)
role_command = AppGroup('role', help='Manage roles')
# pylint: disable=too-many-arguments,too-many-locals
......@@ -57,13 +50,13 @@ def update_attrs(role, description=None, default=None,
raise click.ClickException(f'Role {role_name} not found')
role.included_roles.remove(_role)
@role_cli.command(help='List names of all roles')
@role_command.command(help='List names of all roles')
def list():
with current_app.test_request_context():
for role in Role.query:
click.echo(role.name)
@role_cli.command(help='Show details of group')
@role_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
......@@ -80,7 +73,7 @@ def show(name):
click.echo(f'Direct members: {", ".join(sorted([user.loginname for user in role.members]))}')
click.echo(f'Effective members: {", ".join(sorted([user.loginname for user in role.members_effective]))}')
@role_cli.command(help='Create new role')
@role_command.command(help='Create new role')
@click.argument('name')
@click.option('--description', default='', help='Set description text.')
@click.option('--default/--no-default', default=False, help='Mark role as default or not. Non-service users are auto-added to default roles.')
......@@ -96,12 +89,13 @@ def create(name, description, default, moderator_group, add_group, add_role):
db.session.add(role)
role.update_member_groups()
db.session.commit()
except IntegrityError as ex:
raise click.ClickException(f'Role creation failed: {ex}')
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A role with name "{name}" already exists')
@role_cli.command(help='Update role attributes')
@role_command.command(help='Update role attributes')
@click.argument('name')
@click.option('--description', default='', help='Set description text.')
@click.option('--description', help='Set description text.')
@click.option('--default/--no-default', default=None, help='Mark role as default or not. Non-service users are auto-added to default roles.')
@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group.')
@click.option('--no-moderator-group', is_flag=True, flag_value=True, default=False, help='Clear moderator group setting.')
......@@ -126,7 +120,7 @@ def update(name, description, default, moderator_group, no_moderator_group,
role.update_member_groups()
db.session.commit()
@role_cli.command(help='Delete role')
@role_command.command(help='Delete role')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
......
import sys
from flask import current_app
from flask.cli import with_appcontext
import click
from uffd.database import db
from uffd.models import User
@click.command('roles-update-all', help='Update group memberships for all users based on their roles')
@click.option('--check-only', is_flag=True)
@with_appcontext
def roles_update_all_command(check_only): #pylint: disable=unused-variable
consistent = True
with current_app.test_request_context():
for user in User.query.all():
groups_added, groups_removed = user.update_groups()
if groups_added:
consistent = False
print('Adding groups [%s] to user %s'%(', '.join([group.name for group in groups_added]), user.loginname))
if groups_removed:
consistent = False
print('Removing groups [%s] from user %s'%(', '.join([group.name for group in groups_removed]), user.loginname))
if not check_only:
db.session.commit()
if check_only and not consistent:
print('No changes were made because --check-only is set')
print()
print('Error: Groups are not consistent with roles in database')
sys.exit(1)
import click
from flask.cli import with_appcontext
from sqlalchemy.exc import IntegrityError
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
@click.group('unique-email-addresses', help='Enable/disable e-mail address uniqueness checks')
def unique_email_addresses_command():
pass
@unique_email_addresses_command.command('enable')
@with_appcontext
def enable_unique_email_addresses_command():
if FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already enabled')
query = db.select([UserEmail.address_normalized, UserEmail.user_id])\
.group_by(UserEmail.address_normalized, UserEmail.user_id)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, user_id in db.session.execute(query).fetchall():
user = User.query.get(user_id)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, user_id=user_id)
click.echo(f'User "{user.loginname}" has the same e-mail address multiple times:', err=True)
for user_email in user_emails:
if user_email.verified:
click.echo(f'- {user_email.address}', err=True)
else:
click.echo(f'- {user_email.address} (unverified)', err=True)
click.echo()
query = db.select([UserEmail.address_normalized, UserEmail.address])\
.where(UserEmail.verified)\
.group_by(UserEmail.address_normalized)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, address in db.session.execute(query).fetchall():
click.echo(f'E-mail address "{address}" is used by multiple users:', err=True)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, verified=True)
for user_email in user_emails:
if user_email.address != address:
click.echo(f'- {user_email.user.loginname} ({user_email.address})', err=True)
else:
click.echo(f'- {user_email.user.loginname}', err=True)
click.echo()
try:
FeatureFlag.unique_email_addresses.enable()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('''Some existing e-mail addresses violate uniqueness checks
You need to fix this manually in the admin interface. Then run this command
again to continue.''')
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses enabled')
@unique_email_addresses_command.command('disable')
@with_appcontext
def disable_unique_email_addresses_command():
if not FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already disabled')
click.echo('''Please note that the option to disable email address uniqueness checks will
be remove in uffd v3.
''', err=True)
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses disabled')
from flask import Blueprint, current_app
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.role.models import Role
from uffd.database import db
from uffd.models import User, Role
from .models import User
bp = Blueprint('user_cli', __name__)
user_cli = AppGroup('user', help='Manage users')
@bp.record
def add_cli_commands(state):
state.app.cli.add_command(user_cli)
user_command = AppGroup('user', help='Manage users')
# pylint: disable=too-many-arguments
def update_attrs(user, mail=None, displayname=None, password=None,
prompt_password=False, clear_roles=False,
add_role=tuple(), remove_role=tuple()):
add_role=tuple(), remove_role=tuple(), deactivate=None):
if password is None and prompt_password:
password = click.prompt('Password', hide_input=True, confirmation_prompt='Confirm password')
if mail is not None and not user.set_mail(mail):
if mail is not None and not user.set_primary_email_address(mail):
raise click.ClickException('Invalid mail address')
if displayname is not None and not user.set_displayname(displayname):
raise click.ClickException('Invalid displayname')
if password is not None and not user.set_password(password):
raise click.ClickException('Invalid password')
if deactivate is not None:
user.is_deactivated = deactivate
if clear_roles:
user.roles.clear()
for role_name in add_role:
......@@ -42,13 +37,13 @@ def update_attrs(user, mail=None, displayname=None, password=None,
role.members.remove(user)
user.update_groups()
@user_cli.command(help='List login names of all users')
@user_command.command(help='List login names of all users')
def list():
with current_app.test_request_context():
for user in User.query:
click.echo(user.loginname)
@user_cli.command(help='Show details of user')
@user_command.command(help='Show details of user')
@click.argument('loginname')
def show(loginname):
with current_app.test_request_context():
......@@ -56,13 +51,14 @@ def show(loginname):
if user is None:
raise click.ClickException(f'User {loginname} not found')
click.echo(f'Loginname: {user.loginname}')
click.echo(f'Deactivated: {user.is_deactivated}')
click.echo(f'Displayname: {user.displayname}')
click.echo(f'Mail: {user.mail}')
click.echo(f'Mail: {user.primary_email.address}')
click.echo(f'Service User: {user.is_service_user}')
click.echo(f'Roles: {", ".join([role.name for role in user.roles])}')
click.echo(f'Groups: {", ".join([group.name for group in user.groups])}')
@user_cli.command(help='Create new user')
@user_command.command(help='Create new user')
@click.argument('loginname')
@click.option('--mail', required=True, metavar='EMAIL_ADDRESS', help='E-Mail address')
@click.option('--displayname', help='Set display name. Defaults to login name.')
......@@ -72,7 +68,8 @@ def show(loginname):
@click.option('--password', help='Password for SSO login. Login disabled if unset.')
@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Read password interactively from terminal.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.', metavar='ROLE_NAME')
def create(loginname, mail, displayname, service, password, prompt_password, add_role):
@click.option('--deactivate', is_flag=True, flag_value=True, default=None, help='Deactivate account.')
def create(loginname, mail, displayname, service, password, prompt_password, add_role, deactivate):
with current_app.test_request_context():
if displayname is None:
displayname = loginname
......@@ -81,12 +78,13 @@ def create(loginname, mail, displayname, service, password, prompt_password, add
raise click.ClickException('Invalid loginname')
try:
db.session.add(user)
update_attrs(user, mail, displayname, password, prompt_password, add_role=add_role)
update_attrs(user, mail, displayname, password, prompt_password, add_role=add_role, deactivate=deactivate)
db.session.commit()
except IntegrityError as ex:
raise click.ClickException(f'User creation failed: {ex}')
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('Login name or e-mail address is already in use')
@user_cli.command(help='Update user attributes and roles')
@user_command.command(help='Update user attributes and roles')
@click.argument('loginname')
@click.option('--mail', metavar='EMAIL_ADDRESS', help='Set e-mail address.')
@click.option('--displayname', help='Set display name.')
......@@ -95,15 +93,20 @@ def create(loginname, mail, displayname, service, password, prompt_password, add
@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all roles from user. Executed before --add-role.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.')
@click.option('--remove-role', multiple=True, help='Remove role from user. Repeat to remove multiple roles.')
def update(loginname, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role):
@click.option('--deactivate/--activate', default=None, help='Deactivate or reactivate account.')
def update(loginname, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
update_attrs(user, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role)
try:
update_attrs(user, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('E-mail address is already in use')
@user_cli.command(help='Delete user')
@user_command.command(help='Delete user')
@click.argument('loginname')
def delete(loginname):
with current_app.test_request_context():
......
File moved
from .csrf import bp as csrf_bp, csrf_protect
bp = [csrf_bp]
from collections import OrderedDict
from sqlalchemy import MetaData
from sqlalchemy import MetaData, event
from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.ext.mutable import MutableList
from flask_sqlalchemy import SQLAlchemy
from flask.json import JSONEncoder
convention = {
'ix': 'ix_%(column_0_label)s',
......@@ -15,11 +14,54 @@ metadata = MetaData(naming_convention=convention)
db = SQLAlchemy(metadata=metadata)
class SQLAlchemyJSON(JSONEncoder):
def default(self, o):
if isinstance(o, db.Model):
result = OrderedDict()
for key in o.__mapper__.c.keys():
result[key] = getattr(o, key)
return result
return JSONEncoder.default(self, o)
def enable_sqlite_foreign_key_support(dbapi_connection, connection_record):
# pylint: disable=unused-argument
cursor = dbapi_connection.cursor()
cursor.execute('PRAGMA foreign_keys=ON')
cursor.close()
# We want to enable SQLite foreign key support for app and test code, but not
# for migrations.
# The common way to add the handler to the Engine class (so it applies to all
# instances) would also affect the migrations. With flask_sqlalchemy v2.4 and
# newer we could overwrite SQLAlchemy.create_engine and add our handler there.
# However Debian Buster and Bullseye ship v2.1, so we do this here and call
# this function in create_app.
def customize_db_engine(engine):
if engine.name == 'sqlite':
event.listen(engine, 'connect', enable_sqlite_foreign_key_support)
elif engine.name in ('mysql', 'mariadb'):
@event.listens_for(engine, 'connect')
def receive_connect(dbapi_connection, connection_record): # pylint: disable=unused-argument
cursor = dbapi_connection.cursor()
cursor.execute('SHOW VARIABLES LIKE "character_set_connection"')
character_set_connection = cursor.fetchone()[1]
if character_set_connection != 'utf8mb4':
raise Exception(f'Unsupported connection charset "{character_set_connection}". Make sure to add "?charset=utf8mb4" to SQLALCHEMY_DATABASE_URI!')
cursor.execute('SHOW VARIABLES LIKE "collation_database"')
collation_database = cursor.fetchone()[1]
if collation_database != 'utf8mb4_nopad_bin':
raise Exception(f'Unsupported database collation "{collation_database}". Create the database with "CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin"!')
cursor.execute('SET NAMES utf8mb4 COLLATE utf8mb4_nopad_bin')
cursor.close()
class CommaSeparatedList(TypeDecorator):
# For some reason TypeDecorator.process_literal_param and
# TypeEngine.python_type are abstract but not actually required
# pylint: disable=abstract-method
impl = Text
cache_ok = True
def process_bind_param(self, value, dialect):
if value is None:
return None
for item in value:
if ',' in item:
raise ValueError('Items of comma-separated list must not contain commas')
return ','.join(value)
def process_result_value(self, value, dialect):
if value is None:
return None
return MutableList(value.split(','))
USER_GID=20001
# Service and non-service users must either have the same UID range or must not overlap
USER_MIN_UID=10000
USER_MAX_UID=18999
USER_SERVICE_MIN_UID=19000
USER_SERVICE_MAX_UID=19999
GROUP_MIN_GID=20000
GROUP_MAX_GID=49999
# The period of time that a login lasts for.
SESSION_LIFETIME_SECONDS=3600
# The period of time that the session cookie lasts for. This is refreshed on each page load.
PERMANENT_SESSION_LIFETIME=2678400
# CSRF protection
SESSION_COOKIE_SECURE=True
SESSION_COOKIE_HTTPONLY=True
......@@ -22,21 +30,33 @@ ACL_ADMIN_GROUP="uffd_admin"
# Group required to access selfservice functions (view selfservice, change profile/password/roles)
ACL_SELFSERVICE_GROUP="uffd_access"
# Group required to login
#ACL_ACCESS_GROUP="uffd_access" # if unset, the value of ACL_SELFSERVICE_GROUP is used
ACL_ACCESS_GROUP="uffd_access"
# Members can create invite links for signup
ACL_SIGNUP_GROUP="uffd_signup"
MAIL_SERVER='' # e.g. example.com
MAIL_PORT=465
MAIL_USERNAME='yourId@example.com'
MAIL_USERNAME='yourId@example.com' # set to empty string to disable authentication
MAIL_PASSWORD='*****'
MAIL_USE_STARTTLS=True
MAIL_FROM_ADDRESS='foo@bar.com'
# The following settings are not available when using a user connection
ENABLE_INVITE=True
ENABLE_PASSWORDRESET=True
ENABLE_ROLESELFSERVICE=True
# Set to a domain name (e.g. "remailer.example.com") to enable remailer.
# Requires special mail server configuration (see uffd-socketmapd). Can be
# enabled/disabled per-service in the service settings. If enabled, services
# no longer get real user mail addresses but instead special autogenerated
# addresses that are replaced with the real mail addresses by the mail server.
REMAILER_DOMAIN = ''
REMAILER_OLD_DOMAINS = []
# Secret used for construction and verification of remailer addresses.
# If None, the value of SECRET_KEY is used.
REMAILER_SECRET_KEY = None
# Set to list of user loginnames to limit remailer to a small list of users.
# Useful for debugging. If None remailer is active for all users (if
# configured and enabled for a service). This option is deprecated. Use the
# per-service setting in the web interface instead.
REMAILER_LIMIT_TO_USERS = None
# Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False
......@@ -52,17 +72,9 @@ SQLALCHEMY_TRACK_MODIFICATIONS=False
FOOTER_LINKS=[{"url": "https://example.com", "title": "example"}]
OAUTH2_CLIENTS={
#'test_client_id' : {'client_secret': 'random_secret', 'redirect_uris': ['https://example.com/oauth']},
# You can optionally restrict access to users with a certain group. Set 'required_group' to the name a group or a list of group names.
# ... 'required_group': 'test_access_group' ... only allows users with group "test_access_group" access
# ... 'required_group': ['groupa', ['groupb', 'groupc']] ... allows users with group "groupa" as well as users with both "groupb" and "groupc" access
# Set 'login_message' (or suffixed with a language code like 'login_message_de') to display a custom message on the login form.
}
API_CLIENTS={
#'token': {'scopes': ['checkpassword']}
}
# The default page after login or clicking the top left home button is the self-service
# page. If you would like it to be the services list instead, set this to True.
DEFAULT_PAGE_SERVICES=False
# Service overview page (disabled if empty)
SERVICES=[
......@@ -118,7 +130,11 @@ SERVICES_BANNER_PUBLIC=True
# Enable the service overview page for users who are not logged in
SERVICES_PUBLIC=True
# An optional banner that will be displayed above the login form
#LOGIN_BANNER='Always check the URL. Never enter your SSO password on any other site.'
BRANDING_LOGO_URL='/static/empty.png'
SITE_TITLE='uffd'
# Name and contact mail address are displayed to users in a few places (plain text only!)
ORGANISATION_NAME='Example Organisation'
......
# pylint: skip-file
from flask_babel import gettext as _
from warnings import warn
from flask import request, current_app
import urllib.parse
# WebAuthn support is optional because fido2 has a pretty unstable
# interface and might be difficult to install with the correct version
try:
import fido2 as __fido2
if __fido2.__version__.startswith('0.5.'):
from fido2.client import ClientData
from fido2.server import Fido2Server, RelyingParty as __PublicKeyCredentialRpEntity
from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2 import cbor
cbor.encode = cbor.dumps
cbor.decode = lambda arg: cbor.loads(arg)[0]
class PublicKeyCredentialRpEntity(__PublicKeyCredentialRpEntity):
def __init__(self, name, id):
super().__init__(id, name)
elif __fido2.__version__.startswith('0.9.'):
from fido2.client import ClientData
from fido2.webauthn import PublicKeyCredentialRpEntity
from fido2.server import Fido2Server
from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2 import cbor
elif __fido2.__version__.startswith('1.'):
from fido2.webauthn import PublicKeyCredentialRpEntity, CollectedClientData as ClientData, AttestationObject, AuthenticatorData, AttestedCredentialData
from fido2.server import Fido2Server
from fido2 import cbor
else:
raise ImportError(f'Unsupported fido2 version: {__fido2.__version__}')
def get_webauthn_server():
hostname = urllib.parse.urlsplit(request.url).hostname
return Fido2Server(PublicKeyCredentialRpEntity(id=current_app.config.get('MFA_RP_ID', hostname),
name=current_app.config['MFA_RP_NAME']))
WEBAUTHN_SUPPORTED = True
except ImportError as err:
warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err)
WEBAUTHN_SUPPORTED = False
from .views import bp as _bp
bp = [_bp]
from .views import bp as bp_ui
bp = [bp_ui]
from .views import bp as _bp
bp = [_bp]
from warnings import warn
import urllib.parse
from flask import Blueprint, render_template, session, request, redirect, url_for, flash, current_app, abort
from flask_babel import gettext as _
from uffd.database import db
from uffd.mfa.models import MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod
from uffd.session.views import login_required, login_required_pre_mfa, set_request_user
from uffd.user.models import User
from uffd.csrf import csrf_protect
from uffd.secure_redirect import secure_local_redirect
from uffd.ratelimit import Ratelimit, format_delay
bp = Blueprint('mfa', __name__, template_folder='templates', url_prefix='/mfa/')
mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.route('/', methods=['GET'])
@login_required()
def setup():
return render_template('mfa/setup.html')
@bp.route('/setup/disable', methods=['GET'])
@login_required()
def disable():
return render_template('mfa/disable.html')
@bp.route('/setup/disable', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def disable_confirm():
MFAMethod.query.filter_by(user=request.user).delete()
db.session.commit()
request.user.update_groups()
db.session.commit()
return redirect(url_for('mfa.setup'))
@bp.route('/admin/<int:id>/disable')
@login_required()
@csrf_protect(blueprint=bp)
def admin_disable(id):
# Group cannot be checked with login_required kwarg, because the config
# variable is not available when the decorator is processed
if not request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
abort(403)
user = User.query.get(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/setup/recovery', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def setup_recovery():
for method in RecoveryCodeMethod.query.filter_by(user=request.user).all():
db.session.delete(method)
methods = []
for _ in range(10):
method = RecoveryCodeMethod(request.user)
methods.append(method)
db.session.add(method)
db.session.commit()
return render_template('mfa/setup_recovery.html', methods=methods)
@bp.route('/setup/totp', methods=['GET'])
@login_required()
def setup_totp():
method = TOTPMethod(request.user)
session['mfa_totp_key'] = method.key
return render_template('mfa/setup_totp.html', method=method, name=request.values['name'])
@bp.route('/setup/totp', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def setup_totp_finish():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
flash(_('Generate recovery codes first!'))
return redirect(url_for('mfa.setup'))
method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key'))
if method.verify(request.form['code']):
db.session.add(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('mfa.setup'))
flash(_('Code is invalid'))
return redirect(url_for('mfa.setup_totp', name=request.values['name']))
@bp.route('/setup/totp/<int:id>/delete')
@login_required()
@csrf_protect(blueprint=bp)
def delete_totp(id): #pylint: disable=redefined-builtin
method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('mfa.setup'))
# WebAuthn support is optional because fido2 has a pretty unstable
# interface and might be difficult to install with the correct version
try:
import fido2
if fido2.__version__.startswith('0.5.'):
from fido2.client import ClientData
from fido2.server import Fido2Server, RelyingParty as PublicKeyCredentialRpEntity
from fido2.ctap2 import AttestationObject, AuthenticatorData
from fido2 import cbor
# pylint: disable=no-member
cbor.encode = cbor.dumps
cbor.decode = lambda arg: cbor.loads(arg)[0]
elif fido2.__version__.startswith('0.9.'):
from fido2.client import ClientData
from fido2.webauthn import PublicKeyCredentialRpEntity
from fido2.server import Fido2Server
from fido2.ctap2 import AttestationObject, AuthenticatorData
from fido2 import cbor
else:
raise ImportError(f'Unsupported fido2 version: {fido2.__version__}')
WEBAUTHN_SUPPORTED = True
except ImportError as err:
warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err)
WEBAUTHN_SUPPORTED = False
bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported')
if WEBAUTHN_SUPPORTED:
def get_webauthn_server():
hostname = urllib.parse.urlsplit(request.url).hostname
return Fido2Server(PublicKeyCredentialRpEntity(current_app.config.get('MFA_RP_ID', hostname),
current_app.config['MFA_RP_NAME']))
@bp.route('/setup/webauthn/begin', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def setup_webauthn_begin():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
abort(403)
methods = WebauthnMethod.query.filter_by(user=request.user).all()
creds = [method.cred for method in methods]
server = get_webauthn_server()
registration_data, state = server.register_begin(
{
"id": str(request.user.id).encode(),
"name": request.user.loginname,
"displayName": request.user.displayname,
},
creds,
user_verification='discouraged',
)
session["webauthn-state"] = state
return cbor.encode(registration_data)
@bp.route('/setup/webauthn/complete', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def setup_webauthn_complete():
server = get_webauthn_server()
data = cbor.decode(request.get_data())
client_data = ClientData(data["clientDataJSON"])
att_obj = AttestationObject(data["attestationObject"])
auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj)
method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name'])
db.session.add(method)
request.user.update_groups()
db.session.commit()
return cbor.encode({"status": "OK"})
@bp.route("/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def auth_webauthn_begin():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
session["webauthn-state"] = state
return cbor.encode(auth_data)
@bp.route("/auth/webauthn/complete", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def auth_webauthn_complete():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.decode(request.get_data())
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
# authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster)
# does not check signCount, although the spec recommends it
server.authenticate_complete(
session.pop("webauthn-state"),
creds,
credential_id,
client_data,
auth_data,
signature,
)
session['user_mfa'] = True
set_request_user()
return cbor.encode({"status": "OK"})
@bp.route('/setup/webauthn/<int:id>/delete')
@login_required()
@csrf_protect(blueprint=bp)
def delete_webauthn(id): #pylint: disable=redefined-builtin
method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('mfa.setup'))
@bp.route('/auth', methods=['GET'])
@login_required_pre_mfa()
def auth():
if not request.user_pre_mfa.mfa_enabled:
session['user_mfa'] = True
set_request_user()
if session.get('user_mfa'):
return secure_local_redirect(request.values.get('ref', url_for('index')))
return render_template('mfa/auth.html', ref=request.values.get('ref'))
@bp.route('/auth', methods=['POST'])
@login_required_pre_mfa()
def auth_finish():
delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id)
if delay:
flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay))
return redirect(url_for('mfa.auth', ref=request.values.get('ref')))
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
session['user_mfa'] = True
set_request_user()
return secure_local_redirect(request.values.get('ref', url_for('index')))
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
db.session.commit()
session['user_mfa'] = True
set_request_user()
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash(_('You have exhausted your recovery codes. Please generate new ones now!'))
return redirect(url_for('mfa.setup'))
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.'))
return redirect(url_for('mfa.setup'))
return secure_local_redirect(request.values.get('ref', url_for('index')))
mfa_ratelimit.log(request.user_pre_mfa.id)
flash(_('Two-factor authentication failed'))
return redirect(url_for('mfa.auth', ref=request.values.get('ref')))
......@@ -3,6 +3,7 @@ from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
import logging
import click
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
......@@ -74,6 +75,14 @@ def run_migrations_online():
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args)
if engine.name in ('mysql', 'mariadb'):
character_set_connection = connection.execute('SHOW VARIABLES LIKE "character_set_connection"').fetchone()[1]
if character_set_connection != 'utf8mb4':
raise click.ClickException(f'Unsupported connection charset "{character_set_connection}". Make sure to add "?charset=utf8mb4" to SQLALCHEMY_DATABASE_URI!')
collation_database = connection.execute('SHOW VARIABLES LIKE "collation_database"').fetchone()[1]
if collation_database != 'utf8mb4_nopad_bin':
raise click.ClickException(f'Unsupported database collation "{collation_database}". Create the database with "CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin"!')
connection.execute('SET NAMES utf8mb4 COLLATE utf8mb4_nopad_bin')
try:
with context.begin_transaction():
......
"""OpenID Connect Support
Revision ID: 01fdd7820f29
Revises: a9b449776953
Create Date: 2023-11-09 16:52:20.860871
"""
from alembic import op
import sqlalchemy as sa
import datetime
import secrets
import math
import logging
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend # Only required for Buster
import jwt
# pyjwt v1.7.x compat (Buster/Bullseye)
if not hasattr(jwt, 'get_algorithm_by_name'):
jwt.get_algorithm_by_name = lambda name: jwt.algorithms.get_default_algorithms()[name]
# revision identifiers, used by Alembic.
revision = '01fdd7820f29'
down_revision = 'a9b449776953'
branch_labels = None
depends_on = None
logger = logging.getLogger('alembic.runtime.migration.01fdd7820f29')
def token_with_alphabet(alphabet, nbytes=None):
'''Return random text token that consists of characters from `alphabet`'''
if nbytes is None:
nbytes = max(secrets.DEFAULT_ENTROPY, 32)
nbytes_per_char = math.log(len(alphabet), 256)
nchars = math.ceil(nbytes / nbytes_per_char)
return ''.join([secrets.choice(alphabet) for _ in range(nchars)])
def token_urlfriendly(nbytes=None):
'''Return random text token that is urlsafe and works around common parsing bugs'''
alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
return token_with_alphabet(alphabet, nbytes=nbytes)
def upgrade():
logger.info('Generating 3072 bit RSA key pair (RS256) for OpenID Connect support ...')
private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend())
meta = sa.MetaData(bind=op.get_bind())
oauth2_key = op.create_table('oauth2_key',
sa.Column('id', sa.String(length=64), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('active', sa.Boolean(create_constraint=False), nullable=False),
sa.Column('algorithm', sa.String(length=32), nullable=False),
sa.Column('private_key_jwk', sa.Text(), nullable=False),
sa.Column('public_key_jwk', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2_key'))
)
algorithm = jwt.get_algorithm_by_name('RS256')
op.bulk_insert(oauth2_key, [{
'id': token_urlfriendly(),
'created': datetime.datetime.utcnow(),
'active': True,
'algorithm': 'RS256',
'private_key_jwk': algorithm.to_jwk(private_key),
'public_key_jwk': algorithm.to_jwk(private_key.public_key()),
}])
with op.batch_alter_table('oauth2grant', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_oauth2grant_code'))
oauth2grant = sa.Table('oauth2grant', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=255), nullable=False),
sa.Column('redirect_uri', sa.String(length=255), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2grant_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
)
with op.batch_alter_table('oauth2grant', copy_from=oauth2grant) as batch_op:
batch_op.add_column(sa.Column('nonce', sa.Text(), nullable=True))
batch_op.add_column(sa.Column('claims', sa.Text(), nullable=True))
batch_op.alter_column('redirect_uri', existing_type=sa.VARCHAR(length=255), nullable=True)
oauth2token = sa.Table('oauth2token', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('token_type', sa.String(length=40), nullable=False),
sa.Column('access_token', sa.String(length=255), nullable=False),
sa.Column('refresh_token', sa.String(length=255), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2token_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
)
with op.batch_alter_table('oauth2token', copy_from=oauth2token) as batch_op:
batch_op.add_column(sa.Column('claims', sa.Text(), nullable=True))
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
oauth2token = sa.Table('oauth2token', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('token_type', sa.String(length=40), nullable=False),
sa.Column('access_token', sa.String(length=255), nullable=False),
sa.Column('refresh_token', sa.String(length=255), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2token_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
)
with op.batch_alter_table('oauth2token', copy_from=oauth2token) as batch_op:
batch_op.drop_column('claims')
oauth2grant = sa.Table('oauth2grant', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=255), nullable=False),
sa.Column('redirect_uri', sa.String(length=255), nullable=True),
sa.Column('nonce', sa.Text(), nullable=True),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2grant_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
)
with op.batch_alter_table('oauth2grant', copy_from=oauth2grant) as batch_op:
batch_op.alter_column('redirect_uri', existing_type=sa.VARCHAR(length=255), nullable=False)
batch_op.drop_column('claims')
batch_op.drop_column('nonce')
batch_op.create_index(batch_op.f('ix_oauth2grant_code'), ['code'], unique=False)
op.drop_table('oauth2_key')