From 7a94d7de73c8aa36d0a6834cd5d38d0c7d5abc2f Mon Sep 17 00:00:00 2001
From: Julian Rother <julian@cccv.de>
Date: Tue, 7 Nov 2023 12:17:58 +0100
Subject: [PATCH] Prevent TOTP code reuse

Time-based one-time password (TOTP) codes are only valid for a short period
of time. In addition they are meant to be single-use to make them more
resistant against phishing and eavesdropping (e.g. keyloggers). Prior to this
change uffd did not keep track of used codes and thus did not prevent code
reuse.
---
 tests/models/test_mfa.py                      |  9 ++++
 tests/views/test_mfa.py                       | 23 +++++++++
 ...776953_add_mfa_method_totp_last_counter.py | 51 +++++++++++++++++++
 uffd/models/mfa.py                            | 12 +++--
 uffd/views/mfa.py                             |  1 +
 5 files changed, 92 insertions(+), 4 deletions(-)
 create mode 100644 uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py

diff --git a/tests/models/test_mfa.py b/tests/models/test_mfa.py
index f2494faf..885aba0c 100644
--- a/tests/models/test_mfa.py
+++ b/tests/models/test_mfa.py
@@ -82,6 +82,15 @@ class TestMfaMethodModels(UffdTestCase):
 		self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
 		self.assertFalse(method.verify(_hotp(counter+2, method.raw_key)))
 
+	def test_totp_method_verify_reuse(self):
+		method = TOTPMethod(user=self.get_user())
+		counter = int(time.time()/30)
+		self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
+		self.assertTrue(method.verify(_hotp(counter-1, method.raw_key)))
+		self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
+		self.assertFalse(method.verify(_hotp(counter-1, method.raw_key)))
+		self.assertFalse(method.verify(_hotp(counter, method.raw_key)))
+
 	def test_webauthn_method(self):
 		data = get_fido2_test_cred(self)
 		method = WebauthnMethod(user=self.get_user(), cred=data, name='testname')
diff --git a/tests/views/test_mfa.py b/tests/views/test_mfa.py
index 589bf370..995e321d 100644
--- a/tests/views/test_mfa.py
+++ b/tests/views/test_mfa.py
@@ -296,6 +296,29 @@ class TestMfaViews(UffdTestCase):
 		self.assertEqual(r.status_code, 200)
 		self.assertIsNotNone(request.user)
 
+	def test_auth_totp_code_reuse(self):
+		self.add_recovery_codes()
+		self.add_totp()
+		method = TOTPMethod(user=self.get_user(), name='testname')
+		raw_key = method.raw_key
+		db.session.add(method)
+		db.session.commit()
+		self.login_as('user')
+		r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
+		self.assertEqual(r.status_code, 200)
+		self.assertIsNone(request.user)
+		code = _hotp(int(time.time()/30), raw_key)
+		r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
+		self.assertEqual(r.status_code, 200)
+		self.assertIsNotNone(request.user)
+		self.login_as('user')
+		r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
+		self.assertEqual(r.status_code, 200)
+		self.assertIsNone(request.user)
+		r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
+		self.assertEqual(r.status_code, 200)
+		self.assertIsNone(request.user)
+
 	def test_auth_empty_code(self):
 		self.add_recovery_codes()
 		self.add_totp()
diff --git a/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py b/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py
new file mode 100644
index 00000000..b391db8a
--- /dev/null
+++ b/uffd/migrations/versions/a9b449776953_add_mfa_method_totp_last_counter.py
@@ -0,0 +1,51 @@
+"""Add mfa_method.totp_last_counter
+
+Revision ID: a9b449776953
+Revises: 23293f32b503
+Create Date: 2023-11-07 12:09:23.843865
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = 'a9b449776953'
+down_revision = '23293f32b503'
+branch_labels = None
+depends_on = None
+
+def upgrade():
+	meta = sa.MetaData(bind=op.get_bind())
+	mfa_method = sa.Table('mfa_method', meta,
+		sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+		sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', create_constraint=True, name='ck_mfa_method_type'), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=False),
+		sa.Column('name', sa.String(length=128), nullable=True),
+		sa.Column('user_id', sa.Integer(), nullable=False),
+		sa.Column('recovery_salt', sa.String(length=64), nullable=True),
+		sa.Column('recovery_hash', sa.String(length=256), nullable=True),
+		sa.Column('totp_key', sa.String(length=64), nullable=True),
+		sa.Column('webauthn_cred', sa.Text(), nullable=True),
+		sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
+		sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method'))
+	)
+	with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op:
+		batch_op.add_column(sa.Column('totp_last_counter', sa.Integer(), nullable=True))
+
+def downgrade():
+	meta = sa.MetaData(bind=op.get_bind())
+	mfa_method = sa.Table('mfa_method', meta,
+		sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+		sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', create_constraint=True, name='ck_mfa_method_type'), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=False),
+		sa.Column('name', sa.String(length=128), nullable=True),
+		sa.Column('user_id', sa.Integer(), nullable=False),
+		sa.Column('recovery_salt', sa.String(length=64), nullable=True),
+		sa.Column('recovery_hash', sa.String(length=256), nullable=True),
+		sa.Column('totp_key', sa.String(length=64), nullable=True),
+		sa.Column('totp_last_counter', sa.Integer(), nullable=True),
+		sa.Column('webauthn_cred', sa.Text(), nullable=True),
+		sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
+		sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method'))
+	)
+	with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op:
+		batch_op.drop_column('totp_last_counter')
diff --git a/uffd/models/mfa.py b/uffd/models/mfa.py
index 0951fb5e..16bd83bd 100644
--- a/uffd/models/mfa.py
+++ b/uffd/models/mfa.py
@@ -79,6 +79,7 @@ def _hotp(counter, key, digits=6):
 
 class TOTPMethod(MFAMethod):
 	key = Column('totp_key', String(64))
+	last_counter = Column('totp_last_counter', Integer())
 	user = relationship('User', backref='mfa_totp_methods')
 
 	__mapper_args__ = {
@@ -120,10 +121,13 @@ class TOTPMethod(MFAMethod):
 		:param code: String of digits (as entered by the user)
 
 		:returns: True if code is valid, False otherwise'''
-		counter = int(time.time()/30)
-		for valid_code in [_hotp(counter-1, self.raw_key), _hotp(counter, self.raw_key)]:
-			if secrets.compare_digest(code, valid_code):
-				return True
+		current_counter = int(time.time()/30)
+		for counter in (current_counter - 1, current_counter):
+			if counter > (self.last_counter or 0):
+				valid_code = _hotp(counter, self.raw_key)
+				if secrets.compare_digest(code, valid_code):
+					self.last_counter = counter
+					return True
 		return False
 
 class WebauthnMethod(MFAMethod):
diff --git a/uffd/views/mfa.py b/uffd/views/mfa.py
index 7471bef2..3aab4374 100644
--- a/uffd/views/mfa.py
+++ b/uffd/views/mfa.py
@@ -215,6 +215,7 @@ def auth_finish():
 		return redirect(url_for('mfa.auth', ref=request.values.get('ref')))
 	for method in request.user_pre_mfa.mfa_totp_methods:
 		if method.verify(request.form['code']):
+			db.session.commit()
 			session['user_mfa'] = True
 			set_request_user()
 			return secure_local_redirect(request.values.get('ref', url_for('index')))
-- 
GitLab