diff --git a/tests/test_invite.py b/tests/test_invite.py new file mode 100644 index 0000000000000000000000000000000000000000..d488b84ff4682142cf9afbf2ce74bffe9d0854a1 --- /dev/null +++ b/tests/test_invite.py @@ -0,0 +1,658 @@ +import unittest +import datetime +import time + +from flask import url_for, session + +# These imports are required, because otherwise we get circular imports?! +from uffd import user +from uffd.ldap import ldap + +from uffd import create_app, db +from uffd.invite.models import Invite, InviteGrant, InviteSignup +from uffd.user.models import User, Group +from uffd.role.models import Role +from uffd.session.views import get_current_user, is_valid_session, login_get_user + +from utils import dump, UffdTestCase, db_flush + +class TestInviteModel(UffdTestCase): + def test_expire(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) + self.assertFalse(invite.expired) + self.assertTrue(invite.active) + invite.valid_until = datetime.datetime.now() - datetime.timedelta(seconds=60) + self.assertTrue(invite.expired) + self.assertFalse(invite.active) + + def test_void(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), single_use=False) + self.assertFalse(invite.voided) + self.assertTrue(invite.active) + invite.used = True + self.assertFalse(invite.voided) + self.assertTrue(invite.active) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), single_use=True) + self.assertFalse(invite.voided) + self.assertTrue(invite.active) + invite.used = True + self.assertTrue(invite.voided) + self.assertFalse(invite.active) + + def test_disable(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) + self.assertTrue(invite.active) + invite.disable() + self.assertFalse(invite.active) + + def test_reset_disabled(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) + invite.disable() + self.assertFalse(invite.active) + invite.reset() + self.assertTrue(invite.active) + + def test_reset_expired(self): + invite = Invite(valid_until=datetime.datetime.now() - datetime.timedelta(seconds=60)) + self.assertFalse(invite.active) + invite.reset() + self.assertFalse(invite.active) + + def test_reset_disabled(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), single_use=False) + invite.used = True + invite.disable() + self.assertFalse(invite.active) + invite.reset() + self.assertTrue(invite.active) + +class TestInviteGrantModel(UffdTestCase): + def test_success(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') + role0 = Role(name='baserole', groups=[group0]) + db.session.add(role0) + user.roles.add(role0) + user.update_groups() + group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') + role1 = Role(name='testrole1', groups=[group1]) + db.session.add(role1) + role2 = Role(name='testrole2') + db.session.add(role2) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2]) + self.assertIn(role0, user.roles) + self.assertNotIn(role1, user.roles) + self.assertNotIn(role2, user.roles) + self.assertIn(group0, user.groups) + self.assertNotIn(group1, user.groups) + self.assertFalse(invite.used) + grant = InviteGrant(invite=invite, user=user) + success, msg = grant.apply() + self.assertTrue(success) + self.assertIn(role0, user.roles) + self.assertIn(role1, user.roles) + self.assertIn(role2, user.roles) + self.assertIn(group0, user.groups) + self.assertIn(group1, user.groups) + self.assertTrue(invite.used) + db.session.commit() + ldap.session.commit() + db_flush() + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + self.assertIn('baserole', [role.name for role in user.roles]) + self.assertIn('testrole1', [role.name for role in user.roles]) + self.assertIn('testrole2', [role.name for role in user.roles]) + self.assertIn('cn=uffd_access,ou=groups,dc=example,dc=com', [group.dn for group in user.groups]) + self.assertIn('cn=uffd_admin,ou=groups,dc=example,dc=com', [group.dn for group in user.groups]) + + def test_inactive(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') + role = Role(name='testrole1', groups=[group]) + db.session.add(role) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role], single_use=True, used=True) + self.assertFalse(invite.active) + grant = InviteGrant(invite=invite, user=user) + success, msg = grant.apply() + self.assertFalse(success) + self.assertIsInstance(msg, str) + self.assertNotIn(role, user.roles) + self.assertNotIn(group, user.groups) + + def test_no_roles(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) + self.assertTrue(invite.active) + grant = InviteGrant(invite=invite, user=user) + success, msg = grant.apply() + self.assertFalse(success) + self.assertIsInstance(msg, str) + + def test_no_new_roles(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + role = Role(name='testrole1') + db.session.add(role) + user.roles.add(role) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role]) + self.assertTrue(invite.active) + grant = InviteGrant(invite=invite, user=user) + success, msg = grant.apply() + self.assertFalse(success) + self.assertIsInstance(msg, str) + +class TestInviteSignupModel(UffdTestCase): + def create_base_roles(self): + self.app.config['ROLES_BASEROLES'] = ['base'] + baserole = Role(name='base') + baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) + baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com')) + db.session.add(baserole) + db.session.commit() + + def test_success(self): + self.create_base_roles() + base_role = Role.query.filter_by(name='base').one() + base_group1 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') + base_group2 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') + group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') + role1 = Role(name='testrole1', groups=[group]) + db.session.add(role1) + role2 = Role(name='testrole2') + db.session.add(role2) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True) + signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assertFalse(invite.used) + valid, msg = signup.validate() + self.assertTrue(valid) + self.assertFalse(invite.used) + user, msg = signup.finish('notsecret') + self.assertIsInstance(user, User) + self.assertTrue(invite.used) + self.assertEqual(user.loginname, 'newuser') + self.assertEqual(user.displayname, 'New User') + self.assertEqual(user.mail, 'test@example.com') + self.assertEqual(signup.user.dn, user.dn) + self.assertIn(base_role, user.roles) + self.assertIn(role1, user.roles) + self.assertIn(role2, user.roles) + self.assertIn(base_group1, user.groups) + self.assertIn(base_group2, user.groups) + self.assertIn(group, user.groups) + db.session.commit() + ldap.session.commit() + db_flush() + self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1) + + def test_success_no_roles(self): + self.create_base_roles() + base_role = Role.query.filter_by(name='base').one() + base_group1 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') + base_group2 = Group.query.get('cn=users,ou=groups,dc=example,dc=com') + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assertFalse(invite.used) + valid, msg = signup.validate() + self.assertTrue(valid) + self.assertFalse(invite.used) + user, msg = signup.finish('notsecret') + self.assertIsInstance(user, User) + self.assertTrue(invite.used) + self.assertEqual(user.loginname, 'newuser') + self.assertEqual(user.displayname, 'New User') + self.assertEqual(user.mail, 'test@example.com') + self.assertEqual(signup.user.dn, user.dn) + self.assertIn(base_role, user.roles) + self.assertEqual(len(user.roles), 1) + self.assertIn(base_group1, user.groups) + self.assertIn(base_group2, user.groups) + self.assertEqual(len(user.groups), 2) + db.session.commit() + ldap.session.commit() + db_flush() + self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1) + + def test_inactive(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, single_use=True, used=True) + self.assertFalse(invite.active) + signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + valid, msg = signup.validate() + self.assertFalse(valid) + self.assertIsInstance(msg, str) + user, msg = signup.finish('notsecret') + self.assertIsNone(user) + self.assertIsInstance(msg, str) + + def test_invalid(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + self.assertTrue(invite.active) + signup = InviteSignup(invite=invite, loginname='', displayname='New User', mail='test@example.com', password='notsecret') + valid, msg = signup.validate() + self.assertFalse(valid) + self.assertIsInstance(msg, str) + + def test_invalid2(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + self.assertTrue(invite.active) + signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + user, msg = signup.finish('wrongpassword') + self.assertIsNone(user) + self.assertIsInstance(msg, str) + + def test_no_signup(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False) + self.assertTrue(invite.active) + signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + valid, msg = signup.validate() + self.assertFalse(valid) + self.assertIsInstance(msg, str) + user, msg = signup.finish('notsecret') + self.assertIsNone(user) + self.assertIsInstance(msg, str) + +class TestInviteViews(UffdTestCase): + def setUpApp(self): + self.app.config['SELF_SIGNUP'] = False + self.app.last_mail = None + + def login_admin(self): + self.client.post(path=url_for('session.login'), + data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True) + + def login_user(self): + self.client.post(path=url_for('session.login'), + data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True) + + def test_index(self): + valid_until = datetime.datetime.now() + datetime.timedelta(seconds=60) + valid_until_expired = datetime.datetime.now() - datetime.timedelta(seconds=60) + user1 = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + user2 = User.query.get('uid=testadmin,ou=users,dc=example,dc=com') + role1 = Role(name='testrole1') + db.session.add(role1) + role2 = Role(name='testrole2') + db.session.add(role2) + # All possible states + db.session.add(Invite(valid_until=valid_until, single_use=False)) + db.session.add(Invite(valid_until=valid_until, single_use=True, used=False)) + db.session.add(Invite(valid_until=valid_until, single_use=True, used=True, signups=[InviteSignup(user=user1)])) + db.session.add(Invite(valid_until=valid_until_expired)) + db.session.add(Invite(valid_until=valid_until, disabled=True)) + # Different permissions + db.session.add(Invite(valid_until=valid_until, allow_signup=True)) + db.session.add(Invite(valid_until=valid_until, allow_signup=False)) + db.session.add(Invite(valid_until=valid_until, allow_signup=True, roles=[role1], grants=[InviteGrant(user=user2)])) + db.session.add(Invite(valid_until=valid_until, allow_signup=False, roles=[role1, role2])) + db.session.commit() + self.login_admin() + r = self.client.get(path=url_for('invite.index'), follow_redirects=True) + dump('invite_index', r) + self.assertEqual(r.status_code, 200) + + def test_index_empty(self): + self.login_admin() + r = self.client.get(path=url_for('invite.index'), follow_redirects=True) + dump('invite_index_empty', r) + self.assertEqual(r.status_code, 200) + + def test_index_nologin(self): + r = self.client.get(path=url_for('invite.index'), follow_redirects=True) + dump('invite_index_nologin', r) + self.assertEqual(r.status_code, 200) + + def test_index_noaccess(self): + self.login_user() + r = self.client.get(path=url_for('invite.index'), follow_redirects=True) + dump('invite_index_noaccess', r) + self.assertEqual(r.status_code, 200) + + def test_new(self): + self.login_admin() + role = Role(name='testrole1') + db.session.add(role) + db.session.commit() + role_id = role.id + r = self.client.get(path=url_for('invite.new'), follow_redirects=True) + dump('invite_new', r) + self.assertEqual(r.status_code, 200) + valid_until = (datetime.datetime.now() + datetime.timedelta(seconds=60)).isoformat() + self.assertListEqual(Invite.query.all(), []) + r = self.client.post(path=url_for('invite.new_submit'), + data={'single-use': '1', 'valid-until': valid_until, + 'allow-signup': '1', 'role-%d'%role_id: '1'}, follow_redirects=True) + dump('invite_new_submit', r) + invite = Invite.query.one() + role = Role.query.get(role_id) + self.assertTrue(invite.active) + self.assertTrue(invite.single_use) + self.assertTrue(invite.allow_signup) + self.assertListEqual(invite.roles, [role]) + + def test_disable(self): + self.login_admin() + valid_until = datetime.datetime.now() + datetime.timedelta(seconds=60) + invite = Invite(valid_until=valid_until) + db.session.add(invite) + db.session.commit() + token = invite.token + self.assertTrue(Invite.query.get(token).active) + r = self.client.post(path=url_for('invite.disable', token=token), follow_redirects=True) + dump('invite_disable', r) + self.assertTrue(Invite.query.get(token).disabled) + + def test_reset_disabled(self): + self.login_admin() + valid_until = datetime.datetime.now() + datetime.timedelta(seconds=60) + invite = Invite(valid_until=valid_until, disabled=True) + db.session.add(invite) + db.session.commit() + token = invite.token + self.assertFalse(Invite.query.get(token).active) + r = self.client.post(path=url_for('invite.reset', token=token), follow_redirects=True) + dump('invite_reset_disabled', r) + self.assertTrue(Invite.query.get(token).active) + + def test_reset_voided(self): + self.login_admin() + valid_until = datetime.datetime.now() + datetime.timedelta(seconds=60) + invite = Invite(valid_until=valid_until, single_use=True, used=True) + db.session.add(invite) + db.session.commit() + token = invite.token + self.assertFalse(Invite.query.get(token).active) + r = self.client.post(path=url_for('invite.reset', token=token), follow_redirects=True) + dump('invite_reset_voided', r) + self.assertTrue(Invite.query.get(token).active) + + def test_use(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, roles=[Role(name='testrole1'), Role(name='testrole2')]) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) + dump('invite_use', r) + self.assertEqual(r.status_code, 200) + + def test_use_loggedin(self): + self.login_user() + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, roles=[Role(name='testrole1'), Role(name='testrole2')]) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) + dump('invite_use_loggedin', r) + self.assertEqual(r.status_code, 200) + + def test_use_inactive(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), disabled=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.get(path=url_for('invite.use', token=token), follow_redirects=True) + dump('invite_use_inactive', r) + self.assertEqual(r.status_code, 200) + + # TODO: test cases for {logged in, not logged in} x (signup-only, grant-only, both, none?} + + def test_grant(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + group0 = Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com') + role0 = Role(name='baserole', groups=[group0]) + db.session.add(role0) + user.roles.add(role0) + user.update_groups() + group1 = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') + role1 = Role(name='testrole1', groups=[group1]) + db.session.add(role1) + role2 = Role(name='testrole2') + db.session.add(role2) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2]) + db.session.add(invite) + db.session.commit() + ldap.session.commit() + token = invite.token + self.assertIn(role0, user.roles) + self.assertNotIn(role1, user.roles) + self.assertNotIn(role2, user.roles) + self.assertIn(group0, user.groups) + self.assertNotIn(group1, user.groups) + self.assertFalse(invite.used) + self.login_user() + r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) + dump('invite_grant', r) + self.assertEqual(r.status_code, 200) + db_flush() + invite = Invite.query.get(token) + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + self.assertTrue(invite.used) + self.assertIn('baserole', [role.name for role in user.roles]) + self.assertIn('testrole1', [role.name for role in user.roles]) + self.assertIn('testrole2', [role.name for role in user.roles]) + self.assertIn('cn=uffd_access,ou=groups,dc=example,dc=com', [group.dn for group in user.groups]) + self.assertIn('cn=uffd_admin,ou=groups,dc=example,dc=com', [group.dn for group in user.groups]) + + def test_grant_invalid_invite(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), disabled=True) + db.session.add(invite) + db.session.commit() + token = invite.token + self.login_user() + r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) + dump('invite_grant_invalid_invite', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(Invite.query.get(token).used) + + def test_grant_no_roles(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60)) + db.session.add(invite) + db.session.commit() + token = invite.token + self.login_user() + r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) + dump('invite_grant_no_roles', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(Invite.query.get(token).used) + + def test_grant_no_new_roles(self): + user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + role = Role(name='testrole') + db.session.add(role) + user.roles.add(role) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role]) + db.session.add(invite) + db.session.commit() + token = invite.token + self.login_user() + r = self.client.post(path=url_for('invite.grant', token=token), follow_redirects=True) + dump('invite_grant_no_new_roles', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(Invite.query.get(token).used) + + def test_signup(self): + self.app.config['ROLES_BASEROLES'] = ['base'] + baserole = Role(name='base') + baserole.groups.add(Group.query.get('cn=uffd_access,ou=groups,dc=example,dc=com')) + baserole.groups.add(Group.query.get('cn=users,ou=groups,dc=example,dc=com')) + db.session.add(baserole) + group = Group.query.get('cn=uffd_admin,ou=groups,dc=example,dc=com') + role1 = Role(name='testrole1', groups=[group]) + db.session.add(role1) + role2 = Role(name='testrole2') + db.session.add(role2) + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.get(path=url_for('invite.signup_start', token=token), follow_redirects=True) + dump('invite_signup_start', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('invite.signup_submit', token=token), + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + dump('invite_signup_submit', r) + self.assertEqual(r.status_code, 200) + signup = InviteSignup.query.one() + self.assertEqual(signup.loginname, 'newuser') + self.assertEqual(signup.displayname, 'New User') + self.assertEqual(signup.mail, 'test@example.com') + self.assertIn(signup.token, str(self.app.last_mail.get_content())) + self.assertTrue(signup.check_password('notsecret')) + self.assertTrue(signup.validate()[0]) + + def test_signup_invalid_invite(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True) + db.session.add(invite) + db.session.commit() + r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True) + dump('invite_signup_invalid_invite', r) + self.assertEqual(r.status_code, 200) + + def test_signup_nosignup(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False) + db.session.add(invite) + db.session.commit() + r = self.client.get(path=url_for('invite.signup_start', token=invite.token), follow_redirects=True) + dump('invite_signup_nosignup', r) + self.assertEqual(r.status_code, 200) + + def test_signup_wrongpassword(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notthesame'}, follow_redirects=True) + dump('invite_signup_wrongpassword', r) + self.assertEqual(r.status_code, 200) + + def test_signup_invalid(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), + data={'loginname': '', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + dump('invite_signup_invalid', r) + self.assertEqual(r.status_code, 200) + + def test_signup_mailerror(self): + self.app.config['MAIL_SKIP_SEND'] = 'fail' + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + r = self.client.post(path=url_for('invite.signup_submit', token=invite.token), + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + dump('invite_signup_mailerror', r) + self.assertEqual(r.status_code, 200) + + def test_signup_hostlimit(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + for i in range(20): + r = self.client.post(path=url_for('invite.signup_submit', token=token), + data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i, + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.app.last_mail = None + r = self.client.post(path=url_for('invite.signup_submit', token=token), + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + dump('invite_signup_hostlimit', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(InviteSignup.query.filter_by(loginname='newuser').all(), []) + self.assertIsNone(self.app.last_mail) + + def test_signup_mailimit(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + for i in range(3): + r = self.client.post(path=url_for('invite.signup_submit', token=token), + data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + self.assertEqual(r.status_code, 200) + self.app.last_mail = None + r = self.client.post(path=url_for('invite.signup_submit', token=token), + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}, follow_redirects=True) + dump('invite_signup_maillimit', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(InviteSignup.query.filter_by(loginname='newuser').all(), []) + self.assertIsNone(self.app.last_mail) + + def test_signup_check(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'newuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'ok') + + def test_signup_check_invalid(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': ''}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'invalid') + + def test_signup_check_exists(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'exists') + + def test_signup_check_nosignup(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=False) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 403) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'error') + + def test_signup_check_error(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True, disabled=True) + db.session.add(invite) + db.session.commit() + token = invite.token + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 403) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'error') + + def test_signup_check_ratelimited(self): + invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), allow_signup=True) + db.session.add(invite) + db.session.commit() + token = invite.token + for i in range(20): + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + r = self.client.post(path=url_for('invite.signup_check', token=token), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'ratelimited') + +class TestInviteViewsOL(TestInviteViews): + use_openldap = True diff --git a/uffd/__init__.py b/uffd/__init__.py index e124a3ce1209e0fd10d280aa32fa065a1d2f67e3..f0828bb2d0e61a079335879baa6f716aeba32036 100644 --- a/uffd/__init__.py +++ b/uffd/__init__.py @@ -43,10 +43,10 @@ def create_app(test_config=None): # pylint: disable=too-many-locals db.init_app(app) # pylint: disable=C0415 - from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup + from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, invite # pylint: enable=C0415 - for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp: + for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp + invite.bp: app.register_blueprint(i) @app.route("/") diff --git a/uffd/invite/__init__.py b/uffd/invite/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..656390049779db11f3fbd9a16498218b18982068 --- /dev/null +++ b/uffd/invite/__init__.py @@ -0,0 +1,3 @@ +from .views import bp as _bp + +bp = [_bp] diff --git a/uffd/invite/models.py b/uffd/invite/models.py new file mode 100644 index 0000000000000000000000000000000000000000..a737274c6cb40bfc908f73a8773c73cef68853c0 --- /dev/null +++ b/uffd/invite/models.py @@ -0,0 +1,95 @@ +import secrets +import datetime + +from sqlalchemy import Column, String, Integer, ForeignKey, DateTime, Boolean +from sqlalchemy.orm import relationship +from ldapalchemy.dbutils import DBRelationship + +from uffd.database import db +from uffd.user.models import User +from uffd.signup.models import Signup + +# pylint: disable=E1101 +invite_roles = db.Table('invite_roles', + Column('invite_token', String(128), ForeignKey('invite.token'), primary_key=True), + Column('role_id', Integer, ForeignKey('role.id'), primary_key=True) +) + +class Invite(db.Model): + __tablename__ = 'invite' + token = Column(String(128), primary_key=True, default=lambda: secrets.token_hex(20)) + created = Column(DateTime, default=datetime.datetime.now, nullable=False) + valid_until = Column(DateTime, nullable=False) + single_use = Column(Boolean, default=True, nullable=False) + allow_signup = Column(Boolean, default=True, nullable=False) + used = Column(Boolean, default=False, nullable=False) + disabled = Column(Boolean, default=False, nullable=False) + roles = relationship('Role', secondary=invite_roles) + signups = relationship('InviteSignup', back_populates='invite', lazy=True) + grants = relationship('InviteGrant', backref='invite', lazy=True) + + @property + def expired(self): + return datetime.datetime.now().replace(second=0, microsecond=0) > self.valid_until + + @property + def voided(self): + return self.single_use and self.used + + @property + def active(self): + return not self.disabled and not self.voided and not self.expired + + def disable(self): + self.disabled = True + + def reset(self): + self.disabled = False + self.used = False + +class InviteGrant(db.Model): + __tablename__ = 'invite_grant' + id = Column(Integer(), primary_key=True, autoincrement=True) + invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) + user_dn = Column(String(128), nullable=False) + user = DBRelationship('user_dn', User) + + def apply(self): + if not self.invite.active: + return False, 'Invite link is invalid' + if not self.invite.roles: + return False, 'Invite link does not grant any roles' + if set(self.invite.roles).issubset(self.user.roles): + return False, 'Invite link does not grant any new roles' + for role in self.invite.roles: + self.user.roles.add(role) + self.user.update_groups() + self.invite.used = True + return True, 'Success' + +class InviteSignup(Signup): + __tablename__ = 'invite_signup' + token = Column(String(128), ForeignKey('signup.token'), primary_key=True) + invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) + invite = relationship('Invite', back_populates='signups') + + __mapper_args__ = { + 'polymorphic_identity': 'InviteSignup' + } + + def validate(self): + if not self.invite.active or not self.invite.allow_signup: + return False, 'Invite link is invalid' + return super().validate() + + def finish(self, password): + if not self.invite.active or not self.invite.allow_signup: + return None, 'Invite link is invalid' + user, msg = super().finish(password) + if user is not None: + # super().finish() already added ROLES_BASEROLES + for role in self.invite.roles: + user.roles.add(role) + user.update_groups() + self.invite.used = True + return user, msg diff --git a/uffd/invite/templates/invite/list.html b/uffd/invite/templates/invite/list.html new file mode 100644 index 0000000000000000000000000000000000000000..8db2ece9a0af69599264a584c0cd671e5537fad2 --- /dev/null +++ b/uffd/invite/templates/invite/list.html @@ -0,0 +1,97 @@ +{% extends 'base.html' %} + +{% block body %} + +<div class="btn-toolbar mb-2"> + <a class="btn btn-primary ml-auto" href="{{ url_for("invite.new") }}"><i class="fa fa-plus" aria-hidden="true"></i> New</a> +</div> +<div class="table-responsive"> + <table class="table"> + <thead> + <tr> + <th scope="col">Link</th> + <th scope="col">Status</th> + <th scope="col">Created on</th> + <th scope="col">Expires after</th> + <th scope="col">Permissions</th> + <th scope="col">Usages</th> + <th scope="col"></th> + </tr> + </thead> + <tbody> + {% for invite in invites|sort(attribute='created', reverse=True)|sort(attribute='active', reverse=True) %} + <tr> + <td><a style="width: 8em; display: inline-block;" class="text-truncate" href="{{ url_for('invite.use', token=invite.token) }}"><code>{{ invite.token }}</code></a></td> + <td> + {% if invite.disabled %} + Disabled + {% elif invite.voided %} + Voided + {% elif invite.expired %} + Expired + {% elif not invite.active %} + Invalid + {% elif invite.single_use %} + Valid once + {% else %} + Valid + {% endif %} + </td> + <td>{{ invite.created.strftime('%Y-%m-%d %H:%M') }}</td> + <td>{{ invite.valid_until.strftime('%Y-%m-%d %H:%M') }}</td> + <td> + {{ 'Signup' if invite.allow_signup }}{{ ', ' if invite.allow_signup and invite.roles }} + {% for role in invite.roles %}{{ ', ' if loop.index != 1 }}<a href="{{ url_for('role.show', roleid=role.id) }}" style="white-space: nowrap;"><i class="fas fa-key"></i> {{ role.name }}</a>{% endfor %} + </td> + <td> + <a href="#" data-toggle="modal" data-target="#modal-{{ invite.token }}"> + <span style="white-space: nowrap;">{{ invite.signups|selectattr('completed')|list|length }} <i class="fas fa-users" title="user registrations"></i></span>, + <span style="white-space: nowrap;">{{ invite.grants|length }} <i class="fas fa-key" title="role grants"></i></span> + </a> + </td> + <td class="text-right"> + {% if invite.active %} + <form action="{{ url_for('invite.disable', token=invite.token) }}" method="POST"> + <button type="submit" class="btn btn-link btn-sm py-0" title="Disable"><i class="fas fa-ban" style="width: 1.5em;"></i></button> + </form> + {% else %} + <form action="{{ url_for('invite.reset', token=invite.token) }}" method="POST"> + <button type="submit" class="btn btn-link btn-sm py-0" title="Reenable" {{ 'disabled' if invite.expired }}><i class="fas fa-redo" style="width: 1.5em;"></i></button> + </form> + {% endif %} + </td> + </tr> + {% endfor %} + </tbody> + </table> +</div> + +{% for invite in invites %} +<div class="modal" tabindex="-1" id="modal-{{ invite.token }}"> + <div class="modal-dialog"> + <div class="modal-content"> + <div class="modal-header"> + <h5 class="modal-title">Invite Usages</h5> + <button type="button" class="close" data-dismiss="modal" aria-label="Close"> + <span aria-hidden="true">×</span> + </button> + </div> + <div class="modal-body"> + <ul> + {% for signup in invite.signups if signup.completed %} + <li>Registration of user <a href="{{ url_for('user.show', uid=signup.user.uid) }}">{{ signup.user.loginname }}</a></li> + {% endfor %} + {% for grant in invite.grants if grant.user %} + <li>Roles granted to <a href="{{ url_for('user.show', uid=grant.user.uid) }}">{{ grant.user.loginname }}</a></li> + {% endfor %} + </ul> + </div> + <div class="modal-footer"> + <button type="button" class="btn btn-primary" data-dismiss="modal">Close</button> + </div> + </div> + </div> +</div> +{% endfor %} + +{% endblock %} diff --git a/uffd/invite/templates/invite/new.html b/uffd/invite/templates/invite/new.html new file mode 100644 index 0000000000000000000000000000000000000000..ca203c25a87b00e36ea49fad3113dc8491f8f06c --- /dev/null +++ b/uffd/invite/templates/invite/new.html @@ -0,0 +1,54 @@ +{% extends 'base.html' %} + +{% block body %} +<form action="{{ url_for("invite.new_submit") }}" method="POST" class="form"> + <div class="form-group"> + <label for="single-use">Link Type</label> + <select class="form-control" id="single-use" name="single-use"> + <option value="1">Valid for a single successful use</option> + <option value="0">Multi-use</option> + </select> + </div> + <div class="form-group"> + <label for="valid-until">Expires After</label> + <input class="form-control" type="datetime-local" id="valid-until" name="valid-until" value="{{ (datetime.now() + timedelta(hours=36)).replace(hour=23, minute=59, second=59, microsecond=0).isoformat(timespec='minutes') }}"> + </div> + <div class="form-group"> + <label for="allow-signup">Account Registration</label> + <select class="form-control" id="allow-signup" name="allow-signup"> + <option value="1">Link allows account registration</option> + <option value="0">No account registration allowed</option> + </select> + </div> + <div class="form-group"> + <label for="valid-until">Granted Roles</label> + <table class="table table-sm"> + <thead> + <tr> + <th scope="col"></th> + <th scope="col">Name</th> + <th scope="col">Description</th> + </tr> + </thead> + <tbody> + {% for role in roles|sort(attribute="name") if role.name not in config['ROLES_BASEROLES'] %} + <tr> + <td> + <div class="form-check"> + <input class="form-check-input" type="checkbox" id="role-{{ role.id }}" name="role-{{ role.id }}" value="1"> + </div> + </td> + <td>{{ role.name }}</td> + <td>{{ role.description }}</td> + </tr> + {% endfor %} + {% if not roles %} + <tr><td colspan="3" class="bg-light text-muted text-center">There are no (non-base) roles yet</td></tr> + {% endif %} + </tbody> + </table> + </div> + <button type="submit" class="btn btn-primary"><i class="fa fa-save" aria-hidden="true"></i> Create Link</button> + <a href="{{ url_for("invite.index") }}" class="btn btn-secondary">Cancel</a> +</form> +{% endblock %} diff --git a/uffd/invite/templates/invite/use.html b/uffd/invite/templates/invite/use.html new file mode 100644 index 0000000000000000000000000000000000000000..a51db8858e2538f797cb015fe76e05a1a321fdc4 --- /dev/null +++ b/uffd/invite/templates/invite/use.html @@ -0,0 +1,51 @@ +{% extends 'base.html' %} + +{% block body %} + +<div class="row mt-2 justify-content-center"> + <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> + <div class="text-center"> + <img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" > + </div> + <div class="col-12 mb-3"> + <h2 class="text-center">Invite Link</h2> + </div> + {% if not is_valid_session() %} + <p>Welcome to the CCCV Single-Sign-On!</p> + {% endif %} + + {% if invite.roles and invite.allow_signup %} + <p>With this link you can register a new user account with the following roles or add the roles to an existing account:</p> + {% elif invite.roles %} + <p>With this link you can add the following roles to an existing account:</p> + {% elif invite.allow_signup %} + <p>With this link you can register a new user account.</p> + {% endif %} + {% if invite.roles %} + <ul> + {% for role in invite.roles %} + <li>{{ role.name }}{% if role.description %}: {{ role.description }}{% endif %}</li> + {% endfor %} + </ul> + {% endif %} + {% if is_valid_session() %} + {% if invite.roles %} + <form method="POST" action="{{ url_for("invite.grant", token=invite.token) }}" class="mb-2"> + <button type="submit" class="btn btn-primary btn-block">Add the roles to your account now</button> + </form> + <a href="{{ url_for("session.logout", ref=url_for("session.login", ref=request.url)) }}" class="btn btn-secondary btn-block">Logout and switch to a different account</a> + {% endif %} + {% if invite.allow_signup %} + <a href="{{ url_for("session.logout", ref=url_for("invite.signup_start", token=invite.token)) }}" class="btn btn-secondary btn-block">Logout to register a new account</a> + {% endif %} + {% else %} + {% if invite.allow_signup %} + <a href="{{ url_for("invite.signup_start", token=invite.token) }}" class="btn btn-primary btn-block">Register a new account</a> + {% endif %} + {% if invite.roles %} + <a href="{{ url_for("session.login", ref=request.url) }}" class="btn btn-primary btn-block">Login and add the roles to your account</a> + {% endif %} + {% endif %} + </div> +</div> +{% endblock %} diff --git a/uffd/invite/views.py b/uffd/invite/views.py new file mode 100644 index 0000000000000000000000000000000000000000..79b3f7b2b0273346802dcc0b2fded2ba099418d6 --- /dev/null +++ b/uffd/invite/views.py @@ -0,0 +1,153 @@ +import datetime +import functools + +from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify + +from uffd.csrf import csrf_protect +from uffd.database import db +from uffd.ldap import ldap +from uffd.session import get_current_user, login_required, is_valid_session +from uffd.role.models import Role +from uffd.invite.models import Invite, InviteSignup, InviteGrant +from uffd.user.models import User +from uffd.sendmail import sendmail +from uffd.navbar import register_navbar +from uffd.ratelimit import host_ratelimit, format_delay +from uffd.signup.views import signup_ratelimit + +bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/') + +def invite_acl(): + return is_valid_session() and get_current_user().is_in_group(current_app.config['ACL_ADMIN_GROUP']) + +def invite_acl_required(func): + @functools.wraps(func) + @login_required() + def decorator(*args, **kwargs): + if not invite_acl(): + flash('Access denied') + return redirect(url_for('index')) + return func(*args, **kwargs) + return decorator + +@bp.route('/') +@register_navbar('Invites', icon='link', blueprint=bp, visible=invite_acl) +@invite_acl_required +def index(): + return render_template('invite/list.html', invites=Invite.query.all()) + +@bp.route('/new') +@invite_acl_required +def new(): + return render_template('invite/new.html', roles=Role.query.all()) + +@bp.route('/new', methods=['POST']) +@invite_acl_required +@csrf_protect(blueprint=bp) +def new_submit(): + invite = Invite(single_use=(request.values['single-use'] == '1'), + valid_until=datetime.datetime.fromisoformat(request.values['valid-until']), + allow_signup=(request.values['allow-signup'] == '1')) + for key, value in request.values.items(): + if key.startswith('role-') and value == '1': + role = Role.query.get(key[5:]) + invite.roles.append(role) + db.session.add(invite) + db.session.commit() + return redirect(url_for('invite.index')) + +@bp.route('/<token>/disable', methods=['POST']) +@invite_acl_required +@csrf_protect(blueprint=bp) +def disable(token): + Invite.query.get_or_404(token).disable() + db.session.commit() + return redirect(url_for('.index')) + +@bp.route('/<token>/reset', methods=['POST']) +@invite_acl_required +@csrf_protect(blueprint=bp) +def reset(token): + Invite.query.get_or_404(token).reset() + db.session.commit() + return redirect(url_for('.index')) + +@bp.route('/<token>') +def use(token): + invite = Invite.query.get_or_404(token) + if not invite.active: + flash('Invalid invite link') + return redirect('/') + return render_template('invite/use.html', invite=invite) + +@bp.route('/<token>/grant', methods=['POST']) +@login_required() +def grant(token): + invite = Invite.query.get_or_404(token) + invite_grant = InviteGrant(invite=invite, user=get_current_user()) + db.session.add(invite_grant) + success, msg = invite_grant.apply() + if not success: + flash(msg) + return redirect(url_for('selfservice.index')) + ldap.session.commit() + db.session.commit() + flash('Roles successfully updated') + return redirect(url_for('selfservice.index')) + +@bp.url_defaults +def inject_invite_token(endpoint, values): + if endpoint in ['invite.signup_submit', 'invite.signup_check'] and 'token' in request.view_args: + values['token'] = request.view_args['token'] + +@bp.route('/<token>/signup') +def signup_start(token): + invite = Invite.query.get_or_404(token) + if not invite.active: + flash('Invalid invite link') + return redirect('/') + if not invite.allow_signup: + flash('Invite link does not allow signup') + return redirect('/') + return render_template('signup/start.html') + +@bp.route('/<token>/signupcheck', methods=['POST']) +def signup_check(token): + if host_ratelimit.get_delay(): + return jsonify({'status': 'ratelimited'}) + host_ratelimit.log() + invite = Invite.query.get_or_404(token) + if not invite.active or not invite.allow_signup: + return jsonify({'status': 'error'}), 403 + if not User().set_loginname(request.form['loginname']): + return jsonify({'status': 'invalid'}) + if User.query.filter_by(loginname=request.form['loginname']).all(): + return jsonify({'status': 'exists'}) + return jsonify({'status': 'ok'}) + +@bp.route('/<token>/signup', methods=['POST']) +def signup_submit(token): + invite = Invite.query.get_or_404(token) + if request.form['password1'] != request.form['password2']: + return render_template('signup/start.html', error='Passwords do not match') + signup_delay = signup_ratelimit.get_delay(request.form['mail']) + host_delay = host_ratelimit.get_delay() + if signup_delay and signup_delay > host_delay: + return render_template('signup/start.html', error='Too many signup requests with this mail address! Please wait %s.'%format_delay(signup_delay)) + if host_delay: + return render_template('signup/start.html', error='Too many requests! Please wait %s.'%format_delay(host_delay)) + host_ratelimit.log() + signup = InviteSignup(invite=invite, loginname=request.form['loginname'], + displayname=request.form['displayname'], + mail=request.form['mail'], + password=request.form['password1']) + valid, msg = signup.validate() + if not valid: + return render_template('signup/start.html', error=msg) + db.session.add(signup) + db.session.commit() + sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup) + if not sent: + return render_template('signup/start.html', error='Cound not send mail') + signup_ratelimit.log(request.form['mail']) + return render_template('signup/submitted.html', signup=signup) diff --git a/uffd/session/views.py b/uffd/session/views.py index 0331f2f103f2f2bc9c0459164fa911acc4209755..a2ff21251dcc48f6d4ee0a67047a7d473e7dd0a2 100644 --- a/uffd/session/views.py +++ b/uffd/session/views.py @@ -46,7 +46,7 @@ def login_get_user(loginname, password): def logout(): # The oauth2 module takes data from `session` and injects it into the url, # so we need to build the url BEFORE we clear the session! - resp = redirect(url_for('oauth2.logout', ref=url_for('.login'))) + resp = redirect(url_for('oauth2.logout', ref=request.values.get('ref', url_for('.login')))) session.clear() return resp