Skip to content
Snippets Groups Projects
Forked from uffd / uffd
66 commits behind the upstream repository.
  • Julian's avatar
    fa67bde0
    Migrate OAuth2 and API clients to database · fa67bde0
    Julian authored
    Also adds a shallow Service model that coexists with the config-defined
    services to group multiple OAuth2 and API clients together.
    
    Clients defined in the config with OAUTH2_CLIENTS and API_CLIENTS_2 are
    imported by the database migrations.
    
    Removes support for complex values for the OAuth2 client group_required option.
    Only simple group names are supported, not (nested) lists of groups previously
    interpreted as AND/OR conjunctions. Also removes support for the login_message
    parameter of OAuth2 clients.
    fa67bde0
    History
    Migrate OAuth2 and API clients to database
    Julian authored
    Also adds a shallow Service model that coexists with the config-defined
    services to group multiple OAuth2 and API clients together.
    
    Clients defined in the config with OAUTH2_CLIENTS and API_CLIENTS_2 are
    imported by the database migrations.
    
    Removes support for complex values for the OAuth2 client group_required option.
    Only simple group names are supported, not (nested) lists of groups previously
    interpreted as AND/OR conjunctions. Also removes support for the login_message
    parameter of OAuth2 clients.
test_api.py 5.83 KiB
import base64

from flask import url_for

from uffd.api.views import apikey_required
from uffd.api.models import APIClient
from uffd.service.models import Service
from uffd.user.models import User
from uffd.password_hash import PlaintextPasswordHash
from uffd.database import db
from utils import UffdTestCase, db_flush

def basic_auth(username, password):
	return ('Authorization', 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode())

class TestAPIAuth(UffdTestCase):
	def setUpApp(self):
		@self.app.route('/test/endpoint1')
		@apikey_required()
		def testendpoint1():
			return 'OK', 200

		@self.app.route('/test/endpoint2')
		@apikey_required('users')
		def testendpoint2():
			return 'OK', 200

	def setUpDB(self):
		db.session.add(APIClient(service=Service(name='test1'), auth_username='test1', auth_password='testsecret1', perm_users=True))
		db.session.add(APIClient(service=Service(name='test2'), auth_username='test2', auth_password='testsecret2'))

	def test_basic(self):
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test1', 'testsecret1')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		r = self.client.get(path=url_for('testendpoint2'), headers=[basic_auth('test1', 'testsecret1')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test2', 'testsecret2')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)

	def test_basic_invalid_credentials(self):
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test-none', 'testsecret-none')], follow_redirects=True)
		self.assertEqual(r.status_code, 401)
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test1', 'testsecret2')], follow_redirects=True)
		self.assertEqual(r.status_code, 401)

	def test_basic_missing_scope(self):
		r = self.client.get(path=url_for('testendpoint2'), headers=[basic_auth('test2', 'testsecret2')], follow_redirects=True)
		self.assertEqual(r.status_code, 403)

	def test_no_auth(self):
		r = self.client.get(path=url_for('testendpoint1'), follow_redirects=True)
		self.assertEqual(r.status_code, 401)

	def test_auth_password_rehash(self):
		db.session.add(APIClient(service=Service(name='test3'), auth_username='test3', auth_password=PlaintextPasswordHash.from_password('testsecret3')))
		db.session.commit()
		self.assertIsInstance(APIClient.query.filter_by(auth_username='test3').one().auth_password, PlaintextPasswordHash)
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test3', 'testsecret3')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		api_client = APIClient.query.filter_by(auth_username='test3').one()
		self.assertIsInstance(api_client.auth_password, APIClient.auth_password.method_cls)
		self.assertTrue(api_client.auth_password.verify('testsecret3'))
		r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test3', 'testsecret3')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)

class TestAPIGetmails(UffdTestCase):
	def setUpDB(self):
		db.session.add(APIClient(service=Service(name='test'), auth_username='test', auth_password='test', perm_mail_aliases=True))

	def test_lookup(self):
		r = self.client.get(path=url_for('api.getmails', receive_address='test1@example.com'), headers=[basic_auth('test', 'test')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json, [{'name': 'test', 'receive_addresses': ['test1@example.com', 'test2@example.com'], 'destination_addresses': ['testuser@mail.example.com']}])
		r = self.client.get(path=url_for('api.getmails', receive_address='test2@example.com'), headers=[basic_auth('test', 'test')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json, [{'name': 'test', 'receive_addresses': ['test1@example.com', 'test2@example.com'], 'destination_addresses': ['testuser@mail.example.com']}])

	def test_lookup_notfound(self):
		r = self.client.get(path=url_for('api.getmails', receive_address='test3@example.com'), headers=[basic_auth('test', 'test')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json, [])

	def test_lookup_case_folding(self):
		r = self.client.get(path=url_for('api.getmails', receive_address='Test1@example.com'), headers=[basic_auth('test', 'test')], follow_redirects=True)
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json, [{'name': 'test', 'receive_addresses': ['test1@example.com', 'test2@example.com'], 'destination_addresses': ['testuser@mail.example.com']}])

class TestAPICheckPassword(UffdTestCase):
	def setUpDB(self):
		db.session.add(APIClient(service=Service(name='test'), auth_username='test', auth_password='test', perm_checkpassword=True))

	def test(self):
		r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'userpassword'}, headers=[basic_auth('test', 'test')])
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json['loginname'], 'testuser')

	def test_password_rehash(self):
		self.get_user().password = PlaintextPasswordHash.from_password('userpassword')
		db.session.commit()
		self.assertIsInstance(self.get_user().password, PlaintextPasswordHash)
		db_flush()
		r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'userpassword'}, headers=[basic_auth('test', 'test')])
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json['loginname'], 'testuser')
		self.assertIsInstance(self.get_user().password, User.password.method_cls)
		self.assertTrue(self.get_user().password.verify('userpassword'))

	def test_wrong_password(self):
		r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'wrongpassword'}, headers=[basic_auth('test', 'test')])
		self.assertEqual(r.status_code, 200)
		self.assertEqual(r.json, None)