From c9873e4c4f253aadfec2832af13e6f4244a10c55 Mon Sep 17 00:00:00 2001
From: Julian Rother <julian@cccv.de>
Date: Mon, 6 Sep 2021 23:12:18 +0200
Subject: [PATCH] Verify selfservice link secrets in constant-time

This affects mail verification and password reset links. Existing links
continue to work. However support for legacy links (without id) is
deprecated and will be removed in the future.
---
 tests/test_selfservice.py                     | 33 +++++-----
 ...a67175e179_add_id_to_selfservice_tokens.py | 65 +++++++++++++++++++
 uffd/selfservice/models.py                    | 16 +++--
 .../selfservice/mailverification.mail.txt     |  2 +-
 .../templates/selfservice/newuser.mail.txt    |  2 +-
 .../selfservice/passwordreset.mail.txt        |  2 +-
 .../templates/selfservice/set_password.html   |  2 +-
 uffd/selfservice/views.py                     | 56 ++++++++++++----
 8 files changed, 137 insertions(+), 41 deletions(-)
 create mode 100644 uffd/migrations/versions/e9a67175e179_add_id_to_selfservice_tokens.py

diff --git a/tests/test_selfservice.py b/tests/test_selfservice.py
index bb5075b2..2d381d05 100644
--- a/tests/test_selfservice.py
+++ b/tests/test_selfservice.py
@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase):
 		token = MailToken.query.filter(MailToken.loginname == user.loginname).first()
 		self.assertEqual(token.newmail, 'newemail@example.com')
 		self.assertIn(token.token, str(self.app.last_mail.get_content()))
-		r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
 		self.assertEqual(r.status_code, 200)
 		_user = request.user
 		self.assertEqual(_user.mail, 'newemail@example.com')
@@ -151,7 +151,7 @@ class TestSelfservice(UffdTestCase):
 	def test_token_mail_emptydb(self):
 		self.login_as('user')
 		user = request.user
-		r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_mail', token_id=1, token='A'*128), follow_redirects=True)
 		dump('token_mail_emptydb', r)
 		self.assertEqual(r.status_code, 200)
 		_user = request.user
@@ -160,9 +160,10 @@ class TestSelfservice(UffdTestCase):
 	def test_token_mail_invalid(self):
 		self.login_as('user')
 		user = request.user
-		db.session.add(MailToken(loginname=user.loginname, newmail='newusermail@example.com'))
+		token = MailToken(loginname=user.loginname, newmail='newusermail@example.com')
+		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token='A'*128), follow_redirects=True)
 		dump('token_mail_invalid', r)
 		self.assertEqual(r.status_code, 200)
 		_user = request.user
@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase):
 		admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com')
 		db.session.add(admin_token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_mail', token=admin_token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_mail', token_id=admin_token.id, token=admin_token.token), follow_redirects=True)
 		dump('token_mail_wrong_user', r)
 		self.assertEqual(r.status_code, 403)
 		_user = request.user
@@ -191,7 +192,7 @@ class TestSelfservice(UffdTestCase):
 			created=(datetime.datetime.now() - datetime.timedelta(days=10)))
 		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
 		dump('token_mail_expired', r)
 		self.assertEqual(r.status_code, 200)
 		_user = request.user
@@ -258,10 +259,10 @@ class TestSelfservice(UffdTestCase):
 		token = PasswordToken(loginname=user.loginname)
 		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
 		dump('token_password', r)
 		self.assertEqual(r.status_code, 200)
-		r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
+		r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
 			data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
 		dump('token_password_submit', r)
 		self.assertEqual(r.status_code, 200)
@@ -271,11 +272,11 @@ class TestSelfservice(UffdTestCase):
 		if self.use_userconnection:
 			self.skipTest('Password Token is not possible in user mode')
 		user = self.get_user()
-		r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_password', token_id=1, token='A'*128), follow_redirects=True)
 		dump('token_password_emptydb', r)
 		self.assertEqual(r.status_code, 200)
 		self.assertIn(b'Token expired, please try again', r.data)
