Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Showing
with 3199 additions and 42 deletions
from flask import url_for
from uffd.database import db
from uffd.models import User, Role, RoleGroup
from tests.utils import dump, UffdTestCase
class TestRoleViews(UffdTestCase):
def setUp(self):
super().setUp()
self.login_as('admin')
def test_index(self):
db.session.add(Role(name='base', description='Base role description'))
db.session.add(Role(name='test1', description='Test1 role description'))
db.session.commit()
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index', r)
self.assertEqual(r.status_code, 200)
def test_index_empty(self):
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index_empty', r)
self.assertEqual(r.status_code, 200)
def test_show(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('role.show', roleid=role.id), follow_redirects=True)
dump('role_show', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
r = self.client.get(path=url_for('role.new'), follow_redirects=True)
dump('role_new', r)
self.assertEqual(r.status_code, 200)
def test_update(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role.groups[self.get_admin_group()] = RoleGroup()
db.session.commit()
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertSetEqual(set(role.groups), {self.get_admin_group()})
r = self.client.post(path=url_for('role.update', roleid=role.id),
data={'name': 'base1', 'description': 'Base role description1', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
dump('role_update', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role.id)
self.assertEqual(role.name, 'base1')
self.assertEqual(role.description, 'Base role description1')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated
def test_create(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
dump('role_create', r)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_create_with_moderator_group(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': self.get_admin_group().id, 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertEqual(role.moderator_group.name, 'uffd_admin')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_delete(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role_id = role.id
self.assertIsNotNone(Role.query.get(role_id))
r = self.client.get(path=url_for('role.delete', roleid=role.id), follow_redirects=True)
dump('role_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Role.query.get(role_id))
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_set_default(self):
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
role = Role(name='test')
db.session.add(role)
role.groups[self.get_admin_group()] = RoleGroup()
user1 = self.get_user()
user2 = self.get_admin()
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(self.get_user().roles_effective), set())
self.assertSetEqual(set(self.get_admin().roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), set())
role.members.append(self.get_user())
role.members.append(service_user)
self.assertSetEqual(set(self.get_user().roles_effective), {role})
self.assertSetEqual(set(self.get_admin().roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
self.assertSetEqual(set(role.members), {self.get_user(), service_user})
r = self.client.get(path=url_for('role.set_default', roleid=role.id), follow_redirects=True)
dump('role_set_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(self.get_user().roles_effective), {role})
self.assertSetEqual(set(self.get_admin().roles_effective), {role})
def test_unset_default(self):
admin_role = Role(name='admin', is_default=True)
db.session.add(admin_role)
admin_role.groups[self.get_admin_group()] = RoleGroup()
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
role = Role(name='test', is_default=True)
db.session.add(role)
service_user = User.query.filter_by(loginname='service').one_or_none()
role.members.append(service_user)
self.assertSetEqual(set(self.get_user().roles_effective), {role, admin_role})
self.assertSetEqual(set(self.get_admin().roles_effective), {role, admin_role})
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
admin_role_id = admin_role.id
self.assertSetEqual(set(role.members), {service_user})
r = self.client.get(path=url_for('role.unset_default', roleid=role.id), follow_redirects=True)
dump('role_unset_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
admin_role = Role.query.get(admin_role_id)
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(self.get_user().roles_effective), {admin_role})
self.assertSetEqual(set(self.get_admin().roles_effective), {admin_role})
from flask import url_for
from uffd.database import db
from uffd.models import Role, RoleGroup
from tests.utils import dump, UffdTestCase
class TestRolemodViewsLoggedOut(UffdTestCase):
def test_acl_nologin(self):
r = self.client.get(path=url_for('rolemod.index'), follow_redirects=True)
dump('rolemod_acl_nologin', r)
self.assertEqual(r.status_code, 200)
def test_index(self):
db.session.add(Role(name='test_role_1', moderator_group=self.get_access_group()))
db.session.add(Role(name='test_role_2', moderator_group=self.get_admin_group()))
db.session.add(Role(name='test_role_3'))
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('rolemod.index'), follow_redirects=True)
dump('rolemod_index', r)
self.assertEqual(r.status_code, 200)
self.assertIn('test_role_1'.encode(), r.data)
self.assertNotIn('test_role_2'.encode(), r.data)
self.assertNotIn('test_role_3'.encode(), r.data)
class TestRolemodViews(UffdTestCase):
def setUp(self):
super().setUp()
self.login_as('user')
def test_acl_notmod(self):
db.session.add(Role(name='test', moderator_group=self.get_admin_group()))
db.session.commit()
r = self.client.get(path=url_for('rolemod.index'), follow_redirects=True)
dump('rolemod_acl_notmod', r)
self.assertEqual(r.status_code, 403)
def test_show(self):
role = Role(name='test', moderator_group=self.get_access_group())
db.session.add(role)
role.members.append(self.get_admin())
db.session.commit()
r = self.client.get(path=url_for('rolemod.show', role_id=role.id), follow_redirects=True)
dump('rolemod_show', r)
self.assertEqual(r.status_code, 200)
def test_show_empty(self):
role = Role(name='test', moderator_group=self.get_access_group())
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('rolemod.show', role_id=role.id), follow_redirects=True)
dump('rolemod_show_empty', r)
self.assertEqual(r.status_code, 200)
def test_show_noperm(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test', moderator_group=self.get_admin_group())
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('rolemod.show', role_id=role.id), follow_redirects=True)
dump('rolemod_show_noperm', r)
self.assertEqual(r.status_code, 403)
def test_show_nomod(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test')
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('rolemod.show', role_id=role.id), follow_redirects=True)
dump('rolemod_show_nomod', r)
self.assertEqual(r.status_code, 403)
def test_update(self):
role = Role(name='test', description='old_description', moderator_group=self.get_access_group())
db.session.add(role)
db.session.commit()
r = self.client.post(path=url_for('rolemod.update', role_id=role.id), data={'description': 'new_description'}, follow_redirects=True)
dump('rolemod_update', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(Role.query.get(role.id).description, 'new_description')
def test_update_descr_too_long(self):
role = Role(name='test', description='old_description', moderator_group=self.get_access_group())
db.session.add(role)
db.session.commit()
r = self.client.post(path=url_for('rolemod.update', role_id=role.id), data={'description': 'long_description'*300}, follow_redirects=True)
dump('rolemod_update_descr_too_long', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(Role.query.get(role.id).description, 'old_description')
def test_update_noperm(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test', description='old_description', moderator_group=self.get_admin_group())
db.session.add(role)
db.session.commit()
r = self.client.post(path=url_for('rolemod.update', role_id=role.id), data={'description': 'new_description'}, follow_redirects=True)
dump('rolemod_update_noperm', r)
self.assertEqual(r.status_code, 403)
self.assertEqual(Role.query.get(role.id).description, 'old_description')
def test_update_nomod(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test', description='old_description')
db.session.add(role)
db.session.commit()
r = self.client.post(path=url_for('rolemod.update', role_id=role.id), data={'description': 'new_description'}, follow_redirects=True)
dump('rolemod_update_nomod', r)
self.assertEqual(r.status_code, 403)
self.assertEqual(Role.query.get(role.id).description, 'old_description')
def test_delete_member(self):
role = Role(name='test', moderator_group=self.get_access_group())
role.groups[self.get_admin_group()] = RoleGroup()
db.session.add(role)
role.members.append(self.get_admin())
db.session.commit()
role.update_member_groups()
db.session.commit()
user = self.get_admin()
group = self.get_admin_group()
self.assertTrue(user in group.members)
role = Role.query.get(role.id)
self.assertTrue(user in role.members)
r = self.client.get(path=url_for('rolemod.delete_member', role_id=role.id, member_id=user.id), follow_redirects=True)
dump('rolemod_delete_member', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_admin()
group = self.get_admin_group()
self.assertFalse(user_updated in group.members)
role = Role.query.get(role.id)
self.assertFalse(user_updated in role.members)
def test_delete_member_nomember(self):
role = Role(name='test', moderator_group=self.get_access_group())
role.groups[self.get_admin_group()] = RoleGroup()
db.session.add(role)
db.session.commit()
user = self.get_admin()
r = self.client.get(path=url_for('rolemod.delete_member', role_id=role.id, member_id=user.id), follow_redirects=True)
dump('rolemod_delete_member_nomember', r)
self.assertEqual(r.status_code, 200)
def test_delete_member_noperm(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test', moderator_group=self.get_admin_group())
db.session.add(role)
role.members.append(self.get_admin())
db.session.commit()
user = self.get_admin()
role = Role.query.get(role.id)
self.assertTrue(user in role.members)
r = self.client.get(path=url_for('rolemod.delete_member', role_id=role.id, member_id=user.id), follow_redirects=True)
dump('rolemod_delete_member_noperm', r)
self.assertEqual(r.status_code, 403)
user_updated = self.get_admin()
role = Role.query.get(role.id)
self.assertTrue(user_updated in role.members)
def test_delete_member_nomod(self):
# Make sure we pass the blueprint-wide acl check
db.session.add(Role(name='other_role', moderator_group=self.get_access_group()))
role = Role(name='test')
db.session.add(role)
role.members.append(self.get_admin())
db.session.commit()
user = self.get_admin()
role = Role.query.get(role.id)
self.assertTrue(user in role.members)
r = self.client.get(path=url_for('rolemod.delete_member', role_id=role.id, member_id=user.id), follow_redirects=True)
dump('rolemod_delete_member_nomod', r)
self.assertEqual(r.status_code, 403)
user_updated = self.get_admin()
role = Role.query.get(role.id)
self.assertTrue(user_updated in role.members)
import datetime
import re
import time
from flask import url_for, request, session
from uffd.database import db
from uffd.models import PasswordToken, UserEmail, Role, RoleGroup, Service, ServiceUser, FeatureFlag, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from uffd.models.mfa import _hotp
from tests.utils import dump, UffdTestCase, db_flush
class TestSelfservice(UffdTestCase):
def test_index(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.index'))
dump('selfservice_index', r)
self.assertEqual(r.status_code, 200)
user = request.user
self.assertIn(user.displayname.encode(), r.data)
self.assertIn(user.loginname.encode(), r.data)
self.assertIn(user.primary_email.address.encode(), r.data)
def test_update_displayname(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.update_profile'),
data={'displayname': 'New Display Name'},
follow_redirects=True)
dump('update_displayname', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertEqual(user.displayname, 'New Display Name')
def test_update_displayname_invalid(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.update_profile'),
data={'displayname': ''},
follow_redirects=True)
dump('update_displayname_invalid', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertNotEqual(user.displayname, '')
def test_add_email(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.add_email'),
data={'address': 'new@example.com'},
follow_redirects=True)
dump('selfservice_add_email', r)
self.assertEqual(r.status_code, 200)
self.assertIn('new@example.com', self.app.last_mail['To'])
m = re.search(r'/email/([0-9]+)/verify/(.*)', str(self.app.last_mail.get_content()))
email_id, secret = m.groups()
email = UserEmail.query.get(email_id)
self.assertEqual(email.user.id, request.user.id)
self.assertEqual(email.address, 'new@example.com')
self.assertFalse(email.verified)
self.assertFalse(email.verification_expired)
self.assertTrue(email.verification_secret.verify(secret))
def test_add_email_duplicate(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.add_email'),
data={'address': 'test@example.com'},
follow_redirects=True)
dump('selfservice_add_email_duplicate', r)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(self.get_user().all_emails), 1)
self.assertEqual(UserEmail.query.filter_by(user=None).all(), [])
def test_verify_email(self):
self.login_as('user')
email = UserEmail(user=self.get_user(), address='new@example.com')
secret = email.start_verification()
db.session.add(email)
db.session.commit()
email_id = email.id
r = self.client.get(path=url_for('selfservice.verify_email', email_id=email_id, secret=secret), follow_redirects=True)
dump('selfservice_verify_email', r)
self.assertEqual(r.status_code, 200)
email = UserEmail.query.get(email_id)
self.assertTrue(email.verified)
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
def test_verify_email_notfound(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.verify_email', email_id=2342, secret='invalidsecret'), follow_redirects=True)
dump('selfservice_verify_email_notfound', r)
def test_verify_email_wrong_user(self):
self.login_as('user')
email = UserEmail(user=self.get_admin(), address='new@example.com')
secret = email.start_verification()
db.session.add(email)
db.session.commit()
r = self.client.get(path=url_for('selfservice.verify_email', email_id=email.id, secret=secret), follow_redirects=True)
dump('selfservice_verify_email_wrong_user', r)
self.assertFalse(email.verified)
def test_verify_email_wrong_secret(self):
self.login_as('user')
email = UserEmail(user=self.get_user(), address='new@example.com')
secret = email.start_verification()
db.session.add(email)
db.session.commit()
r = self.client.get(path=url_for('selfservice.verify_email', email_id=email.id, secret='invalidsecret'), follow_redirects=True)
dump('selfservice_verify_email_wrong_secret', r)
self.assertFalse(email.verified)
def test_verify_email_expired(self):
self.login_as('user')
email = UserEmail(user=self.get_user(), address='new@example.com')
secret = email.start_verification()
email.verification_expires = datetime.datetime.utcnow() - datetime.timedelta(days=1)
db.session.add(email)
db.session.commit()
r = self.client.get(path=url_for('selfservice.verify_email', email_id=email.id, secret=secret), follow_redirects=True)
dump('selfservice_verify_email_expired', r)
self.assertFalse(email.verified)
def test_verify_email_legacy(self):
self.login_as('user')
email = UserEmail(
user=self.get_user(),
address='new@example.com',
verification_legacy_id=1337,
_verification_secret='{PLAIN}ZgvsUs2bZjr9Whpy1la7Q0PHbhjmpXtNdH1mCmDbQP7',
verification_expires=datetime.datetime.utcnow()+datetime.timedelta(days=1)
)
db.session.add(email)
db.session.commit()
email_id = email.id
r = self.client.get(path=f'/self/token/mail_verification/1337/ZgvsUs2bZjr9Whpy1la7Q0PHbhjmpXtNdH1mCmDbQP7', follow_redirects=True)
dump('selfservice_verify_email_legacy', r)
self.assertEqual(r.status_code, 200)
email = UserEmail.query.get(email_id)
self.assertTrue(email.verified)
self.assertEqual(self.get_user().primary_email, email)
def test_verify_email_duplicate_strict_uniqueness(self):
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
self.login_as('user')
email = UserEmail(user=self.get_user(), address='admin@example.com')
secret = email.start_verification()
db.session.add(email)
db.session.commit()
email_id = email.id
r = self.client.get(path=url_for('selfservice.verify_email', email_id=email.id, secret=secret), follow_redirects=True)
dump('selfservice_verify_email_duplicate_strict_uniqueness', r)
email = UserEmail.query.get(email_id)
self.assertFalse(email.verified)
def test_retry_email_verification(self):
self.login_as('user')
email = UserEmail(user=self.get_user(), address='new@example.com')
old_secret = email.start_verification()
db.session.add(email)
db.session.commit()
r = self.client.get(path=url_for('selfservice.retry_email_verification', email_id=email.id), follow_redirects=True)
dump('selfservice_retry_email_verification', r)
self.assertEqual(r.status_code, 200)
self.assertIn('new@example.com', self.app.last_mail['To'])
m = re.search(r'/email/([0-9]+)/verify/(.*)', str(self.app.last_mail.get_content()))
email_id, secret = m.groups()
email = UserEmail.query.get(email_id)
self.assertEqual(email.user.id, request.user.id)
self.assertEqual(email.address, 'new@example.com')
self.assertFalse(email.verified)
self.assertFalse(email.verification_expired)
self.assertTrue(email.verification_secret.verify(secret))
self.assertFalse(email.verification_secret.verify(old_secret))
def test_delete_email(self):
self.login_as('user')
email = UserEmail(user=self.get_user(), address='new@example.com', verified=True)
db.session.add(email)
self.get_user().recovery_email = email
db.session.commit()
r = self.client.post(path=url_for('selfservice.delete_email', email_id=email.id), follow_redirects=True)
dump('selfservice_delete_email', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(UserEmail.query.filter_by(address='new@example.com').first())
self.assertIsNone(self.get_user().recovery_email)
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
def test_delete_email_invalid(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.delete_email', email_id=2324), follow_redirects=True)
self.assertEqual(r.status_code, 404)
def test_delete_email_primary(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.delete_email', email_id=request.user.primary_email.id), follow_redirects=True)
dump('selfservice_delete_email_primary', r)
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
def test_update_email_preferences(self):
self.login_as('user')
user_id = self.get_user().id
email = UserEmail(user=self.get_user(), address='new@example.com', verified=True)
db.session.add(email)
service = Service(name='service', enable_email_preferences=True, limit_access=False)
db.session.add(service)
db.session.commit()
email_id = email.id
service_id = service.id
old_email_id = self.get_user().primary_email.id
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(email_id), 'recovery_email': 'primary'},
follow_redirects=True)
dump('selfservice_update_email_preferences', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.get_user().primary_email.id, email.id)
self.assertIsNone(self.get_user().recovery_email)
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': str(email_id)},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.get_user().primary_email.id, old_email_id)
self.assertEqual(self.get_user().recovery_email.id, email_id)
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': 'primary', f'service_{service_id}_email': 'primary'},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(ServiceUser.query.get((service_id, user_id)).service_email)
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': 'primary', f'service_{service_id}_email': str(email_id)},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(ServiceUser.query.get((service_id, user_id)).service_email.id, email_id)
def test_update_email_preferences_unverified(self):
self.login_as('user')
user_id = self.get_user().id
email = UserEmail(user=self.get_user(), address='new@example.com')
db.session.add(email)
service = Service(name='service', enable_email_preferences=True, limit_access=False)
db.session.add(service)
db.session.commit()
email_id = email.id
service_id = service.id
old_email_id = self.get_user().primary_email.id
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(email_id), 'recovery_email': 'primary'},
follow_redirects=True)
with self.app.test_request_context():
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': str(email_id)},
follow_redirects=True)
with self.app.test_request_context():
self.assertIsNone(self.get_user().recovery_email)
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': 'primary', f'service_{service_id}_email': str(email_id)},
follow_redirects=True)
with self.app.test_request_context():
self.assertIsNone(ServiceUser.query.get((service_id, user_id)).service_email)
def test_update_email_preferences_invalid(self):
self.login_as('user')
user_id = self.get_user().id
email = UserEmail(user=self.get_user(), address='new@example.com', verified=True)
db.session.add(email)
service = Service(name='service', enable_email_preferences=True, limit_access=False)
db.session.add(service)
db.session.commit()
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(email.id), 'recovery_email': '2342'},
follow_redirects=True)
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(email.id), 'recovery_email': 'primary', f'service_{service_id}_email': '2342'},
follow_redirects=True)
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': 'primary', 'recovery_email': 'primary'},
follow_redirects=True)
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': '2342', 'recovery_email': 'primary'},
follow_redirects=True)
def test_change_password(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'newpassword', 'password2': 'newpassword'},
follow_redirects=True)
dump('change_password', r)
self.assertEqual(r.status_code, 200)
self.assertTrue(self.get_user().password.verify('newpassword'))
def test_change_password_invalid(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'shortpw', 'password2': 'shortpw'},
follow_redirects=True)
dump('change_password_invalid', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertFalse(user.password.verify('shortpw'))
self.assertTrue(user.password.verify('userpassword'))
# Regression test for #100 (login not possible if password contains character disallowed by SASLprep)
def test_change_password_samlprep_invalid(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'shortpw\n', 'password2': 'shortpw\n'},
follow_redirects=True)
dump('change_password_samlprep_invalid', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertFalse(user.password.verify('shortpw\n'))
self.assertTrue(user.password.verify('userpassword'))
def test_change_password_mismatch(self):
self.login_as('user')
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'newpassword1', 'password2': 'newpassword2'},
follow_redirects=True)
dump('change_password_mismatch', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertFalse(user.password.verify('newpassword1'))
self.assertFalse(user.password.verify('newpassword2'))
self.assertTrue(user.password.verify('userpassword'))
def test_leave_role(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
role1 = Role(name='testrole1')
role2 = Role(name='testrole2')
db.session.add(role1)
db.session.add(role2)
self.get_user().roles = [role1, role2]
db.session.commit()
roleid = role1.id
self.login_as('user')
r = self.client.post(path=url_for('selfservice.leave_role', roleid=roleid), follow_redirects=True)
dump('leave_role', r)
self.assertEqual(r.status_code, 200)
_user = self.get_user()
self.assertEqual(len(_user.roles), 1)
self.assertEqual(list(_user.roles)[0].name, 'testrole2')
def test_forgot_password(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.forgot_password'))
dump('forgot_password', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': user.loginname, 'mail': user.primary_email.address}, follow_redirects=True)
dump('forgot_password_submit', r)
self.assertEqual(r.status_code, 200)
token = PasswordToken.query.filter(PasswordToken.user == user).first()
self.assertIsNotNone(token)
self.assertIn(token.token, str(self.app.last_mail.get_content()))
def test_forgot_password_wrong_user(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200)
user = self.get_user()
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': 'not_a_user', 'mail': user.primary_email.address}, follow_redirects=True)
dump('forgot_password_submit_wrong_user', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_email(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': user.loginname, 'mail': 'not_an_email@example.com'}, follow_redirects=True)
dump('forgot_password_submit_wrong_email', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
# Regression test for #31
def test_forgot_password_invalid_user(self):
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True)
dump('forgot_password_submit_invalid_user', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_user(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200)
user = self.get_user()
user.is_deactivated = True
db.session.commit()
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': user.loginname, 'mail': user.primary_email.address}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password(self):
user = self.get_user()
token = PasswordToken(user=user)
db.session.add(token)
db.session.commit()
self.assertFalse(token.expired)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r)
self.assertEqual(r.status_code, 200)
self.assertTrue(self.get_user().password.verify('newpassword'))
def test_token_password_emptydb(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.token_password', token_id=1, token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token_id=1, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
self.assertTrue(self.get_user().password.verify('userpassword'))
def test_token_password_invalid(self):
user = self.get_user()
token = PasswordToken(user=user)
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128), follow_redirects=True)
dump('token_password_invalid', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token='A'*128),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
self.assertTrue(self.get_user().password.verify('userpassword'))
def test_token_password_expired(self):
user = self.get_user()
token = PasswordToken(user=user, created=(datetime.datetime.utcnow() - datetime.timedelta(days=10)))
db.session.add(token)
db.session.commit()
self.assertTrue(token.expired)
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
dump('token_password_invalid_expired', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Link invalid or expired', r.data)
self.assertTrue(self.get_user().password.verify('userpassword'))
def test_token_password_different_passwords(self):
user = self.get_user()
token = PasswordToken(user=user)
db.session.add(token)
db.session.commit()
r = self.client.get(path=url_for('selfservice.token_password', token_id=token.id, token=token.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.token_password', token_id=token.id, token=token.token),
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200)
self.assertTrue(self.get_user().password.verify('userpassword'))
def get_fido2_test_cred(self):
try:
from uffd.fido2_compat import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaViews(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(RecoveryCodeMethod(user=self.get_admin()))
db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone'))
# We don't want to skip all tests only because fido2 is not installed!
#db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle'))
db.session.commit()
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def add_webauthn(self):
db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle'))
db.session.commit()
def test_setup_disabled(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_disabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_recovery_codes(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_only_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_enabled(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
self.add_webauthn()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_enabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_few_recovery_codes(self):
self.login_as('user')
self.add_totp()
self.add_recovery_codes(1)
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_few_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_no_recovery_codes(self):
self.login_as('user')
self.add_totp()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_no_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_disable(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True)
dump('mfa_disable', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True)
dump('mfa_disable_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_disable_recovery_only(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
self.assertNotEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True)
dump('mfa_disable_recovery_only', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True)
dump('mfa_disable_recovery_only_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_setup_recovery(self):
self.login_as('user')
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True)
dump('mfa_setup_recovery', r)
self.assertEqual(r.status_code, 200)
methods = RecoveryCodeMethod.query.filter_by(user=request.user).all()
self.assertNotEqual(len(methods), 0)
r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True)
dump('mfa_setup_recovery_reset', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0)
self.assertNotEqual(len(methods), 0)
def test_setup_totp(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp', r)
self.assertEqual(r.status_code, 200)
self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0)
def test_setup_totp_without_recovery(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp_without_recovery', r)
self.assertEqual(r.status_code, 200)
def test_setup_totp_finish(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
def test_setup_totp_finish_without_recovery(self):
self.login_as('user')
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_without_recovery', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_wrong_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_wrong_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_empty_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True)
dump('mfa_setup_totp_finish_empty_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_delete_totp(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(request.user, name='test')
db.session.add(method)
db.session.commit()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 2)
r = self.client.get(path=url_for('selfservice.delete_mfa_totp', id=method.id), follow_redirects=True)
dump('mfa_delete_totp', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
# TODO: webauthn setup tests
from flask import url_for
from uffd.database import db
from uffd.models import Service, ServiceUser, OAuth2Client, APIClient, RemailerMode
from tests.utils import dump, UffdTestCase
class TestServices(UffdTestCase):
def setUpApp(self):
self.app.config['SERVICES'] = [
{
'title': 'Service Title',
'subtitle': 'Service Subtitle',
'description': 'Short description of the service as plain text',
'url': 'https://example.com/',
'logo_url': '/static/fairy-dust-color.png',
'required_group': 'users',
'permission_levels': [
{'name': 'Moderator', 'required_group': 'moderators'},
{'name': 'Admin', 'required_group': 'uffd_admin'},
],
'confidential': True,
'groups': [
{'name': 'Group "crew_crew"', 'required_group': 'users'},
{'name': 'Group "crew_logistik"', 'required_group': 'uffd_admin'},
],
'infos': [
{'title': 'Documentation', 'html': '<p>Some information about the service as html</p>', 'required_group': 'users'},
],
'links': [
{'title': 'Link to an external site', 'url': '#', 'required_group': 'users'},
],
},
{
'title': 'Minimal Service Title',
}
]
self.app.config['SERVICES_PUBLIC'] = True
def test_overview(self):
r = self.client.get(path=url_for('service.overview'))
dump('service_overview_guest', r)
self.assertEqual(r.status_code, 200)
self.assertNotIn(b'https://example.com/', r.data)
self.login_as('user')
r = self.client.get(path=url_for('service.overview'))
dump('service_overview_user', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'https://example.com/', r.data)
def test_overview_disabled(self):
self.app.config['SERVICES'] = []
# Should return login page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_guest', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'name="password"', r.data)
self.login_as('user')
# Should return access denied page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_user', r)
self.assertEqual(r.status_code, 403)
self.login_as('admin')
# Should return (empty) overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_admin', r)
self.assertEqual(r.status_code, 200)
def test_overview_nonpublic(self):
self.app.config['SERVICES_PUBLIC'] = False
# Should return login page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_guest', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'name="password"', r.data)
self.login_as('user')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_user', r)
self.assertEqual(r.status_code, 200)
self.login_as('admin')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_admin', r)
self.assertEqual(r.status_code, 200)
def test_overview_public(self):
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_guest', r)
self.assertEqual(r.status_code, 200)
self.login_as('user')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_user', r)
self.assertEqual(r.status_code, 200)
self.login_as('admin')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_admin', r)
self.assertEqual(r.status_code, 200)
class TestServiceAdminViews(UffdTestCase):
def setUpDB(self):
db.session.add(Service(
name='test1',
oauth2_clients=[OAuth2Client(client_id='test1_oauth2_client1', client_secret='test'), OAuth2Client(client_id='test1_oauth2_client2', client_secret='test')],
api_clients=[APIClient(auth_username='test1_api_client1', auth_password='test'), APIClient(auth_username='test1_api_client2', auth_password='test')],
))
db.session.add(Service(name='test2'))
db.session.add(Service(name='test3'))
db.session.commit()
self.service_id = Service.query.filter_by(name='test1').one().id
def test_index(self):
self.login_as('admin')
r = self.client.get(path=url_for('service.index'), follow_redirects=True)
dump('service_index', r)
self.assertEqual(r.status_code, 200)
def test_show(self):
self.login_as('admin')
r = self.client.get(path=url_for('service.show', id=self.service_id), follow_redirects=True)
dump('service_show', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
self.login_as('admin')
r = self.client.get(path=url_for('service.show'), follow_redirects=True)
dump('service_new', r)
self.assertEqual(r.status_code, 200)
def test_new_submit(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit'),
follow_redirects=True,
data={
'name': 'new-service',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
dump('service_new_submit', r)
self.assertEqual(r.status_code, 200)
service = Service.query.filter_by(name='new-service').one_or_none()
self.assertIsNotNone(service)
self.assertEqual(service.limit_access, True)
self.assertEqual(service.access_group, None)
self.assertEqual(service.remailer_mode, RemailerMode.DISABLED)
self.assertEqual(service.enable_email_preferences, False)
def test_edit(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'new-name',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
dump('service_edit_submit', r)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.name, 'new-name')
self.assertEqual(service.limit_access, True)
self.assertEqual(service.access_group, None)
self.assertEqual(service.remailer_mode, RemailerMode.DISABLED)
self.assertEqual(service.enable_email_preferences, False)
self.assertEqual(service.hide_deactivated_users, False)
def test_edit_access_all(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': 'all',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.limit_access, False)
self.assertEqual(service.access_group, None)
def test_edit_access_group(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': str(self.get_users_group().id),
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.limit_access, True)
self.assertEqual(service.access_group, self.get_users_group())
def test_edit_hide_deactivated_users(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
'hide_deactivated_users': '1',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.hide_deactivated_users, True)
def test_edit_email_preferences(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
'enable_email_preferences': '1',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.enable_email_preferences, True)
def test_edit_remailer_mode(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'ENABLED_V2',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.remailer_mode, RemailerMode.ENABLED_V2)
def test_edit_remailer_overwrite_enable(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': 'testuser, testadmin',
},
)
self.assertEqual(r.status_code, 200)
service_user1 = ServiceUser.query.get((self.service_id, self.get_user().id))
service_user2 = ServiceUser.query.get((self.service_id, self.get_admin().id))
self.assertEqual(service_user1.remailer_overwrite_mode, RemailerMode.ENABLED_V2)
self.assertEqual(service_user2.remailer_overwrite_mode, RemailerMode.ENABLED_V2)
self.assertEqual(
set(ServiceUser.query.filter(
ServiceUser.service_id == self.service_id,
ServiceUser.remailer_overwrite_mode != None
).all()),
{service_user1, service_user2}
)
def test_edit_remailer_overwrite_change(self):
service_user = ServiceUser.query.get((self.service_id, self.get_user().id))
service_user.remailer_overwrite_mode = RemailerMode.ENABLED_V2
db.session.commit()
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V1',
'remailer-overwrite-users': ', testadmin',
},
)
self.assertEqual(r.status_code, 200)
service_user = ServiceUser.query.get((self.service_id, self.get_admin().id))
self.assertEqual(service_user.remailer_overwrite_mode, RemailerMode.ENABLED_V1)
self.assertEqual(
ServiceUser.query.filter(
ServiceUser.service_id == self.service_id,
ServiceUser.remailer_overwrite_mode != None
).all(),
[service_user]
)
def test_edit_remailer_overwrite_disable(self):
service_user = ServiceUser.query.get((self.service_id, self.get_user().id))
service_user.remailer_overwrite_mode = RemailerMode.ENABLED_V2
db.session.commit()
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
},
)
self.assertEqual(r.status_code, 200)
self.assertEqual(
ServiceUser.query.filter(
ServiceUser.service_id == self.service_id,
ServiceUser.remailer_overwrite_mode != None
).all(),
[]
)
def test_delete(self):
self.login_as('admin')
r = self.client.get(path=url_for('service.delete', id=self.service_id), follow_redirects=True)
dump('service_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Service.query.get(self.service_id))
import time
import unittest
from flask import url_for, request
from uffd.database import db
from uffd.password_hash import PlaintextPasswordHash
from uffd.models import DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation, User, RecoveryCodeMethod, TOTPMethod
from uffd.models.mfa import _hotp
from uffd.views.session import login_required
from tests.utils import dump, UffdTestCase, db_flush
class TestSession(UffdTestCase):
def setUpApp(self):
self.app.config['SESSION_LIFETIME_SECONDS'] = 2
@self.app.route('/test_login_required')
@login_required()
def test_login_required():
return 'SUCCESS ' + request.user.loginname, 200
@self.app.route('/test_group_required1')
@login_required(lambda: request.user.is_in_group('users'))
def test_group_required1():
return 'SUCCESS', 200
@self.app.route('/test_group_required2')
@login_required(lambda: request.user.is_in_group('notagroup'))
def test_group_required2():
return 'SUCCESS', 200
def setUp(self):
super().setUp()
self.assertIsNone(request.user)
def login(self):
self.login_as('user')
self.assertIsNotNone(request.user)
def assertLoggedIn(self):
self.assertEqual(self.client.get(path=url_for('test_login_required'), follow_redirects=True).data, b'SUCCESS testuser')
def assertLoggedOut(self):
self.assertNotIn(b'SUCCESS', self.client.get(path=url_for('test_login_required'), follow_redirects=True).data)
def test_login(self):
self.assertLoggedOut()
r = self.client.get(path=url_for('session.login'), follow_redirects=True)
dump('login', r)
self.assertEqual(r.status_code, 200)
r = self.login_as('user')
dump('login_post', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedIn()
def test_login_password_rehash(self):
self.get_user().password = PlaintextPasswordHash.from_password('userpassword')
db.session.commit()
self.assertIsInstance(self.get_user().password, PlaintextPasswordHash)
db_flush()
r = self.login_as('user')
self.assertEqual(r.status_code, 200)
self.assertLoggedIn()
self.assertIsInstance(self.get_user().password, User.password.method_cls)
self.assertTrue(self.get_user().password.verify('userpassword'))
def test_titlecase_password(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': self.get_user().loginname.title(), 'password': 'userpassword'}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertLoggedIn()
def test_redirect(self):
r = self.login_as('user', ref=url_for('test_login_required'))
self.assertEqual(r.status_code, 200)
self.assertEqual(r.data, b'SUCCESS testuser')
def test_wrong_password(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': self.get_user().loginname, 'password': 'wrongpassword'},
follow_redirects=True)
dump('login_wrong_password', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_empty_password(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': self.get_user().loginname, 'password': ''}, follow_redirects=True)
dump('login_empty_password', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
# Regression test for #100 (uncatched LDAPSASLPrepError)
def test_saslprep_invalid_password(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': 'testuser', 'password': 'wrongpassword\n'}, follow_redirects=True)
dump('login_saslprep_invalid_password', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_wrong_user(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': 'nouser', 'password': 'userpassword'},
follow_redirects=True)
dump('login_wrong_user', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_empty_user(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': '', 'password': 'userpassword'}, follow_redirects=True)
dump('login_empty_user', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_no_access(self):
r = self.client.post(path=url_for('session.login'),
data={'loginname': 'testservice', 'password': 'servicepassword'}, follow_redirects=True)
dump('login_no_access', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_deactivated(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.login_as('user')
dump('login_deactivated', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_deactivated_after_login(self):
self.login_as('user')
self.get_user().is_deactivated = True
db.session.commit()
self.assertLoggedOut()
def test_group_required(self):
self.login()
self.assertEqual(self.client.get(path=url_for('test_group_required1'),
follow_redirects=True).data, b'SUCCESS')
self.assertNotEqual(self.client.get(path=url_for('test_group_required2'),
follow_redirects=True).data, b'SUCCESS')
def test_logout(self):
self.login()
r = self.client.get(path=url_for('session.logout'), follow_redirects=True)
dump('logout', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_timeout(self):
self.login()
time.sleep(3)
self.assertLoggedOut()
def test_ratelimit(self):
for i in range(20):
self.client.post(path=url_for('session.login'),
data={'loginname': self.get_user().loginname,
'password': 'wrongpassword_%i'%i}, follow_redirects=True)
r = self.login_as('user')
dump('login_ratelimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_deviceauth(self):
oauth2_client = OAuth2Client(service=Service(name='test', limit_access=False), client_id='test', client_secret='testsecret', redirect_uris=['http://localhost:5009/callback', 'http://localhost:5009/callback2'])
initiation = OAuth2DeviceLoginInitiation(client=oauth2_client)
db.session.add(initiation)
db.session.commit()
code = initiation.code
self.login()
r = self.client.get(path=url_for('session.deviceauth'), follow_redirects=True)
dump('deviceauth', r)
self.assertEqual(r.status_code, 200)
r = self.client.get(path=url_for('session.deviceauth', **{'initiation-code': code}), follow_redirects=True)
dump('deviceauth_check', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'test', r.data)
r = self.client.post(path=url_for('session.deviceauth_submit'), data={'initiation-code': code}, follow_redirects=True)
dump('deviceauth_submit', r)
self.assertEqual(r.status_code, 200)
initiation = OAuth2DeviceLoginInitiation.query.filter_by(code=code).one()
self.assertEqual(len(initiation.confirmations), 1)
self.assertEqual(initiation.confirmations[0].session.user.loginname, 'testuser')
self.assertIn(initiation.confirmations[0].code.encode(), r.data)
r = self.client.get(path=url_for('session.deviceauth_finish'), follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(DeviceLoginConfirmation.query.all(), [])
class TestMfaViews(UffdTestCase):
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def test_auth_integration(self):
self.add_recovery_codes()
self.add_totp()
db.session.commit()
self.assertIsNone(request.user)
r = self.login_as('user')
dump('mfa_auth_redirected', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'/mfa/auth', r.data)
self.assertIsNone(request.user)
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_disabled(self):
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_only(self):
self.add_recovery_codes()
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_code(self):
self.add_recovery_codes()
self.add_totp()
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth_recovery_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish', ref='/redirecttarget'), data={'code': method.code_value})
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0)
def test_auth_totp_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth_totp_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_totp_code_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
def test_auth_totp_code_reuse(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_empty_code(self):
self.add_recovery_codes()
self.add_totp()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': ''}, follow_redirects=True)
dump('mfa_auth_empty_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_invalid_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_invalid_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_ratelimit(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
inv_code = str(int(code[0])+1)[-1] + code[1:]
for i in range(20):
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': inv_code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_ratelimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
# TODO: webauthn auth tests
import datetime
from flask import url_for, request
from uffd.database import db
from uffd.models import User, Signup, Role, RoleGroup, FeatureFlag
from tests.utils import dump, UffdTestCase, db_flush
def refetch_signup(signup):
db.session.add(signup)
db.session.commit()
id = signup.id
db.session.expunge(signup)
return Signup.query.get(id)
class TestSignupViews(UffdTestCase):
def setUpApp(self):
self.app.config['SELF_SIGNUP'] = True
self.app.last_mail = None
def test_signup(self):
r = self.client.get(path=url_for('signup.signup_start'), follow_redirects=True)
dump('test_signup', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), [])
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_submit', r)
self.assertEqual(r.status_code, 200)
db_flush()
signups = Signup.query.filter_by(loginname='newuser').all()
self.assertEqual(len(signups), 1)
signup = signups[0]
self.assertEqual(signup.loginname, 'newuser')
self.assertEqual(signup.displayname, 'New User')
self.assertEqual(signup.mail, 'new@example.com')
self.assertIn(signup.token, str(self.app.last_mail.get_content()))
self.assertTrue(signup.password.verify('notsecret'))
self.assertTrue(signup.validate()[0])
def test_signup_disabled(self):
self.app.config['SELF_SIGNUP'] = False
r = self.client.get(path=url_for('signup.signup_start'), follow_redirects=True)
dump('test_signup_disabled', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), [])
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_submit_disabled', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), [])
def test_signup_wrongpassword(self):
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notthesame'})
dump('test_signup_wrongpassword', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(self.app.last_mail)
def test_signup_invalid(self):
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': '', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_invalid', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(self.app.last_mail)
def test_signup_mailerror(self):
self.app.config['MAIL_SKIP_SEND'] = 'fail'
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_mailerror', r)
self.assertEqual(r.status_code, 200)
def test_signup_hostlimit(self):
# Each signup_submit request leaks information about the existance of a
# user with a specific loginname. A host/network-based ratelimit should
# make enumerations of all user accounts difficult/next to impossible.
# Additionally each successful requests sends a mail to an
# attacker-controlled address. The ratelimit limits the applicability for
# spamming.
for i in range(20):
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i,
'password1': 'notsecret', 'password2': 'notsecret'})
self.assertEqual(r.status_code, 200)
self.app.last_mail = None
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_hostlimit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), [])
self.assertIsNone(self.app.last_mail)
def test_signup_maillimit(self):
for i in range(3):
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
self.assertEqual(r.status_code, 200)
self.app.last_mail = None
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'new@example.com',
'password1': 'notsecret', 'password2': 'notsecret'})
dump('test_signup_maillimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(self.app.last_mail)
# Check that we did not hit the host limit
r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True,
data={'loginname': 'differentuser', 'displayname': 'New User',
'mail': 'different@mailaddress.com', 'password1': 'notsecret',
'password2': 'notsecret'})
self.assertIsNotNone(self.app.last_mail)
def test_signup_check(self):
r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True,
data={'loginname': 'newuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
self.assertEqual(r.json['status'], 'ok')
def test_signup_check_invalid(self):
r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True,
data={'loginname': ''})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
self.assertEqual(r.json['status'], 'invalid')
def test_signup_check_exists(self):
r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True,
data={'loginname': 'testuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
self.assertEqual(r.json['status'], 'exists')
def test_signup_check_ratelimited(self):
for i in range(20):
r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True,
data={'loginname': 'newuser%d'%i})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True,
data={'loginname': 'newuser'})
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
self.assertEqual(r.json['status'], 'ratelimited')
def test_confirm(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
self.assertEqual(signup.user.loginname, 'newuser')
self.assertEqual(signup.user.displayname, 'New User')
self.assertEqual(signup.user.primary_email.address, 'new@example.com')
self.assertTrue(User.query.filter_by(loginname='newuser').one_or_none().password.verify('notsecret'))
def test_confirm_loggedin(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
self.assertIsNotNone(request.user)
self.assertEqual(request.user.loginname, self.get_user().loginname)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
self.assertIsNotNone(request.user)
self.assertEqual(request.user.loginname, 'newuser')
def test_confirm_notfound(self):
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=1, token='notasignuptoken'), follow_redirects=True)
dump('test_signup_confirm_notfound', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=1, token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_notfound', r)
self.assertEqual(r.status_code, 200)
def test_confirm_expired(self):
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup.created = datetime.datetime.utcnow() - datetime.timedelta(hours=49)
signup = refetch_signup(signup)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_expired', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_expired', r)
self.assertEqual(r.status_code, 200)
def test_confirm_completed(self):
signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='new@example.com', password='notsecret')
signup.user = self.get_user()
signup = refetch_signup(signup)
self.assertTrue(signup.completed)
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm_completed', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit_completed', r)
self.assertEqual(r.status_code, 200)
def test_confirm_wrongpassword(self):
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'})
dump('test_signup_confirm_wrongpassword', r)
signup = refetch_signup(signup)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
def test_confirm_error(self):
# finish returns None and error message (here: because the user already exists)
signup = Signup(loginname=self.get_user().loginname, displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_error', r)
signup = refetch_signup(signup)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
def test_confirm_error_email_uniqueness(self):
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
# finish returns None and error message (here: because the email address already exists)
# This case is interesting, because the error also invalidates the ORM session
signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret')
db.session.add(signup)
db.session.commit()
signup_id = signup.id
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_error_email_uniqueness', r)
self.assertEqual(r.status_code, 200)
signup = Signup.query.get(signup_id)
self.assertFalse(signup.completed)
def test_confirm_hostlimit(self):
for i in range(20):
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_hostlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
def test_confirm_confirmlimit(self):
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
for i in range(5):
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i})
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_confirmlimit', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(signup.completed)
from flask import url_for, request
from uffd.database import db
from uffd.models import User, UserEmail, Group, Role, Service, ServiceUser, FeatureFlag, MFAMethod, RecoveryCodeMethod, TOTPMethod
from tests.utils import dump, UffdTestCase
class TestUserViews(UffdTestCase):
def setUp(self):
super().setUp()
self.app.last_mail = None
self.login_as('admin')
def test_index(self):
r = self.client.get(path=url_for('user.index'), follow_redirects=True)
dump('user_index', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
db.session.add(Role(name='base', is_default=True))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show'), follow_redirects=True)
dump('user_new', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'newuser', 'email': 'newuser@example.com', 'displayname': 'New User',
f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True)
dump('user_new_submit', r)
self.assertEqual(r.status_code, 200)
user_ = User.query.filter_by(loginname='newuser').one_or_none()
roles = sorted([r.name for r in user_.roles_effective])
self.assertIsNotNone(user_)
self.assertFalse(user_.is_service_user)
self.assertEqual(user_.loginname, 'newuser')
self.assertEqual(user_.displayname, 'New User')
self.assertEqual(user_.primary_email.address, 'newuser@example.com')
self.assertFalse(user_.password)
self.assertGreaterEqual(user_.unix_uid, self.app.config['USER_MIN_UID'])
self.assertLessEqual(user_.unix_uid, self.app.config['USER_MAX_UID'])
role1 = Role(name='role1')
self.assertEqual(roles, ['base', 'role1'])
self.assertIsNotNone(self.app.last_mail)
def test_new_service(self):
db.session.add(Role(name='base', is_default=True))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show'), follow_redirects=True)
dump('user_new_service', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'newuser', 'email': 'newuser@example.com', 'displayname': 'New User',
f'role-{role1_id}': '1', 'password': 'newpassword', 'serviceaccount': '1'}, follow_redirects=True)
dump('user_new_submit', r)
self.assertEqual(r.status_code, 200)
user = User.query.filter_by(loginname='newuser').one_or_none()
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertTrue(user.is_service_user)
self.assertEqual(user.loginname, 'newuser')
self.assertEqual(user.displayname, 'New User')
self.assertEqual(user.primary_email.address, 'newuser@example.com')
self.assertTrue(user.unix_uid)
self.assertFalse(user.password)
role1 = Role(name='role1')
self.assertEqual(roles, ['role1'])
self.assertIsNone(self.app.last_mail)
def test_new_invalid_loginname(self):
r = self.client.post(path=url_for('user.create'),
data={'loginname': '!newuser', 'email': 'newuser@example.com', 'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_invalid_loginname', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
def test_new_empty_loginname(self):
r = self.client.post(path=url_for('user.create'),
data={'loginname': '', 'email': 'newuser@example.com', 'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_empty_loginname', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
def test_new_conflicting_loginname(self):
self.assertEqual(User.query.filter_by(loginname='testuser').count(), 1)
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'testuser', 'email': 'newuser@example.com', 'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_conflicting_loginname', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(User.query.filter_by(loginname='testuser').count(), 1)
def test_new_empty_email(self):
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'newuser', 'email': '', 'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_empty_email', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
def test_new_conflicting_email(self):
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'newuser', 'email': 'test@example.com', 'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_conflicting_email', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
def test_new_invalid_display_name(self):
r = self.client.post(path=url_for('user.create'),
data={'loginname': 'newuser', 'email': 'newuser@example.com', 'displayname': 'A'*200,
'password': 'newpassword'}, follow_redirects=True)
dump('user_new_invalid_display_name', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
def test_update(self):
user_unupdated = self.get_user()
email_id = str(user_unupdated.primary_email.id)
db.session.add(Role(name='base', is_default=True))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
role2.members.append(user_unupdated)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show', id=user_unupdated.id), follow_redirects=True)
dump('user_update', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', id=user_unupdated.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1', 'primary_email': email_id, 'recovery_email': 'primary',
'displayname': 'New User', f'role-{role1_id}': '1', 'password': ''},
follow_redirects=True)
dump('user_update_submit', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_user()
roles = sorted([r.name for r in user_updated.roles_effective])
self.assertEqual(user_updated.displayname, 'New User')
self.assertEqual(user_updated.primary_email.address, 'test@example.com')
self.assertEqual(user_updated.unix_uid, user_unupdated.unix_uid)
self.assertEqual(user_updated.loginname, user_unupdated.loginname)
self.assertTrue(user_updated.password.verify('userpassword'))
self.assertEqual(roles, ['base', 'role1'])
def test_update_password(self):
user_unupdated = self.get_user()
email_id = str(user_unupdated.primary_email.id)
r = self.client.get(path=url_for('user.show', id=user_unupdated.id), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', id=user_unupdated.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1', 'primary_email': email_id, 'recovery_email': 'primary',
'displayname': 'New User',
'password': 'newpassword'}, follow_redirects=True)
dump('user_update_password', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_user()
self.assertEqual(user_updated.displayname, 'New User')
self.assertEqual(user_updated.primary_email.address, 'test@example.com')
self.assertEqual(user_updated.unix_uid, user_unupdated.unix_uid)
self.assertEqual(user_updated.loginname, user_unupdated.loginname)
self.assertTrue(user_updated.password.verify('newpassword'))
self.assertFalse(user_updated.password.verify('userpassword'))
def test_update_invalid_password(self):
user_unupdated = self.get_user()
email_id = str(user_unupdated.primary_email.id)
r = self.client.get(path=url_for('user.show', id=user_unupdated.id), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', id=user_unupdated.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1', 'primary_email': email_id, 'recovery_email': 'primary',
'displayname': 'New User',
'password': 'A'}, follow_redirects=True)
dump('user_update_invalid_password', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_user()
self.assertFalse(user_updated.password.verify('A'))
self.assertTrue(user_updated.password.verify('userpassword'))
self.assertEqual(user_updated.displayname, user_unupdated.displayname)
self.assertEqual(user_updated.primary_email.address, user_unupdated.primary_email.address)
self.assertEqual(user_updated.loginname, user_unupdated.loginname)
# Regression test for #100 (login not possible if password contains character disallowed by SASLprep)
def test_update_saslprep_invalid_password(self):
user_unupdated = self.get_user()
email_id = str(user_unupdated.primary_email.id)
r = self.client.get(path=url_for('user.show', id=user_unupdated.id), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', id=user_unupdated.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1', 'primary_email': email_id, 'recovery_email': 'primary',
'displayname': 'New User',
'password': 'newpassword\n'}, follow_redirects=True)
dump('user_update_invalid_password', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_user()
self.assertFalse(user_updated.password.verify('newpassword\n'))
self.assertTrue(user_updated.password.verify('userpassword'))
self.assertEqual(user_updated.displayname, user_unupdated.displayname)
self.assertEqual(user_updated.primary_email.address, user_unupdated.primary_email.address)
self.assertEqual(user_updated.loginname, user_unupdated.loginname)
def test_update_email(self):
user = self.get_user()
email = UserEmail(user=user, address='foo@example.com')
service1 = Service(name='service1', enable_email_preferences=True, limit_access=False)
service2 = Service(name='service2', enable_email_preferences=True, limit_access=False)
db.session.add_all([service1, service2])
db.session.commit()
email1_id = user.primary_email.id
email2_id = email.id
service1_id = service1.id
service2_id = service2.id
r = self.client.post(path=url_for('user.update', id=user.id),
data={'loginname': 'testuser',
f'email-{email1_id}-present': '1',
f'email-{email2_id}-present': '1',
f'email-{email2_id}-verified': '1',
f'newemail-1-address': 'new1@example.com',
f'newemail-2-address': 'new2@example.com', f'newemail-2-verified': '1',
'primary_email': email2_id, 'recovery_email': email1_id,
f'service_{service1_id}_email': 'primary',
f'service_{service2_id}_email': email2_id,
'displayname': 'Test User', 'password': ''},
follow_redirects=True)
dump('user_update_email', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertEqual(user.primary_email.id, email2_id)
self.assertEqual(user.recovery_email.id, email1_id)
self.assertEqual(ServiceUser.query.get((service1.id, user.id)).service_email, None)
self.assertEqual(ServiceUser.query.get((service2.id, user.id)).service_email.id, email2_id)
self.assertEqual(
{email.address: email.verified for email in user.all_emails},
{
'test@example.com': True,
'foo@example.com': True,
'new1@example.com': False,
'new2@example.com': True,
}
)
def test_update_email_conflict(self):
user = self.get_user()
user_id = user.id
email_id = user.primary_email.id
email_address = user.primary_email.address
r = self.client.post(path=url_for('user.update', id=user.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1',
f'newemail-1-address': user.primary_email.address},
follow_redirects=True)
dump('user_update_email_conflict', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(UserEmail.query.filter_by(user_id=user_id).count(), 1)
def test_update_email_strict_uniqueness(self):
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
user = self.get_user()
email = UserEmail(user=user, address='foo@example.com')
service1 = Service(name='service1', enable_email_preferences=True, limit_access=False)
service2 = Service(name='service2', enable_email_preferences=True, limit_access=False)
db.session.add_all([service1, service2])
db.session.commit()
email1_id = user.primary_email.id
email2_id = email.id
service1_id = service1.id
service2_id = service2.id
r = self.client.post(path=url_for('user.update', id=user.id),
data={'loginname': 'testuser',
f'email-{email1_id}-present': '1',
f'email-{email2_id}-present': '1',
f'email-{email2_id}-verified': '1',
f'newemail-1-address': 'new1@example.com',
f'newemail-2-address': 'new2@example.com', f'newemail-2-verified': '1',
'primary_email': email2_id, 'recovery_email': email1_id,
f'service_{service1_id}_email': 'primary',
f'service_{service2_id}_email': email2_id,
'displayname': 'Test User', 'password': ''},
follow_redirects=True)
dump('user_update_email_strict_uniqueness', r)
self.assertEqual(r.status_code, 200)
user = self.get_user()
self.assertEqual(user.primary_email.id, email2_id)
self.assertEqual(user.recovery_email.id, email1_id)
self.assertEqual(ServiceUser.query.get((service1.id, user.id)).service_email, None)
self.assertEqual(ServiceUser.query.get((service2.id, user.id)).service_email.id, email2_id)
self.assertEqual(
{email.address: email.verified for email in user.all_emails},
{
'test@example.com': True,
'foo@example.com': True,
'new1@example.com': False,
'new2@example.com': True,
}
)
def test_update_email_strict_uniqueness_conflict(self):
FeatureFlag.unique_email_addresses.enable()
db.session.commit()
user = self.get_user()
user_id = user.id
email_id = user.primary_email.id
email_address = user.primary_email.address
r = self.client.post(path=url_for('user.update', id=user.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1',
f'newemail-1-address': user.primary_email.address},
follow_redirects=True)
dump('user_update_email_strict_uniqueness_conflict', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(UserEmail.query.filter_by(user_id=user_id).count(), 1)
def test_update_invalid_display_name(self):
user_unupdated = self.get_user()
email_id = str(user_unupdated.primary_email.id)
r = self.client.get(path=url_for('user.show', id=user_unupdated.id), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', id=user_unupdated.id),
data={'loginname': 'testuser',
f'email-{email_id}-present': '1', 'primary_email': email_id, 'recovery_email': 'primary',
'displayname': 'A'*200,
'password': 'newpassword'}, follow_redirects=True)
dump('user_update_invalid_display_name', r)
self.assertEqual(r.status_code, 200)
user_updated = self.get_user()
self.assertEqual(user_updated.displayname, user_unupdated.displayname)
self.assertEqual(user_updated.primary_email.address, user_unupdated.primary_email.address)
self.assertEqual(user_updated.loginname, user_unupdated.loginname)
self.assertFalse(user_updated.password.verify('newpassword'))
self.assertTrue(user_updated.password.verify('userpassword'))
def test_show(self):
r = self.client.get(path=url_for('user.show', id=self.get_user().id), follow_redirects=True)
dump('user_show', r)
self.assertEqual(r.status_code, 200)
def test_show_self(self):
r = self.client.get(path=url_for('user.show', id=self.get_admin().id), follow_redirects=True)
dump('user_show_self', r)
self.assertEqual(r.status_code, 200)
def test_delete(self):
r = self.client.get(path=url_for('user.delete', id=self.get_user().id), follow_redirects=True)
dump('user_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(self.get_user())
def test_deactivate(self):
r = self.client.get(path=url_for('user.deactivate', id=self.get_user().id), follow_redirects=True)
dump('user_deactivate', r)
self.assertEqual(r.status_code, 200)
self.assertTrue(self.get_user().is_deactivated)
def test_activate(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.client.get(path=url_for('user.activate', id=self.get_user().id), follow_redirects=True)
dump('user_activate', r)
self.assertEqual(r.status_code, 200)
self.assertFalse(self.get_user().is_deactivated)
def test_disable_mfa(self):
db.session.add(RecoveryCodeMethod(user=self.get_admin()))
user = self.get_user()
for _ in range(10):
db.session.add(RecoveryCodeMethod(user=user))
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
self.login_as('admin')
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
r = self.client.get(path=url_for('user.disable_mfa', id=self.get_user().id), follow_redirects=True)
dump('user_disable_mfa', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_user()).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_csvimport(self):
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
data = f'''\
newuser1,newuser1@example.com,
newuser2,newuser2@example.com,{role1.id}
newuser3,newuser3@example.com,{role1.id};{role2.id}
newuser4,newuser4@example.com,9999
newuser5,newuser5@example.com,notanumber
newuser6,newuser6@example.com,{role1.id};{role2.id};
newuser7,invalidmail,
newuser8,,
,newuser9@example.com,
,,
,,,
newuser10,newuser10@example.com,
newuser11,newuser11@example.com, {role1.id};{role2.id}
newuser12,newuser12@example.com,{role1.id};{role1.id}
<invalid tag-like thingy>'''
r = self.client.post(path=url_for('user.csvimport'), data={'csv': data}, follow_redirects=True)
dump('user_csvimport', r)
self.assertEqual(r.status_code, 200)
user = User.query.filter_by(loginname='newuser1').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser1')
self.assertEqual(user.displayname, 'newuser1')
self.assertEqual(user.primary_email.address, 'newuser1@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, [])
user = User.query.filter_by(loginname='newuser2').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser2')
self.assertEqual(user.displayname, 'newuser2')
self.assertEqual(user.primary_email.address, 'newuser2@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['role1'])
user = User.query.filter_by(loginname='newuser3').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser3')
self.assertEqual(user.displayname, 'newuser3')
self.assertEqual(user.primary_email.address, 'newuser3@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['role1', 'role2'])
user = User.query.filter_by(loginname='newuser4').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser4')
self.assertEqual(user.displayname, 'newuser4')
self.assertEqual(user.primary_email.address, 'newuser4@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, [])
user = User.query.filter_by(loginname='newuser5').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser5')
self.assertEqual(user.displayname, 'newuser5')
self.assertEqual(user.primary_email.address, 'newuser5@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, [])
user = User.query.filter_by(loginname='newuser6').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser6')
self.assertEqual(user.displayname, 'newuser6')
self.assertEqual(user.primary_email.address, 'newuser6@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['role1', 'role2'])
self.assertIsNone(User.query.filter_by(loginname='newuser7').one_or_none())
self.assertIsNone(User.query.filter_by(loginname='newuser8').one_or_none())
self.assertIsNone(User.query.filter_by(loginname='newuser9').one_or_none())
user = User.query.filter_by(loginname='newuser10').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser10')
self.assertEqual(user.displayname, 'newuser10')
self.assertEqual(user.primary_email.address, 'newuser10@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, [])
user = User.query.filter_by(loginname='newuser11').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser11')
self.assertEqual(user.displayname, 'newuser11')
self.assertEqual(user.primary_email.address, 'newuser11@example.com')
# Currently the csv import is not very robust, imho newuser11 should have role1 and role2!
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['role2'])
user = User.query.filter_by(loginname='newuser12').one_or_none()
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser12')
self.assertEqual(user.displayname, 'newuser12')
self.assertEqual(user.primary_email.address, 'newuser12@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['role1'])
class TestGroupViews(UffdTestCase):
def setUp(self):
super().setUp()
self.login_as('admin')
def test_index(self):
r = self.client.get(path=url_for('group.index'), follow_redirects=True)
dump('group_index', r)
self.assertEqual(r.status_code, 200)
def test_show(self):
r = self.client.get(path=url_for('group.show', gid=20001), follow_redirects=True)
dump('group_show', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
r = self.client.get(path=url_for('group.show'), follow_redirects=True)
dump('group_new', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Group.query.filter_by(name='newgroup').one_or_none())
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': '', 'name': 'newgroup', 'description': 'Test description'},
follow_redirects=True)
dump('group_new_submit', r)
self.assertEqual(r.status_code, 200)
group = Group.query.filter_by(name='newgroup').one_or_none()
self.assertIsNotNone(group)
self.assertEqual(group.name, 'newgroup')
self.assertEqual(group.description, 'Test description')
self.assertGreaterEqual(group.unix_gid, self.app.config['GROUP_MIN_GID'])
self.assertLessEqual(group.unix_gid, self.app.config['GROUP_MAX_GID'])
def test_new_fixed_gid(self):
gid = self.app.config['GROUP_MAX_GID'] - 1
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': str(gid), 'name': 'newgroup', 'description': 'Test description'},
follow_redirects=True)
dump('group_new_fixed_gid', r)
self.assertEqual(r.status_code, 200)
group = Group.query.filter_by(name='newgroup').one_or_none()
self.assertIsNotNone(group)
self.assertEqual(group.name, 'newgroup')
self.assertEqual(group.description, 'Test description')
self.assertEqual(group.unix_gid, gid)
def test_new_existing_name(self):
gid = self.app.config['GROUP_MAX_GID'] - 1
db.session.add(Group(name='newgroup', description='Original description', unix_gid=gid))
db.session.commit()
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': '', 'name': 'newgroup', 'description': 'New description'},
follow_redirects=True)
dump('group_new_existing_name', r)
self.assertEqual(r.status_code, 400)
group = Group.query.filter_by(name='newgroup').one_or_none()
self.assertIsNotNone(group)
self.assertEqual(group.name, 'newgroup')
self.assertEqual(group.description, 'Original description')
self.assertEqual(group.unix_gid, gid)
def test_new_name_too_long(self):
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': '', 'name': 'a'*33, 'description': 'New description'},
follow_redirects=True)
dump('group_new_name_too_long', r)
self.assertEqual(r.status_code, 400)
group = Group.query.filter_by(name='a'*33).one_or_none()
self.assertIsNone(group)
def test_new_name_too_short(self):
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': '', 'name': '', 'description': 'New description'},
follow_redirects=True)
dump('group_new_name_too_short', r)
self.assertEqual(r.status_code, 400)
group = Group.query.filter_by(name='').one_or_none()
self.assertIsNone(group)
def test_new_name_invalid(self):
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': '', 'name': 'foo bar', 'description': 'New description'},
follow_redirects=True)
dump('group_new_name_invalid', r)
self.assertEqual(r.status_code, 400)
group = Group.query.filter_by(name='foo bar').one_or_none()
self.assertIsNone(group)
def test_new_existing_gid(self):
gid = self.app.config['GROUP_MAX_GID'] - 1
db.session.add(Group(name='newgroup', description='Original description', unix_gid=gid))
db.session.commit()
r = self.client.post(path=url_for('group.update'),
data={'unix_gid': str(gid), 'name': 'newgroup2', 'description': 'New description'},
follow_redirects=True)
dump('group_new_existing_gid', r)
self.assertEqual(r.status_code, 400)
group = Group.query.filter_by(name='newgroup').one_or_none()
self.assertIsNotNone(group)
self.assertEqual(group.name, 'newgroup')
self.assertEqual(group.description, 'Original description')
self.assertEqual(group.unix_gid, gid)
self.assertIsNone(Group.query.filter_by(name='newgroup2').one_or_none())
def test_update(self):
group = Group(name='newgroup', description='Original description')
db.session.add(group)
db.session.commit()
group_id = group.id
group_gid = group.unix_gid
new_gid = self.app.config['GROUP_MAX_GID'] - 1
r = self.client.post(path=url_for('group.update', id=group_id),
data={'unix_gid': str(new_gid), 'name': 'newgroup_changed', 'description': 'New description'},
follow_redirects=True)
dump('group_update', r)
self.assertEqual(r.status_code, 200)
group = Group.query.get(group_id)
self.assertEqual(group.name, 'newgroup') # Not changed
self.assertEqual(group.description, 'New description') # Changed
self.assertEqual(group.unix_gid, group_gid) # Not changed
def test_delete(self):
group1 = Group(name='newgroup1', description='Original description1')
group2 = Group(name='newgroup2', description='Original description2')
db.session.add(group1)
db.session.add(group2)
db.session.commit()
group1_id = group1.id
group2_id = group2.id
r = self.client.get(path=url_for('group.delete', id=group1_id), follow_redirects=True)
dump('group_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Group.query.get(group1_id))
self.assertIsNotNone(Group.query.get(group2_id))
import os
import secrets
import sys
from flask import Flask, redirect, url_for
from werkzeug.routing import IntegerConverter
from flask import Flask, redirect, url_for, request, render_template
from flask_babel import Babel
from babel.dates import LOCALTZ
from werkzeug.exceptions import Forbidden
from flask_migrate import Migrate
from uffd.database import db, SQLAlchemyJSON
from uffd.template_helper import register_template_helper
from uffd.navbar import setup_navbar
from .database import db, customize_db_engine
from .template_helper import register_template_helper
from .navbar import setup_navbar
from .csrf import bp as csrf_bp
from . import models, views, commands
def load_config_file(app, path, silent=False):
if not os.path.exists(path):
if not silent:
raise Exception(f"Config file {path} not found")
return False
def create_app(test_config=None):
# create and configure the app
app = Flask(__name__, instance_relative_config=False)
app.json_encoder = SQLAlchemyJSON
if path.endswith(".json"):
app.config.from_json(path)
elif path.endswith(".yaml") or path.endswith(".yml"):
import yaml # pylint: disable=import-outside-toplevel disable=import-error
with open(path, encoding='utf-8') as ymlfile:
data = yaml.safe_load(ymlfile)
app.config.from_mapping(data)
else:
app.config.from_pyfile(path, silent=True)
return True
def init_config(app: Flask, test_config):
# set development default config values
app.config.from_mapping(
SECRET_KEY=secrets.token_hex(128),
SQLALCHEMY_DATABASE_URI="sqlite:///{}".format(os.path.join(app.instance_path, 'uffd.sqlit3')),
)
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{os.path.join(app.instance_path, 'uffd.sqlit3')}"
app.config.from_pyfile('default_config.cfg')
# load config
if test_config is not None:
app.config.from_mapping(test_config)
elif os.environ.get('CONFIG_PATH'):
load_config_file(app, os.environ['CONFIG_PATH'], silent=False)
else:
for filename in ["config.cfg", "config.json", "config.yml", "config.yaml"]:
if load_config_file(app, os.path.join(app.instance_path, filename), silent=True):
break
if app.secret_key is None:
if app.env == "production":
raise Exception("SECRET_KEY not configured and we are running in production mode!")
app.secret_key = secrets.token_hex(128)
def create_app(test_config=None): # pylint: disable=too-many-locals,too-many-statements
app = Flask(__name__, instance_relative_config=False)
init_config(app, test_config)
register_template_helper(app)
setup_navbar(app)
if not test_config:
# load the instance config, if it exists, when not testing
app.config.from_pyfile(os.path.join(app.instance_path, 'config.cfg'))
# Sort the navbar positions by their blueprint names (from the left)
if app.config['DEFAULT_PAGE_SERVICES']:
positions = ["service", "selfservice"]
else:
# load the test config if passed in
app.config.from_mapping(test_config)
positions = ["selfservice", "service"]
positions += ["rolemod", "invite", "user", "group", "role", "mail"]
setup_navbar(app, positions)
# ensure the instance folder exists
app.register_blueprint(csrf_bp)
views.init_app(app)
commands.init_app(app)
# We never want to fail here, but at a file access that doesn't work.
# We might only have read access to app.instance_path
try:
os.makedirs(app.instance_path)
except OSError:
os.makedirs(app.instance_path, exist_ok=True)
except: # pylint: disable=bare-except
pass
db.init_app(app)
# pylint: disable=C0415
from uffd import user, selfservice, role, session, csrf, ldap
# pylint: enable=C0415
Migrate(app, db, render_as_batch=True, directory=os.path.join(app.root_path, 'migrations'))
with app.app_context():
customize_db_engine(db.engine)
for i in user.bp + selfservice.bp + role.bp + session.bp + csrf.bp + ldap.bp:
app.register_blueprint(i)
@app.shell_context_processor
def push_request_context(): #pylint: disable=unused-variable
ctx = {name: getattr(models, name) for name in models.__all__}
ctx.setdefault('db', db)
return ctx
@app.route("/")
def index(): #pylint: disable=unused-variable
return redirect(url_for('selfservice.index'))
# flask-babel requires pytz-style timezone objects, but in rare cases (e.g.
# non-IANA TZ values) LOCALTZ is stdlib-style (without normalize/localize)
if not hasattr(LOCALTZ, 'normalize'):
LOCALTZ.normalize = lambda dt: dt
if not hasattr(LOCALTZ, 'localize'):
LOCALTZ.localize = lambda dt: dt.replace(tzinfo=LOCALTZ)
return app
class PatchedBabel(Babel):
@property
def default_timezone(self):
if self.app.config['BABEL_DEFAULT_TIMEZONE'] == 'LOCALTZ':
return LOCALTZ
return super().default_timezone
def init_db(app):
with app.app_context():
db.create_all()
babel = PatchedBabel(app, default_timezone='LOCALTZ')
@babel.localeselector
def get_locale(): #pylint: disable=unused-variable
language_cookie = request.cookies.get('language')
if language_cookie is not None and language_cookie in app.config['LANGUAGES']:
return language_cookie
return request.accept_languages.best_match(list(app.config['LANGUAGES']))
app.add_template_global(get_locale)
return app
[python: **.py]
[jinja2: **/templates/**.html]
from .user import user_command
from .group import group_command
from .role import role_command
from .profile import profile_command
from .gendevcert import gendevcert_command
from .cleanup import cleanup_command
from .roles_update_all import roles_update_all_command
from .unique_email_addresses import unique_email_addresses_command
def init_app(app):
app.cli.add_command(user_command)
app.cli.add_command(group_command)
app.cli.add_command(role_command)
app.cli.add_command(gendevcert_command)
app.cli.add_command(profile_command)
app.cli.add_command(cleanup_command)
app.cli.add_command(roles_update_all_command)
app.cli.add_command(unique_email_addresses_command)
import click
from flask.cli import with_appcontext
from uffd.tasks import cleanup_task
from uffd.database import db
@click.command('cleanup', help='Cleanup expired data')
@with_appcontext
def cleanup_command():
cleanup_task.run()
db.session.commit()
import os
import click
from werkzeug.serving import make_ssl_devcert
@click.command("gendevcert", help='Generates a self-signed TLS certificate for development')
def gendevcert_command(): #pylint: disable=unused-variable
if os.path.exists('devcert.crt') or os.path.exists('devcert.key'):
print('Refusing to overwrite existing "devcert.crt"/"devcert.key" file!')
return
make_ssl_devcert('devcert')
print('Certificate written to "devcert.crt", private key to "devcert.key".')
print('Run `flask run --cert devcert.crt --key devcert.key` to use it.')
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import Group
group_command = AppGroup('group', help='Manage groups')
@group_command.command(help='List names of all groups')
def list():
with current_app.test_request_context():
for group in Group.query:
click.echo(group.name)
@group_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
click.echo(f'Name: {group.name}')
click.echo(f'Unix GID: {group.unix_gid}')
click.echo(f'Description: {group.description}')
click.echo(f'Members: {", ".join([user.loginname for user in group.members])}')
@group_command.command(help='Create new group')
@click.argument('name')
@click.option('--description', default='', help='Set description text. Empty per default.')
def create(name, description):
with current_app.test_request_context():
group = Group(description=description)
if not group.set_name(name):
raise click.ClickException('Invalid name')
try:
db.session.add(group)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A group with name "{name}" already exists')
@group_command.command(help='Update group attributes')
@click.argument('name')
@click.option('--description', help='Set description text.')
def update(name, description):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
if description is not None:
group.description = description
db.session.commit()
@group_command.command(help='Delete group')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
group = Group.query.filter_by(name=name).one_or_none()
if group is None:
raise click.ClickException(f'Group {name} not found')
db.session.delete(group)
db.session.commit()
import os
from flask import current_app
from flask.cli import with_appcontext
import click
try:
from werkzeug.middleware.profiler import ProfilerMiddleware
except ImportError:
from werkzeug.contrib.profiler import ProfilerMiddleware
@click.command("profile", help='Runs app with profiler')
@with_appcontext
def profile_command(): #pylint: disable=unused-variable
# app.run() is silently ignored if executed from commands. We really want
# to do this, so we overwrite the check by overwriting the environment
# variable.
os.environ['FLASK_RUN_FROM_CLI'] = 'false'
current_app.wsgi_app = ProfilerMiddleware(current_app.wsgi_app, restrictions=[30])
current_app.run(debug=True)
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import Group, Role, RoleGroup
role_command = AppGroup('role', help='Manage roles')
# pylint: disable=too-many-arguments,too-many-locals
def update_attrs(role, description=None, default=None,
moderator_group=None, clear_moderator_group=False,
clear_groups=False, add_group=tuple(), remove_group=tuple(),
clear_roles=False, add_role=tuple(), remove_role=tuple()):
if description is not None:
role.description = description
if default is not None:
role.is_default = default
if clear_moderator_group:
role.moderator_group = None
elif moderator_group is not None:
group = Group.query.filter_by(name=moderator_group).one_or_none()
if group is None:
raise click.ClickException(f'Moderaor group {moderator_group} not found')
role.moderator_group = group
if clear_groups:
role.groups.clear()
for group_name in add_group:
group = Group.query.filter_by(name=group_name).one_or_none()
if group is None:
raise click.ClickException(f'Group {group_name} not found')
role.groups[group] = RoleGroup(group=group)
for group_name in remove_group:
group = Group.query.filter_by(name=group_name).one_or_none()
if group is None:
raise click.ClickException(f'Group {group_name} not found')
del role.groups[group]
if clear_roles:
role.included_roles.clear()
for role_name in add_role:
_role = Role.query.filter_by(name=role_name).one_or_none()
if _role is None:
raise click.ClickException(f'Role {role_name} not found')
role.included_roles.append(_role)
for role_name in remove_role:
_role = Role.query.filter_by(name=role_name).one_or_none()
if _role is None:
raise click.ClickException(f'Role {role_name} not found')
role.included_roles.remove(_role)
@role_command.command(help='List names of all roles')
def list():
with current_app.test_request_context():
for role in Role.query:
click.echo(role.name)
@role_command.command(help='Show details of group')
@click.argument('name')
def show(name):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
click.echo(f'Name: {role.name}')
click.echo(f'Description: {role.description}')
click.echo(f'Default: {role.is_default}')
click.echo(f'Moderator group: {role.moderator_group.name if role.moderator_group else None}')
click.echo(f'Direct groups: {", ".join(sorted([group.name for group in role.groups]))}')
click.echo(f'Effective groups: {", ".join(sorted([group.name for group in role.groups_effective]))}')
click.echo(f'Included roles: {", ".join(sorted([irole.name for irole in role.included_roles]))}')
click.echo(f'Direct members: {", ".join(sorted([user.loginname for user in role.members]))}')
click.echo(f'Effective members: {", ".join(sorted([user.loginname for user in role.members_effective]))}')
@role_command.command(help='Create new role')
@click.argument('name')
@click.option('--description', default='', help='Set description text.')
@click.option('--default/--no-default', default=False, help='Mark role as default or not. Non-service users are auto-added to default roles.')
@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group. No moderator group if unset.')
@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.')
@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.')
def create(name, description, default, moderator_group, add_group, add_role):
with current_app.test_request_context():
try:
role = Role(name=name)
update_attrs(role, description, default, moderator_group,
add_group=add_group, add_role=add_role)
db.session.add(role)
role.update_member_groups()
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException(f'A role with name "{name}" already exists')
@role_command.command(help='Update role attributes')
@click.argument('name')
@click.option('--description', help='Set description text.')
@click.option('--default/--no-default', default=None, help='Mark role as default or not. Non-service users are auto-added to default roles.')
@click.option('--moderator-group', metavar='GROUP_NAME', help='Set moderator group.')
@click.option('--no-moderator-group', is_flag=True, flag_value=True, default=False, help='Clear moderator group setting.')
@click.option('--clear-groups', is_flag=True, flag_value=True, default=False, help='Remove all groups granted to role members. Executed before --add-group.')
@click.option('--add-group', multiple=True, metavar='GROUP_NAME', help='Add group granted to role members. Repeat to add multiple groups.')
@click.option('--remove-group', multiple=True, metavar='GROUP_NAME', help='Remove group granted to role members. Repeat to remove multiple groups.')
@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all included roles. Executed before --add-role.')
@click.option('--add-role', multiple=True, metavar='ROLE_NAME', help='Add role to inherit groups from. Repeat to add multiple roles.')
@click.option('--remove-role', multiple=True, metavar='ROLE_NAME', help='Remove included role. Repeat to remove multiple roles.')
def update(name, description, default, moderator_group, no_moderator_group,
clear_groups, add_group, remove_group, clear_roles, add_role, remove_role):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
old_members = set(role.members_effective)
update_attrs(role, description, default, moderator_group,
no_moderator_group, clear_groups, add_group, remove_group,
clear_roles, add_role, remove_role)
for user in old_members:
user.update_groups()
role.update_member_groups()
db.session.commit()
@role_command.command(help='Delete role')
@click.argument('name')
def delete(name):
with current_app.test_request_context():
role = Role.query.filter_by(name=name).one_or_none()
if role is None:
raise click.ClickException(f'Role {name} not found')
old_members = set(role.members_effective)
db.session.delete(role)
for user in old_members:
user.update_groups()
db.session.commit()
import sys
from flask import current_app
from flask.cli import with_appcontext
import click
from uffd.database import db
from uffd.models import User
@click.command('roles-update-all', help='Update group memberships for all users based on their roles')
@click.option('--check-only', is_flag=True)
@with_appcontext
def roles_update_all_command(check_only): #pylint: disable=unused-variable
consistent = True
with current_app.test_request_context():
for user in User.query.all():
groups_added, groups_removed = user.update_groups()
if groups_added:
consistent = False
print('Adding groups [%s] to user %s'%(', '.join([group.name for group in groups_added]), user.loginname))
if groups_removed:
consistent = False
print('Removing groups [%s] from user %s'%(', '.join([group.name for group in groups_removed]), user.loginname))
if not check_only:
db.session.commit()
if check_only and not consistent:
print('No changes were made because --check-only is set')
print()
print('Error: Groups are not consistent with roles in database')
sys.exit(1)
import click
from flask.cli import with_appcontext
from sqlalchemy.exc import IntegrityError
from uffd.database import db
from uffd.models import User, UserEmail, FeatureFlag
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
@click.group('unique-email-addresses', help='Enable/disable e-mail address uniqueness checks')
def unique_email_addresses_command():
pass
@unique_email_addresses_command.command('enable')
@with_appcontext
def enable_unique_email_addresses_command():
if FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already enabled')
query = db.select([UserEmail.address_normalized, UserEmail.user_id])\
.group_by(UserEmail.address_normalized, UserEmail.user_id)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, user_id in db.session.execute(query).fetchall():
user = User.query.get(user_id)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, user_id=user_id)
click.echo(f'User "{user.loginname}" has the same e-mail address multiple times:', err=True)
for user_email in user_emails:
if user_email.verified:
click.echo(f'- {user_email.address}', err=True)
else:
click.echo(f'- {user_email.address} (unverified)', err=True)
click.echo()
query = db.select([UserEmail.address_normalized, UserEmail.address])\
.where(UserEmail.verified)\
.group_by(UserEmail.address_normalized)\
.having(db.func.count(UserEmail.id.distinct()) > 1)
for address_normalized, address in db.session.execute(query).fetchall():
click.echo(f'E-mail address "{address}" is used by multiple users:', err=True)
user_emails = UserEmail.query.filter_by(address_normalized=address_normalized, verified=True)
for user_email in user_emails:
if user_email.address != address:
click.echo(f'- {user_email.user.loginname} ({user_email.address})', err=True)
else:
click.echo(f'- {user_email.user.loginname}', err=True)
click.echo()
try:
FeatureFlag.unique_email_addresses.enable()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('''Some existing e-mail addresses violate uniqueness checks
You need to fix this manually in the admin interface. Then run this command
again to continue.''')
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses enabled')
@unique_email_addresses_command.command('disable')
@with_appcontext
def disable_unique_email_addresses_command():
if not FeatureFlag.unique_email_addresses:
raise click.ClickException('Uniqueness checks for e-mail addresses are already disabled')
click.echo('''Please note that the option to disable email address uniqueness checks will
be remove in uffd v3.
''', err=True)
FeatureFlag.unique_email_addresses.disable()
db.session.commit()
click.echo('Uniqueness checks for e-mail addresses disabled')
from flask import current_app
from flask.cli import AppGroup
from sqlalchemy.exc import IntegrityError
import click
from uffd.database import db
from uffd.models import User, Role
user_command = AppGroup('user', help='Manage users')
# pylint: disable=too-many-arguments
def update_attrs(user, mail=None, displayname=None, password=None,
prompt_password=False, clear_roles=False,
add_role=tuple(), remove_role=tuple(), deactivate=None):
if password is None and prompt_password:
password = click.prompt('Password', hide_input=True, confirmation_prompt='Confirm password')
if mail is not None and not user.set_primary_email_address(mail):
raise click.ClickException('Invalid mail address')
if displayname is not None and not user.set_displayname(displayname):
raise click.ClickException('Invalid displayname')
if password is not None and not user.set_password(password):
raise click.ClickException('Invalid password')
if deactivate is not None:
user.is_deactivated = deactivate
if clear_roles:
user.roles.clear()
for role_name in add_role:
role = Role.query.filter_by(name=role_name).one_or_none()
if role is None:
raise click.ClickException(f'Role {role_name} not found')
role.members.append(user)
for role_name in remove_role:
role = Role.query.filter_by(name=role_name).one_or_none()
if role is None:
raise click.ClickException(f'Role {role_name} not found')
role.members.remove(user)
user.update_groups()
@user_command.command(help='List login names of all users')
def list():
with current_app.test_request_context():
for user in User.query:
click.echo(user.loginname)
@user_command.command(help='Show details of user')
@click.argument('loginname')
def show(loginname):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
click.echo(f'Loginname: {user.loginname}')
click.echo(f'Deactivated: {user.is_deactivated}')
click.echo(f'Displayname: {user.displayname}')
click.echo(f'Mail: {user.primary_email.address}')
click.echo(f'Service User: {user.is_service_user}')
click.echo(f'Roles: {", ".join([role.name for role in user.roles])}')
click.echo(f'Groups: {", ".join([group.name for group in user.groups])}')
@user_command.command(help='Create new user')
@click.argument('loginname')
@click.option('--mail', required=True, metavar='EMAIL_ADDRESS', help='E-Mail address')
@click.option('--displayname', help='Set display name. Defaults to login name.')
@click.option('--service/--no-service', default=False, help='Create service or regular (default) user. '+\
'Regular users automatically have roles marked as default. '+\
'Service users do not.')
@click.option('--password', help='Password for SSO login. Login disabled if unset.')
@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Read password interactively from terminal.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.', metavar='ROLE_NAME')
@click.option('--deactivate', is_flag=True, flag_value=True, default=None, help='Deactivate account.')
def create(loginname, mail, displayname, service, password, prompt_password, add_role, deactivate):
with current_app.test_request_context():
if displayname is None:
displayname = loginname
user = User(is_service_user=service)
if not user.set_loginname(loginname, ignore_blocklist=True):
raise click.ClickException('Invalid loginname')
try:
db.session.add(user)
update_attrs(user, mail, displayname, password, prompt_password, add_role=add_role, deactivate=deactivate)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('Login name or e-mail address is already in use')
@user_command.command(help='Update user attributes and roles')
@click.argument('loginname')
@click.option('--mail', metavar='EMAIL_ADDRESS', help='Set e-mail address.')
@click.option('--displayname', help='Set display name.')
@click.option('--password', help='Set password for SSO login.')
@click.option('--prompt-password', is_flag=True, flag_value=True, default=False, help='Set password by reading it interactivly from terminal.')
@click.option('--clear-roles', is_flag=True, flag_value=True, default=False, help='Remove all roles from user. Executed before --add-role.')
@click.option('--add-role', multiple=True, help='Add role to user. Repeat to add multiple roles.')
@click.option('--remove-role', multiple=True, help='Remove role from user. Repeat to remove multiple roles.')
@click.option('--deactivate/--activate', default=None, help='Deactivate or reactivate account.')
def update(loginname, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
try:
update_attrs(user, mail, displayname, password, prompt_password, clear_roles, add_role, remove_role, deactivate)
db.session.commit()
except IntegrityError:
# pylint: disable=raise-missing-from
raise click.ClickException('E-mail address is already in use')
@user_command.command(help='Delete user')
@click.argument('loginname')
def delete(loginname):
with current_app.test_request_context():
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None:
raise click.ClickException(f'User {loginname} not found')
db.session.delete(user)
db.session.commit()
......@@ -4,9 +4,7 @@ from flask import Blueprint, request, session
bp = Blueprint("csrf", __name__)
# pylint: disable=invalid-name
csrfEndpoints = []
# pylint: enable=invalid-name
csrf_endpoints = []
def csrf_protect(blueprint=None, endpoint=None):
def wraper(func):
......@@ -15,7 +13,7 @@ def csrf_protect(blueprint=None, endpoint=None):
urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
else:
urlendpoint = func.__name__
csrfEndpoints.append(urlendpoint)
csrf_endpoints.append(urlendpoint)
@wraps(func)
def decorator(*args, **kwargs):
if '_csrf_token' in request.values:
......@@ -32,6 +30,6 @@ def csrf_protect(blueprint=None, endpoint=None):
@bp.app_url_defaults
def csrf_inject(endpoint, values):
if endpoint not in csrfEndpoints or not session.get('_csrf_token'):
if endpoint not in csrf_endpoints or not session.get('_csrf_token'):
return
values['_csrf_token'] = session['_csrf_token']
from .csrf import bp as csrf_bp, csrf_protect
bp = [csrf_bp]