From 6cacdf546f1fe5af8306b218deada3b2329ea2e5 Mon Sep 17 00:00:00 2001 From: Julian Rother <julianr@fsmpi.rwth-aachen.de> Date: Tue, 22 Jun 2021 17:05:53 +0200 Subject: [PATCH] Implemented mfa requirements for RoleGroup --- ...dded_rolegroup_requires_mfa_and_cleanup.py | 39 ++++++ tests/test_invite.py | 24 ++-- tests/test_role.py | 123 +++++++++++++----- tests/test_rolemod.py | 8 +- uffd/mfa/models.py | 2 + uffd/mfa/templates/mfa/auth.html | 4 +- uffd/mfa/templates/mfa/setup.html | 20 +-- uffd/mfa/views.py | 43 +++--- uffd/role/models.py | 59 +++++---- uffd/role/templates/role/show.html | 6 + uffd/role/views.py | 9 +- .../templates/selfservice/self.html | 7 + 12 files changed, 235 insertions(+), 109 deletions(-) create mode 100644 migrations/versions/bad6fc529510_added_rolegroup_requires_mfa_and_cleanup.py diff --git a/migrations/versions/bad6fc529510_added_rolegroup_requires_mfa_and_cleanup.py b/migrations/versions/bad6fc529510_added_rolegroup_requires_mfa_and_cleanup.py new file mode 100644 index 00000000..d18822db --- /dev/null +++ b/migrations/versions/bad6fc529510_added_rolegroup_requires_mfa_and_cleanup.py @@ -0,0 +1,39 @@ +"""added RoleGroup.requires_mfa and cleanup + +Revision ID: bad6fc529510 +Revises: aff5f350dcdf +Create Date: 2021-06-22 15:58:10.515330 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'bad6fc529510' +down_revision = 'aff5f350dcdf' +branch_labels = None +depends_on = None + +def upgrade(): + meta = sa.MetaData(bind=op.get_bind()) + table = sa.Table('role-group', meta, + sa.Column('role_id', sa.Integer(), nullable=False), + sa.Column('dn', sa.String(128), nullable=False), + sa.ForeignKeyConstraint(['role_id'], ['role.id'], name=op.f('fk_role-group_role_id_role')), + sa.PrimaryKeyConstraint('role_id', 'dn', name=op.f('pk_role-group')) + ) + with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op: + batch_op.alter_column('dn', new_column_name='group_dn', nullable=False) + batch_op.add_column(sa.Column('requires_mfa', sa.Boolean(name=op.f('ck_role-group_requires_mfa')), nullable=False, default=False)) + +def downgrade(): + meta = sa.MetaData(bind=op.get_bind()) + table = sa.Table('role-group', meta, + sa.Column('role_id', sa.Integer(), nullable=False), + sa.Column('group_dn', sa.String(128), nullable=False), + sa.ForeignKeyConstraint(['role_id'], ['role.id'], name=op.f('fk_role-group_role_id_role')) + ) + with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op: + batch_op.add_column(sa.Column('id', sa.INTEGER(), nullable=False, autoincrement=True, primary_key=True)) + batch_op.alter_column('group_dn', new_column_name='dn', nullable=True) + batch_op.add_column(sa.Column('requires_mfa', sa.Boolean(name=op.f('ck_role-group_requires_mfa')), nullable=False, default=False)) diff --git a/tests/test_invite.py b/tests/test_invite.py index b7384d4d..a6b770aa 100644 --- a/tests/test_invite.py +++ b/tests/test_invite.py @@ -11,7 +11,7 @@ from uffd.ldap import ldap from uffd import create_app, db from uffd.invite.models import Invite, InviteGrant, InviteSignup from uffd.user.models import User, Group -from uffd.role.models import Role +from uffd.role.models import Role, RoleGroup from uffd.session.views import login_get_user from utils import dump, UffdTestCase, db_flush @@ -102,12 +102,12 @@ class TestInviteGrantModel(UffdTestCase): def test_success(self): user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') - role0 = Role(name='baserole', groups=[group0]) + role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)}) db.session.add(role0) user.roles.add(role0) user.update_groups() group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - role1 = Role(name='testrole1', groups=[group1]) + role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)}) db.session.add(role1) role2 = Role(name='testrole2') db.session.add(role2) @@ -140,7 +140,7 @@ class TestInviteGrantModel(UffdTestCase): def test_inactive(self): user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - role = Role(name='testrole1', groups=[group]) + role = Role(name='testrole1', groups={group: RoleGroup(group=group)}) db.session.add(role) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role], single_use=True, used=True) self.assertFalse(invite.active) @@ -175,8 +175,8 @@ class TestInviteGrantModel(UffdTestCase): class TestInviteSignupModel(UffdTestCase): def create_base_roles(self): baserole = Role(name='base', is_default=True) - baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) - baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com')) + baserole.groups[Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')] = RoleGroup() + baserole.groups[Group.query.get('cn=users,ou=groups,dc=example,dc=com')] = RoleGroup() db.session.add(baserole) db.session.commit() @@ -186,7 +186,7 @@ class TestInviteSignupModel(UffdTestCase): base_group1 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') base_group2 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - role1 = Role(name='testrole1', groups=[group]) + role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)}) db.session.add(role1) role2 = Role(name='testrole2') db.session.add(role2) @@ -564,12 +564,12 @@ class TestInviteUseViews(UffdTestCase): def test_grant(self): user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') - role0 = Role(name='baserole', groups=[group0]) + role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)}) db.session.add(role0) user.roles.add(role0) user.update_groups() group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - role1 = Role(name='testrole1', groups=[group1]) + role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)}) db.session.add(role1) role2 = Role(name='testrole2') db.session.add(role2) @@ -637,11 +637,11 @@ class TestInviteUseViews(UffdTestCase): def test_signup(self): baserole = Role(name='base', is_default=True) - baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) - baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com')) + baserole.groups[Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')] = RoleGroup() + baserole.groups[Group.query.get('cn=users,ou=groups,dc=example,dc=com')] = RoleGroup() db.session.add(baserole) group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - role1 = Role(name='testrole1', groups=[group]) + role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)}) db.session.add(role1) role2 = Role(name='testrole2') db.session.add(role2) diff --git a/tests/test_role.py b/tests/test_role.py index 42be5827..dea9e108 100644 --- a/tests/test_role.py +++ b/tests/test_role.py @@ -1,5 +1,6 @@ import datetime import time +import unittest from flask import url_for, session @@ -8,49 +9,107 @@ from uffd.ldap import ldap from uffd import user from uffd.user.models import User, Group -from uffd.role.models import Role +from uffd.role.models import flatten_recursive, Role, RoleGroup +from uffd.mfa.models import TOTPMethod from uffd import create_app, db from utils import dump, UffdTestCase +class TestPrimitives(unittest.TestCase): + def test_flatten_recursive(self): + class Node: + def __init__(self, *neighbors): + self.neighbors = set(neighbors or set()) + + cycle = Node() + cycle.neighbors.add(cycle) + common = Node(cycle) + intermediate1 = Node(common) + intermediate2 = Node(common, intermediate1) + stub = Node() + backref = Node() + start1 = Node(intermediate1, intermediate2, stub, backref) + backref.neighbors.add(start1) + start2 = Node() + self.assertSetEqual(flatten_recursive({start1, start2}, 'neighbors'), + {start1, start2, backref, stub, intermediate1, intermediate2, common, cycle}) + self.assertSetEqual(flatten_recursive(set(), 'neighbors'), set()) + class TestUserRoleAttributes(UffdTestCase): def test_roles_effective(self): - user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - user1.update_groups() + for user in User.query.filter_by(loginname='service').all(): + ldap.session.delete(user) + ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) + ldap.session.commit() + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + service_user = User.query.get('uid=service,ou=users,dc=example,dc=com') + included_by_default_role = Role(name='included_by_default') + default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role]) included_role = Role(name='included') - default_role = Role(name='default', is_default=True) - role1 = Role(name='role1', members=[user1], included_roles=[included_role]) - role2 = Role(name='role2', included_roles=[included_role]) - db.session.add_all([included_role, default_role, role1, role2]) - self.assertSetEqual(user1.roles_effective, {included_role, default_role, role1}) - included_role.included_roles.append(role2) - self.assertSetEqual(user1.roles_effective, {included_role, default_role, role1, role2}) + cycle_role = Role(name='cycle') + direct_role1 = Role(name='role1', members=[user, service_user], included_roles=[included_role, cycle_role]) + direct_role2 = Role(name='role2', members=[user, service_user], included_roles=[included_role]) + cycle_role.included_roles.append(direct_role1) + db.session.add_all([included_by_default_role, default_role, included_role, cycle_role, direct_role1, direct_role2]) + self.assertSetEqual(user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role, default_role, included_by_default_role}) + self.assertSetEqual(service_user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role}) + ldap.session.delete(service_user) + ldap.session.commit() + + def test_compute_groups(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') + group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') + role1 = Role(name='role1', groups={group1: RoleGroup(group=group1)}) + role2 = Role(name='role2', groups={group1: RoleGroup(group=group1), group2: RoleGroup(group=group2)}) + db.session.add_all([role1, role2]) + self.assertSetEqual(user.compute_groups(), set()) + role1.members.add(user) + role2.members.add(user) + self.assertSetEqual(user.compute_groups(), {group1, group2}) + role2.groups[group2].requires_mfa = True + self.assertSetEqual(user.compute_groups(), {group1}) + db.session.add(TOTPMethod(user=user)) + self.assertSetEqual(user.compute_groups(), {group1, group2}) def test_update_groups(self): - user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - user1.update_groups() - self.assertSetEqual(set(user1.groups), set()) + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') - baserole = Role(name='base', groups=[group1]) - role1 = Role(name='role1', groups=[group2], members=[user1]) - db.session.add_all([baserole, role1]) - user1.update_groups() - self.assertSetEqual(set(user1.groups), {group2}) - role1.included_roles.append(baserole) - user1.update_groups() - self.assertSetEqual(set(user1.groups), {group1, group2}) + role1 = Role(name='role1', members=[user], groups={group1: RoleGroup(group=group1)}) + role2 = Role(name='role2', groups={group2: RoleGroup(group=group2)}) + db.session.add_all([role1, role2]) + user.groups = {group2} + groups_added, groups_removed = user.update_groups() + self.assertSetEqual(groups_added, {group1}) + self.assertSetEqual(groups_removed, {group2}) + self.assertSetEqual(set(user.groups), {group1}) + groups_added, groups_removed = user.update_groups() + self.assertSetEqual(groups_added, set()) + self.assertSetEqual(groups_removed, set()) + self.assertSetEqual(set(user.groups), {group1}) class TestRoleModel(UffdTestCase): def test_members_effective(self): + for user in User.query.filter_by(loginname='service').all(): + ldap.session.delete(user) + ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) + ldap.session.commit() user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user2 = User.query.get('uid=testadmin,ou=users,dc=example,dc=com') - included_role = Role(name='included', members=[user1]) - default_role = Role(name='default', is_default=True) - role1 = Role(name='role1', included_roles=[included_role], members=[user2]) - self.assertSetEqual(included_role.members_effective, {user1, user2}) + service = User.query.get('uid=service,ou=users,dc=example,dc=com') + included_by_default_role = Role(name='included_by_default') + default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role]) + included_role = Role(name='included') + direct_role = Role(name='direct', members=[user1, user2, service], included_roles=[included_role]) + empty_role = Role(name='empty', included_roles=[included_role]) + self.assertSetEqual(included_by_default_role.members_effective, {user1, user2}) self.assertSetEqual(default_role.members_effective, {user1, user2}) - self.assertSetEqual(role1.members_effective, {user2}) + self.assertSetEqual(included_role.members_effective, {user1, user2, service}) + self.assertSetEqual(direct_role.members_effective, {user1, user2, service}) + self.assertSetEqual(empty_role.members_effective, set()) + ldap.session.delete(service) + ldap.session.commit() def test_included_roles_recursive(self): baserole = Role(name='base') @@ -66,8 +125,8 @@ class TestRoleModel(UffdTestCase): def test_groups_effective(self): group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') - baserole = Role(name='base', groups=[group1]) - role1 = Role(name='role1', groups=[group2], included_roles=[baserole]) + baserole = Role(name='base', groups={group1: RoleGroup(group=group1)}) + role1 = Role(name='role1', groups={group2: RoleGroup(group=group2)}, included_roles=[baserole]) self.assertSetEqual(baserole.groups_effective, {group1}) self.assertSetEqual(role1.groups_effective, {group1, group2}) @@ -79,14 +138,14 @@ class TestRoleModel(UffdTestCase): group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') group3 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') - baserole = Role(name='base', members=[user1], groups=[group1]) - role1 = Role(name='role1', members=[user2], groups=[group2], included_roles=[baserole]) + baserole = Role(name='base', members=[user1], groups={group1: RoleGroup(group=group1)}) + role1 = Role(name='role1', members=[user2], groups={group2: RoleGroup(group=group2)}, included_roles=[baserole]) db.session.add_all([baserole, role1]) baserole.update_member_groups() role1.update_member_groups() self.assertSetEqual(set(user1.groups), {group1}) self.assertSetEqual(set(user2.groups), {group1, group2}) - baserole.groups.add(group3) + baserole.groups[group3] = RoleGroup() baserole.update_member_groups() self.assertSetEqual(set(user1.groups), {group1, group3}) self.assertSetEqual(set(user2.groups), {group1, group2, group3}) @@ -127,7 +186,7 @@ class TestRoleViews(UffdTestCase): role = Role(name='base', description='Base role description') db.session.add(role) db.session.commit() - role.groups.add(Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')) + role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup() db.session.commit() self.assertEqual(role.name, 'base') self.assertEqual(role.description, 'Base role description') diff --git a/tests/test_rolemod.py b/tests/test_rolemod.py index f1baba1a..c178200a 100644 --- a/tests/test_rolemod.py +++ b/tests/test_rolemod.py @@ -1,7 +1,7 @@ from flask import url_for from uffd.user.models import User, Group -from uffd.role.models import Role +from uffd.role.models import Role, RoleGroup from uffd.database import db from uffd.ldap import ldap @@ -136,7 +136,8 @@ class TestRolemodViews(UffdTestCase): def test_delete_member(self): self.login() - role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'), groups=[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')]) + role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) + role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup() db.session.add(role) role.members.add(User.query.get('uid=testadmin,ou=users,dc=example,dc=com')) db.session.commit() @@ -158,7 +159,8 @@ class TestRolemodViews(UffdTestCase): def test_delete_member_nomember(self): self.login() - role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'), groups=[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')]) + role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) + role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup() db.session.add(role) db.session.commit() user = User.query.get('uid=testadmin,ou=users,dc=example,dc=com') diff --git a/uffd/mfa/models.py b/uffd/mfa/models.py index e525a378..541f231e 100644 --- a/uffd/mfa/models.py +++ b/uffd/mfa/models.py @@ -18,6 +18,8 @@ from ldapalchemy.dbutils import DBRelationship from uffd.database import db from uffd.user.models import User +User.mfa_enabled = property(lambda user: bool(user.mfa_totp_methods or user.mfa_webauthn_methods)) + class MFAType(enum.Enum): RECOVERY_CODE = 0 TOTP = 1 diff --git a/uffd/mfa/templates/mfa/auth.html b/uffd/mfa/templates/mfa/auth.html index 9feb0101..6b8afbfb 100644 --- a/uffd/mfa/templates/mfa/auth.html +++ b/uffd/mfa/templates/mfa/auth.html @@ -11,7 +11,7 @@ <div class="col-12 mb-3"> <h2 class="text-center">Two-Factor Authentication</h2> </div> - {% if webauthn_methods %} + {% if request.user_pre_mfa.mfa_webauthn_methods %} <noscript> <div class="form-group col-12"> <div id="webauthn-nojs" class="alert alert-warning" role="alert">Enable javascript for authentication with U2F/FIDO2 devices</div> @@ -42,7 +42,7 @@ </div> </form> -{% if webauthn_supported and webauthn_methods %} +{% if webauthn_supported and request.user_pre_mfa.mfa_webauthn_methods %} <script src="{{ url_for('static', filename="cbor.js") }}"></script> <script> function begin_webauthn() { diff --git a/uffd/mfa/templates/mfa/setup.html b/uffd/mfa/templates/mfa/setup.html index 93b634b2..707ebb5a 100644 --- a/uffd/mfa/templates/mfa/setup.html +++ b/uffd/mfa/templates/mfa/setup.html @@ -8,9 +8,9 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe #} -{% set mfa_enabled = totp_methods or webauthn_methods %} -{% set mfa_init = not recovery_methods and not mfa_enabled %} -{% set mfa_setup = recovery_methods and not mfa_enabled %} +{% set mfa_enabled = request.user.mfa_enabled %} +{% set mfa_init = not request.user.mfa_recovery_codes and not mfa_enabled %} +{% set mfa_setup = request.user.mfa_recovery_codes and not mfa_enabled %} {% block body %} <p>Two-factor authentication is currently <strong>{{ 'enabled' if mfa_enabled else 'disabled' }}</strong>. @@ -57,9 +57,9 @@ You need to setup at least one authentication method to enable two-factor authen {% endif %} </form> - {% if recovery_methods %} - <p>{{ recovery_methods|length }} recovery codes remain</p> - {% elif not recovery_methods and mfa_enabled %} + {% if request.user.mfa_recovery_codes %} + <p>{{ request.user.mfa_recovery_codes|length }} recovery codes remain</p> + {% elif not request.user.mfa_recovery_codes and mfa_enabled %} <p><strong>You have no remaining recovery codes.</strong></p> {% endif %} </div> @@ -93,14 +93,14 @@ You need to setup at least one authentication method to enable two-factor authen </tr> </thead> <tbody> - {% for method in totp_methods %} + {% for method in request.user.mfa_totp_methods %} <tr> <td>{{ method.name }}</td> <td>{{ method.created.strftime('%b %d, %Y') }}</td> <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_totp', id=method.id) }}">Delete</a></td> </tr> {% endfor %} - {% if not totp_methods %} + {% if not request.user.mfa_totp_methods %} <tr class="table-secondary"> <td colspan=3 class="text-center">No authenticator apps registered yet</td> </tr> @@ -148,14 +148,14 @@ You need to setup at least one authentication method to enable two-factor authen </tr> </thead> <tbody> - {% for method in webauthn_methods %} + {% for method in request.user.mfa_webauthn_methods %} <tr> <td>{{ method.name }}</td> <td>{{ method.created.strftime('%b %d, %Y') }}</td> <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_webauthn', id=method.id) }}">Delete</a></td> </tr> {% endfor %} - {% if not webauthn_methods %} + {% if not request.user.mfa_webauthn_methods %} <tr class="table-secondary"> <td colspan=3 class="text-center">No U2F/FIDO2 devices registered yet</td> </tr> diff --git a/uffd/mfa/views.py b/uffd/mfa/views.py index 7b95479f..406566ff 100644 --- a/uffd/mfa/views.py +++ b/uffd/mfa/views.py @@ -4,6 +4,7 @@ import urllib.parse from flask import Blueprint, render_template, session, request, redirect, url_for, flash, current_app, abort from uffd.database import db +from uffd.ldap import ldap from uffd.mfa.models import MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod from uffd.session.views import login_required, login_required_pre_mfa, set_request_user from uffd.user.models import User @@ -17,10 +18,7 @@ mfa_ratelimit = Ratelimit('mfa', 1*60, 3) @bp.route('/', methods=['GET']) @login_required() def setup(): - recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user.dn).all() - totp_methods = TOTPMethod.query.filter_by(dn=request.user.dn).all() - webauthn_methods = WebauthnMethod.query.filter_by(dn=request.user.dn).all() - return render_template('mfa/setup.html', totp_methods=totp_methods, webauthn_methods=webauthn_methods, recovery_methods=recovery_methods) + return render_template('mfa/setup.html') @bp.route('/setup/disable', methods=['GET']) @login_required() @@ -33,6 +31,8 @@ def disable(): def disable_confirm(): MFAMethod.query.filter_by(dn=request.user.dn).delete() db.session.commit() + request.user.update_groups() + ldap.session.commit() return redirect(url_for('mfa.setup')) @bp.route('/admin/<int:uid>/disable') @@ -47,6 +47,8 @@ def admin_disable(uid): user = User.query.filter_by(uid=uid).one() MFAMethod.query.filter_by(dn=user.dn).delete() db.session.commit() + user.update_groups() + ldap.session.commit() flash('Two-factor authentication was reset') return redirect(url_for('user.show', uid=uid)) @@ -82,6 +84,8 @@ def setup_totp_finish(): if method.verify(request.form['code']): db.session.add(method) db.session.commit() + request.user.update_groups() + ldap.session.commit() return redirect(url_for('mfa.setup')) flash('Code is invalid') return redirect(url_for('mfa.setup_totp', name=request.values['name'])) @@ -93,6 +97,8 @@ def delete_totp(id): #pylint: disable=redefined-builtin method = TOTPMethod.query.filter_by(dn=request.user.dn, id=id).first_or_404() db.session.delete(method) db.session.commit() + request.user.update_groups() + ldap.session.commit() return redirect(url_for('mfa.setup')) # WebAuthn support is optional because fido2 has a pretty unstable @@ -147,14 +153,15 @@ if WEBAUTHN_SUPPORTED: method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name']) db.session.add(method) db.session.commit() + request.user.update_groups() + ldap.session.commit() return cbor.dumps({"status": "OK"}) @bp.route("/auth/webauthn/begin", methods=["POST"]) @login_required_pre_mfa(no_redirect=True) def auth_webauthn_begin(): server = get_webauthn_server() - methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - creds = [method.cred for method in methods] + creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] if not creds: abort(404) auth_data, state = server.authenticate_begin(creds, user_verification='discouraged') @@ -165,8 +172,7 @@ if WEBAUTHN_SUPPORTED: @login_required_pre_mfa(no_redirect=True) def auth_webauthn_complete(): server = get_webauthn_server() - methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - creds = [method.cred for method in methods] + creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] if not creds: abort(404) data = cbor.loads(request.get_data())[0] @@ -195,22 +201,19 @@ def delete_webauthn(id): #pylint: disable=redefined-builtin method = WebauthnMethod.query.filter_by(dn=request.user.dn, id=id).first_or_404() db.session.delete(method) db.session.commit() + request.user.update_groups() + ldap.session.commit() return redirect(url_for('mfa.setup')) @bp.route('/auth', methods=['GET']) @login_required_pre_mfa() def auth(): - recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - totp_methods = TOTPMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - webauthn_methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - if not totp_methods and not webauthn_methods: + if not request.user_pre_mfa.mfa_enabled: session['user_mfa'] = True set_request_user() - if session.get('user_mfa'): return redirect(request.values.get('ref', url_for('index'))) - return render_template('mfa/auth.html', ref=request.values.get('ref'), totp_methods=totp_methods, - webauthn_methods=webauthn_methods, recovery_methods=recovery_methods) + return render_template('mfa/auth.html', ref=request.values.get('ref')) @bp.route('/auth', methods=['POST']) @login_required_pre_mfa() @@ -219,23 +222,21 @@ def auth_finish(): if delay: flash('We received too many invalid attempts! Please wait at least %s.'%format_delay(delay)) return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) - recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - totp_methods = TOTPMethod.query.filter_by(dn=request.user_pre_mfa.dn).all() - for method in totp_methods: + for method in request.user_pre_mfa.mfa_totp_methods: if method.verify(request.form['code']): session['user_mfa'] = True set_request_user() return redirect(request.values.get('ref', url_for('index'))) - for method in recovery_methods: + for method in request.user_pre_mfa.mfa_recovery_codes: if method.verify(request.form['code']): db.session.delete(method) db.session.commit() session['user_mfa'] = True set_request_user() - if len(recovery_methods) <= 1: + if len(request.user_pre_mfa.mfa_recovery_codes) <= 1: flash('You have exhausted your recovery codes. Please generate new ones now!') return redirect(url_for('mfa.setup')) - if len(recovery_methods) <= 5: + if len(request.user_pre_mfa.mfa_recovery_codes) <= 5: flash('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.') return redirect(url_for('mfa.setup')) return redirect(request.values.get('ref', url_for('index'))) diff --git a/uffd/role/models.py b/uffd/role/models.py index 97e3f2c1..580a8a62 100644 --- a/uffd/role/models.py +++ b/uffd/role/models.py @@ -1,5 +1,6 @@ from sqlalchemy import Column, String, Integer, Text, ForeignKey, Boolean from sqlalchemy.orm import relationship +from sqlalchemy.orm.collections import MappedCollection, collection from sqlalchemy.ext.declarative import declared_attr from ldapalchemy.dbutils import DBRelationship @@ -9,16 +10,12 @@ from uffd.user.models import User, Group class RoleGroup(db.Model): __tablename__ = 'role-group' - __table_args__ = ( - db.UniqueConstraint('dn', 'role_id'), - ) + role_id = Column(Integer(), ForeignKey('role.id'), primary_key=True) + group_dn = Column(String(128), primary_key=True) + requires_mfa = Column(Boolean(), default=False, nullable=False) - id = Column(Integer(), primary_key=True, autoincrement=True) - dn = Column(String(128)) - - @declared_attr - def role_id(self): - return Column(ForeignKey('role.id')) + role = relationship('Role') + group = DBRelationship('group_dn', Group) class RoleUser(db.Model): __tablename__ = 'role-user' @@ -50,19 +47,27 @@ def flatten_recursive(objs, attr): new_objs.add(obj) return objs -def get_roles_effective(user): +def get_user_roles_effective(user): base = set(user.roles) if not user.is_service_user: base.update(Role.query.filter_by(is_default=True)) return flatten_recursive(base, 'included_roles') -User.roles_effective = property(get_roles_effective) +User.roles_effective = property(get_user_roles_effective) -def update_user_groups(user): - current_groups = set(user.groups) +def compute_user_groups(user, ignore_mfa=False): groups = set() for role in user.roles_effective: - groups.update(role.groups) + for group in role.groups: + if ignore_mfa or not role.groups[group].requires_mfa or user.mfa_enabled: + groups.add(group) + return groups + +User.compute_groups = compute_user_groups + +def update_user_groups(user): + current_groups = set(user.groups) + groups = user.compute_groups() if groups == current_groups: return set(), set() groups_added = groups - current_groups @@ -74,6 +79,15 @@ def update_user_groups(user): User.update_groups = update_user_groups +class RoleGroupMap(MappedCollection): + def __init__(self): + super().__init__(keyfunc=lambda rolegroup: rolegroup.group) + + @collection.internally_instrumented + def __setitem__(self, key, value, _sa_initiator=None): + value.group = key + super().__setitem__(key, value, _sa_initiator) + class Role(db.Model): __tablename__ = 'role' id = Column(Integer(), primary_key=True, autoincrement=True) @@ -91,8 +105,7 @@ class Role(db.Model): db_members = relationship("RoleUser", backref="role", cascade="all, delete-orphan") members = DBRelationship('db_members', User, RoleUser, backattr='role', backref='roles') - db_groups = relationship("RoleGroup", backref="role", cascade="all, delete-orphan") - groups = DBRelationship('db_groups', Group, RoleGroup, backattr='role', backref='roles') + groups = relationship('RoleGroup', collection_class=RoleGroupMap, cascade='all, delete-orphan') # Roles that are managed externally (e.g. by Ansible) can be locked to # prevent accidental editing of name, moderator group, included roles @@ -103,14 +116,12 @@ class Role(db.Model): @property def members_effective(self): - users = set(self.members) - for role in flatten_recursive(self.including_roles, 'including_roles'): - users.update(role.members) - if self.is_default: - for user in User.query.all(): - if not user.is_service_user: - users.add(user) - return users + members = set() + for role in flatten_recursive([self], 'including_roles'): + members.update(role.members) + if role.is_default: + members.update([user for user in User.query.all() if not user.is_service_user]) + return members @property def included_roles_recursive(self): diff --git a/uffd/role/templates/role/show.html b/uffd/role/templates/role/show.html index d1b38130..beb6c051 100644 --- a/uffd/role/templates/role/show.html +++ b/uffd/role/templates/role/show.html @@ -126,6 +126,7 @@ Name, moderator group, included roles and groups of this role are managed extern <th scope="col"></th> <th scope="col">name</th> <th scope="col">description</th> + <th scope="col">2FA required</th> </tr> </thead> <tbody> @@ -144,6 +145,11 @@ Name, moderator group, included roles and groups of this role are managed extern <td> {{ group.description }} </td> + <td> + <div class="form-check"> + <input class="form-check-input" type="checkbox" id="group-mfa-{{ group.gid }}-checkbox" name="group-mfa-{{ group.gid }}" value="1" aria-label="enabled" {% if group in role.groups and role.groups[group].requires_mfa %}checked{% endif %} {{ 'disabled' if role.locked }}> + </div> + </td> </tr> {% endfor %} </tbody> diff --git a/uffd/role/views.py b/uffd/role/views.py index 267c4e7b..32d0593a 100644 --- a/uffd/role/views.py +++ b/uffd/role/views.py @@ -5,7 +5,7 @@ import click from uffd.navbar import register_navbar from uffd.csrf import csrf_protect -from uffd.role.models import Role +from uffd.role.models import Role, RoleGroup from uffd.user.models import User, Group from uffd.session import login_required from uffd.database import db @@ -83,11 +83,10 @@ def update(roleid=None): role.included_roles.append(included_role) elif included_role in role.included_roles: role.included_roles.remove(included_role) + role.groups.clear() for group in Group.query.all(): - if request.values.get('group-{}'.format(group.gid), False): - role.groups.add(group) - else: - role.groups.discard(group) + if request.values.get(f'group-{group.gid}', False): + role.groups[group] = RoleGroup(requires_mfa=bool(request.values.get(f'group-mfa-{group.gid}', ''))) role.update_member_groups() db.session.commit() ldap.session.commit() diff --git a/uffd/selfservice/templates/selfservice/self.html b/uffd/selfservice/templates/selfservice/self.html index 85d7b903..96bdaafd 100644 --- a/uffd/selfservice/templates/selfservice/self.html +++ b/uffd/selfservice/templates/selfservice/self.html @@ -2,6 +2,13 @@ {% block body %} +{% if not user.mfa_enabled and user.compute_groups() != user.compute_groups(ignore_mfa=True) %} +<div class="alert alert-warning" role="alert"> + Some permissions require you to setup two-factor authentication. + These permissions are not in effect until you do that. +</div> +{% endif %} + <div class="btn-toolbar"> <a class="ml-auto mb-3 btn btn-primary" href="{{ url_for('mfa.setup') }}">Manage two-factor authentication</a> </div> -- GitLab