From 917f9ecdb3512aeddebbdb46b90781cfeb7f81b1 Mon Sep 17 00:00:00 2001 From: Julian Rother <julian@cccv.de> Date: Fri, 3 Dec 2021 18:31:53 +0100 Subject: [PATCH] HTTP Basic auth for API with new API_CLIENTS_2 This change is going to be backported to v1.x.x to have a good migration path. Bearer auth with API_CLIENTS config key is deprecated and planned to be removed in v2.0.0. --- tests/test_api.py | 81 +++++++++++++++++++++++++++++++++++++++++ uffd/api/views.py | 38 ++++++++++++------- uffd/default_config.cfg | 9 +++++ 3 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 tests/test_api.py diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 00000000..e9549e32 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,81 @@ +import base64 + +from flask import url_for + +from uffd.api.views import apikey_required +from utils import UffdTestCase + +def basic_auth(username, password): + return ('Authorization', 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()) + +class TestAPIAuth(UffdTestCase): + def setUpApp(self): + self.app.config['API_CLIENTS'] = { + 'testtoken1': {'scopes': ['testscope']}, + 'testtoken2': {}, + } + self.app.config['API_CLIENTS_2'] = { + 'test1': {'client_secret': 'testsecret1', 'scopes': ['getusers', 'testscope']}, + 'test2': {'client_secret': 'testsecret2'}, + } + + @self.app.route('/test/endpoint1') + @apikey_required() + def testendpoint1(): + return 'OK', 200 + + @self.app.route('/test/endpoint2') + @apikey_required('getusers') + def testendpoint2(): + return 'OK', 200 + + @self.app.route('/test/endpoint3') + @apikey_required('testscope') + def testendpoint3(): + return 'OK', 200 + + def test_basic(self): + r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test1', 'testsecret1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint2'), headers=[basic_auth('test1', 'testsecret1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint3'), headers=[basic_auth('test1', 'testsecret1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test2', 'testsecret2')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + + def test_basic_invalid_credentials(self): + r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test-none', 'testsecret-none')], follow_redirects=True) + self.assertEqual(r.status_code, 401) + r = self.client.get(path=url_for('testendpoint1'), headers=[basic_auth('test1', 'testsecret2')], follow_redirects=True) + self.assertEqual(r.status_code, 401) + + def test_basic_missing_scope(self): + r = self.client.get(path=url_for('testendpoint2'), headers=[basic_auth('test2', 'testsecret2')], follow_redirects=True) + self.assertEqual(r.status_code, 403) + r = self.client.get(path=url_for('testendpoint3'), headers=[basic_auth('test2', 'testsecret2')], follow_redirects=True) + self.assertEqual(r.status_code, 403) + + def test_bearer(self): + r = self.client.get(path=url_for('testendpoint1'), headers=[('Authorization', 'Bearer testtoken1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint2'), headers=[('Authorization', 'Bearer testtoken1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint3'), headers=[('Authorization', 'Bearer testtoken1')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint1'), headers=[('Authorization', 'Bearer testtoken2')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.get(path=url_for('testendpoint2'), headers=[('Authorization', 'Bearer testtoken2')], follow_redirects=True) + self.assertEqual(r.status_code, 200) + + def test_bearer_invalid_credentials(self): + r = self.client.get(path=url_for('testendpoint1'), headers=[('Authorization', 'Bearer testtoken-none')], follow_redirects=True) + self.assertEqual(r.status_code, 401) + + def test_bearer_missing_scope(self): + r = self.client.get(path=url_for('testendpoint3'), headers=[('Authorization', 'Bearer testtoken2')], follow_redirects=True) + self.assertEqual(r.status_code, 401) + + def test_no_auth(self): + r = self.client.get(path=url_for('testendpoint1'), follow_redirects=True) + self.assertEqual(r.status_code, 401) diff --git a/uffd/api/views.py b/uffd/api/views.py index d63003d2..9b6dba63 100644 --- a/uffd/api/views.py +++ b/uffd/api/views.py @@ -10,20 +10,32 @@ from uffd.session.views import login_get_user, login_ratelimit bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/') def apikey_required(scope=None): + # pylint: disable=too-many-return-statements def wrapper(func): @functools.wraps(func) def decorator(*args, **kwargs): - if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '): - return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'} - token = request.headers['Authorization'][7:].strip() - request.api_client = None - for client_token, client in current_app.config['API_CLIENTS'].items(): - if secrets.compare_digest(client_token, token): - request.api_client = client - if request.api_client is None: - return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'} - if scope is not None and scope not in request.api_client.get('scopes', []): - return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="insufficient_scope",scope="%s"'%scope} + if request.authorization and request.authorization.password: + if request.authorization.username not in current_app.config['API_CLIENTS_2']: + return 'Unauthorized', 401, {'WWW-Authenticate': ['Basic realm="api"']} + client = current_app.config['API_CLIENTS_2'][request.authorization.username] + if not secrets.compare_digest(request.authorization.password, client['client_secret']): + return 'Unauthorized', 401, {'WWW-Authenticate': ['Basic realm="api"']} + if scope is not None and scope not in client.get('scopes', []): + return 'Forbidden', 403 + # To be removed in uffd v2 + elif 'Authorization' in request.headers and request.headers['Authorization'].startswith('Bearer '): + token = request.headers['Authorization'][7:].strip() + client = None + for client_token, data in current_app.config['API_CLIENTS'].items(): + if secrets.compare_digest(client_token, token): + client = data + if client is None: + return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'} + client_scopes = ['getusers'] + client.get('scopes', []) + if scope is not None and scope not in client_scopes: + return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="insufficient_scope",scope="%s"'%scope} + else: + return 'Unauthorized', 401, {'WWW-Authenticate': ['Bearer', 'Basic realm="api"']} return func(*args, **kwargs) return decorator return wrapper @@ -33,7 +45,7 @@ def generate_group_dict(group): 'members': [user.loginname for user in group.members]} @bp.route('/getgroups', methods=['GET', 'POST']) -@apikey_required() +@apikey_required('getusers') def getgroups(): if len(request.values) > 1: abort(400) @@ -59,7 +71,7 @@ def generate_user_dict(user, all_groups=None): 'groups': [group.name for group in all_groups if user in group.members]} @bp.route('/getusers', methods=['GET', 'POST']) -@apikey_required() +@apikey_required('getusers') def getusers(): if len(request.values) > 1: abort(400) diff --git a/uffd/default_config.cfg b/uffd/default_config.cfg index e4a3971d..22018830 100644 --- a/uffd/default_config.cfg +++ b/uffd/default_config.cfg @@ -60,10 +60,19 @@ OAUTH2_CLIENTS={ # Set 'login_message' (or suffixed with a language code like 'login_message_de') to display a custom message on the login form. } +# Deprecated, will be removed in uffd v2 API_CLIENTS={ #'token': {'scopes': ['checkpassword']} } +API_CLIENTS_2={ + #'test_client_id' : {'client_secret': 'random_secret', 'scopes': ['users', 'checkpassword']}, + # Scopes: + # * 'getusers': Retrieve user and group data (endpoints getusers and getgroups) + # * 'checkpassword': Check user login password + # * 'getmails': Retrieve mail aliases +} + # Service overview page (disabled if empty) SERVICES=[ # # Title is mandatory, all other fields are optional. -- GitLab