Skip to content
Snippets Groups Projects
Commit 39794116 authored by Julian's avatar Julian
Browse files

Added api module with endpoints getusers, getgroups and checkpassword

parent 17c8e40e
Branches
No related tags found
No related merge requests found
...@@ -67,10 +67,10 @@ def create_app(test_config=None): # pylint: disable=too-many-locals ...@@ -67,10 +67,10 @@ def create_app(test_config=None): # pylint: disable=too-many-locals
db.init_app(app) db.init_app(app)
Migrate(app, db, render_as_batch=True) Migrate(app, db, render_as_batch=True)
# pylint: disable=C0415 # pylint: disable=C0415
from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, rolemod, invite from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, rolemod, invite, api
# pylint: enable=C0415 # pylint: enable=C0415
for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + rolemod.bp: for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + rolemod.bp + api.bp:
app.register_blueprint(i) app.register_blueprint(i)
if app.config['LDAP_SERVICE_USER_BIND'] and (app.config['ENABLE_INVITE'] or if app.config['LDAP_SERVICE_USER_BIND'] and (app.config['ENABLE_INVITE'] or
......
from .views import bp as _bp
bp = [_bp]
import functools
from flask import Blueprint, jsonify, current_app, request, abort
from uffd.user.models import User, Group
from uffd.session.views import login_get_user, login_ratelimit
bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/')
def apikey_required(scope=None):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if 'Authorization' not in request.headers or not request.headers['Authorization'].startswith('Bearer '):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer'}
token = request.headers['Authorization'][7:].strip()
request.api_client = current_app.config['API_CLIENTS'].get(token)
if request.api_client is None:
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="invalid_token"'}
if scope is not None and scope not in request.api_client.get('scopes', []):
return 'Unauthorized', 401, {'WWW-Authenticate': 'Bearer error="insufficient_scope",scope="%s"'%scope}
return func(*args, **kwargs)
return decorator
return wrapper
def generate_group_dict(group):
return {'id': group.gid, 'name': group.name,
'members': [user.loginname for user in group.members]}
@bp.route('/getgroups', methods=['GET', 'POST'])
@apikey_required()
def getgroups():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
if key is None:
groups = Group.query.all()
elif key == 'id' and len(values) == 1:
groups = Group.query.filter_by(gid=values[0]).all()
elif key == 'name' and len(values) == 1:
groups = Group.query.filter_by(name=values[0]).all()
elif key == 'member' and len(values) == 1:
user = User.query.filter_by(loginname=values[0]).one_or_none()
groups = [] if user is None else user.groups
else:
abort(400)
return jsonify([generate_group_dict(group) for group in groups])
def generate_user_dict(user):
return {'id': user.uid, 'loginname': user.loginname, 'email': user.mail, 'displayname': user.displayname,
'groups': [group.name for group in user.groups]}
@bp.route('/getusers', methods=['GET', 'POST'])
@apikey_required()
def getusers():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
Group.query.all() # Fill object cache for better preformance
if key is None:
users = User.query.all()
elif key == 'id' and len(values) == 1:
users = User.query.filter_by(uid=values[0]).all()
elif key == 'loginname' and len(values) == 1:
users = User.query.filter_by(loginname=values[0]).all()
elif key == 'email' and len(values) == 1:
users = User.query.filter_by(mail=values[0]).all()
elif key == 'group' and len(values) == 1:
group = Group.query.filter_by(name=values[0]).one_or_none()
users = [] if group is None else group.members
else:
abort(400)
return jsonify([generate_user_dict(user) for user in users])
@bp.route('/checkpassword', methods=['POST'])
@apikey_required('checkpassword')
def checkpassword():
if set(request.values.keys()) != {'loginname', 'password'}:
abort(400)
username = request.form['loginname']
password = request.form['password']
login_delay = login_ratelimit.get_delay(username)
if login_delay:
return 'Too Many Requests', 429, {'Retry-After': '%d'%login_delay}
user = login_get_user(username, password)
if user is None:
login_ratelimit.log(username)
return jsonify(None)
return jsonify(generate_user_dict(user))
...@@ -95,6 +95,10 @@ OAUTH2_CLIENTS={ ...@@ -95,6 +95,10 @@ OAUTH2_CLIENTS={
# ... 'required_group': ['groupa', ['groupb', 'groupc']] ... allows users with group "groupa" as well as users with both "groupb" and "groupc" access # ... 'required_group': ['groupa', ['groupb', 'groupc']] ... allows users with group "groupa" as well as users with both "groupb" and "groupc" access
} }
API_CLIENTS={
#'token': {'scopes': ['checkpassword']}
}
# Service overview page (disabled if empty) # Service overview page (disabled if empty)
SERVICES=[ SERVICES=[
# # Title is mandatory, all other fields are optional. # # Title is mandatory, all other fields are optional.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment