Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Showing
with 1441 additions and 39 deletions
location / {
uwsgi_pass unix:///run/uwsgi/app/uffd/socket;
include uwsgi_params;
}
location /static {
alias /usr/share/uffd/uffd/static;
}
[pytest]
filterwarnings =
# DeprecationWarning from dependencies that we use
ignore:`formatargspec` is deprecated since Python 3.5. Use `signature` and the `Signature` object directly:DeprecationWarning
ignore:Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working:DeprecationWarning
# Versions from Debian Buster
ldap3==2.4.1
flask==1.0.2
Flask-SQLAlchemy==2.1
qrcode==6.1
fido2==0.5.0
Flask-OAuthlib==0.9.5
Flask-Migrate==2.1.1
Flask-Babel==0.11.2
alembic==1.0.0
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
# for all dependencies.
certifi==2018.8.24
#cffi==1.12.2
cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.
chardet==3.0.4
click==7.0
cryptography==2.6.1
idna==2.6
itsdangerous==0.24
Jinja2==2.10
MarkupSafe==1.1.0
oauthlib==2.1.0
pyasn1==0.4.2
pycparser==2.19
requests==2.21.0
requests-oauthlib==1.0.0
six==1.12.0
SQLAlchemy==1.2.18
urllib3==1.24.1
Werkzeug==0.14.1
python-dateutil==2.7.3
#editor==1.0.3
Mako==1.0.7
.
# Testing
pytest==3.10.1
......
......@@ -3,17 +3,18 @@ import os
with open('README.md', 'r', encoding='utf-8') as f:
long_description = f.read()
long_description = '**DO NOT INSTALL FROM PIP FOR PRODUCTION DEPLOYMENTS**, see [Deployment](#Deployment) for more information.\n\n\n\n' + long_description
setup(
name='uffd',
version=os.environ.get('PACKAGE_VERSION', 'local'),
description='UserFerwaltungsFrontend: Ldap based single sign on and user management web software',
description='Web-based user management and single sign-on software',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://git.cccv.de/uffd/uffd',
classifiers=[
'Programming Language :: Python :: 3',
'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
'Operating System :: OS Independent',
'Topic :: System :: Systems Administration :: Authentication/Directory :: LDAP',
......@@ -24,7 +25,48 @@ setup(
author='CCCV',
author_email='it@cccv.de',
license='AGPL3',
packages=find_packages(),
packages=['uffd'],
include_package_data=True,
zip_safe=False,
python_requires='>=3.7',
install_requires=[
# Versions Debian Buster packages are based on.
# DO NOT USE FOR PRODUCTION, those in the setup.py are not updated regularly
'flask==1.0.2',
'Flask-SQLAlchemy==2.1',
'qrcode==6.1',
'fido2==0.5.0',
'cryptography==2.6.1',
'pyjwt==1.7.0',
'Flask-Migrate==2.1.1',
'Flask-Babel==0.11.2',
'alembic==1.0.0',
'argon2-cffi==18.3.0',
'itsdangerous==0.24',
'prometheus-client==0.9',
'ua-parser==0.8.0',
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
# for all dependencies.
'certifi==2018.8.24',
#cffi==1.12.2'
'cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.',
'chardet==3.0.4',
'click==7.0',
'idna==2.6',
'Jinja2==2.10',
'MarkupSafe==1.1.0',
'pyasn1==0.4.2',
'pycparser==2.19',
'requests==2.21.0',
'requests-oauthlib==1.0.0',
'six==1.12.0',
'SQLAlchemy==1.2.18',
'urllib3==1.24.1',
'Werkzeug==0.14.1',
'python-dateutil==2.7.3',
#editor==1.0.3
'Mako==1.0.7',
],
)
from uffd.database import db
from uffd.models import User, Role, RoleGroup
from tests.utils import UffdTestCase
class TestRoleCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
db.session.add(role)
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
role.members.append(self.get_admin())
role = Role(name='base', is_default=True)
db.session.add(role)
role.groups[self.get_access_group()] = RoleGroup(group=self.get_access_group())
db.session.add(Role(name='test'))
for user in User.query:
user.update_groups()
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['role', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'admin'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'test']) # conflicting name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--moderator-group', 'doesnotexist']) # invalid mod group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-group', 'doesnotexist']) # invalid group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-role', 'doesnotexist']) # invalid role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--description', 'Role description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='newrole').one()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'Role description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
with self.app.test_request_context():
for user in User.query:
self.assertNotIn(self.get_users_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newbase', '--add-group', 'users', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
for user in User.query:
self.assertIn(self.get_users_group(), user.groups)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'doesnotexist', '--description', 'New description'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--moderator-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--description', 'New description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--remove-group', 'uffd_access', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'New description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
self.assertEqual(set(self.get_user().groups), {self.get_users_group(), self.get_admin_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-moderator-group', '--clear-groups',
'--add-group', 'uffd_access', '--remove-role', 'admin',
'--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNone(role.moderator_group)
self.assertEqual(list(role.groups), [self.get_access_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='test').all())
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--clear-roles'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.included_roles, [])
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, False)
self.assertEqual(set(self.get_user().groups), set())
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
# Regression test for https://git.cccv.de/uffd/uffd/-/issues/156
def test_update_without_description(self):
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
role.description = 'Test description'
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--clear-groups'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
self.assertEqual(role.description, 'Test description')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
self.assertIn(self.get_admin_group(), self.get_admin().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_admin_group(), self.get_admin().groups)
with self.app.test_request_context():
self.assertIn(self.get_access_group(), self.get_user().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'base'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_access_group(), self.get_user().groups)
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
from tests.utils import UffdTestCase
class TestUniqueEmailAddressCommands(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_enable(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertTrue(FeatureFlag.unique_email_addresses)
def test_enable_already_enabled(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_user_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com'))
db.session.add(UserEmail(user=self.get_user(), address='FOO@example.com'))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_global_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com', verified=True))
db.session.add(UserEmail(user=self.get_admin(), address='FOO@example.com', verified=True))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_disable(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertFalse(FeatureFlag.unique_email_addresses)
def test_disable_already_enabled(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 1)
from uffd.database import db
from uffd.models import User, Group, Role, RoleGroup, FeatureFlag
from tests.utils import UffdTestCase
class TestUserCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
db.session.add(role)
db.session.add(Role(name='test'))
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['user', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'testuser'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'new user', '--mail', 'foobar@example.com']) # invalid login name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--add-role', 'doesnotexist']) # unknown role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'testuser', '--mail', 'foobar@example.com']) # conflicting name
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'test@example.com']) # conflicting email address
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
self.assertFalse(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser2', '--mail', 'newmail2@example.com', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser2').first()
self.assertTrue(user.is_deactivated)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'doesnotexist', '--displayname', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'admin@example.com']) # conflicting mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--add-role', 'admin', '--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(set(user.roles), {Role.query.filter_by(name='admin').one(), Role.query.filter_by(name='test').one()})
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='test').all())
self.assertNotIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--clear-roles', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertTrue(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--activate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertFalse(user.is_deactivated)
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'testuser'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
class TestGroupCLI(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['group', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'users'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'users']) # Duplicate name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'new group'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'newgroup', '--description', 'A new group'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='newgroup').first()
self.assertIsNotNone(group)
self.assertEqual(group.description, 'A new group')
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'doesnotexist', '--description', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users', '--description', 'New description'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'New description')
def test_update_without_description(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users']) # Should not change anything
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'Base group for all users')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'users'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
import os
import sys
import datetime
from uffd.database import db
from uffd.models import (
User, UserEmail, Group,
RecoveryCodeMethod, TOTPMethod, WebauthnMethod,
Role, RoleGroup,
Signup,
Invite, InviteGrant, InviteSignup,
DeviceLoginConfirmation,
Service,
OAuth2Client, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation,
PasswordToken,
Session,
)
from tests.utils import MigrationTestCase
class TestFuzzy(MigrationTestCase):
def setUpApp(self):
self.app.config['LDAP_SERVICE_MOCK'] = True
self.app.config['OAUTH2_CLIENTS'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'redirect_uris': ['http://localhost:5004/oauthproxy/callback'],
'logout_urls': ['http://localhost:5004/oauthproxy/logout']
}
}
self.app.config['API_CLIENTS_2'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'scopes': ['checkpassword', 'getusers', 'getmails']
},
}
# Runs every upgrade/downgrade script with data. To do this we first upgrade
# to head, create data, then downgrade, upgrade, downgrade for every revision.
def test_migrations_fuzzy(self):
self.upgrade('head')
# Users and groups were created by 878b25c4fae7_ldap_to_db because we set LDAP_SERVICE_MOCK to True
user = User.query.first()
group = Group.query.first()
db.session.add(RecoveryCodeMethod(user=user))
db.session.add(TOTPMethod(user=user, name='mytotp'))
db.session.add(WebauthnMethod(user=user, name='mywebauthn', cred=b''))
role = Role(name='role', groups={group: RoleGroup(group=group)})
db.session.add(role)
role.members.append(user)
db.session.add(Role(name='base', included_roles=[role], locked=True, is_default=True, moderator_group=group, groups={group: RoleGroup(group=group)}))
db.session.add(Signup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
db.session.add(Signup(loginname='testuser', displayname='Testuser', mail='testuser@example.com', password='testpassword', user=user))
invite = Invite(valid_until=datetime.datetime.now(), roles=[role])
db.session.add(invite)
invite.signups.append(InviteSignup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
invite.grants.append(InviteGrant(user=user))
db.session.add(Invite(creator=user, valid_until=datetime.datetime.now()))
service = Service(name='testservice', access_group=group)
oauth2_client = OAuth2Client(service=service, client_id='testclient', client_secret='testsecret', redirect_uris=['http://localhost:1234/callback'], logout_uris=[OAuth2LogoutURI(method='GET', uri='http://localhost:1234/callback')])
db.session.add_all([service, oauth2_client])
session = Session(
user=user,
secret='0919de9da3f7dc6c33ab849f44c20e8221b673ca701030de17488f3269fc5469f100e2ce56e5fd71305b23d8ecbb06d80d22004adcd3fefc5f5fcb80a436e31f2c2d9cc8fe8c59ae44871ae4524408d312474570280bf29d3ba145a4bd00010ca758eaa0795b180ec12978b42d13bf4c4f06f72103d44077022ce656610be855',
ip_address='127.0.0.1',
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0',
mfa_done=True,
)
db.session.add(session)
db.session.add(OAuth2Grant(session=session, client=oauth2_client, _code='testcode', redirect_uri='http://example.com/callback', expires=datetime.datetime.now()))
db.session.add(OAuth2Token(session=session, client=oauth2_client, token_type='Bearer', _access_token='testcode', _refresh_token='testcode', expires=datetime.datetime.now()))
db.session.add(OAuth2DeviceLoginInitiation(client=oauth2_client, confirmations=[DeviceLoginConfirmation(session=session)]))
db.session.add(PasswordToken(user=user))
db.session.commit()
revs = [s.split('_', 1)[0] for s in os.listdir('uffd/migrations/versions') if '_' in s and s.endswith('.py')]
for rev in revs:
self.downgrade('-1')
self.upgrade('+1')
self.downgrade('-1')
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
class TestForMissingLockRows(MigrationTestCase):
def test_check_missing_lock_rows(self):
self.upgrade('head')
existing_locks = {row[0] for row in db.session.execute(db.select([lock_table.c.name])).fetchall()}
for name in Lock.ALL_LOCKS - existing_locks:
self.fail(f'Lock "{name}" is missing. Make sure to add a migration that inserts it.')
# Add something like this:
# conn = op.get_bind()
# lock_table = sa.table('lock', sa.column('name'))
# conn.execute(sa.insert(lock_table).values(name='NAME'))
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
user_table = db.table('user',
db.column('id'),
db.column('unix_uid'),
db.column('loginname'),
db.column('displayname'),
db.column('primary_email_id'),
db.column('is_service_user'),
)
user_email_table = db.table('user_email',
db.column('id'),
db.column('address'),
db.column('address_normalized'),
db.column('verified'),
)
group_table = db.table('group',
db.column('id'),
db.column('unix_gid'),
db.column('name'),
db.column('description')
)
uid_allocation_table = db.table('uid_allocation', db.column('id'))
gid_allocation_table = db.table('gid_allocation', db.column('id'))
class TestMigration(MigrationTestCase):
REVISION = 'aeb07202a6c8'
def setUpApp(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10005
self.app.config['USER_SERVICE_MIN_UID'] = 10006
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.app.config['GROUP_MIN_GID'] = 20000
self.app.config['GROUP_MAX_GID'] = 20005
def create_user(self, uid):
db.session.execute(db.insert(user_email_table).values(
address=f'email{uid}@example.com',
address_normalized=f'email{uid}@example.com',
verified=True
))
email_id = db.session.execute(
db.select([user_email_table.c.id])
.where(user_email_table.c.address == f'email{uid}@example.com')
).scalar()
db.session.execute(db.insert(user_table).values(
unix_uid=uid,
loginname=f'user{uid}',
displayname='user',
primary_email_id=email_id,
is_service_user=False
))
def create_group(self, gid):
db.session.execute(db.insert(group_table).values(unix_gid=gid, name=f'group{gid}', description=''))
def fetch_uid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([uid_allocation_table])
.order_by(uid_allocation_table.c.id)
).fetchall()]
def fetch_gid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([gid_allocation_table])
.order_by(gid_allocation_table.c.id)
).fetchall()]
def test_empty(self):
# No users/groups
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [])
self.assertEqual(self.fetch_gid_allocations(), [])
def test_gid_first_minus_one(self):
self.create_group(19999)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [19999])
def test_gid_first(self):
self.create_group(20000)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000])
def test_gid_first_plus_one(self):
self.create_group(20001)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001])
def test_gid_last_minus_one(self):
self.create_group(20004)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004])
def test_gid_last(self):
self.create_group(20005)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004, 20005])
def test_gid_last_plus_one(self):
self.create_group(20006)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20006])
def test_gid_complex(self):
self.create_group(10)
self.create_group(20001)
self.create_group(20003)
self.create_group(20010)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [10, 20000, 20001, 20002, 20003, 20010])
# The code for UIDs is mostly the same as for GIDs, so we don't test all
# the edge cases again.
def test_uid_different_ranges(self):
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10006, 10007, 10008, 10009, 90000])
def test_uid_same_ranges(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10010
self.app.config['USER_SERVICE_MIN_UID'] = 10000
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10003, 10004, 10005, 10006, 10007, 10008, 10009, 90000])
import datetime
from flask import current_app
from uffd.database import db
from uffd.models import Invite, InviteGrant, InviteSignup, User, Role, RoleGroup
from tests.utils import UffdTestCase, db_flush
class TestInviteModel(UffdTestCase):
def test_expire(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertFalse(invite.expired)
self.assertTrue(invite.active)
invite.valid_until = datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
self.assertTrue(invite.expired)
self.assertFalse(invite.active)
def test_void(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=False, creator=self.get_admin())
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite.used = True
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=True, creator=self.get_admin())
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite.used = True
self.assertTrue(invite.voided)
self.assertFalse(invite.active)
def test_permitted(self):
role = Role(name='testrole')
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, roles=[role])
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
invite.creator = self.get_admin()
self.assertTrue(invite.permitted)
self.assertTrue(invite.active)
invite.creator.is_deactivated = True
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
invite.creator = self.get_user()
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
role.moderator_group = self.get_access_group()
current_app.config['ACL_SIGNUP_GROUP'] = 'uffd_access'
self.assertTrue(invite.permitted)
self.assertTrue(invite.active)
role.moderator_group = None
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
role.moderator_group = self.get_access_group()
current_app.config['ACL_SIGNUP_GROUP'] = 'uffd_admin'
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
def test_disable(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertTrue(invite.active)
invite.disable()
self.assertFalse(invite.active)
def test_reset_disabled(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
invite.disable()
self.assertFalse(invite.active)
invite.reset()
self.assertTrue(invite.active)
def test_reset_expired(self):
invite = Invite(valid_until=datetime.datetime.utcnow() - datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertFalse(invite.active)
invite.reset()
self.assertFalse(invite.active)
def test_reset_single_use(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=False, creator=self.get_admin())
invite.used = True
invite.disable()
self.assertFalse(invite.active)
invite.reset()
self.assertTrue(invite.active)
def test_short_token(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
db.session.add(invite)
db.session.commit()
self.assertTrue(len(invite.short_token) <= len(invite.token)/3)
class TestInviteGrantModel(UffdTestCase):
def test_success(self):
user = self.get_user()
group0 = self.get_access_group()
role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)})
db.session.add(role0)
user.roles.append(role0)
user.update_groups()
group1 = self.get_admin_group()
role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role1, role2], creator=self.get_admin())
self.assertIn(role0, user.roles)
self.assertNotIn(role1, user.roles)
self.assertNotIn(role2, user.roles)
self.assertIn(group0, user.groups)
self.assertNotIn(group1, user.groups)
self.assertFalse(invite.used)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertTrue(success)
self.assertIn(role0, user.roles)
self.assertIn(role1, user.roles)
self.assertIn(role2, user.roles)
self.assertIn(group0, user.groups)
self.assertIn(group1, user.groups)
self.assertTrue(invite.used)
db.session.commit()
db_flush()
user = self.get_user()
self.assertIn('baserole', [role.name for role in user.roles_effective])
self.assertIn('testrole1', [role.name for role in user.roles])
self.assertIn('testrole2', [role.name for role in user.roles])
self.assertIn(self.get_access_group(), user.groups)
self.assertIn(self.get_admin_group(), user.groups)
def test_inactive(self):
user = self.get_user()
group = self.get_admin_group()
role = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role], single_use=True, used=True, creator=self.get_admin())
self.assertFalse(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
self.assertNotIn(role, user.roles)
self.assertNotIn(group, user.groups)
def test_no_roles(self):
user = self.get_user()
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertTrue(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
def test_no_new_roles(self):
user = self.get_user()
role = Role(name='testrole1')
db.session.add(role)
user.roles.append(role)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role], creator=self.get_admin())
self.assertTrue(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
class TestInviteSignupModel(UffdTestCase):
def create_base_roles(self):
baserole = Role(name='base', is_default=True)
baserole.groups[self.get_access_group()] = RoleGroup()
baserole.groups[self.get_users_group()] = RoleGroup()
db.session.add(baserole)
db.session.commit()
def test_success(self):
self.create_base_roles()
base_role = Role.query.filter_by(name='base').one()
base_group1 = self.get_access_group()
base_group2 = self.get_users_group()
group = self.get_admin_group()
role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True, creator=self.get_admin())
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
self.assertFalse(invite.used)
valid, msg = signup.validate()
self.assertTrue(valid)
self.assertFalse(invite.used)
user, msg = signup.finish('notsecret')
self.assertIsInstance(user, User)
self.assertTrue(invite.used)
self.assertEqual(user.loginname, 'newuser')
self.assertEqual(user.displayname, 'New User')
self.assertEqual(user.primary_email.address, 'test@example.com')
self.assertEqual(signup.user, user)
self.assertIn(base_role, user.roles_effective)
self.assertIn(role1, user.roles)
self.assertIn(role2, user.roles)
self.assertIn(base_group1, user.groups)
self.assertIn(base_group2, user.groups)
self.assertIn(group, user.groups)
db.session.commit()
db_flush()
self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1)
def test_success_no_roles(self):
self.create_base_roles()
base_role = Role.query.filter_by(name='base').one()
base_group1 = self.get_access_group()
base_group2 = self.get_users_group()
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
self.assertFalse(invite.used)
valid, msg = signup.validate()
self.assertTrue(valid)
self.assertFalse(invite.used)
user, msg = signup.finish('notsecret')
self.assertIsInstance(user, User)
self.assertTrue(invite.used)
self.assertEqual(user.loginname, 'newuser')
self.assertEqual(user.displayname, 'New User')
self.assertEqual(user.primary_email.address, 'test@example.com')
self.assertEqual(signup.user, user)
self.assertIn(base_role, user.roles_effective)
self.assertEqual(len(user.roles_effective), 1)
self.assertIn(base_group1, user.groups)
self.assertIn(base_group2, user.groups)
self.assertEqual(len(user.groups), 2)
db.session.commit()
db_flush()
self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1)
def test_inactive(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, single_use=True, used=True, creator=self.get_admin())
self.assertFalse(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
user, msg = signup.finish('notsecret')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
def test_invalid(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
def test_invalid2(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
user, msg = signup.finish('wrongpassword')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
def test_no_signup(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=False, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
user, msg = signup.finish('notsecret')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
import unittest
import datetime
import time
from uffd.database import db
from uffd.models import RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from uffd.models.mfa import _hotp
from tests.utils import UffdTestCase
class TestMfaPrimitives(unittest.TestCase):
def test_hotp(self):
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q'), '458290')
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q', digits=8), '20458290')
for digits in range(1, 10):
self.assertEqual(len(_hotp(1, b'abcd', digits=digits)), digits)
self.assertEqual(_hotp(1234, b''), '161024')
self.assertEqual(_hotp(0, b'\x04\x8fM\xcc\x7f\x82\x9c$a\x1b\xb3'), '279354')
self.assertEqual(_hotp(2**64-1, b'abcde'), '899292')
def get_fido2_test_cred(self):
try:
from uffd.fido2_compat import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaMethodModels(UffdTestCase):
def test_common_attributes(self):
method = TOTPMethod(user=self.get_user(), name='testname')
self.assertTrue(method.created <= datetime.datetime.utcnow())
self.assertEqual(method.name, 'testname')
self.assertEqual(method.user.loginname, 'testuser')
method.user = self.get_admin()
self.assertEqual(method.user.loginname, 'testadmin')
def test_recovery_code_method(self):
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
method_code = method.code_value
db.session.expunge(method)
method = RecoveryCodeMethod.query.get(method_id)
self.assertFalse(hasattr(method, 'code_value'))
self.assertFalse(method.verify(''))
self.assertFalse(method.verify('A'*8))
self.assertTrue(method.verify(method_code))
def test_totp_method_attributes(self):
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
issuer = method.issuer
accountname = method.accountname
key_uri = method.key_uri
self.assertEqual(method.name, 'testname')
# Restore method with key parameter
_method = TOTPMethod(user=self.get_user(), key=method.key, name='testname')
self.assertEqual(_method.name, 'testname')
self.assertEqual(_method.raw_key, raw_key)
self.assertEqual(_method.issuer, issuer)
self.assertEqual(_method.accountname, accountname)
self.assertEqual(_method.key_uri, key_uri)
db.session.add(method)
db.session.commit()
_method_id = _method.id
db.session.expunge(_method)
# Restore method from db
_method = TOTPMethod.query.get(_method_id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(_method.raw_key, raw_key)
self.assertEqual(_method.issuer, issuer)
self.assertEqual(_method.accountname, accountname)
self.assertEqual(_method.key_uri, key_uri)
def test_totp_method_verify(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(''))
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter+2, method.raw_key)))
def test_totp_method_verify_reuse(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter-1, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter-1, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter, method.raw_key)))
def test_webauthn_method(self):
data = get_fido2_test_cred(self)
method = WebauthnMethod(user=self.get_user(), cred=data, name='testname')
self.assertEqual(method.name, 'testname')
db.session.add(method)
db.session.commit()
method_id = method.id
method_cred = method.cred
db.session.expunge(method)
_method = WebauthnMethod.query.get(method_id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(bytes(method_cred), bytes(_method.cred))
self.assertEqual(data.credential_id, _method.cred.credential_id)
self.assertEqual(data.public_key, _method.cred.public_key)
# We only test (de-)serialization here, as everything else is currently implemented in the views
import time
import threading
from sqlalchemy.exc import IntegrityError
from uffd.database import db
from uffd.models import FeatureFlag, Lock
from uffd.models.misc import feature_flag_table
from tests.utils import ModelTestCase
class TestFeatureFlag(ModelTestCase):
def test_disabled(self):
flag = FeatureFlag('foo')
self.assertFalse(flag)
self.assertFalse(db.session.execute(db.select([flag.expr])).scalar())
def test_enabled(self):
db.session.execute(db.insert(feature_flag_table).values(name='foo'))
flag = FeatureFlag('foo')
self.assertTrue(flag)
self.assertTrue(db.session.execute(db.select([flag.expr])).scalar())
def test_toggle(self):
flag = FeatureFlag('foo')
hooks_called = []
@flag.enable_hook
def enable_hook1():
hooks_called.append('enable1')
@flag.enable_hook
def enable_hook2():
hooks_called.append('enable2')
@flag.disable_hook
def disable_hook1():
hooks_called.append('disable1')
@flag.disable_hook
def disable_hook2():
hooks_called.append('disable2')
hooks_called.clear()
flag.enable()
self.assertTrue(flag)
self.assertEqual(hooks_called, ['enable1', 'enable2'])
hooks_called.clear()
flag.disable()
self.assertFalse(flag)
self.assertEqual(hooks_called, ['disable1', 'disable2'])
flag.disable() # does nothing
self.assertFalse(flag)
flag.enable()
self.assertTrue(flag)
with self.assertRaises(IntegrityError):
flag.enable()
self.assertTrue(flag)
class TestLock(ModelTestCase):
DISABLE_SQLITE_MEMORY_DB = True
def setUpApp(self):
self.lock = Lock('testlock')
def run_lock_test(self):
result = []
def func():
with self.app.test_request_context():
self.lock.acquire()
result.append('bar')
t = threading.Thread(target=func)
t.start()
time.sleep(1)
result.append('foo')
time.sleep(1)
db.session.rollback()
t.join()
return result
def test_lock2(self):
self.assertEqual(self.run_lock_test(), ['bar', 'foo'])
self.lock.acquire()
self.assertEqual(self.run_lock_test(), ['foo', 'bar'])
import unittest
import datetime
import jwt
from uffd.database import db
from uffd.models import OAuth2Key
from tests.utils import UffdTestCase
TEST_JWK = dict(
id='HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU',
created=datetime.datetime(2023, 11, 9, 0, 21, 10),
active=True,
algorithm='RS256',
private_key_jwk='''{
"kty": "RSA",
"key_ops": ["sign"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB",
"d": "G7yoH5mLcZTA6ia-byCoN-zpofGvdga9AZnxPO0vsq6K_cY_O2gxuVZ3n6reAKKbuLNGCbb_D_Dffs4q8rprlfkgi3TCLzXX5Zv5HWTD7a4Y7xpxEzQ2sWo-iagVIqZVPh0pyjliqnTyUWnFmWiY0gBe9UHianHjFVZqe8E2HFOKgW3UUbQz0keg8JtJ3T9gzZrM38KWbqhOJO0VVSRAoANPTSnumfRsUCyWywrMtIfgAbQaKazqX3xkOsAF1L-iNfd6slzPvRyIQVflVDMdfKnsu-lHiKJ0DK_lg9f55T5FymgcXsq43EKBQ2H4v2dafIm-vtWx_TRZWj_msD32BEPBA-zTqh_oP1r6a3DZh4DBtWY3vzSiuhAC0erlRs-hRTX_e9ET5fUbJnmNxjnxQD9zZmwq4ujMK6KFnHct8t77Qxj3a-wDR_XyDJ4_EKYqHlcVHfxGNBSvIdjuZJkPJnVpVtfCtpyamQIR4u5oNV7fIwYe_tFnw0Y90rGoJMzB",
"p": "-A-FnH21HJ7GPWUm9k3mxsxSchy89QEUCZZiH6EcB4ZP8wJsxrQsUSIHCR74YmZEI3Ulsum1Ql4x50k7Q2sNh9SnwKvmctjksehGy4yCrdunAqjqyz3wFwGaKWnhn3frkiqH5ATjkOoc8qHz8saa7reeVClj47ZWyy-Nl559ycLMs0rI1N_THzO07C3jSbJhyPj0yeygAflsRqqnNvEQ6ps1VLiqf9G5jfSvUUn5DyKIpep9iGo29caGSIPIy_2h",
"q": "xNe1-QWskxOcY_GiHpFWdvzqr1o9fxg5whgpNcGi3caokw2iNHRYut4cbVvFFBlv_9B5QCl9WVfR2ADG0AtvkvUxEZqCdxEvcqjIANeRLKHDjW5kMuPS0_fcskFP-r7mCM9SBfPplfMVCF5nuNWf5LzNopWfsTChIDD1rSpPjItNYuwLXszm_3R81HHHeQLcyvoMxLCmeLy5TXX2hXOMHh2IMZCXAHopJmLJUVnQ48kr5jd2l0kLbmx3aBqdccJn",
"dp": "MLS7g1KbcRcrzXpDADGjkn0j4wwJfgHMMWW5toQnwMJ6iDh9qzZNTVDlGMFf-9IgpuWllU-WK4XbPpJ-dGpcqcLzfT1DbmFv5g65d9YLAqASVs9b6rQqpBnIb0E-79TYCEcZj4f2NsoBDRMHly-v1BdxmwzVdCylNhgMMS0Jfcgl8T5J2KJqDcJVT9piumGwGYnoZo1zjW-v9uAjHQKQU8BN5Git8ZL4YAsfMVLY-EPLmOhF5bcVO4TTcQGPN56B",
"dq": "HiiSl-G3rB0QE_v8g8Ruw_JCHrWrwGI8zzEWd0cApgv-3fDzzieZRKAtKNArpMW09DPDsAHrU5nx669KxqtJ3_EzIGhU3ttCMsYLRp3Af18VcADe1zEypwlNxf3dvCQtaGIjRgg13KSOr2aPa7FHOyt2MhfMjMBPn3gA3BQkdfsN0z8pCtBIABGf4ojAMBkxLOQcurH5_3uixGxzZcTrTd3mdPmbORZ-YYQ3JgCl0ZCL6kzLHaiyWKvDq66QOtK3",
"qi": "ySqD9cUxbq3wkCsPQId_YfQLIqb5RK_JJIMjtBOdTdo4aT5tmodYCSmjBmhrYXjDWtyJdelvPfdSfgncHJhf8VgkZ8TPvUeaQwsQFBwB5llwpdb72eEEJrmG1SVwNMoFCLXdNT3ACad16cUDMnWmklH0X07OzdxGOBnGhgLZUs4RbPjLH7OpYTyQqVy2L8vofqJR42cfePZw8WQM4k0PPbhralhybExIkSCmaQyYbACZ5k0OVQErEqnj4elglA0h"
}''',
public_key_jwk='''{
"kty": "RSA",
"key_ops": ["verify"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
}''',
)
class TestOAuth2Key(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(OAuth2Key(**TEST_JWK))
db.session.add(OAuth2Key(
id='1e9gdk7',
created=datetime.datetime(2014, 11, 8, 0, 0, 0),
active=True,
algorithm='RS256',
private_key_jwk='invalid',
public_key_jwk='''{
"kty":"RSA",
"n":"w7Zdfmece8iaB0kiTY8pCtiBtzbptJmP28nSWwtdjRu0f2GFpajvWE4VhfJAjEsOcwYzay7XGN0b-X84BfC8hmCTOj2b2eHT7NsZegFPKRUQzJ9wW8ipn_aDJWMGDuB1XyqT1E7DYqjUCEOD1b4FLpy_xPn6oV_TYOfQ9fZdbE5HGxJUzekuGcOKqOQ8M7wfYHhHHLxGpQVgL0apWuP2gDDOdTtpuld4D2LK1MZK99s9gaSjRHE8JDb1Z4IGhEcEyzkxswVdPndUWzfvWBBWXWxtSUvQGBRkuy1BHOa4sP6FKjWEeeF7gm7UMs2Nm2QUgNZw6xvEDGaLk4KASdIxRQ",
"e":"AQAB"
}'''
))
db.session.commit()
self.key = OAuth2Key.query.get('HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU')
self.key_oidc_spec = OAuth2Key.query.get('1e9gdk7')
def test_private_key(self):
self.key.private_key
def test_public_key(self):
self.key.private_key
def test_public_key_jwks_dict(self):
self.assertEqual(self.key.public_key_jwks_dict, {
"kid": "HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU",
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
})
def test_encode_jwt(self):
jwtdata = self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
self.assertIsInstance(jwtdata, str) # Regression check for #165
self.assertEqual(
jwt.get_unverified_header(jwtdata),
# typ is optional, x5u/x5c/jku/jwk are discoraged by OIDC Core 1.0 spec section 2
{'kid': self.key.id, 'alg': self.key.algorithm, 'typ': 'JWT'}
)
self.assertEqual(
OAuth2Key.decode_jwt(jwtdata, audience='test'),
{'aud': 'test', 'foo': 'bar'}
)
self.key.active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
def test_oidc_hash(self):
# Example from OIDC Core 1.0 spec A.3
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
# Example from OIDC Core 1.0 spec A.4
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
# Example from OIDC Core 1.0 spec A.6
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
def test_decode_jwt(self):
# Example from OIDC Core 1.0 spec A.2
jwt_data = (
'eyJraWQiOiIxZTlnZGs3IiwiYWxnIjoiUlMyNTYifQ.ewogImlz'
'cyI6ICJodHRwOi8vc2VydmVyLmV4YW1wbGUuY29tIiwKICJzdWIiOiAiMjQ4'
'Mjg5NzYxMDAxIiwKICJhdWQiOiAiczZCaGRSa3F0MyIsCiAibm9uY2UiOiAi'
'bi0wUzZfV3pBMk1qIiwKICJleHAiOiAxMzExMjgxOTcwLAogImlhdCI6IDEz'
'MTEyODA5NzAsCiAibmFtZSI6ICJKYW5lIERvZSIsCiAiZ2l2ZW5fbmFtZSI6'
'ICJKYW5lIiwKICJmYW1pbHlfbmFtZSI6ICJEb2UiLAogImdlbmRlciI6ICJm'
'ZW1hbGUiLAogImJpcnRoZGF0ZSI6ICIwMDAwLTEwLTMxIiwKICJlbWFpbCI6'
'ICJqYW5lZG9lQGV4YW1wbGUuY29tIiwKICJwaWN0dXJlIjogImh0dHA6Ly9l'
'eGFtcGxlLmNvbS9qYW5lZG9lL21lLmpwZyIKfQ.rHQjEmBqn9Jre0OLykYNn'
'spA10Qql2rvx4FsD00jwlB0Sym4NzpgvPKsDjn_wMkHxcp6CilPcoKrWHcip'
'R2iAjzLvDNAReF97zoJqq880ZD1bwY82JDauCXELVR9O6_B0w3K-E7yM2mac'
'AAgNCUwtik6SjoSUZRcf-O5lygIyLENx882p6MtmwaL1hd6qn5RZOQ0TLrOY'
'u0532g9Exxcm-ChymrB4xLykpDj3lUivJt63eEGGN6DH5K6o33TcxkIjNrCD'
'4XB1CKKumZvCedgHHF3IAK4dVEDSUoGlH9z4pP_eWYNXvqQOjGs-rDaQzUHl'
'6cQQWNiDpWOl_lxXjQEvQ'
)
self.assertEqual(
OAuth2Key.decode_jwt(jwt_data, options={'verify_exp': False, 'verify_aud': False}),
{
"iss": "http://server.example.com",
"sub": "248289761001",
"aud": "s6BhdRkqt3",
"nonce": "n-0S6_WzA2Mj",
"exp": 1311281970,
"iat": 1311280970,
"name": "Jane Doe",
"given_name": "Jane",
"family_name": "Doe",
"gender": "female",
"birthdate": "0000-10-31",
"email": "janedoe@example.com",
"picture": "http://example.com/janedoe/me.jpg"
}
)
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"alg":"RS256"} -> no key id
OAuth2Key.decode_jwt('eyJhbGciOiJSUzI1NiJ9.' + jwt_data.split('.', 1)[-1])
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"kid":"XXXXX","alg":"RS256"} -> unknown key id
OAuth2Key.decode_jwt('eyJraWQiOiJYWFhYWCIsImFsZyI6IlJTMjU2In0.' + jwt_data.split('.', 1)[-1])
OAuth2Key.query.get('1e9gdk7').active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# not active
OAuth2Key.decode_jwt(jwt_data)
def test_generate_rsa_key(self):
key = OAuth2Key.generate_rsa_key()
self.assertEqual(key.algorithm, 'RS256')
import datetime
import time
import unittest
from flask import url_for, session
from uffd.database import db
from uffd.models import User, Role, RoleGroup, TOTPMethod
from uffd.models.role import flatten_recursive
# These imports are required, because otherwise we get circular imports?!
from uffd.ldap import ldap
from uffd import user
from uffd.user.models import User, Group
from uffd.role.models import flatten_recursive, Role, RoleGroup
from uffd.mfa.models import TOTPMethod
from uffd import create_app, db
from utils import dump, UffdTestCase
from tests.utils import UffdTestCase
class TestPrimitives(unittest.TestCase):
def test_flatten_recursive(self):
......@@ -37,12 +28,10 @@ class TestPrimitives(unittest.TestCase):
class TestUserRoleAttributes(UffdTestCase):
def test_roles_effective(self):
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
user = self.get_user()
service_user = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
service_user = User.query.filter_by(loginname='service').one_or_none()
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
......@@ -53,8 +42,6 @@ class TestUserRoleAttributes(UffdTestCase):
db.session.add_all([included_by_default_role, default_role, included_role, cycle_role, direct_role1, direct_role2])
self.assertSetEqual(user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role, default_role, included_by_default_role})
self.assertSetEqual(service_user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role})
ldap.session.delete(service_user)
ldap.session.commit()
def test_compute_groups(self):
user = self.get_user()
......@@ -64,12 +51,13 @@ class TestUserRoleAttributes(UffdTestCase):
role2 = Role(name='role2', groups={group1: RoleGroup(group=group1), group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
self.assertSetEqual(user.compute_groups(), set())
role1.members.add(user)
role2.members.add(user)
role1.members.append(user)
role2.members.append(user)
self.assertSetEqual(user.compute_groups(), {group1, group2})
role2.groups[group2].requires_mfa = True
self.assertSetEqual(user.compute_groups(), {group1})
db.session.add(TOTPMethod(user=user))
db.session.commit()
self.assertSetEqual(user.compute_groups(), {group1, group2})
def test_update_groups(self):
......@@ -79,7 +67,7 @@ class TestUserRoleAttributes(UffdTestCase):
role1 = Role(name='role1', members=[user], groups={group1: RoleGroup(group=group1)})
role2 = Role(name='role2', groups={group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
user.groups = {group2}
user.groups = [group2]
groups_added, groups_removed = user.update_groups()
self.assertSetEqual(groups_added, {group1})
self.assertSetEqual(groups_removed, {group2})
......@@ -91,13 +79,11 @@ class TestUserRoleAttributes(UffdTestCase):
class TestRoleModel(UffdTestCase):
def test_members_effective(self):
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
user1 = self.get_user()
user2 = self.get_admin()
service = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
service = User.query.filter_by(loginname='service').one_or_none()
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
......@@ -108,8 +94,6 @@ class TestRoleModel(UffdTestCase):
self.assertSetEqual(included_role.members_effective, {user1, user2, service})
self.assertSetEqual(direct_role.members_effective, {user1, user2, service})
self.assertSetEqual(empty_role.members_effective, set())
ldap.session.delete(service)
ldap.session.commit()
def test_included_roles_recursive(self):
baserole = Role(name='base')
......@@ -149,160 +133,3 @@ class TestRoleModel(UffdTestCase):
baserole.update_member_groups()
self.assertSetEqual(set(user1.groups), {group1, group3})
self.assertSetEqual(set(user2.groups), {group1, group2, group3})
class TestRoleViews(UffdTestCase):
def setUp(self):
super().setUp()
self.login_as('admin')
def test_index(self):
db.session.add(Role(name='base', description='Base role description'))
db.session.add(Role(name='test1', description='Test1 role description'))
db.session.commit()
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index', r)
self.assertEqual(r.status_code, 200)
def test_index_empty(self):
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index_empty', r)
self.assertEqual(r.status_code, 200)
def test_show(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('role.show', roleid=role.id), follow_redirects=True)
dump('role_show', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
r = self.client.get(path=url_for('role.new'), follow_redirects=True)
dump('role_new', r)
self.assertEqual(r.status_code, 200)
def test_update(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role.groups[self.get_admin_group()] = RoleGroup()
db.session.commit()
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertEqual([group.dn for group in role.groups], [self.test_data.get('group_uffd_admin').get('dn')])
r = self.client.post(path=url_for('role.update', roleid=role.id),
data={'name': 'base1', 'description': 'Base role description1', 'moderator-group': '', 'group-20001': '1', 'group-20002': '1'},
follow_redirects=True)
dump('role_update', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role.id)
self.assertEqual(role.name, 'base1')
self.assertEqual(role.description, 'Base role description1')
self.assertEqual(sorted([group.dn for group in role.groups]), [self.test_data.get('group_uffd_access').get('dn'),
self.test_data.get('group_users').get('dn')])
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_create(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': '', 'group-20001': '1', 'group-20002': '1'},
follow_redirects=True)
dump('role_create', r)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertEqual(sorted([group.dn for group in role.groups]), [self.test_data.get('group_uffd_access').get('dn'),
self.test_data.get('group_users').get('dn')])
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_create_with_moderator_group(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': self.test_data.get('group_uffd_admin').get('dn'), 'group-20001': '1', 'group-20002': '1'},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertEqual(role.moderator_group.name, 'uffd_admin')
self.assertEqual(sorted([group.dn for group in role.groups]), [self.test_data.get('group_uffd_access').get('dn'),
self.test_data.get('group_users').get('dn')])
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_delete(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role_id = role.id
self.assertIsNotNone(Role.query.get(role_id))
r = self.client.get(path=url_for('role.delete', roleid=role.id), follow_redirects=True)
dump('role_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Role.query.get(role_id))
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_set_default(self):
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
role = Role(name='test')
db.session.add(role)
user1 = self.get_user()
user2 = self.get_admin()
service_user = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertSetEqual(set(user1.roles_effective), set())
self.assertSetEqual(set(user2.roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), set())
role.members.add(user1)
role.members.add(service_user)
self.assertSetEqual(set(user1.roles_effective), {role})
self.assertSetEqual(set(user2.roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
self.assertSetEqual(set(role.members), {user1, service_user})
r = self.client.get(path=url_for('role.set_default', roleid=role.id), follow_redirects=True)
dump('role_set_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
service_user = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(user1.roles_effective), {role})
self.assertSetEqual(set(user2.roles_effective), {role})
ldap.session.delete(service_user)
ldap.session.commit()
def test_unset_default(self):
for user in User.query.filter_by(loginname='service').all():
ldap.session.delete(user)
ldap.session.add(User(loginname='service', is_service_user=True, mail='service@example.com', displayname='Service'))
ldap.session.commit()
role = Role(name='test', is_default=True)
db.session.add(role)
user1 = self.get_user()
user2 = self.get_admin()
service_user = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
role.members.add(service_user)
self.assertSetEqual(set(user1.roles_effective), {role})
self.assertSetEqual(set(user2.roles_effective), {role})
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
self.assertSetEqual(set(role.members), {service_user})
r = self.client.get(path=url_for('role.unset_default', roleid=role.id), follow_redirects=True)
dump('role_unset_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
service_user = User.query.get('uid=service,{}'.format(self.app.config['LDAP_USER_SEARCH_BASE']))
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(user1.roles_effective), set())
self.assertSetEqual(set(user2.roles_effective), set())
ldap.session.delete(service_user)
ldap.session.commit()
class TestRoleViewsOL(TestRoleViews):
use_openldap = True