Skip to content
Snippets Groups Projects
Commit 374b4dfe authored by Julian's avatar Julian
Browse files

Verify selfservice link secrets in constant-time

This affects mail verification and password reset links. Existing links
continue to work. However support for legacy links (without id) is
deprecated and will be removed in the future.
parent b570dabf
Branches
Tags
No related merge requests found
...@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase): ...@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase):
token = MailToken.query.filter(MailToken.loginname == user.loginname).first() token = MailToken.query.filter(MailToken.loginname == user.loginname).first()
self.assertEqual(token.newmail, 'newemail@example.com') self.assertEqual(token.newmail, 'newemail@example.com')
self.assertIn(token.token, str(self.app.last_mail.get_content())) self.assertIn(token.token, str(self.app.last_mail.get_content()))
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
self.assertEqual(_user.mail, 'newemail@example.com') self.assertEqual(_user.mail, 'newemail@example.com')
...@@ -135,7 +135,7 @@ class TestSelfservice(UffdTestCase): ...@@ -135,7 +135,7 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_emptydb(self): def test_token_mail_emptydb(self):
self.login_as('user') self.login_as('user')
user = request.user user = request.user
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=1, token='A'*128), follow_redirects=True)
dump('token_mail_emptydb', r) dump('token_mail_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -144,9 +144,10 @@ class TestSelfservice(UffdTestCase): ...@@ -144,9 +144,10 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_invalid(self): def test_token_mail_invalid(self):
self.login_as('user') self.login_as('user')
user = request.user user = request.user
db.session.add(MailToken(loginname=user.loginname, newmail='newusermail@example.com')) token = MailToken(loginname=user.loginname, newmail='newusermail@example.com')
db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_mail_invalid', r) dump('token_mail_invalid', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -161,7 +162,7 @@ class TestSelfservice(UffdTestCase): ...@@ -161,7 +162,7 @@ class TestSelfservice(UffdTestCase):
admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com') admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com')
db.session.add(admin_token) db.session.add(admin_token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=admin_token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=admin_token.id, token=admin_token.token), follow_redirects=True)
dump('token_mail_wrong_user', r) dump('token_mail_wrong_user', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase): ...@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_mail_expired', r) dump('token_mail_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -243,10 +244,10 @@ class TestSelfservice(UffdTestCase): ...@@ -243,10 +244,10 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password', r) dump('token_password', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r) dump('token_password_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -256,11 +257,11 @@ class TestSelfservice(UffdTestCase): ...@@ -256,11 +257,11 @@ class TestSelfservice(UffdTestCase):
if self.use_userconnection: if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode') self.skipTest('Password Token is not possible in user mode')
user = self.get_user() user = self.get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=1, token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r) dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128), r = self.client.post(path=url_for('selfservice.token_password', token_id=1, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_emptydb_submit', r) dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -274,11 +275,11 @@ class TestSelfservice(UffdTestCase): ...@@ -274,11 +275,11 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_password_invalid', r) dump('token_password_invalid', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_submit', r) dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -293,11 +294,11 @@ class TestSelfservice(UffdTestCase): ...@@ -293,11 +294,11 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password_invalid_expired', r) dump('token_password_invalid_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_expired_submit', r) dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -311,9 +312,9 @@ class TestSelfservice(UffdTestCase): ...@@ -311,9 +312,9 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r) dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
......
"""Add id to selfservice tokens
Revision ID: e9a67175e179
Revises: a8c6b6e91c28
Create Date: 2021-09-06 22:04:46.741233
"""
from alembic import op
import sqlalchemy as sa
revision = 'e9a67175e179'
down_revision = 'a8c6b6e91c28'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['id'])
table = sa.Table('passwordToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['token'])
batch_op.drop_column('id')
table = sa.Table('passwordToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['token'])
batch_op.drop_column('id')
import datetime import datetime
from sqlalchemy import Column, String, DateTime from sqlalchemy import Column, String, DateTime, Integer
from uffd.database import db from uffd.database import db
from uffd.utils import token_urlfriendly from uffd.utils import token_urlfriendly
class Token(): class PasswordToken(db.Model):
token = Column(String(128), primary_key=True, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.now)
class PasswordToken(Token, db.Model):
__tablename__ = 'passwordToken' __tablename__ = 'passwordToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32)) loginname = Column(String(32))
class MailToken(Token, db.Model): class MailToken(db.Model):
__tablename__ = 'mailToken' __tablename__ = 'mailToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32)) loginname = Column(String(32))
newmail = Column(String(255)) newmail = Column(String(255))
Hi {{ user.displayname }}, Hi {{ user.displayname }},
you have requested to change your mail address. To confirm the change, please visit the following url: you have requested to change your mail address. To confirm the change, please visit the following url:
{{ url_for('selfservice.token_mail', token=token, _external=True) }} {{ url_for('selfservice.token_mail', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
...@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre ...@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre
Please visit the following url to set your password: Please visit the following url to set your password:
{{ url_for('selfservice.token_password', token=token, _external=True) }} {{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
......
Hi {{ user.displayname }}, Hi {{ user.displayname }},
you have requested a password reset. To reset your password, visit the following url: you have requested a password reset. To reset your password, visit the following url:
{{ url_for('selfservice.token_password', token=token, _external=True) }} {{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
......
{% extends 'base.html' %} {% extends 'base.html' %}
{% block body %} {% block body %}
<form action="{{ url_for("selfservice.token_password", token=token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') "> <form action="{{ url_for("selfservice.token_password", token_id=token.id, token=token.token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
<div class="row mt-2 justify-content-center"> <div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center"> <div class="text-center">
......
import datetime import datetime
import secrets
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from flask_babel import gettext as _, lazy_gettext from flask_babel import gettext as _, lazy_gettext
...@@ -83,44 +84,70 @@ def forgot_password(): ...@@ -83,44 +84,70 @@ def forgot_password():
send_passwordreset(user) send_passwordreset(user)
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
@bp.route("/token/password/<token>", methods=(['POST', 'GET'])) # Deprecated
def token_password(token): @bp.route('/token/password/<token>')
dbtoken = PasswordToken.query.get(token) def token_password_legacy(token):
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)): matching_token = None
filter_expr = PasswordToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
for dbtoken in PasswordToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('token_password', token_id=matching_token.id, token=token))
@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
def token_password(token_id, token):
dbtoken = PasswordToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.')) flash(_('Token expired, please try again.'))
if dbtoken: if dbtoken:
db.session.delete(dbtoken) db.session.delete(dbtoken)
db.session.commit() db.session.commit()
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
if request.method == 'GET': if request.method == 'GET':
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1']: if not request.values['password1']:
flash(_('You need to set a password, please try again.')) flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1'] == request.values['password2']: if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.')) flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
user = User.query.filter_by(loginname=dbtoken.loginname).one() user = User.query.filter_by(loginname=dbtoken.loginname).one()
if not user.set_password(request.values['password1']): if not user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.')) flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
db.session.delete(dbtoken) db.session.delete(dbtoken)
flash(_('New password set')) flash(_('New password set'))
ldap.session.commit() ldap.session.commit()
db.session.commit() db.session.commit()
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
# Deprecated
@bp.route("/token/mail_verification/<token>") @bp.route("/token/mail_verification/<token>")
@login_required() def token_mail_legacy(token):
def token_mail(token): matching_token = None
dbtoken = MailToken.query.get(token) filter_expr = MailToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)): for dbtoken in MailToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('mail_password', token_id=matching_token.id, token=token))
@bp.route("/token/mail_verification/<int:token_id>/<token>")
def token_mail(token_id, token):
dbtoken = MailToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.')) flash(_('Token expired, please try again.'))
if dbtoken: if dbtoken:
db.session.delete(dbtoken) db.session.delete(dbtoken)
db.session.commit() db.session.commit()
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
user = User.query.filter_by(loginname=dbtoken.loginname).one() user = User.query.filter_by(loginname=dbtoken.loginname).one()
user.set_mail(dbtoken.newmail) user.set_mail(dbtoken.newmail)
flash(_('New mail set')) flash(_('New mail set'))
...@@ -157,7 +184,7 @@ def send_mail_verification(loginname, newmail): ...@@ -157,7 +184,7 @@ def send_mail_verification(loginname, newmail):
user = User.query.filter_by(loginname=loginname).one() user = User.query.filter_by(loginname=loginname).one()
if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token.token): if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail)) flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail))
def send_passwordreset(user, new=False): def send_passwordreset(user, new=False):
...@@ -177,5 +204,5 @@ def send_passwordreset(user, new=False): ...@@ -177,5 +204,5 @@ def send_passwordreset(user, new=False):
template = 'selfservice/passwordreset.mail.txt' template = 'selfservice/passwordreset.mail.txt'
subject = 'Password reset' subject = 'Password reset'
if not sendmail(user.mail, subject, template, user=user, token=token.token): if not sendmail(user.mail, subject, template, user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail)) flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment