diff --git a/tests/views/test_mfa.py b/tests/views/test_mfa.py deleted file mode 100644 index 995e321dfae336e8c7d2ae697d242abc0a4b45f1..0000000000000000000000000000000000000000 --- a/tests/views/test_mfa.py +++ /dev/null @@ -1,372 +0,0 @@ -import time - -from flask import url_for, session, request - -from uffd.database import db -from uffd.models import Role, RoleGroup, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod -from uffd.models.mfa import _hotp - -from tests.utils import dump, UffdTestCase, db_flush - -def get_fido2_test_cred(self): - try: - from uffd.fido2_compat import AttestedCredentialData - except ImportError: - self.skipTest('fido2 could not be imported') - # Example public key from webauthn spec 6.5.1.1 - return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c')) - -class TestMfaViews(UffdTestCase): - def setUp(self): - super().setUp() - db.session.add(RecoveryCodeMethod(user=self.get_admin())) - db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone')) - # We don't want to skip all tests only because fido2 is not installed! - #db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle')) - db.session.commit() - - def add_recovery_codes(self, count=10): - user = self.get_user() - for _ in range(count): - db.session.add(RecoveryCodeMethod(user=user)) - db.session.commit() - - def add_totp(self): - db.session.add(TOTPMethod(user=self.get_user(), name='My phone')) - db.session.commit() - - def add_webauthn(self): - db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle')) - db.session.commit() - - def test_setup_disabled(self): - self.login_as('user') - r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True) - dump('mfa_setup_disabled', r) - self.assertEqual(r.status_code, 200) - - def test_setup_recovery_codes(self): - self.login_as('user') - self.add_recovery_codes() - r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True) - dump('mfa_setup_only_recovery_codes', r) - self.assertEqual(r.status_code, 200) - - def test_setup_enabled(self): - self.login_as('user') - self.add_recovery_codes() - self.add_totp() - self.add_webauthn() - r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True) - dump('mfa_setup_enabled', r) - self.assertEqual(r.status_code, 200) - - def test_setup_few_recovery_codes(self): - self.login_as('user') - self.add_totp() - self.add_recovery_codes(1) - r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True) - dump('mfa_setup_few_recovery_codes', r) - self.assertEqual(r.status_code, 200) - - def test_setup_no_recovery_codes(self): - self.login_as('user') - self.add_totp() - r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True) - dump('mfa_setup_no_recovery_codes', r) - self.assertEqual(r.status_code, 200) - - def test_disable(self): - baserole = Role(name='baserole', is_default=True) - db.session.add(baserole) - baserole.groups[self.get_access_group()] = RoleGroup() - db.session.commit() - self.login_as('user') - self.add_recovery_codes() - self.add_totp() - admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) - r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True) - dump('mfa_disable', r) - self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True) - dump('mfa_disable_submit', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) - self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) - - def test_disable_recovery_only(self): - baserole = Role(name='baserole', is_default=True) - db.session.add(baserole) - baserole.groups[self.get_access_group()] = RoleGroup() - db.session.commit() - self.login_as('user') - self.add_recovery_codes() - admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) - self.assertNotEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True) - dump('mfa_disable_recovery_only', r) - self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True) - dump('mfa_disable_recovery_only_submit', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) - self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) - - def test_admin_disable(self): - for method in MFAMethod.query.filter_by(user=self.get_admin()).all(): - if not isinstance(method, RecoveryCodeMethod): - db.session.delete(method) - db.session.commit() - self.add_recovery_codes() - self.add_totp() - self.login_as('admin') - self.assertIsNotNone(request.user) - admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) - r = self.client.get(path=url_for('mfa.admin_disable', id=self.get_user().id), follow_redirects=True) - dump('mfa_admin_disable', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_user()).all()), 0) - self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) - - def test_setup_recovery(self): - self.login_as('user') - self.assertEqual(len(RecoveryCodeMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True) - dump('mfa_setup_recovery', r) - self.assertEqual(r.status_code, 200) - methods = RecoveryCodeMethod.query.filter_by(user=request.user).all() - self.assertNotEqual(len(methods), 0) - r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True) - dump('mfa_setup_recovery_reset', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0) - self.assertNotEqual(len(methods), 0) - - def test_setup_totp(self): - self.login_as('user') - self.add_recovery_codes() - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - dump('mfa_setup_totp', r) - self.assertEqual(r.status_code, 200) - self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0) - - def test_setup_totp_without_recovery(self): - self.login_as('user') - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - dump('mfa_setup_totp_without_recovery', r) - self.assertEqual(r.status_code, 200) - - def test_setup_totp_finish(self): - baserole = Role(name='baserole', is_default=True) - db.session.add(baserole) - baserole.groups[self.get_access_group()] = RoleGroup() - db.session.commit() - self.login_as('user') - self.add_recovery_codes() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) - code = _hotp(int(time.time()/30), method.raw_key) - r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) - dump('mfa_setup_totp_finish', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1) - - def test_setup_totp_finish_without_recovery(self): - self.login_as('user') - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) - code = _hotp(int(time.time()/30), method.raw_key) - r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) - dump('mfa_setup_totp_finish_without_recovery', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - - def test_setup_totp_finish_wrong_code(self): - self.login_as('user') - self.add_recovery_codes() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) - code = _hotp(int(time.time()/30), method.raw_key) - code = str(int(code[0])+1)[-1] + code[1:] - r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) - dump('mfa_setup_totp_finish_wrong_code', r) - self.assertEqual(r.status_code, 200) - db_flush() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - - def test_setup_totp_finish_empty_code(self): - self.login_as('user') - self.add_recovery_codes() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True) - r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True) - dump('mfa_setup_totp_finish_empty_code', r) - self.assertEqual(r.status_code, 200) - db_flush() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) - - def test_delete_totp(self): - baserole = Role(name='baserole', is_default=True) - db.session.add(baserole) - baserole.groups[self.get_access_group()] = RoleGroup() - db.session.commit() - self.login_as('user') - self.add_recovery_codes() - self.add_totp() - method = TOTPMethod(request.user, name='test') - db.session.add(method) - db.session.commit() - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 2) - r = self.client.get(path=url_for('mfa.delete_totp', id=method.id), follow_redirects=True) - dump('mfa_delete_totp', r) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0) - self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1) - - # TODO: webauthn setup tests - - def test_auth_integration(self): - self.add_recovery_codes() - self.add_totp() - db.session.commit() - self.assertIsNone(request.user) - r = self.login_as('user') - dump('mfa_auth_redirected', r) - self.assertEqual(r.status_code, 200) - self.assertIn(b'/mfa/auth', r.data) - self.assertIsNone(request.user) - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - dump('mfa_auth', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - - def test_auth_disabled(self): - self.assertIsNone(request.user) - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False) - self.assertEqual(r.status_code, 302) - self.assertTrue(r.location.endswith('/redirecttarget')) - self.assertIsNotNone(request.user) - - def test_auth_recovery_only(self): - self.add_recovery_codes() - self.assertIsNone(request.user) - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False) - self.assertEqual(r.status_code, 302) - self.assertTrue(r.location.endswith('/redirecttarget')) - self.assertIsNotNone(request.user) - - def test_auth_recovery_code(self): - self.add_recovery_codes() - self.add_totp() - method = RecoveryCodeMethod(user=self.get_user()) - db.session.add(method) - db.session.commit() - method_id = method.id - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - dump('mfa_auth_recovery_code', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - r = self.client.post(path=url_for('mfa.auth_finish', ref='/redirecttarget'), data={'code': method.code}) - self.assertEqual(r.status_code, 302) - self.assertTrue(r.location.endswith('/redirecttarget')) - self.assertIsNotNone(request.user) - self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0) - - def test_auth_totp_code(self): - self.add_recovery_codes() - self.add_totp() - method = TOTPMethod(user=self.get_user(), name='testname') - raw_key = method.raw_key - db.session.add(method) - db.session.commit() - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - dump('mfa_auth_totp_code', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - code = _hotp(int(time.time()/30), raw_key) - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) - dump('mfa_auth_totp_code_submit', r) - self.assertEqual(r.status_code, 200) - self.assertIsNotNone(request.user) - - def test_auth_totp_code_reuse(self): - self.add_recovery_codes() - self.add_totp() - method = TOTPMethod(user=self.get_user(), name='testname') - raw_key = method.raw_key - db.session.add(method) - db.session.commit() - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - code = _hotp(int(time.time()/30), raw_key) - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) - self.assertEqual(r.status_code, 200) - self.assertIsNotNone(request.user) - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - - def test_auth_empty_code(self): - self.add_recovery_codes() - self.add_totp() - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': ''}, follow_redirects=True) - dump('mfa_auth_empty_code', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - - def test_auth_invalid_code(self): - self.add_recovery_codes() - self.add_totp() - method = TOTPMethod(user=self.get_user(), name='testname') - raw_key = method.raw_key - db.session.add(method) - db.session.commit() - self.login_as('user') - r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - code = _hotp(int(time.time()/30), raw_key) - code = str(int(code[0])+1)[-1] + code[1:] - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) - dump('mfa_auth_invalid_code', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - - def test_auth_ratelimit(self): - self.add_recovery_codes() - self.add_totp() - method = TOTPMethod(user=self.get_user(), name='testname') - raw_key = method.raw_key - db.session.add(method) - db.session.commit() - self.login_as('user') - self.assertIsNone(request.user) - code = _hotp(int(time.time()/30), raw_key) - inv_code = str(int(code[0])+1)[-1] + code[1:] - for i in range(20): - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': inv_code}, follow_redirects=True) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) - dump('mfa_auth_ratelimit', r) - self.assertEqual(r.status_code, 200) - self.assertIsNone(request.user) - - # TODO: webauthn auth tests diff --git a/tests/views/test_selfservice.py b/tests/views/test_selfservice.py index 2396b77f868fc3d7d7a4c49b0ad90e4a100a045d..eb7430d225fb5a204c07f5c3ba3093f4c7c92fa6 100644 --- a/tests/views/test_selfservice.py +++ b/tests/views/test_selfservice.py @@ -1,12 +1,14 @@ import datetime import re +import time -from flask import url_for, request +from flask import url_for, request, session from uffd.database import db -from uffd.models import PasswordToken, UserEmail, Role, RoleGroup, Service, ServiceUser, FeatureFlag +from uffd.models import PasswordToken, UserEmail, Role, RoleGroup, Service, ServiceUser, FeatureFlag, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod +from uffd.models.mfa import _hotp -from tests.utils import dump, UffdTestCase +from tests.utils import dump, UffdTestCase, db_flush class TestSelfservice(UffdTestCase): def test_index(self): @@ -478,3 +480,207 @@ class TestSelfservice(UffdTestCase): dump('token_password_different_passwords_submit', r) self.assertEqual(r.status_code, 200) self.assertTrue(self.get_user().password.verify('userpassword')) + +def get_fido2_test_cred(self): + try: + from uffd.fido2_compat import AttestedCredentialData + except ImportError: + self.skipTest('fido2 could not be imported') + # Example public key from webauthn spec 6.5.1.1 + return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c')) + +class TestMfaViews(UffdTestCase): + def setUp(self): + super().setUp() + db.session.add(RecoveryCodeMethod(user=self.get_admin())) + db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone')) + # We don't want to skip all tests only because fido2 is not installed! + #db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle')) + db.session.commit() + + def add_recovery_codes(self, count=10): + user = self.get_user() + for _ in range(count): + db.session.add(RecoveryCodeMethod(user=user)) + db.session.commit() + + def add_totp(self): + db.session.add(TOTPMethod(user=self.get_user(), name='My phone')) + db.session.commit() + + def add_webauthn(self): + db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle')) + db.session.commit() + + def test_setup_disabled(self): + self.login_as('user') + r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True) + dump('mfa_setup_disabled', r) + self.assertEqual(r.status_code, 200) + + def test_setup_recovery_codes(self): + self.login_as('user') + self.add_recovery_codes() + r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True) + dump('mfa_setup_only_recovery_codes', r) + self.assertEqual(r.status_code, 200) + + def test_setup_enabled(self): + self.login_as('user') + self.add_recovery_codes() + self.add_totp() + self.add_webauthn() + r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True) + dump('mfa_setup_enabled', r) + self.assertEqual(r.status_code, 200) + + def test_setup_few_recovery_codes(self): + self.login_as('user') + self.add_totp() + self.add_recovery_codes(1) + r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True) + dump('mfa_setup_few_recovery_codes', r) + self.assertEqual(r.status_code, 200) + + def test_setup_no_recovery_codes(self): + self.login_as('user') + self.add_totp() + r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True) + dump('mfa_setup_no_recovery_codes', r) + self.assertEqual(r.status_code, 200) + + def test_disable(self): + baserole = Role(name='baserole', is_default=True) + db.session.add(baserole) + baserole.groups[self.get_access_group()] = RoleGroup() + db.session.commit() + self.login_as('user') + self.add_recovery_codes() + self.add_totp() + admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) + r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True) + dump('mfa_disable', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True) + dump('mfa_disable_submit', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) + self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) + + def test_disable_recovery_only(self): + baserole = Role(name='baserole', is_default=True) + db.session.add(baserole) + baserole.groups[self.get_access_group()] = RoleGroup() + db.session.commit() + self.login_as('user') + self.add_recovery_codes() + admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) + self.assertNotEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True) + dump('mfa_disable_recovery_only', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True) + dump('mfa_disable_recovery_only_submit', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0) + self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) + + def test_setup_recovery(self): + self.login_as('user') + self.assertEqual(len(RecoveryCodeMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True) + dump('mfa_setup_recovery', r) + self.assertEqual(r.status_code, 200) + methods = RecoveryCodeMethod.query.filter_by(user=request.user).all() + self.assertNotEqual(len(methods), 0) + r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True) + dump('mfa_setup_recovery_reset', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0) + self.assertNotEqual(len(methods), 0) + + def test_setup_totp(self): + self.login_as('user') + self.add_recovery_codes() + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + dump('mfa_setup_totp', r) + self.assertEqual(r.status_code, 200) + self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0) + + def test_setup_totp_without_recovery(self): + self.login_as('user') + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + dump('mfa_setup_totp_without_recovery', r) + self.assertEqual(r.status_code, 200) + + def test_setup_totp_finish(self): + baserole = Role(name='baserole', is_default=True) + db.session.add(baserole) + baserole.groups[self.get_access_group()] = RoleGroup() + db.session.commit() + self.login_as('user') + self.add_recovery_codes() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) + code = _hotp(int(time.time()/30), method.raw_key) + r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) + dump('mfa_setup_totp_finish', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1) + + def test_setup_totp_finish_without_recovery(self): + self.login_as('user') + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) + code = _hotp(int(time.time()/30), method.raw_key) + r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) + dump('mfa_setup_totp_finish_without_recovery', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + + def test_setup_totp_finish_wrong_code(self): + self.login_as('user') + self.add_recovery_codes() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + method = TOTPMethod(request.user, key=session.get('mfa_totp_key', '')) + code = _hotp(int(time.time()/30), method.raw_key) + code = str(int(code[0])+1)[-1] + code[1:] + r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True) + dump('mfa_setup_totp_finish_wrong_code', r) + self.assertEqual(r.status_code, 200) + db_flush() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + + def test_setup_totp_finish_empty_code(self): + self.login_as('user') + self.add_recovery_codes() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True) + r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True) + dump('mfa_setup_totp_finish_empty_code', r) + self.assertEqual(r.status_code, 200) + db_flush() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0) + + def test_delete_totp(self): + baserole = Role(name='baserole', is_default=True) + db.session.add(baserole) + baserole.groups[self.get_access_group()] = RoleGroup() + db.session.commit() + self.login_as('user') + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(request.user, name='test') + db.session.add(method) + db.session.commit() + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 2) + r = self.client.get(path=url_for('selfservice.delete_mfa_totp', id=method.id), follow_redirects=True) + dump('mfa_delete_totp', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0) + self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1) + + # TODO: webauthn setup tests diff --git a/tests/views/test_session.py b/tests/views/test_session.py index 1780ac87fa4ed8e49f61e5ae597d25492968d948..d2bf2b7c2d69823ce91f9384e8028495a399d059 100644 --- a/tests/views/test_session.py +++ b/tests/views/test_session.py @@ -5,7 +5,8 @@ from flask import url_for, request from uffd.database import db from uffd.password_hash import PlaintextPasswordHash -from uffd.models import DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation, User +from uffd.models import DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation, User, RecoveryCodeMethod, TOTPMethod +from uffd.models.mfa import _hotp from uffd.views.session import login_required from tests.utils import dump, UffdTestCase, db_flush @@ -148,7 +149,6 @@ class TestSession(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertLoggedOut() - @unittest.skip('See #29') def test_timeout(self): self.login() time.sleep(3) @@ -188,3 +188,157 @@ class TestSession(UffdTestCase): r = self.client.get(path=url_for('session.deviceauth_finish'), follow_redirects=True) self.assertEqual(r.status_code, 200) self.assertEqual(DeviceLoginConfirmation.query.all(), []) + +class TestMfaViews(UffdTestCase): + def add_recovery_codes(self, count=10): + user = self.get_user() + for _ in range(count): + db.session.add(RecoveryCodeMethod(user=user)) + db.session.commit() + + def add_totp(self): + db.session.add(TOTPMethod(user=self.get_user(), name='My phone')) + db.session.commit() + + def test_auth_integration(self): + self.add_recovery_codes() + self.add_totp() + db.session.commit() + self.assertIsNone(request.user) + r = self.login_as('user') + dump('mfa_auth_redirected', r) + self.assertEqual(r.status_code, 200) + self.assertIn(b'/mfa/auth', r.data) + self.assertIsNone(request.user) + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + dump('mfa_auth', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + + def test_auth_disabled(self): + self.assertIsNone(request.user) + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False) + self.assertEqual(r.status_code, 302) + self.assertTrue(r.location.endswith('/redirecttarget')) + self.assertIsNotNone(request.user) + + def test_auth_recovery_only(self): + self.add_recovery_codes() + self.assertIsNone(request.user) + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False) + self.assertEqual(r.status_code, 302) + self.assertTrue(r.location.endswith('/redirecttarget')) + self.assertIsNotNone(request.user) + + def test_auth_recovery_code(self): + self.add_recovery_codes() + self.add_totp() + method = RecoveryCodeMethod(user=self.get_user()) + db.session.add(method) + db.session.commit() + method_id = method.id + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + dump('mfa_auth_recovery_code', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + r = self.client.post(path=url_for('session.mfa_auth_finish', ref='/redirecttarget'), data={'code': method.code}) + self.assertEqual(r.status_code, 302) + self.assertTrue(r.location.endswith('/redirecttarget')) + self.assertIsNotNone(request.user) + self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0) + + def test_auth_totp_code(self): + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(user=self.get_user(), name='testname') + raw_key = method.raw_key + db.session.add(method) + db.session.commit() + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + dump('mfa_auth_totp_code', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + code = _hotp(int(time.time()/30), raw_key) + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True) + dump('mfa_auth_totp_code_submit', r) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(request.user) + + def test_auth_totp_code_reuse(self): + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(user=self.get_user(), name='testname') + raw_key = method.raw_key + db.session.add(method) + db.session.commit() + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + code = _hotp(int(time.time()/30), raw_key) + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(request.user) + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + + def test_auth_empty_code(self): + self.add_recovery_codes() + self.add_totp() + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': ''}, follow_redirects=True) + dump('mfa_auth_empty_code', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + + def test_auth_invalid_code(self): + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(user=self.get_user(), name='testname') + raw_key = method.raw_key + db.session.add(method) + db.session.commit() + self.login_as('user') + r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + code = _hotp(int(time.time()/30), raw_key) + code = str(int(code[0])+1)[-1] + code[1:] + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True) + dump('mfa_auth_invalid_code', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + + def test_auth_ratelimit(self): + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(user=self.get_user(), name='testname') + raw_key = method.raw_key + db.session.add(method) + db.session.commit() + self.login_as('user') + self.assertIsNone(request.user) + code = _hotp(int(time.time()/30), raw_key) + inv_code = str(int(code[0])+1)[-1] + code[1:] + for i in range(20): + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': inv_code}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True) + dump('mfa_auth_ratelimit', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + + # TODO: webauthn auth tests diff --git a/tests/views/test_user.py b/tests/views/test_user.py index 6bffa84d0dbae6b62799d492546fc0f4367bc167..2987428c23e803798f871a833f63ca5fb00c52d0 100644 --- a/tests/views/test_user.py +++ b/tests/views/test_user.py @@ -1,7 +1,7 @@ -from flask import url_for +from flask import url_for, request from uffd.database import db -from uffd.models import User, UserEmail, Group, Role, Service, ServiceUser, FeatureFlag +from uffd.models import User, UserEmail, Group, Role, Service, ServiceUser, FeatureFlag, MFAMethod, RecoveryCodeMethod, TOTPMethod from tests.utils import dump, UffdTestCase @@ -378,6 +378,21 @@ class TestUserViews(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertFalse(self.get_user().is_deactivated) + def test_disable_mfa(self): + db.session.add(RecoveryCodeMethod(user=self.get_admin())) + user = self.get_user() + for _ in range(10): + db.session.add(RecoveryCodeMethod(user=user)) + db.session.add(TOTPMethod(user=self.get_user(), name='My phone')) + db.session.commit() + self.login_as('admin') + admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all()) + r = self.client.get(path=url_for('user.disable_mfa', id=self.get_user().id), follow_redirects=True) + dump('user_disable_mfa', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_user()).all()), 0) + self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods) + def test_csvimport(self): role1 = Role(name='role1') db.session.add(role1) diff --git a/uffd/fido2_compat.py b/uffd/fido2_compat.py index dd8fd9ba0f88c5f1cafa175c6fea8715c740926a..f682b78bb108d377d8001b2c0c1630a07e32a935 100644 --- a/uffd/fido2_compat.py +++ b/uffd/fido2_compat.py @@ -1,26 +1,46 @@ # pylint: skip-file -import fido2 as __fido2 +from flask_babel import gettext as _ +from warnings import warn +from flask import request, current_app +import urllib.parse -if __fido2.__version__.startswith('0.5.'): - from fido2.client import ClientData - from fido2.server import Fido2Server, RelyingParty as __PublicKeyCredentialRpEntity - from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData - from fido2 import cbor - cbor.encode = cbor.dumps - cbor.decode = lambda arg: cbor.loads(arg)[0] - class PublicKeyCredentialRpEntity(__PublicKeyCredentialRpEntity): - def __init__(self, name, id): - super().__init__(id, name) -elif __fido2.__version__.startswith('0.9.'): - from fido2.client import ClientData - from fido2.webauthn import PublicKeyCredentialRpEntity - from fido2.server import Fido2Server - from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData - from fido2 import cbor -elif __fido2.__version__.startswith('1.'): - from fido2.webauthn import PublicKeyCredentialRpEntity, CollectedClientData as ClientData, AttestationObject, AuthenticatorData, AttestedCredentialData - from fido2.server import Fido2Server - from fido2 import cbor -else: - raise ImportError(f'Unsupported fido2 version: {__fido2.__version__}') + +# WebAuthn support is optional because fido2 has a pretty unstable +# interface and might be difficult to install with the correct version + +try: + import fido2 as __fido2 + + if __fido2.__version__.startswith('0.5.'): + from fido2.client import ClientData + from fido2.server import Fido2Server, RelyingParty as __PublicKeyCredentialRpEntity + from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData + from fido2 import cbor + cbor.encode = cbor.dumps + cbor.decode = lambda arg: cbor.loads(arg)[0] + class PublicKeyCredentialRpEntity(__PublicKeyCredentialRpEntity): + def __init__(self, name, id): + super().__init__(id, name) + elif __fido2.__version__.startswith('0.9.'): + from fido2.client import ClientData + from fido2.webauthn import PublicKeyCredentialRpEntity + from fido2.server import Fido2Server + from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData + from fido2 import cbor + elif __fido2.__version__.startswith('1.'): + from fido2.webauthn import PublicKeyCredentialRpEntity, CollectedClientData as ClientData, AttestationObject, AuthenticatorData, AttestedCredentialData + from fido2.server import Fido2Server + from fido2 import cbor + else: + raise ImportError(f'Unsupported fido2 version: {__fido2.__version__}') + + def get_webauthn_server(): + hostname = urllib.parse.urlsplit(request.url).hostname + return Fido2Server(PublicKeyCredentialRpEntity(id=current_app.config.get('MFA_RP_ID', hostname), + name=current_app.config['MFA_RP_NAME'])) + + WEBAUTHN_SUPPORTED = True +except ImportError as err: + warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err) + WEBAUTHN_SUPPORTED = False diff --git a/uffd/templates/mfa/disable.html b/uffd/templates/selfservice/disable_mfa.html similarity index 81% rename from uffd/templates/mfa/disable.html rename to uffd/templates/selfservice/disable_mfa.html index d577b73b18e94cd950f168dd61146274e0262de0..504e62873ce2affaabef0a87f13195dd8b26294c 100644 --- a/uffd/templates/mfa/disable.html +++ b/uffd/templates/selfservice/disable_mfa.html @@ -7,7 +7,7 @@ You can later generate new recovery codes and setup your applications and devices again.")}} </p> -<form class="form" action="{{ url_for('mfa.disable_confirm') }}" method="POST"> +<form class="form" action="{{ url_for('selfservice.disable_mfa_confirm') }}" method="POST"> <button type="submit" class="btn btn-danger btn-block">{{_("Disable two-factor authentication")}}</button> </form> diff --git a/uffd/templates/selfservice/self.html b/uffd/templates/selfservice/self.html index cc977f8f03c9436509728919d5a24b6203f729d4..1bc052c318a4fc1ac567a2e744b626c770e2d5e7 100644 --- a/uffd/templates/selfservice/self.html +++ b/uffd/templates/selfservice/self.html @@ -167,7 +167,7 @@ {{ _("Two-factor authentication is currently <strong>disabled</strong>.")|safe }} {% endif %} </p> - <a class="btn btn-primary btn-block" href="{{ url_for('mfa.setup') }}">{{_("Manage two-factor authentication")}}</a> + <a class="btn btn-primary btn-block" href="{{ url_for('selfservice.setup_mfa') }}">{{_("Manage two-factor authentication")}}</a> </div> </div> diff --git a/uffd/templates/mfa/setup.html b/uffd/templates/selfservice/setup_mfa.html similarity index 92% rename from uffd/templates/mfa/setup.html rename to uffd/templates/selfservice/setup_mfa.html index 134f651bfbbe75e1aab27b65be297f21e3cb6b56..baa769b3036ea5b7ee8d8eea8399ece045f39b7d 100644 --- a/uffd/templates/mfa/setup.html +++ b/uffd/templates/selfservice/setup_mfa.html @@ -28,11 +28,11 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe {% if mfa_setup or mfa_enabled %} <div class="clearfix"> {% if mfa_enabled %} - <form class="form float-right" action="{{ url_for('mfa.disable') }}"> + <form class="form float-right" action="{{ url_for('selfservice.disable_mfa') }}"> <button type="submit" class="btn btn-danger mb-2">{{_("Disable two-factor authentication")}}</button> </form> {% else %} - <form class="form float-right" action="{{ url_for('mfa.disable_confirm') }}" method="POST"> + <form class="form float-right" action="{{ url_for('selfservice.disable_mfa_confirm') }}" method="POST"> <button type="submit" class="btn btn-light mb-2">{{_("Reset two-factor configuration")}}</button> </form> {% endif %} @@ -56,7 +56,7 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe </div> <div class="col-12 col-md-7"> - <form class="form" action="{{ url_for('mfa.setup_recovery') }}" method="POST"> + <form class="form" action="{{ url_for('selfservice.setup_mfa_recovery') }}" method="POST"> {% if mfa_init %} <button type="submit" class="btn btn-primary mb-2 col"> {{_("Generate recovery codes to enable two-factor authentication")}} @@ -93,7 +93,7 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe </div> <div class="col-12 col-md-7"> - <form class="form mb-2" action="{{ url_for('mfa.setup_totp') }}" autocomplete="off"> + <form class="form mb-2" action="{{ url_for('selfservice.setup_mfa_totp') }}" autocomplete="off"> <div class="row m-0"> <label class="sr-only" for="totp-name">{{_("Name")}}</label> <input type="text" name="name" class="form-control mb-2 col-12 col-lg-auto mr-2" style="width: 15em;" id="totp-name" placeholder="{{_("Name")}}" required {{ 'disabled' if mfa_init }}> @@ -114,7 +114,7 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe <tr> <td>{{ method.name }}</td> <td>{{ method.created|dateformat }}</td> - <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_totp', id=method.id) }}">{{_("Delete")}}</a></td> + <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('selfservice.delete_mfa_totp', id=method.id) }}">{{_("Delete")}}</a></td> </tr> {% endfor %} {% if not request.user.mfa_totp_methods %} @@ -176,7 +176,7 @@ mfa_enabled: The user has setup at least one two-factor method. Two-factor authe <tr> <td>{{ method.name }}</td> <td>{{ method.created|dateformat }}</td> - <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('mfa.delete_webauthn', id=method.id) }}">{{_("Delete")}}</a></td> + <td><a class="btn btn-sm btn-danger float-right" href="{{ url_for('selfservice.delete_mfa_webauthn', id=method.id) }}">{{_("Delete")}}</a></td> </tr> {% endfor %} {% if not request.user.mfa_webauthn_methods %} @@ -198,7 +198,7 @@ $('#webauthn-form').on('submit', function(e) { $('#webauthn-spinner').removeClass('d-none'); $('#webauthn-btn-text').text({{ _('Contacting server')|tojson }}); $('#webauthn-btn').prop('disabled', true); - fetch({{ url_for('mfa.setup_webauthn_begin')|tojson }}, { + fetch({{ url_for('selfservice.setup_mfa_webauthn_begin')|tojson }}, { method: 'POST', }).then(function(response) { if (response.ok) @@ -210,7 +210,7 @@ $('#webauthn-form').on('submit', function(e) { $('#webauthn-btn-text').text({{ _('Waiting for device')|tojson }}); return navigator.credentials.create(options); }).then(function(attestation) { - return fetch({{ url_for('mfa.setup_webauthn_complete')|tojson }}, { + return fetch({{ url_for('selfservice.setup_mfa_webauthn_complete')|tojson }}, { method: 'POST', headers: {'Content-Type': 'application/cbor'}, body: CBOR.encode({ @@ -223,7 +223,7 @@ $('#webauthn-form').on('submit', function(e) { if (response.ok) { $('#webauthn-spinner').addClass('d-none'); $('#webauthn-btn-text').text({{ _('Success')|tojson }}); - window.location = {{ url_for('mfa.setup')|tojson }}; + window.location = {{ url_for('selfservice.setup_mfa')|tojson }}; } else { throw new Error({{ _('Invalid response from device')|tojson }}); } diff --git a/uffd/templates/mfa/setup_recovery.html b/uffd/templates/selfservice/setup_mfa_recovery.html similarity index 94% rename from uffd/templates/mfa/setup_recovery.html rename to uffd/templates/selfservice/setup_mfa_recovery.html index 2f70cbbd4e6b4936433009e38d8d4425dd01f217..16c087b1d824104f7906e7c99244f3d416844ba4 100644 --- a/uffd/templates/mfa/setup_recovery.html +++ b/uffd/templates/selfservice/setup_mfa_recovery.html @@ -23,7 +23,7 @@ </p> <div class="btn-toolbar"> - <a class="ml-auto mb-2 btn btn-primary d-print-none" href="{{ url_for('mfa.setup') }}">{{_("Continue")}}</a> + <a class="ml-auto mb-2 btn btn-primary d-print-none" href="{{ url_for('selfservice.setup_mfa') }}">{{_("Continue")}}</a> <a class="ml-2 mb-2 btn btn-light d-print-none" href="{{ methods|map(attribute='code')|join('\n')|datauri }}" download="uffd-recovery-codes"> {{_("Download codes")}} </a> diff --git a/uffd/templates/mfa/setup_totp.html b/uffd/templates/selfservice/setup_mfa_totp.html similarity index 91% rename from uffd/templates/mfa/setup_totp.html rename to uffd/templates/selfservice/setup_mfa_totp.html index b450793dd3a8e906d751044df293f7382549b9ac..e8f7405af4e4a4fe448d761672cbbe2d3133c4eb 100644 --- a/uffd/templates/mfa/setup_totp.html +++ b/uffd/templates/selfservice/setup_mfa_totp.html @@ -32,7 +32,7 @@ </div> </div> -<form action="{{ url_for('mfa.setup_totp_finish', name=name) }}" method="POST" autocomplete="off" class="form"> +<form action="{{ url_for('selfservice.setup_mfa_totp_finish', name=name) }}" method="POST" autocomplete="off" class="form"> <div class="row m-0"> <input type="text" name="code" class="form-control mb-2 mr-2 col-auto col-md" id="code" placeholder="{{_('Code')}}" required autofocus> <button type="submit" class="btn btn-primary mb-2 col col-md-auto">{{_("Verify and complete setup")}}</button> diff --git a/uffd/templates/mfa/auth.html b/uffd/templates/session/mfa_auth.html similarity index 95% rename from uffd/templates/mfa/auth.html rename to uffd/templates/session/mfa_auth.html index 28bea1d536ae28e7b1603f5b35cadb14ff9ce928..cd9b306071f4aa5f7e6b702f3a0dabee2da64f3f 100644 --- a/uffd/templates/mfa/auth.html +++ b/uffd/templates/session/mfa_auth.html @@ -1,7 +1,7 @@ {% extends 'base_narrow.html' %} {% block body %} -<form action="{{ url_for("mfa.auth_finish", ref=ref) }}" method="POST" autocomplete="off"> +<form action="{{ url_for("session.mfa_auth_finish", ref=ref) }}" method="POST" autocomplete="off"> <div class="col-12 mb-3"> <h2 class="text-center">{{_("Two-Factor Authentication")}}</h2> </div> @@ -42,7 +42,7 @@ function begin_webauthn() { $('#webauthn-spinner').removeClass('d-none'); $('#webauthn-btn-text').text({{ _('Contacting server')|tojson }}); $('#webauthn-btn').prop('disabled', true); - fetch({{ url_for('mfa.auth_webauthn_begin')|tojson }}, { + fetch({{ url_for('session.mfa_auth_webauthn_begin')|tojson }}, { method: 'POST', }).then(function(response) { if (response.ok) { @@ -60,7 +60,7 @@ function begin_webauthn() { return navigator.credentials.get(options); }).then(function(assertion) { $('#webauthn-btn-text').text({{ _('Verifing response')|tojson }}); - return fetch({{ url_for('mfa.auth_webauthn_complete')|tojson }}, { + return fetch({{ url_for('session.mfa_auth_webauthn_complete')|tojson }}, { method: 'POST', headers: {'Content-Type': 'application/cbor'}, body: CBOR.encode({ diff --git a/uffd/templates/user/show.html b/uffd/templates/user/show.html index 04a0d6835b8d0c29aac4973df7edda81d4652d36..ca7dfb54eec74fb7b0df428cc8462f553ffcaa07 100644 --- a/uffd/templates/user/show.html +++ b/uffd/templates/user/show.html @@ -190,7 +190,7 @@ {{ _("Status:") }} {{ _("Enabled") if user.mfa_enabled else _("Disabled") }}<br> {{ user.mfa_recovery_codes|length }} {{ _("Recovery Codes") }}, {{ user.mfa_totp_methods|length }} {{ _("Authenticator Apps (TOTP)") }}, {{ user.mfa_webauthn_methods|length }} {{ _("U2F and FIDO2 Devices") }} </p> - <a href="{{ url_for("mfa.admin_disable", id=user.id) }}" onClick='return confirm({{_("Are you sure?")|tojson}});' class="btn btn-secondary">{{_("Reset 2FA")}}</a> + <a href="{{ url_for("user.disable_mfa", id=user.id) }}" onClick='return confirm({{_("Are you sure?")|tojson}});' class="btn btn-secondary">{{_("Reset 2FA")}}</a> </div> <div class="form-group col"> diff --git a/uffd/views/__init__.py b/uffd/views/__init__.py index 1ee14190f7396badd7c833e361f4481bba8e7350..b025086da6da021401f4602d144bd0d5a4046d03 100644 --- a/uffd/views/__init__.py +++ b/uffd/views/__init__.py @@ -3,7 +3,7 @@ from werkzeug.exceptions import Forbidden from uffd.secure_redirect import secure_local_redirect -from . import session, selfservice, signup, mfa, oauth2, user, group, service, role, invite, api, mail, rolemod +from . import session, selfservice, signup, oauth2, user, group, service, role, invite, api, mail, rolemod def init_app(app): @app.errorhandler(403) @@ -26,7 +26,6 @@ def init_app(app): app.register_blueprint(session.bp) app.register_blueprint(selfservice.bp) app.register_blueprint(signup.bp) - app.register_blueprint(mfa.bp) app.register_blueprint(oauth2.bp) app.register_blueprint(user.bp) app.register_blueprint(group.bp) diff --git a/uffd/views/mfa.py b/uffd/views/mfa.py deleted file mode 100644 index d8f0d99e559e4361a72cd844d88db6eda53c0876..0000000000000000000000000000000000000000 --- a/uffd/views/mfa.py +++ /dev/null @@ -1,239 +0,0 @@ -from warnings import warn -import urllib.parse - -from flask import Blueprint, render_template, session, request, redirect, url_for, flash, current_app, abort -from flask_babel import gettext as _ - -from uffd.csrf import csrf_protect -from uffd.secure_redirect import secure_local_redirect -from uffd.database import db -from uffd.models import MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod, User, Ratelimit, format_delay -from .session import login_required, login_required_pre_mfa, set_request_user - -bp = Blueprint('mfa', __name__, template_folder='templates', url_prefix='/mfa/') - -mfa_ratelimit = Ratelimit('mfa', 1*60, 3) - -@bp.route('/', methods=['GET']) -@login_required() -def setup(): - return render_template('mfa/setup.html') - -@bp.route('/setup/disable', methods=['GET']) -@login_required() -def disable(): - return render_template('mfa/disable.html') - -@bp.route('/setup/disable', methods=['POST']) -@login_required() -@csrf_protect(blueprint=bp) -def disable_confirm(): - MFAMethod.query.filter_by(user=request.user).delete() - db.session.commit() - request.user.update_groups() - db.session.commit() - return redirect(url_for('mfa.setup')) - -@bp.route('/admin/<int:id>/disable') -@login_required() -@csrf_protect(blueprint=bp) -def admin_disable(id): - # Group cannot be checked with login_required kwarg, because the config - # variable is not available when the decorator is processed - if not request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP']): - abort(403) - user = User.query.get(id) - MFAMethod.query.filter_by(user=user).delete() - user.update_groups() - db.session.commit() - flash(_('Two-factor authentication was reset')) - return redirect(url_for('user.show', id=id)) - -@bp.route('/setup/recovery', methods=['POST']) -@login_required() -@csrf_protect(blueprint=bp) -def setup_recovery(): - for method in RecoveryCodeMethod.query.filter_by(user=request.user).all(): - db.session.delete(method) - methods = [] - for _ in range(10): - method = RecoveryCodeMethod(request.user) - methods.append(method) - db.session.add(method) - db.session.commit() - return render_template('mfa/setup_recovery.html', methods=methods) - -@bp.route('/setup/totp', methods=['GET']) -@login_required() -def setup_totp(): - method = TOTPMethod(request.user) - session['mfa_totp_key'] = method.key - return render_template('mfa/setup_totp.html', method=method, name=request.values['name']) - -@bp.route('/setup/totp', methods=['POST']) -@login_required() -@csrf_protect(blueprint=bp) -def setup_totp_finish(): - if not RecoveryCodeMethod.query.filter_by(user=request.user).all(): - flash(_('Generate recovery codes first!')) - return redirect(url_for('mfa.setup')) - method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key')) - if method.verify(request.form['code']): - db.session.add(method) - request.user.update_groups() - db.session.commit() - return redirect(url_for('mfa.setup')) - flash(_('Code is invalid')) - return redirect(url_for('mfa.setup_totp', name=request.values['name'])) - -@bp.route('/setup/totp/<int:id>/delete') -@login_required() -@csrf_protect(blueprint=bp) -def delete_totp(id): #pylint: disable=redefined-builtin - method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404() - db.session.delete(method) - request.user.update_groups() - db.session.commit() - return redirect(url_for('mfa.setup')) - -# WebAuthn support is optional because fido2 has a pretty unstable -# interface and might be difficult to install with the correct version -try: - from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import - WEBAUTHN_SUPPORTED = True -except ImportError as err: - warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err) - WEBAUTHN_SUPPORTED = False - -bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported') - -if WEBAUTHN_SUPPORTED: - def get_webauthn_server(): - hostname = urllib.parse.urlsplit(request.url).hostname - return Fido2Server(PublicKeyCredentialRpEntity(id=current_app.config.get('MFA_RP_ID', hostname), - name=current_app.config['MFA_RP_NAME'])) - - @bp.route('/setup/webauthn/begin', methods=['POST']) - @login_required() - @csrf_protect(blueprint=bp) - def setup_webauthn_begin(): - if not RecoveryCodeMethod.query.filter_by(user=request.user).all(): - abort(403) - methods = WebauthnMethod.query.filter_by(user=request.user).all() - creds = [method.cred for method in methods] - server = get_webauthn_server() - registration_data, state = server.register_begin( - { - "id": str(request.user.id).encode(), - "name": request.user.loginname, - "displayName": request.user.displayname, - }, - creds, - user_verification='discouraged', - ) - session["webauthn-state"] = state - return cbor.encode(registration_data) - - @bp.route('/setup/webauthn/complete', methods=['POST']) - @login_required() - @csrf_protect(blueprint=bp) - def setup_webauthn_complete(): - server = get_webauthn_server() - data = cbor.decode(request.get_data()) - client_data = ClientData(data["clientDataJSON"]) - att_obj = AttestationObject(data["attestationObject"]) - auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj) - method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name']) - db.session.add(method) - request.user.update_groups() - db.session.commit() - return cbor.encode({"status": "OK"}) - - @bp.route("/auth/webauthn/begin", methods=["POST"]) - @login_required_pre_mfa(no_redirect=True) - def auth_webauthn_begin(): - server = get_webauthn_server() - creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] - if not creds: - abort(404) - auth_data, state = server.authenticate_begin(creds, user_verification='discouraged') - session["webauthn-state"] = state - return cbor.encode(auth_data) - - @bp.route("/auth/webauthn/complete", methods=["POST"]) - @login_required_pre_mfa(no_redirect=True) - def auth_webauthn_complete(): - server = get_webauthn_server() - creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] - if not creds: - abort(404) - data = cbor.decode(request.get_data()) - credential_id = data["credentialId"] - client_data = ClientData(data["clientDataJSON"]) - auth_data = AuthenticatorData(data["authenticatorData"]) - signature = data["signature"] - # authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster) - # does not check signCount, although the spec recommends it - server.authenticate_complete( - session.pop("webauthn-state"), - creds, - credential_id, - client_data, - auth_data, - signature, - ) - request.session.mfa_done = True - db.session.commit() - set_request_user() - return cbor.encode({"status": "OK"}) - -@bp.route('/setup/webauthn/<int:id>/delete') -@login_required() -@csrf_protect(blueprint=bp) -def delete_webauthn(id): #pylint: disable=redefined-builtin - method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404() - db.session.delete(method) - request.user.update_groups() - db.session.commit() - return redirect(url_for('mfa.setup')) - -@bp.route('/auth', methods=['GET']) -@login_required_pre_mfa() -def auth(): - if not request.user_pre_mfa.mfa_enabled: - request.session.mfa_done = True - db.session.commit() - set_request_user() - if request.session.mfa_done: - return secure_local_redirect(request.values.get('ref', url_for('index'))) - return render_template('mfa/auth.html', ref=request.values.get('ref')) - -@bp.route('/auth', methods=['POST']) -@login_required_pre_mfa() -def auth_finish(): - delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id) - if delay: - flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay)) - return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) - for method in request.user_pre_mfa.mfa_totp_methods: - if method.verify(request.form['code']): - request.session.mfa_done = True - db.session.commit() - set_request_user() - return secure_local_redirect(request.values.get('ref', url_for('index'))) - for method in request.user_pre_mfa.mfa_recovery_codes: - if method.verify(request.form['code']): - db.session.delete(method) - request.session.mfa_done = True - db.session.commit() - set_request_user() - if len(request.user_pre_mfa.mfa_recovery_codes) <= 1: - flash(_('You have exhausted your recovery codes. Please generate new ones now!')) - return redirect(url_for('mfa.setup')) - if len(request.user_pre_mfa.mfa_recovery_codes) <= 5: - flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.')) - return redirect(url_for('mfa.setup')) - return secure_local_redirect(request.values.get('ref', url_for('index'))) - mfa_ratelimit.log(request.user_pre_mfa.id) - flash(_('Two-factor authentication failed')) - return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) diff --git a/uffd/views/selfservice.py b/uffd/views/selfservice.py index 3d8c821da580af2c0351d638d0a4ebad247f1df9..7342facde360d94d2f8b8015ecc0a4d5432729cb 100644 --- a/uffd/views/selfservice.py +++ b/uffd/views/selfservice.py @@ -1,6 +1,6 @@ import secrets -from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, abort +from flask import Blueprint, render_template, session, request, url_for, redirect, flash, current_app, abort from flask_babel import gettext as _, lazy_gettext from sqlalchemy.exc import IntegrityError @@ -8,7 +8,12 @@ from uffd.navbar import register_navbar from uffd.csrf import csrf_protect from uffd.sendmail import sendmail from uffd.database import db -from uffd.models import User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay, Session +from uffd.models import ( + User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay, + Session, MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod, +) +from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import + from .session import login_required bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/') @@ -202,8 +207,8 @@ def update_email_preferences(): @csrf_protect(blueprint=bp) @login_required(selfservice_acl_check) def revoke_session(session_id): - session = Session.query.filter_by(id=session_id, user=request.user).first_or_404() - db.session.delete(session) + _session = Session.query.filter_by(id=session_id, user=request.user).first_or_404() + db.session.delete(_session) db.session.commit() flash(_('Session revoked')) return redirect(url_for('selfservice.index')) @@ -235,3 +240,119 @@ def send_passwordreset(user, new=False): email = user.recovery_email or user.primary_email if not sendmail(email.address, subject, template, user=user, token=token): flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address)) + +@bp.route('/mfa/', methods=['GET']) +@login_required() +def setup_mfa(): + return render_template('selfservice/setup_mfa.html') + +@bp.route('/mfa/setup/disable', methods=['GET']) +@login_required() +def disable_mfa(): + return render_template('selfservice/disable_mfa.html') + +@bp.route('/mfa/setup/disable', methods=['POST']) +@login_required() +@csrf_protect(blueprint=bp) +def disable_mfa_confirm(): + MFAMethod.query.filter_by(user=request.user).delete() + db.session.commit() + request.user.update_groups() + db.session.commit() + return redirect(url_for('selfservice.setup_mfa')) + +@bp.route('/mfa/setup/recovery', methods=['POST']) +@login_required() +@csrf_protect(blueprint=bp) +def setup_mfa_recovery(): + for method in RecoveryCodeMethod.query.filter_by(user=request.user).all(): + db.session.delete(method) + methods = [] + for _ in range(10): + method = RecoveryCodeMethod(request.user) + methods.append(method) + db.session.add(method) + db.session.commit() + return render_template('selfservice/setup_mfa_recovery.html', methods=methods) + +@bp.route('/mfa/setup/totp', methods=['GET']) +@login_required() +def setup_mfa_totp(): + method = TOTPMethod(request.user) + session['mfa_totp_key'] = method.key + return render_template('selfservice/setup_mfa_totp.html', method=method, name=request.values['name']) + +@bp.route('/mfa/setup/totp', methods=['POST']) +@login_required() +@csrf_protect(blueprint=bp) +def setup_mfa_totp_finish(): + if not RecoveryCodeMethod.query.filter_by(user=request.user).all(): + flash(_('Generate recovery codes first!')) + return redirect(url_for('selfservice.setup_mfa')) + method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key')) + if method.verify(request.form['code']): + db.session.add(method) + request.user.update_groups() + db.session.commit() + return redirect(url_for('selfservice.setup_mfa')) + flash(_('Code is invalid')) + return redirect(url_for('selfservice.setup_mfa_totp', name=request.values['name'])) + +@bp.route('/mfa/setup/totp/<int:id>/delete') +@login_required() +@csrf_protect(blueprint=bp) +def delete_mfa_totp(id): #pylint: disable=redefined-builtin + method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404() + db.session.delete(method) + request.user.update_groups() + db.session.commit() + return redirect(url_for('selfservice.setup_mfa')) + +bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported') + +if WEBAUTHN_SUPPORTED: + @bp.route('/mfa/setup/webauthn/begin', methods=['POST']) + @login_required() + @csrf_protect(blueprint=bp) + def setup_mfa_webauthn_begin(): + if not RecoveryCodeMethod.query.filter_by(user=request.user).all(): + abort(403) + methods = WebauthnMethod.query.filter_by(user=request.user).all() + creds = [method.cred for method in methods] + server = get_webauthn_server() + registration_data, state = server.register_begin( + { + "id": str(request.user.id).encode(), + "name": request.user.loginname, + "displayName": request.user.displayname, + }, + creds, + user_verification='discouraged', + ) + session["webauthn-state"] = state + return cbor.encode(registration_data) + + @bp.route('/mfa/setup/webauthn/complete', methods=['POST']) + @login_required() + @csrf_protect(blueprint=bp) + def setup_mfa_webauthn_complete(): + server = get_webauthn_server() + data = cbor.decode(request.get_data()) + client_data = ClientData(data["clientDataJSON"]) + att_obj = AttestationObject(data["attestationObject"]) + auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj) + method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name']) + db.session.add(method) + request.user.update_groups() + db.session.commit() + return cbor.encode({"status": "OK"}) + +@bp.route('/mfa/setup/webauthn/<int:id>/delete') +@login_required() +@csrf_protect(blueprint=bp) +def delete_mfa_webauthn(id): #pylint: disable=redefined-builtin + method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404() + db.session.delete(method) + request.user.update_groups() + db.session.commit() + return redirect(url_for('selfservice.setup_mfa')) diff --git a/uffd/views/session.py b/uffd/views/session.py index 548b80be64c69a4392578bc8513f8093cea974fa..e07808c34f28bf92756c2cb932ac7fbf1a377d85 100644 --- a/uffd/views/session.py +++ b/uffd/views/session.py @@ -9,10 +9,12 @@ from uffd.database import db from uffd.csrf import csrf_protect from uffd.secure_redirect import secure_local_redirect from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay, Session +from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/') login_ratelimit = Ratelimit('login', 1*60, 3) +mfa_ratelimit = Ratelimit('mfa', 1*60, 3) @bp.before_app_request def set_request_user(): @@ -71,7 +73,7 @@ def set_session(user, skip_mfa=False): def login(): # pylint: disable=too-many-return-statements if request.user_pre_mfa: - return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) + return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index')))) if request.method == 'GET': return render_template('session/login.html', ref=request.values.get('ref')) @@ -102,7 +104,7 @@ def login(): flash(_('You do not have access to this service')) return render_template('session/login.html', ref=request.values.get('ref')) set_session(user) - return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) + return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index')))) def login_required_pre_mfa(no_redirect=False): def wrapper(func): @@ -125,13 +127,93 @@ def login_required(permission_check=lambda: True): flash(_('You need to login first')) return redirect(url_for('session.login', ref=request.full_path)) if not request.user: - return redirect(url_for('mfa.auth', ref=request.full_path)) + return redirect(url_for('session.mfa_auth', ref=request.full_path)) if not permission_check(): abort(403) return func(*args, **kwargs) return decorator return wrapper +@bp.route('/mfa/auth', methods=['GET']) +@login_required_pre_mfa() +def mfa_auth(): + if not request.user_pre_mfa.mfa_enabled: + request.session.mfa_done = True + db.session.commit() + set_request_user() + if request.session.mfa_done: + return secure_local_redirect(request.values.get('ref', url_for('index'))) + return render_template('session/mfa_auth.html', ref=request.values.get('ref')) + +@bp.route('/mfa/auth', methods=['POST']) +@login_required_pre_mfa() +def mfa_auth_finish(): + delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id) + if delay: + flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay)) + return redirect(url_for('session.mfa_auth', ref=request.values.get('ref'))) + for method in request.user_pre_mfa.mfa_totp_methods: + if method.verify(request.form['code']): + request.session.mfa_done = True + db.session.commit() + set_request_user() + return secure_local_redirect(request.values.get('ref', url_for('index'))) + for method in request.user_pre_mfa.mfa_recovery_codes: + if method.verify(request.form['code']): + db.session.delete(method) + request.session.mfa_done = True + db.session.commit() + set_request_user() + if len(request.user_pre_mfa.mfa_recovery_codes) <= 1: + flash(_('You have exhausted your recovery codes. Please generate new ones now!')) + return redirect(url_for('selfservice.setup_mfa')) + if len(request.user_pre_mfa.mfa_recovery_codes) <= 5: + flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.')) + return redirect(url_for('selfservice.setup_mfa')) + return secure_local_redirect(request.values.get('ref', url_for('index'))) + mfa_ratelimit.log(request.user_pre_mfa.id) + flash(_('Two-factor authentication failed')) + return redirect(url_for('session.mfa_auth', ref=request.values.get('ref'))) + +if WEBAUTHN_SUPPORTED: + @bp.route("/mfa/auth/webauthn/begin", methods=["POST"]) + @login_required_pre_mfa(no_redirect=True) + def mfa_auth_webauthn_begin(): + server = get_webauthn_server() + creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] + if not creds: + abort(404) + auth_data, state = server.authenticate_begin(creds, user_verification='discouraged') + session["webauthn-state"] = state + return cbor.encode(auth_data) + + @bp.route("/mfa/auth/webauthn/complete", methods=["POST"]) + @login_required_pre_mfa(no_redirect=True) + def mfa_auth_webauthn_complete(): + server = get_webauthn_server() + creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods] + if not creds: + abort(404) + data = cbor.decode(request.get_data()) + credential_id = data["credentialId"] + client_data = ClientData(data["clientDataJSON"]) + auth_data = AuthenticatorData(data["authenticatorData"]) + signature = data["signature"] + # authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster) + # does not check signCount, although the spec recommends it + server.authenticate_complete( + session.pop("webauthn-state"), + creds, + credential_id, + client_data, + auth_data, + signature, + ) + request.session.mfa_done = True + db.session.commit() + set_request_user() + return cbor.encode({"status": "OK"}) + @bp.route("/login/device/start") def devicelogin_start(): session['devicelogin_started'] = True diff --git a/uffd/views/user.py b/uffd/views/user.py index d7fb4ef749e47a6c1aae4d3f3d28ac93d1843859..a16895e7089586ba9fc4d1c253a49a752d9eb2a0 100644 --- a/uffd/views/user.py +++ b/uffd/views/user.py @@ -9,7 +9,7 @@ from uffd.navbar import register_navbar from uffd.csrf import csrf_protect from uffd.remailer import remailer from uffd.database import db -from uffd.models import User, UserEmail, Role +from uffd.models import User, UserEmail, Role, MFAMethod from .selfservice import send_passwordreset from .session import login_required @@ -164,6 +164,16 @@ def activate(id): flash(_('User activated')) return redirect(url_for('user.show', id=user.id)) +@bp.route('/<int:id>/mfa/disable') +@csrf_protect(blueprint=bp) +def disable_mfa(id): + user = User.query.get_or_404(id) + MFAMethod.query.filter_by(user=user).delete() + user.update_groups() + db.session.commit() + flash(_('Two-factor authentication was reset')) + return redirect(url_for('user.show', id=id)) + @bp.route('/<int:id>/sessions/revoke') @csrf_protect(blueprint=bp) def revoke_sessions(id):