Skip to content
Snippets Groups Projects
test_services.py 13.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • import datetime
    import unittest
    
    from flask import url_for
    
    from utils import dump, UffdTestCase
    
    Julian's avatar
    Julian committed
    from uffd.remailer import remailer
    from uffd.tasks import cleanup_task
    from uffd.database import db
    
    Julian's avatar
    Julian committed
    from uffd.models import Service, ServiceUser, User, UserEmail, RemailerMode
    
    Julian's avatar
    Julian committed
    
    class TestServiceUser(UffdTestCase):
    	def setUp(self):
    		# Extend the base fixture with two services: "service1" is created
    		# without an explicit remailer mode, "service2" with ENABLED_V1.
    		super().setUp()
    		plain_service = Service(name='service1')
    		remailer_service = Service(name='service2', remailer_mode=RemailerMode.ENABLED_V1)
    		db.session.add_all([plain_service, remailer_service])
    		db.session.commit()
    
    	def test_auto_create(self):
    		# The number of ServiceUser rows must always equal
    		# (number of services) * (number of users), also after new users
    		# and services have been committed.
    		services_before = Service.query.count()
    		users_before = User.query.count()
    		self.assertEqual(ServiceUser.query.count(), services_before * users_before)
    
    		# One additional user
    		first_new_user = User(loginname='newuser1', displayname='New User', primary_email_address='new1@example.com')
    		db.session.add(first_new_user)
    		db.session.commit()
    		self.assertEqual(ServiceUser.query.count(), services_before * (users_before + 1))
    
    		# One additional service
    		third_service = Service(name='service3')
    		db.session.add(third_service)
    		db.session.commit()
    		self.assertEqual(ServiceUser.query.count(), (services_before + 1) * (users_before + 1))
    
    		# Two more users and two more services within a single commit
    		second_new_user = User(loginname='newuser2', displayname='New User', primary_email_address='new2@example.com')
    		db.session.add(second_new_user)
    		third_new_user = User(loginname='newuser3', displayname='New User', primary_email_address='new3@example.com')
    		db.session.add(third_new_user)
    		fourth_service = Service(name='service4')
    		db.session.add(fourth_service)
    		fifth_service = Service(name='service5')
    		db.session.add(fifth_service)
    		db.session.commit()
    		self.assertEqual(ServiceUser.query.count(), (services_before + 3) * (users_before + 3))
    
    	def test_create_missing(self):
    		# After a ServiceUser row is deleted, running cleanup_task is expected
    		# to bring the row count back to services * users.
    		total_services = Service.query.count()
    		total_users = User.query.count()
    		complete_count = total_services * total_users
    		self.assertEqual(ServiceUser.query.count(), complete_count)
    
    		# Remove an arbitrary row so that exactly one pair is missing
    		victim = ServiceUser.query.first()
    		db.session.delete(victim)
    		db.session.commit()
    		self.assertEqual(ServiceUser.query.count(), complete_count - 1)
    
    		cleanup_task.run()
    		db.session.commit()
    		self.assertEqual(ServiceUser.query.count(), complete_count)
    
    
    Julian's avatar
    Julian committed
    	def test_service_email(self):
    		# service_email starts out unset and accepts a verified address of the
    		# same user. Assigning an unverified address or an address that
    		# belongs to a different user is expected to raise.
    		user = self.get_user()
    		service = Service.query.filter_by(name='service1').first()
    		service_user = ServiceUser.query.get((service.id, user.id))
    		# assertIsNone is the idiomatic None check and matches the rest of this file
    		self.assertIsNone(service_user.service_email)
    		service_user.service_email = UserEmail(user=user, address='foo@bar', verified=True)
    		# Unverified address of the same user
    		with self.assertRaises(Exception):
    			service_user.service_email = UserEmail(user=user, address='foo2@bar', verified=False)
    		# Verified address of another user (the admin)
    		with self.assertRaises(Exception):
    			service_user.service_email = UserEmail(user=self.get_admin(), address='foo3@bar', verified=True)
    
    
    Julian's avatar
    Julian committed
    	def test_real_email(self):
    		# real_email equals the user's primary address unless the service has
    		# enable_email_preferences set; then it equals the service_email address.
    		account = self.get_user()
    		svc = Service.query.filter_by(name='service1').first()
    		link = ServiceUser.query.get((svc.id, account.id))
    
    		# No service-specific address set yet
    		self.assertEqual(link.real_email, account.primary_email.address)
    
    		# A service-specific address alone does not change the result
    		preferred = UserEmail(user=account, address='foo@bar', verified=True)
    		link.service_email = preferred
    		self.assertEqual(link.real_email, account.primary_email.address)
    
    		# With email preferences enabled the service-specific address is used
    		svc.enable_email_preferences = True
    		self.assertEqual(link.real_email, link.service_email.address)
    
    Julian's avatar
    Julian committed
    
    	def test_get_by_remailer_email(self):
    		# get_by_remailer_email must only resolve a remailer address while
    		# REMAILER_DOMAIN is configured; every other input yields None.
    		account = self.get_user()
    		svc = Service.query.filter_by(name='service1').first()
    		link = ServiceUser.query.get((svc.id, account.id))
    		settings = self.app.config
    		lookup = ServiceUser.get_by_remailer_email
    
    		# The remailer address is built while the domain is configured
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		remailer_address = remailer.build_v1_address(svc.id, account.id)
    
    		# 1. remailer not setup
    		settings['REMAILER_DOMAIN'] = ''
    		for candidate in (account.primary_email.address, remailer_address, 'invalid'):
    			self.assertIsNone(lookup(candidate))
    
    		# 2. remailer setup
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		self.assertIsNone(lookup(account.primary_email.address))
    		self.assertEqual(lookup(remailer_address), link)
    		self.assertIsNone(lookup('invalid'))
    
    	def test_email(self):
    		# ServiceUser.email is expected to be the remailer address only when
    		# the remailer domain is configured, the service has remailer mode
    		# ENABLED_V1 and REMAILER_LIMIT_TO_USERS (if set) lists the user.
    		# Otherwise it is the user's primary address.
    		account = self.get_user()
    		svc = Service.query.filter_by(name='service1').first()
    		link = ServiceUser.query.get((svc.id, account.id))
    		settings = self.app.config
    
    		# The remailer address is built while the domain is configured
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		remailer_address = remailer.build_v1_address(svc.id, account.id)
    
    		# 1. remailer not setup
    		settings['REMAILER_DOMAIN'] = ''
    		self.assertEqual(link.email, account.primary_email.address)
    
    		# 2. remailer setup + remailer disabled
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		self.assertEqual(link.email, account.primary_email.address)
    
    		# 3. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS unset
    		svc.remailer_mode = RemailerMode.ENABLED_V1
    		db.session.commit()
    		self.assertEqual(link.email, remailer_address)
    
    		# 4. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS does not include user
    		settings['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
    		self.assertEqual(link.email, account.primary_email.address)
    
    		# 5. remailer setup + remailer enabled + REMAILER_LIMIT_TO_USERS includes user
    		settings['REMAILER_LIMIT_TO_USERS'] = ['testuser']
    		self.assertEqual(link.email, remailer_address)
    
    	def test_filter_query_by_email(self):
    		# filter_query_by_email is checked with two users sharing one primary
    		# address, for a service without remailer (service1) and one with
    		# remailer mode ENABLED_V1 (service2), under different remailer settings.
    		def matching_pairs(address):
    			# Collect the (service_id, user_id) keys of all matching rows
    			rows = ServiceUser.filter_query_by_email(ServiceUser.query, address)
    			return set((row.service_id, row.user_id) for row in rows)
    
    		first_user = self.get_user()
    		second_user = User(loginname='user2', primary_email_address=first_user.primary_email.address, displayname='User 2')
    		db.session.add(second_user)
    		db.session.commit()
    
    		plain_service = Service.query.filter_by(name='service1').first() # remailer disabled
    		remailer_service = Service.query.filter_by(name='service2').first() # remailer enabled
    		settings = self.app.config
    
    		# Remailer addresses are built while the domain is configured
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		plain_addr_first = remailer.build_v1_address(plain_service.id, first_user.id)
    		remailer_addr_first = remailer.build_v1_address(remailer_service.id, first_user.id)
    		plain_addr_second = remailer.build_v1_address(plain_service.id, second_user.id)
    		remailer_addr_second = remailer.build_v1_address(remailer_service.id, second_user.id)
    
    		# 1. remailer disabled
    		settings['REMAILER_DOMAIN'] = ''
    		every_pair = {
    			(plain_service.id, first_user.id), (plain_service.id, second_user.id),
    			(remailer_service.id, first_user.id), (remailer_service.id, second_user.id),
    		}
    		self.assertEqual(matching_pairs(first_user.primary_email.address), every_pair)
    		self.assertEqual(matching_pairs(plain_addr_first), set())
    		self.assertEqual(matching_pairs(remailer_addr_first), set())
    		self.assertEqual(matching_pairs('invalid'), set())
    
    		# 2. remailer enabled + REMAILER_LIMIT_TO_USERS unset
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		plain_service_pairs = {
    			(plain_service.id, first_user.id), (plain_service.id, second_user.id),
    		}
    		self.assertEqual(matching_pairs(first_user.primary_email.address), plain_service_pairs)
    		self.assertEqual(matching_pairs(plain_addr_first), set())
    		self.assertEqual(matching_pairs(remailer_addr_first), {(remailer_service.id, first_user.id)})
    		self.assertEqual(matching_pairs(remailer_addr_first + ' '), set())
    		self.assertEqual(matching_pairs('invalid'), set())
    
    		# 3. remailer enabled + REMAILER_LIMIT_TO_USERS includes testuser
    		settings['REMAILER_LIMIT_TO_USERS'] = ['testuser']
    		limited_pairs = {
    			(plain_service.id, first_user.id), (plain_service.id, second_user.id),
    			(remailer_service.id, second_user.id),
    		}
    		self.assertEqual(matching_pairs(first_user.primary_email.address), limited_pairs)
    		self.assertEqual(matching_pairs(plain_addr_first), set())
    		self.assertEqual(matching_pairs(remailer_addr_first), {(remailer_service.id, first_user.id)})
    		self.assertEqual(matching_pairs(remailer_addr_first + ' '), set())
    		self.assertEqual(matching_pairs(plain_addr_second), set())
    		self.assertEqual(matching_pairs(remailer_addr_second), set())
    		self.assertEqual(matching_pairs('invalid'), set())
    
    		# 4. remailer enabled + REMAILER_LIMIT_TO_USERS does not include user (should behave the same as 1.)
    		settings['REMAILER_LIMIT_TO_USERS'] = ['testadmin']
    		unrestricted_pairs = {
    			(plain_service.id, first_user.id), (plain_service.id, second_user.id),
    			(remailer_service.id, first_user.id), (remailer_service.id, second_user.id),
    		}
    		self.assertEqual(matching_pairs(first_user.primary_email.address), unrestricted_pairs)
    		self.assertEqual(matching_pairs(plain_addr_first), set())
    		self.assertEqual(matching_pairs(remailer_addr_first), set())
    		self.assertEqual(matching_pairs('invalid'), set())
    
    Julian's avatar
    Julian committed
    	def test_filter_query_by_email_prefs(self):
    		# filter_query_by_email is checked in combination with per-service
    		# email addresses: they must only match once the service has
    		# enable_email_preferences set, and the remailer address takes over
    		# for the remailer-enabled service when REMAILER_DOMAIN is configured.
    		def matching_pairs(address):
    			# Collect the (service_id, user_id) keys of all matching rows
    			rows = ServiceUser.filter_query_by_email(ServiceUser.query, address)
    			return set((row.service_id, row.user_id) for row in rows)
    
    		account = self.get_user()
    		plain_service = Service.query.filter_by(name='service1').first() # remailer disabled
    		remailer_service = Service.query.filter_by(name='service2').first() # remailer enabled
    		settings = self.app.config
    
    		# Remailer addresses are built while the domain is configured
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		plain_remailer_addr = remailer.build_v1_address(plain_service.id, account.id)
    		enabled_remailer_addr = remailer.build_v1_address(remailer_service.id, account.id)
    
    		# Remailer domain unset, no service-specific addresses yet
    		settings['REMAILER_DOMAIN'] = ''
    		both_services = {
    			(plain_service.id, account.id),
    			(remailer_service.id, account.id),
    		}
    		self.assertEqual(matching_pairs(account.primary_email.address), both_services)
    		self.assertEqual(matching_pairs('addr1-1@example.com'), set())
    		self.assertEqual(matching_pairs('addr2-1@example.com'), set())
    		self.assertEqual(matching_pairs(plain_remailer_addr), set())
    		self.assertEqual(matching_pairs(enabled_remailer_addr), set())
    
    		# Service-specific addresses set, but email preferences still disabled
    		plain_link = ServiceUser.query.get((plain_service.id, account.id))
    		plain_link.service_email = UserEmail(user=account, verified=True, address='addr1-1@example.com')
    		enabled_link = ServiceUser.query.get((remailer_service.id, account.id))
    		enabled_link.service_email = UserEmail(user=account, verified=True, address='addr2-1@example.com')
    		still_both_services = {
    			(plain_service.id, account.id),
    			(remailer_service.id, account.id),
    		}
    		self.assertEqual(matching_pairs(account.primary_email.address), still_both_services)
    		self.assertEqual(matching_pairs('addr1-1@example.com'), set())
    		self.assertEqual(matching_pairs('addr2-1@example.com'), set())
    		self.assertEqual(matching_pairs(plain_remailer_addr), set())
    		self.assertEqual(matching_pairs(enabled_remailer_addr), set())
    
    		# Email preferences enabled for both services
    		plain_service.enable_email_preferences = True
    		remailer_service.enable_email_preferences = True
    		self.assertEqual(matching_pairs(account.primary_email.address), set())
    		self.assertEqual(matching_pairs('addr1-1@example.com'), {(plain_service.id, account.id)})
    		self.assertEqual(matching_pairs('addr2-1@example.com'), {(remailer_service.id, account.id)})
    		self.assertEqual(matching_pairs(plain_remailer_addr), set())
    		self.assertEqual(matching_pairs(enabled_remailer_addr), set())
    
    		# Remailer domain configured in addition to email preferences
    		settings['REMAILER_DOMAIN'] = 'remailer.example.com'
    		self.assertEqual(matching_pairs(account.primary_email.address), set())
    		self.assertEqual(matching_pairs('addr1-1@example.com'), {(plain_service.id, account.id)})
    		self.assertEqual(matching_pairs('addr2-1@example.com'), set())
    		self.assertEqual(matching_pairs(plain_remailer_addr), set())
    		self.assertEqual(matching_pairs(enabled_remailer_addr), {(remailer_service.id, account.id)})
    
    
    class TestServices(UffdTestCase):
    	def setUpApp(self):
    		# Configure two entries in SERVICES (one with every option used by
    		# these tests, one with only a title) and make the overview public.
    		detailed_service = {
    			'title': 'Service Title',
    			'subtitle': 'Service Subtitle',
    			'description': 'Short description of the service as plain text',
    			'url': 'https://example.com/',
    			'logo_url': '/static/fairy-dust-color.png',
    			'required_group': 'users',
    			'permission_levels': [
    				{'name': 'Moderator', 'required_group': 'moderators'},
    				{'name': 'Admin', 'required_group': 'uffd_admin'},
    			],
    			'confidential': True,
    			'groups': [
    				{'name': 'Group "crew_crew"', 'required_group': 'users'},
    				{'name': 'Group "crew_logistik"', 'required_group': 'uffd_admin'},
    			],
    			'infos': [
    				{'title': 'Documentation', 'html': '<p>Some information about the service as html</p>', 'required_group': 'users'},
    			],
    			'links': [
    				{'title': 'Link to an external site', 'url': '#', 'required_group': 'users'},
    			],
    		}
    		minimal_service = {
    			'title': 'Minimal Service Title',
    		}
    		self.app.config['SERVICES'] = [detailed_service, minimal_service]
    		self.app.config['SERVICES_PUBLIC'] = True
    
    
    	def test_overview(self):
    		# The overview page must load for guests and logged-in users, but the
    		# service URL may only appear in the response for the logged-in user.
    		service_url = b'https://example.com/'
    
    		guest_response = self.client.get(path=url_for('service.overview'))
    		dump('service_overview_guest', guest_response)
    		self.assertEqual(guest_response.status_code, 200)
    		self.assertNotIn(service_url, guest_response.data)
    
    		self.login_as('user')
    		user_response = self.client.get(path=url_for('service.overview'))
    		dump('service_overview_user', user_response)
    		self.assertEqual(user_response.status_code, 200)
    		self.assertIn(service_url, user_response.data)
    
    
    	def test_overview_disabled(self):
    		# With an empty SERVICES list, guests end up on the login page, regular
    		# users get 403 and admins still get a 200 response.
    		def fetch_overview(dump_name):
    			# Request the overview (following redirects) and dump the response
    			response = self.client.get(path=url_for('service.overview'), follow_redirects=True)
    			dump(dump_name, response)
    			return response
    
    		self.app.config['SERVICES'] = []
    
    		# Should return login page
    		guest_response = fetch_overview('service_overview_disabled_guest')
    		self.assertEqual(guest_response.status_code, 200)
    		self.assertIn(b'name="password"', guest_response.data)
    
    		# Should return access denied page
    		self.login_as('user')
    		user_response = fetch_overview('service_overview_disabled_user')
    		self.assertEqual(user_response.status_code, 403)
    
    		# Should return (empty) overview page
    		self.login_as('admin')
    		admin_response = fetch_overview('service_overview_disabled_admin')
    		self.assertEqual(admin_response.status_code, 200)
    
    	def test_overview_nonpublic(self):
    		# With SERVICES_PUBLIC disabled, guests end up on the login page while
    		# users and admins get a 200 response.
    		def fetch_overview(dump_name):
    			# Request the overview (following redirects) and dump the response
    			response = self.client.get(path=url_for('service.overview'), follow_redirects=True)
    			dump(dump_name, response)
    			return response
    
    		self.app.config['SERVICES_PUBLIC'] = False
    
    		# Should return login page
    		guest_response = fetch_overview('service_overview_nonpublic_guest')
    		self.assertEqual(guest_response.status_code, 200)
    		self.assertIn(b'name="password"', guest_response.data)
    
    		# Should return overview page
    		self.login_as('user')
    		user_response = fetch_overview('service_overview_nonpublic_user')
    		self.assertEqual(user_response.status_code, 200)
    
    		# Should return overview page
    		self.login_as('admin')
    		admin_response = fetch_overview('service_overview_nonpublic_admin')
    		self.assertEqual(admin_response.status_code, 200)
    
    	def test_overview_public(self):
    		# With the configuration from setUpApp (SERVICES_PUBLIC enabled),
    		# guests, users and admins all get a 200 response.
    		def fetch_overview(dump_name):
    			# Request the overview (following redirects) and dump the response
    			response = self.client.get(path=url_for('service.overview'), follow_redirects=True)
    			dump(dump_name, response)
    			return response
    
    		# Should return overview page
    		guest_response = fetch_overview('service_overview_public_guest')
    		self.assertEqual(guest_response.status_code, 200)
    
    		# Should return overview page
    		self.login_as('user')
    		user_response = fetch_overview('service_overview_public_user')
    		self.assertEqual(user_response.status_code, 200)
    
    		# Should return overview page
    		self.login_as('admin')
    		admin_response = fetch_overview('service_overview_public_admin')
    		self.assertEqual(admin_response.status_code, 200)