Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • strifel/uffd
  • thies/uffd-2
6 results
Select Git revision
Loading items
Show changes

Commits on Source 6

Showing
with 418 additions and 123 deletions
......@@ -524,7 +524,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite)
db.session.commit()
token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True)
r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use', r)
self.assertEqual(r.status_code, 200)
......@@ -534,7 +534,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite)
db.session.commit()
token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True)
r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use_loggedin', r)
self.assertEqual(r.status_code, 200)
......@@ -543,7 +543,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite)
db.session.commit()
token = invite.token
r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True)
r = self.client.get(path=url_for('invite.use', invite_id=invite.id, token=token), follow_redirects=True)
dump('invite_use_inactive', r)
self.assertEqual(r.status_code, 200)
......@@ -565,6 +565,7 @@ class TestInviteUseViews(UffdTestCase):
db.session.add(invite)
db.session.commit()
ldap.session.commit()
invite_id = invite.id
token = invite.token
self.assertIn(role0, user.roles)
self.assertNotIn(role1, user.roles)
......@@ -573,7 +574,7 @@ class TestInviteUseViews(UffdTestCase):
self.assertNotIn(group1, user.groups)
self.assertFalse(invite.used)
self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True)
r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant', r)
self.assertEqual(r.status_code, 200)
db_flush()
......@@ -590,9 +591,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), disabled=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True)
r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_invalid_invite', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used)
......@@ -601,9 +603,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60))
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True)
r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_no_roles', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used)
......@@ -616,9 +619,10 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role])
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
self.login_as('user')
r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True)
r = self.client.post(path=url_for('invite.grant', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_grant_no_new_roles', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(Invite.query.filter_by(token=token).first().used)
......@@ -636,8 +640,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.get(path=url_for('invite.signup_start', token=token), follow_redirects=True)
r = self.client.get(path=url_for('invite.signup_start', invite_id=invite_id, token=token), follow_redirects=True)
dump('invite_signup_start', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('invite.signup_submit', token=token),
......@@ -657,7 +662,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True)
db.session.add(invite)
db.session.commit()
r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True)
r = self.client.get(path=url_for('invite.signup_start', invite_id=invite.id, token=invite.token), follow_redirects=True)
dump('invite_signup_invalid_invite', r)
self.assertEqual(r.status_code, 200)
......@@ -665,7 +670,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False)
db.session.add(invite)
db.session.commit()
r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True)
r = self.client.get(path=url_for('invite.signup_start', invite_id=invite.id, token=invite.token), follow_redirects=True)
dump('invite_signup_nosignup', r)
self.assertEqual(r.status_code, 200)
......@@ -673,7 +678,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notthesame'}, follow_redirects=True)
dump('invite_signup_wrongpassword', r)
......@@ -683,7 +688,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': '', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_invalid', r)
......@@ -694,7 +699,7 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
r = self.client.post(path=url_for('invite.signup_submit', token=invite.token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite.id, token=invite.token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_mailerror', r)
......@@ -704,14 +709,15 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
for i in range(20):
r = self.client.post(path=url_for('invite.signup_submit', token=token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i,
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.app.last_mail = None
r = self.client.post(path=url_for('invite.signup_submit', token=token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_hostlimit', r)
......@@ -723,14 +729,15 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
for i in range(3):
r = self.client.post(path=url_for('invite.signup_submit', token=token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.app.last_mail = None
r = self.client.post(path=url_for('invite.signup_submit', token=token),
r = self.client.post(path=url_for('invite.signup_submit', invite_id=invite_id, token=token),
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com',
'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True)
dump('invite_signup_maillimit', r)
......@@ -742,8 +749,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'newuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
......@@ -753,8 +761,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': ''})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
......@@ -764,8 +773,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
......@@ -775,8 +785,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 403)
self.assertEqual(r.content_type, 'application/json')
......@@ -786,8 +797,9 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 403)
self.assertEqual(r.content_type, 'application/json')
......@@ -797,13 +809,14 @@ class TestInviteUseViews(UffdTestCase):
invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True)
db.session.add(invite)
db.session.commit()
invite_id = invite.id
token = invite.token
for i in range(20):
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True,
r = self.client.post(path=url_for('invite.signup_check', invite_id=invite_id, token=token), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
......
......@@ -60,7 +60,7 @@ class TestSelfservice(UffdTestCase):
token = MailToken.query.filter(MailToken.loginname == user.loginname).first()
self.assertEqual(token.newmail, 'newemail@example.com')
self.assertIn(token.token, str(self.app.last_mail.get_content()))
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertEqual(_user.mail, 'newemail@example.com')
......@@ -135,7 +135,7 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_emptydb(self):
self.login_as('user')
user = request.user
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_mail', token_id=1, token='A'*128), follow_redirects=True)
dump('token_mail_emptydb', r)
self.assertEqual(r.status_code, 200)
_user = request.user
......@@ -144,9 +144,10 @@ class TestSelfservice(UffdTestCase):
def test_token_mail_invalid(self):
self.login_as('user')
user = request.user
db.session.add(MailToken(loginname=user.loginname, newmail='newusermail@example.com'))
token = MailToken(loginname=user.loginname, newmail='newusermail@example.com')
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token='A'*128), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_mail_invalid', r)
self.assertEqual(r.status_code, 200)
_user = request.user
......@@ -161,7 +162,7 @@ class TestSelfservice(UffdTestCase):
admin_token = MailToken(loginname='testadmin', newmail='newadminmail@example.com')
db.session.add(admin_token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=admin_token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_mail', token_id=admin_token.id, token=admin_token.token), follow_redirects=True)
dump('token_mail_wrong_user', r)
self.assertEqual(r.status_code, 200)
_user = request.user
......@@ -176,7 +177,7 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_mail', token=token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_mail', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_mail_expired', r)
self.assertEqual(r.status_code, 200)
_user = request.user
......@@ -243,10 +244,10 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r)
self.assertEqual(r.status_code, 200)
......@@ -256,11 +257,11 @@ class TestSelfservice(UffdTestCase):
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = self.get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_password', token_id=1, token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128),
r = self.client.post(path=url_for('selfservice.token_password', token_id=1, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200)
......@@ -274,11 +275,11 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_password_invalid', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token='A'*128),
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200)
......@@ -293,11 +294,11 @@ class TestSelfservice(UffdTestCase):
created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password_invalid_expired', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200)
......@@ -311,9 +312,9 @@ class TestSelfservice(UffdTestCase):
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token=token.token), follow_redirects=True)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token=token.token),
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200)
......
......@@ -18,9 +18,9 @@ from utils import dump, UffdTestCase, db_flush
def refetch_signup(signup):
db.session.add(signup)
db.session.commit()
token = signup.token
id = signup.id
db_flush()
return Signup.query.get(token)
return Signup.query.get(id)
# We assume in all tests that Signup.validate and Signup.check_password do
# not alter any state
......@@ -165,18 +165,18 @@ class TestSignupModel(UffdTestCase):
self.assert_validate_valid(signup)
db.session.add(signup)
db.session.commit()
signup1_token = signup.token
signup1_id = signup.id
signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret')
self.assert_validate_valid(signup)
db.session.add(signup)
db.session.commit()
signup2_token = signup.token
signup2_id = signup.id
db_flush()
signup = Signup.query.get(signup2_token)
signup = Signup.query.get(signup2_id)
self.assert_finish_success(signup, 'notsecret')
db.session.commit()
db_flush()
signup = Signup.query.get(signup1_token)
signup = Signup.query.get(signup1_id)
self.assert_finish_failure(signup, 'notsecret')
user = User.query.get('uid=newuser,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertEqual(user.mail, 'test2@example.com')
......@@ -328,14 +328,14 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed)
if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
if self.use_openldap:
self.assertIsNone(login_get_user('newuser', 'notsecret'))
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
......@@ -355,9 +355,9 @@ class TestSignupViews(UffdTestCase):
self.assertFalse(signup.completed)
self.assertIsNotNone(request.user)
self.assertEqual(request.user.loginname, self.get_user().loginname)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
......@@ -365,10 +365,10 @@ class TestSignupViews(UffdTestCase):
self.assertEqual(request.user.loginname, 'newuser')
def test_confirm_notfound(self):
r = self.client.get(path=url_for('signup.signup_confirm', token='notasignuptoken'), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=1, token='notasignuptoken'), follow_redirects=True)
dump('test_signup_confirm_notfound', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=1, token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_notfound', r)
self.assertEqual(r.status_code, 200)
......@@ -376,10 +376,10 @@ class TestSignupViews(UffdTestCase):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup.created = datetime.datetime.now() - datetime.timedelta(hours=49)
signup = refetch_signup(signup)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_expired', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_expired', r)
self.assertEqual(r.status_code, 200)
......@@ -388,17 +388,17 @@ class TestSignupViews(UffdTestCase):
signup.user = self.get_user()
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_completed', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_completed', r)
self.assertEqual(r.status_code, 200)
def test_confirm_wrongpassword(self):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
dump('test_signup_confirm_wrongpassword', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -407,7 +407,7 @@ class TestSignupViews(UffdTestCase):
# finish returns None and error message (here: because the user already exists)
signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_error', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -416,12 +416,12 @@ class TestSignupViews(UffdTestCase):
for i in range(20):
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_hostlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......@@ -431,10 +431,10 @@ class TestSignupViews(UffdTestCase):
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
for i in range(5):
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_confirmlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
......
import functools
import secrets
from flask import Blueprint, jsonify, current_app, request, abort
......@@ -15,7 +16,10 @@ def apikey_required(scope=None):
if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'}
token = request.headers['Authorization'][7:].strip()
request.api_client = current_app.config['API_CLIENTS'].get(token)
request.api_client = None
for client_token, client in current_app.config['API_CLIENTS'].items():
if secrets.compare_digest(client_token, token):
request.api_client = client
if request.api_client is None:
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'}
if scope is not None and scope not in request.api_client.get('scopes', []):
......
......@@ -94,7 +94,7 @@ class InviteGrant(db.Model):
class InviteSignup(Signup):
__tablename__ = 'invite_signup'
token = Column(String(128), ForeignKey('signup.token'), primary_key=True)
id = Column(Integer(), ForeignKey('signup.id'), primary_key=True)
invite_id = Column(Integer(), ForeignKey('invite.id'), nullable=False)
invite = relationship('Invite', back_populates='signups')
......
......@@ -22,8 +22,8 @@
<tr>
<td>
{% if invite.creator == request.user and invite.active %}
<a href="{{ url_for('invite.use', token=invite.token) }}"><code>{{ invite.short_token }}</code></a>
<button type="button" class="btn btn-link btn-sm p-0 copy-clipboard" data-copy="{{ url_for('invite.use', token=invite.token, _external=True) }}" title="{{_('Copy link to clipboard')}}"><i class="fas fa-clipboard"></i></button>
<a href="{{ url_for('invite.use', invite_id=invite.id, token=invite.token) }}"><code>{{ invite.short_token }}</code></a>
<button type="button" class="btn btn-link btn-sm p-0 copy-clipboard" data-copy="{{ url_for('invite.use', invite_id=invite.id, token=invite.token, _external=True) }}" title="{{_('Copy link to clipboard')}}"><i class="fas fa-clipboard"></i></button>
<button type="button" class="btn btn-link btn-sm p-0" data-toggle="modal" data-target="#modal-{{ invite.id }}-qrcode" title="{{_('Show link as QR code')}}"><i class="fas fa-qrcode"></i></button>
{% else %}
<code>{{ invite.short_token }}</code>
......@@ -143,7 +143,7 @@
</button>
</div>
<div class="modal-body">
{{ url_for('invite.use', token=invite.token, _external=True)|qrcode_svg(width='100%', height='100%') }}
{{ url_for('invite.use', invite_id=invite.id, token=invite.token, _external=True)|qrcode_svg(width='100%', height='100%') }}
</div>
</div>
</div>
......
......@@ -30,17 +30,17 @@
{% endif %}
{% if request.user %}
{% if invite.roles %}
<form method="POST" action="{{ url_for("invite.grant", token=invite.token) }}" class="mb-2">
<form method="POST" action="{{ url_for("invite.grant", invite_id=invite.id, token=invite.token) }}" class="mb-2">
<button type="submit" class="btn btn-primary btn-block">{{_('Add the roles to your account now')}}</button>
</form>
<a href="{{ url_for("session.logout", ref=url_for("session.login", ref=request.full_path)) }}" class="btn btn-secondary btn-block">{{_('Logout and switch to a different account')}}</a>
{% endif %}
{% if invite.allow_signup %}
<a href="{{ url_for("session.logout", ref=url_for("invite.signup_start", token=invite.token)) }}" class="btn btn-secondary btn-block">{{_('Logout to register a new account')}}</a>
<a href="{{ url_for("session.logout", ref=url_for("invite.signup_start", invite_id=invite.id, token=invite.token)) }}" class="btn btn-secondary btn-block">{{_('Logout to register a new account')}}</a>
{% endif %}
{% else %}
{% if invite.allow_signup %}
<a href="{{ url_for("invite.signup_start", token=invite.token) }}" class="btn btn-primary btn-block">{{_('Register a new account')}}</a>
<a href="{{ url_for("invite.signup_start", invite_id=invite.id, token=invite.token) }}" class="btn btn-primary btn-block">{{_('Register a new account')}}</a>
{% endif %}
{% if invite.roles %}
<a href="{{ url_for("session.login", ref=request.full_path) }}" class="btn btn-primary btn-block">{{_('Login and add the roles to your account')}}</a>
......
import datetime
import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify, abort
from flask_babel import gettext as _, lazy_gettext
import sqlalchemy
......@@ -112,19 +113,34 @@ def reset(invite_id):
db.session.commit()
return redirect(url_for('.index'))
# Deprecated
@bp.route('/<token>')
def use(token):
invite = Invite.query.filter_by(token=token).first_or_404()
def use_legacy(token):
matching_invite = None
for invite in Invite.query.filter(Invite.valid_until > datetime.datetime.now().replace(second=0, microsecond=0)):
if secrets.compare_digest(invite.token, token):
matching_invite = invite
if not matching_invite:
abort(404)
return redirect(url_for('invite.use', invite_id=matching_invite.id, token=token))
@bp.route('/<int:invite_id>/<token>')
def use(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active:
flash(_('Invalid invite link'))
return redirect('/')
return render_template('invite/use.html', invite=invite)
@bp.route('/<token>/grant', methods=['POST'])
@bp.route('/<int:invite_id>/<token>/grant', methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def grant(token):
invite = Invite.query.filter_by(token=token).first_or_404()
def grant(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
invite_grant = InviteGrant(invite=invite, user=request.user)
db.session.add(invite_grant)
success, msg = invite_grant.apply()
......@@ -138,12 +154,17 @@ def grant(token):
@bp.url_defaults
def inject_invite_token(endpoint, values):
if endpoint in ['invite.signup_submit', 'invite.signup_check'] and 'token' in request.view_args:
if endpoint in ['invite.signup_submit', 'invite.signup_check']:
if 'invite_id' in request.view_args:
values['invite_id'] = request.view_args['invite_id']
if 'token' in request.view_args:
values['token'] = request.view_args['token']
@bp.route('/<token>/signup')
def signup_start(token):
invite = Invite.query.filter_by(token=token).first_or_404()
@bp.route('/<int:invite_id>/<token>/signup')
def signup_start(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active:
flash(_('Invalid invite link'))
return redirect('/')
......@@ -152,12 +173,14 @@ def signup_start(token):
return redirect('/')
return render_template('signup/start.html')
@bp.route('/<token>/signupcheck', methods=['POST'])
def signup_check(token):
@bp.route('/<int:invite_id>/<token>/signupcheck', methods=['POST'])
def signup_check(invite_id, token):
if host_ratelimit.get_delay():
return jsonify({'status': 'ratelimited'})
host_ratelimit.log()
invite = Invite.query.filter_by(token=token).first_or_404()
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active or not invite.allow_signup:
return jsonify({'status': 'error'}), 403
if not User().set_loginname(request.form['loginname']):
......@@ -166,9 +189,11 @@ def signup_check(token):
return jsonify({'status': 'exists'})
return jsonify({'status': 'ok'})
@bp.route('/<token>/signup', methods=['POST'])
def signup_submit(token):
invite = Invite.query.filter_by(token=token).first_or_404()
@bp.route('/<int:invite_id>/<token>/signup', methods=['POST'])
def signup_submit(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error=_('Passwords do not match'))
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
......
......@@ -61,7 +61,7 @@ class RecoveryCodeMethod(MFAMethod):
def verify(self, code):
code = code.replace(' ', '').lower()
return crypt.crypt(code, self.code_hash) == self.code_hash
return secrets.compare_digest(crypt.crypt(code, self.code_hash), self.code_hash)
def _hotp(counter, key, digits=6):
'''Generates HMAC-based one-time password according to RFC4226
......@@ -122,7 +122,8 @@ class TOTPMethod(MFAMethod):
:returns: True if code is valid, False otherwise'''
counter = int(time.time()/30)
if _hotp(counter-1, self.raw_key) == code or _hotp(counter, self.raw_key) == code:
for valid_code in [_hotp(counter-1, self.raw_key), _hotp(counter, self.raw_key)]:
if secrets.compare_digest(code, valid_code):
return True
return False
......
"""Add id to signup table
Revision ID: bf71799b7b9e
Revises: e9a67175e179
Create Date: 2021-09-06 23:30:07.486102
"""
from alembic import op
import sqlalchemy as sa
revision = 'bf71799b7b9e'
down_revision = 'e9a67175e179'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['id'])
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['token'], ['signup.token'], name=op.f('fk_invite_signup_token_signup')),
sa.PrimaryKeyConstraint('token', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(id=sa.select([signup.c.id]).where(signup.c.token==invite_signup.c.token).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_token_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_id_signup'), 'signup', ['id'], ['id'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('token')
batch_op.create_primary_key('pk_invite_signup', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.add_column(sa.Column('token', sa.VARCHAR(length=128), nullable=True))
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
invite_signup = sa.Table('invite_signup', meta,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('invite_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
)
op.execute(invite_signup.update().values(token=sa.select([signup.c.token]).where(signup.c.id==invite_signup.c.id).limit(1).as_scalar()))
with op.batch_alter_table(invite_signup.name, copy_from=invite_signup, recreate='always') as batch_op:
batch_op.drop_constraint('fk_invite_signup_id_signup', type_='foreignkey')
batch_op.create_foreign_key(batch_op.f('fk_invite_signup_signup_token_signup'), 'signup', ['token'], ['token'])
batch_op.drop_constraint('pk_invite_signup', 'primary')
batch_op.drop_column('id')
batch_op.create_primary_key('pk_invite_signup', ['token'])
meta = sa.MetaData(bind=op.get_bind())
signup = sa.Table('signup', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('loginname', sa.Text(), nullable=True),
sa.Column('displayname', sa.Text(), nullable=True),
sa.Column('mail', sa.Text(), nullable=True),
sa.Column('pwhash', sa.Text(), nullable=True),
sa.Column('user_dn', sa.String(length=128), nullable=True),
sa.Column('type', sa.String(length=50), nullable=True),
sa.PrimaryKeyConstraint('id', name=op.f('pk_signup'))
)
with op.batch_alter_table(signup.name, copy_from=signup, recreate='always') as batch_op:
batch_op.drop_constraint('pk_signup', 'primary')
batch_op.create_primary_key('pk_signup', ['token'])
batch_op.drop_column('id')
"""Add id to selfservice tokens
Revision ID: e9a67175e179
Revises: a8c6b6e91c28
Create Date: 2021-09-06 22:04:46.741233
"""
from alembic import op
import sqlalchemy as sa
revision = 'e9a67175e179'
down_revision = 'a8c6b6e91c28'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['id'])
table = sa.Table('passwordToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['id'])
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['token'])
batch_op.drop_column('id')
table = sa.Table('passwordToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.add_column(sa.Column('id', sa.Integer(), autoincrement=True, nullable=False))
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['token'])
batch_op.drop_column('id')
import datetime
import functools
import urllib.parse
import secrets
from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash
from flask_oauthlib.provider import OAuth2Provider
......@@ -21,7 +22,15 @@ def load_client(client_id):
@oauth.grantgetter
def load_grant(client_id, code):
return OAuth2Grant.query.filter_by(client_id=client_id, code=code).first()
if '-' not in code:
return None
grant_id, grant_code = code.split('-', 2)
grant = OAuth2Grant.query.get(grant_id)
if not grant or grant.client_id != client_id:
return None
if not secrets.compare_digest(grant.code, grant_code):
return None
return grant
@oauth.grantsetter
def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
......@@ -30,14 +39,28 @@ def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=un
code=code['code'], redirect_uri=oauthreq.redirect_uri, expires=expires, _scopes=' '.join(oauthreq.scopes))
db.session.add(grant)
db.session.commit()
code['code'] = f"{grant.id}-{code['code']}"
return grant
@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
# pylint: disable=too-many-return-statements
if access_token:
return OAuth2Token.query.filter_by(access_token=access_token).first()
if '-' not in access_token:
return None
tok_id, tok_secret = access_token.split('-', 2)
tok = OAuth2Token.query.get(tok_id)
if not tok or not secrets.compare_digest(tok.access_token, tok_secret):
return None
return tok
if refresh_token:
return OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
if '-' not in refresh_token:
return None
tok_id, tok_secret = refresh_token.split('-', 2)
tok = OAuth2Token.query.get(tok_id)
if not tok or not secrets.compare_digest(tok.refresh_token, tok_secret):
return None
return tok
return None
@oauth.tokensetter
......@@ -56,6 +79,8 @@ def save_token(token_data, oauthreq, *args, **kwargs): # pylint: disable=unused-
)
db.session.add(tok)
db.session.commit()
token_data['access_token'] = f"{tok.id}-{token_data['access_token']}"
token_data['refresh_token'] = f"{tok.id}-{token_data['refresh_token']}"
return tok
bp = Blueprint('oauth2', __name__, url_prefix='/oauth2/', template_folder='templates')
......
import datetime
from sqlalchemy import Column, String, DateTime
from sqlalchemy import Column, String, DateTime, Integer
from uffd.database import db
from uffd.utils import token_urlfriendly
class Token():
token = Column(String(128), primary_key=True, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.now)
class PasswordToken(Token, db.Model):
class PasswordToken(db.Model):
__tablename__ = 'passwordToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32))
class MailToken(Token, db.Model):
class MailToken(db.Model):
__tablename__ = 'mailToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now)
loginname = Column(String(32))
newmail = Column(String(255))
Hi {{ user.displayname }},
you have requested to change your mail address. To confirm the change, please visit the following url:
{{ url_for('selfservice.token_mail', token=token, _external=True) }}
{{ url_for('selfservice.token_mail', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h**
......@@ -4,7 +4,7 @@ welcome to the {{ config.ORGANISATION_NAME }} infrastructure! An account was cre
Please visit the following url to set your password:
{{ url_for('selfservice.token_password', token=token, _external=True) }}
{{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h**
......
Hi {{ user.displayname }},
you have requested a password reset. To reset your password, visit the following url:
{{ url_for('selfservice.token_password', token=token, _external=True) }}
{{ url_for('selfservice.token_password', token_id=token.id, token=token.token, _external=True) }}
**The link is valid for 48h**
......
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("selfservice.token_password", token=token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
<form action="{{ url_for("selfservice.token_password", token_id=token.id, token=token.token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
......
import datetime
import secrets
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from flask_babel import gettext as _, lazy_gettext
......@@ -83,44 +84,70 @@ def forgot_password():
send_passwordreset(user)
return redirect(url_for('session.login'))
@bp.route("/token/password/<token>", methods=(['POST', 'GET']))
def token_password(token):
dbtoken = PasswordToken.query.get(token)
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
# Deprecated
@bp.route('/token/password/<token>')
def token_password_legacy(token):
matching_token = None
filter_expr = PasswordToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
for dbtoken in PasswordToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('token_password', token_id=matching_token.id, token=token))
@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
def token_password(token_id, token):
dbtoken = PasswordToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.'))
if dbtoken:
db.session.delete(dbtoken)
db.session.commit()
return redirect(url_for('session.login'))
if request.method == 'GET':
return render_template('selfservice/set_password.html', token=token)
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1']:
flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=token)
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=token)
return render_template('selfservice/set_password.html', token=dbtoken)
user = User.query.filter_by(loginname=dbtoken.loginname).one()
if not user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=token)
return render_template('selfservice/set_password.html', token=dbtoken)
db.session.delete(dbtoken)
flash(_('New password set'))
ldap.session.commit()
db.session.commit()
return redirect(url_for('session.login'))
# Deprecated
@bp.route("/token/mail_verification/<token>")
@login_required()
def token_mail(token):
dbtoken = MailToken.query.get(token)
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
def token_mail_legacy(token):
matching_token = None
filter_expr = MailToken.created >= (datetime.datetime.now() - datetime.timedelta(days=2))
for dbtoken in MailToken.query.filter(filter_expr):
if secrets.compare_digest(dbtoken.token, token):
matching_token = dbtoken
if not matching_token:
flash(_('Token expired, please try again.'))
return redirect(url_for('session.login'))
return redirect(url_for('mail_password', token_id=matching_token.id, token=token))
@bp.route("/token/mail_verification/<int:token_id>/<token>")
def token_mail(token_id, token):
dbtoken = MailToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.'))
if dbtoken:
db.session.delete(dbtoken)
db.session.commit()
return redirect(url_for('selfservice.index'))
user = User.query.filter_by(loginname=dbtoken.loginname).one()
user.set_mail(dbtoken.newmail)
flash(_('New mail set'))
......@@ -157,7 +184,7 @@ def send_mail_verification(loginname, newmail):
user = User.query.filter_by(loginname=loginname).one()
if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token.token):
if not sendmail(newmail, 'Mail verification', 'selfservice/mailverification.mail.txt', user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=newmail))
def send_passwordreset(user, new=False):
......@@ -177,5 +204,5 @@ def send_passwordreset(user, new=False):
template = 'selfservice/passwordreset.mail.txt'
subject = 'Password reset'
if not sendmail(user.mail, subject, template, user=user, token=token.token):
if not sendmail(user.mail, subject, template, user=user, token=token):
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=user.mail))
import datetime
from crypt import crypt
from sqlalchemy import Column, String, Text, DateTime
from sqlalchemy import Column, String, Text, DateTime, Integer
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
......@@ -26,7 +26,8 @@ class Signup(db.Model):
As long as they are not completed, signup requests have no effect each other
or different parts of the application.'''
__tablename__ = 'signup'
token = Column(String(128), primary_key=True, default=token_urlfriendly)
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.now, nullable=False)
loginname = Column(Text)
displayname = Column(Text)
......
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for(".signup_confirm_submit", token=signup.token) }}" method="POST">
<form action="{{ url_for(".signup_confirm_submit", signup_id=signup.id, token=signup.token) }}" method="POST">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
......