-		r = self.client.post(path=url_for('selfservice.token_password', token='A'*128),
+		r = self.client.post(path=url_for('selfservice.token_password', token_id=1, token='A'*128),
 			data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
 		dump('token_password_emptydb_submit', r)
 		self.assertEqual(r.status_code, 200)
@@ -289,11 +290,11 @@ class TestSelfservice(UffdTestCase):
 		token = PasswordToken(loginname=user.loginname)
 		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128), follow_redirects=True)
 		dump('token_password_invalid', r)
 		self.assertEqual(r.status_code, 200)
 		self.assertIn(b'Token expired, please try again', r.data)
-		r = self.client.post(path=url_for('selfservice.token_password', token='A'*128),
+		r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128),
 			data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
 		dump('token_password_invalid_submit', r)
 		self.assertEqual(r.status_code, 200)
@@ -308,11 +309,11 @@ class TestSelfservice(UffdTestCase):
 			created=(datetime.datetime.now() - datetime.timedelta(days=10)))
 		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
 		dump('token_password_invalid_expired', r)
 		self.assertEqual(r.status_code, 200)
 		self.assertIn(b'Token expired, please try again', r.data)
-		r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
+		r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
 			data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
 		dump('token_password_invalid_expired_submit', r)
 		self.assertEqual(r.status_code, 200)
@@ -326,9 +327,9 @@ class TestSelfservice(UffdTestCase):
 		token = PasswordToken(loginname=user.loginname)
 		db.session.add(token)
 		db.session.commit()
-		r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
+		r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
 		self.assertEqual(r.status_code, 200)
-		r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
+		r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
 			data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
 		dump('token_password_different_passwords_submit', r)
 		self.assertEqual(r.status_code, 200)
