diff --git a/tests/models/test_mfa.py b/tests/models/test_mfa.py index f2494fafc124119bd1d73b733f707036ca8144d7..885aba0c35ecd6bfc195d62180deffe5ba477ddd 100644 --- a/tests/models/test_mfa.py +++ b/tests/models/test_mfa.py @@ -82,6 +82,15 @@ class TestMfaMethodModels(UffdTestCase): self.assertTrue(method.verify(_hotp(counter, method.raw_key))) self.assertFalse(method.verify(_hotp(counter+2, method.raw_key))) + def test_totp_method_verify_reuse(self): + method = TOTPMethod(user=self.get_user()) + counter = int(time.time()/30) + self.assertFalse(method.verify(_hotp(counter-2, method.raw_key))) + self.assertTrue(method.verify(_hotp(counter-1, method.raw_key))) + self.assertTrue(method.verify(_hotp(counter, method.raw_key))) + self.assertFalse(method.verify(_hotp(counter-1, method.raw_key))) + self.assertFalse(method.verify(_hotp(counter, method.raw_key))) + def test_webauthn_method(self): data = get_fido2_test_cred(self) method = WebauthnMethod(user=self.get_user(), cred=data, name='testname') diff --git a/tests/views/test_mfa.py b/tests/views/test_mfa.py index 589bf3707bc782505bdb324822c7e504b1da7d4f..995e321dfae336e8c7d2ae697d242abc0a4b45f1 100644 --- a/tests/views/test_mfa.py +++ b/tests/views/test_mfa.py @@ -296,6 +296,29 @@ class TestMfaViews(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertIsNotNone(request.user) + def test_auth_totp_code_reuse(self): + self.add_recovery_codes() + self.add_totp() + method = TOTPMethod(user=self.get_user(), name='testname') + raw_key = method.raw_key + db.session.add(method) + db.session.commit() + self.login_as('user') + r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + code = _hotp(int(time.time()/30), raw_key) + r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(request.user) + self.login_as('user') + r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.assertIsNone(request.user) + def test_auth_empty_code(self): self.add_recovery_codes() self.add_totp() diff --git a/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py b/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py new file mode 100644 index 0000000000000000000000000000000000000000..b391db8ab82f33075574e01a78b0f450dcf61703 --- /dev/null +++ b/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py @@ -0,0 +1,51 @@ +"""Add mfa_method.totp_last_counter + +Revision ID: a9b449776953 +Revises: 23293f32b503 +Create Date: 2023-11-07 12:09:23.843865 + +""" +from alembic import op +import sqlalchemy as sa + +revision = 'a9b449776953' +down_revision = '23293f32b503' +branch_labels = None +depends_on = None + +def upgrade(): + meta = sa.MetaData(bind=op.get_bind()) + mfa_method = sa.Table('mfa_method', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', create_constraint=True, name='ck_mfa_method_type'), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('name', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('recovery_salt', sa.String(length=64), nullable=True), + sa.Column('recovery_hash', sa.String(length=256), nullable=True), + sa.Column('totp_key', sa.String(length=64), nullable=True), + sa.Column('webauthn_cred', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method')) + ) + with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op: + batch_op.add_column(sa.Column('totp_last_counter', sa.Integer(), nullable=True)) + +def downgrade(): + meta = sa.MetaData(bind=op.get_bind()) + mfa_method = sa.Table('mfa_method', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', create_constraint=True, name='ck_mfa_method_type'), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('name', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('recovery_salt', sa.String(length=64), nullable=True), + sa.Column('recovery_hash', sa.String(length=256), nullable=True), + sa.Column('totp_key', sa.String(length=64), nullable=True), + sa.Column('totp_last_counter', sa.Integer(), nullable=True), + sa.Column('webauthn_cred', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method')) + ) + with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op: + batch_op.drop_column('totp_last_counter') diff --git a/uffd/models/mfa.py b/uffd/models/mfa.py index 0951fb5eae6484f300d66c4b3e44d7cbd67f1bbe..16bd83bd862a788b1ac4af4ac4214a6ebc5af07e 100644 --- a/uffd/models/mfa.py +++ b/uffd/models/mfa.py @@ -79,6 +79,7 @@ def _hotp(counter, key, digits=6): class TOTPMethod(MFAMethod): key = Column('totp_key', String(64)) + last_counter = Column('totp_last_counter', Integer()) user = relationship('User', backref='mfa_totp_methods') __mapper_args__ = { @@ -120,10 +121,13 @@ class TOTPMethod(MFAMethod): :param code: String of digits (as entered by the user) :returns: True if code is valid, False otherwise''' - counter = int(time.time()/30) - for valid_code in [_hotp(counter-1, self.raw_key), _hotp(counter, self.raw_key)]: - if secrets.compare_digest(code, valid_code): - return True + current_counter = int(time.time()/30) + for counter in (current_counter - 1, current_counter): + if counter > (self.last_counter or 0): + valid_code = _hotp(counter, self.raw_key) + if secrets.compare_digest(code, valid_code): + self.last_counter = counter + return True return False class WebauthnMethod(MFAMethod): diff --git a/uffd/views/mfa.py b/uffd/views/mfa.py index 7471bef285048d31add7d0bf4bfb8a1883292af3..3aab43749ed6c8489a9c66a3f73ecc014ea7920e 100644 --- a/uffd/views/mfa.py +++ b/uffd/views/mfa.py @@ -215,6 +215,7 @@ def auth_finish(): return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) for method in request.user_pre_mfa.mfa_totp_methods: if method.verify(request.form['code']): + db.session.commit() session['user_mfa'] = True set_request_user() return secure_local_redirect(request.values.get('ref', url_for('index')))