Skip to content
Snippets Groups Projects
Commit 32e40c47 authored by Julian's avatar Julian
Browse files

Verify signup link secrets in constant-time

Existing links continue to work. However support for legacy links (without id)
is deprecated and will be removed in the future.
parent c9873e4c
No related branches found
No related tags found
No related merge requests found
......@@ -19,9 +19,9 @@ from utils import dump, UffdTestCase, db_flush
def refetch_signup(signup):
db.session.add(signup)
db.session.commit()
token = signup.token
id = signup.id
db_flush()
return Signup.query.get(token)
return Signup.query.get(id)
# We assume in all tests that Signup.validate and Signup.check_password do
# not alter any state
......@@ -166,18 +166,18 @@ class TestSignupModel(UffdTestCase):
self.assert_validate_valid(signup)
db.session.add(signup)
db.session.commit()
signup1_token = signup.token
signup1_id = signup.id
signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret')
self.assert_validate_valid(signup)
db.session.add(signup)
db.session.commit()
signup2_token = signup.token
signup2_id = signup.id
db_flush()
signup = Signup.query.get(signup2_token)
signup = Signup.query.get(signup2_id)
self.assert_finish_success(signup, 'notsecret')
db.session.commit()
db_flush()
signup = Signup.query.get(signup1_token)
signup = Signup.query.get(signup1_id)
self.assert_finish_failure(signup, 'notsecret')
user = User.query.get('uid=newuser,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertEqual(user.mail, 'test2@example.com')
......@@ -333,14 +333,14 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed)
if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
......@@ -364,9 +364,9 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed)
self.assertIsNotNone(request.user)
self.assertEqual(request.user.loginname, self.get_user().loginname)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
......@@ -374,10 +374,10 @@ class TestSignupViews(UffdTestCase):
self.assertEqual(request.user.loginname, 'newuser')
def test_confirm_notfound(self):
r = self.client.get(path=url_for('signup.signup_confirm', token='notasignuptoken'), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=1, token='notasignuptoken'), follow_redirects=True)
dump('test_signup_confirm_notfound', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=1, token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_notfound', r)
self.assertEqual(r.status_code, 200)
......@@ -385,10 +385,10 @@ class TestSignupViews(UffdTestCase):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup.created = datetime.datetime.now() - datetime.timedelta(hours=49)
signup = refetch_signup(signup)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_expired', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_expired', r)
self.assertEqual(r.status_code, 200)
......@@ -397,17 +397,17 @@ class TestSignupViews(UffdTestCase):
signup.user = self.get_user()
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_completed', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_completed', r)
self.assertEqual(r.status_code, 200)
def test_confirm_wrongpassword(self):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
dump('test_signup_confirm_wrongpassword', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -416,7 +416,7 @@ class TestSignupViews(UffdTestCase):
# finish returns None and error message (here: because the user already exists)
signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_error', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -425,12 +425,12 @@ class TestSignupViews(UffdTestCase):
for i in range(20):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_hostlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -440,10 +440,10 @@ class TestSignupViews(UffdTestCase):
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
for i in range(5):
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_confirmlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......
......@@ -94,7 +94,7 @@ class InviteGrant(db.Model):
class InviteSignup(Signup):
__tablename__ = 'invite_signup'
token = Column(String(128), ForeignKey('signup.token'), primary_key=True)
id = Column(Integer(), ForeignKey('signup.id'), primary_key=True)
invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False)
invite = relationship('Invite', back_populates='signups')
......
"""Add id to signup table
Revision ID: bf71799b7b9e
Revises: e9a67175e179
Create Date: 2021-09-06 23:30:07.486102
"""
from alembic import op
import sqlalchemy as sa
revision = 'bf71799b7b9e'
down_revision = 'e9a67175e179'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['id'])
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(id=sa.select([signup.c.id]).where(signup.c.token==invite_signup.c.token).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_token_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_id_signup'), 'signup', ['id'], ['id'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('token')
batch_op.create_primary_key('pk_invite_signup', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('token', sa.VARCHAR(length=128), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(token=sa.select([signup.c.token]).where(signup.c.id==invite_signup.c.id).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_id_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_token_signup'), 'signup', ['token'], ['token'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('id')
batch_op.create_primary_key('pk_invite_signup', ['token'])
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['token'])
batch_op.drop_column('id')
import datetime
from crypt import crypt
from sqlalchemy import Column, String, Text, DateTime
from sqlalchemy import Column, String, Text, DateTime, Integer
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
......@@ -26,7 +26,8 @@ class Signup(db.Model):
As long as they are not completed, signup requests have no effect each other
or different parts of the application.'''
__tablename__ = 'signup'
token = Column(String(128), primary_key=True, default=token_urlfriendly)
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now, nullable=False)
loginname = Column(Text)
displayname = Column(Text)
......
{% extends 'base_narrow.html' %}
{% block body %}
<form action="{{ url_for(".signup_confirm_submit", token=signup.token) }}" method="POST">
<form action="{{ url_for(".signup_confirm_submit", signup_id=signup.id, token=signup.token) }}" method="POST">
<div class="col-12">
<h2 class="text-center">{{_('Complete Registration')}}</h2>
</div>
......
......@@ -4,7 +4,7 @@ an account was created on the {{ config.ORGANISATION_NAME }} infrastructure with
Please visit the following url to complete the account registration:
{{ url_for('signup.signup_confirm', token=signup.token, _external=True) }}
{{ url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token, _external=True) }}
**The link is valid for 48h**
......
import functools
import secrets
import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask_babel import gettext as _
......@@ -69,19 +71,32 @@ def signup_submit():
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
# Deprecated
@bp.route('/confirm/<token>')
def signup_confirm(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
def signup_confirm_legacy(token):
matching_signup = None
filter_expr = Signup.created >= (datetime.datetime.now() - datetime.timedelta(hours=48))
for signup in Signup.query.filter(filter_expr):
if secrets.compare_digest(signup.token, token):
matching_signup = signup
if not matching_signup:
flash(_('Invalid signup link'))
return redirect(url_for('session.login'))
return redirect(url_for('signup.signup_confirm', signup_id=matching_signup.id, token=token))
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
@bp.route('/confirm/<int:signup_id>/<token>')
def signup_confirm(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
return render_template('signup/confirm.html', signup=signup)
@bp.route('/confirm/<token>', methods=['POST'])
def signup_confirm_submit(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
@bp.route('/confirm/<int:signup_id>/<token>', methods=['POST'])
def signup_confirm_submit(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
confirm_delay = confirm_ratelimit.get_delay(token)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment