Commit 6cacdf54 authored by Julian's avatar Julian
Browse files

Implemented mfa requirements for RoleGroup

parent fc6cc381
Pipeline #6791 passed with stage
in 3 minutes and 59 seconds
"""added RoleGroup.requires_mfa and cleanup
Revision ID: bad6fc529510
Revises: aff5f350dcdf
Create Date: 2021-06-22 15:58:10.515330
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'bad6fc529510'
down_revision = 'aff5f350dcdf'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('role-group', meta,
sa.Column('role_id', sa.Integer(), nullable=False),
sa.Column('dn', sa.String(128), nullable=False),
sa.ForeignKeyConstraint(['role_id'], ['role.id'], name=op.f('fk_role-group_role_id_role')),
sa.PrimaryKeyConstraint('role_id', 'dn', name=op.f('pk_role-group'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.alter_column('dn', new_column_name='group_dn', nullable=False)
batch_op.add_column(sa.Column('requires_mfa', sa.Boolean(name=op.f('ck_role-group_requires_mfa')), nullable=False, default=False))
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('role-group', meta,
sa.Column('role_id', sa.Integer(), nullable=False),
sa.Column('group_dn', sa.String(128), nullable=False),
sa.ForeignKeyConstraint(['role_id'], ['role.id'], name=op.f('fk_role-group_role_id_role'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.INTEGER(), nullable=False, autoincrement=True, primary_key=True))
batch_op.alter_column('group_dn', new_column_name='dn', nullable=True)
batch_op.add_column(sa.Column('requires_mfa', sa.Boolean(name=op.f('ck_role-group_requires_mfa')), nullable=False, default=False))
......@@ -11,7 +11,7 @@ from uffd.ldap import ldap
from uffd import create_app, db
from uffd.invite.models import Invite, InviteGrant, InviteSignup
from uffd.user.models import User, Group
from uffd.role.models import Role
from uffd.role.models import Role, RoleGroup
from uffd.session.views import login_get_user
from utils import dump, UffdTestCase, db_flush
......@@ -102,12 +102,12 @@ class TestInviteGrantModel(UffdTestCase):
def test_success(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
role0 = Role(name='baserole', groups=[group0])
role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)})
db.session.add(role0)
user.roles.add(role0)
user.update_groups()
group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
role1 = Role(name='testrole1', groups=[group1])
role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
......@@ -140,7 +140,7 @@ class TestInviteGrantModel(UffdTestCase):
def test_inactive(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
role = Role(name='testrole1', groups=[group])
role = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role)
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role], single_use=True, used=True)
self.assertFalse(invite.active)
......@@ -175,8 +175,8 @@ class TestInviteGrantModel(UffdTestCase):
class TestInviteSignupModel(UffdTestCase):
def create_base_roles(self):
baserole = Role(name='base', is_default=True)
baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'))
baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com'))
baserole.groups[Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')] = RoleGroup()
baserole.groups[Group.query.get('cn=users,ou=groups,dc=example,dc=com')] = RoleGroup()
db.session.add(baserole)
db.session.commit()
......@@ -186,7 +186,7 @@ class TestInviteSignupModel(UffdTestCase):
base_group1 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
base_group2 = Group.query.get('cn=users,ou=groups,dc=example,dc=com')
group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
role1 = Role(name='testrole1', groups=[group])
role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
......@@ -564,12 +564,12 @@ class TestInviteUseViews(UffdTestCase):
def test_grant(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
role0 = Role(name='baserole', groups=[group0])
role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)})
db.session.add(role0)
user.roles.add(role0)
user.update_groups()
group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
role1 = Role(name='testrole1', groups=[group1])
role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
......@@ -637,11 +637,11 @@ class TestInviteUseViews(UffdTestCase):
def test_signup(self):
baserole = Role(name='base', is_default=True)
baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'))
baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com'))
baserole.groups[Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')] = RoleGroup()
baserole.groups[Group.query.get('cn=users,ou=groups,dc=example,dc=com')] = RoleGroup()
db.session.add(baserole)
group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
role1 = Role(name='testrole1', groups=[group])
role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
......
import datetime
import time
import unittest
from flask import url_for, session
......@@ -8,49 +9,107 @@ from uffd.ldap import ldap
from uffd import user
from uffd.user.models import User, Group
from uffd.role.models import Role
from uffd.role.models import flatten_recursive, Role, RoleGroup
from uffd.mfa.models import TOTPMethod
from uffd import create_app, db
from utils import dump, UffdTestCase
class TestPrimitives(unittest.TestCase):
def test_flatten_recursive(self):
class Node:
def __init__(self, *neighbors):
self.neighbors = set(neighbors or set())
cycle = Node()
cycle.neighbors.add(cycle)
common = Node(cycle)
intermediate1 = Node(common)
intermediate2 = Node(common, intermediate1)
stub = Node()
backref = Node()
start1 = Node(intermediate1, intermediate2, stub, backref)
backref.neighbors.add(start1)
start2 = Node()
self.assertSetEqual(flatten_recursive({start1, start2}, 'neighbors'),
{start1, start2, backref, stub, intermediate1, intermediate2, common, cycle})
self.assertSetEqual(flatten_recursive(set(), 'neighbors'), set())
class TestUserRoleAttributes(UffdTestCase):
def test_roles_effective(self):
user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
user1.update_groups()
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
service_user = User.query.get('uid=service,ou=users,dc=example,dc=com')
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
default_role = Role(name='default', is_default=True)
role1 = Role(name='role1', members=[user1], included_roles=[included_role])
role2 = Role(name='role2', included_roles=[included_role])
db.session.add_all([included_role, default_role, role1, role2])
self.assertSetEqual(user1.roles_effective, {included_role, default_role, role1})
included_role.included_roles.append(role2)
self.assertSetEqual(user1.roles_effective, {included_role, default_role, role1, role2})
cycle_role = Role(name='cycle')
direct_role1 = Role(name='role1', members=[user, service_user], included_roles=[included_role, cycle_role])
direct_role2 = Role(name='role2', members=[user, service_user], included_roles=[included_role])
cycle_role.included_roles.append(direct_role1)
db.session.add_all([included_by_default_role, default_role, included_role, cycle_role, direct_role1, direct_role2])
self.assertSetEqual(user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role, default_role, included_by_default_role})
self.assertSetEqual(service_user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role})
ldap.session.delete(service_user)
ldap.session.commit()
def test_compute_groups(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com')
group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
role1 = Role(name='role1', groups={group1: RoleGroup(group=group1)})
role2 = Role(name='role2', groups={group1: RoleGroup(group=group1), group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
self.assertSetEqual(user.compute_groups(), set())
role1.members.add(user)
role2.members.add(user)
self.assertSetEqual(user.compute_groups(), {group1, group2})
role2.groups[group2].requires_mfa = True
self.assertSetEqual(user.compute_groups(), {group1})
db.session.add(TOTPMethod(user=user))
self.assertSetEqual(user.compute_groups(), {group1, group2})
def test_update_groups(self):
user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
user1.update_groups()
self.assertSetEqual(set(user1.groups), set())
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com')
group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
baserole = Role(name='base', groups=[group1])
role1 = Role(name='role1', groups=[group2], members=[user1])
db.session.add_all([baserole, role1])
user1.update_groups()
self.assertSetEqual(set(user1.groups), {group2})
role1.included_roles.append(baserole)
user1.update_groups()
self.assertSetEqual(set(user1.groups), {group1, group2})
role1 = Role(name='role1', members=[user], groups={group1: RoleGroup(group=group1)})
role2 = Role(name='role2', groups={group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
user.groups = {group2}
groups_added, groups_removed = user.update_groups()
self.assertSetEqual(groups_added, {group1})
self.assertSetEqual(groups_removed, {group2})
self.assertSetEqual(set(user.groups), {group1})
groups_added, groups_removed = user.update_groups()
self.assertSetEqual(groups_added, set())
self.assertSetEqual(groups_removed, set())
self.assertSetEqual(set(user.groups), {group1})
class TestRoleModel(UffdTestCase):
def test_members_effective(self):
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
user2 = User.query.get('uid=testadmin,ou=users,dc=example,dc=com')
included_role = Role(name='included', members=[user1])
default_role = Role(name='default', is_default=True)
role1 = Role(name='role1', included_roles=[included_role], members=[user2])
self.assertSetEqual(included_role.members_effective, {user1, user2})
service = User.query.get('uid=service,ou=users,dc=example,dc=com')
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
direct_role = Role(name='direct', members=[user1, user2, service], included_roles=[included_role])
empty_role = Role(name='empty', included_roles=[included_role])
self.assertSetEqual(included_by_default_role.members_effective, {user1, user2})
self.assertSetEqual(default_role.members_effective, {user1, user2})
self.assertSetEqual(role1.members_effective, {user2})
self.assertSetEqual(included_role.members_effective, {user1, user2, service})
self.assertSetEqual(direct_role.members_effective, {user1, user2, service})
self.assertSetEqual(empty_role.members_effective, set())
ldap.session.delete(service)
ldap.session.commit()
def test_included_roles_recursive(self):
baserole = Role(name='base')
......@@ -66,8 +125,8 @@ class TestRoleModel(UffdTestCase):
def test_groups_effective(self):
group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com')
group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
baserole = Role(name='base', groups=[group1])
role1 = Role(name='role1', groups=[group2], included_roles=[baserole])
baserole = Role(name='base', groups={group1: RoleGroup(group=group1)})
role1 = Role(name='role1', groups={group2: RoleGroup(group=group2)}, included_roles=[baserole])
self.assertSetEqual(baserole.groups_effective, {group1})
self.assertSetEqual(role1.groups_effective, {group1, group2})
......@@ -79,14 +138,14 @@ class TestRoleModel(UffdTestCase):
group1 = Group.query.get('cn=users,ou=groups,dc=example,dc=com')
group2 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')
group3 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')
baserole = Role(name='base', members=[user1], groups=[group1])
role1 = Role(name='role1', members=[user2], groups=[group2], included_roles=[baserole])
baserole = Role(name='base', members=[user1], groups={group1: RoleGroup(group=group1)})
role1 = Role(name='role1', members=[user2], groups={group2: RoleGroup(group=group2)}, included_roles=[baserole])
db.session.add_all([baserole, role1])
baserole.update_member_groups()
role1.update_member_groups()
self.assertSetEqual(set(user1.groups), {group1})
self.assertSetEqual(set(user2.groups), {group1, group2})
baserole.groups.add(group3)
baserole.groups[group3] = RoleGroup()
baserole.update_member_groups()
self.assertSetEqual(set(user1.groups), {group1, group3})
self.assertSetEqual(set(user2.groups), {group1, group2, group3})
......@@ -127,7 +186,7 @@ class TestRoleViews(UffdTestCase):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role.groups.add(Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com'))
role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup()
db.session.commit()
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
......
from flask import url_for
from uffd.user.models import User, Group
from uffd.role.models import Role
from uffd.role.models import Role, RoleGroup
from uffd.database import db
from uffd.ldap import ldap
......@@ -136,7 +136,8 @@ class TestRolemodViews(UffdTestCase):
def test_delete_member(self):
self.login()
role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'), groups=[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')])
role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'))
role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup()
db.session.add(role)
role.members.add(User.query.get('uid=testadmin,ou=users,dc=example,dc=com'))
db.session.commit()
......@@ -158,7 +159,8 @@ class TestRolemodViews(UffdTestCase):
def test_delete_member_nomember(self):
self.login()
role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'), groups=[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')])
role = Role(name='test', moderator_group=Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com'))
role.groups[Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com')] = RoleGroup()
db.session.add(role)
db.session.commit()
user = User.query.get('uid=testadmin,ou=users,dc=example,dc=com')
......
......@@ -18,6 +18,8 @@ from ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User
User.mfa_enabled = property(lambda user: bool(user.mfa_totp_methods or user.mfa_webauthn_methods))
class MFAType(enum.Enum):
RECOVERY_CODE = 0
TOTP = 1
......
......@@ -11,7 +11,7 @@
<div class="col-12 mb-3">
<h2 class="text-center">Two-Factor Authentication</h2>
</div>
{% if webauthn_methods %}
{% if request.user_pre_mfa.mfa_webauthn_methods %}
<noscript>
<div class="form-group col-12">
<div id="webauthn-nojs" class="alert alert-warning" role="alert">Enable javascript for authentication with U2F/FIDO2 devices</div>
......@@ -42,7 +42,7 @@
</div>
</form>
{% if webauthn_supported and webauthn_methods %}
{% if webauthn_supported and request.user_pre_mfa.mfa_webauthn_methods %}
<script src="{{ url_for('static', filename="cbor.js") }}"></script>
<script>
function begin_webauthn() {
......
......@@ -8,9 +8,9 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe
#}
{% set mfa_enabled = totp_methods or webauthn_methods %}
{% set mfa_init = not recovery_methods and not mfa_enabled %}
{% set mfa_setup = recovery_methods and not mfa_enabled %}
{% set mfa_enabled = request.user.mfa_enabled %}
{% set mfa_init = not request.user.mfa_recovery_codes and not mfa_enabled %}
{% set mfa_setup = request.user.mfa_recovery_codes and not mfa_enabled %}
{% block body %}
<p>Two-factor authentication is currently <strong>{{ 'enabled' if mfa_enabled else 'disabled' }}</strong>.
......@@ -57,9 +57,9 @@ You need to setup at least one authentication method to enable two-factor authen
{% endif %}
</form>
{% if recovery_methods %}
<p>{{ recovery_methods|length }} recovery codes remain</p>
{% elif not recovery_methods and mfa_enabled %}
{% if request.user.mfa_recovery_codes %}
<p>{{ request.user.mfa_recovery_codes|length }} recovery codes remain</p>
{% elif not request.user.mfa_recovery_codes and mfa_enabled %}
<p><strong>You have no remaining recovery codes.</strong></p>
{% endif %}
</div>
......@@ -93,14 +93,14 @@ You need to setup at least one authentication method to enable two-factor authen
</tr>
</thead>
<tbody>
{% for method in totp_methods %}
{% for method in request.user.mfa_totp_methods %}
<tr>
<td>{{ method.name }}</td>
<td>{{ method.created.strftime('%b %d, %Y') }}</td>
<td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_totp', id=method.id) }}">Delete</a></td>
</tr>
{% endfor %}
{% if not totp_methods %}
{% if not request.user.mfa_totp_methods %}
<tr class="table-secondary">
<td colspan=3 class="text-center">No authenticator apps registered yet</td>
</tr>
......@@ -148,14 +148,14 @@ You need to setup at least one authentication method to enable two-factor authen
</tr>
</thead>
<tbody>
{% for method in webauthn_methods %}
{% for method in request.user.mfa_webauthn_methods %}
<tr>
<td>{{ method.name }}</td>
<td>{{ method.created.strftime('%b %d, %Y') }}</td>
<td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_webauthn', id=method.id) }}">Delete</a></td>
</tr>
{% endfor %}
{% if not webauthn_methods %}
{% if not request.user.mfa_webauthn_methods %}
<tr class="table-secondary">
<td colspan=3 class="text-center">No U2F/FIDO2 devices registered yet</td>
</tr>
......
......@@ -4,6 +4,7 @@ import urllib.parse
from flask import Blueprint, render_template, session, request, redirect, url_for, flash, current_app, abort
from uffd.database import db
from uffd.ldap import ldap
from uffd.mfa.models import MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod
from uffd.session.views import login_required, login_required_pre_mfa, set_request_user
from uffd.user.models import User
......@@ -17,10 +18,7 @@ mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.route('/', methods=['GET'])
@login_required()
def setup():
recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user.dn).all()
totp_methods = TOTPMethod.query.filter_by(dn=request.user.dn).all()
webauthn_methods = WebauthnMethod.query.filter_by(dn=request.user.dn).all()
return render_template('mfa/setup.html', totp_methods=totp_methods, webauthn_methods=webauthn_methods, recovery_methods=recovery_methods)
return render_template('mfa/setup.html')
@bp.route('/setup/disable', methods=['GET'])
@login_required()
......@@ -33,6 +31,8 @@ def disable():
def disable_confirm():
MFAMethod.query.filter_by(dn=request.user.dn).delete()
db.session.commit()
request.user.update_groups()
ldap.session.commit()
return redirect(url_for('mfa.setup'))
@bp.route('/admin/<int:uid>/disable')
......@@ -47,6 +47,8 @@ def admin_disable(uid):
user = User.query.filter_by(uid=uid).one()
MFAMethod.query.filter_by(dn=user.dn).delete()
db.session.commit()
user.update_groups()
ldap.session.commit()
flash('Two-factor authentication was reset')
return redirect(url_for('user.show', uid=uid))
......@@ -82,6 +84,8 @@ def setup_totp_finish():
if method.verify(request.form['code']):
db.session.add(method)
db.session.commit()
request.user.update_groups()
ldap.session.commit()
return redirect(url_for('mfa.setup'))
flash('Code is invalid')
return redirect(url_for('mfa.setup_totp', name=request.values['name']))
......@@ -93,6 +97,8 @@ def delete_totp(id): #pylint: disable=redefined-builtin
method = TOTPMethod.query.filter_by(dn=request.user.dn, id=id).first_or_404()
db.session.delete(method)
db.session.commit()
request.user.update_groups()
ldap.session.commit()
return redirect(url_for('mfa.setup'))
# WebAuthn support is optional because fido2 has a pretty unstable
......@@ -147,14 +153,15 @@ if WEBAUTHN_SUPPORTED:
method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name'])
db.session.add(method)
db.session.commit()
request.user.update_groups()
ldap.session.commit()
return cbor.dumps({"status": "OK"})
@bp.route("/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def auth_webauthn_begin():
server = get_webauthn_server()
methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
creds = [method.cred for method in methods]
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
......@@ -165,8 +172,7 @@ if WEBAUTHN_SUPPORTED:
@login_required_pre_mfa(no_redirect=True)
def auth_webauthn_complete():
server = get_webauthn_server()
methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
creds = [method.cred for method in methods]
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.loads(request.get_data())[0]
......@@ -195,22 +201,19 @@ def delete_webauthn(id): #pylint: disable=redefined-builtin
method = WebauthnMethod.query.filter_by(dn=request.user.dn, id=id).first_or_404()
db.session.delete(method)
db.session.commit()
request.user.update_groups()
ldap.session.commit()
return redirect(url_for('mfa.setup'))
@bp.route('/auth', methods=['GET'])
@login_required_pre_mfa()
def auth():
recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
totp_methods = TOTPMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
webauthn_methods = WebauthnMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
if not totp_methods and not webauthn_methods:
if not request.user_pre_mfa.mfa_enabled:
session['user_mfa'] = True
set_request_user()
if session.get('user_mfa'):
return redirect(request.values.get('ref', url_for('index')))
return render_template('mfa/auth.html', ref=request.values.get('ref'), totp_methods=totp_methods,
webauthn_methods=webauthn_methods, recovery_methods=recovery_methods)
return render_template('mfa/auth.html', ref=request.values.get('ref'))
@bp.route('/auth', methods=['POST'])
@login_required_pre_mfa()
......@@ -219,23 +222,21 @@ def auth_finish():
if delay:
flash('We received too many invalid attempts! Please wait at least %s.'%format_delay(delay))
return redirect(url_for('mfa.auth', ref=request.values.get('ref')))
recovery_methods = RecoveryCodeMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
totp_methods = TOTPMethod.query.filter_by(dn=request.user_pre_mfa.dn).all()
for method in totp_methods:
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
session['user_mfa'] = True
set_request_user()
return redirect(request.values.get('ref', url_for('index')))
for method in recovery_methods:
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
db.session.commit()
session['user_mfa'] = True
set_request_user()
if len(recovery_methods) <= 1:
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash('You have exhausted your recovery codes. Please generate new ones now!')
return redirect(url_for('mfa.setup'))
if len(recovery_methods) <= 5:
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.')
return redirect(url_for('mfa.setup'))
return redirect(request.values.get('ref', url_for('index')))
......
from sqlalchemy import Column, String, Integer, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.orm.collections import MappedCollection, collection
from sqlalchemy.ext.declarative import declared_attr
from ldapalchemy.dbutils import DBRelationship
......@@ -9,16 +10,12 @@ from uffd.user.models import User, Group
class RoleGroup(db.Model):
__tablename__ = 'role-group'
__table_args__ = (
db.UniqueConstraint('dn', 'role_id'),
)
role_id = Column(Integer(), ForeignKey('role.id'), primary_key=True)
group_dn = Column(String(128), primary_key=True)
requires_mfa = Column(Boolean(), default=False, nullable=False)
id = Column(Integer(), primary_key=True, autoincrement=True)
dn = Column(String(128))
@declared_attr
def role_id(self):
return Column(ForeignKey('role.id'))