Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • strifel/uffd
  • thies/uffd-2
6 results
Select Git revision
Show changes
Showing
with 3340 additions and 978 deletions
from uffd.utils import nopad_b32decode, nopad_b32encode, nopad_urlsafe_b64decode, nopad_urlsafe_b64encode
from tests.utils import UffdTestCase
class TestUtils(UffdTestCase):
def test_nopad_b32(self):
for n in range(0, 32):
self.assertEqual(b'X'*n, nopad_b32decode(nopad_b32encode(b'X'*n)))
def test_nopad_b64(self):
for n in range(0, 32):
self.assertEqual(b'X'*n, nopad_urlsafe_b64decode(nopad_urlsafe_b64encode(b'X'*n)))
import os import os
import tempfile
import shutil
import unittest import unittest
from flask import request, url_for from flask import url_for
import flask_migrate
from uffd import create_app, db from uffd import create_app, db
from uffd.user.models import User, Group from uffd.models import User, Group, Mail
from uffd.mail.models import Mail
def dump(basename, resp): def dump(basename, resp):
basename = basename.replace('.', '_').replace('/', '_') basename = basename.replace('.', '_').replace('/', '_')
...@@ -22,42 +20,12 @@ def dump(basename, resp): ...@@ -22,42 +20,12 @@ def dump(basename, resp):
def db_flush(): def db_flush():
db.session.rollback() db.session.rollback()
db.session = db.create_scoped_session() db.session.expire_all()
class UffdTestCase(unittest.TestCase): class AppTestCase(unittest.TestCase):
def get_user(self): DISABLE_SQLITE_MEMORY_DB = False
return User.query.filter_by(loginname='testuser').one_or_none()
def get_admin(self):
return User.query.filter_by(loginname='testadmin').one_or_none()
def get_admin_group(self):
return Group.query.filter_by(name='uffd_admin').one_or_none()
def get_access_group(self):
return Group.query.filter_by(name='uffd_access').one_or_none()
def get_users_group(self):
return Group.query.filter_by(name='users').one_or_none()
def get_mail(self):
return Mail.query.filter_by(uid='test').one_or_none()
def login_as(self, user, ref=None):
loginname = None
password = None
if user == 'user':
loginname = 'testuser'
password = 'userpassword'
elif user == 'admin':
loginname = 'testadmin'
password = 'adminpassword'
return self.client.post(path=url_for('session.login', ref=ref),
data={'loginname': loginname, 'password': password}, follow_redirects=True)
def setUp(self): def setUp(self):
# It would be far better to create a minimal app here, but since the
# session module depends on almost everything else, that is not really feasable
config = { config = {
'TESTING': True, 'TESTING': True,
'DEBUG': True, 'DEBUG': True,
...@@ -66,9 +34,77 @@ class UffdTestCase(unittest.TestCase): ...@@ -66,9 +34,77 @@ class UffdTestCase(unittest.TestCase):
'MAIL_SKIP_SEND': True, 'MAIL_SKIP_SEND': True,
'SELF_SIGNUP': True, 'SELF_SIGNUP': True,
} }
if self.DISABLE_SQLITE_MEMORY_DB:
try:
os.remove('/tmp/uffd-migration-test-db.sqlite3')
except FileNotFoundError:
pass
config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/uffd-migration-test-db.sqlite3'
if os.environ.get('TEST_WITH_MYSQL'):
import MySQLdb
conn = MySQLdb.connect(user='root', unix_socket='/var/run/mysqld/mysqld.sock')
cur = conn.cursor()
try:
cur.execute('DROP DATABASE uffd_tests')
except:
pass
cur.execute('CREATE DATABASE uffd_tests CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin')
conn.close()
config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqldb:///uffd_tests?unix_socket=/var/run/mysqld/mysqld.sock&charset=utf8mb4'
self.app = create_app(config) self.app = create_app(config)
self.setUpApp() self.setUpApp()
def setUpApp(self):
pass
def tearDown(self):
if self.DISABLE_SQLITE_MEMORY_DB:
try:
os.remove('/tmp/uffd-migration-test-db.sqlite3')
except FileNotFoundError:
pass
class MigrationTestCase(AppTestCase):
DISABLE_SQLITE_MEMORY_DB = True
REVISION = None
def setUp(self):
super().setUp()
self.request_context = self.app.test_request_context()
self.request_context.__enter__()
if self.REVISION:
flask_migrate.upgrade(revision=self.REVISION + '-1')
def upgrade(self, revision='+1'):
db.session.commit()
flask_migrate.upgrade(revision=revision)
def downgrade(self, revision='-1'):
db.session.commit()
flask_migrate.downgrade(revision=revision)
def tearDown(self):
db.session.rollback()
self.request_context.__exit__(None, None, None)
super().tearDown()
class ModelTestCase(AppTestCase):
def setUp(self):
super().setUp()
self.request_context = self.app.test_request_context()
self.request_context.__enter__()
db.create_all()
db.session.commit()
def tearDown(self):
db.session.rollback()
self.request_context.__exit__(None, None, None)
super().tearDown()
class UffdTestCase(AppTestCase):
def setUp(self):
super().setUp()
self.client = self.app.test_client() self.client = self.app.test_client()
self.client.__enter__() self.client.__enter__()
# Just do some request so that we can use url_for # Just do some request so that we can use url_for
...@@ -81,16 +117,51 @@ class UffdTestCase(unittest.TestCase): ...@@ -81,16 +117,51 @@ class UffdTestCase(unittest.TestCase):
db.session.add(access_group) db.session.add(access_group)
admin_group = Group(name='uffd_admin', unix_gid=20003, description='Admin access to uffd') admin_group = Group(name='uffd_admin', unix_gid=20003, description='Admin access to uffd')
db.session.add(admin_group) db.session.add(admin_group)
testuser = User(loginname='testuser', unix_uid=10000, password='userpassword', mail='test@example.com', displayname='Test User', groups=[users_group, access_group]) testuser = User(loginname='testuser', unix_uid=10000, password='userpassword', primary_email_address='test@example.com', displayname='Test User', groups=[users_group, access_group])
db.session.add(testuser) db.session.add(testuser)
testadmin = User(loginname='testadmin', unix_uid=10001, password='adminpassword', mail='admin@example.com', displayname='Test Admin', groups=[users_group, access_group, admin_group]) testadmin = User(loginname='testadmin', unix_uid=10001, password='adminpassword', primary_email_address='admin@example.com', displayname='Test Admin', groups=[users_group, access_group, admin_group])
db.session.add(testadmin) db.session.add(testadmin)
testmail = Mail(uid='test', receivers=['test1@example.com', 'test2@example.com'], destinations=['testuser@mail.example.com']) testmail = Mail(uid='test', receivers=['test1@example.com', 'test2@example.com'], destinations=['testuser@mail.example.com'])
db.session.add(testmail) db.session.add(testmail)
self.setUpDB()
db.session.commit() db.session.commit()
def setUpApp(self): def setUpDB(self):
pass pass
def tearDown(self): def tearDown(self):
self.client.__exit__(None, None, None) self.client.__exit__(None, None, None)
super().tearDown()
def get_user(self):
return User.query.filter_by(loginname='testuser').one_or_none()
def get_admin(self):
return User.query.filter_by(loginname='testadmin').one_or_none()
def get_admin_group(self):
return Group.query.filter_by(name='uffd_admin').one_or_none()
def get_access_group(self):
return Group.query.filter_by(name='uffd_access').one_or_none()
def get_users_group(self):
return Group.query.filter_by(name='users').one_or_none()
def get_mail(self):
return Mail.query.filter_by(uid='test').one_or_none()
def login_as(self, user, ref=None):
# It is currently not possible to login while already logged in as another
# user, so make sure that we are not logged in first
self.client.get(path=url_for('session.logout'), follow_redirects=True)
loginname = None
password = None
if user == 'user':
loginname = 'testuser'
password = 'userpassword'
elif user == 'admin':
loginname = 'testadmin'
password = 'adminpassword'
return self.client.post(path=url_for('session.login', ref=ref),
data={'loginname': loginname, 'password': password}, follow_redirects=True)
This diff is collapsed.
import datetime
import time
import unittest import unittest
from flask import url_for, session from flask import url_for
# These imports are required, because otherwise we get circular imports?! from uffd.database import db
from uffd import user from uffd.models import Mail
from uffd.mail.models import Mail from tests.utils import dump, UffdTestCase
from uffd import create_app, db
from utils import dump, UffdTestCase, db_flush
class TestMailViews(UffdTestCase): class TestMailViews(UffdTestCase):
def setUp(self): def setUp(self):
...@@ -31,7 +26,7 @@ class TestMailViews(UffdTestCase): ...@@ -31,7 +26,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
def test_show(self): def test_show(self):
r = self.client.get(path=url_for('mail.show', uid=self.get_mail().uid), follow_redirects=True) r = self.client.get(path=url_for('mail.show', mai_id=self.get_mail().id), follow_redirects=True)
dump('mail_show', r) dump('mail_show', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -46,7 +41,7 @@ class TestMailViews(UffdTestCase): ...@@ -46,7 +41,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(m.uid, 'test') self.assertEqual(m.uid, 'test')
self.assertEqual(sorted(m.receivers), ['test1@example.com', 'test2@example.com']) self.assertEqual(sorted(m.receivers), ['test1@example.com', 'test2@example.com'])
self.assertEqual(sorted(m.destinations), ['testuser@mail.example.com']) self.assertEqual(sorted(m.destinations), ['testuser@mail.example.com'])
r = self.client.post(path=url_for('mail.update', uid=m.uid), r = self.client.post(path=url_for('mail.update', mail_id=m.id),
data={'mail-uid': 'test1', 'mail-receivers': 'foo@bar.com\ntest@bar.com', data={'mail-uid': 'test1', 'mail-receivers': 'foo@bar.com\ntest@bar.com',
'mail-destinations': 'testuser@mail.example.com\ntestadmin@mail.example.com'}, follow_redirects=True) 'mail-destinations': 'testuser@mail.example.com\ntestadmin@mail.example.com'}, follow_redirects=True)
dump('mail_update', r) dump('mail_update', r)
...@@ -68,7 +63,7 @@ class TestMailViews(UffdTestCase): ...@@ -68,7 +63,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(sorted(m.receivers), ['foo@bar.com', 'test@bar.com']) self.assertEqual(sorted(m.receivers), ['foo@bar.com', 'test@bar.com'])
self.assertEqual(sorted(m.destinations), ['testadmin@mail.example.com', 'testuser@mail.example.com']) self.assertEqual(sorted(m.destinations), ['testadmin@mail.example.com', 'testuser@mail.example.com'])
@unittest.skip('We do not catch LDAP errors at the moment!') # TODO: Not sure if necessary @unittest.skip('We do not catch DB errors at the moment!') # TODO
def test_create_error(self): def test_create_error(self):
r = self.client.post(path=url_for('mail.update'), r = self.client.post(path=url_for('mail.update'),
data={'mail-uid': 'test', 'mail-receivers': 'foo@bar.com\ntest@bar.com', data={'mail-uid': 'test', 'mail-receivers': 'foo@bar.com\ntest@bar.com',
...@@ -83,7 +78,7 @@ class TestMailViews(UffdTestCase): ...@@ -83,7 +78,7 @@ class TestMailViews(UffdTestCase):
def test_delete(self): def test_delete(self):
self.assertIsNotNone(self.get_mail()) self.assertIsNotNone(self.get_mail())
r = self.client.get(path=url_for('mail.delete', uid=self.get_mail().uid), follow_redirects=True) r = self.client.get(path=url_for('mail.delete', mail_id=self.get_mail().id), follow_redirects=True)
dump('mail_delete', r) dump('mail_delete', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIsNone(self.get_mail()) self.assertIsNone(self.get_mail())
This diff is collapsed.
from flask import url_for
from uffd.database import db
from uffd.models import User, Role, RoleGroup
from tests.utils import dump, UffdTestCase
class TestRoleViews(UffdTestCase):
def setUp(self):
super().setUp()
self.login_as('admin')
def test_index(self):
db.session.add(Role(name='base', description='Base role description'))
db.session.add(Role(name='test1', description='Test1 role description'))
db.session.commit()
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index', r)
self.assertEqual(r.status_code, 200)
def test_index_empty(self):
r = self.client.get(path=url_for('role.index'), follow_redirects=True)
dump('role_index_empty', r)
self.assertEqual(r.status_code, 200)
def test_show(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
r = self.client.get(path=url_for('role.show', roleid=role.id), follow_redirects=True)
dump('role_show', r)
self.assertEqual(r.status_code, 200)
def test_new(self):
r = self.client.get(path=url_for('role.new'), follow_redirects=True)
dump('role_new', r)
self.assertEqual(r.status_code, 200)
def test_update(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role.groups[self.get_admin_group()] = RoleGroup()
db.session.commit()
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertSetEqual(set(role.groups), {self.get_admin_group()})
r = self.client.post(path=url_for('role.update', roleid=role.id),
data={'name': 'base1', 'description': 'Base role description1', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
dump('role_update', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role.id)
self.assertEqual(role.name, 'base1')
self.assertEqual(role.description, 'Base role description1')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated
def test_create(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': '', 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
dump('role_create', r)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_create_with_moderator_group(self):
self.assertIsNone(Role.query.filter_by(name='base').first())
r = self.client.post(path=url_for('role.update'),
data={'name': 'base', 'description': 'Base role description', 'moderator-group': self.get_admin_group().id, 'group-%d'%self.get_users_group().id: '1', 'group-%d'%self.get_access_group().id: '1'},
follow_redirects=True)
self.assertEqual(r.status_code, 200)
role = Role.query.filter_by(name='base').first()
self.assertIsNotNone(role)
self.assertEqual(role.name, 'base')
self.assertEqual(role.description, 'Base role description')
self.assertEqual(role.moderator_group.name, 'uffd_admin')
self.assertSetEqual(set(role.groups), {self.get_access_group(), self.get_users_group()})
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_delete(self):
role = Role(name='base', description='Base role description')
db.session.add(role)
db.session.commit()
role_id = role.id
self.assertIsNotNone(Role.query.get(role_id))
r = self.client.get(path=url_for('role.delete', roleid=role.id), follow_redirects=True)
dump('role_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(Role.query.get(role_id))
# TODO: verify that group memberships are updated (currently not possible with ldap mock!)
def test_set_default(self):
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
role = Role(name='test')
db.session.add(role)
role.groups[self.get_admin_group()] = RoleGroup()
user1 = self.get_user()
user2 = self.get_admin()
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(self.get_user().roles_effective), set())
self.assertSetEqual(set(self.get_admin().roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), set())
role.members.append(self.get_user())
role.members.append(service_user)
self.assertSetEqual(set(self.get_user().roles_effective), {role})
self.assertSetEqual(set(self.get_admin().roles_effective), set())
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
self.assertSetEqual(set(role.members), {self.get_user(), service_user})
r = self.client.get(path=url_for('role.set_default', roleid=role.id), follow_redirects=True)
dump('role_set_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(self.get_user().roles_effective), {role})
self.assertSetEqual(set(self.get_admin().roles_effective), {role})
def test_unset_default(self):
admin_role = Role(name='admin', is_default=True)
db.session.add(admin_role)
admin_role.groups[self.get_admin_group()] = RoleGroup()
db.session.add(User(loginname='service', is_service_user=True, primary_email_address='service@example.com', displayname='Service'))
db.session.commit()
role = Role(name='test', is_default=True)
db.session.add(role)
service_user = User.query.filter_by(loginname='service').one_or_none()
role.members.append(service_user)
self.assertSetEqual(set(self.get_user().roles_effective), {role, admin_role})
self.assertSetEqual(set(self.get_admin().roles_effective), {role, admin_role})
self.assertSetEqual(set(service_user.roles_effective), {role})
db.session.commit()
role_id = role.id
admin_role_id = admin_role.id
self.assertSetEqual(set(role.members), {service_user})
r = self.client.get(path=url_for('role.unset_default', roleid=role.id), follow_redirects=True)
dump('role_unset_default', r)
self.assertEqual(r.status_code, 200)
role = Role.query.get(role_id)
admin_role = Role.query.get(admin_role_id)
service_user = User.query.filter_by(loginname='service').one_or_none()
self.assertSetEqual(set(role.members), {service_user})
self.assertSetEqual(set(self.get_user().roles_effective), {admin_role})
self.assertSetEqual(set(self.get_admin().roles_effective), {admin_role})
from flask import url_for from flask import url_for
from uffd.user.models import User, Group
from uffd.role.models import Role, RoleGroup
from uffd.database import db from uffd.database import db
from uffd.models import Role, RoleGroup
from utils import dump, UffdTestCase from tests.utils import dump, UffdTestCase
class TestRolemodViewsLoggedOut(UffdTestCase): class TestRolemodViewsLoggedOut(UffdTestCase):
def test_acl_nologin(self): def test_acl_nologin(self):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from .views import bp as _bp
bp = [_bp]
This diff is collapsed.
[python: **.py] [python: **.py]
[jinja2: **/templates/**.html] [jinja2: **/templates/**.html]
extensions=jinja2.ext.autoescape,jinja2.ext.with_
from .user import user_command
from .group import group_command
from .role import role_command
from .profile import profile_command
from .gendevcert import gendevcert_command
from .cleanup import cleanup_command
from .roles_update_all import roles_update_all_command
from .unique_email_addresses import unique_email_addresses_command
def init_app(app):
app.cli.add_command(user_command)
app.cli.add_command(group_command)
app.cli.add_command(role_command)
app.cli.add_command(gendevcert_command)
app.cli.add_command(profile_command)
app.cli.add_command(cleanup_command)
app.cli.add_command(roles_update_all_command)
app.cli.add_command(unique_email_addresses_command)
import click
from flask.cli import with_appcontext
from uffd.tasks import cleanup_task
from uffd.database import db
@click.command('cleanup', help='Cleanup expired data')
@with_appcontext
def cleanup_command():
cleanup_task.run()
db.session.commit()