diff --git a/tests/models/test_mfa.py b/tests/models/test_mfa.py index 885aba0c35ecd6bfc195d62180deffe5ba477ddd..738a71fc462629e454cecedab0e96f01d830e0e9 100644 --- a/tests/models/test_mfa.py +++ b/tests/models/test_mfa.py @@ -40,10 +40,10 @@ class TestMfaMethodModels(UffdTestCase): db.session.add(method) db.session.commit() method_id = method.id - method_code = method.code + method_code = method.code_value db.session.expunge(method) method = RecoveryCodeMethod.query.get(method_id) - self.assertFalse(hasattr(method, 'code')) + self.assertFalse(hasattr(method, 'code_value')) self.assertFalse(method.verify('')) self.assertFalse(method.verify('A'*8)) self.assertTrue(method.verify(method_code)) diff --git a/tests/views/test_session.py b/tests/views/test_session.py index 3debfb23c02ac2ab389060982807cadc43b8b684..3549863b3e2a11a58c794786e41ec5f17507161a 100644 --- a/tests/views/test_session.py +++ b/tests/views/test_session.py @@ -244,7 +244,7 @@ class TestMfaViews(UffdTestCase): dump('mfa_auth_recovery_code', r) self.assertEqual(r.status_code, 200) self.assertIsNone(request.user) - r = self.client.post(path=url_for('session.mfa_auth_finish', ref='/redirecttarget'), data={'code': method.code}) + r = self.client.post(path=url_for('session.mfa_auth_finish', ref='/redirecttarget'), data={'code': method.code_value}) self.assertEqual(r.status_code, 302) self.assertTrue(r.location.endswith('/redirecttarget')) self.assertIsNotNone(request.user) diff --git a/uffd/migrations/versions/4bd316207e59_unified_password_hashing_for_recovery_codes.py b/uffd/migrations/versions/4bd316207e59_unified_password_hashing_for_recovery_codes.py new file mode 100644 index 0000000000000000000000000000000000000000..27e1d12122fd2b00a66f41ff291b1310548126e9 --- /dev/null +++ b/uffd/migrations/versions/4bd316207e59_unified_password_hashing_for_recovery_codes.py @@ -0,0 +1,67 @@ +"""Unified password hashing for recovery codes + +Revision ID: 4bd316207e59 +Revises: e71e29cc605a +Create Date: 2024-05-22 03:13:55.917641 + +""" +from alembic import op +import sqlalchemy as sa + +revision = '4bd316207e59' +down_revision = 'e71e29cc605a' +branch_labels = None +depends_on = None + +def upgrade(): + meta = sa.MetaData(bind=op.get_bind()) + mfa_method = sa.Table('mfa_method', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', name='mfatype', create_constraint=True), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('name', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('recovery_salt', sa.String(length=64), nullable=True), + sa.Column('recovery_hash', sa.String(length=256), nullable=True), + sa.Column('totp_key', sa.String(length=64), nullable=True), + sa.Column('totp_last_counter', sa.Integer(), nullable=True), + sa.Column('webauthn_cred', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method')) + ) + # This field was already unused before the change to unified password hashing. So this is unrelated cleanup. + with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op: + batch_op.drop_column('recovery_salt') + op.execute(mfa_method.update().values(recovery_hash=('{crypt}' + mfa_method.c.recovery_hash)).where(mfa_method.c.type == 'RECOVERY_CODE')) + +def downgrade(): + meta = sa.MetaData(bind=op.get_bind()) + mfa_method = sa.Table('mfa_method', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', name='mfatype', create_constraint=True), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('name', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('recovery_hash', sa.String(length=256), nullable=True), + sa.Column('totp_key', sa.String(length=64), nullable=True), + sa.Column('totp_last_counter', sa.Integer(), nullable=True), + sa.Column('webauthn_cred', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method')) + ) + with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op: + batch_op.add_column(sa.Column('recovery_salt', sa.VARCHAR(length=64), nullable=True)) + op.execute( + mfa_method.delete().where(sa.and_( + mfa_method.c.type == 'RECOVERY_CODE', + sa.not_(mfa_method.c.recovery_hash.ilike('{crypt}%')) + )) + ) + op.execute( + mfa_method.update().values( + recovery_hash=sa.func.substr(mfa_method.c.recovery_hash, len('{crypt}') + 1) + ).where(sa.and_( + mfa_method.c.type == 'RECOVERY_CODE', + mfa_method.c.recovery_hash.ilike('{crypt}%') + )) + ) diff --git a/uffd/models/mfa.py b/uffd/models/mfa.py index 6a646c84c91e4067c2789b43598325b4e0241523..249d8e802c28ab827acf7af320af2c95ba43fe26 100644 --- a/uffd/models/mfa.py +++ b/uffd/models/mfa.py @@ -8,13 +8,12 @@ import hmac import hashlib import base64 import urllib.parse -# imports for recovery codes -import crypt # pylint: disable=deprecated-module from flask import request, current_app from sqlalchemy import Column, Integer, Enum, String, DateTime, Text, ForeignKey from sqlalchemy.orm import relationship, backref from uffd.utils import nopad_b32decode, nopad_b32encode +from uffd.password_hash import PasswordHashAttribute, CryptPasswordHash from uffd.database import db from .user import User @@ -47,8 +46,8 @@ class MFAMethod(db.Model): self.created = datetime.datetime.utcnow() class RecoveryCodeMethod(MFAMethod): - code_salt = Column('recovery_salt', String(64)) - code_hash = Column('recovery_hash', String(256)) + _code = Column('recovery_hash', String(256)) + code = PasswordHashAttribute('_code', CryptPasswordHash) __mapper_args__ = { 'polymorphic_identity': MFAType.RECOVERY_CODE @@ -56,14 +55,11 @@ class RecoveryCodeMethod(MFAMethod): def __init__(self, user): super().__init__(user, None) - # The code attribute is only available in newly created objects as only - # it's hash is stored in the database - self.code = secrets.token_hex(8).replace(' ', '').lower() - self.code_hash = crypt.crypt(self.code) + # self.code_value is not stored and only available on freshly initiated objects + self.code = self.code_value = secrets.token_hex(8).replace(' ', '').lower() def verify(self, code): - code = code.replace(' ', '').lower() - return secrets.compare_digest(crypt.crypt(code, self.code_hash), self.code_hash) + return self.code.verify(code.replace(' ', '').lower()) def _hotp(counter, key, digits=6): '''Generates HMAC-based one-time password according to RFC4226 diff --git a/uffd/templates/selfservice/setup_mfa_recovery.html b/uffd/templates/selfservice/setup_mfa_recovery.html index 16c087b1d824104f7906e7c99244f3d416844ba4..c9f61f2c08664c6a8ccb50df6acf60d00bbc0211 100644 --- a/uffd/templates/selfservice/setup_mfa_recovery.html +++ b/uffd/templates/selfservice/setup_mfa_recovery.html @@ -12,7 +12,7 @@ <div class="text-monospace"> <ul> {% for method in methods %} - <li>{{ method.code }}</li> + <li>{{ method.code_value }}</li> {% endfor %} </ul> </div> @@ -24,7 +24,7 @@ <div class="btn-toolbar"> <a class="ml-auto mb-2 btn btn-primary d-print-none" href="{{ url_for('selfservice.setup_mfa') }}">{{_("Continue")}}</a> - <a class="ml-2 mb-2 btn btn-light d-print-none" href="{{ methods|map(attribute='code')|join('\n')|datauri }}" download="uffd-recovery-codes"> + <a class="ml-2 mb-2 btn btn-light d-print-none" href="{{ methods|map(attribute='code_value')|join('\n')|datauri }}" download="uffd-recovery-codes"> {{_("Download codes")}} </a> <button class="ml-2 mb-2 btn btn-light d-print-none" type="button" onClick="window.print()">{{_("Print codes")}}</button>