Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Dockerfile
  • consistent_strings
  • feature_invite_validuntil_minmax
  • incremental-sync
  • master
  • pw-autocomplete-off
  • pylint_disable_consider-using-f-string
  • qol_edits
  • redis-rate-limits
  • roles-recursive-cte
  • test_instance_path
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
32 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • strifel/uffd
  • thies/uffd-2
6 results
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results
Show changes

Commits on Source 6

Showing
with 418 additions and 123 deletions
...@@ -524,7 +524,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -524,7 +524,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
token = invite.token token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use', r) dump('invite_use', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -534,7 +534,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -534,7 +534,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
token = invite.token token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use_loggedin', r) dump('invite_use_loggedin', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -543,7 +543,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -543,7 +543,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
token = invite.token token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use_inactive', r) dump('invite_use_inactive', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -565,6 +565,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -565,6 +565,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
ldap.session.commit() ldap.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
self.assertIn(role0, user.roles) self.assertIn(role0, user.roles)
self.assertNotIn(role1, user.roles) self.assertNotIn(role1, user.roles)
...@@ -573,7 +574,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -573,7 +574,7 @@ class TestInviteUseViews(UffdTestCase):
self.assertNotIn(group1, user.groups) self.assertNotIn(group1, user.groups)
self.assertFalse(invite.used) self.assertFalse(invite.used)
self.login_as('user') self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant', r) dump('invite_grant', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
db_flush() db_flush()
...@@ -590,9 +591,10 @@ class TestInviteUseViews(UffdTestCase): ...@@ -590,9 +591,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), disabled=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), disabled=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
self.login_as('user') self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_invalid_invite', r) dump('invite_grant_invalid_invite', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used) self.assertFalse(Invite.query.filter_by(token=token).first().used)
...@@ -601,9 +603,10 @@ class TestInviteUseViews(UffdTestCase): ...@@ -601,9 +603,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60))
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
self.login_as('user') self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_no_roles', r) dump('invite_grant_no_roles', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used) self.assertFalse(Invite.query.filter_by(token=token).first().used)
...@@ -616,9 +619,10 @@ class TestInviteUseViews(UffdTestCase): ...@@ -616,9 +619,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role]) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role])
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
self.login_as('user') self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_no_new_roles', r) dump('invite_grant_no_new_roles', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used) self.assertFalse(Invite.query.filter_by(token=token).first().used)
...@@ -636,8 +640,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -636,8 +640,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.get(path=url_for('invite.signup_start', token=token), follow_redirects=True) r = self.client.get(path=url_for('invite.signup_start', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_signup_start', r) dump('invite_signup_start', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('invite.signup_submit', token=token), r = self.client.post(path=url_for('invite.signup_submit', token=token),
...@@ -657,7 +662,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -657,7 +662,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True) r = self.client.get(path=url_for('invite.signup_start', invite_id=invite.id, token=invite.token), follow_redirects=True)
dump('invite_signup_invalid_invite', r) dump('invite_signup_invalid_invite', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -665,7 +670,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -665,7 +670,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True) r = self.client.get(path=url_for('invite.signup_start', invite_id=invite.id, token=invite.token), follow_redirects=True)
dump('invite_signup_nosignup', r) dump('invite_signup_nosignup', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -673,7 +678,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -673,7 +678,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notthesame'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notthesame'}, follow_redirects=True)
dump('invite_signup_wrongpassword', r) dump('invite_signup_wrongpassword', r)
...@@ -683,7 +688,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -683,7 +688,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': '', 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': '', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_invalid', r) dump('invite_signup_invalid', r)
...@@ -694,7 +699,7 @@ class TestInviteUseViews(UffdTestCase): ...@@ -694,7 +699,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_mailerror', r) dump('invite_signup_mailerror', r)
...@@ -704,14 +709,15 @@ class TestInviteUseViews(UffdTestCase): ...@@ -704,14 +709,15 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
for i in range(20): for i in range(20):
r = self.client.post(path=url_for('invite.signup_submit', token=token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i, data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i,
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.app.last_mail = None self.app.last_mail = None
r = self.client.post(path=url_for('invite.signup_submit', token=token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_hostlimit', r) dump('invite_signup_hostlimit', r)
...@@ -723,14 +729,15 @@ class TestInviteUseViews(UffdTestCase): ...@@ -723,14 +729,15 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
for i in range(3): for i in range(3):
r = self.client.post(path=url_for('invite.signup_submit', token=token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.app.last_mail = None self.app.last_mail = None
r = self.client.post(path=url_for('invite.signup_submit', token=token), r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_maillimit', r) dump('invite_signup_maillimit', r)
...@@ -742,8 +749,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -742,8 +749,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'newuser'}) data={'loginname': 'newuser'})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
...@@ -753,8 +761,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -753,8 +761,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': ''}) data={'loginname': ''})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
...@@ -764,8 +773,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -764,8 +773,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'}) data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
...@@ -775,8 +785,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -775,8 +785,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'}) data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 403) self.assertEqual(r.status_code, 403)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
...@@ -786,8 +797,9 @@ class TestInviteUseViews(UffdTestCase): ...@@ -786,8 +797,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'}) data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 403) self.assertEqual(r.status_code, 403)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
...@@ -797,13 +809,14 @@ class TestInviteUseViews(UffdTestCase): ...@@ -797,13 +809,14 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite) db.session.add(invite)
db.session.commit() db.session.commit()
invite_id = invite.id
token = invite.token token = invite.token
for i in range(20): for i in range(20):
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'}) data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'}) data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json') self.assertEqual(r.content_type, 'application/json')
......
...@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase): ...@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase):
token = MailToken.query.filter(MailToken.loginname == user.loginname).first() token = MailToken.query.filter(MailToken.loginname == user.loginname).first()
self.assertEqual(token.newmail, 'newemail@example.com') self.assertEqual(token.newmail, 'newemail@example.com')
self.assertIn(token.token, str(self.app.last_mail.get_content())) self.assertIn(token.token, str(self.app.last_mail.get_content()))
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
self.assertEqual(_user.mail, 'newemail@example.com') self.assertEqual(_user.mail, 'newemail@example.com')
...@@ -135,7 +135,7 @@ class TestSelfservice(UffdTestCase): ...@@ -135,7 +135,7 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_emptydb(self): def test_token_mail_emptydb(self):
self.login_as('user') self.login_as('user')
user = request.user user = request.user
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=1, token='A'*128), follow_redirects=True)
dump('token_mail_emptydb', r) dump('token_mail_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -144,9 +144,10 @@ class TestSelfservice(UffdTestCase): ...@@ -144,9 +144,10 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_invalid(self): def test_token_mail_invalid(self):
self.login_as('user') self.login_as('user')
user = request.user user = request.user
db.session.add(MailToken(loginname=user.loginname, newmail='newusermail@example.com')) token = MailToken(loginname=user.loginname, newmail='newusermail@example.com')
db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_mail_invalid', r) dump('token_mail_invalid', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -161,7 +162,7 @@ class TestSelfservice(UffdTestCase): ...@@ -161,7 +162,7 @@ class TestSelfservice(UffdTestCase):
admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com') admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com')
db.session.add(admin_token) db.session.add(admin_token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=admin_token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=admin_token.id, token=admin_token.token), follow_redirects=True)
dump('token_mail_wrong_user', r) dump('token_mail_wrong_user', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase): ...@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_mail_expired', r) dump('token_mail_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = request.user _user = request.user
...@@ -243,10 +244,10 @@ class TestSelfservice(UffdTestCase): ...@@ -243,10 +244,10 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password', r) dump('token_password', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r) dump('token_password_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -256,11 +257,11 @@ class TestSelfservice(UffdTestCase): ...@@ -256,11 +257,11 @@ class TestSelfservice(UffdTestCase):
if self.use_userconnection: if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode') self.skipTest('Password Token is not possible in user mode')
user = self.get_user() user = self.get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=1, token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r) dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128), r = self.client.post(path=url_for('selfservice.token_password', token_id=1, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_emptydb_submit', r) dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -274,11 +275,11 @@ class TestSelfservice(UffdTestCase): ...@@ -274,11 +275,11 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_password_invalid', r) dump('token_password_invalid', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_submit', r) dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -293,11 +294,11 @@ class TestSelfservice(UffdTestCase): ...@@ -293,11 +294,11 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password_invalid_expired', r) dump('token_password_invalid_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_expired_submit', r) dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -311,9 +312,9 @@ class TestSelfservice(UffdTestCase): ...@@ -311,9 +312,9 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token), r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r) dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
......
...@@ -18,9 +18,9 @@ from utils import dump, UffdTestCase, db_flush ...@@ -18,9 +18,9 @@ from utils import dump, UffdTestCase, db_flush
def refetch_signup(signup): def refetch_signup(signup):
db.session.add(signup) db.session.add(signup)
db.session.commit() db.session.commit()
token = signup.token id = signup.id
db_flush() db_flush()
return Signup.query.get(token) return Signup.query.get(id)
# We assume in all tests that Signup.validate and Signup.check_password do # We assume in all tests that Signup.validate and Signup.check_password do
# not alter any state # not alter any state
...@@ -165,18 +165,18 @@ class TestSignupModel(UffdTestCase): ...@@ -165,18 +165,18 @@ class TestSignupModel(UffdTestCase):
self.assert_validate_valid(signup) self.assert_validate_valid(signup)
db.session.add(signup) db.session.add(signup)
db.session.commit() db.session.commit()
signup1_token = signup.token signup1_id = signup.id
signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret') signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret')
self.assert_validate_valid(signup) self.assert_validate_valid(signup)
db.session.add(signup) db.session.add(signup)
db.session.commit() db.session.commit()
signup2_token = signup.token signup2_id = signup.id
db_flush() db_flush()
signup = Signup.query.get(signup2_token) signup = Signup.query.get(signup2_id)
self.assert_finish_success(signup, 'notsecret') self.assert_finish_success(signup, 'notsecret')
db.session.commit() db.session.commit()
db_flush() db_flush()
signup = Signup.query.get(signup1_token) signup = Signup.query.get(signup1_id)
self.assert_finish_failure(signup, 'notsecret') self.assert_finish_failure(signup, 'notsecret')
user = User.query.get('uid=newuser,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE'])) user = User.query.get('uid=newuser,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertEqual(user.mail, 'test2@example.com') self.assertEqual(user.mail, 'test2@example.com')
...@@ -328,14 +328,14 @@ class TestSignupViews(UffdTestCase): ...@@ -328,14 +328,14 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
if self.use_openldap: if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret')) self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm', r) dump('test_signup_confirm', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup) signup = refetch_signup(signup)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
if self.use_openldap: if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret')) self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit', r) dump('test_signup_confirm_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup) signup = refetch_signup(signup)
...@@ -355,9 +355,9 @@ class TestSignupViews(UffdTestCase): ...@@ -355,9 +355,9 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
self.assertIsNotNone(request.user) self.assertIsNotNone(request.user)
self.assertEqual(request.user.loginname, self.get_user().loginname) self.assertEqual(request.user.loginname, self.get_user().loginname)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup) signup = refetch_signup(signup)
self.assertTrue(signup.completed) self.assertTrue(signup.completed)
...@@ -365,10 +365,10 @@ class TestSignupViews(UffdTestCase): ...@@ -365,10 +365,10 @@ class TestSignupViews(UffdTestCase):
self.assertEqual(request.user.loginname, 'newuser') self.assertEqual(request.user.loginname, 'newuser')
def test_confirm_notfound(self): def test_confirm_notfound(self):
r = self.client.get(path=url_for('signup.signup_confirm', token='notasignuptoken'), follow_redirects=True) r = self.client.get(path=url_for('signup.signup_confirm', signup_id=1, token='notasignuptoken'), follow_redirects=True)
dump('test_signup_confirm_notfound', r) dump('test_signup_confirm_notfound', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=1, token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_notfound', r) dump('test_signup_confirm_submit_notfound', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -376,10 +376,10 @@ class TestSignupViews(UffdTestCase): ...@@ -376,10 +376,10 @@ class TestSignupViews(UffdTestCase):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup.created = datetime.datetime.now() - datetime.timedelta(hours=49) signup.created = datetime.datetime.now() - datetime.timedelta(hours=49)
signup = refetch_signup(signup) signup = refetch_signup(signup)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_expired', r) dump('test_signup_confirm_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_expired', r) dump('test_signup_confirm_submit_expired', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -388,17 +388,17 @@ class TestSignupViews(UffdTestCase): ...@@ -388,17 +388,17 @@ class TestSignupViews(UffdTestCase):
signup.user = self.get_user() signup.user = self.get_user()
signup = refetch_signup(signup) signup = refetch_signup(signup)
self.assertTrue(signup.completed) self.assertTrue(signup.completed)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_completed', r) dump('test_signup_confirm_completed', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_completed', r) dump('test_signup_confirm_submit_completed', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
def test_confirm_wrongpassword(self): def test_confirm_wrongpassword(self):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup) signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
dump('test_signup_confirm_wrongpassword', r) dump('test_signup_confirm_wrongpassword', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
...@@ -407,7 +407,7 @@ class TestSignupViews(UffdTestCase): ...@@ -407,7 +407,7 @@ class TestSignupViews(UffdTestCase):
# finish returns None and error message (here: because the user already exists) # finish returns None and error message (here: because the user already exists)
signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='test@example.com', password='notsecret') signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup) signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_error', r) dump('test_signup_confirm_error', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
...@@ -416,12 +416,12 @@ class TestSignupViews(UffdTestCase): ...@@ -416,12 +416,12 @@ class TestSignupViews(UffdTestCase):
for i in range(20): for i in range(20):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup) signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup) signup = refetch_signup(signup)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_hostlimit', r) dump('test_signup_confirm_hostlimit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
...@@ -431,10 +431,10 @@ class TestSignupViews(UffdTestCase): ...@@ -431,10 +431,10 @@ class TestSignupViews(UffdTestCase):
signup = refetch_signup(signup) signup = refetch_signup(signup)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
for i in range(5): for i in range(5):
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_confirmlimit', r) dump('test_signup_confirm_confirmlimit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed) self.assertFalse(signup.completed)
......
import functools import functools
import secrets
from flask import Blueprint, jsonify, current_app, request, abort from flask import Blueprint, jsonify, current_app, request, abort
...@@ -15,7 +16,10 @@ def apikey_required(scope=None): ...@@ -15,7 +16,10 @@ def apikey_required(scope=None):
if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '): if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'} return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'}
token = request.headers['Authorization'][7:].strip() token = request.headers['Authorization'][7:].strip()
request.api_client = current_app.config['API_CLIENTS'].get(token) request.api_client = None
for client_token, client in current_app.config['API_CLIENTS'].items():
if secrets.compare_digest(client_token, token):
request.api_client = client
if request.api_client is None: if request.api_client is None:
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'} return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'}
if scope is not None and scope not in request.api_client.get('scopes', []): if scope is not None and scope not in request.api_client.get('scopes', []):
......
...@@ -94,7 +94,7 @@ class InviteGrant(db.Model): ...@@ -94,7 +94,7 @@ class InviteGrant(db.Model):
class InviteSignup(Signup): class InviteSignup(Signup):
__tablename__ = 'invite_signup' __tablename__ = 'invite_signup'
token = Column(String(128), ForeignKey('signup.token'), primary_key=True) id = Column(Integer(), ForeignKey('signup.id'), primary_key=True)
invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False) invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False)
invite = relationship('Invite', back_populates='signups') invite = relationship('Invite', back_populates='signups')
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
<tr> <tr>
<td> <td>
{% if invite.creator == request.user and invite.active %} {% if invite.creator == request.user and invite.active %}
<a href="{{ url_for('invite.use', token=invite.token) }}"><code>{{ invite.short_token }}</code></a> <a href="{{ url_for('invite.use', invite_id=invite.id, token=invite.token) }}"><code>{{ invite.short_token }}</code></a>
<button type="button" class="btn btn-link btn-sm p-0 copy-clipboard" data-copy="{{ url_for('invite.use', token=invite.token, _external=True) }}" title="{{_('Copy link to clipboard')}}"><i class="fas fa-clipboard"></i></button> <button type="button" class="btn btn-link btn-sm p-0 copy-clipboard" data-copy="{{ url_for('invite.use', invite_id=invite.id, token=invite.token, _external=True) }}" title="{{_('Copy link to clipboard')}}"><i class="fas fa-clipboard"></i></button>
<button type="button" class="btn btn-link btn-sm p-0" data-toggle="modal" data-target="#modal-{{ invite.id }}-qrcode" title="{{_('Show link as QR code')}}"><i class="fas fa-qrcode"></i></button> <button type="button" class="btn btn-link btn-sm p-0" data-toggle="modal" data-target="#modal-{{ invite.id }}-qrcode" title="{{_('Show link as QR code')}}"><i class="fas fa-qrcode"></i></button>
{% else %} {% else %}
<code>{{ invite.short_token }}</code> <code>{{ invite.short_token }}</code>
...@@ -143,7 +143,7 @@ ...@@ -143,7 +143,7 @@
</button> </button>
</div> </div>
<div class="modal-body"> <div class="modal-body">
{{ url_for('invite.use', token=invite.token, _external=True)|qrcode_svg(width='100%', height='100%') }} {{ url_for('invite.use', invite_id=invite.id, token=invite.token, _external=True)|qrcode_svg(width='100%', height='100%') }}
</div> </div>
</div> </div>
</div> </div>
......
...@@ -30,17 +30,17 @@ ...@@ -30,17 +30,17 @@
{% endif %} {% endif %}
{% if request.user %} {% if request.user %}
{% if invite.roles %} {% if invite.roles %}
<form method="POST" action="{{ url_for("invite.grant", token=invite.token) }}" class="mb-2"> <form method="POST" action="{{ url_for("invite.grant", invite_id=invite.id, token=invite.token) }}" class="mb-2">
<button type="submit" class="btn btn-primary btn-block">{{_('Add the roles to your account now')}}</button> <button type="submit" class="btn btn-primary btn-block">{{_('Add the roles to your account now')}}</button>
</form> </form>
<a href="{{ url_for("session.logout", ref=url_for("session.login", ref=request.full_path)) }}" class="btn btn-secondary btn-block">{{_('Logout and switch to a different account')}}</a> <a href="{{ url_for("session.logout", ref=url_for("session.login", ref=request.full_path)) }}" class="btn btn-secondary btn-block">{{_('Logout and switch to a different account')}}</a>
{% endif %} {% endif %}
{% if invite.allow_signup %} {% if invite.allow_signup %}
<a href="{{ url_for("session.logout", ref=url_for("invite.signup_start", token=invite.token)) }}" class="btn btn-secondary btn-block">{{_('Logout to register a new account')}}</a> <a href="{{ url_for("session.logout", ref=url_for("invite.signup_start", invite_id=invite.id, token=invite.token)) }}" class="btn btn-secondary btn-block">{{_('Logout to register a new account')}}</a>
{% endif %} {% endif %}
{% else %} {% else %}
{% if invite.allow_signup %} {% if invite.allow_signup %}
<a href="{{ url_for("invite.signup_start", token=invite.token) }}" class="btn btn-primary btn-block">{{_('Register a new account')}}</a> <a href="{{ url_for("invite.signup_start", invite_id=invite.id, token=invite.token) }}" class="btn btn-primary btn-block">{{_('Register a new account')}}</a>
{% endif %} {% endif %}
{% if invite.roles %} {% if invite.roles %}
<a href="{{ url_for("session.login", ref=request.full_path) }}" class="btn btn-primary btn-block">{{_('Login and add the roles to your account')}}</a> <a href="{{ url_for("session.login", ref=request.full_path) }}" class="btn btn-primary btn-block">{{_('Login and add the roles to your account')}}</a>
......
import datetime import datetime
import functools import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify, abort
from flask_babel import gettext as _, lazy_gettext from flask_babel import gettext as _, lazy_gettext
import sqlalchemy import sqlalchemy
...@@ -112,19 +113,34 @@ def reset(invite_id): ...@@ -112,19 +113,34 @@ def reset(invite_id):
db.session.commit() db.session.commit()
return redirect(url_for('.index')) return redirect(url_for('.index'))
# Deprecated
@bp.route('/<token>') @bp.route('/<token>')
def use(token): def use_legacy(token):
invite = Invite.query.filter_by(token=token).first_or_404() matching_invite = None
for invite in Invite.query.filter(Invite.valid_until > datetime.datetime.now().replace(second=0, microsecond=0)):
if secrets.compare_digest(invite.token, token):
matching_invite = invite
if not matching_invite:
abort(404)
return redirect(url_for('invite.use', invite_id=matching_invite.id, token=token))
@bp.route('/<int:invite_id>/<token>')
def use(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active: if not invite.active:
flash(_('Invalid invite link')) flash(_('Invalid invite link'))
return redirect('/') return redirect('/')
return render_template('invite/use.html', invite=invite) return render_template('invite/use.html', invite=invite)
@bp.route('/<token>/grant', methods=['POST']) @bp.route('/<int:invite_id>/<token>/grant', methods=['POST'])
@login_required() @login_required()
@csrf_protect(blueprint=bp) @csrf_protect(blueprint=bp)
def grant(token): def grant(invite_id, token):
invite = Invite.query.filter_by(token=token).first_or_404() invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
invite_grant = InviteGrant(invite=invite, user=request.user) invite_grant = InviteGrant(invite=invite, user=request.user)
db.session.add(invite_grant) db.session.add(invite_grant)
success, msg = invite_grant.apply() success, msg = invite_grant.apply()
...@@ -138,12 +154,17 @@ def grant(token): ...@@ -138,12 +154,17 @@ def grant(token):
@bp.url_defaults @bp.url_defaults
def inject_invite_token(endpoint, values): def inject_invite_token(endpoint, values):
if endpoint in ['invite.signup_submit', 'invite.signup_check'] and 'token' in request.view_args: if endpoint in ['invite.signup_submit', 'invite.signup_check']:
if 'invite_id' in request.view_args:
values['invite_id'] = request.view_args['invite_id']
if 'token' in request.view_args:
values['token'] = request.view_args['token'] values['token'] = request.view_args['token']
@bp.route('/<token>/signup') @bp.route('/<int:invite_id>/<token>/signup')
def signup_start(token): def signup_start(invite_id, token):
invite = Invite.query.filter_by(token=token).first_or_404() invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active: if not invite.active:
flash(_('Invalid invite link')) flash(_('Invalid invite link'))
return redirect('/') return redirect('/')
...@@ -152,12 +173,14 @@ def signup_start(token): ...@@ -152,12 +173,14 @@ def signup_start(token):
return redirect('/') return redirect('/')
return render_template('signup/start.html') return render_template('signup/start.html')
@bp.route('/<token>/signupcheck', methods=['POST']) @bp.route('/<int:invite_id>/<token>/signupcheck', methods=['POST'])
def signup_check(token): def signup_check(invite_id, token):
if host_ratelimit.get_delay(): if host_ratelimit.get_delay():
return jsonify({'status': 'ratelimited'}) return jsonify({'status': 'ratelimited'})
host_ratelimit.log() host_ratelimit.log()
invite = Invite.query.filter_by(token=token).first_or_404() invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active or not invite.allow_signup: if not invite.active or not invite.allow_signup:
return jsonify({'status': 'error'}), 403 return jsonify({'status': 'error'}), 403
if not User().set_loginname(request.form['loginname']): if not User().set_loginname(request.form['loginname']):
...@@ -166,9 +189,11 @@ def signup_check(token): ...@@ -166,9 +189,11 @@ def signup_check(token):
return jsonify({'status': 'exists'}) return jsonify({'status': 'exists'})
return jsonify({'status': 'ok'}) return jsonify({'status': 'ok'})
@bp.route('/<token>/signup', methods=['POST']) @bp.route('/<int:invite_id>/<token>/signup', methods=['POST'])
def signup_submit(token): def signup_submit(invite_id, token):
invite = Invite.query.filter_by(token=token).first_or_404() invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if request.form['password1'] != request.form['password2']: if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error=_('Passwords do not match')) return render_template('signup/start.html', error=_('Passwords do not match'))
signup_delay = signup_ratelimit.get_delay(request.form['mail']) signup_delay = signup_ratelimit.get_delay(request.form['mail'])
......
...@@ -61,7 +61,7 @@ class RecoveryCodeMethod(MFAMethod): ...@@ -61,7 +61,7 @@ class RecoveryCodeMethod(MFAMethod):
def verify(self, code): def verify(self, code):
code = code.replace(' ', '').lower() code = code.replace(' ', '').lower()
return crypt.crypt(code, self.code_hash) == self.code_hash return secrets.compare_digest(crypt.crypt(code, self.code_hash), self.code_hash)
def _hotp(counter, key, digits=6): def _hotp(counter, key, digits=6):
'''Generates HMAC-based one-time password according to RFC4226 '''Generates HMAC-based one-time password according to RFC4226
...@@ -122,7 +122,8 @@ class TOTPMethod(MFAMethod): ...@@ -122,7 +122,8 @@ class TOTPMethod(MFAMethod):
:returns: True if code is valid, False otherwise''' :returns: True if code is valid, False otherwise'''
counter = int(time.time()/30) counter = int(time.time()/30)
if _hotp(counter-1, self.raw_key) == code or _hotp(counter, self.raw_key) == code: for valid_code in [_hotp(counter-1, self.raw_key), _hotp(counter, self.raw_key)]:
if secrets.compare_digest(code, valid_code):
return True return True
return False return False
......
"""Add id to signup table
Revision ID: bf71799b7b9e
Revises: e9a67175e179
Create Date: 2021-09-06 23:30:07.486102
"""
from alembic import op
import sqlalchemy as sa
revision = 'bf71799b7b9e'
down_revision = 'e9a67175e179'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['id'])
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(id=sa.select([signup.c.id]).where(signup.c.token==invite_signup.c.token).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_token_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_id_signup'), 'signup', ['id'], ['id'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('token')
batch_op.create_primary_key('pk_invite_signup', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('token', sa.VARCHAR(length=128), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(token=sa.select([signup.c.token]).where(signup.c.id==invite_signup.c.id).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_id_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_token_signup'), 'signup', ['token'], ['token'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('id')
batch_op.create_primary_key('pk_invite_signup', ['token'])
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['token'])
batch_op.drop_column('id')
"""Add id to selfservice tokens
Revision ID: e9a67175e179
Revises: a8c6b6e91c28
Create Date: 2021-09-06 22:04:46.741233
"""
from alembic import op
import sqlalchemy as sa
revision = 'e9a67175e179'
down_revision = 'a8c6b6e91c28'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['id'])
table = sa.Table('passwordToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['token'])
batch_op.drop_column('id')
table = sa.Table('passwordToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['token'])
batch_op.drop_column('id')
import datetime import datetime
import functools import functools
import urllib.parse import urllib.parse
import secrets
from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash
from flask_oauthlib.provider import OAuth2Provider from flask_oauthlib.provider import OAuth2Provider
...@@ -21,7 +22,15 @@ def load_client(client_id): ...@@ -21,7 +22,15 @@ def load_client(client_id):
@oauth.grantgetter @oauth.grantgetter
def load_grant(client_id, code): def load_grant(client_id, code):
return OAuth2Grant.query.filter_by(client_id=client_id, code=code).first() if '-' not in code:
return None
grant_id, grant_code = code.split('-', 2)
grant = OAuth2Grant.query.get(grant_id)
if not grant or grant.client_id != client_id:
return None
if not secrets.compare_digest(grant.code, grant_code):
return None
return grant
@oauth.grantsetter @oauth.grantsetter
def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=unused-argument def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
...@@ -30,14 +39,28 @@ def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=un ...@@ -30,14 +39,28 @@ def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=un
code=code['code'], redirect_uri=oauthreq.redirect_uri, expires=expires, _scopes=' '.join(oauthreq.scopes)) code=code['code'], redirect_uri=oauthreq.redirect_uri, expires=expires, _scopes=' '.join(oauthreq.scopes))
db.session.add(grant) db.session.add(grant)
db.session.commit() db.session.commit()
code['code'] = f"{grant.id}-{code['code']}"
return grant return grant
@oauth.tokengetter @oauth.tokengetter
def load_token(access_token=None, refresh_token=None): def load_token(access_token=None, refresh_token=None):
# pylint: disable=too-many-return-statements
if access_token: if access_token:
return OAuth2Token.query.filter_by(access_token=access_token).first() if '-' not in access_token:
return None
tok_id, tok_secret = access_token.split('-', 2)
tok = OAuth2Token.query.get(tok_id)
if not tok or not secrets.compare_digest(tok.access_token, tok_secret):
return None
return tok
if refresh_token: if refresh_token:
return OAuth2Token.query.filter_by(refresh_token=refresh_token).first() if '-' not in refresh_token:
return None
tok_id, tok_secret = refresh_token.split('-', 2)
tok = OAuth2Token.query.get(tok_id)
if not tok or not secrets.compare_digest(tok.refresh_token, tok_secret):
return None
return tok
return None return None
@oauth.tokensetter @oauth.tokensetter
...@@ -56,6 +79,8 @@ def save_token(token_data, oauthreq, *args, **kwargs): # pylint: disable=unused- ...@@ -56,6 +79,8 @@ def save_token(token_data, oauthreq, *args, **kwargs): # pylint: disable=unused-
) )
db.session.add(tok) db.session.add(tok)
db.session.commit() db.session.commit()
token_data['access_token'] = f"{tok.id}-{token_data['access_token']}"
token_data['refresh_token'] = f"{tok.id}-{token_data['refresh_token']}"
return tok return tok
bp = Blueprint('oauth2', __name__, url_prefix='/oauth2/', template_folder='templates') bp = Blueprint('oauth2', __name__, url_prefix='/oauth2/', template_folder='templates')
......
import datetime import datetime
from sqlalchemy import Column, String, DateTime from sqlalchemy import Column, String, DateTime, Integer
from uffd.database import db from uffd.database import db
from uffd.utils import token_urlfriendly from uffd.utils import token_urlfriendly
class Token(): class PasswordToken(db.Model):
token = Column(String(128), primary_key=True, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.now)
class PasswordToken(Token, db.Model):
__tablename__ = 'passwordToken' __tablename__ = 'passwordToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32)) loginname = Column(String(32))
class MailToken(Token, db.Model): class MailToken(db.Model):
__tablename__ = 'mailToken' __tablename__ = 'mailToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32)) loginname = Column(String(32))
newmail = Column(String(255)) newmail = Column(String(255))
Hi {{ user.displayname }}, Hi {{ user.displayname }},
you have requested to change your mail address. To confirm the change, please visit the following url: you have requested to change your mail address. To confirm the change, please visit the following url:
{{ url_for('selfservice.token_mail', token=token, _external=True) }} {{ url_for('selfservice.token_mail', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
...@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre ...@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre
Please visit the following url to set your password: Please visit the following url to set your password:
{{ url_for('selfservice.token_password', token=token, _external=True) }} {{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
......
Hi {{ user.displayname }}, Hi {{ user.displayname }},
you have requested a password reset. To reset your password, visit the following url: you have requested a password reset. To reset your password, visit the following url:
{{ url_for('selfservice.token_password', token=token, _external=True) }} {{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h** **The link is valid for 48h**
......
{% extends 'base.html' %} {% extends 'base.html' %}
{% block body %} {% block body %}
<form action="{{ url_for("selfservice.token_password", token=token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') "> <form action="{{ url_for("selfservice.token_password", token_id=token.id, token=token.token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
<div class="row mt-2 justify-content-center"> <div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center"> <div class="text-center">
......
import datetime import datetime
import secrets
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from flask_babel import gettext as _, lazy_gettext from flask_babel import gettext as _, lazy_gettext
...@@ -83,44 +84,70 @@ def forgot_password(): ...@@ -83,44 +84,70 @@ def forgot_password():
send_passwordreset(user) send_passwordreset(user)
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
@bp.route("/token/password/<token>", methods=(['POST', 'GET'])) # Deprecated
def token_password(token): @bp.route('/token/password/<token>')
dbtoken = PasswordToken.query.get(token) def token_password_legacy(token):
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)): matching_token = None
filter_expr = PasswordToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
for dbtoken in PasswordToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('token_password', token_id=matching_token.id, token=token))
@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
def token_password(token_id, token):
dbtoken = PasswordToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.')) flash(_('Token expired, please try again.'))
if dbtoken: if dbtoken:
db.session.delete(dbtoken) db.session.delete(dbtoken)
db.session.commit() db.session.commit()
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
if request.method == 'GET': if request.method == 'GET':
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1']: if not request.values['password1']:
flash(_('You need to set a password, please try again.')) flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1'] == request.values['password2']: if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.')) flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
user = User.query.filter_by(loginname=dbtoken.loginname).one() user = User.query.filter_by(loginname=dbtoken.loginname).one()
if not user.set_password(request.values['password1']): if not user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.')) flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=token) return render_template('selfservice/set_password.html', token=dbtoken)
db.session.delete(dbtoken) db.session.delete(dbtoken)
flash(_('New password set')) flash(_('New password set'))
ldap.session.commit() ldap.session.commit()
db.session.commit() db.session.commit()
return redirect(url_for('session.login')) return redirect(url_for('session.login'))
# Deprecated
@bp.route("/token/mail_verification/<token>") @bp.route("/token/mail_verification/<token>")
@login_required() def token_mail_legacy(token):
def token_mail(token): matching_token = None
dbtoken = MailToken.query.get(token) filter_expr = MailToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)): for dbtoken in MailToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('mail_password', token_id=matching_token.id, token=token))
@bp.route("/token/mail_verification/<int:token_id>/<token>")
def token_mail(token_id, token):
dbtoken = MailToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.')) flash(_('Token expired, please try again.'))
if dbtoken: if dbtoken:
db.session.delete(dbtoken) db.session.delete(dbtoken)
db.session.commit() db.session.commit()
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
user = User.query.filter_by(loginname=dbtoken.loginname).one() user = User.query.filter_by(loginname=dbtoken.loginname).one()
user.set_mail(dbtoken.newmail) user.set_mail(dbtoken.newmail)
flash(_('New mail set')) flash(_('New mail set'))
...@@ -157,7 +184,7 @@ def send_mail_verification(loginname, newmail): ...@@ -157,7 +184,7 @@ def send_mail_verification(loginname, newmail):
user = User.query.filter_by(loginname=loginname).one() user = User.query.filter_by(loginname=loginname).one()
if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token.token): if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail)) flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail))
def send_passwordreset(user, new=False): def send_passwordreset(user, new=False):
...@@ -177,5 +204,5 @@ def send_passwordreset(user, new=False): ...@@ -177,5 +204,5 @@ def send_passwordreset(user, new=False):
template = 'selfservice/passwordreset.mail.txt' template = 'selfservice/passwordreset.mail.txt'
subject = 'Password reset' subject = 'Password reset'
if not sendmail(user.mail, subject, template, user=user, token=token.token): if not sendmail(user.mail, subject, template, user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail)) flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail))
import datetime import datetime
from crypt import crypt from crypt import crypt
from sqlalchemy import Column, String, Text, DateTime from sqlalchemy import Column, String, Text, DateTime, Integer
from uffd.ldapalchemy.dbutils import DBRelationship from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db from uffd.database import db
...@@ -26,7 +26,8 @@ class Signup(db.Model): ...@@ -26,7 +26,8 @@ class Signup(db.Model):
As long as they are not completed, signup requests have no effect each other As long as they are not completed, signup requests have no effect each other
or different parts of the application.''' or different parts of the application.'''
__tablename__ = 'signup' __tablename__ = 'signup'
token = Column(String(128), primary_key=True, default=token_urlfriendly) id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now, nullable=False) created = Column(DateTime, default=datetime.datetime.now, nullable=False)
loginname = Column(Text) loginname = Column(Text)
displayname = Column(Text) displayname = Column(Text)
......
{% extends 'base.html' %} {% extends 'base.html' %}
{% block body %} {% block body %}
<form action="{{ url_for(".signup_confirm_submit", token=signup.token) }}" method="POST"> <form action="{{ url_for(".signup_confirm_submit", signup_id=signup.id, token=signup.token) }}" method="POST">
<div class="row mt-2 justify-content-center"> <div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center"> <div class="text-center">
......