diff --git a/uffd/migrations/versions/e9a67175e179_add_id_to_selfservice_tokens.py b/uffd/migrations/versions/e9a67175e179_add_id_to_selfservice_tokens.py
new file mode 100644
index 00000000..934df36d
--- /dev/null
+++ b/uffd/migrations/versions/e9a67175e179_add_id_to_selfservice_tokens.py
@@ -0,0 +1,65 @@
+"""Add id to selfservice tokens
+
+Revision ID: e9a67175e179
+Revises: a8c6b6e91c28
+Create Date: 2021-09-06 22:04:46.741233
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = 'e9a67175e179'
+down_revision = 'a8c6b6e91c28'
+branch_labels = None
+depends_on = None
+
+def upgrade():
+	meta = sa.MetaData(bind=op.get_bind())
+	table = sa.Table('mailToken', meta,
+		sa.Column('token', sa.String(length=128), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=True),
+		sa.Column('loginname', sa.String(length=32), nullable=True),
+		sa.Column('newmail', sa.String(length=255), nullable=True),
+		sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
+	)
+	with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
+		batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
+		batch_op.drop_constraint('pk_mailToken', 'primary')
+		batch_op.create_primary_key('pk_mailToken', ['id'])
+	table = sa.Table('passwordToken', meta,
+		sa.Column('token', sa.String(length=128), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=True),
+		sa.Column('loginname', sa.String(length=32), nullable=True),
+		sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
+	)
+	with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
+		batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
+		batch_op.drop_constraint('pk_passwordToken', 'primary')
+		batch_op.create_primary_key('pk_passwordToken', ['id'])
+
+def downgrade():
+	meta = sa.MetaData(bind=op.get_bind())
+	table = sa.Table('mailToken', meta,
+		sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+		sa.Column('token', sa.String(length=128), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=True),
+		sa.Column('loginname', sa.String(length=32), nullable=True),
+		sa.Column('newmail', sa.String(length=255), nullable=True),
+		sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
+	)
+	with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
+		batch_op.drop_constraint('pk_mailToken', 'primary')
+		batch_op.create_primary_key('pk_mailToken', ['token'])
+		batch_op.drop_column('id')
+	table = sa.Table('passwordToken', meta,
+		sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+		sa.Column('token', sa.String(length=128), nullable=False),
+		sa.Column('created', sa.DateTime(), nullable=True),
+		sa.Column('loginname', sa.String(length=32), nullable=True),
+		sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
+	)
+	with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
+		batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
+		batch_op.drop_constraint('pk_passwordToken', 'primary')
+		batch_op.create_primary_key('pk_passwordToken', ['token'])
+		batch_op.drop_column('id')
diff --git a/uffd/selfservice/models.py b/uffd/selfservice/models.py
index b87148cd..096c8743 100644
--- a/uffd/selfservice/models.py
+++ b/uffd/selfservice/models.py
@@ -1,19 +1,21 @@
 import datetime
 
-from sqlalchemy import Column, String, DateTime
+from sqlalchemy import Column, String, DateTime, Integer
 
 from uffd.database import db
 from uffd.utils import token_urlfriendly
 
-class Token():
-	token = Column(String(128), primary_key=True, default=token_urlfriendly)
-	created = Column(DateTime, default=datetime.datetime.now)
-
-class PasswordToken(Token, db.Model):
+class PasswordToken(db.Model):
 	__tablename__ = 'passwordToken'
+	id = Column(Integer(), primary_key=True, autoincrement=True)
+	token = Column(String(128), default=token_urlfriendly, nullable=False)
+	created = Column(DateTime, default=datetime.datetime.now)
 	loginname = Column(String(32))
 
-class MailToken(Token, db.Model):
+class MailToken(db.Model):
 	__tablename__ = 'mailToken'
+	id = Column(Integer(), primary_key=True, autoincrement=True)
+	token = Column(String(128), default=token_urlfriendly, nullable=False)
+	created = Column(DateTime, default=datetime.datetime.now)
 	loginname = Column(String(32))
 	newmail = Column(String(255))
diff --git a/uffd/selfservice/templates/selfservice/mailverification.mail.txt b/uffd/selfservice/templates/selfservice/mailverification.mail.txt
index 405b2e65..f508be90 100644
--- a/uffd/selfservice/templates/selfservice/mailverification.mail.txt
+++ b/uffd/selfservice/templates/selfservice/mailverification.mail.txt
@@ -1,6 +1,6 @@
 Hi {{ user.displayname }},
 
 you have requested to change your mail address. To confirm the change, please visit the following url:
-{{ url_for('selfservice.token_mail', token=token, _external=True) }}
+{{ url_for('selfservice.token_mail', token_id=token.id, token=token.token, _external=True) }}
 
 **The link is valid for 48h**
diff --git a/uffd/selfservice/templates/selfservice/newuser.mail.txt b/uffd/selfservice/templates/selfservice/newuser.mail.txt
index cfe43223..9c7dd4d5 100644
--- a/uffd/selfservice/templates/selfservice/newuser.mail.txt
+++ b/uffd/selfservice/templates/selfservice/newuser.mail.txt
@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre
 
 Please visit the following url to set your password:
 
-{{ url_for('selfservice.token_password', token=token, _external=True) }}
+{{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
 
 **The link is valid for 48h**
 
diff --git a/uffd/selfservice/templates/selfservice/passwordreset.mail.txt b/uffd/selfservice/templates/selfservice/passwordreset.mail.txt
index 88ab6676..bebe29c9 100644
--- a/uffd/selfservice/templates/selfservice/passwordreset.mail.txt
+++ b/uffd/selfservice/templates/selfservice/passwordreset.mail.txt
@@ -1,7 +1,7 @@
 Hi {{ user.displayname }},
 
 you have requested a password reset. To reset your password, visit the following url:
-{{ url_for('selfservice.token_password', token=token, _external=True) }}
+{{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
 
 **The link is valid for 48h**
 
diff --git a/uffd/selfservice/templates/selfservice/set_password.html b/uffd/selfservice/templates/selfservice/set_password.html
index 743b7633..39e51107 100644
--- a/uffd/selfservice/templates/selfservice/set_password.html
+++ b/uffd/selfservice/templates/selfservice/set_password.html
@@ -1,7 +1,7 @@
 {% extends 'base_narrow.html' %}
 
 {% block body %}
-<form action="{{ url_for("selfservice.token_password", token=token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
+<form action="{{ url_for("selfservice.token_password", token_id=token.id, token=token.token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
 	<div class="col-12">
 		<h2 class="text-center">{{_("Reset password")}}</h2>
 	</div>
diff --git a/uffd/selfservice/views.py b/uffd/selfservice/views.py
index 66274b98..a5ac6c91 100644
--- a/uffd/selfservice/views.py
+++ b/uffd/selfservice/views.py
@@ -1,4 +1,5 @@
 import datetime
+import secrets
 
 from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
 from flask_babel import gettext as _, lazy_gettext
@@ -86,46 +87,73 @@ def forgot_password():
 		send_passwordreset(user)
 	return redirect(url_for('session.login'))
 
-@bp.route("/token/password/<token>", methods=(['POST', 'GET']))
-def token_password(token):
-	dbtoken = PasswordToken.query.get(token)
-	if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
+# Deprecated
+@bp.route('/token/password/<token>')
+def token_password_legacy(token):
+	matching_token = None
+	filter_expr = PasswordToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
+	for dbtoken in PasswordToken.query.filter(filter_expr):
+		if secrets.compare_digest(dbtoken.token, token):
+			matching_token = dbtoken
+	if not matching_token:
+		flash(_('Token expired, please try again.'))
+		return redirect(url_for('session.login'))
+	return redirect(url_for('token_password', token_id=matching_token.id, token=token))
+
+@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
+def token_password(token_id, token):
+	dbtoken = PasswordToken.query.get(token_id)
+	if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
+			dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
 		flash(_('Token expired, please try again.'))
 		if dbtoken:
 			db.session.delete(dbtoken)
 			db.session.commit()
 		return redirect(url_for('session.login'))
 	if request.method == 'GET':
-		return render_template('selfservice/set_password.html', token=token)
+		return render_template('selfservice/set_password.html', token=dbtoken)
 	if not request.values['password1']:
 		flash(_('You need to set a password, please try again.'))
-		return render_template('selfservice/set_password.html', token=token)
+		return render_template('selfservice/set_password.html', token=dbtoken)
 	if not request.values['password1'] == request.values['password2']:
 		flash(_('Passwords do not match, please try again.'))
-		return render_template('selfservice/set_password.html', token=token)
+		return render_template('selfservice/set_password.html', token=dbtoken)
 	user = User.query.filter_by(loginname=dbtoken.loginname).one()
 	if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
 		abort(403)
 	if not user.set_password(request.values['password1']):
 		flash(_('Password ist not valid, please try again.'))
-		return render_template('selfservice/set_password.html', token=token)
+		return render_template('selfservice/set_password.html', token=dbtoken)
 	db.session.delete(dbtoken)
 	flash(_('New password set'))
 	ldap.session.commit()
 	db.session.commit()
 	return redirect(url_for('session.login'))
 
+# Deprecated
 @bp.route("/token/mail_verification/<token>")
+def token_mail_legacy(token):
+	matching_token = None
+	filter_expr = MailToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
+	for dbtoken in MailToken.query.filter(filter_expr):
+		if secrets.compare_digest(dbtoken.token, token):
+			matching_token = dbtoken
+	if not matching_token:
+		flash(_('Token expired, please try again.'))
+		return redirect(url_for('session.login'))
+	return redirect(url_for('mail_password', token_id=matching_token.id, token=token))
+
+@bp.route("/token/mail_verification/<int:token_id>/<token>")
 @login_required(selfservice_acl_check)
-def token_mail(token):
-	dbtoken = MailToken.query.get(token)
-	if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
+def token_mail(token_id, token):
+	dbtoken = MailToken.query.get(token_id)
+	if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
+			dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
 		flash(_('Token expired, please try again.'))
 		if dbtoken:
 			db.session.delete(dbtoken)
 			db.session.commit()
 		return redirect(url_for('selfservice.index'))
-
 	user = User.query.filter_by(loginname=dbtoken.loginname).one()
 	if user != request.user:
 		abort(403, description=_('This link was generated for another user. Login as the correct user to continue.'))
@@ -164,7 +192,7 @@ def send_mail_verification(loginname, newmail):
 
 	user = User.query.filter_by(loginname=loginname).one()
 
-	if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token.token):
+	if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token):
 		flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail))
 
 def send_passwordreset(user, new=False):
@@ -184,5 +212,5 @@ def send_passwordreset(user, new=False):
 		template = 'selfservice/passwordreset.mail.txt'
 		subject = 'Password reset'
 
-	if not sendmail(user.mail, subject, template, user=user, token=token.token):
+	if not sendmail(user.mail, subject, template, user=user, token=token):
 		flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail))
-- 
GitLab