diff --git a/tests/test_signup.py b/tests/test_signup.py index f6b65275b4aa9ef988d3c010c970eb0747679110..79cbac7504224afeeac96ad986642104bca6fe6f 100644 --- a/tests/test_signup.py +++ b/tests/test_signup.py @@ -19,9 +19,9 @@ from utils import dump, UffdTestCase, db_flush def refetch_signup(signup): db.session.add(signup) db.session.commit() - token = signup.token + id = signup.id db_flush() - return Signup.query.get(token) + return Signup.query.get(id) # We assume in all tests that Signup.validate and Signup.check_password do # not alter any state @@ -166,18 +166,18 @@ class TestSignupModel(UffdTestCase): self.assert_validate_valid(signup) db.session.add(signup) db.session.commit() - signup1_token = signup.token + signup1_id = signup.id signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret') self.assert_validate_valid(signup) db.session.add(signup) db.session.commit() - signup2_token = signup.token + signup2_id = signup.id db_flush() - signup = Signup.query.get(signup2_token) + signup = Signup.query.get(signup2_id) self.assert_finish_success(signup, 'notsecret') db.session.commit() db_flush() - signup = Signup.query.get(signup1_token) + signup = Signup.query.get(signup1_id) self.assert_finish_failure(signup, 'notsecret') user = User.query.get('uid=newuser,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE'])) self.assertEqual(user.mail, 'test2@example.com') @@ -333,14 +333,14 @@ class TestSignupViews(UffdTestCase): self.assertFalse(signup.completed) if self.use_openldap: self.assertIsNone(login_get_user('newuser', 'notsecret')) - r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True) dump('test_signup_confirm', r) self.assertEqual(r.status_code, 200) signup = refetch_signup(signup) self.assertFalse(signup.completed) if self.use_openldap: self.assertIsNone(login_get_user('newuser', 'notsecret')) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_submit', r) self.assertEqual(r.status_code, 200) signup = refetch_signup(signup) @@ -364,9 +364,9 @@ class TestSignupViews(UffdTestCase): self.assertFalse(signup.completed) self.assertIsNotNone(request.user) self.assertEqual(request.user.loginname, self.get_user().loginname) - r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True) self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) self.assertEqual(r.status_code, 200) signup = refetch_signup(signup) self.assertTrue(signup.completed) @@ -374,10 +374,10 @@ class TestSignupViews(UffdTestCase): self.assertEqual(request.user.loginname, 'newuser') def test_confirm_notfound(self): - r = self.client.get(path=url_for('signup.signup_confirm', token='notasignuptoken'), follow_redirects=True) + r = self.client.get(path=url_for('signup.signup_confirm', signup_id=1, token='notasignuptoken'), follow_redirects=True) dump('test_signup_confirm_notfound', r) self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=1, token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_submit_notfound', r) self.assertEqual(r.status_code, 200) @@ -385,10 +385,10 @@ class TestSignupViews(UffdTestCase): signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup.created = datetime.datetime.now() - datetime.timedelta(hours=49) signup = refetch_signup(signup) - r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True) dump('test_signup_confirm_expired', r) self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_submit_expired', r) self.assertEqual(r.status_code, 200) @@ -397,17 +397,17 @@ class TestSignupViews(UffdTestCase): signup.user = self.get_user() signup = refetch_signup(signup) self.assertTrue(signup.completed) - r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True) dump('test_signup_confirm_completed', r) self.assertEqual(r.status_code, 200) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_submit_completed', r) self.assertEqual(r.status_code, 200) def test_confirm_wrongpassword(self): signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = refetch_signup(signup) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'}) dump('test_signup_confirm_wrongpassword', r) self.assertEqual(r.status_code, 200) self.assertFalse(signup.completed) @@ -416,7 +416,7 @@ class TestSignupViews(UffdTestCase): # finish returns None and error message (here: because the user already exists) signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='test@example.com', password='notsecret') signup = refetch_signup(signup) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_error', r) self.assertEqual(r.status_code, 200) self.assertFalse(signup.completed) @@ -425,12 +425,12 @@ class TestSignupViews(UffdTestCase): for i in range(20): signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = refetch_signup(signup) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) self.assertEqual(r.status_code, 200) signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = refetch_signup(signup) self.assertFalse(signup.completed) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_hostlimit', r) self.assertEqual(r.status_code, 200) self.assertFalse(signup.completed) @@ -440,10 +440,10 @@ class TestSignupViews(UffdTestCase): signup = refetch_signup(signup) self.assertFalse(signup.completed) for i in range(5): - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) self.assertEqual(r.status_code, 200) self.assertFalse(signup.completed) - r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) dump('test_signup_confirm_confirmlimit', r) self.assertEqual(r.status_code, 200) self.assertFalse(signup.completed) diff --git a/uffd/invite/models.py b/uffd/invite/models.py index 1c54ba881b5401b6f022c2e96892ce026a144d7f..99b5dd5992410b56f70b1989aba5e4c238318ec3 100644 --- a/uffd/invite/models.py +++ b/uffd/invite/models.py @@ -94,7 +94,7 @@ class InviteGrant(db.Model): class InviteSignup(Signup): __tablename__ = 'invite_signup' - token = Column(String(128), ForeignKey('signup.token'), primary_key=True) + id = Column(Integer(), ForeignKey('signup.id'), primary_key=True) invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False) invite = relationship('Invite', back_populates='signups') diff --git a/uffd/migrations/versions/bf71799b7b9e_add_id_to_signup_table.py b/uffd/migrations/versions/bf71799b7b9e_add_id_to_signup_table.py new file mode 100644 index 0000000000000000000000000000000000000000..68aa43bfa37d9bf56e151a3253b9fa8ba40a2dbc --- /dev/null +++ b/uffd/migrations/versions/bf71799b7b9e_add_id_to_signup_table.py @@ -0,0 +1,131 @@ +"""Add id to signup table + +Revision ID: bf71799b7b9e +Revises: e9a67175e179 +Create Date: 2021-09-06 23:30:07.486102 + +""" +from alembic import op +import sqlalchemy as sa + +revision = 'bf71799b7b9e' +down_revision = 'e9a67175e179' +branch_labels = None +depends_on = None + +def upgrade(): + meta = sa.MetaData(bind=op.get_bind()) + signup = sa.Table('signup', meta, + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('loginname', sa.Text(), nullable=True), + sa.Column('displayname', sa.Text(), nullable=True), + sa.Column('mail', sa.Text(), nullable=True), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('user_dn', sa.String(length=128), nullable=True), + sa.Column('type', sa.String(length=50), nullable=True), + sa.PrimaryKeyConstraint('token', name=op.f('pk_signup')) + ) + with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op: + batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False)) + batch_op.drop_constraint('pk_signup', 'primary') + batch_op.create_primary_key('pk_signup', ['id']) + + meta = sa.MetaData(bind=op.get_bind()) + invite_signup = sa.Table('invite_signup', meta, + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('invite_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')), + sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')), + sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup')) + ) + with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op: + batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True)) + + meta = sa.MetaData(bind=op.get_bind()) + signup = sa.Table('signup', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('loginname', sa.Text(), nullable=True), + sa.Column('displayname', sa.Text(), nullable=True), + sa.Column('mail', sa.Text(), nullable=True), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('user_dn', sa.String(length=128), nullable=True), + sa.Column('type', sa.String(length=50), nullable=True), + sa.PrimaryKeyConstraint('id', name=op.f('pk_signup')) + ) + invite_signup = sa.Table('invite_signup', meta, + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('invite_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')), + sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')), + sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup')) + ) + op.execute(invite_signup.update().values(id=sa.select([signup.c.id]).where(signup.c.token==invite_signup.c.token).limit(1).as_scalar())) + with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op: + batch_op.drop_constraint('fk_invite_signup_token_signup', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_id_signup'), 'signup', ['id'], ['id']) + batch_op.drop_constraint('pk_invite_signup', 'primary') + batch_op.drop_column('token') + batch_op.create_primary_key('pk_invite_signup', ['id']) + +def downgrade(): + meta = sa.MetaData(bind=op.get_bind()) + invite_signup = sa.Table('invite_signup', meta, + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('invite_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')), + sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')), + sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup')) + ) + with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op: + batch_op.add_column(sa.Column('token', sa.VARCHAR(length=128), nullable=True)) + + meta = sa.MetaData(bind=op.get_bind()) + signup = sa.Table('signup', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('loginname', sa.Text(), nullable=True), + sa.Column('displayname', sa.Text(), nullable=True), + sa.Column('mail', sa.Text(), nullable=True), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('user_dn', sa.String(length=128), nullable=True), + sa.Column('type', sa.String(length=50), nullable=True), + sa.PrimaryKeyConstraint('id', name=op.f('pk_signup')) + ) + invite_signup = sa.Table('invite_signup', meta, + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('invite_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')), + sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')), + sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup')) + ) + op.execute(invite_signup.update().values(token=sa.select([signup.c.token]).where(signup.c.id==invite_signup.c.id).limit(1).as_scalar())) + with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op: + batch_op.drop_constraint('fk_invite_signup_id_signup', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_token_signup'), 'signup', ['token'], ['token']) + batch_op.drop_constraint('pk_invite_signup', 'primary') + batch_op.drop_column('id') + batch_op.create_primary_key('pk_invite_signup', ['token']) + + meta = sa.MetaData(bind=op.get_bind()) + signup = sa.Table('signup', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('loginname', sa.Text(), nullable=True), + sa.Column('displayname', sa.Text(), nullable=True), + sa.Column('mail', sa.Text(), nullable=True), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('user_dn', sa.String(length=128), nullable=True), + sa.Column('type', sa.String(length=50), nullable=True), + sa.PrimaryKeyConstraint('id', name=op.f('pk_signup')) + ) + with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op: + batch_op.drop_constraint('pk_signup', 'primary') + batch_op.create_primary_key('pk_signup', ['token']) + batch_op.drop_column('id') diff --git a/uffd/signup/models.py b/uffd/signup/models.py index e994acb00ccf3701f73d144730c743b91910e8ee..094837380d8c84175673a2e02bd7d77e19ca16e4 100644 --- a/uffd/signup/models.py +++ b/uffd/signup/models.py @@ -1,7 +1,7 @@ import datetime from crypt import crypt -from sqlalchemy import Column, String, Text, DateTime +from sqlalchemy import Column, String, Text, DateTime, Integer from uffd.ldapalchemy.dbutils import DBRelationship from uffd.database import db @@ -26,7 +26,8 @@ class Signup(db.Model): As long as they are not completed, signup requests have no effect each other or different parts of the application.''' __tablename__ = 'signup' - token = Column(String(128), primary_key=True, default=token_urlfriendly) + id = Column(Integer(), primary_key=True, autoincrement=True) + token = Column(String(128), default=token_urlfriendly, nullable=False) created = Column(DateTime, default=datetime.datetime.now, nullable=False) loginname = Column(Text) displayname = Column(Text) diff --git a/uffd/signup/templates/signup/confirm.html b/uffd/signup/templates/signup/confirm.html index d6b75e2a9b589ce874c0bfbbb5665716781187c8..ffd13dd4ec0131235bd256af750f7d75d7cfea83 100644 --- a/uffd/signup/templates/signup/confirm.html +++ b/uffd/signup/templates/signup/confirm.html @@ -1,7 +1,7 @@ {% extends 'base_narrow.html' %} {% block body %} -<form action="{{ url_for(".signup_confirm_submit", token=signup.token) }}" method="POST"> +<form action="{{ url_for(".signup_confirm_submit", signup_id=signup.id, token=signup.token) }}" method="POST"> <div class="col-12"> <h2 class="text-center">{{_('Complete Registration')}}</h2> </div> diff --git a/uffd/signup/templates/signup/mail.txt b/uffd/signup/templates/signup/mail.txt index b73479fb199170f7851d008ecab0b40050ad05ba..98c901dd0b74b338ace135c1890bde2e47c19a87 100644 --- a/uffd/signup/templates/signup/mail.txt +++ b/uffd/signup/templates/signup/mail.txt @@ -4,7 +4,7 @@ an account was created on the {{ config.ORGANISATION_NAME }} infrastructure with Please visit the following url to complete the account registration: -{{ url_for('signup.signup_confirm', token=signup.token, _external=True) }} +{{ url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token, _external=True) }} **The link is valid for 48h** diff --git a/uffd/signup/views.py b/uffd/signup/views.py index b15b14c6e37e09f1148e00398bdf48ee0ca7b08b..09b82d39a8ffcb1c5f61de66fbc77b259d0d7dd1 100644 --- a/uffd/signup/views.py +++ b/uffd/signup/views.py @@ -1,4 +1,6 @@ import functools +import secrets +import datetime from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify from flask_babel import gettext as _ @@ -69,19 +71,32 @@ def signup_submit(): signup_ratelimit.log(request.form['mail']) return render_template('signup/submitted.html', signup=signup) -# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them +# Deprecated @bp.route('/confirm/<token>') -def signup_confirm(token): - signup = Signup.query.get(token) - if not signup or signup.expired or signup.completed: +def signup_confirm_legacy(token): + matching_signup = None + filter_expr = Signup.created >= (datetime.datetime.now() - datetime.timedelta(hours=48)) + for signup in Signup.query.filter(filter_expr): + if secrets.compare_digest(signup.token, token): + matching_signup = signup + if not matching_signup: + flash(_('Invalid signup link')) + return redirect(url_for('session.login')) + return redirect(url_for('signup.signup_confirm', signup_id=matching_signup.id, token=token)) + +# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them +@bp.route('/confirm/<int:signup_id>/<token>') +def signup_confirm(signup_id, token): + signup = Signup.query.get(signup_id) + if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed: flash(_('Invalid signup link')) return redirect(url_for('index')) return render_template('signup/confirm.html', signup=signup) -@bp.route('/confirm/<token>', methods=['POST']) -def signup_confirm_submit(token): - signup = Signup.query.get(token) - if not signup or signup.expired or signup.completed: +@bp.route('/confirm/<int:signup_id>/<token>', methods=['POST']) +def signup_confirm_submit(signup_id, token): + signup = Signup.query.get(signup_id) + if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed: flash(_('Invalid signup link')) return redirect(url_for('index')) confirm_delay = confirm_ratelimit.get_delay(token)