diff --git a/README.md b/README.md index defb80c33342f837fd49f3b09f200eaa98b18038..49293d90ffe09456518ba2a469854c47a820041d 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,13 @@ FLASK_APP=uffd FLASK_ENV=development flask run During development, you may want to create some example data: ``` -FLASK_APP=uffd flask create-examples +export FLASK_APP=uffd +flask group create 'uffd_access' --description 'Access to Single-Sign-On and Selfservice' +flask group create 'uffd_admin' --description 'Admin access to uffd' +flask role create 'base' --default --add-group 'uffd_access' +flask role create 'admin' --default --add-group 'uffd_admin' +flask user create 'testuser' --password 'userpassword' --mail 'test@example.com' --displayname 'Test User' +flask user create 'testadmin' --password 'adminpassword' --mail 'admin@example.com' --displayname 'Test Admin' --add-role 'admin' ``` Afterwards you can login as a normal user with "testuser" and "userpassword", or as an admin with "testad diff --git a/tests/test_role.py b/tests/test_role.py index 7ede7f7a46b112fea637dee0253d9bbad84ab4a3..7186f790969eee4cacdd097fc5c1d7590de62bca 100644 --- a/tests/test_role.py +++ b/tests/test_role.py @@ -287,3 +287,135 @@ class TestRoleViews(UffdTestCase): self.assertSetEqual(set(role.members), {service_user}) self.assertSetEqual(set(self.get_user().roles_effective), {admin_role}) self.assertSetEqual(set(self.get_admin().roles_effective), {admin_role}) + +class TestRoleCLI(UffdTestCase): + def setUp(self): + super().setUp() + role = Role(name='admin') + db.session.add(role) + role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group()) + role.members.append(self.get_admin()) + role = Role(name='base', is_default=True) + db.session.add(role) + role.groups[self.get_access_group()] = RoleGroup(group=self.get_access_group()) + db.session.add(Role(name='test')) + for user in User.query: + user.update_groups() + db.session.commit() + self.client.__exit__(None, None, None) + + def test_list(self): + result = self.app.test_cli_runner().invoke(args=['role', 'list']) + self.assertEqual(result.exit_code, 0) + + def test_show(self): + result = self.app.test_cli_runner().invoke(args=['role', 'show', 'admin']) + self.assertEqual(result.exit_code, 0) + result = self.app.test_cli_runner().invoke(args=['role', 'show', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + + def test_create(self): + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'test']) # conflicting name + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--moderator-group', 'doesnotexist']) # invalid mod group + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-group', 'doesnotexist']) # invalid group + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-role', 'doesnotexist']) # invalid role + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--description', 'Role description', + '--moderator-group', 'uffd_admin', '--add-group', 'users', + '--add-role', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='newrole').one() + self.assertIsNotNone(role) + self.assertEqual(role.description, 'Role description') + self.assertEqual(role.moderator_group, self.get_admin_group()) + self.assertEqual(list(role.groups), [self.get_users_group()]) + self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all()) + with self.app.test_request_context(): + for user in User.query: + self.assertNotIn(self.get_users_group(), user.groups) + result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newbase', '--add-group', 'users', '--default']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + for user in User.query: + self.assertIn(self.get_users_group(), user.groups) + + def test_update(self): + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'doesnotexist', '--description', 'New description']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-group', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-group', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-role', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-role', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--moderator-group', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--description', 'New description', + '--moderator-group', 'uffd_admin', '--add-group', 'users', + '--remove-group', 'uffd_access', '--add-role', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='base').first() + self.assertIsNotNone(role) + self.assertEqual(role.description, 'New description') + self.assertEqual(role.moderator_group, self.get_admin_group()) + self.assertEqual(list(role.groups), [self.get_users_group()]) + self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all()) + self.assertEqual(set(self.get_user().groups), {self.get_users_group(), self.get_admin_group()}) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-moderator-group', '--clear-groups', + '--add-group', 'uffd_access', '--remove-role', 'admin', + '--add-role', 'test']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='base').first() + self.assertIsNone(role.moderator_group) + self.assertEqual(list(role.groups), [self.get_access_group()]) + self.assertEqual(role.included_roles, Role.query.filter_by(name='test').all()) + self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--clear-roles']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='base').first() + self.assertEqual(role.included_roles, []) + self.assertEqual(role.is_default, True) + self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-default']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='base').first() + self.assertEqual(role.is_default, False) + self.assertEqual(set(self.get_user().groups), set()) + result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--default']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + role = Role.query.filter_by(name='base').first() + self.assertEqual(role.is_default, True) + self.assertEqual(set(self.get_user().groups), {self.get_access_group()}) + + def test_delete(self): + with self.app.test_request_context(): + self.assertIsNotNone(Role.query.filter_by(name='test').first()) + result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'test']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + self.assertIsNone(Role.query.filter_by(name='test').first()) + result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + with self.app.test_request_context(): + self.assertIn(self.get_admin_group(), self.get_admin().groups) + result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + self.assertNotIn(self.get_admin_group(), self.get_admin().groups) + with self.app.test_request_context(): + self.assertIn(self.get_access_group(), self.get_user().groups) + result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'base']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + self.assertNotIn(self.get_access_group(), self.get_user().groups) diff --git a/tests/test_user.py b/tests/test_user.py index 139b5c46fdf2fa9f0ee2c59e98d938fc46e1d374..fbc06a2a72e9bc56e1554b67e8e6c91c1331fe52 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -7,7 +7,7 @@ from flask import url_for, session from uffd import user from uffd.user.models import User, Group -from uffd.role.models import Role +from uffd.role.models import Role, RoleGroup from uffd import create_app, db from utils import dump, UffdTestCase @@ -352,6 +352,100 @@ newuser12,newuser12@example.com,{role1.id};{role1.id} roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['role1']) +class TestUserCLI(UffdTestCase): + def setUp(self): + super().setUp() + role = Role(name='admin') + role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group()) + db.session.add(role) + db.session.add(Role(name='test')) + db.session.commit() + self.client.__exit__(None, None, None) + + def test_list(self): + result = self.app.test_cli_runner().invoke(args=['user', 'list']) + self.assertEqual(result.exit_code, 0) + + def test_show(self): + result = self.app.test_cli_runner().invoke(args=['user', 'show', 'testuser']) + self.assertEqual(result.exit_code, 0) + result = self.app.test_cli_runner().invoke(args=['user', 'show', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + + def test_create(self): + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'new user', '--mail', 'foobar@example.com']) # invalid login name + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', '']) # invalid mail + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--password', '']) # invalid password + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--displayname', '']) # invalid display name + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--add-role', 'doesnotexist']) # unknown role + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'testuser', '--mail', 'foobar@example.com']) # conflicting name + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'newmail@example.com', + '--displayname', 'New Display Name', '--password', 'newpassword', '--add-role', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + user = User.query.filter_by(loginname='newuser').first() + self.assertIsNotNone(user) + self.assertEqual(user.mail, 'newmail@example.com') + self.assertEqual(user.displayname, 'New Display Name') + self.assertTrue(user.check_password('newpassword')) + self.assertEqual(user.roles, Role.query.filter_by(name='admin').all()) + self.assertIn(self.get_admin_group(), user.groups) + + def test_update(self): + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'doesnotexist', '--displayname', 'foo']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', '']) # invalid mail + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--password', '']) # invalid password + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--displayname', '']) # invalid display name + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'newmail@example.com', + '--displayname', 'New Display Name', '--password', 'newpassword']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + user = User.query.filter_by(loginname='testuser').first() + self.assertIsNotNone(user) + self.assertEqual(user.mail, 'newmail@example.com') + self.assertEqual(user.displayname, 'New Display Name') + self.assertTrue(user.check_password('newpassword')) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--add-role', 'admin', '--add-role', 'test']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + user = User.query.filter_by(loginname='testuser').first() + self.assertEqual(set(user.roles), {Role.query.filter_by(name='admin').one(), Role.query.filter_by(name='test').one()}) + self.assertIn(self.get_admin_group(), user.groups) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + user = User.query.filter_by(loginname='testuser').first() + self.assertEqual(user.roles, Role.query.filter_by(name='test').all()) + self.assertNotIn(self.get_admin_group(), user.groups) + result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--clear-roles', '--add-role', 'admin']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + user = User.query.filter_by(loginname='testuser').first() + self.assertEqual(user.roles, Role.query.filter_by(name='admin').all()) + self.assertIn(self.get_admin_group(), user.groups) + + def test_delete(self): + with self.app.test_request_context(): + self.assertIsNotNone(User.query.filter_by(loginname='testuser').first()) + result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'testuser']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + self.assertIsNone(User.query.filter_by(loginname='testuser').first()) + result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + class TestGroupViews(UffdTestCase): def setUp(self): super().setUp() @@ -458,3 +552,50 @@ class TestGroupViews(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertIsNone(Group.query.get(group1_id)) self.assertIsNotNone(Group.query.get(group2_id)) + +class TestGroupCLI(UffdTestCase): + def setUp(self): + super().setUp() + self.client.__exit__(None, None, None) + + def test_list(self): + result = self.app.test_cli_runner().invoke(args=['group', 'list']) + self.assertEqual(result.exit_code, 0) + + def test_show(self): + result = self.app.test_cli_runner().invoke(args=['group', 'show', 'users']) + self.assertEqual(result.exit_code, 0) + result = self.app.test_cli_runner().invoke(args=['group', 'show', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) + + def test_create(self): + result = self.app.test_cli_runner().invoke(args=['group', 'create', 'users']) # Duplicate name + self.assertEqual(result.exit_code, 1) + # See #127 (Enforce alphabet and length constraints for Group.name) + #result = self.app.test_cli_runner().invoke(args=['group', 'create', 'new group']) + #self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['group', 'create', 'newgroup', '--description', 'A new group']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + group = Group.query.filter_by(name='newgroup').first() + self.assertIsNotNone(group) + self.assertEqual(group.description, 'A new group') + + def test_update(self): + result = self.app.test_cli_runner().invoke(args=['group', 'update', 'doesnotexist', '--description', 'foo']) + self.assertEqual(result.exit_code, 1) + result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users', '--description', 'New description']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + group = Group.query.filter_by(name='users').first() + self.assertEqual(group.description, 'New description') + + def test_delete(self): + with self.app.test_request_context(): + self.assertIsNotNone(Group.query.filter_by(name='users').first()) + result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'users']) + self.assertEqual(result.exit_code, 0) + with self.app.test_request_context(): + self.assertIsNone(Group.query.filter_by(name='users').first()) + result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'doesnotexist']) + self.assertEqual(result.exit_code, 1) diff --git a/uffd/__init__.py b/uffd/__init__.py index 441501da9eebc0b1c7c8502aef9313be4fbc4a7c..f5190360fa21115e9d33ee3b9c440aff5ebf666e 100644 --- a/uffd/__init__.py +++ b/uffd/__init__.py @@ -129,28 +129,6 @@ def create_app(test_config=None): # pylint: disable=too-many-locals,too-many-sta app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[30]) app.run(debug=True) - @app.cli.command("create-examples", help='Create example users, groups and roles') - def create_examples(): #pylint: disable=unused-variable - assert app.debug - with app.test_request_context(): - access_group = Group(name='uffd_access', description='Access to Single-Sign-On and Selfservice') - db.session.add(access_group) - admin_group = Group(name='uffd_admin', description='Admin access to uffd') - db.session.add(admin_group) - base_role = Role(name='base', is_default=True, groups={access_group: RoleGroup(group=access_group)}, description='Base role for all regular users') - db.session.add(base_role) - admin_role = Role(name='admin', groups={admin_group: RoleGroup(group=admin_group)}, description='Admin role') - db.session.add(admin_role) - testuser = User(loginname='testuser', password='userpassword', mail='test@example.com', displayname='Test User') - testuser.update_groups() - db.session.add(testuser) - testadmin = User(loginname='testadmin', password='adminpassword', mail='admin@example.com', displayname='Test Admin', roles=[admin_role]) - testadmin.update_groups() - db.session.add(testadmin) - testmail = Mail(uid='test', receivers=['test1@example.com', 'test2@example.com'], destinations=['testuser@mail.example.com']) - db.session.add(testmail) - db.session.commit() - babel = Babel(app) @babel.localeselector diff --git a/uffd/role/__init__.py b/uffd/role/__init__.py index 671578662f91c82cb987ffe679c1f102dc493d1f..550874c08599da42cff0a0caf7c4d2c04e085400 100644 --- a/uffd/role/__init__.py +++ b/uffd/role/__init__.py @@ -1,3 +1,4 @@ from .views import bp as bp_ui +from .cli import bp as bp_cli -bp = [bp_ui] +bp = [bp_ui, bp_cli] diff --git a/uffd/role/cli.py b/uffd/role/cli.py new file mode 100644 index 0000000000000000000000000000000000000000..54d69c350f8d32bfe9f6a1e768e0bf9f92b0ed88 --- /dev/null +++ b/uffd/role/cli.py @@ -0,0 +1,140 @@ +from flask import Blueprint, current_app +from flask.cli import AppGroup +from sqlalchemy.exc import IntegrityError +import click + +from uffd.database import db +from uffd.user.models import Group + +from .models import Role, RoleGroup + +bp = Blueprint('role_cli', __name__) +role_cli = AppGroup('role', help='Manage roles') + +@bp.record +def add_cli_commands(state): + state.app.cli.add_command(role_cli) + +# pylint: disable=too-many-arguments,too-many-locals + +def update_attrs(role, description=None, default=None, + moderator_group=None, clear_moderator_group=False, + clear_groups=False, add_group=tuple(), remove_group=tuple(), + clear_roles=False, add_role=tuple(), remove_role=tuple()): + if description is not None: + role.description = description + if default is not None: + role.is_default = default + if clear_moderator_group: + role.moderator_group = None + elif moderator_group is not None: + group = Group.query.filter_by(name=moderator_group).one_or_none() + if group is None: + raise click.ClickException(f'Moderaor group {moderator_group} not found') + role.moderator_group = group + if clear_groups: + role.groups.clear() + for group_name in add_group: + group = Group.query.filter_by(name=group_name).one_or_none() + if group is None: + raise click.ClickException(f'Group {group_name} not found') + role.groups[group] = RoleGroup(group=group) + for group_name in remove_group: + group = Group.query.filter_by(name=group_name).one_or_none() + if group is None: + raise click.ClickException(f'Group {group_name} not found') + del role.groups[group] + if clear_roles: + role.included_roles.clear() + for role_name in add_role: + _role = Role.query.filter_by(name=role_name).one_or_none() + if _role is None: + raise click.ClickException(f'Role {role_name} not found') + role.included_roles.append(_role) + for role_name in remove_role: + _role = Role.query.filter_by(name=role_name).one_or_none() + if _role is None: + raise click.ClickException(f'Role {role_name} not found') + role.included_roles.remove(_role) + +@role_cli.command(help='List names of all roles') +def list(): + with current_app.test_request_context(): + for role in Role.query: + click.echo(role.name) + +@role_cli.command(help='Show details of group') +@click.argument('name') +def show(name): + with current_app.test_request_context(): + role = Role.query.filter_by(name=name).one_or_none() + if role is None: + raise click.ClickException(f'Role {name} not found') + click.echo(f'Name: {role.name}') + click.echo(f'Description: {role.description}') + click.echo(f'Default: {role.is_default}') + click.echo(f'Moderator group: {role.moderator_group.name if role.moderator_group else None}') + click.echo(f'Direct groups: {", ".join(sorted([group.name for group in role.groups]))}') + click.echo(f'Effective groups: {", ".join(sorted([group.name for group in role.groups_effective]))}') + click.echo(f'Included roles: {", ".join(sorted([irole.name for irole in role.included_roles]))}') + click.echo(f'Direct members: {", ".join(sorted([user.loginname for user in role.members]))}') + click.echo(f'Effective members: {", ".join(sorted([user.loginname for user in role.members_effective]))}') + +@role_cli.command(help='Create new role') +@click.argument('name') +@click.option('--description', default='', help='Set description text.') +@click.option('--default/--no-default', default=False, help='Mark role as default or not. Non-service users are auto-added to default roles.') +@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group. No moderator group if unset.') +@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.') +@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.') +def create(name, description, default, moderator_group, add_group, add_role): + with current_app.test_request_context(): + try: + role = Role(name=name) + update_attrs(role, description, default, moderator_group, + add_group=add_group, add_role=add_role) + db.session.add(role) + role.update_member_groups() + db.session.commit() + except IntegrityError as ex: + raise click.ClickException(f'Role creation failed: {ex}') + +@role_cli.command(help='Update role attributes') +@click.argument('name') +@click.option('--description', default='', help='Set description text.') +@click.option('--default/--no-default', default=None, help='Mark role as default or not. Non-service users are auto-added to default roles.') +@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group.') +@click.option('--no-moderator-group', is_flag=True, flag_value=True, default=False, help='Clear moderator group setting.') +@click.option('--clear-groups', is_flag=True, flag_value=True, default=False, help='Remove all groups granted to role members. Executed before --add-group.') +@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.') +@click.option('--remove-group', multiple=True, metavar='GROUP_NAME', help='Remove group granted to role members. Repeat to remove multiple groups.') +@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all included roles. Executed before --add-role.') +@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.') +@click.option('--remove-role', multiple=True, metavar='ROLE_NAME', help='Remove included role. Repeat to remove multiple roles.') +def update(name, description, default, moderator_group, no_moderator_group, + clear_groups, add_group, remove_group, clear_roles, add_role, remove_role): + with current_app.test_request_context(): + role = Role.query.filter_by(name=name).one_or_none() + if role is None: + raise click.ClickException(f'Role {name} not found') + old_members = set(role.members_effective) + update_attrs(role, description, default, moderator_group, + no_moderator_group, clear_groups, add_group, remove_group, + clear_roles, add_role, remove_role) + for user in old_members: + user.update_groups() + role.update_member_groups() + db.session.commit() + +@role_cli.command(help='Delete role') +@click.argument('name') +def delete(name): + with current_app.test_request_context(): + role = Role.query.filter_by(name=name).one_or_none() + if role is None: + raise click.ClickException(f'Role {name} not found') + old_members = set(role.members_effective) + db.session.delete(role) + for user in old_members: + user.update_groups() + db.session.commit() diff --git a/uffd/user/__init__.py b/uffd/user/__init__.py index 3d56a5689b10fdc41e607c5a15dd2fae86d90c73..17a20ec5bc904b2bf507b0e3f9aa922767a2452e 100644 --- a/uffd/user/__init__.py +++ b/uffd/user/__init__.py @@ -1,4 +1,6 @@ from .views_user import bp as bp_user +from .cli_user import bp as bp_cli_user from .views_group import bp as bp_group +from .cli_group import bp as bp_cli_group -bp = [bp_user, bp_group] +bp = [bp_user, bp_group, bp_cli_user, bp_cli_group] diff --git a/uffd/user/cli_group.py b/uffd/user/cli_group.py new file mode 100644 index 0000000000000000000000000000000000000000..753a3a50372262fed0a9e6486fc25a597734941f --- /dev/null +++ b/uffd/user/cli_group.py @@ -0,0 +1,66 @@ +from flask import Blueprint, current_app +from flask.cli import AppGroup +from sqlalchemy.exc import IntegrityError +import click + +from uffd.database import db + +from .models import Group + +bp = Blueprint('group_cli', __name__) +group_cli = AppGroup('group', help='Manage groups') + +@bp.record +def add_cli_commands(state): + state.app.cli.add_command(group_cli) + +@group_cli.command(help='List names of all groups') +def list(): + with current_app.test_request_context(): + for group in Group.query: + click.echo(group.name) + +@group_cli.command(help='Show details of group') +@click.argument('name') +def show(name): + with current_app.test_request_context(): + group = Group.query.filter_by(name=name).one_or_none() + if group is None: + raise click.ClickException(f'Group {name} not found') + click.echo(f'Name: {group.name}') + click.echo(f'Unix GID: {group.unix_gid}') + click.echo(f'Description: {group.description}') + click.echo(f'Members: {", ".join([user.loginname for user in group.members])}') + +@group_cli.command(help='Create new group') +@click.argument('name') +@click.option('--description', default='', help='Set description text. Empty per default.') +def create(name, description): + with current_app.test_request_context(): + try: + db.session.add(Group(name=name, description=description)) + db.session.commit() + except IntegrityError as ex: + raise click.ClickException(f'Group creation failed: {ex}') + +@group_cli.command(help='Update group attributes') +@click.argument('name') +@click.option('--description', default='', help='Set description text.') +def update(name, description): + with current_app.test_request_context(): + group = Group.query.filter_by(name=name).one_or_none() + if group is None: + raise click.ClickException(f'Group {name} not found') + if description is not None: + group.description = description + db.session.commit() + +@group_cli.command(help='Delete group') +@click.argument('name') +def delete(name): + with current_app.test_request_context(): + group = Group.query.filter_by(name=name).one_or_none() + if group is None: + raise click.ClickException(f'Group {name} not found') + db.session.delete(group) + db.session.commit() diff --git a/uffd/user/cli_user.py b/uffd/user/cli_user.py new file mode 100644 index 0000000000000000000000000000000000000000..490af1cf2a84441ad6d2fff7e24e94b43a8e76e1 --- /dev/null +++ b/uffd/user/cli_user.py @@ -0,0 +1,114 @@ +from flask import Blueprint, current_app +from flask.cli import AppGroup +from sqlalchemy.exc import IntegrityError +import click + +from uffd.role.models import Role +from uffd.database import db + +from .models import User + +bp = Blueprint('user_cli', __name__) +user_cli = AppGroup('user', help='Manage users') + +@bp.record +def add_cli_commands(state): + state.app.cli.add_command(user_cli) + +# pylint: disable=too-many-arguments + +def update_attrs(user, mail=None, displayname=None, password=None, + prompt_password=False, clear_roles=False, + add_role=tuple(), remove_role=tuple()): + if password is None and prompt_password: + password = click.prompt('Password', hide_input=True, confirmation_prompt='Confirm password') + if mail is not None and not user.set_mail(mail): + raise click.ClickException('Invalid mail address') + if displayname is not None and not user.set_displayname(displayname): + raise click.ClickException('Invalid displayname') + if password is not None and not user.set_password(password): + raise click.ClickException('Invalid password') + if clear_roles: + user.roles.clear() + for role_name in add_role: + role = Role.query.filter_by(name=role_name).one_or_none() + if role is None: + raise click.ClickException(f'Role {role_name} not found') + role.members.append(user) + for role_name in remove_role: + role = Role.query.filter_by(name=role_name).one_or_none() + if role is None: + raise click.ClickException(f'Role {role_name} not found') + role.members.remove(user) + user.update_groups() + +@user_cli.command(help='List login names of all users') +def list(): + with current_app.test_request_context(): + for user in User.query: + click.echo(user.loginname) + +@user_cli.command(help='Show details of user') +@click.argument('loginname') +def show(loginname): + with current_app.test_request_context(): + user = User.query.filter_by(loginname=loginname).one_or_none() + if user is None: + raise click.ClickException(f'User {loginname} not found') + click.echo(f'Loginname: {user.loginname}') + click.echo(f'Displayname: {user.displayname}') + click.echo(f'Mail: {user.mail}') + click.echo(f'Service User: {user.is_service_user}') + click.echo(f'Roles: {", ".join([role.name for role in user.roles])}') + click.echo(f'Groups: {", ".join([group.name for group in user.groups])}') + +@user_cli.command(help='Create new user') +@click.argument('loginname') +@click.option('--mail', required=True, metavar='EMAIL_ADDRESS', help='E-Mail address') +@click.option('--displayname', help='Set display name. Defaults to login name.') +@click.option('--service/--no-service', default=False, help='Create service or regular (default) user. '+\ + 'Regular users automatically have roles marked as default. '+\ + 'Service users do not.') +@click.option('--password', help='Password for SSO login. Login disabled if unset.') +@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Read password interactively from terminal.') +@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.', metavar='ROLE_NAME') +def create(loginname, mail, displayname, service, password, prompt_password, add_role): + with current_app.test_request_context(): + if displayname is None: + displayname = loginname + user = User(is_service_user=service) + if not user.set_loginname(loginname, ignore_blocklist=True): + raise click.ClickException('Invalid loginname') + try: + db.session.add(user) + update_attrs(user, mail, displayname, password, prompt_password, add_role=add_role) + db.session.commit() + except IntegrityError as ex: + raise click.ClickException(f'User creation failed: {ex}') + +@user_cli.command(help='Update user attributes and roles') +@click.argument('loginname') +@click.option('--mail', metavar='EMAIL_ADDRESS', help='Set e-mail address.') +@click.option('--displayname', help='Set display name.') +@click.option('--password', help='Set password for SSO login.') +@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Set password by reading it interactivly from terminal.') +@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all roles from user. Executed before --add-role.') +@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.') +@click.option('--remove-role', multiple=True, help='Remove role from user. Repeat to remove multiple roles.') +def update(loginname, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role): + with current_app.test_request_context(): + user = User.query.filter_by(loginname=loginname).one_or_none() + if user is None: + raise click.ClickException(f'User {loginname} not found') + update_attrs(user, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role) + db.session.commit() + +@user_cli.command(help='Delete user') +@click.argument('loginname') +def delete(loginname): + with current_app.test_request_context(): + user = User.query.filter_by(loginname=loginname).one_or_none() + if user is None: + raise click.ClickException(f'User {loginname} not found') + db.session.delete(user) + db.session.commit()