Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Showing
with 3404 additions and 1063 deletions
import datetime
import unittest
from flask import url_for
# These imports are required, because otherwise we get circular imports?!
from uffd import user
from utils import dump, UffdTestCase
class TestServices(UffdTestCase):
def setUpApp(self):
self.app.config['SERVICES'] = [
{
'title': 'Service Title',
'subtitle': 'Service Subtitle',
'description': 'Short description of the service as plain text',
'url': 'https://example.com/',
'logo_url': '/static/fairy-dust-color.png',
'required_group': 'users',
'permission_levels': [
{'name': 'Moderator', 'required_group': 'moderators'},
{'name': 'Admin', 'required_group': 'uffd_admin'},
],
'confidential': True,
'groups': [
{'name': 'Group "crew_crew"', 'required_group': 'users'},
{'name': 'Group "crew_logistik"', 'required_group': 'uffd_admin'},
],
'infos': [
{'title': 'Documentation', 'html': '<p>Some information about the service as html</p>', 'required_group': 'users'},
],
'links': [
{'title': 'Link to an external site', 'url': '#', 'required_group': 'users'},
],
},
{
'title': 'Minimal Service Title',
}
]
self.app.config['SERVICES_PUBLIC'] = True
def test_index(self):
r = self.client.get(path=url_for('services.index'))
dump('services_index_public', r)
self.assertEqual(r.status_code, 200)
self.assertNotIn(b'https://example.com/', r.data)
self.login_as('user')
r = self.client.get(path=url_for('services.index'))
dump('services_index', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'https://example.com/', r.data)
import unittest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from uffd.tasks import CleanupTask
class TestCleanupTask(unittest.TestCase):
def test(self):
app = Flask(__name__)
app.testing = True
app.debug = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
db = SQLAlchemy(app)
cleanup_task = CleanupTask()
@cleanup_task.delete_by_attribute('delete_me')
class TestModel(db.Model):
id = db.Column(db.Integer(), primary_key=True, autoincrement=True)
delete_me = db.Column(db.Boolean(), default=False, nullable=False)
with app.test_request_context():
db.create_all()
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=False))
db.session.add(TestModel(delete_me=False))
db.session.commit()
db.session.expire_all()
self.assertEqual(TestModel.query.count(), 5)
with app.test_request_context():
cleanup_task.run()
db.session.commit()
db.session.expire_all()
with app.test_request_context():
self.assertEqual(TestModel.query.count(), 2)
from uffd.utils import nopad_b32decode, nopad_b32encode, nopad_urlsafe_b64decode, nopad_urlsafe_b64encode
from tests.utils import UffdTestCase
class TestUtils(UffdTestCase):
def test_nopad_b32(self):
for n in range(0, 32):
self.assertEqual(b'X'*n, nopad_b32decode(nopad_b32encode(b'X'*n)))
def test_nopad_b64(self):
for n in range(0, 32):
self.assertEqual(b'X'*n, nopad_urlsafe_b64decode(nopad_urlsafe_b64encode(b'X'*n)))
import os
import tempfile
import shutil
import unittest
from flask import request, url_for
from flask import url_for
import flask_migrate
from uffd import create_app, db
from uffd.user.models import User, Group
from uffd.mail.models import Mail
from uffd.models import User, Group, Mail
def dump(basename, resp):
basename = basename.replace('.', '_').replace('/', '_')
......@@ -22,42 +20,12 @@ def dump(basename, resp):
def db_flush():
db.session.rollback()
db.session = db.create_scoped_session()
db.session.expire_all()
class UffdTestCase(unittest.TestCase):
def get_user(self):
return User.query.filter_by(loginname='testuser').one_or_none()
def get_admin(self):
return User.query.filter_by(loginname='testadmin').one_or_none()
def get_admin_group(self):
return Group.query.filter_by(name='uffd_admin').one_or_none()
def get_access_group(self):
return Group.query.filter_by(name='uffd_access').one_or_none()
def get_users_group(self):
return Group.query.filter_by(name='users').one_or_none()
def get_mail(self):
return Mail.query.filter_by(uid='test').one_or_none()
def login_as(self, user, ref=None):
loginname = None
password = None
if user == 'user':
loginname = 'testuser'
password = 'userpassword'
elif user == 'admin':
loginname = 'testadmin'
password = 'adminpassword'
return self.client.post(path=url_for('session.login', ref=ref),
data={'loginname': loginname, 'password': password}, follow_redirects=True)
class AppTestCase(unittest.TestCase):
DISABLE_SQLITE_MEMORY_DB = False
def setUp(self):
# It would be far better to create a minimal app here, but since the
# session module depends on almost everything else, that is not really feasable
config = {
'TESTING': True,
'DEBUG': True,
......@@ -65,12 +33,78 @@ class UffdTestCase(unittest.TestCase):
'SECRET_KEY': 'DEBUGKEY',
'MAIL_SKIP_SEND': True,
'SELF_SIGNUP': True,
'ENABLE_INVITE': True,
'ENABLE_PASSWORDRESET': True
}
if self.DISABLE_SQLITE_MEMORY_DB:
try:
os.remove('/tmp/uffd-migration-test-db.sqlite3')
except FileNotFoundError:
pass
config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/uffd-migration-test-db.sqlite3'
if os.environ.get('TEST_WITH_MYSQL'):
import MySQLdb
conn = MySQLdb.connect(user='root', unix_socket='/var/run/mysqld/mysqld.sock')
cur = conn.cursor()
try:
cur.execute('DROP DATABASE uffd_tests')
except:
pass
cur.execute('CREATE DATABASE uffd_tests CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin')
conn.close()
config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqldb:///uffd_tests?unix_socket=/var/run/mysqld/mysqld.sock&charset=utf8mb4'
self.app = create_app(config)
self.setUpApp()
def setUpApp(self):
pass
def tearDown(self):
if self.DISABLE_SQLITE_MEMORY_DB:
try:
os.remove('/tmp/uffd-migration-test-db.sqlite3')
except FileNotFoundError:
pass
class MigrationTestCase(AppTestCase):
DISABLE_SQLITE_MEMORY_DB = True
REVISION = None
def setUp(self):
super().setUp()
self.request_context = self.app.test_request_context()
self.request_context.__enter__()
if self.REVISION:
flask_migrate.upgrade(revision=self.REVISION + '-1')
def upgrade(self, revision='+1'):
db.session.commit()
flask_migrate.upgrade(revision=revision)
def downgrade(self, revision='-1'):
db.session.commit()
flask_migrate.downgrade(revision=revision)
def tearDown(self):
db.session.rollback()
self.request_context.__exit__(None, None, None)
super().tearDown()
class ModelTestCase(AppTestCase):
def setUp(self):
super().setUp()
self.request_context = self.app.test_request_context()
self.request_context.__enter__()
db.create_all()
db.session.commit()
def tearDown(self):
db.session.rollback()
self.request_context.__exit__(None, None, None)
super().tearDown()
class UffdTestCase(AppTestCase):
def setUp(self):
super().setUp()
self.client = self.app.test_client()
self.client.__enter__()
# Just do some request so that we can use url_for
......@@ -83,16 +117,51 @@ class UffdTestCase(unittest.TestCase):
db.session.add(access_group)
admin_group = Group(name='uffd_admin', unix_gid=20003, description='Admin access to uffd')
db.session.add(admin_group)
testuser = User(loginname='testuser', unix_uid=10000, password='userpassword', mail='test@example.com', displayname='Test User', groups=[users_group, access_group])
testuser = User(loginname='testuser', unix_uid=10000, password='userpassword', primary_email_address='test@example.com', displayname='Test User', groups=[users_group, access_group])
db.session.add(testuser)
testadmin = User(loginname='testadmin', unix_uid=10001, password='adminpassword', mail='admin@example.com', displayname='Test Admin', groups=[users_group, access_group, admin_group])
testadmin = User(loginname='testadmin', unix_uid=10001, password='adminpassword', primary_email_address='admin@example.com', displayname='Test Admin', groups=[users_group, access_group, admin_group])
db.session.add(testadmin)
testmail = Mail(uid='test', receivers=['test1@example.com', 'test2@example.com'], destinations=['testuser@mail.example.com'])
db.session.add(testmail)
self.setUpDB()
db.session.commit()
def setUpApp(self):
def setUpDB(self):
pass
def tearDown(self):
self.client.__exit__(None, None, None)
super().tearDown()
def get_user(self):
return User.query.filter_by(loginname='testuser').one_or_none()
def get_admin(self):
return User.query.filter_by(loginname='testadmin').one_or_none()
def get_admin_group(self):
return Group.query.filter_by(name='uffd_admin').one_or_none()
def get_access_group(self):
return Group.query.filter_by(name='uffd_access').one_or_none()
def get_users_group(self):
return Group.query.filter_by(name='users').one_or_none()
def get_mail(self):
return Mail.query.filter_by(uid='test').one_or_none()
def login_as(self, user, ref=None):
# It is currently not possible to login while already logged in as another
# user, so make sure that we are not logged in first
self.client.get(path=url_for('session.logout'), follow_redirects=True)
loginname = None
password = None
if user == 'user':
loginname = 'testuser'
password = 'userpassword'
elif user == 'admin':
loginname = 'testadmin'
password = 'adminpassword'
return self.client.post(path=url_for('session.login', ref=ref),
data={'loginname': loginname, 'password': password}, follow_redirects=True)
This diff is collapsed.
import datetime
import time
import unittest
from flask import url_for, session
from flask import url_for
# These imports are required, because otherwise we get circular imports?!
from uffd import user
from uffd.database import db
from uffd.models import Mail
from uffd.mail.models import Mail
from uffd import create_app, db
from utils import dump, UffdTestCase, db_flush
from tests.utils import dump, UffdTestCase
class TestMailViews(UffdTestCase):
def setUp(self):
......@@ -31,7 +26,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(r.status_code, 200)
def test_show(self):
r = self.client.get(path=url_for('mail.show', uid=self.get_mail().uid), follow_redirects=True)
r = self.client.get(path=url_for('mail.show', mai_id=self.get_mail().id), follow_redirects=True)
dump('mail_show', r)
self.assertEqual(r.status_code, 200)
......@@ -46,7 +41,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(m.uid, 'test')
self.assertEqual(sorted(m.receivers), ['test1@example.com', 'test2@example.com'])
self.assertEqual(sorted(m.destinations), ['testuser@mail.example.com'])
r = self.client.post(path=url_for('mail.update', uid=m.uid),
r = self.client.post(path=url_for('mail.update', mail_id=m.id),
data={'mail-uid': 'test1', 'mail-receivers': 'foo@bar.com\ntest@bar.com',
'mail-destinations': 'testuser@mail.example.com\ntestadmin@mail.example.com'}, follow_redirects=True)
dump('mail_update', r)
......@@ -68,7 +63,7 @@ class TestMailViews(UffdTestCase):
self.assertEqual(sorted(m.receivers), ['foo@bar.com', 'test@bar.com'])
self.assertEqual(sorted(m.destinations), ['testadmin@mail.example.com', 'testuser@mail.example.com'])
@unittest.skip('We do not catch LDAP errors at the moment!') # TODO: Not sure if necessary
@unittest.skip('We do not catch DB errors at the moment!') # TODO
def test_create_error(self):
r = self.client.post(path=url_for('mail.update'),
data={'mail-uid': 'test', 'mail-receivers': 'foo@bar.com\ntest@bar.com',
......@@ -83,7 +78,7 @@ class TestMailViews(UffdTestCase):
def test_delete(self):
self.assertIsNotNone(self.get_mail())
r = self.client.get(path=url_for('mail.delete', uid=self.get_mail().uid), follow_redirects=True)
r = self.client.get(path=url_for('mail.delete', mail_id=self.get_mail().id), follow_redirects=True)
dump('mail_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(self.get_mail())
This diff is collapsed.
This diff is collapsed.
from flask import url_for
from uffd.user.models import User, Group
from uffd.role.models import Role, RoleGroup
from uffd.database import db
from uffd.models import Role, RoleGroup
from utils import dump, UffdTestCase
from tests.utils import dump, UffdTestCase
class TestRolemodViewsLoggedOut(UffdTestCase):
def test_acl_nologin(self):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from .views import bp as _bp
bp = [_bp]
import functools
import secrets
from flask import Blueprint, jsonify, current_app, request, abort
from uffd.user.models import User, Group
from uffd.mail.models import Mail, MailReceiveAddress, MailDestinationAddress
from uffd.session.views import login_get_user, login_ratelimit
bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/')
def apikey_required(scope=None):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'}
token = request.headers['Authorization'][7:].strip()
request.api_client = None
for client_token, client in current_app.config['API_CLIENTS'].items():
if secrets.compare_digest(client_token, token):
request.api_client = client
if request.api_client is None:
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'}
if scope is not None and scope not in request.api_client.get('scopes', []):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="insufficient_scope",scope="%s"'%scope}
return func(*args, **kwargs)
return decorator
return wrapper
def generate_group_dict(group):
return {'id': group.unix_gid, 'name': group.name,
'members': [user.loginname for user in group.members]}
@bp.route('/getgroups', methods=['GET', 'POST'])
@apikey_required()
def getgroups():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
if key is None:
groups = Group.query.all()
elif key == 'id' and len(values) == 1:
groups = Group.query.filter_by(unix_gid=values[0]).all()
elif key == 'name' and len(values) == 1:
groups = Group.query.filter_by(name=values[0]).all()
elif key == 'member' and len(values) == 1:
user = User.query.filter_by(loginname=values[0]).one_or_none()
groups = [] if user is None else user.groups
else:
abort(400)
return jsonify([generate_group_dict(group) for group in groups])
def generate_user_dict(user, all_groups=None):
if all_groups is None:
all_groups = user.groups
return {'id': user.unix_uid, 'loginname': user.loginname, 'email': user.mail, 'displayname': user.displayname,
'groups': [group.name for group in all_groups if user in group.members]}
@bp.route('/getusers', methods=['GET', 'POST'])
@apikey_required()
def getusers():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
if key is None:
users = User.query.all()
elif key == 'id' and len(values) == 1:
users = User.query.filter_by(unix_uid=values[0]).all()
elif key == 'loginname' and len(values) == 1:
users = User.query.filter_by(loginname=values[0]).all()
elif key == 'email' and len(values) == 1:
users = User.query.filter_by(mail=values[0]).all()
elif key == 'group' and len(values) == 1:
group = Group.query.filter_by(name=values[0]).one_or_none()
users = [] if group is None else group.members
else:
abort(400)
all_groups = None
if len(users) > 1:
all_groups = Group.query.all()
return jsonify([generate_user_dict(user, all_groups) for user in users])
@bp.route('/checkpassword', methods=['POST'])
@apikey_required('checkpassword')
def checkpassword():
if set(request.values.keys()) != {'loginname', 'password'}:
abort(400)
username = request.form['loginname'].lower()
password = request.form['password']
login_delay = login_ratelimit.get_delay(username)
if login_delay:
return 'Too Many Requests', 429, {'Retry-After': '%d'%login_delay}
user = login_get_user(username, password)
if user is None:
login_ratelimit.log(username)
return jsonify(None)
return jsonify(generate_user_dict(user))
def generate_mail_dict(mail):
return {'name': mail.uid, 'receive_addresses': list(mail.receivers),
'destination_addresses': list(mail.destinations)}
@bp.route('/getmails', methods=['GET', 'POST'])
@apikey_required('getmails')
def getmails():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
if key is None:
mails = Mail.query.all()
elif key == 'name' and len(values) == 1:
mails = Mail.query.filter_by(uid=values[0]).all()
elif key == 'receive_address' and len(values) == 1:
mails = Mail.query.filter(Mail.receivers.any(MailReceiveAddress.address==values[0])).all()
elif key == 'destination_address' and len(values) == 1:
mails = Mail.query.filter(Mail.destinations.any(MailDestinationAddress.address==values[0])).all()
else:
abort(400)
return jsonify([generate_mail_dict(mail) for mail in mails])
[python: **.py]
[jinja2: **/templates/**.html]
extensions=jinja2.ext.autoescape,jinja2.ext.with_