Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Showing
with 722 additions and 974 deletions
{
"entries": [
{
"dn": "uid=testuser,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test User"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test User"
],
"entryDN": [
"uid=testuser,ou=users,dc=example,dc=com"
],
"entryUUID": [
"75e62c6a-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test User"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testuser"
],
"mail": [
"testuser@example.com"
],
"memberOf": [
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=users,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testuser"
],
"uidNumber": [
"10000"
],
"userPassword": [
"userpassword"
]
}
},
{
"dn": "uid=testadmin,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test Admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test Admin"
],
"entryDN": [
"uid=testadmin,ou=users,dc=example,dc=com"
],
"entryUUID": [
"678c8470-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test Admin"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testadmin"
],
"mail": [
"testadmin@example.com"
],
"memberOf": [
"cn=users,ou=groups,dc=example,dc=com",
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testadmin"
],
"uidNumber": [
"10001"
],
"userPassword": [
"adminpassword"
]
}
},
{
"dn": "cn=users,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"users"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Base group for all users"
],
"entryDN": [
"cn=users,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"1aec0e8c-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_access,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_access"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"User access to uffd selfservice"
],
"entryDN": [
"cn=uffd_access,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"4fc8dd60-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20002"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_admin,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Admin access to uffd selfservice"
],
"entryDN": [
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"b5d869d6-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20003"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "uid=test,ou=postfix,dc=example,dc=com",
"raw": {
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"entryDN": [
"uid=test,ou=postfix,dc=example,dc=com"
],
"entryUUID": [
"926e5273-a545-4dfe-8f20-d1eeaf41d796"
],
"hasSubordinates": [
"FALSE"
],
"mailacceptinggeneralid": [
"test1@example.com",
"test2@example.com"
],
"maildrop": [
"testuser@mail.example.com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"postfixVirtual"
],
"structuralObjectClass": [
"postfixVirtual"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"test"
]
}
}
]
}
{
"raw": {
"altServer": [],
"configContext": [
"cn=config"
],
"entryDN": [
""
],
"namingContexts": [
"dc=example,dc=com"
],
"objectClass": [
"top",
"OpenLDAProotDSE"
],
"structuralObjectClass": [
"OpenLDAProotDSE"
],
"subschemaSubentry": [
"cn=Subschema"
],
"supportedCapabilities": [],
"supportedControl": [
"2.16.840.1.113730.3.4.18",
"2.16.840.1.113730.3.4.2",
"1.3.6.1.4.1.4203.1.10.1",
"1.3.6.1.1.22",
"1.2.840.113556.1.4.319",
"1.2.826.0.1.3344810.2.3",
"1.3.6.1.1.13.2",
"1.3.6.1.1.13.1",
"1.3.6.1.1.12"
],
"supportedExtension": [
"1.3.6.1.4.1.1466.20037",
"1.3.6.1.4.1.4203.1.11.1",
"1.3.6.1.4.1.4203.1.11.3",
"1.3.6.1.1.8"
],
"supportedFeatures": [
"1.3.6.1.1.14",
"1.3.6.1.4.1.4203.1.5.1",
"1.3.6.1.4.1.4203.1.5.2",
"1.3.6.1.4.1.4203.1.5.3",
"1.3.6.1.4.1.4203.1.5.4",
"1.3.6.1.4.1.4203.1.5.5"
],
"supportedLDAPVersion": [
"3"
],
"supportedSASLMechanisms": [
"DIGEST-MD5",
"CRAM-MD5",
"NTLM"
],
"vendorName": [],
"vendorVersion": []
},
"type": "DsaInfo"
}
This diff is collapsed.
location / {
uwsgi_pass unix:///run/uwsgi/app/uffd/socket;
include uwsgi_params;
}
location /static {
alias /usr/share/uffd/uffd/static;
}
#!/usr/bin/python3
from werkzeug.contrib.profiler import ProfilerMiddleware
from uffd import create_app
app = create_app()
app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[30])
app.run(debug=True)
[pytest]
filterwarnings =
# DeprecationWarning from dependencies that we use
ignore:`formatargspec` is deprecated since Python 3.5. Use `signature` and the `Signature` object directly:DeprecationWarning
ignore:Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working:DeprecationWarning
# Versions from Debian Buster
ldap3==2.4.1
flask==1.0.2
Flask-SQLAlchemy==2.1
qrcode==6.1
fido2==0.5.0
Flask-OAuthlib==0.9.5
.
# Testing
pytest==3.10.1
atomicwrites==1.1.5
attrs==18.2.0
more-itertools==4.2.0
pluggy==0.8.0
py==1.7.0
pyOpenSSL==19.0.0
#!/usr/bin/env python3
from werkzeug.serving import make_ssl_devcert
from uffd import *
if __name__ == '__main__':
app = create_app()
init_db(app)
print(app.url_map)
if not os.path.exists('devcert.crt') or not os.path.exists('devcert.key'):
make_ssl_devcert('devcert')
# WebAuthn requires https and a hostname (not just an IP address). If you
# don't want to test U2F/FIDO2 device registration/authorization, you can
# safely remove `host` and `ssl_context`.
app.run(threaded=True, debug=True, host='localhost', ssl_context=('devcert.crt', 'devcert.key'))
#!/usr/bin/env python3
import unittest
import os
from src import server
def setUp():
server.app.testing = True
def tearDown():
os.unlink(server.app.config['SQLITE_DB'])
if __name__ == '__main__':
setUp()
try:
suite = unittest.defaultTestLoader.discover('./tests/', pattern="*")
unittest.TextTestRunner(verbosity=2, failfast=True).run(suite)
finally:
tearDown()
from setuptools import setup, find_packages
import os
with open('README.md', 'r', encoding='utf-8') as f:
long_description = f.read()
long_description = '**DO NOT INSTALL FROM PIP FOR PRODUCTION DEPLOYMENTS**, see [Deployment](#Deployment) for more information.\n\n\n\n' + long_description
setup(
name='uffd',
version=os.environ.get('PACKAGE_VERSION', 'local'),
description='Web-based user management and single sign-on software',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://git.cccv.de/uffd/uffd',
classifiers=[
'Programming Language :: Python :: 3',
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
'Operating System :: OS Independent',
'Topic :: System :: Systems Administration :: Authentication/Directory :: LDAP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Environment :: Web Environment',
'Framework :: Flask',
],
author='CCCV',
author_email='it@cccv.de',
license='AGPL3',
packages=['uffd'],
include_package_data=True,
zip_safe=False,
python_requires='>=3.7',
install_requires=[
# Versions Debian Buster packages are based on.
# DO NOT USE FOR PRODUCTION, those in the setup.py are not updated regularly
'flask==1.0.2',
'Flask-SQLAlchemy==2.1',
'qrcode==6.1',
'fido2==0.5.0',
'cryptography==2.6.1',
'pyjwt==1.7.0',
'Flask-Migrate==2.1.1',
'Flask-Babel==0.11.2',
'alembic==1.0.0',
'argon2-cffi==18.3.0',
'itsdangerous==0.24',
'prometheus-client==0.9',
'ua-parser==0.8.0',
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
# for all dependencies.
'certifi==2018.8.24',
#cffi==1.12.2'
'cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.',
'chardet==3.0.4',
'click==7.0',
'idna==2.6',
'Jinja2==2.10',
'MarkupSafe==1.1.0',
'pyasn1==0.4.2',
'pycparser==2.19',
'requests==2.21.0',
'requests-oauthlib==1.0.0',
'six==1.12.0',
'SQLAlchemy==1.2.18',
'urllib3==1.24.1',
'Werkzeug==0.14.1',
'python-dateutil==2.7.3',
#editor==1.0.3
'Mako==1.0.7',
],
)
File moved
from uffd.database import db
from uffd.models import User, Role, RoleGroup
from tests.utils import UffdTestCase
class TestRoleCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
db.session.add(role)
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
role.members.append(self.get_admin())
role = Role(name='base', is_default=True)
db.session.add(role)
role.groups[self.get_access_group()] = RoleGroup(group=self.get_access_group())
db.session.add(Role(name='test'))
for user in User.query:
user.update_groups()
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['role', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'admin'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['role', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'test']) # conflicting name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--moderator-group', 'doesnotexist']) # invalid mod group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-group', 'doesnotexist']) # invalid group
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--add-role', 'doesnotexist']) # invalid role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newrole', '--description', 'Role description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='newrole').one()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'Role description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
with self.app.test_request_context():
for user in User.query:
self.assertNotIn(self.get_users_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['role', 'create', 'newbase', '--add-group', 'users', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
for user in User.query:
self.assertIn(self.get_users_group(), user.groups)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'doesnotexist', '--description', 'New description'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--add-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--moderator-group', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--description', 'New description',
'--moderator-group', 'uffd_admin', '--add-group', 'users',
'--remove-group', 'uffd_access', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.description, 'New description')
self.assertEqual(role.moderator_group, self.get_admin_group())
self.assertEqual(list(role.groups), [self.get_users_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='admin').all())
self.assertEqual(set(self.get_user().groups), {self.get_users_group(), self.get_admin_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-moderator-group', '--clear-groups',
'--add-group', 'uffd_access', '--remove-role', 'admin',
'--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertIsNone(role.moderator_group)
self.assertEqual(list(role.groups), [self.get_access_group()])
self.assertEqual(role.included_roles, Role.query.filter_by(name='test').all())
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--clear-roles'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.included_roles, [])
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--no-default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, False)
self.assertEqual(set(self.get_user().groups), set())
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'base', '--default'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='base').first()
self.assertEqual(role.is_default, True)
self.assertEqual(set(self.get_user().groups), {self.get_access_group()})
# Regression test for https://git.cccv.de/uffd/uffd/-/issues/156
def test_update_without_description(self):
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
role.description = 'Test description'
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['role', 'update', 'test', '--clear-groups'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
role = Role.query.filter_by(name='test').first()
self.assertEqual(role.description, 'Test description')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Role.query.filter_by(name='test').first())
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
self.assertIn(self.get_admin_group(), self.get_admin().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_admin_group(), self.get_admin().groups)
with self.app.test_request_context():
self.assertIn(self.get_access_group(), self.get_user().groups)
result = self.app.test_cli_runner().invoke(args=['role', 'delete', 'base'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertNotIn(self.get_access_group(), self.get_user().groups)
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
from tests.utils import UffdTestCase
class TestUniqueEmailAddressCommands(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_enable(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertTrue(FeatureFlag.unique_email_addresses)
def test_enable_already_enabled(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_user_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com'))
db.session.add(UserEmail(user=self.get_user(), address='FOO@example.com'))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_enable_global_conflict(self):
with self.app.test_request_context():
db.session.add(UserEmail(user=self.get_user(), address='foo@example.com', verified=True))
db.session.add(UserEmail(user=self.get_admin(), address='FOO@example.com', verified=True))
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'enable'])
self.assertEqual(result.exit_code, 1)
def test_disable(self):
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertFalse(FeatureFlag.unique_email_addresses)
def test_disable_already_enabled(self):
result = self.app.test_cli_runner().invoke(args=['unique-email-addresses', 'disable'])
self.assertEqual(result.exit_code, 1)
from uffd.database import db
from uffd.models import User, Group, Role, RoleGroup, FeatureFlag
from tests.utils import UffdTestCase
class TestUserCLI(UffdTestCase):
def setUp(self):
super().setUp()
role = Role(name='admin')
role.groups[self.get_admin_group()] = RoleGroup(group=self.get_admin_group())
db.session.add(role)
db.session.add(Role(name='test'))
db.session.commit()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['user', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'testuser'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['user', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'new user', '--mail', 'foobar@example.com']) # invalid login name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'foobar@example.com', '--add-role', 'doesnotexist']) # unknown role
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'testuser', '--mail', 'foobar@example.com']) # conflicting name
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'test@example.com']) # conflicting email address
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
self.assertFalse(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser2', '--mail', 'newmail2@example.com', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser2').first()
self.assertTrue(user.is_deactivated)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'doesnotexist', '--displayname', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', '']) # invalid mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'admin@example.com']) # conflicting mail
self.assertEqual(result.exit_code, 1)
with self.app.test_request_context():
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--password', '']) # invalid password
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--displayname', '']) # invalid display name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--mail', 'newmail@example.com',
'--displayname', 'New Display Name', '--password', 'newpassword'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertIsNotNone(user)
self.assertEqual(user.primary_email.address, 'newmail@example.com')
self.assertEqual(user.displayname, 'New Display Name')
self.assertTrue(user.password.verify('newpassword'))
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--add-role', 'admin', '--add-role', 'test'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(set(user.roles), {Role.query.filter_by(name='admin').one(), Role.query.filter_by(name='test').one()})
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--remove-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='test').all())
self.assertNotIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--clear-roles', '--add-role', 'admin'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertTrue(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--activate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertFalse(user.is_deactivated)
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'testuser'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(User.query.filter_by(loginname='testuser').first())
result = self.app.test_cli_runner().invoke(args=['user', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
class TestGroupCLI(UffdTestCase):
def setUp(self):
super().setUp()
self.client.__exit__(None, None, None)
def test_list(self):
result = self.app.test_cli_runner().invoke(args=['group', 'list'])
self.assertEqual(result.exit_code, 0)
def test_show(self):
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'users'])
self.assertEqual(result.exit_code, 0)
result = self.app.test_cli_runner().invoke(args=['group', 'show', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
def test_create(self):
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'users']) # Duplicate name
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'new group'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'create', 'newgroup', '--description', 'A new group'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='newgroup').first()
self.assertIsNotNone(group)
self.assertEqual(group.description, 'A new group')
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'doesnotexist', '--description', 'foo'])
self.assertEqual(result.exit_code, 1)
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users', '--description', 'New description'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'New description')
def test_update_without_description(self):
result = self.app.test_cli_runner().invoke(args=['group', 'update', 'users']) # Should not change anything
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
group = Group.query.filter_by(name='users').first()
self.assertEqual(group.description, 'Base group for all users')
def test_delete(self):
with self.app.test_request_context():
self.assertIsNotNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'users'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
self.assertIsNone(Group.query.filter_by(name='users').first())
result = self.app.test_cli_runner().invoke(args=['group', 'delete', 'doesnotexist'])
self.assertEqual(result.exit_code, 1)
import os
import sys
import datetime
from uffd.database import db
from uffd.models import (
User, UserEmail, Group,
RecoveryCodeMethod, TOTPMethod, WebauthnMethod,
Role, RoleGroup,
Signup,
Invite, InviteGrant, InviteSignup,
DeviceLoginConfirmation,
Service,
OAuth2Client, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation,
PasswordToken,
Session,
)
from tests.utils import MigrationTestCase
class TestFuzzy(MigrationTestCase):
def setUpApp(self):
self.app.config['LDAP_SERVICE_MOCK'] = True
self.app.config['OAUTH2_CLIENTS'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'redirect_uris': ['http://localhost:5004/oauthproxy/callback'],
'logout_urls': ['http://localhost:5004/oauthproxy/logout']
}
}
self.app.config['API_CLIENTS_2'] = {
'test': {
'service_name': 'test',
'client_secret': 'testsecret',
'scopes': ['checkpassword', 'getusers', 'getmails']
},
}
# Runs every upgrade/downgrade script with data. To do this we first upgrade
# to head, create data, then downgrade, upgrade, downgrade for every revision.
def test_migrations_fuzzy(self):
self.upgrade('head')
# Users and groups were created by 878b25c4fae7_ldap_to_db because we set LDAP_SERVICE_MOCK to True
user = User.query.first()
group = Group.query.first()
db.session.add(RecoveryCodeMethod(user=user))
db.session.add(TOTPMethod(user=user, name='mytotp'))
db.session.add(WebauthnMethod(user=user, name='mywebauthn', cred=b''))
role = Role(name='role', groups={group: RoleGroup(group=group)})
db.session.add(role)
role.members.append(user)
db.session.add(Role(name='base', included_roles=[role], locked=True, is_default=True, moderator_group=group, groups={group: RoleGroup(group=group)}))
db.session.add(Signup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
db.session.add(Signup(loginname='testuser', displayname='Testuser', mail='testuser@example.com', password='testpassword', user=user))
invite = Invite(valid_until=datetime.datetime.now(), roles=[role])
db.session.add(invite)
invite.signups.append(InviteSignup(loginname='newuser', displayname='New User', mail='newuser@example.com', password='newpassword'))
invite.grants.append(InviteGrant(user=user))
db.session.add(Invite(creator=user, valid_until=datetime.datetime.now()))
service = Service(name='testservice', access_group=group)
oauth2_client = OAuth2Client(service=service, client_id='testclient', client_secret='testsecret', redirect_uris=['http://localhost:1234/callback'], logout_uris=[OAuth2LogoutURI(method='GET', uri='http://localhost:1234/callback')])
db.session.add_all([service, oauth2_client])
session = Session(
user=user,
secret='0919de9da3f7dc6c33ab849f44c20e8221b673ca701030de17488f3269fc5469f100e2ce56e5fd71305b23d8ecbb06d80d22004adcd3fefc5f5fcb80a436e31f2c2d9cc8fe8c59ae44871ae4524408d312474570280bf29d3ba145a4bd00010ca758eaa0795b180ec12978b42d13bf4c4f06f72103d44077022ce656610be855',
ip_address='127.0.0.1',
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0',
mfa_done=True,
)
db.session.add(session)
db.session.add(OAuth2Grant(session=session, client=oauth2_client, _code='testcode', redirect_uri='http://example.com/callback', expires=datetime.datetime.now()))
db.session.add(OAuth2Token(session=session, client=oauth2_client, token_type='Bearer', _access_token='testcode', _refresh_token='testcode', expires=datetime.datetime.now()))
db.session.add(OAuth2DeviceLoginInitiation(client=oauth2_client, confirmations=[DeviceLoginConfirmation(session=session)]))
db.session.add(PasswordToken(user=user))
db.session.commit()
revs = [s.split('_', 1)[0] for s in os.listdir('uffd/migrations/versions') if '_' in s and s.endswith('.py')]
for rev in revs:
self.downgrade('-1')
self.upgrade('+1')
self.downgrade('-1')
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
class TestForMissingLockRows(MigrationTestCase):
def test_check_missing_lock_rows(self):
self.upgrade('head')
existing_locks = {row[0] for row in db.session.execute(db.select([lock_table.c.name])).fetchall()}
for name in Lock.ALL_LOCKS - existing_locks:
self.fail(f'Lock "{name}" is missing. Make sure to add a migration that inserts it.')
# Add something like this:
# conn = op.get_bind()
# lock_table = sa.table('lock', sa.column('name'))
# conn.execute(sa.insert(lock_table).values(name='NAME'))
from uffd.database import db
from uffd.models.misc import lock_table, Lock
from tests.utils import MigrationTestCase
user_table = db.table('user',
db.column('id'),
db.column('unix_uid'),
db.column('loginname'),
db.column('displayname'),
db.column('primary_email_id'),
db.column('is_service_user'),
)
user_email_table = db.table('user_email',
db.column('id'),
db.column('address'),
db.column('address_normalized'),
db.column('verified'),
)
group_table = db.table('group',
db.column('id'),
db.column('unix_gid'),
db.column('name'),
db.column('description')
)
uid_allocation_table = db.table('uid_allocation', db.column('id'))
gid_allocation_table = db.table('gid_allocation', db.column('id'))
class TestMigration(MigrationTestCase):
REVISION = 'aeb07202a6c8'
def setUpApp(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10005
self.app.config['USER_SERVICE_MIN_UID'] = 10006
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.app.config['GROUP_MIN_GID'] = 20000
self.app.config['GROUP_MAX_GID'] = 20005
def create_user(self, uid):
db.session.execute(db.insert(user_email_table).values(
address=f'email{uid}@example.com',
address_normalized=f'email{uid}@example.com',
verified=True
))
email_id = db.session.execute(
db.select([user_email_table.c.id])
.where(user_email_table.c.address == f'email{uid}@example.com')
).scalar()
db.session.execute(db.insert(user_table).values(
unix_uid=uid,
loginname=f'user{uid}',
displayname='user',
primary_email_id=email_id,
is_service_user=False
))
def create_group(self, gid):
db.session.execute(db.insert(group_table).values(unix_gid=gid, name=f'group{gid}', description=''))
def fetch_uid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([uid_allocation_table])
.order_by(uid_allocation_table.c.id)
).fetchall()]
def fetch_gid_allocations(self):
return [row[0] for row in db.session.execute(
db.select([gid_allocation_table])
.order_by(gid_allocation_table.c.id)
).fetchall()]
def test_empty(self):
# No users/groups
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [])
self.assertEqual(self.fetch_gid_allocations(), [])
def test_gid_first_minus_one(self):
self.create_group(19999)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [19999])
def test_gid_first(self):
self.create_group(20000)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000])
def test_gid_first_plus_one(self):
self.create_group(20001)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001])
def test_gid_last_minus_one(self):
self.create_group(20004)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004])
def test_gid_last(self):
self.create_group(20005)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20000, 20001, 20002, 20003, 20004, 20005])
def test_gid_last_plus_one(self):
self.create_group(20006)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [20006])
def test_gid_complex(self):
self.create_group(10)
self.create_group(20001)
self.create_group(20003)
self.create_group(20010)
self.upgrade()
self.assertEqual(self.fetch_gid_allocations(), [10, 20000, 20001, 20002, 20003, 20010])
# The code for UIDs is mostly the same as for GIDs, so we don't test all
# the edge cases again.
def test_uid_different_ranges(self):
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10006, 10007, 10008, 10009, 90000])
def test_uid_same_ranges(self):
self.app.config['USER_MIN_UID'] = 10000
self.app.config['USER_MAX_UID'] = 10010
self.app.config['USER_SERVICE_MIN_UID'] = 10000
self.app.config['USER_SERVICE_MAX_UID'] = 10010
self.create_user(10)
self.create_user(10000)
self.create_user(10002)
self.create_user(10007)
self.create_user(10009)
self.create_user(90000)
self.upgrade()
self.assertEqual(self.fetch_uid_allocations(), [10, 10000, 10001, 10002, 10003, 10004, 10005, 10006, 10007, 10008, 10009, 90000])