Forked from
uffd / uffd
326 commits behind the upstream repository.
test_oauth2.py 5.42 KiB
import datetime
from urllib.parse import urlparse, parse_qs
from flask import url_for
# These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user
from uffd.session.views import get_current_user
from uffd.user.models import User
from uffd.oauth2.models import OAuth2Client
from uffd import create_app, db, ldap
from utils import dump, UffdTestCase
def get_user():
return User.from_ldap_dn('uid=testuser,ou=users,dc=example,dc=com')
def get_admin():
return User.from_ldap_dn('uid=testadmin,ou=users,dc=example,dc=com')
class TestOAuth2Client(UffdTestCase):
def setUpApp(self):
self.app.config['OAUTH2_CLIENTS'] = {
'test': {'client_secret': 'testsecret', 'redirect_uris': ['http://localhost:5009/callback', 'http://localhost:5009/callback2']},
'test1': {'client_secret': 'testsecret1', 'redirect_uris': ['http://localhost:5008/callback'], 'required_group': 'users'},
}
def test_from_id(self):
client = OAuth2Client.from_id('test')
self.assertEqual(client.client_id, 'test')
self.assertEqual(client.client_secret, 'testsecret')
self.assertEqual(client.redirect_uris, ['http://localhost:5009/callback', 'http://localhost:5009/callback2'])
self.assertEqual(client.default_redirect_uri, 'http://localhost:5009/callback')
self.assertEqual(client.default_scopes, ['profile'])
self.assertEqual(client.client_type, 'confidential')
client = OAuth2Client.from_id('test1')
self.assertEqual(client.client_id, 'test1')
self.assertEqual(client.required_group, 'users')
def test_access_allowed(self):
user = get_user() # has 'users' and 'uffd_access' group
admin = get_admin() # has 'users', 'uffd_access' and 'uffd_admin' group
client = OAuth2Client('test', '', [''], None)
self.assertTrue(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], 'users')
self.assertTrue(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], 'notagroup')
self.assertFalse(client.access_allowed(user))
self.assertFalse(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], 'uffd_admin')
self.assertFalse(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['uffd_admin'])
self.assertFalse(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['uffd_admin', 'notagroup'])
self.assertFalse(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['notagroup', 'uffd_admin' ])
self.assertFalse(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['uffd_admin', 'users'])
self.assertTrue(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['uffd_admin', 'users'])
self.assertTrue(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], [['uffd_admin', 'users'], ['users', 'uffd_access']])
self.assertTrue(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
client = OAuth2Client('test', '', [''], ['uffd_admin', ['users', 'notagroup']])
self.assertFalse(client.access_allowed(user))
self.assertTrue(client.access_allowed(admin))
class TestViews(UffdTestCase):
def setUpApp(self):
self.app.config['OAUTH2_CLIENTS'] = {
'test': {'client_secret': 'testsecret', 'redirect_uris': ['http://localhost:5009/callback', 'http://localhost:5009/callback2']},
'test1': {'client_secret': 'testsecret1', 'redirect_uris': ['http://localhost:5008/callback'], 'required_group': 'uffd_admin'},
}
def test_authorization(self):
self.client.post(path=url_for('session.login'),
data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True)
state = 'teststate'
r = self.client.get(path=url_for('oauth2.authorize', response_type='code', client_id='test', state=state, redirect_uri='http://localhost:5009/callback'), follow_redirects=False)
while True:
if r.status_code != 302 or r.location.startswith('http://localhost:5009/callback'):
break
r = self.client.get(r.location, follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.startswith('http://localhost:5009/callback'))
args = parse_qs(urlparse(r.location).query)
self.assertEqual(args['state'], [state])
code = args['code'][0]
r = self.client.post(path=url_for('oauth2.token'),
data={'grant_type': 'authorization_code', 'code': code, 'redirect_uri': 'http://localhost:5009/callback', 'client_id': 'test', 'client_secret': 'testsecret'}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
self.assertEqual(r.json['token_type'], 'Bearer')
self.assertEqual(r.json['scope'], 'profile')
token = r.json['access_token']
r = self.client.get(path=url_for('oauth2.userinfo'), headers=[('Authorization', 'Bearer %s'%token)], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.content_type, 'application/json')
user = get_user()
self.assertEqual(r.json['id'], user.uid)
self.assertEqual(r.json['name'], user.displayname)
self.assertEqual(r.json['nickname'], user.loginname)
self.assertEqual(r.json['email'], user.mail)
self.assertTrue(r.json.get('groups'))