Newer
Older
import datetime
import unittest
from flask import url_for
from utils import dump, UffdTestCase
from uffd.remailer import remailer
from uffd.tasks import cleanup_task
from uffd.database import db
from uffd.models import Service, ServiceUser, User, UserEmail, RemailerMode
class TestServiceUser(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add_all([Service(name='service1'), Service(name='service2', remailer_mode=RemailerMode.ENABLED_V1)])
db.session.commit()
def test_auto_create(self):
service_count = Service.query.count()
user_count = User.query.count()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
db.session.add(User(loginname='newuser1', displayname='New User', primary_email_address='new1@example.com'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * (user_count + 1))
db.session.add(Service(name='service3'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), (service_count + 1) * (user_count + 1))
db.session.add(User(loginname='newuser2', displayname='New User', primary_email_address='new2@example.com'))
db.session.add(User(loginname='newuser3', displayname='New User', primary_email_address='new3@example.com'))
db.session.add(Service(name='service4'))
db.session.add(Service(name='service5'))
db.session.commit()
self.assertEqual(ServiceUser.query.count(), (service_count + 3) * (user_count + 3))
def test_create_missing(self):
service_count = Service.query.count()
user_count = User.query.count()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
db.session.delete(ServiceUser.query.first())
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * user_count - 1)
cleanup_task.run()
db.session.commit()
self.assertEqual(ServiceUser.query.count(), service_count * user_count)
def test_service_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.assertEqual(service_user.service_email, None)
service_user.service_email = UserEmail(user=user, address='foo@bar', verified=True)
with self.assertRaises(Exception):
service_user.service_email = UserEmail(user=user, address='foo2@bar', verified=False)
with self.assertRaises(Exception):
service_user.service_email = UserEmail(user=self.get_admin(), address='foo3@bar', verified=True)
def test_real_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.assertEqual(service_user.real_email, user.primary_email.address)
service_user.service_email = UserEmail(user=user, address='foo@bar', verified=True)
self.assertEqual(service_user.real_email, user.primary_email.address)
service.enable_email_preferences = True
self.assertEqual(service_user.real_email, service_user.service_email.address)
def test_get_by_remailer_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email = remailer.build_v1_address(service.id, user.id)
# 1. remailer not setup
self.app.config['REMAILER_DOMAIN'] = ''
self.assertIsNone(ServiceUser.get_by_remailer_email(user.primary_email.address))
self.assertIsNone(ServiceUser.get_by_remailer_email(remailer_email))
self.assertIsNone(ServiceUser.get_by_remailer_email('invalid'))
# 2. remailer setup
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertIsNone(ServiceUser.get_by_remailer_email(user.primary_email.address))
self.assertEqual(ServiceUser.get_by_remailer_email(remailer_email), service_user)
self.assertIsNone(ServiceUser.get_by_remailer_email('invalid'))
def test_email(self):
user = self.get_user()
service = Service.query.filter_by(name='service1').first()
service_user = ServiceUser.query.get((service.id, user.id))
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email = remailer.build_v1_address(service.id, user.id)
# 1. remailer not setup
self.app.config['REMAILER_DOMAIN'] = ''
self.assertEqual(service_user.email, user.primary_email.address)
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertEqual(service_user.email, user.primary_email.address)
# 3. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS unset
service.remailer_mode = RemailerMode.ENABLED_V1
db.session.commit()
self.assertEqual(service_user.email, remailer_email)
# 4. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS does not include user
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
self.assertEqual(service_user.email, user.primary_email.address)
# 5. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS includes user
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser']
self.assertEqual(service_user.email, remailer_email)
def test_filter_query_by_email(self):
def run_query(value):
return {(su.service_id, su.user_id) for su in ServiceUser.filter_query_by_email(ServiceUser.query, value)}
user1 = self.get_user()
user2 = User(loginname='user2', primary_email_address=user1.primary_email.address, displayname='User 2')
service1 = Service.query.filter_by(name='service1').first() # remailer disabled
service2 = Service.query.filter_by(name='service2').first() # remailer enabled
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email1_1 = remailer.build_v1_address(service1.id, user1.id)
remailer_email2_1 = remailer.build_v1_address(service2.id, user1.id)
remailer_email1_2 = remailer.build_v1_address(service1.id, user2.id)
remailer_email2_2 = remailer.build_v1_address(service2.id, user2.id)
# 1. remailer disabled
self.app.config['REMAILER_DOMAIN'] = ''
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id), (service1.id, user2.id),
(service2.id, user1.id), (service2.id, user2.id),
})
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), set())
self.assertEqual(run_query('invalid'), set())
# 2. remailer enabled + REMAILER_LIMIT_TO_USERS unset
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id), (service1.id, user2.id),
})
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), {
(service2.id, user1.id),
})
self.assertEqual(run_query(remailer_email2_1 + ' '), set())
self.assertEqual(run_query('invalid'), set())
# 3. remailer enabled + REMAILER_LIMIT_TO_USERS includes testuser
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testuser']
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id), (service1.id, user2.id),
(service2.id, user2.id),
})
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), {
(service2.id, user1.id),
})
self.assertEqual(run_query(remailer_email2_1 + ' '), set())
self.assertEqual(run_query(remailer_email1_2), set())
self.assertEqual(run_query(remailer_email2_2), set())
self.assertEqual(run_query('invalid'), set())
# 4. remailer enabled + REMAILER_LIMIT_TO_USERS does not include user (should behave the same as 1.)
self.app.config['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id), (service1.id, user2.id),
(service2.id, user1.id), (service2.id, user2.id),
})
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), set())
self.assertEqual(run_query('invalid'), set())
def test_filter_query_by_email_prefs(self):
def run_query(value):
return {(su.service_id, su.user_id) for su in ServiceUser.filter_query_by_email(ServiceUser.query, value)}
user1 = self.get_user()
service1 = Service.query.filter_by(name='service1').first() # remailer disabled
service2 = Service.query.filter_by(name='service2').first() # remailer enabled
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
remailer_email1_1 = remailer.build_v1_address(service1.id, user1.id)
remailer_email2_1 = remailer.build_v1_address(service2.id, user1.id)
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
self.app.config['REMAILER_DOMAIN'] = ''
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id),
(service2.id, user1.id),
})
self.assertEqual(run_query('addr1-1@example.com'), set())
self.assertEqual(run_query('addr2-1@example.com'), set())
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), set())
ServiceUser.query.get((service1.id, user1.id)).service_email = UserEmail(user=user1, verified=True, address='addr1-1@example.com')
ServiceUser.query.get((service2.id, user1.id)).service_email = UserEmail(user=user1, verified=True, address='addr2-1@example.com')
self.assertEqual(run_query(user1.primary_email.address), {
(service1.id, user1.id),
(service2.id, user1.id),
})
self.assertEqual(run_query('addr1-1@example.com'), set())
self.assertEqual(run_query('addr2-1@example.com'), set())
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), set())
service1.enable_email_preferences = True
service2.enable_email_preferences = True
self.assertEqual(run_query(user1.primary_email.address), set())
self.assertEqual(run_query('addr1-1@example.com'), {
(service1.id, user1.id),
})
self.assertEqual(run_query('addr2-1@example.com'), {
(service2.id, user1.id),
})
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), set())
self.app.config['REMAILER_DOMAIN'] = 'remailer.example.com'
self.assertEqual(run_query(user1.primary_email.address), set())
self.assertEqual(run_query('addr1-1@example.com'), {
(service1.id, user1.id),
})
self.assertEqual(run_query('addr2-1@example.com'), set())
self.assertEqual(run_query(remailer_email1_1), set())
self.assertEqual(run_query(remailer_email2_1), {
(service2.id, user1.id),
})
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class TestServices(UffdTestCase):
def setUpApp(self):
self.app.config['SERVICES'] = [
{
'title': 'Service Title',
'subtitle': 'Service Subtitle',
'description': 'Short description of the service as plain text',
'url': 'https://example.com/',
'logo_url': '/static/fairy-dust-color.png',
'required_group': 'users',
'permission_levels': [
{'name': 'Moderator', 'required_group': 'moderators'},
{'name': 'Admin', 'required_group': 'uffd_admin'},
],
'confidential': True,
'groups': [
{'name': 'Group "crew_crew"', 'required_group': 'users'},
{'name': 'Group "crew_logistik"', 'required_group': 'uffd_admin'},
],
'infos': [
{'title': 'Documentation', 'html': '<p>Some information about the service as html</p>', 'required_group': 'users'},
],
'links': [
{'title': 'Link to an external site', 'url': '#', 'required_group': 'users'},
],
},
{
'title': 'Minimal Service Title',
}
]
self.app.config['SERVICES_PUBLIC'] = True
def test_overview(self):
r = self.client.get(path=url_for('service.overview'))
dump('service_overview_guest', r)
self.assertEqual(r.status_code, 200)
self.assertNotIn(b'https://example.com/', r.data)
r = self.client.get(path=url_for('service.overview'))
dump('service_overview_user', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'https://example.com/', r.data)
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def test_overview_disabled(self):
self.app.config['SERVICES'] = []
# Should return login page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_guest', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'name="password"', r.data)
self.login_as('user')
# Should return access denied page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_user', r)
self.assertEqual(r.status_code, 403)
self.login_as('admin')
# Should return (empty) overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_disabled_admin', r)
self.assertEqual(r.status_code, 200)
def test_overview_nonpublic(self):
self.app.config['SERVICES_PUBLIC'] = False
# Should return login page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_guest', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'name="password"', r.data)
self.login_as('user')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_user', r)
self.assertEqual(r.status_code, 200)
self.login_as('admin')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_nonpublic_admin', r)
self.assertEqual(r.status_code, 200)
def test_overview_public(self):
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_guest', r)
self.assertEqual(r.status_code, 200)
self.login_as('user')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_user', r)
self.assertEqual(r.status_code, 200)
self.login_as('admin')
# Should return overview page
r = self.client.get(path=url_for('service.overview'), follow_redirects=True)
dump('service_overview_public_admin', r)
self.assertEqual(r.status_code, 200)