diff --git a/tests/test_api.py b/tests/test_api.py index 46c6fefba59f42f15040e7cec0aebc6cd09faa8f..1e2213870454a0c60b3505d0e4acd0b6c57b2ea3 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,9 +2,10 @@ import base64 from flask import url_for -from uffd.models import APIClient, Service, User, remailer from uffd.password_hash import PlaintextPasswordHash +from uffd.remailer import remailer from uffd.database import db +from uffd.models import APIClient, Service, User from uffd.views.api import apikey_required from utils import UffdTestCase, db_flush @@ -148,7 +149,7 @@ class TestAPIGetusers(UffdTestCase): self.assertEqual(r.status_code, 200) service = Service.query.filter_by(name='test').one() self.assertEqual(self.fix_result(r.json), [ - {'displayname': 'Test User', 'email': remailer.build_address(user, service), 'id': 10000, 'loginname': 'testuser', 'groups': ['uffd_access', 'users']}, + {'displayname': 'Test User', 'email': remailer.build_address(service.id, user.id), 'id': 10000, 'loginname': 'testuser', 'groups': ['uffd_access', 'users']}, ]) def test_loginname(self): @@ -176,11 +177,11 @@ class TestAPIGetusers(UffdTestCase): db.session.commit() user = self.get_admin() self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - r = self.client.get(path=url_for('api.getusers', email=remailer.build_address(user, service)), headers=[basic_auth('test', 'test')], follow_redirects=True) + r = self.client.get(path=url_for('api.getusers', email=remailer.build_address(service.id, user.id)), headers=[basic_auth('test', 'test')], follow_redirects=True) self.assertEqual(r.status_code, 200) service = Service.query.filter_by(name='test').one() self.assertEqual(self.fix_result(r.json), [ - {'displayname': 'Test Admin', 'email': remailer.build_address(user, service), 'id': 10001, 'loginname': 'testadmin', 'groups': ['uffd_access', 'uffd_admin', 'users']} + {'displayname': 'Test Admin', 'email': remailer.build_address(service.id, user.id), 'id': 10001, 'loginname': 'testadmin', 'groups': ['uffd_access', 'uffd_admin', 'users']} ]) def test_email_empty(self): @@ -266,7 +267,7 @@ class TestAPIRemailerResolve(UffdTestCase): self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' service = Service.query.filter_by(name='service2').one() user = self.get_user() - r = self.client.get(path=url_for('api.resolve_remailer', orig_address=remailer.build_address(user, service)), headers=[basic_auth('test', 'test')], follow_redirects=True) + r = self.client.get(path=url_for('api.resolve_remailer', orig_address=remailer.build_address(service.id, user.id)), headers=[basic_auth('test', 'test')], follow_redirects=True) self.assertEqual(r.status_code, 200) self.assertEqual(r.json, {'address': user.mail}) r = self.client.get(path=url_for('api.resolve_remailer', orig_address='foo@bar'), headers=[basic_auth('test', 'test')], follow_redirects=True) diff --git a/tests/test_oauth2.py b/tests/test_oauth2.py index cd612b4a02d96472b0eca35351c7defa6ffb4688..990e2c13238fe54da45c71af8fedf449957da5cb 100644 --- a/tests/test_oauth2.py +++ b/tests/test_oauth2.py @@ -5,7 +5,8 @@ from flask import url_for, session from uffd import create_app, db from uffd.password_hash import PlaintextPasswordHash -from uffd.models import User, remailer, DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation +from uffd.remailer import remailer +from uffd.models import User, DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation from utils import dump, UffdTestCase @@ -54,7 +55,7 @@ class TestViews(UffdTestCase): self.login_as('user') r = self.client.get(path=url_for('oauth2.authorize', response_type='code', client_id='test', state='teststate', redirect_uri='http://localhost:5009/callback', scope='profile'), follow_redirects=False) service = Service.query.filter_by(name='test').one() - self.assert_authorization(r, mail=remailer.build_address(self.get_user(), service)) + self.assert_authorization(r, mail=remailer.build_address(service.id, self.get_user().id)) def test_authorization_client_secret_rehash(self): OAuth2Client.query.delete() diff --git a/tests/test_remailer.py b/tests/test_remailer.py new file mode 100644 index 0000000000000000000000000000000000000000..8ef261fc87d8145301b796c7e09557362de84050 --- /dev/null +++ b/tests/test_remailer.py @@ -0,0 +1,63 @@ +from uffd.remailer import remailer + +from utils import UffdTestCase + +USER_ID = 1234 +SERVICE1_ID = 4223 +SERVICE2_ID = 3242 + +class TestRemailer(UffdTestCase): + def test_is_remailer_domain(self): + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertTrue(remailer.is_remailer_domain('remailer.example.com')) + self.assertTrue(remailer.is_remailer_domain('REMAILER.EXAMPLE.COM')) + self.assertTrue(remailer.is_remailer_domain(' remailer.example.com ')) + self.assertFalse(remailer.is_remailer_domain('other.remailer.example.com')) + self.assertFalse(remailer.is_remailer_domain('example.com')) + self.app.config['REMAILER_OLD_DOMAINS'] = [' OTHER.remailer.example.com '] + self.assertTrue(remailer.is_remailer_domain(' OTHER.remailer.example.com ')) + self.assertTrue(remailer.is_remailer_domain('remailer.example.com')) + self.assertTrue(remailer.is_remailer_domain('other.remailer.example.com')) + self.assertFalse(remailer.is_remailer_domain('example.com')) + + def test_build_address(self): + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertTrue(remailer.build_address(SERVICE1_ID, USER_ID).endswith('@remailer.example.com')) + self.assertTrue(remailer.build_address(SERVICE2_ID, USER_ID).endswith('@remailer.example.com')) + self.assertLessEqual(len(remailer.build_local_part(SERVICE1_ID, USER_ID)), 64) + self.assertLessEqual(len(remailer.build_address(SERVICE1_ID, USER_ID)), 256) + self.assertEqual(remailer.build_address(SERVICE1_ID, USER_ID), remailer.build_address(SERVICE1_ID, USER_ID)) + self.assertNotEqual(remailer.build_address(SERVICE1_ID, USER_ID), remailer.build_address(SERVICE2_ID, USER_ID)) + addr = remailer.build_address(SERVICE1_ID, USER_ID) + self.app.config['REMAILER_OLD_DOMAINS'] = ['old.remailer.example.com'] + self.assertEqual(remailer.build_address(SERVICE1_ID, USER_ID), addr) + self.assertTrue(remailer.build_address(SERVICE1_ID, USER_ID).endswith('@remailer.example.com')) + self.app.config['REMAILER_SECRET_KEY'] = self.app.config['SECRET_KEY'] + self.assertEqual(remailer.build_address(SERVICE1_ID, USER_ID), addr) + self.app.config['REMAILER_SECRET_KEY'] = 'REMAILER-DEBUGKEY' + self.assertNotEqual(remailer.build_address(SERVICE1_ID, USER_ID), addr) + + def test_parse_address(self): + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + addr = remailer.build_address(SERVICE2_ID, USER_ID) + # REMAILER_DOMAIN behaviour + self.app.config['REMAILER_DOMAIN'] = None + self.assertIsNone(remailer.parse_address(addr)) + self.assertIsNone(remailer.parse_address('foo@example.com')) + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertEqual(remailer.parse_address(addr), (SERVICE2_ID, USER_ID)) + self.assertIsNone(remailer.parse_address('foo@example.com')) + self.assertIsNone(remailer.parse_address('foo@remailer.example.com')) + self.assertIsNone(remailer.parse_address('v1-foo@remailer.example.com')) + self.app.config['REMAILER_DOMAIN'] = 'new-remailer.example.com' + self.assertIsNone(remailer.parse_address(addr)) + self.app.config['REMAILER_OLD_DOMAINS'] = ['remailer.example.com'] + self.assertEqual(remailer.parse_address(addr), (SERVICE2_ID, USER_ID)) + # REMAILER_SECRET_KEY behaviour + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.app.config['REMAILER_OLD_DOMAINS'] = [] + self.assertEqual(remailer.parse_address(addr), (SERVICE2_ID, USER_ID)) + self.app.config['REMAILER_SECRET_KEY'] = self.app.config['SECRET_KEY'] + self.assertEqual(remailer.parse_address(addr), (SERVICE2_ID, USER_ID)) + self.app.config['REMAILER_SECRET_KEY'] = 'REMAILER-DEBUGKEY' + self.assertIsNone(remailer.parse_address(addr)) diff --git a/tests/test_services.py b/tests/test_services.py index 09cc802e5ae888e509e83b492a31371439d78275..643b1b45bc91c8cb62fccdd061bf13929e30cb6e 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -4,6 +4,162 @@ import unittest from flask import url_for from utils import dump, UffdTestCase +from uffd.remailer import remailer +from uffd.tasks import cleanup_task +from uffd.database import db +from uffd.models import Service, ServiceUser, User + +class TestServiceUser(UffdTestCase): + def setUp(self): + super().setUp() + db.session.add_all([Service(name='service1'), Service(name='service2', use_remailer=True)]) + db.session.commit() + + def test_auto_create(self): + service_count = Service.query.count() + user_count = User.query.count() + self.assertEqual(ServiceUser.query.count(), service_count * user_count) + db.session.add(User(loginname='newuser1', displayname='New User', mail='new1@example.com')) + db.session.commit() + self.assertEqual(ServiceUser.query.count(), service_count * (user_count + 1)) + db.session.add(Service(name='service3')) + db.session.commit() + self.assertEqual(ServiceUser.query.count(), (service_count + 1) * (user_count + 1)) + db.session.add(User(loginname='newuser2', displayname='New User', mail='new2@example.com')) + db.session.add(User(loginname='newuser3', displayname='New User', mail='new3@example.com')) + db.session.add(Service(name='service4')) + db.session.add(Service(name='service5')) + db.session.commit() + self.assertEqual(ServiceUser.query.count(), (service_count + 3) * (user_count + 3)) + + def test_create_missing(self): + service_count = Service.query.count() + user_count = User.query.count() + self.assertEqual(ServiceUser.query.count(), service_count * user_count) + db.session.delete(ServiceUser.query.first()) + db.session.commit() + self.assertEqual(ServiceUser.query.count(), service_count * user_count - 1) + cleanup_task.run() + db.session.commit() + self.assertEqual(ServiceUser.query.count(), service_count * user_count) + + def test_real_email(self): + user = self.get_user() + service = Service.query.filter_by(name='service1').first() + service_user = ServiceUser.query.get((service.id, user.id)) + self.assertEqual(service_user.real_email, user.mail) + + def test_remailer_email(self): + user = self.get_user() + service = Service.query.filter_by(name='service1').first() + service_user = ServiceUser.query.get((service.id, user.id)) + with self.assertRaises(Exception): + service_user.remailer_email + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertEqual(service_user.remailer_email, remailer.build_address(service.id, user.id)) + + def test_get_by_remailer_email(self): + user = self.get_user() + service = Service.query.filter_by(name='service1').first() + service_user = ServiceUser.query.get((service.id, user.id)) + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + remailer_email = remailer.build_address(service.id, user.id) + # 1. remailer not setup + self.app.config['REMAILER_DOMAIN'] = '' + self.assertIsNone(ServiceUser.get_by_remailer_email(user.mail)) + self.assertIsNone(ServiceUser.get_by_remailer_email(remailer_email)) + self.assertIsNone(ServiceUser.get_by_remailer_email('invalid')) + # 2. remailer setup + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertIsNone(ServiceUser.get_by_remailer_email(user.mail)) + self.assertEqual(ServiceUser.get_by_remailer_email(remailer_email), service_user) + self.assertIsNone(ServiceUser.get_by_remailer_email('invalid')) + + def test_email(self): + user = self.get_user() + service = Service.query.filter_by(name='service1').first() + service_user = ServiceUser.query.get((service.id, user.id)) + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + remailer_email = remailer.build_address(service.id, user.id) + # 1. remailer not setup + self.app.config['REMAILER_DOMAIN'] = '' + self.assertEqual(service_user.email, user.mail) + # 2. remailer setup + service.use_remailer disabled + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertEqual(service_user.email, user.mail) + # 3. remailer setup + service.use_remailer enabled + REMAILER_LIMIT_TO_USERS unset + service.use_remailer = True + db.session.commit() + self.assertEqual(service_user.email, remailer_email) + # 4. remailer setup + service.use_remailer enabled + REMAILER_LIMIT_TO_USERS does not include user + self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin'] + self.assertEqual(service_user.email, user.mail) + # 5. remailer setup + service.use_remailer enabled + REMAILER_LIMIT_TO_USERS includes user + self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser'] + self.assertEqual(service_user.email, remailer_email) + + def test_filter_query_by_email(self): + def run_query(value): + return {(su.service_id, su.user_id) for su in ServiceUser.filter_query_by_email(ServiceUser.query, value)} + + user1 = self.get_user() + user2 = User(loginname='user2', mail=user1.mail, displayname='User 2') + db.session.add(user2) + db.session.commit() + service1 = Service.query.filter_by(name='service1').first() # use_remailer=False + service2 = Service.query.filter_by(name='service2').first() # use_remailer=True + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + remailer_email1_1 = remailer.build_address(service1.id, user1.id) + remailer_email2_1 = remailer.build_address(service2.id, user1.id) + remailer_email1_2 = remailer.build_address(service1.id, user2.id) + remailer_email2_2 = remailer.build_address(service2.id, user2.id) + + # 1. remailer disabled + self.app.config['REMAILER_DOMAIN'] = '' + self.assertEqual(run_query(user1.mail), { + (service1.id, user1.id), (service1.id, user2.id), + (service2.id, user1.id), (service2.id, user2.id), + }) + self.assertEqual(run_query(remailer_email1_1), set()) + self.assertEqual(run_query(remailer_email2_1), set()) + self.assertEqual(run_query('invalid'), set()) + + # 2. remailer enabled + REMAILER_LIMIT_TO_USERS unset + self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' + self.assertEqual(run_query(user1.mail), { + (service1.id, user1.id), (service1.id, user2.id), + }) + self.assertEqual(run_query(remailer_email1_1), set()) + self.assertEqual(run_query(remailer_email2_1), { + (service2.id, user1.id), + }) + self.assertEqual(run_query(remailer_email2_1 + ' '), set()) + self.assertEqual(run_query('invalid'), set()) + + # 3. remailer enabled + REMAILER_LIMIT_TO_USERS includes testuser + self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser'] + self.assertEqual(run_query(user1.mail), { + (service1.id, user1.id), (service1.id, user2.id), + (service2.id, user2.id), + }) + self.assertEqual(run_query(remailer_email1_1), set()) + self.assertEqual(run_query(remailer_email2_1), { + (service2.id, user1.id), + }) + self.assertEqual(run_query(remailer_email2_1 + ' '), set()) + self.assertEqual(run_query(remailer_email1_2), set()) + self.assertEqual(run_query(remailer_email2_2), set()) + self.assertEqual(run_query('invalid'), set()) + + # 4. remailer enabled + REMAILER_LIMIT_TO_USERS does not include user (should behave the same as 1.) + self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin'] + self.assertEqual(run_query(user1.mail), { + (service1.id, user1.id), (service1.id, user2.id), + (service2.id, user1.id), (service2.id, user2.id), + }) + self.assertEqual(run_query(remailer_email1_1), set()) + self.assertEqual(run_query(remailer_email2_1), set()) + self.assertEqual(run_query('invalid'), set()) class TestServices(UffdTestCase): def setUpApp(self): diff --git a/tests/test_user.py b/tests/test_user.py index 0152a0583aec2a4c3a66affb0fcb5d9a9dd33580..190234d068b5ce3b09325e6cf26d034a1d8ca407 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -5,7 +5,8 @@ from flask import url_for, session import sqlalchemy from uffd import create_app, db -from uffd.models import User, remailer, RemailerAddress, Group, Role, RoleGroup, Service +from uffd.remailer import remailer +from uffd.models import User, Group, Role, RoleGroup, Service from utils import dump, UffdTestCase @@ -109,121 +110,6 @@ class TestUserModel(UffdTestCase): self.assertFalse(user.set_mail('v1-1-testuser@REMAILER.example.com')) self.assertFalse(user.set_mail('v1-1-testuser@foobar@remailer.example.com')) - def test_get_service_mail(self): - service1 = Service(name='service1') - service2 = Service(name='service2', use_remailer=True) - db.session.add_all([service1, service2]) - db.session.commit() - user = self.get_user() - self.assertEqual(user.get_service_mail(service1), user.mail) - self.assertEqual(user.get_service_mail(service2), user.mail) - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - self.assertEqual(user.get_service_mail(service1), user.mail) - self.assertEqual(user.get_service_mail(service2), remailer.build_address(user, service2)) - self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin'] - self.assertEqual(user.get_service_mail(service1), user.mail) - self.assertEqual(user.get_service_mail(service2), user.mail) - self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin', 'testuser'] - self.assertEqual(user.get_service_mail(service1), user.mail) - self.assertEqual(user.get_service_mail(service2), remailer.build_address(user, service2)) - - def test_filter_by_service_mail(self): - service1 = Service(name='service1') - service2 = Service(name='service2', use_remailer=True) - db.session.add_all([service1, service2]) - db.session.commit() - user = self.get_user() - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service2))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service2))).all(), []) - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service2))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, user.mail)).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service2))).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service1))).all(), []) - self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin'] - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service2))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service2))).all(), []) - self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin', 'testuser'] - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, user.mail)).all(), [user]) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service1, remailer.build_address(user, service2))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, user.mail)).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service1))).all(), []) - self.assertEqual(User.query.filter(User.filter_by_service_mail(service2, remailer.build_address(user, service2))).all(), [user]) - -class TestRemailer(UffdTestCase): - def setUpDB(self): - self.service1 = Service(name='service1') - self.service2 = Service(name='service2', use_remailer=True) - db.session.add_all([self.service1, self.service2]) - - def test_is_remailer_domain(self): - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - self.assertTrue(remailer.is_remailer_domain('remailer.example.com')) - self.assertTrue(remailer.is_remailer_domain('REMAILER.EXAMPLE.COM')) - self.assertTrue(remailer.is_remailer_domain(' remailer.example.com ')) - self.assertFalse(remailer.is_remailer_domain('other.remailer.example.com')) - self.assertFalse(remailer.is_remailer_domain('example.com')) - self.app.config['REMAILER_OLD_DOMAINS'] = [' OTHER.remailer.example.com '] - self.assertTrue(remailer.is_remailer_domain(' OTHER.remailer.example.com ')) - self.assertTrue(remailer.is_remailer_domain('remailer.example.com')) - self.assertTrue(remailer.is_remailer_domain('other.remailer.example.com')) - self.assertFalse(remailer.is_remailer_domain('example.com')) - - def test_build_address(self): - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - user = self.get_user() - self.assertTrue(remailer.build_address(user, self.service1).endswith('@remailer.example.com')) - self.assertTrue(remailer.build_address(user, self.service2).endswith('@remailer.example.com')) - self.assertLessEqual(len(remailer.build_local_part(user, self.service1)), 64) - self.assertLessEqual(len(remailer.build_address(user, self.service1)), 256) - self.assertEqual(remailer.build_address(user, self.service1), remailer.build_address(user, self.service1)) - self.assertNotEqual(remailer.build_address(user, self.service1), remailer.build_address(user, self.service2)) - addr = remailer.build_address(user, self.service1) - self.app.config['REMAILER_OLD_DOMAINS'] = ['old.remailer.example.com'] - self.assertEqual(remailer.build_address(user, self.service1), addr) - self.assertTrue(remailer.build_address(user, self.service1).endswith('@remailer.example.com')) - self.app.config['REMAILER_SECRET_KEY'] = self.app.config['SECRET_KEY'] - self.assertEqual(remailer.build_address(user, self.service1), addr) - self.app.config['REMAILER_SECRET_KEY'] = 'REMAILER-DEBUGKEY' - self.assertNotEqual(remailer.build_address(user, self.service1), addr) - - def test_parse_address(self): - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - user = self.get_user() - addr = remailer.build_address(user, self.service2) - # REMAILER_DOMAIN behaviour - self.app.config['REMAILER_DOMAIN'] = None - self.assertIsNone(remailer.parse_address(addr)) - self.assertIsNone(remailer.parse_address('foo@example.com')) - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - self.assertEqual(remailer.parse_address(addr), RemailerAddress(user, self.service2)) - self.assertIsNone(remailer.parse_address('foo@example.com')) - self.assertIsNone(remailer.parse_address('foo@remailer.example.com')) - self.assertIsNone(remailer.parse_address('v1-foo@remailer.example.com')) - self.app.config['REMAILER_DOMAIN'] = 'new-remailer.example.com' - self.assertIsNone(remailer.parse_address(addr)) - self.app.config['REMAILER_OLD_DOMAINS'] = ['remailer.example.com'] - self.assertEqual(remailer.parse_address(addr), RemailerAddress(user, self.service2)) - # REMAILER_SECRET_KEY behaviour - self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com' - self.app.config['REMAILER_OLD_DOMAINS'] = [] - self.assertEqual(remailer.parse_address(addr), RemailerAddress(user, self.service2)) - self.app.config['REMAILER_SECRET_KEY'] = self.app.config['SECRET_KEY'] - self.assertEqual(remailer.parse_address(addr), RemailerAddress(user, self.service2)) - self.app.config['REMAILER_SECRET_KEY'] = 'REMAILER-DEBUGKEY' - self.assertIsNone(remailer.parse_address(addr)) - class TestUserViews(UffdTestCase): def setUp(self): super().setUp() diff --git a/uffd/migrations/versions/f2eb2c52a61f_add_serviceuser.py b/uffd/migrations/versions/f2eb2c52a61f_add_serviceuser.py new file mode 100644 index 0000000000000000000000000000000000000000..2440efa28d126299ff017ac1dcc832c0d71b2020 --- /dev/null +++ b/uffd/migrations/versions/f2eb2c52a61f_add_serviceuser.py @@ -0,0 +1,29 @@ +"""Add ServiceUser + +Revision ID: f2eb2c52a61f +Revises: 9f824f61d8ac +Create Date: 2022-08-21 00:42:37.896970 + +""" +from alembic import op +import sqlalchemy as sa + +revision = 'f2eb2c52a61f' +down_revision = '9f824f61d8ac' +branch_labels = None +depends_on = None + +def upgrade(): + service_user = op.create_table('service_user', + sa.Column('service_id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['service_id'], ['service.id'], name=op.f('fk_service_user_service_id_service'), onupdate='CASCADE', ondelete='CASCADE'), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_service_user_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('service_id', 'user_id', name=op.f('pk_service_user')) + ) + service = sa.table('service', sa.column('id')) + user = sa.table('user', sa.column('id')) + op.execute(service_user.insert().from_select(['service_id', 'user_id'], sa.select([service.c.id, user.c.id]))) + +def downgrade(): + op.drop_table('service_user') diff --git a/uffd/models/__init__.py b/uffd/models/__init__.py index 2734866ec39d5c7eef3f19e56952a07def7ac6bb..cedc85e6e94282530962c6e8d29937cb1b50e6a6 100644 --- a/uffd/models/__init__.py +++ b/uffd/models/__init__.py @@ -5,10 +5,10 @@ from .mfa import MFAType, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMet from .oauth2 import OAuth2Client, OAuth2RedirectURI, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation from .role import Role, RoleGroup, RoleGroupMap from .selfservice import PasswordToken, MailToken -from .service import Service, get_services +from .service import Service, ServiceUser, get_services from .session import DeviceLoginType, DeviceLoginInitiation, DeviceLoginConfirmation from .signup import Signup -from .user import User, Group, RemailerAddress, remailer +from .user import User, Group from .ratelimit import RatelimitEvent, Ratelimit, HostRatelimit, host_ratelimit, format_delay __all__ = [ @@ -19,9 +19,9 @@ __all__ = [ 'OAuth2Client', 'OAuth2RedirectURI', 'OAuth2LogoutURI', 'OAuth2Grant', 'OAuth2Token', 'OAuth2DeviceLoginInitiation', 'Role', 'RoleGroup', 'RoleGroupMap', 'PasswordToken', 'MailToken', - 'Service', 'get_services', + 'Service', 'ServiceUser', 'get_services', 'DeviceLoginType', 'DeviceLoginInitiation', 'DeviceLoginConfirmation', 'Signup', - 'User', 'Group', 'RemailerAddress', 'remailer', + 'User', 'Group', 'RatelimitEvent', 'Ratelimit', 'HostRatelimit', 'host_ratelimit', 'format_delay', ] diff --git a/uffd/models/oauth2.py b/uffd/models/oauth2.py index dd0df00f3b068c8db4529c13a347431574e0c30d..b79dd79286a17ce34a239e01b3c9701a78331243 100644 --- a/uffd/models/oauth2.py +++ b/uffd/models/oauth2.py @@ -10,6 +10,7 @@ from uffd.database import db, CommaSeparatedList from uffd.tasks import cleanup_task from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash from .session import DeviceLoginInitiation, DeviceLoginType +from .service import ServiceUser class OAuth2Client(db.Model): __tablename__ = 'oauth2client' @@ -40,7 +41,8 @@ class OAuth2Client(db.Model): return self.redirect_uris[0] def access_allowed(self, user): - return self.service.has_access(user) + service_user = ServiceUser.query.get((self.service_id, user.id)) + return service_user and service_user.has_access @property def logout_uris_json(self): diff --git a/uffd/models/service.py b/uffd/models/service.py index 8584b1b0b709faf12e079439d6c80b5f5885ed81..6953ee71545878a7d248ec6ea4cd7c5b884bfa2a 100644 --- a/uffd/models/service.py +++ b/uffd/models/service.py @@ -4,6 +4,9 @@ from sqlalchemy import Column, Integer, String, ForeignKey, Boolean from sqlalchemy.orm import relationship from uffd.database import db +from uffd.remailer import remailer +from uffd.tasks import cleanup_task +from .user import User class Service(db.Model): __tablename__ = 'service' @@ -25,8 +28,114 @@ class Service(db.Model): use_remailer = Column(Boolean(), default=False, nullable=False) - def has_access(self, user): - return not self.limit_access or self.access_group in user.groups +class ServiceUser(db.Model): + '''Service-related configuration and state for a user + + ServiceUser objects are auto-created whenever a new User or Service is + created, so there one for for every (Service, User) pair. + + Service- or User-related code should always use ServiceUser in queries + instead of User/Service.''' + __tablename__ = 'service_user' + __table_args__ = ( + db.PrimaryKeyConstraint('service_id', 'user_id'), + ) + + service_id = Column(Integer(), ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False) + service = relationship('Service', viewonly=True) + user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False) + user = relationship('User', viewonly=True) + + @property + def has_access(self): + return not self.service.limit_access or self.service.access_group in self.user.groups + + # Actual e-mail address that mails from the service are sent to + @property + def real_email(self): + return self.user.mail + + @property + def remailer_email(self): + if not remailer.configured: + raise Exception('ServiceUser.remailer_email accessed with unconfigured remailer') + return remailer.build_address(self.service_id, self.user_id) + + @classmethod + def get_by_remailer_email(cls, address): + if not remailer.configured: + return None + result = remailer.parse_address(address) + if result is None: + return None + # result is (service_id, user_id), i.e. our primary key + return cls.query.get(result) + + # E-Mail address as seen by the service + @property + def email(self): + use_remailer = remailer.configured and self.service.use_remailer + if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None: + use_remailer = use_remailer and self.user.loginname in current_app.config['REMAILER_LIMIT_TO_USERS'] + if use_remailer: + return self.remailer_email + return self.real_email + + @classmethod + def filter_query_by_email(cls, query, email): + '''Filter query of ServiceUser by ServiceUser.email''' + # pylint completely fails to understand SQLAlchemy's query functions + # pylint: disable=no-member,invalid-name + service_user = cls.get_by_remailer_email(email) + if service_user and service_user.email == email: + return query.filter(cls.user_id == service_user.user_id, cls.service_id == service_user.service_id) + + AliasedUser = db.aliased(User) + AliasedService = db.aliased(Service) + + query = query.join(cls.user.of_type(AliasedUser)) + query = query.join(cls.service.of_type(AliasedService)) + + remailer_enabled_expr = AliasedService.use_remailer if remailer.configured else False + if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None: + remailer_enabled_expr = db.and_( + remailer_enabled_expr, + AliasedUser.loginname.in_(current_app.config['REMAILER_LIMIT_TO_USERS']), + ) + return query.filter(db.and_(db.not_(remailer_enabled_expr), AliasedUser.mail == email)) + +@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member +def create_service_users(session, flush_context): # pylint: disable=unused-argument + # pylint completely fails to understand SQLAlchemy's query functions + # pylint: disable=no-member + new_user_ids = [user.id for user in session.new if isinstance(user, User)] + new_service_ids = [service.id for service in session.new if isinstance(service, Service)] + if not new_user_ids and not new_service_ids: + return + db.session.execute(db.insert(ServiceUser).from_select( + ['service_id', 'user_id'], + db.select([Service.id, User.id]).where(db.or_( + Service.id.in_(new_service_ids), + User.id.in_(new_user_ids), + )) + )) + +# On databases with write concurrency (i.e. everything but SQLite), the +# after_flush handler above is racy. So in rare cases ServiceUser objects +# might be missing. +@cleanup_task.handler +def create_missing_service_users(): + # pylint completely fails to understand SQLAlchemy's query functions + # pylint: disable=no-member + db.session.execute(db.insert(ServiceUser).from_select( + ['service_id', 'user_id'], + db.select([Service.id, User.id]).where(db.not_( + ServiceUser.query.filter( + ServiceUser.service_id == Service.id, + ServiceUser.user_id == User.id + ).exists() + )) + )) # The user-visible services show on the service overview page are read from # the SERVICES config key. It is planned to gradually extend the Service model diff --git a/uffd/models/user.py b/uffd/models/user.py index a91c9fe75f5f5f8e616da3c2dd2f12b9c680c1f6..f822e2324a6e3b885dc3257c09d80603d7698a08 100644 --- a/uffd/models/user.py +++ b/uffd/models/user.py @@ -2,13 +2,13 @@ import string import re from flask import current_app, escape -import itsdangerous from flask_babel import lazy_gettext from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Text from sqlalchemy.orm import relationship from uffd.database import db from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash +from uffd.remailer import remailer # pylint: disable=E1101 user_groups = db.Table('user_groups', @@ -44,6 +44,8 @@ class User(db.Model): groups = relationship('Group', secondary='user_groups', back_populates='members') roles = relationship('Role', secondary='role_members', back_populates='members') + service_users = relationship('ServiceUser', viewonly=True) + @property def unix_gid(self): return current_app.config['USER_GID'] @@ -104,93 +106,10 @@ class User(db.Model): self.mail = value return True - def get_service_mail(self, service): - if not remailer.configured or not service.use_remailer: - return self.mail - if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None and \ - self.loginname not in current_app.config['REMAILER_LIMIT_TO_USERS']: - return self.mail - return remailer.build_address(self, service) - - @classmethod - def filter_by_service_mail(cls, service, address): - if not remailer.configured or not service.use_remailer: - return cls.mail == address - remailer_address = remailer.parse_address(address) - if remailer_address and remailer_address.service == service and \ - remailer_address.user.get_service_mail(service) == address: - return cls.id == remailer_address.user.id - if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None: - return db.and_( - db.not_(cls.loginname.in_(current_app.config['REMAILER_LIMIT_TO_USERS'])), - cls.mail == address - ) - return False - # Somehow pylint non-deterministically fails to detect that .update_groups is set in invite.modes def update_groups(self): pass -class RemailerAddress: - def __init__(self, user, service): - self.user = user - self.service = service - - def __eq__(self, remailer_address): - return remailer_address is not None and self.user == remailer_address.user and self.service == remailer_address.service - -class Remailer: - '''The remailer feature improves user privacy by hiding real mail addresses - from services and instead providing them with autogenerated pseudonymous - remailer addresses. If a service sends a mail to a remailer address, the mail - service uses an uffd API endpoint to get the real mail address and rewrites - the remailer address with it. In case of a leak of user data from a service, - the remailer addresses are useless for third-parties.''' - - # pylint: disable=no-self-use - - @property - def configured(self): - return bool(current_app.config['REMAILER_DOMAIN']) - - def get_serializer(self): - secret = current_app.config['REMAILER_SECRET_KEY'] or current_app.secret_key - return itsdangerous.URLSafeSerializer(secret, salt='remailer_address_v1') - - def build_local_part(self, user, service): - return 'v1-' + self.get_serializer().dumps([service.id, user.id]) - - def build_address(self, user, service): - return self.build_local_part(user, service) + '@' + current_app.config['REMAILER_DOMAIN'] - - def is_remailer_domain(self, domain): - domains = {domain.lower().strip() for domain in current_app.config['REMAILER_OLD_DOMAINS']} - if current_app.config['REMAILER_DOMAIN']: - domains.add(current_app.config['REMAILER_DOMAIN'].lower().strip()) - return domain.lower().strip() in domains - - def parse_address(self, address): - # With a top-level import we get circular import problems - # pylint: disable=import-outside-toplevel - from .service import Service - if '@' not in address: - return None - local_part, domain = address.rsplit('@', 1) - if not self.is_remailer_domain(domain) or not local_part.startswith('v1-'): - return None - data = local_part[len('v1-'):] - try: - service_id, user_id = self.get_serializer().loads(data) - except itsdangerous.BadData: - return None - service = Service.query.get(service_id) - user = User.query.get(user_id) - if not service or not user: - return None - return RemailerAddress(user, service) - -remailer = Remailer() - def next_id_expr(column, min_value, max_value): # db.func.max(column) + 1: highest used value in range + 1, NULL if no values in range # db.func.min(..., max_value): clip to range diff --git a/uffd/remailer.py b/uffd/remailer.py new file mode 100644 index 0000000000000000000000000000000000000000..a4b37506ebd8e0b90113f81b04675237502fd43f --- /dev/null +++ b/uffd/remailer.py @@ -0,0 +1,47 @@ +from flask import current_app +import itsdangerous + +class Remailer: + '''The remailer feature improves user privacy by hiding real mail addresses + from services and instead providing them with autogenerated pseudonymous + remailer addresses. If a service sends a mail to a remailer address, the mail + service uses an uffd API endpoint to get the real mail address and rewrites + the remailer address with it. In case of a leak of user data from a service, + the remailer addresses are useless for third-parties.''' + + # pylint: disable=no-self-use + + @property + def configured(self): + return bool(current_app.config['REMAILER_DOMAIN']) + + def get_serializer(self): + secret = current_app.config['REMAILER_SECRET_KEY'] or current_app.secret_key + return itsdangerous.URLSafeSerializer(secret, salt='remailer_address_v1') + + def build_local_part(self, service_id, user_id): + return 'v1-' + self.get_serializer().dumps([service_id, user_id]) + + def build_address(self, service_id, user_id): + return self.build_local_part(service_id, user_id) + '@' + current_app.config['REMAILER_DOMAIN'] + + def is_remailer_domain(self, domain): + domains = {domain.lower().strip() for domain in current_app.config['REMAILER_OLD_DOMAINS']} + if current_app.config['REMAILER_DOMAIN']: + domains.add(current_app.config['REMAILER_DOMAIN'].lower().strip()) + return domain.lower().strip() in domains + + def parse_address(self, address): + if '@' not in address: + return None + local_part, domain = address.rsplit('@', 1) + if not self.is_remailer_domain(domain) or not local_part.startswith('v1-'): + return None + data = local_part[len('v1-'):] + try: + service_id, user_id = self.get_serializer().loads(data) + except itsdangerous.BadData: + return None + return (service_id, user_id) + +remailer = Remailer() diff --git a/uffd/views/api.py b/uffd/views/api.py index 11d59db408c2e00f3302ae74b0c24ffd80e42803..506bf5d9db591bf25fb01fd440146b2fe5610e4c 100644 --- a/uffd/views/api.py +++ b/uffd/views/api.py @@ -3,8 +3,8 @@ import functools from flask import Blueprint, jsonify, request, abort from uffd.database import db -from uffd.models import User, remailer, Group, Mail, MailReceiveAddress, MailDestinationAddress, APIClient -from .session import login_get_user, login_ratelimit +from uffd.models import User, ServiceUser, Group, Mail, MailReceiveAddress, MailDestinationAddress, APIClient +from .session import login_ratelimit bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/') @@ -57,18 +57,18 @@ def getgroups(): query = query.join(Group.members).filter(User.loginname == values[0]) else: abort(400) - # Single-result queries perform better without joinedload + # Single-result queries perform better without eager loading if key is None or key == 'member': - query = query.options(db.joinedload(Group.members)) + query = query.options(db.selectinload(Group.members)) return jsonify([generate_group_dict(group) for group in query]) -def generate_user_dict(user): +def generate_user_dict(service_user): return { - 'id': user.unix_uid, - 'loginname': user.loginname, - 'email': user.get_service_mail(request.api_client.service), - 'displayname': user.displayname, - 'groups': [group.name for group in user.groups] + 'id': service_user.user.unix_uid, + 'loginname': service_user.user.loginname, + 'email': service_user.email, + 'displayname': service_user.user.displayname, + 'groups': [group.name for group in service_user.user.groups] } @bp.route('/getusers', methods=['GET', 'POST']) @@ -78,7 +78,7 @@ def getusers(): abort(400) key = (list(request.values.keys()) or [None])[0] values = request.values.getlist(key) - query = User.query + query = ServiceUser.query.filter_by(service=request.api_client.service).join(ServiceUser.user) if key is None: pass elif key == 'id' and len(values) == 1: @@ -86,14 +86,15 @@ def getusers(): elif key == 'loginname' and len(values) == 1: query = query.filter(User.loginname == values[0]) elif key == 'email' and len(values) == 1: - query = query.filter(User.filter_by_service_mail(request.api_client.service, values[0])) + query = ServiceUser.filter_query_by_email(query, values[0]) elif key == 'group' and len(values) == 1: query = query.join(User.groups).filter(Group.name == values[0]) else: abort(400) - # Single-result queries perform better without joinedload + # Single-result queries perform better without eager loading if key is None or key == 'group': - query = query.options(db.joinedload(User.groups)) + # pylint: disable=no-member + query = query.options(db.joinedload(ServiceUser.user).selectinload(User.groups)) return jsonify([generate_user_dict(user) for user in query]) @bp.route('/checkpassword', methods=['POST']) @@ -106,14 +107,17 @@ def checkpassword(): login_delay = login_ratelimit.get_delay(username) if login_delay: return 'Too Many Requests', 429, {'Retry-After': '%d'%login_delay} - user = login_get_user(username, password) - if user is None: + service_user = ServiceUser.query.join(User).filter( + ServiceUser.service == request.api_client.service, + User.loginname == username, + ).one_or_none() + if service_user is None or not service_user.user.password.verify(password): login_ratelimit.log(username) return jsonify(None) - if user.password.needs_rehash: - user.password = password + if service_user.user.password.needs_rehash: + service_user.user.password = password db.session.commit() - return jsonify(generate_user_dict(user)) + return jsonify(generate_user_dict(service_user)) def generate_mail_dict(mail): return { @@ -129,17 +133,18 @@ def getmails(): abort(400) key = (list(request.values.keys()) or [None])[0] values = request.values.getlist(key) + query = Mail.query if key is None: - mails = Mail.query.all() + pass elif key == 'name' and len(values) == 1: - mails = Mail.query.filter_by(uid=values[0]).all() + query = query.filter_by(uid=values[0]) elif key == 'receive_address' and len(values) == 1: - mails = Mail.query.filter(Mail.receivers.any(MailReceiveAddress.address==values[0].lower())).all() + query = query.filter(Mail.receivers.any(MailReceiveAddress.address==values[0].lower())) elif key == 'destination_address' and len(values) == 1: - mails = Mail.query.filter(Mail.destinations.any(MailDestinationAddress.address==values[0])).all() + query = query.filter(Mail.destinations.any(MailDestinationAddress.address==values[0])) else: abort(400) - return jsonify([generate_mail_dict(mail) for mail in mails]) + return jsonify([generate_mail_dict(mail) for mail in query]) @bp.route('/resolve-remailer', methods=['GET', 'POST']) @apikey_required('remailer') @@ -149,7 +154,7 @@ def resolve_remailer(): values = request.values.getlist('orig_address') if len(values) != 1: abort(400) - remailer_address = remailer.parse_address(values[0]) - if not remailer_address: + service_user = ServiceUser.get_by_remailer_email(values[0]) + if not service_user: return jsonify(address=None) - return jsonify(address=remailer_address.user.mail) + return jsonify(address=service_user.real_email) diff --git a/uffd/views/oauth2.py b/uffd/views/oauth2.py index d9219cab5c02c6ffae05fc9f4eceb51d596fbfe5..1d0df2d02aa82f2b1b812eccb2ac859ad11b1e6e 100644 --- a/uffd/views/oauth2.py +++ b/uffd/views/oauth2.py @@ -9,7 +9,7 @@ from sqlalchemy.exc import IntegrityError from uffd.secure_redirect import secure_local_redirect from uffd.database import db -from uffd.models import DeviceLoginConfirmation, OAuth2Client, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation, host_ratelimit, format_delay +from uffd.models import DeviceLoginConfirmation, OAuth2Client, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation, host_ratelimit, format_delay, ServiceUser class UffdRequestValidator(oauthlib.oauth2.RequestValidator): # Argument "oauthreq" is named "request" in superclass but this clashes with flask's "request" object @@ -227,13 +227,13 @@ def oauth_required(*scopes): @bp.route('/userinfo') @oauth_required('profile') def userinfo(): - user = request.oauth.user + service_user = ServiceUser.query.get((request.oauth.client.service_id, request.oauth.user.id)) return jsonify( - id=user.unix_uid, - name=user.displayname, - nickname=user.loginname, - email=user.get_service_mail(request.oauth.client.service), - groups=[group.name for group in user.groups] + id=service_user.user.unix_uid, + name=service_user.user.displayname, + nickname=service_user.user.loginname, + email=service_user.email, + groups=[group.name for group in service_user.user.groups] ) @bp.app_url_defaults diff --git a/uffd/views/user.py b/uffd/views/user.py index 3ecf45d0e1a4942ebefb4dfa69ba41b8db288015..8e2ab63866fb856c41e0ff6de3aad4259e035429 100644 --- a/uffd/views/user.py +++ b/uffd/views/user.py @@ -7,8 +7,9 @@ from sqlalchemy.exc import IntegrityError from uffd.navbar import register_navbar from uffd.csrf import csrf_protect +from uffd.remailer import remailer from uffd.database import db -from uffd.models import User, remailer, Role +from uffd.models import User, Role from .selfservice import send_passwordreset from .session import login_required