import datetime import time import unittest from flask import url_for, session # These imports are required, because otherwise we get circular imports?! from uffd import user from uffd.user.models import User, Group from uffd.role.models import flatten_recursive, Role, RoleGroup from uffd.mfa.models import TOTPMethod from uffd import create_app, db from utils import dump, UffdTestCase class TestPrimitives(unittest.TestCase): def test_flatten_recursive(self): class Node: def __init__(self, *neighbors): self.neighbors = set(neighbors or set()) cycle = Node() cycle.neighbors.add(cycle) common = Node(cycle) intermediate1 = Node(common) intermediate2 = Node(common, intermediate1) stub = Node() backref = Node() start1 = Node(intermediate1, intermediate2, stub, backref) backref.neighbors.add(start1) start2 = Node() self.assertSetEqual(flatten_recursive({start1, start2}, 'neighbors'), {start1, start2, backref, stub, intermediate1, intermediate2, common, cycle}) self.assertSetEqual(flatten_recursive(set(), 'neighbors'), set()) class TestUserRoleAttributes(UffdTestCase): def test_roles_effective(self): db.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) db.session.commit() user = self.get_user() service_user = User.query.filter_by(loginname='service').one_or_none() included_by_default_role = Role(name='included_by_default') default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role]) included_role = Role(name='included') cycle_role = Role(name='cycle') direct_role1 = Role(name='role1', members=[user, service_user], included_roles=[included_role, cycle_role]) direct_role2 = Role(name='role2', members=[user, service_user], included_roles=[included_role]) cycle_role.included_roles.append(direct_role1) db.session.add_all([included_by_default_role, default_role, included_role, cycle_role, direct_role1, direct_role2]) self.assertSetEqual(user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role, default_role, included_by_default_role}) self.assertSetEqual(service_user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role}) def test_compute_groups(self): user = self.get_user() group1 = self.get_users_group() group2 = self.get_access_group() role1 = Role(name='role1', groups={group1: RoleGroup(group=group1)}) role2 = Role(name='role2', groups={group1: RoleGroup(group=group1), group2: RoleGroup(group=group2)}) db.session.add_all([role1, role2]) self.assertSetEqual(user.compute_groups(), set()) role1.members.append(user) role2.members.append(user) self.assertSetEqual(user.compute_groups(), {group1, group2}) role2.groups[group2].requires_mfa = True self.assertSetEqual(user.compute_groups(), {group1}) db.session.add(TOTPMethod(user=user)) self.assertSetEqual(user.compute_groups(), {group1, group2}) def test_update_groups(self): user = self.get_user() group1 = self.get_users_group() group2 = self.get_access_group() role1 = Role(name='role1', members=[user], groups={group1: RoleGroup(group=group1)}) role2 = Role(name='role2', groups={group2: RoleGroup(group=group2)}) db.session.add_all([role1, role2]) user.groups = [group2] groups_added, groups_removed = user.update_groups() self.assertSetEqual(groups_added, {group1}) self.assertSetEqual(groups_removed, {group2}) self.assertSetEqual(set(user.groups), {group1}) groups_added, groups_removed = user.update_groups() self.assertSetEqual(groups_added, set()) self.assertSetEqual(groups_removed, set()) self.assertSetEqual(set(user.groups), {group1}) class TestRoleModel(UffdTestCase): def test_members_effective(self): db.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) db.session.commit() user1 = self.get_user() user2 = self.get_admin() service = User.query.filter_by(loginname='service').one_or_none() included_by_default_role = Role(name='included_by_default') default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role]) included_role = Role(name='included') direct_role = Role(name='direct', members=[user1, user2, service], included_roles=[included_role]) empty_role = Role(name='empty', included_roles=[included_role]) self.assertSetEqual(included_by_default_role.members_effective, {user1, user2}) self.assertSetEqual(default_role.members_effective, {user1, user2}) self.assertSetEqual(included_role.members_effective, {user1, user2, service}) self.assertSetEqual(direct_role.members_effective, {user1, user2, service}) self.assertSetEqual(empty_role.members_effective, set()) def test_included_roles_recursive(self): baserole = Role(name='base') role1 = Role(name='role1', included_roles=[baserole]) role2 = Role(name='role2', included_roles=[baserole]) role3 = Role(name='role3', included_roles=[role1, role2]) self.assertSetEqual(role1.included_roles_recursive, {baserole}) self.assertSetEqual(role2.included_roles_recursive, {baserole}) self.assertSetEqual(role3.included_roles_recursive, {baserole, role1, role2}) baserole.included_roles.append(role1) self.assertSetEqual(role3.included_roles_recursive, {baserole, role1, role2}) def test_groups_effective(self): group1 = self.get_users_group() group2 = self.get_access_group() baserole = Role(name='base', groups={group1: RoleGroup(group=group1)}) role1 = Role(name='role1', groups={group2: RoleGroup(group=group2)}, included_roles=[baserole]) self.assertSetEqual(baserole.groups_effective, {group1}) self.assertSetEqual(role1.groups_effective, {group1, group2}) def test_update_member_groups(self): user1 = self.get_user() user1.update_groups() user2 = self.get_admin() user2.update_groups() group1 = self.get_users_group() group2 = self.get_access_group() group3 = self.get_admin_group() baserole = Role(name='base', members=[user1], groups={group1: RoleGroup(group=group1)}) role1 = Role(name='role1', members=[user2], groups={group2: RoleGroup(group=group2)}, included_roles=[baserole]) db.session.add_all([baserole, role1]) baserole.update_member_groups() role1.update_member_groups() self.assertSetEqual(set(user1.groups), {group1}) self.assertSetEqual(set(user2.groups), {group1, group2}) baserole.groups[group3] = RoleGroup() baserole.update_member_groups() self.assertSetEqual(set(user1.groups), {group1, group3}) self.assertSetEqual(set(user2.groups), {group1, group2, group3}) class TestRoleViews(UffdTestCase): def setUp(self): super().setUp() self.login_as('admin') def test_index(self): db.session.add(Role(name='base', description='Base role description')) db.session.add(Role(name='test1', description='Test1 role description')) db.session.commit() r = self.client.get(path=url_for('role.index'), follow_redirects=True) dump('role_index', r) self.assertEqual(r.status_code, 200) def test_index_empty(self): r = self.client.get(path=url_for('role.index'), follow_redirects=True) dump('role_index_empty', r) self.assertEqual(r.status_code, 200) def test_show(self): role = Role(name='base', description='Base role description') db.session.add(role) db.session.commit() r = self.client.get(path=url_for('role.show', roleid=role.id), follow_redirects=True) dump('role_show', r) self.assertEqual(r.status_code, 200) def test_new(self): r = self.client.get(path=url_for('role.new'), follow_redirects=True) dump('role_new', r) self.assertEqual(r.status_code, 200) def test_update(self): role = Role(name='base', description='Base role description') db.session.add(role) db.session.commit() role.groups[self.get_admin_group()] = RoleGroup() db.session.commit() self.assertEqual(role.name, 'base') self.assertEqual(role.description, 'Base role description') self.assertSetEqual(set(role.groups), {self.get_admin_group()}) r = self.client.post(path=url_for('role.update', roleid=role.id), data={'name': 'base1', 'description': 'Base role description1', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'}, follow_redirects=True) dump('role_update', r) self.assertEqual(r.status_code, 200) role = Role.query.get(role.id) self.assertEqual(role.name, 'base1') self.assertEqual(role.description, 'Base role description1') self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()}) # TODO: verify that group memberships are updated (currently not possible with ldap mock!) def test_create(self): self.assertIsNone(Role.query.filter_by(name='base').first()) r = self.client.post(path=url_for('role.update'), data={'name': 'base', 'description': 'Base role description', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'}, follow_redirects=True) dump('role_create', r) self.assertEqual(r.status_code, 200) role = Role.query.filter_by(name='base').first() self.assertIsNotNone(role) self.assertEqual(role.name, 'base') self.assertEqual(role.description, 'Base role description') self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()}) # TODO: verify that group memberships are updated (currently not possible with ldap mock!) def test_create_with_moderator_group(self): self.assertIsNone(Role.query.filter_by(name='base').first()) r = self.client.post(path=url_for('role.update'), data={'name': 'base', 'description': 'Base role description', 'moderator-group': self.get_admin_group().id, 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'}, follow_redirects=True) self.assertEqual(r.status_code, 200) role = Role.query.filter_by(name='base').first() self.assertIsNotNone(role) self.assertEqual(role.name, 'base') self.assertEqual(role.description, 'Base role description') self.assertEqual(role.moderator_group.name, 'uffd_admin') self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()}) # TODO: verify that group memberships are updated (currently not possible with ldap mock!) def test_delete(self): role = Role(name='base', description='Base role description') db.session.add(role) db.session.commit() role_id = role.id self.assertIsNotNone(Role.query.get(role_id)) r = self.client.get(path=url_for('role.delete', roleid=role.id), follow_redirects=True) dump('role_delete', r) self.assertEqual(r.status_code, 200) self.assertIsNone(Role.query.get(role_id)) # TODO: verify that group memberships are updated (currently not possible with ldap mock!) def test_set_default(self): db.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) db.session.commit() role = Role(name='test') db.session.add(role) role.groups[self.get_admin_group()] = RoleGroup() user1 = self.get_user() user2 = self.get_admin() service_user = User.query.filter_by(loginname='service').one_or_none() self.assertSetEqual(set(self.get_user().roles_effective), set()) self.assertSetEqual(set(self.get_admin().roles_effective), set()) self.assertSetEqual(set(service_user.roles_effective), set()) role.members.append(self.get_user()) role.members.append(service_user) self.assertSetEqual(set(self.get_user().roles_effective), {role}) self.assertSetEqual(set(self.get_admin().roles_effective), set()) self.assertSetEqual(set(service_user.roles_effective), {role}) db.session.commit() role_id = role.id self.assertSetEqual(set(role.members), {self.get_user(), service_user}) r = self.client.get(path=url_for('role.set_default', roleid=role.id), follow_redirects=True) dump('role_set_default', r) self.assertEqual(r.status_code, 200) role = Role.query.get(role_id) service_user = User.query.filter_by(loginname='service').one_or_none() self.assertSetEqual(set(role.members), {service_user}) self.assertSetEqual(set(self.get_user().roles_effective), {role}) self.assertSetEqual(set(self.get_admin().roles_effective), {role}) def test_unset_default(self): admin_role = Role(name='admin', is_default=True) db.session.add(admin_role) admin_role.groups[self.get_admin_group()] = RoleGroup() db.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service')) db.session.commit() role = Role(name='test', is_default=True) db.session.add(role) service_user = User.query.filter_by(loginname='service').one_or_none() role.members.append(service_user) self.assertSetEqual(set(self.get_user().roles_effective), {role, admin_role}) self.assertSetEqual(set(self.get_admin().roles_effective), {role, admin_role}) self.assertSetEqual(set(service_user.roles_effective), {role}) db.session.commit() role_id = role.id admin_role_id = admin_role.id self.assertSetEqual(set(role.members), {service_user}) r = self.client.get(path=url_for('role.unset_default', roleid=role.id), follow_redirects=True) dump('role_unset_default', r) self.assertEqual(r.status_code, 200) role = Role.query.get(role_id) admin_role = Role.query.get(admin_role_id) service_user = User.query.filter_by(loginname='service').one_or_none() self.assertSetEqual(set(role.members), {service_user}) self.assertSetEqual(set(self.get_user().roles_effective), {admin_role}) self.assertSetEqual(set(self.get_admin().roles_effective), {admin_role}) class TestRoleCLI(UffdTestCase): def setUp(self): super().setUp() role = Role(name='admin') db.session.add(role) role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group()) role.members.append(self.get_admin()) role = Role(name='base', is_default=True) db.session.add(role) role.groups[self.get_access_group()] = RoleGroup(group=self.get_access_group()) db.session.add(Role(name='test')) for user in User.query: user.update_groups() db.session.commit() self.client.__exit__(None, None, None) def test_list(self): result = self.app.test_cli_runner().invoke(args=['role', 'list']) self.assertEqual(result.exit_code, 0) def test_show(self): result = self.app.test_cli_runner().invoke(args=['role', 'show', 'admin']) self.assertEqual(result.exit_code, 0) result = self.app.test_cli_runner().invoke(args=['role', 'show', 'doesnotexist']) self.assertEqual(result.exit_code, 1) def test_create(self): result = self.app.test_cli_runner().invoke(args=['role', 'create', 'test']) # conflicting name self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--moderator-group', 'doesnotexist']) # invalid mod group self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-group', 'doesnotexist']) # invalid group self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-role', 'doesnotexist']) # invalid role self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--description', 'Role description', '--moderator-group', 'uffd_admin', '--add-group', 'users', '--add-role', 'admin']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='newrole').one() self.assertIsNotNone(role) self.assertEqual(role.description, 'Role description') self.assertEqual(role.moderator_group, self.get_admin_group()) self.assertEqual(list(role.groups), [self.get_users_group()]) self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all()) with self.app.test_request_context(): for user in User.query: self.assertNotIn(self.get_users_group(), user.groups) result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newbase', '--add-group', 'users', '--default']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): for user in User.query: self.assertIn(self.get_users_group(), user.groups) def test_update(self): result = self.app.test_cli_runner().invoke(args=['role', 'update', 'doesnotexist', '--description', 'New description']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-group', 'doesnotexist']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-group', 'doesnotexist']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-role', 'doesnotexist']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-role', 'doesnotexist']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--moderator-group', 'doesnotexist']) self.assertEqual(result.exit_code, 1) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--description', 'New description', '--moderator-group', 'uffd_admin', '--add-group', 'users', '--remove-group', 'uffd_access', '--add-role', 'admin']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='base').first() self.assertIsNotNone(role) self.assertEqual(role.description, 'New description') self.assertEqual(role.moderator_group, self.get_admin_group()) self.assertEqual(list(role.groups), [self.get_users_group()]) self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all()) self.assertEqual(set(self.get_user().groups), {self.get_users_group(), self.get_admin_group()}) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-moderator-group', '--clear-groups', '--add-group', 'uffd_access', '--remove-role', 'admin', '--add-role', 'test']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='base').first() self.assertIsNone(role.moderator_group) self.assertEqual(list(role.groups), [self.get_access_group()]) self.assertEqual(role.included_roles, Role.query.filter_by(name='test').all()) self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--clear-roles']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='base').first() self.assertEqual(role.included_roles, []) self.assertEqual(role.is_default, True) self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-default']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='base').first() self.assertEqual(role.is_default, False) self.assertEqual(set(self.get_user().groups), set()) result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--default']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): role = Role.query.filter_by(name='base').first() self.assertEqual(role.is_default, True) self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) def test_delete(self): with self.app.test_request_context(): self.assertIsNotNone(Role.query.filter_by(name='test').first()) result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'test']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): self.assertIsNone(Role.query.filter_by(name='test').first()) result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'doesnotexist']) self.assertEqual(result.exit_code, 1) with self.app.test_request_context(): self.assertIn(self.get_admin_group(), self.get_admin().groups) result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'admin']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): self.assertNotIn(self.get_admin_group(), self.get_admin().groups) with self.app.test_request_context(): self.assertIn(self.get_access_group(), self.get_user().groups) result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'base']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): self.assertNotIn(self.get_access_group(), self.get_user().groups)