Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
  • Dockerfile
  • claims-in-idtoke
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • recovery-code-pwhash
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
32 results
Show changes
Showing
with 1689 additions and 15 deletions
#!/usr/bin/env python3
from werkzeug.serving import make_ssl_devcert
from uffd import *
if __name__ == '__main__':
app = create_app()
init_db(app)
print(app.url_map)
if not os.path.exists('devcert.crt') or not os.path.exists('devcert.key'):
make_ssl_devcert('devcert')
# WebAuthn requires https and a hostname (not just an IP address). If you
# don't want to test U2F/FIDO2 device registration/authorization, you can
# safely remove `host` and `ssl_context`.
app.run(threaded=True, debug=True, host='localhost', ssl_context=('devcert.crt', 'devcert.key'))
from setuptools import setup, find_packages
import os
with open('README.md', 'r', encoding='utf-8') as f:
long_description = f.read()
long_description = '**DO NOT INSTALL FROM PIP FOR PRODUCTION DEPLOYMENTS**, see [Deployment](#Deployment) for more information.\n\n\n\n' + long_description
setup(
name='uffd',
version=os.environ.get('PACKAGE_VERSION', 'local'),
description='Web-based user management and single sign-on software',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://git.cccv.de/uffd/uffd',
classifiers=[
'Programming Language :: Python :: 3',
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
'Operating System :: OS Independent',
'Topic :: System :: Systems Administration :: Authentication/Directory :: LDAP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Environment :: Web Environment',
'Framework :: Flask',
],
author='CCCV',
author_email='it@cccv.de',
license='AGPL3',
packages=['uffd'],
include_package_data=True,
zip_safe=False,
python_requires='>=3.7',
install_requires=[
# Versions Debian Buster packages are based on.
# DO NOT USE FOR PRODUCTION, those in the setup.py are not updated regularly
'flask==1.0.2',
'Flask-SQLAlchemy==2.1',
'qrcode==6.1',
'fido2==0.5.0',
'cryptography==2.6.1',
'pyjwt==1.7.0',
'Flask-Migrate==2.1.1',
'Flask-Babel==0.11.2',
'alembic==1.0.0',
'argon2-cffi==18.3.0',
'itsdangerous==0.24',
'prometheus-client==0.9',
'ua-parser==0.8.0',
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
# for all dependencies.
'certifi==2018.8.24',
#cffi==1.12.2'
'cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.',
'chardet==3.0.4',
'click==7.0',
'idna==2.6',
'Jinja2==2.10',
'MarkupSafe==1.1.0',
'pyasn1==0.4.2',
'pycparser==2.19',
'requests==2.21.0',
'requests-oauthlib==1.0.0',
'six==1.12.0',
'SQLAlchemy==1.2.18',
'urllib3==1.24.1',
'Werkzeug==0.14.1',
'python-dateutil==2.7.3',
#editor==1.0.3
'Mako==1.0.7',
],
)
File moved
from uffd.database import db
from uffd.models import User, Role, RoleGroup
from tests.utils import UffdTestCase
class TestRoleCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
db.session.add(role)
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
role.members.append(self.get_admin())
role = Role(name='base', is_default=True)
db.session.add(role)
role.groups[self.get_access_group()] = RoleGroup(group=self.get_access_group())
db.session.add(Role(name='test'))
for user in User.query:
user.update_groups()
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['role', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'admin'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'test']) # conflicting name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--moderator-group', 'doesnotexist']) # invalid mod group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-group', 'doesnotexist']) # invalid group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-role', 'doesnotexist']) # invalid role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--description', 'Role description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='newrole').one()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'Role description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
with self.app.test_request_context():
for user in User.query:
self.assertNotIn(self.get_users_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newbase', '--add-group', 'users', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
for user in User.query:
self.assertIn(self.get_users_group(), user.groups)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'doesnotexist', '--description', 'New description'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--moderator-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--description', 'New description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--remove-group', 'uffd_access', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'New description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
self.assertEqual(set(self.get_user().groups), {self.get_users_group(), self.get_admin_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-moderator-group', '--clear-groups',
'--add-group', 'uffd_access', '--remove-role', 'admin',
'--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNone(role.moderator_group)
self.assertEqual(list(role.groups), [self.get_access_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='test').all())
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--clear-roles'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.included_roles, [])
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, False)
self.assertEqual(set(self.get_user().groups), set())
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
# Regression test for https://git.cccv.de/uffd/uffd/-/issues/156
def test_update_without_description(self):
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
role.description = 'Test description'
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--clear-groups'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
self.assertEqual(role.description, 'Test description')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
self.assertIn(self.get_admin_group(), self.get_admin().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_admin_group(), self.get_admin().groups)
with self.app.test_request_context():
self.assertIn(self.get_access_group(), self.get_user().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'base'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_access_group(), self.get_user().groups)
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
from tests.utils import UffdTestCase
class TestUniqueEmailAddressCommands(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_enable(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertTrue(FeatureFlag.unique_email_addresses)
def test_enable_already_enabled(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_user_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com'))
db.session.add(UserEmail(user=self.get_user(), address='FOO@example.com'))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_global_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com', verified=True))
db.session.add(UserEmail(user=self.get_admin(), address='FOO@example.com', verified=True))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_disable(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertFalse(FeatureFlag.unique_email_addresses)
def test_disable_already_enabled(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 1)
from uffd.database import db
from uffd.models import User, Group, Role, RoleGroup, FeatureFlag
from tests.utils import UffdTestCase
class TestUserCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
db.session.add(role)
db.session.add(Role(name='test'))
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['user', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'testuser'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'new user', '--mail', 'foobar@example.com']) # invalid login name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--add-role', 'doesnotexist']) # unknown role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'testuser', '--mail', 'foobar@example.com']) # conflicting name
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'test@example.com']) # conflicting email address
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
self.assertFalse(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser2', '--mail', 'newmail2@example.com', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser2').first()
self.assertTrue(user.is_deactivated)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'doesnotexist', '--displayname', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'admin@example.com']) # conflicting mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--add-role', 'admin', '--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(set(user.roles), {Role.query.filter_by(name='admin').one(), Role.query.filter_by(name='test').one()})
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='test').all())
self.assertNotIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--clear-roles', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertTrue(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--activate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertFalse(user.is_deactivated)
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'testuser'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
class TestGroupCLI(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['group', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'users'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'users']) # Duplicate name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'new group'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'newgroup', '--description', 'A new group'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='newgroup').first()
self.assertIsNotNone(group)
self.assertEqual(group.description, 'A new group')
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'doesnotexist', '--description', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users', '--description', 'New description'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'New description')
def test_update_without_description(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users']) # Should not change anything
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'Base group for all users')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'users'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
import os
import sys
import datetime
from uffd.database import db
from uffd.models import (
User, UserEmail, Group,
RecoveryCodeMethod, TOTPMethod, WebauthnMethod,
Role, RoleGroup,
Signup,
Invite, InviteGrant, InviteSignup,
DeviceLoginConfirmation,
Service,
OAuth2Client, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation,
PasswordToken,
Session,
)
from tests.utils import MigrationTestCase
class TestFuzzy(MigrationTestCase):
def setUpApp(self):
self.app.config['LDAP_SERVICE_MOCK'] = True
self.app.config['OAUTH2_CLIENTS'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'redirect_uris': ['http://localhost:5004/oauthproxy/callback'],
'logout_urls': ['http://localhost:5004/oauthproxy/logout']
}
}
self.app.config['API_CLIENTS_2'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'scopes': ['checkpassword', 'getusers', 'getmails']
},
}
# Runs every upgrade/downgrade script with data. To do this we first upgrade
# to head, create data, then downgrade, upgrade, downgrade for every revision.
def test_migrations_fuzzy(self):
self.upgrade('head')
# Users and groups were created by 878b25c4fae7_ldap_to_db because we set LDAP_SERVICE_MOCK to True
user = User.query.first()
group = Group.query.first()
db.session.add(RecoveryCodeMethod(user=user))
db.session.add(TOTPMethod(user=user, name='mytotp'))
db.session.add(WebauthnMethod(user=user, name='mywebauthn', cred=b''))
role = Role(name='role', groups={group: RoleGroup(group=group)})
db.session.add(role)
role.members.append(user)
db.session.add(Role(name='base', included_roles=[role], locked=True, is_default=True, moderator_group=group, groups={group: RoleGroup(group=group)}))
db.session.add(Signup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
db.session.add(Signup(loginname='testuser', displayname='Testuser', mail='testuser@example.com', password='testpassword', user=user))
invite = Invite(valid_until=datetime.datetime.now(), roles=[role])
db.session.add(invite)
invite.signups.append(InviteSignup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
invite.grants.append(InviteGrant(user=user))
db.session.add(Invite(creator=user, valid_until=datetime.datetime.now()))
service = Service(name='testservice', access_group=group)
oauth2_client = OAuth2Client(service=service, client_id='testclient', client_secret='testsecret', redirect_uris=['http://localhost:1234/callback'], logout_uris=[OAuth2LogoutURI(method='GET', uri='http://localhost:1234/callback')])
db.session.add_all([service, oauth2_client])
session = Session(
user=user,
secret='0919de9da3f7dc6c33ab849f44c20e8221b673ca701030de17488f3269fc5469f100e2ce56e5fd71305b23d8ecbb06d80d22004adcd3fefc5f5fcb80a436e31f2c2d9cc8fe8c59ae44871ae4524408d312474570280bf29d3ba145a4bd00010ca758eaa0795b180ec12978b42d13bf4c4f06f72103d44077022ce656610be855',
ip_address='127.0.0.1',
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0',
mfa_done=True,
)
db.session.add(session)
db.session.add(OAuth2Grant(session=session, client=oauth2_client, _code='testcode', redirect_uri='http://example.com/callback', expires=datetime.datetime.now()))
db.session.add(OAuth2Token(session=session, client=oauth2_client, token_type='Bearer', _access_token='testcode', _refresh_token='testcode', expires=datetime.datetime.now()))
db.session.add(OAuth2DeviceLoginInitiation(client=oauth2_client, confirmations=[DeviceLoginConfirmation(session=session)]))
db.session.add(PasswordToken(user=user))
db.session.commit()
revs = [s.split('_', 1)[0] for s in os.listdir('uffd/migrations/versions') if '_' in s and s.endswith('.py')]
for rev in revs:
self.downgrade('-1')
self.upgrade('+1')
self.downgrade('-1')
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
class TestForMissingLockRows(MigrationTestCase):
def test_check_missing_lock_rows(self):
self.upgrade('head')
existing_locks = {row[0] for row in db.session.execute(db.select([lock_table.c.name])).fetchall()}
for name in Lock.ALL_LOCKS - existing_locks:
self.fail(f'Lock "{name}" is missing. Make sure to add a migration that inserts it.')
# Add something like this:
# conn = op.get_bind()
# lock_table = sa.table('lock', sa.column('name'))
# conn.execute(sa.insert(lock_table).values(name='NAME'))
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
user_table = db.table('user',
db.column('id'),
db.column('unix_uid'),
db.column('loginname'),
db.column('displayname'),
db.column('primary_email_id'),
db.column('is_service_user'),
)
user_email_table = db.table('user_email',
db.column('id'),
db.column('address'),
db.column('address_normalized'),
db.column('verified'),
)
group_table = db.table('group',
db.column('id'),
db.column('unix_gid'),
db.column('name'),
db.column('description')
)
uid_allocation_table = db.table('uid_allocation', db.column('id'))
gid_allocation_table = db.table('gid_allocation', db.column('id'))
class TestMigration(MigrationTestCase):
REVISION = 'aeb07202a6c8'
def setUpApp(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10005
self.app.config['USER_SERVICE_MIN_UID'] = 10006
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.app.config['GROUP_MIN_GID'] = 20000
self.app.config['GROUP_MAX_GID'] = 20005
def create_user(self, uid):
db.session.execute(db.insert(user_email_table).values(
address=f'email{uid}@example.com',
address_normalized=f'email{uid}@example.com',
verified=True
))
email_id = db.session.execute(
db.select([user_email_table.c.id])
.where(user_email_table.c.address == f'email{uid}@example.com')
).scalar()
db.session.execute(db.insert(user_table).values(
unix_uid=uid,
loginname=f'user{uid}',
displayname='user',
primary_email_id=email_id,
is_service_user=False
))
def create_group(self, gid):
db.session.execute(db.insert(group_table).values(unix_gid=gid, name=f'group{gid}', description=''))
def fetch_uid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([uid_allocation_table])
.order_by(uid_allocation_table.c.id)
).fetchall()]
def fetch_gid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([gid_allocation_table])
.order_by(gid_allocation_table.c.id)
).fetchall()]
def test_empty(self):
# No users/groups
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [])
self.assertEqual(self.fetch_gid_allocations(), [])
def test_gid_first_minus_one(self):
self.create_group(19999)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [19999])
def test_gid_first(self):
self.create_group(20000)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000])
def test_gid_first_plus_one(self):
self.create_group(20001)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001])
def test_gid_last_minus_one(self):
self.create_group(20004)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004])
def test_gid_last(self):
self.create_group(20005)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004, 20005])
def test_gid_last_plus_one(self):
self.create_group(20006)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20006])
def test_gid_complex(self):
self.create_group(10)
self.create_group(20001)
self.create_group(20003)
self.create_group(20010)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [10, 20000, 20001, 20002, 20003, 20010])
# The code for UIDs is mostly the same as for GIDs, so we don't test all
# the edge cases again.
def test_uid_different_ranges(self):
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10006, 10007, 10008, 10009, 90000])
def test_uid_same_ranges(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10010
self.app.config['USER_SERVICE_MIN_UID'] = 10000
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10003, 10004, 10005, 10006, 10007, 10008, 10009, 90000])
import datetime
from flask import current_app
from uffd.database import db
from uffd.models import Invite, InviteGrant, InviteSignup, User, Role, RoleGroup
from tests.utils import UffdTestCase, db_flush
class TestInviteModel(UffdTestCase):
def test_expire(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertFalse(invite.expired)
self.assertTrue(invite.active)
invite.valid_until = datetime.datetime.utcnow() - datetime.timedelta(seconds=60)
self.assertTrue(invite.expired)
self.assertFalse(invite.active)
def test_void(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=False, creator=self.get_admin())
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite.used = True
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=True, creator=self.get_admin())
self.assertFalse(invite.voided)
self.assertTrue(invite.active)
invite.used = True
self.assertTrue(invite.voided)
self.assertFalse(invite.active)
def test_permitted(self):
role = Role(name='testrole')
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, roles=[role])
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
invite.creator = self.get_admin()
self.assertTrue(invite.permitted)
self.assertTrue(invite.active)
invite.creator.is_deactivated = True
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
invite.creator = self.get_user()
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
role.moderator_group = self.get_access_group()
current_app.config['ACL_SIGNUP_GROUP'] = 'uffd_access'
self.assertTrue(invite.permitted)
self.assertTrue(invite.active)
role.moderator_group = None
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
role.moderator_group = self.get_access_group()
current_app.config['ACL_SIGNUP_GROUP'] = 'uffd_admin'
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
def test_disable(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertTrue(invite.active)
invite.disable()
self.assertFalse(invite.active)
def test_reset_disabled(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
invite.disable()
self.assertFalse(invite.active)
invite.reset()
self.assertTrue(invite.active)
def test_reset_expired(self):
invite = Invite(valid_until=datetime.datetime.utcnow() - datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertFalse(invite.active)
invite.reset()
self.assertFalse(invite.active)
def test_reset_single_use(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), single_use=False, creator=self.get_admin())
invite.used = True
invite.disable()
self.assertFalse(invite.active)
invite.reset()
self.assertTrue(invite.active)
def test_short_token(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
db.session.add(invite)
db.session.commit()
self.assertTrue(len(invite.short_token) <= len(invite.token)/3)
class TestInviteGrantModel(UffdTestCase):
def test_success(self):
user = self.get_user()
group0 = self.get_access_group()
role0 = Role(name='baserole', groups={group0: RoleGroup(group=group0)})
db.session.add(role0)
user.roles.append(role0)
user.update_groups()
group1 = self.get_admin_group()
role1 = Role(name='testrole1', groups={group1: RoleGroup(group=group1)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role1, role2], creator=self.get_admin())
self.assertIn(role0, user.roles)
self.assertNotIn(role1, user.roles)
self.assertNotIn(role2, user.roles)
self.assertIn(group0, user.groups)
self.assertNotIn(group1, user.groups)
self.assertFalse(invite.used)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertTrue(success)
self.assertIn(role0, user.roles)
self.assertIn(role1, user.roles)
self.assertIn(role2, user.roles)
self.assertIn(group0, user.groups)
self.assertIn(group1, user.groups)
self.assertTrue(invite.used)
db.session.commit()
db_flush()
user = self.get_user()
self.assertIn('baserole', [role.name for role in user.roles_effective])
self.assertIn('testrole1', [role.name for role in user.roles])
self.assertIn('testrole2', [role.name for role in user.roles])
self.assertIn(self.get_access_group(), user.groups)
self.assertIn(self.get_admin_group(), user.groups)
def test_inactive(self):
user = self.get_user()
group = self.get_admin_group()
role = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role], single_use=True, used=True, creator=self.get_admin())
self.assertFalse(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
self.assertNotIn(role, user.roles)
self.assertNotIn(group, user.groups)
def test_no_roles(self):
user = self.get_user()
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), creator=self.get_admin())
self.assertTrue(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
def test_no_new_roles(self):
user = self.get_user()
role = Role(name='testrole1')
db.session.add(role)
user.roles.append(role)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role], creator=self.get_admin())
self.assertTrue(invite.active)
grant = InviteGrant(invite=invite, user=user)
success, msg = grant.apply()
self.assertFalse(success)
self.assertIsInstance(msg, str)
class TestInviteSignupModel(UffdTestCase):
def create_base_roles(self):
baserole = Role(name='base', is_default=True)
baserole.groups[self.get_access_group()] = RoleGroup()
baserole.groups[self.get_users_group()] = RoleGroup()
db.session.add(baserole)
db.session.commit()
def test_success(self):
self.create_base_roles()
base_role = Role.query.filter_by(name='base').one()
base_group1 = self.get_access_group()
base_group2 = self.get_users_group()
group = self.get_admin_group()
role1 = Role(name='testrole1', groups={group: RoleGroup(group=group)})
db.session.add(role1)
role2 = Role(name='testrole2')
db.session.add(role2)
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), roles=[role1, role2], allow_signup=True, creator=self.get_admin())
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
self.assertFalse(invite.used)
valid, msg = signup.validate()
self.assertTrue(valid)
self.assertFalse(invite.used)
user, msg = signup.finish('notsecret')
self.assertIsInstance(user, User)
self.assertTrue(invite.used)
self.assertEqual(user.loginname, 'newuser')
self.assertEqual(user.displayname, 'New User')
self.assertEqual(user.primary_email.address, 'test@example.com')
self.assertEqual(signup.user, user)
self.assertIn(base_role, user.roles_effective)
self.assertIn(role1, user.roles)
self.assertIn(role2, user.roles)
self.assertIn(base_group1, user.groups)
self.assertIn(base_group2, user.groups)
self.assertIn(group, user.groups)
db.session.commit()
db_flush()
self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1)
def test_success_no_roles(self):
self.create_base_roles()
base_role = Role.query.filter_by(name='base').one()
base_group1 = self.get_access_group()
base_group2 = self.get_users_group()
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
self.assertFalse(invite.used)
valid, msg = signup.validate()
self.assertTrue(valid)
self.assertFalse(invite.used)
user, msg = signup.finish('notsecret')
self.assertIsInstance(user, User)
self.assertTrue(invite.used)
self.assertEqual(user.loginname, 'newuser')
self.assertEqual(user.displayname, 'New User')
self.assertEqual(user.primary_email.address, 'test@example.com')
self.assertEqual(signup.user, user)
self.assertIn(base_role, user.roles_effective)
self.assertEqual(len(user.roles_effective), 1)
self.assertIn(base_group1, user.groups)
self.assertIn(base_group2, user.groups)
self.assertEqual(len(user.groups), 2)
db.session.commit()
db_flush()
self.assertEqual(len(User.query.filter_by(loginname='newuser').all()), 1)
def test_inactive(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, single_use=True, used=True, creator=self.get_admin())
self.assertFalse(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
user, msg = signup.finish('notsecret')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
def test_invalid(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
def test_invalid2(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=True, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
user, msg = signup.finish('wrongpassword')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
def test_no_signup(self):
invite = Invite(valid_until=datetime.datetime.utcnow() + datetime.timedelta(seconds=60), allow_signup=False, creator=self.get_admin())
self.assertTrue(invite.active)
signup = InviteSignup(invite=invite, loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
valid, msg = signup.validate()
self.assertFalse(valid)
self.assertIsInstance(msg, str)
user, msg = signup.finish('notsecret')
self.assertIsNone(user)
self.assertIsInstance(msg, str)
import unittest
import datetime
import time
from uffd.database import db
from uffd.models import RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from uffd.models.mfa import _hotp
from tests.utils import UffdTestCase
class TestMfaPrimitives(unittest.TestCase):
def test_hotp(self):
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q'), '458290')
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q', digits=8), '20458290')
for digits in range(1, 10):
self.assertEqual(len(_hotp(1, b'abcd', digits=digits)), digits)
self.assertEqual(_hotp(1234, b''), '161024')
self.assertEqual(_hotp(0, b'\x04\x8fM\xcc\x7f\x82\x9c$a\x1b\xb3'), '279354')
self.assertEqual(_hotp(2**64-1, b'abcde'), '899292')
def get_fido2_test_cred(self):
try:
from uffd.fido2_compat import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaMethodModels(UffdTestCase):
def test_common_attributes(self):
method = TOTPMethod(user=self.get_user(), name='testname')
self.assertTrue(method.created <= datetime.datetime.utcnow())
self.assertEqual(method.name, 'testname')
self.assertEqual(method.user.loginname, 'testuser')
method.user = self.get_admin()
self.assertEqual(method.user.loginname, 'testadmin')
def test_recovery_code_method(self):
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
method_code = method.code_value
db.session.expunge(method)
method = RecoveryCodeMethod.query.get(method_id)
self.assertFalse(hasattr(method, 'code_value'))
self.assertFalse(method.verify(''))
self.assertFalse(method.verify('A'*8))
self.assertTrue(method.verify(method_code))
def test_totp_method_attributes(self):
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
issuer = method.issuer
accountname = method.accountname
key_uri = method.key_uri
self.assertEqual(method.name, 'testname')
# Restore method with key parameter
_method = TOTPMethod(user=self.get_user(), key=method.key, name='testname')
self.assertEqual(_method.name, 'testname')
self.assertEqual(_method.raw_key, raw_key)
self.assertEqual(_method.issuer, issuer)
self.assertEqual(_method.accountname, accountname)
self.assertEqual(_method.key_uri, key_uri)
db.session.add(method)
db.session.commit()
_method_id = _method.id
db.session.expunge(_method)
# Restore method from db
_method = TOTPMethod.query.get(_method_id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(_method.raw_key, raw_key)
self.assertEqual(_method.issuer, issuer)
self.assertEqual(_method.accountname, accountname)
self.assertEqual(_method.key_uri, key_uri)
def test_totp_method_verify(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(''))
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter+2, method.raw_key)))
def test_totp_method_verify_reuse(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter-1, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter-1, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter, method.raw_key)))
def test_webauthn_method(self):
data = get_fido2_test_cred(self)
method = WebauthnMethod(user=self.get_user(), cred=data, name='testname')
self.assertEqual(method.name, 'testname')
db.session.add(method)
db.session.commit()
method_id = method.id
method_cred = method.cred
db.session.expunge(method)
_method = WebauthnMethod.query.get(method_id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(bytes(method_cred), bytes(_method.cred))
self.assertEqual(data.credential_id, _method.cred.credential_id)
self.assertEqual(data.public_key, _method.cred.public_key)
# We only test (de-)serialization here, as everything else is currently implemented in the views
import time
import threading
from sqlalchemy.exc import IntegrityError
from uffd.database import db
from uffd.models import FeatureFlag, Lock
from uffd.models.misc import feature_flag_table
from tests.utils import ModelTestCase
class TestFeatureFlag(ModelTestCase):
def test_disabled(self):
flag = FeatureFlag('foo')
self.assertFalse(flag)
self.assertFalse(db.session.execute(db.select([flag.expr])).scalar())
def test_enabled(self):
db.session.execute(db.insert(feature_flag_table).values(name='foo'))
flag = FeatureFlag('foo')
self.assertTrue(flag)
self.assertTrue(db.session.execute(db.select([flag.expr])).scalar())
def test_toggle(self):
flag = FeatureFlag('foo')
hooks_called = []
@flag.enable_hook
def enable_hook1():
hooks_called.append('enable1')
@flag.enable_hook
def enable_hook2():
hooks_called.append('enable2')
@flag.disable_hook
def disable_hook1():
hooks_called.append('disable1')
@flag.disable_hook
def disable_hook2():
hooks_called.append('disable2')
hooks_called.clear()
flag.enable()
self.assertTrue(flag)
self.assertEqual(hooks_called, ['enable1', 'enable2'])
hooks_called.clear()
flag.disable()
self.assertFalse(flag)
self.assertEqual(hooks_called, ['disable1', 'disable2'])
flag.disable() # does nothing
self.assertFalse(flag)
flag.enable()
self.assertTrue(flag)
with self.assertRaises(IntegrityError):
flag.enable()
self.assertTrue(flag)
class TestLock(ModelTestCase):
DISABLE_SQLITE_MEMORY_DB = True
def setUpApp(self):
self.lock = Lock('testlock')
def run_lock_test(self):
result = []
def func():
with self.app.test_request_context():
self.lock.acquire()
result.append('bar')
t = threading.Thread(target=func)
t.start()
time.sleep(1)
result.append('foo')
time.sleep(1)
db.session.rollback()
t.join()
return result
def test_lock2(self):
self.assertEqual(self.run_lock_test(), ['bar', 'foo'])
self.lock.acquire()
self.assertEqual(self.run_lock_test(), ['foo', 'bar'])
import unittest
import datetime
import jwt
from uffd.database import db
from uffd.models import OAuth2Key
from tests.utils import UffdTestCase
TEST_JWK = dict(
id='HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU',
created=datetime.datetime(2023, 11, 9, 0, 21, 10),
active=True,
algorithm='RS256',
private_key_jwk='''{
"kty": "RSA",
"key_ops": ["sign"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB",
"d": "G7yoH5mLcZTA6ia-byCoN-zpofGvdga9AZnxPO0vsq6K_cY_O2gxuVZ3n6reAKKbuLNGCbb_D_Dffs4q8rprlfkgi3TCLzXX5Zv5HWTD7a4Y7xpxEzQ2sWo-iagVIqZVPh0pyjliqnTyUWnFmWiY0gBe9UHianHjFVZqe8E2HFOKgW3UUbQz0keg8JtJ3T9gzZrM38KWbqhOJO0VVSRAoANPTSnumfRsUCyWywrMtIfgAbQaKazqX3xkOsAF1L-iNfd6slzPvRyIQVflVDMdfKnsu-lHiKJ0DK_lg9f55T5FymgcXsq43EKBQ2H4v2dafIm-vtWx_TRZWj_msD32BEPBA-zTqh_oP1r6a3DZh4DBtWY3vzSiuhAC0erlRs-hRTX_e9ET5fUbJnmNxjnxQD9zZmwq4ujMK6KFnHct8t77Qxj3a-wDR_XyDJ4_EKYqHlcVHfxGNBSvIdjuZJkPJnVpVtfCtpyamQIR4u5oNV7fIwYe_tFnw0Y90rGoJMzB",
"p": "-A-FnH21HJ7GPWUm9k3mxsxSchy89QEUCZZiH6EcB4ZP8wJsxrQsUSIHCR74YmZEI3Ulsum1Ql4x50k7Q2sNh9SnwKvmctjksehGy4yCrdunAqjqyz3wFwGaKWnhn3frkiqH5ATjkOoc8qHz8saa7reeVClj47ZWyy-Nl559ycLMs0rI1N_THzO07C3jSbJhyPj0yeygAflsRqqnNvEQ6ps1VLiqf9G5jfSvUUn5DyKIpep9iGo29caGSIPIy_2h",
"q": "xNe1-QWskxOcY_GiHpFWdvzqr1o9fxg5whgpNcGi3caokw2iNHRYut4cbVvFFBlv_9B5QCl9WVfR2ADG0AtvkvUxEZqCdxEvcqjIANeRLKHDjW5kMuPS0_fcskFP-r7mCM9SBfPplfMVCF5nuNWf5LzNopWfsTChIDD1rSpPjItNYuwLXszm_3R81HHHeQLcyvoMxLCmeLy5TXX2hXOMHh2IMZCXAHopJmLJUVnQ48kr5jd2l0kLbmx3aBqdccJn",
"dp": "MLS7g1KbcRcrzXpDADGjkn0j4wwJfgHMMWW5toQnwMJ6iDh9qzZNTVDlGMFf-9IgpuWllU-WK4XbPpJ-dGpcqcLzfT1DbmFv5g65d9YLAqASVs9b6rQqpBnIb0E-79TYCEcZj4f2NsoBDRMHly-v1BdxmwzVdCylNhgMMS0Jfcgl8T5J2KJqDcJVT9piumGwGYnoZo1zjW-v9uAjHQKQU8BN5Git8ZL4YAsfMVLY-EPLmOhF5bcVO4TTcQGPN56B",
"dq": "HiiSl-G3rB0QE_v8g8Ruw_JCHrWrwGI8zzEWd0cApgv-3fDzzieZRKAtKNArpMW09DPDsAHrU5nx669KxqtJ3_EzIGhU3ttCMsYLRp3Af18VcADe1zEypwlNxf3dvCQtaGIjRgg13KSOr2aPa7FHOyt2MhfMjMBPn3gA3BQkdfsN0z8pCtBIABGf4ojAMBkxLOQcurH5_3uixGxzZcTrTd3mdPmbORZ-YYQ3JgCl0ZCL6kzLHaiyWKvDq66QOtK3",
"qi": "ySqD9cUxbq3wkCsPQId_YfQLIqb5RK_JJIMjtBOdTdo4aT5tmodYCSmjBmhrYXjDWtyJdelvPfdSfgncHJhf8VgkZ8TPvUeaQwsQFBwB5llwpdb72eEEJrmG1SVwNMoFCLXdNT3ACad16cUDMnWmklH0X07OzdxGOBnGhgLZUs4RbPjLH7OpYTyQqVy2L8vofqJR42cfePZw8WQM4k0PPbhralhybExIkSCmaQyYbACZ5k0OVQErEqnj4elglA0h"
}''',
public_key_jwk='''{
"kty": "RSA",
"key_ops": ["verify"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
}''',
)
class TestOAuth2Key(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(OAuth2Key(**TEST_JWK))
db.session.add(OAuth2Key(
id='1e9gdk7',
created=datetime.datetime(2014, 11, 8, 0, 0, 0),
active=True,
algorithm='RS256',
private_key_jwk='invalid',
public_key_jwk='''{
"kty":"RSA",
"n":"w7Zdfmece8iaB0kiTY8pCtiBtzbptJmP28nSWwtdjRu0f2GFpajvWE4VhfJAjEsOcwYzay7XGN0b-X84BfC8hmCTOj2b2eHT7NsZegFPKRUQzJ9wW8ipn_aDJWMGDuB1XyqT1E7DYqjUCEOD1b4FLpy_xPn6oV_TYOfQ9fZdbE5HGxJUzekuGcOKqOQ8M7wfYHhHHLxGpQVgL0apWuP2gDDOdTtpuld4D2LK1MZK99s9gaSjRHE8JDb1Z4IGhEcEyzkxswVdPndUWzfvWBBWXWxtSUvQGBRkuy1BHOa4sP6FKjWEeeF7gm7UMs2Nm2QUgNZw6xvEDGaLk4KASdIxRQ",
"e":"AQAB"
}'''
))
db.session.commit()
self.key = OAuth2Key.query.get('HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU')
self.key_oidc_spec = OAuth2Key.query.get('1e9gdk7')
def test_private_key(self):
self.key.private_key
def test_public_key(self):
self.key.private_key
def test_public_key_jwks_dict(self):
self.assertEqual(self.key.public_key_jwks_dict, {
"kid": "HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU",
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
})
def test_encode_jwt(self):
jwtdata = self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
self.assertIsInstance(jwtdata, str) # Regression check for #165
self.assertEqual(
jwt.get_unverified_header(jwtdata),
# typ is optional, x5u/x5c/jku/jwk are discoraged by OIDC Core 1.0 spec section 2
{'kid': self.key.id, 'alg': self.key.algorithm, 'typ': 'JWT'}
)
self.assertEqual(
OAuth2Key.decode_jwt(jwtdata, audience='test'),
{'aud': 'test', 'foo': 'bar'}
)
self.key.active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
def test_oidc_hash(self):
# Example from OIDC Core 1.0 spec A.3
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
# Example from OIDC Core 1.0 spec A.4
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
# Example from OIDC Core 1.0 spec A.6
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
def test_decode_jwt(self):
# Example from OIDC Core 1.0 spec A.2
jwt_data = (
'eyJraWQiOiIxZTlnZGs3IiwiYWxnIjoiUlMyNTYifQ.ewogImlz'
'cyI6ICJodHRwOi8vc2VydmVyLmV4YW1wbGUuY29tIiwKICJzdWIiOiAiMjQ4'
'Mjg5NzYxMDAxIiwKICJhdWQiOiAiczZCaGRSa3F0MyIsCiAibm9uY2UiOiAi'
'bi0wUzZfV3pBMk1qIiwKICJleHAiOiAxMzExMjgxOTcwLAogImlhdCI6IDEz'
'MTEyODA5NzAsCiAibmFtZSI6ICJKYW5lIERvZSIsCiAiZ2l2ZW5fbmFtZSI6'
'ICJKYW5lIiwKICJmYW1pbHlfbmFtZSI6ICJEb2UiLAogImdlbmRlciI6ICJm'
'ZW1hbGUiLAogImJpcnRoZGF0ZSI6ICIwMDAwLTEwLTMxIiwKICJlbWFpbCI6'
'ICJqYW5lZG9lQGV4YW1wbGUuY29tIiwKICJwaWN0dXJlIjogImh0dHA6Ly9l'
'eGFtcGxlLmNvbS9qYW5lZG9lL21lLmpwZyIKfQ.rHQjEmBqn9Jre0OLykYNn'
'spA10Qql2rvx4FsD00jwlB0Sym4NzpgvPKsDjn_wMkHxcp6CilPcoKrWHcip'
'R2iAjzLvDNAReF97zoJqq880ZD1bwY82JDauCXELVR9O6_B0w3K-E7yM2mac'
'AAgNCUwtik6SjoSUZRcf-O5lygIyLENx882p6MtmwaL1hd6qn5RZOQ0TLrOY'
'u0532g9Exxcm-ChymrB4xLykpDj3lUivJt63eEGGN6DH5K6o33TcxkIjNrCD'
'4XB1CKKumZvCedgHHF3IAK4dVEDSUoGlH9z4pP_eWYNXvqQOjGs-rDaQzUHl'
'6cQQWNiDpWOl_lxXjQEvQ'
)
self.assertEqual(
OAuth2Key.decode_jwt(jwt_data, options={'verify_exp': False, 'verify_aud': False}),
{
"iss": "http://server.example.com",
"sub": "248289761001",
"aud": "s6BhdRkqt3",
"nonce": "n-0S6_WzA2Mj",
"exp": 1311281970,
"iat": 1311280970,
"name": "Jane Doe",
"given_name": "Jane",
"family_name": "Doe",
"gender": "female",
"birthdate": "0000-10-31",
"email": "janedoe@example.com",
"picture": "http://example.com/janedoe/me.jpg"
}
)
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"alg":"RS256"} -> no key id
OAuth2Key.decode_jwt('eyJhbGciOiJSUzI1NiJ9.' + jwt_data.split('.', 1)[-1])
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"kid":"XXXXX","alg":"RS256"} -> unknown key id
OAuth2Key.decode_jwt('eyJraWQiOiJYWFhYWCIsImFsZyI6IlJTMjU2In0.' + jwt_data.split('.', 1)[-1])
OAuth2Key.query.get('1e9gdk7').active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# not active
OAuth2Key.decode_jwt(jwt_data)
def test_generate_rsa_key(self):
key = OAuth2Key.generate_rsa_key()
self.assertEqual(key.algorithm, 'RS256')
import unittest
from uffd.database import db
from uffd.models import User, Role, RoleGroup, TOTPMethod
from uffd.models.role import flatten_recursive
from tests.utils import UffdTestCase
class TestPrimitives(unittest.TestCase):
def test_flatten_recursive(self):
class Node:
def __init__(self, *neighbors):
self.neighbors = set(neighbors or set())
cycle = Node()
cycle.neighbors.add(cycle)
common = Node(cycle)
intermediate1 = Node(common)
intermediate2 = Node(common, intermediate1)
stub = Node()
backref = Node()
start1 = Node(intermediate1, intermediate2, stub, backref)
backref.neighbors.add(start1)
start2 = Node()
self.assertSetEqual(flatten_recursive({start1, start2}, 'neighbors'),
{start1, start2, backref, stub, intermediate1, intermediate2, common, cycle})
self.assertSetEqual(flatten_recursive(set(), 'neighbors'), set())
class TestUserRoleAttributes(UffdTestCase):
def test_roles_effective(self):
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
user = self.get_user()
service_user = User.query.filter_by(loginname='service').one_or_none()
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
cycle_role = Role(name='cycle')
direct_role1 = Role(name='role1', members=[user, service_user], included_roles=[included_role, cycle_role])
direct_role2 = Role(name='role2', members=[user, service_user], included_roles=[included_role])
cycle_role.included_roles.append(direct_role1)
db.session.add_all([included_by_default_role, default_role, included_role, cycle_role, direct_role1, direct_role2])
self.assertSetEqual(user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role, default_role, included_by_default_role})
self.assertSetEqual(service_user.roles_effective, {direct_role1, direct_role2, cycle_role, included_role})
def test_compute_groups(self):
user = self.get_user()
group1 = self.get_users_group()
group2 = self.get_access_group()
role1 = Role(name='role1', groups={group1: RoleGroup(group=group1)})
role2 = Role(name='role2', groups={group1: RoleGroup(group=group1), group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
self.assertSetEqual(user.compute_groups(), set())
role1.members.append(user)
role2.members.append(user)
self.assertSetEqual(user.compute_groups(), {group1, group2})
role2.groups[group2].requires_mfa = True
self.assertSetEqual(user.compute_groups(), {group1})
db.session.add(TOTPMethod(user=user))
db.session.commit()
self.assertSetEqual(user.compute_groups(), {group1, group2})
def test_update_groups(self):
user = self.get_user()
group1 = self.get_users_group()
group2 = self.get_access_group()
role1 = Role(name='role1', members=[user], groups={group1: RoleGroup(group=group1)})
role2 = Role(name='role2', groups={group2: RoleGroup(group=group2)})
db.session.add_all([role1, role2])
user.groups = [group2]
groups_added, groups_removed = user.update_groups()
self.assertSetEqual(groups_added, {group1})
self.assertSetEqual(groups_removed, {group2})
self.assertSetEqual(set(user.groups), {group1})
groups_added, groups_removed = user.update_groups()
self.assertSetEqual(groups_added, set())
self.assertSetEqual(groups_removed, set())
self.assertSetEqual(set(user.groups), {group1})
class TestRoleModel(UffdTestCase):
def test_members_effective(self):
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
user1 = self.get_user()
user2 = self.get_admin()
service = User.query.filter_by(loginname='service').one_or_none()
included_by_default_role = Role(name='included_by_default')
default_role = Role(name='default', is_default=True, included_roles=[included_by_default_role])
included_role = Role(name='included')
direct_role = Role(name='direct', members=[user1, user2, service], included_roles=[included_role])
empty_role = Role(name='empty', included_roles=[included_role])
self.assertSetEqual(included_by_default_role.members_effective, {user1, user2})
self.assertSetEqual(default_role.members_effective, {user1, user2})
self.assertSetEqual(included_role.members_effective, {user1, user2, service})
self.assertSetEqual(direct_role.members_effective, {user1, user2, service})
self.assertSetEqual(empty_role.members_effective, set())
def test_included_roles_recursive(self):
baserole = Role(name='base')
role1 = Role(name='role1', included_roles=[baserole])
role2 = Role(name='role2', included_roles=[baserole])
role3 = Role(name='role3', included_roles=[role1, role2])
self.assertSetEqual(role1.included_roles_recursive, {baserole})
self.assertSetEqual(role2.included_roles_recursive, {baserole})
self.assertSetEqual(role3.included_roles_recursive, {baserole, role1, role2})
baserole.included_roles.append(role1)
self.assertSetEqual(role3.included_roles_recursive, {baserole, role1, role2})
def test_groups_effective(self):
group1 = self.get_users_group()
group2 = self.get_access_group()
baserole = Role(name='base', groups={group1: RoleGroup(group=group1)})
role1 = Role(name='role1', groups={group2: RoleGroup(group=group2)}, included_roles=[baserole])
self.assertSetEqual(baserole.groups_effective, {group1})
self.assertSetEqual(role1.groups_effective, {group1, group2})
def test_update_member_groups(self):
user1 = self.get_user()
user1.update_groups()
user2 = self.get_admin()
user2.update_groups()
group1 = self.get_users_group()
group2 = self.get_access_group()
group3 = self.get_admin_group()
baserole = Role(name='base', members=[user1], groups={group1: RoleGroup(group=group1)})
role1 = Role(name='role1', members=[user2], groups={group2: RoleGroup(group=group2)}, included_roles=[baserole])
db.session.add_all([baserole, role1])
baserole.update_member_groups()
role1.update_member_groups()
self.assertSetEqual(set(user1.groups), {group1})
self.assertSetEqual(set(user2.groups), {group1, group2})
baserole.groups[group3] = RoleGroup()
baserole.update_member_groups()
self.assertSetEqual(set(user1.groups), {group1, group3})
self.assertSetEqual(set(user2.groups), {group1, group2, group3})
import itertools
from uffd.remailer import remailer
from uffd.tasks import cleanup_task
from uffd.database import db
from uffd.models import Service, ServiceUser, User, UserEmail, RemailerMode
from tests.utils import UffdTestCase
class TestServiceUser(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add_all([Service(name='service1', limit_access=False), Service(name='service2', remailer_mode=RemailerMode.ENABLED_V1, limit_access=False)])
db.session.commit()
def test_auto_create(self):
service_count = Service.query.count()
user_count = User.query.count()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
db.session.add(User(loginname='newuser1', displayname='New User', primary_email_address='new1@example.com'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * (user_count + 1))
db.session.add(Service(name='service3'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), (service_count + 1) * (user_count + 1))
db.session.add(User(loginname='newuser2', displayname='New User', primary_email_address='new2@example.com'))
db.session.add(User(loginname='newuser3', displayname='New User', primary_email_address='new3@example.com'))
db.session.add(Service(name='service4'))
db.session.add(Service(name='service5'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), (service_count + 3) * (user_count + 3))
def test_create_missing(self):
service_count = Service.query.count()
user_count = User.query.count()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
db.session.delete(ServiceUser.query.first())
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * user_count - 1)
cleanup_task.run()
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
def test_effective_remailer_mode(self):
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service.remailer_mode = RemailerMode.ENABLED_V2
service_user = ServiceUser.query.get((service.id, user.id))
self.assertEqual(service_user.effective_remailer_mode, RemailerMode.ENABLED_V2)
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
self.assertEqual(service_user.effective_remailer_mode, RemailerMode.DISABLED)
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser']
self.assertEqual(service_user.effective_remailer_mode, RemailerMode.ENABLED_V2)
self.app.config['REMAILER_LIMIT_TO_USERS'] = None
service_user.remailer_overwrite_mode = RemailerMode.ENABLED_V1
service.remailer_mode = RemailerMode.DISABLED
self.assertEqual(service_user.effective_remailer_mode, RemailerMode.ENABLED_V1)
self.app.config['REMAILER_DOMAIN'] = ''
self.assertEqual(service_user.effective_remailer_mode, RemailerMode.DISABLED)
def test_service_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.assertEqual(service_user.service_email, None)
service_user.service_email = UserEmail(user=user, address='foo@bar', verified=True)
with self.assertRaises(Exception):
service_user.service_email = UserEmail(user=user, address='foo2@bar', verified=False)
with self.assertRaises(Exception):
service_user.service_email = UserEmail(user=self.get_admin(), address='foo3@bar', verified=True)
def test_real_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.assertEqual(service_user.real_email, user.primary_email.address)
service_user.service_email = UserEmail(user=user, address='foo@bar', verified=True)
self.assertEqual(service_user.real_email, user.primary_email.address)
service.enable_email_preferences = True
self.assertEqual(service_user.real_email, service_user.service_email.address)
service.limit_access = True
self.assertEqual(service_user.real_email, user.primary_email.address)
service.access_group = self.get_admin_group()
self.assertEqual(service_user.real_email, user.primary_email.address)
service.access_group = self.get_users_group()
self.assertEqual(service_user.real_email, service_user.service_email.address)
def test_get_by_remailer_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email = remailer.build_v1_address(service.id, user.id)
# 1. remailer not setup
self.app.config['REMAILER_DOMAIN'] = ''
self.assertIsNone(ServiceUser.get_by_remailer_email(user.primary_email.address))
self.assertIsNone(ServiceUser.get_by_remailer_email(remailer_email))
self.assertIsNone(ServiceUser.get_by_remailer_email('invalid'))
# 2. remailer setup
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertIsNone(ServiceUser.get_by_remailer_email(user.primary_email.address))
self.assertEqual(ServiceUser.get_by_remailer_email(remailer_email), service_user)
self.assertIsNone(ServiceUser.get_by_remailer_email('invalid'))
def test_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email = remailer.build_v1_address(service.id, user.id)
# 1. remailer not setup
self.app.config['REMAILER_DOMAIN'] = ''
self.assertEqual(service_user.email, user.primary_email.address)
# 2. remailer setup + remailer disabled
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertEqual(service_user.email, user.primary_email.address)
# 3. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS unset
service.remailer_mode = RemailerMode.ENABLED_V1
db.session.commit()
self.assertEqual(service_user.email, remailer_email)
# 4. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS does not include user
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
self.assertEqual(service_user.email, user.primary_email.address)
# 5. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS includes user
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser']
self.assertEqual(service_user.email, remailer_email)
# 6. remailer setup + remailer disabled + user overwrite
self.app.config['REMAILER_LIMIT_TO_USERS'] = None
service.remailer_mode = RemailerMode.DISABLED
service_user.remailer_overwrite_mode = RemailerMode.ENABLED_V1
self.assertEqual(service_user.email, remailer_email)
# 7. remailer setup + remailer enabled + user overwrite
self.app.config['REMAILER_LIMIT_TO_USERS'] = None
service.remailer_mode = RemailerMode.ENABLED_V1
service_user.remailer_overwrite_mode = RemailerMode.DISABLED
self.assertEqual(service_user.email, user.primary_email.address)
def test_filter_query_by_email(self):
service = Service.query.filter_by(name='service1').first()
user = self.get_user()
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email_v1 = remailer.build_v1_address(service.id, user.id)
remailer_email_v2 = remailer.build_v2_address(service.id, user.id)
email1 = user.primary_email
email2 = UserEmail(user=user, address='test2@example.com', verified=True)
db.session.add(email2)
service_user = ServiceUser.query.get((service.id, user.id))
all_service_users = ServiceUser.query.all()
cases = itertools.product(
# Input values
[
'test@example.com',
'test2@example.com',
'other@example.com',
remailer_email_v1,
remailer_email_v2,
],
# REMAILER_DOMAIN config
[None, 'remailer.example.com'],
# REMAILER_LIMIT config
[None, ['testuser', 'otheruser'], ['testadmin', 'otheruser']],
# service.remailer_mode
[RemailerMode.DISABLED, RemailerMode.ENABLED_V1, RemailerMode.ENABLED_V2],
# service.enable_email_preferences
[True, False],
# service.limit_access, service.access_group
[(False, None), (True, None), (True, self.get_admin_group()), (True, self.get_users_group())],
# service_user.service_email
[None, email1, email2],
# service_user.remailer_overwrite_mode
[None, RemailerMode.DISABLED, RemailerMode.ENABLED_V1, RemailerMode.ENABLED_V2],
)
for options in cases:
value = options[0]
self.app.config['REMAILER_DOMAIN'] = options[1]
self.app.config['REMAILER_LIMIT_TO_USERS'] = options[2]
service.remailer_mode = options[3]
service.enable_email_preferences = options[4]
service.limit_access, service.access_group = options[5]
service_user.service_email = options[6]
service_user.remailer_overwrite_mode = options[7]
a = {result for result in all_service_users if result.email == value}
b = set(ServiceUser.filter_query_by_email(ServiceUser.query, value).all())
if a != b:
self.fail(f'{a} != {b} with ' + repr(options))
import unittest
import datetime
from uffd.database import db
from uffd.models.session import Session, USER_AGENT_PARSER_SUPPORTED
from tests.utils import UffdTestCase
class TestSession(UffdTestCase):
def test_expire(self):
self.app.config['SESSION_LIFETIME_SECONDS'] = 100
self.app.config['PERMANENT_SESSION_LIFETIME'] = 10
user = self.get_user()
def make_session(created_age, last_used_age):
return Session(
user=user,
created=datetime.datetime.utcnow() - datetime.timedelta(seconds=created_age),
last_used=datetime.datetime.utcnow() - datetime.timedelta(seconds=last_used_age),
)
session1 = Session(user=user)
self.assertFalse(session1.expired)
session2 = make_session(0, 0)
self.assertFalse(session2.expired)
session3 = make_session(50, 5)
self.assertFalse(session3.expired)
session4 = make_session(50, 15)
self.assertTrue(session4.expired)
session5 = make_session(105, 5)
self.assertTrue(session5.expired)
session6 = make_session(105, 15)
self.assertTrue(session6.expired)
db.session.add_all([session1, session2, session3, session4, session5, session6])
db.session.commit()
self.assertEqual(set(Session.query.filter_by(expired=False).all()), {session1, session2, session3})
self.assertEqual(set(Session.query.filter_by(expired=True).all()), {session4, session5, session6})
def test_useragent_ua_parser(self):
if not USER_AGENT_PARSER_SUPPORTED:
self.skipTest('ua_parser not available')
session = Session(user_agent='Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0')
self.assertEqual(session.user_agent_browser, 'Firefox')
self.assertEqual(session.user_agent_platform, 'Windows')
def test_useragent_no_ua_parser(self):
session = Session(user_agent='Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0')
session.DISABLE_USER_AGENT_PARSER = True
self.assertEqual(session.user_agent_browser, 'Firefox')
self.assertEqual(session.user_agent_platform, 'Windows')