From b9b6ecad7005b0bf8c06b9d756ceae6f18232173 Mon Sep 17 00:00:00 2001 From: Julian Rother <julianr@fsmpi.rwth-aachen.de> Date: Wed, 14 Apr 2021 00:42:39 +0200 Subject: [PATCH] Replace secret token with numeric id as invite primary key --- .../versions/54b2413586fd_invite_pk_change.py | 128 ++++++++++++++++++ tests/test_invite.py | 32 ++--- uffd/invite/models.py | 9 +- uffd/invite/templates/invite/list.html | 8 +- uffd/invite/views.py | 22 +-- 5 files changed, 164 insertions(+), 35 deletions(-) create mode 100644 migrations/versions/54b2413586fd_invite_pk_change.py diff --git a/migrations/versions/54b2413586fd_invite_pk_change.py b/migrations/versions/54b2413586fd_invite_pk_change.py new file mode 100644 index 00000000..5d927cf0 --- /dev/null +++ b/migrations/versions/54b2413586fd_invite_pk_change.py @@ -0,0 +1,128 @@ +"""invite pk change + +Revision ID: 54b2413586fd +Revises: 2a6b1fb82ce6 +Create Date: 2021-04-13 23:33:40.118507 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '54b2413586fd' +down_revision = '2a6b1fb82ce6' +branch_labels = None +depends_on = None + +invite = sa.sql.table('invite', + sa.sql.column('id', sa.Integer()), + sa.sql.column('token', sa.String(length=128)) +) +invite_grant = sa.sql.table('invite_grant', + sa.sql.column('invite_id', sa.Integer()), + sa.sql.column('invite_token', sa.String(length=128)) +) +invite_roles = sa.sql.table('invite_roles', + sa.sql.column('invite_id', sa.Integer()), + sa.sql.column('invite_token', sa.String(length=128)) +) +invite_signup = sa.sql.table('invite_signup', + sa.sql.column('invite_id', sa.Integer()), + sa.sql.column('invite_token', sa.String(length=128)) +) + +def upgrade(): + # CHECK constraints get lost when reflecting from the actual table + meta = sa.MetaData(bind=op.get_bind()) + table = sa.Table('invite', meta, + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('valid_until', sa.DateTime(), nullable=False), + sa.Column('single_use', sa.Boolean(name=op.f('ck_invite_single_use')), nullable=False), + sa.Column('allow_signup', sa.Boolean(name=op.f('ck_invite_allow_signup')), nullable=False), + sa.Column('used', sa.Boolean(name=op.f('ck_invite_used')), nullable=False), + sa.Column('disabled', sa.Boolean(name=op.f('ck_invite_disabled')), nullable=False), + sa.PrimaryKeyConstraint('token', name=op.f('pk_invite')) + ) + with op.batch_alter_table('invite', copy_from=table) as batch_op: + batch_op.drop_constraint(batch_op.f('pk_invite'), type_='primary') + batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False)) + batch_op.create_primary_key(batch_op.f('pk_invite'), ['id']) + batch_op.create_unique_constraint(batch_op.f('uq_invite_token'), ['token']) + with op.batch_alter_table('invite_grant', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True)) + with op.batch_alter_table('invite_roles', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True)) + with op.batch_alter_table('invite_signup', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True)) + + op.execute(invite_grant.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_grant.c.invite_token).as_scalar())) + op.execute(invite_roles.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_roles.c.invite_token).as_scalar())) + op.execute(invite_signup.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_signup.c.invite_token).as_scalar())) + + with op.batch_alter_table('invite_grant', schema=None) as batch_op: + batch_op.alter_column('invite_id', existing_type=sa.INTEGER(), nullable=False) + batch_op.drop_constraint('fk_invite_grant_invite_token_invite', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_invite_grant_invite_id_invite'), 'invite', ['invite_id'], ['id']) + batch_op.drop_column('invite_token') + with op.batch_alter_table('invite_roles', schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f('pk_invite_roles'), type_='primary') + batch_op.create_primary_key(batch_op.f('pk_invite_roles'), ['invite_id', 'role_id']) + batch_op.drop_constraint('fk_invite_roles_invite_token_invite', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_invite_roles_invite_id_invite'), 'invite', ['invite_id'], ['id']) + batch_op.drop_column('invite_token') + with op.batch_alter_table('invite_signup', schema=None) as batch_op: + batch_op.alter_column('invite_id', existing_type=sa.INTEGER(), nullable=False) + batch_op.drop_constraint('fk_invite_signup_invite_token_invite', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_invite_signup_invite_id_invite'), 'invite', ['invite_id'], ['id']) + batch_op.drop_column('invite_token') + +def downgrade(): + with op.batch_alter_table('invite_signup', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True)) + with op.batch_alter_table('invite_roles', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True)) + with op.batch_alter_table('invite_grant', schema=None) as batch_op: + batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True)) + + op.execute(invite_grant.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_grant.c.invite_id).as_scalar())) + op.execute(invite_roles.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_roles.c.invite_id).as_scalar())) + op.execute(invite_signup.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_signup.c.invite_id).as_scalar())) + + with op.batch_alter_table('invite_signup', schema=None) as batch_op: + batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False) + batch_op.drop_constraint(batch_op.f('fk_invite_signup_invite_id_invite'), type_='foreignkey') + batch_op.create_foreign_key('fk_invite_signup_invite_token_invite', 'invite', ['invite_token'], ['token']) + batch_op.drop_column('invite_id') + with op.batch_alter_table('invite_roles', schema=None) as batch_op: + batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False) + batch_op.drop_constraint(batch_op.f('pk_invite_roles'), type_='primary') + batch_op.create_primary_key(batch_op.f('pk_invite_roles'), ['invite_token', 'role_id']) + batch_op.drop_constraint(batch_op.f('fk_invite_roles_invite_id_invite'), type_='foreignkey') + batch_op.create_foreign_key('fk_invite_roles_invite_token_invite', 'invite', ['invite_token'], ['token']) + batch_op.drop_column('invite_id') + with op.batch_alter_table('invite_grant', schema=None) as batch_op: + batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False) + batch_op.drop_constraint(batch_op.f('fk_invite_grant_invite_id_invite'), type_='foreignkey') + batch_op.create_foreign_key('fk_invite_grant_invite_token_invite', 'invite', ['invite_token'], ['token']) + batch_op.drop_column('invite_id') + + # CHECK constraints get lost when reflecting from the actual table + meta = sa.MetaData(bind=op.get_bind()) + table = sa.Table('invite', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('valid_until', sa.DateTime(), nullable=False), + sa.Column('single_use', sa.Boolean(name=op.f('ck_invite_single_use')), nullable=False), + sa.Column('allow_signup', sa.Boolean(name=op.f('ck_invite_allow_signup')), nullable=False), + sa.Column('used', sa.Boolean(name=op.f('ck_invite_used')), nullable=False), + sa.Column('disabled', sa.Boolean(name=op.f('ck_invite_disabled')), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('pk_invite')), + sa.UniqueConstraint('token', name=op.f('uq_invite_token')) + ) + with op.batch_alter_table('invite', copy_from=table) as batch_op: + batch_op.drop_constraint(batch_op.f('uq_invite_token'), type_='unique') + batch_op.drop_constraint(batch_op.f('pk_invite'), type_='primary') + batch_op.drop_column('id') + batch_op.create_primary_key(batch_op.f('pk_invite'), ['token']) diff --git a/tests/test_invite.py b/tests/test_invite.py index c73130b0..ad61567f 100644 --- a/tests/test_invite.py +++ b/tests/test_invite.py @@ -333,11 +333,11 @@ class TestInviteViews(UffdTestCase): invite = Invite(valid_until=valid_until) db.session.add(invite) db.session.commit() - token = invite.token - self.assertTrue(Invite.query.get(token).active) - r = self.client.post(path=url_for('invite.disable', token=token), follow_redirects=True) + id = invite.id + self.assertTrue(Invite.query.get(id).active) + r = self.client.post(path=url_for('invite.disable', invite_id=id), follow_redirects=True) dump('invite_disable', r) - self.assertTrue(Invite.query.get(token).disabled) + self.assertTrue(Invite.query.get(id).disabled) def test_reset_disabled(self): self.login_admin() @@ -345,11 +345,11 @@ class TestInviteViews(UffdTestCase): invite = Invite(valid_until=valid_until, disabled=True) db.session.add(invite) db.session.commit() - token = invite.token - self.assertFalse(Invite.query.get(token).active) - r = self.client.post(path=url_for('invite.reset', token=token), follow_redirects=True) + id = invite.id + self.assertFalse(Invite.query.get(id).active) + r = self.client.post(path=url_for('invite.reset', invite_id=id), follow_redirects=True) dump('invite_reset_disabled', r) - self.assertTrue(Invite.query.get(token).active) + self.assertTrue(Invite.query.get(id).active) def test_reset_voided(self): self.login_admin() @@ -357,11 +357,11 @@ class TestInviteViews(UffdTestCase): invite = Invite(valid_until=valid_until, single_use=True, used=True) db.session.add(invite) db.session.commit() - token = invite.token - self.assertFalse(Invite.query.get(token).active) - r = self.client.post(path=url_for('invite.reset', token=token), follow_redirects=True) + id = invite.id + self.assertFalse(Invite.query.get(id).active) + r = self.client.post(path=url_for('invite.reset', invite_id=id), follow_redirects=True) dump('invite_reset_voided', r) - self.assertTrue(Invite.query.get(token).active) + self.assertTrue(Invite.query.get(id).active) def test_use(self): invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, roles=[Role(name='testrole1'), Role(name='testrole2')]) @@ -421,7 +421,7 @@ class TestInviteViews(UffdTestCase): dump('invite_grant', r) self.assertEqual(r.status_code, 200) db_flush() - invite = Invite.query.get(token) + invite = Invite.query.filter_by(token=token).first() user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') self.assertTrue(invite.used) self.assertIn('baserole', [role.name for role in user.roles]) @@ -439,7 +439,7 @@ class TestInviteViews(UffdTestCase): r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) dump('invite_grant_invalid_invite', r) self.assertEqual(r.status_code, 200) - self.assertFalse(Invite.query.get(token).used) + self.assertFalse(Invite.query.filter_by(token=token).first().used) def test_grant_no_roles(self): invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) @@ -450,7 +450,7 @@ class TestInviteViews(UffdTestCase): r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) dump('invite_grant_no_roles', r) self.assertEqual(r.status_code, 200) - self.assertFalse(Invite.query.get(token).used) + self.assertFalse(Invite.query.filter_by(token=token).first().used) def test_grant_no_new_roles(self): user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') @@ -465,7 +465,7 @@ class TestInviteViews(UffdTestCase): r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) dump('invite_grant_no_new_roles', r) self.assertEqual(r.status_code, 200) - self.assertFalse(Invite.query.get(token).used) + self.assertFalse(Invite.query.filter_by(token=token).first().used) def test_signup(self): self.app.config['ROLES_BASEROLES'] = ['base'] diff --git a/uffd/invite/models.py b/uffd/invite/models.py index a737274c..ef2ea20f 100644 --- a/uffd/invite/models.py +++ b/uffd/invite/models.py @@ -11,13 +11,14 @@ from uffd.signup.models import Signup # pylint: disable=E1101 invite_roles = db.Table('invite_roles', - Column('invite_token', String(128), ForeignKey('invite.token'), primary_key=True), + Column('invite_id', Integer(), ForeignKey('invite.id'), primary_key=True), Column('role_id', Integer, ForeignKey('role.id'), primary_key=True) ) class Invite(db.Model): __tablename__ = 'invite' - token = Column(String(128), primary_key=True, default=lambda: secrets.token_hex(20)) + id = Column(Integer(), primary_key=True, autoincrement=True) + token = Column(String(128), unique=True, nullable=False, default=lambda: secrets.token_hex(20)) created = Column(DateTime, default=datetime.datetime.now, nullable=False) valid_until = Column(DateTime, nullable=False) single_use = Column(Boolean, default=True, nullable=False) @@ -50,7 +51,7 @@ class Invite(db.Model): class InviteGrant(db.Model): __tablename__ = 'invite_grant' id = Column(Integer(), primary_key=True, autoincrement=True) - invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) + invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False) user_dn = Column(String(128), nullable=False) user = DBRelationship('user_dn', User) @@ -70,7 +71,7 @@ class InviteGrant(db.Model): class InviteSignup(Signup): __tablename__ = 'invite_signup' token = Column(String(128), ForeignKey('signup.token'), primary_key=True) - invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) + invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False) invite = relationship('Invite', back_populates='signups') __mapper_args__ = { diff --git a/uffd/invite/templates/invite/list.html b/uffd/invite/templates/invite/list.html index 8db2ece9..c7cc9a4b 100644 --- a/uffd/invite/templates/invite/list.html +++ b/uffd/invite/templates/invite/list.html @@ -44,18 +44,18 @@ {% for role in invite.roles %}{{ ', ' if loop.index != 1 }}<a href="{{ url_for('role.show', roleid=role.id) }}" style="white-space: nowrap;"><i class="fas fa-key"></i> {{ role.name }}</a>{% endfor %} </td> <td> - <a href="#" data-toggle="modal" data-target="#modal-{{ invite.token }}"> + <a href="#" data-toggle="modal" data-target="#modal-{{ invite.id }}"> <span style="white-space: nowrap;">{{ invite.signups|selectattr('completed')|list|length }} <i class="fas fa-users" title="user registrations"></i></span>, <span style="white-space: nowrap;">{{ invite.grants|length }} <i class="fas fa-key" title="role grants"></i></span> </a> </td> <td class="text-right"> {% if invite.active %} - <form action="{{ url_for('invite.disable', token=invite.token) }}" method="POST"> + <form action="{{ url_for('invite.disable', invite_id=invite.id) }}" method="POST"> <button type="submit" class="btn btn-link btn-sm py-0" title="Disable"><i class="fas fa-ban" style="width: 1.5em;"></i></button> </form> {% else %} - <form action="{{ url_for('invite.reset', token=invite.token) }}" method="POST"> + <form action="{{ url_for('invite.reset', invite_id=invite.id) }}" method="POST"> <button type="submit" class="btn btn-link btn-sm py-0" title="Reenable" {{ 'disabled' if invite.expired }}><i class="fas fa-redo" style="width: 1.5em;"></i></button> </form> {% endif %} @@ -67,7 +67,7 @@ </div> {% for invite in invites %} -<div class="modal" tabindex="-1" id="modal-{{ invite.token }}"> +<div class="modal" tabindex="-1" id="modal-{{ invite.id }}"> <div class="modal-dialog"> <div class="modal-content"> <div class="modal-header"> diff --git a/uffd/invite/views.py b/uffd/invite/views.py index a0a00e86..27ec9346 100644 --- a/uffd/invite/views.py +++ b/uffd/invite/views.py @@ -57,25 +57,25 @@ def new_submit(): db.session.commit() return redirect(url_for('invite.index')) -@bp.route('/<token>/disable', methods=['POST']) +@bp.route('/<int:invite_id>/disable', methods=['POST']) @invite_acl_required @csrf_protect(blueprint=bp) -def disable(token): - Invite.query.get_or_404(token).disable() +def disable(invite_id): + Invite.query.get_or_404(invite_id).disable() db.session.commit() return redirect(url_for('.index')) -@bp.route('/<token>/reset', methods=['POST']) +@bp.route('/<int:invite_id>/reset', methods=['POST']) @invite_acl_required @csrf_protect(blueprint=bp) -def reset(token): - Invite.query.get_or_404(token).reset() +def reset(invite_id): + Invite.query.get_or_404(invite_id).reset() db.session.commit() return redirect(url_for('.index')) @bp.route('/<token>') def use(token): - invite = Invite.query.get_or_404(token) + invite = Invite.query.filter_by(token=token).first_or_404() if not invite.active: flash('Invalid invite link') return redirect('/') @@ -84,7 +84,7 @@ def use(token): @bp.route('/<token>/grant', methods=['POST']) @login_required() def grant(token): - invite = Invite.query.get_or_404(token) + invite = Invite.query.filter_by(token=token).first_or_404() invite_grant = InviteGrant(invite=invite, user=get_current_user()) db.session.add(invite_grant) success, msg = invite_grant.apply() @@ -103,7 +103,7 @@ def inject_invite_token(endpoint, values): @bp.route('/<token>/signup') def signup_start(token): - invite = Invite.query.get_or_404(token) + invite = Invite.query.filter_by(token=token).first_or_404() if not invite.active: flash('Invalid invite link') return redirect('/') @@ -117,7 +117,7 @@ def signup_check(token): if host_ratelimit.get_delay(): return jsonify({'status': 'ratelimited'}) host_ratelimit.log() - invite = Invite.query.get_or_404(token) + invite = Invite.query.filter_by(token=token).first_or_404() if not invite.active or not invite.allow_signup: return jsonify({'status': 'error'}), 403 if not User().set_loginname(request.form['loginname']): @@ -128,7 +128,7 @@ def signup_check(token): @bp.route('/<token>/signup', methods=['POST']) def signup_submit(token): - invite = Invite.query.get_or_404(token) + invite = Invite.query.filter_by(token=token).first_or_404() if request.form['password1'] != request.form['password2']: return render_template('signup/start.html', error='Passwords do not match') signup_delay = signup_ratelimit.get_delay(request.form['mail']) -- GitLab