Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
import csv
import io
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.selfservice import send_passwordreset
from uffd.session import login_required, is_valid_session, get_current_user
from uffd.role.models import Role
from uffd.database import db
from uffd.ldap import ldap, LDAPCommitError
from .models import User
bp = Blueprint("user", __name__, template_folder='templates', url_prefix='/user/')
@bp.before_request
@login_required()
def user_acl(): #pylint: disable=inconsistent-return-statements
if not user_acl_check():
flash('Access denied')
return redirect(url_for('index'))
def user_acl_check():
return is_valid_session() and get_current_user().is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.route("/")
@register_navbar('Users', icon='users', blueprint=bp, visible=user_acl_check)
def index():
return render_template('user_list.html', users=User.query.all())
@bp.route("/<int:uid>")
@bp.route("/new")
def show(uid=None):
user = User() if uid is None else User.query.filter_by(uid=uid).first_or_404()
return render_template('user.html', user=user, roles=Role.query.all())
@bp.route("/<int:uid>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(uid=None):
if uid is None:
user = User()
if not user.set_loginname(request.form['loginname']):
flash('Login name does not meet requirements')
return redirect(url_for('user.show'))
else:
user = User.query.filter_by(uid=uid).first_or_404()
if not user.set_mail(request.form['mail']):
flash('Mail is invalid')
return redirect(url_for('user.show', uid=uid))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if not user.set_displayname(new_displayname):
flash('Display name does not meet requirements')
return redirect(url_for('user.show', uid=uid))
new_password = request.form.get('password')
if uid is not None and new_password:
user.set_password(new_password)
ldap.session.add(user)
user.roles.clear()
for role in Role.query.all():
if request.values.get('role-{}'.format(role.id), False) or role.name in current_app.config["ROLES_BASEROLES"]:
user.roles.add(role)
user.update_groups()
ldap.session.commit()
db.session.commit()
if uid is None:
send_passwordreset(user, new=True)
flash('User created. We sent the user a password reset link by mail')
else:
flash('User updated')
return redirect(url_for('user.show', uid=user.uid))
@bp.route("/<int:uid>/del")
@csrf_protect(blueprint=bp)
def delete(uid):
user = User.query.filter_by(uid=uid).first_or_404()
user.roles.clear()
ldap.session.delete(user)
ldap.session.commit()
db.session.commit()
flash('Deleted user')
return redirect(url_for('user.index'))
@bp.route("/csv", methods=['POST'])
@csrf_protect(blueprint=bp)
def csvimport():
csvdata = request.values.get('csv')
if not csvdata:
flash('No data for csv import!')
return redirect(url_for('user.index'))
roles = Role.query.all()
usersadded = 0
with io.StringIO(initial_value=csvdata) as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
if not len(row) == 3:
flash("invalid line, ignored : {}".format(row))
continue
newuser = User()
if not newuser.set_loginname(row[0]) or not newuser.set_displayname(row[0]):
flash("invalid login name, skipped : {}".format(row))
continue
if not newuser.set_mail(row[1]):
flash("invalid mail address, skipped : {}".format(row))
continue
ldap.session.add(newuser)
for role in roles:
if (str(role.id) in row[2].split(';')) or role.name in current_app.config["ROLES_BASEROLES"]:
role.members.add(newuser)
newuser.update_groups()
try:
ldap.session.commit()
db.session.commit()
except LDAPCommitError:
flash('Error adding user {}'.format(row[0]))
ldap.session.rollback()
db.session.rollback()
continue
send_passwordreset(newuser, new=True)
usersadded += 1
flash('Added {} new users'.format(usersadded))
return redirect(url_for('user.index'))
import secrets
import math
import base64
def token_with_alphabet(alphabet, nbytes=None):
'''Return random text token that consists of characters from `alphabet`'''
if nbytes is None:
nbytes = max(secrets.DEFAULT_ENTROPY, 32)
nbytes_per_char = math.log(len(alphabet), 256)
nchars = math.ceil(nbytes / nbytes_per_char)
return ''.join([secrets.choice(alphabet) for _ in range(nchars)])
def token_typeable(nbytes=None):
'''Return random text token that is easy to type (on mobile)'''
alphabet = '123456789abcdefghkmnopqrstuvwx' # No '0ijlyz'
return token_with_alphabet(alphabet, nbytes=nbytes)
def token_urlfriendly(nbytes=None):
'''Return random text token that is urlsafe and works around common parsing bugs'''
alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
return token_with_alphabet(alphabet, nbytes=nbytes)
def nopad_b32decode(value):
if isinstance(value, bytes):
value = value.decode()
return base64.b32decode(value + ('=' * (-len(value) % 8)))
def nopad_b32encode(value):
return base64.b32encode(value).rstrip(b'=')
def nopad_urlsafe_b64decode(value):
if isinstance(value, bytes):
value = value.decode()
return base64.urlsafe_b64decode(value + ('=' * (-len(value) % 4)))
def nopad_urlsafe_b64encode(value):
return base64.urlsafe_b64encode(value).rstrip(b'=')
from flask import redirect, url_for, request, render_template
from werkzeug.exceptions import Forbidden
from uffd.secure_redirect import secure_local_redirect
from . import session, selfservice, signup, oauth2, user, group, service, role, invite, api, mail, rolemod
def init_app(app):
@app.errorhandler(403)
def handle_403(error):
return render_template('403.html', description=error.description if error.description != Forbidden.description else None), 403
@app.route("/")
def index(): #pylint: disable=unused-variable
if app.config['DEFAULT_PAGE_SERVICES']:
return redirect(url_for('service.overview'))
return redirect(url_for('selfservice.index'))
@app.route('/lang', methods=['POST'])
def setlang(): #pylint: disable=unused-variable
resp = secure_local_redirect(request.values.get('ref', '/'))
if 'lang' in request.values:
resp.set_cookie('language', request.values['lang'])
return resp
app.register_blueprint(session.bp)
app.register_blueprint(selfservice.bp)
app.register_blueprint(signup.bp)
app.register_blueprint(oauth2.bp)
app.register_blueprint(user.bp)
app.register_blueprint(group.bp)
app.register_blueprint(service.bp)
app.register_blueprint(role.bp)
app.register_blueprint(invite.bp)
app.register_blueprint(api.bp)
app.register_blueprint(mail.bp)
app.register_blueprint(rolemod.bp)
app.add_url_rule("/metrics", view_func=api.prometheus_metrics)
import functools
from flask import Blueprint, jsonify, request, abort, Response
from uffd.database import db
from uffd.models import (
User, ServiceUser, Group, Mail, MailReceiveAddress, MailDestinationAddress, APIClient,
RecoveryCodeMethod, TOTPMethod, WebauthnMethod, Invite, Role, Service )
from .session import login_ratelimit
bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/')
def apikey_required(permission=None):
# pylint: disable=too-many-return-statements
if permission is not None:
assert APIClient.permission_exists(permission)
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not request.authorization or not request.authorization.password:
return 'Unauthorized', 401, {'WWW-Authenticate': ['Basic realm="api"']}
client = APIClient.query.filter_by(auth_username=request.authorization.username).first()
if not client:
return 'Unauthorized', 401, {'WWW-Authenticate': ['Basic realm="api"']}
if not client.auth_password.verify(request.authorization.password):
return 'Unauthorized', 401, {'WWW-Authenticate': ['Basic realm="api"']}
if client.auth_password.needs_rehash:
client.auth_password = request.authorization.password
db.session.commit()
if permission is not None and not client.has_permission(permission):
return 'Forbidden', 403
request.api_client = client
return func(*args, **kwargs)
return decorator
return wrapper
def generate_group_dict(group):
return {
'id': group.unix_gid,
'name': group.name,
'members': [
user.loginname
for user in group.members
if not user.is_deactivated or not request.api_client.service.hide_deactivated_users
]
}
@bp.route('/getgroups', methods=['GET', 'POST'])
@apikey_required('users')
def getgroups():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
query = Group.query
if key is None:
pass
elif key == 'id' and len(values) == 1:
query = query.filter(Group.unix_gid == values[0])
elif key == 'name' and len(values) == 1:
query = query.filter(Group.name == values[0])
elif key == 'member' and len(values) == 1:
query = query.join(Group.members).filter(User.loginname == values[0])
if request.api_client.service.hide_deactivated_users:
query = query.filter(db.not_(User.is_deactivated))
else:
abort(400)
# Single-result queries perform better without eager loading
if key is None or key == 'member':
query = query.options(db.selectinload(Group.members))
return jsonify([generate_group_dict(group) for group in query])
def generate_user_dict(service_user):
return {
'id': service_user.user.unix_uid,
'loginname': service_user.user.loginname,
'email': service_user.email,
'displayname': service_user.user.displayname,
'groups': [group.name for group in service_user.user.groups]
}
@bp.route('/getusers', methods=['GET', 'POST'])
@apikey_required('users')
def getusers():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
query = ServiceUser.query.filter_by(service=request.api_client.service).join(ServiceUser.user)
if request.api_client.service.hide_deactivated_users:
query = query.filter(db.not_(User.is_deactivated))
if key is None:
pass
elif key == 'id' and len(values) == 1:
query = query.filter(User.unix_uid == values[0])
elif key == 'loginname' and len(values) == 1:
query = query.filter(User.loginname == values[0])
elif key == 'email' and len(values) == 1:
query = ServiceUser.filter_query_by_email(query, values[0])
elif key == 'group' and len(values) == 1:
query = query.join(User.groups).filter(Group.name == values[0])
else:
abort(400)
# Single-result queries perform better without eager loading
if key is None or key == 'group':
# pylint: disable=no-member
query = query.options(db.joinedload(ServiceUser.user).selectinload(User.groups))
query = query.options(db.joinedload(ServiceUser.user).joinedload(User.primary_email))
return jsonify([generate_user_dict(user) for user in query])
@bp.route('/checkpassword', methods=['POST'])
@apikey_required('checkpassword')
def checkpassword():
if set(request.values.keys()) != {'loginname', 'password'}:
abort(400)
username = request.form['loginname'].lower()
password = request.form['password']
login_delay = login_ratelimit.get_delay(username)
if login_delay:
return 'Too Many Requests', 429, {'Retry-After': '%d'%login_delay}
service_user = ServiceUser.query.join(User).filter(
ServiceUser.service == request.api_client.service,
User.loginname == username,
).one_or_none()
if service_user is None or not service_user.user.password.verify(password):
login_ratelimit.log(username)
return jsonify(None)
if service_user.user.is_deactivated:
return jsonify(None)
if service_user.user.password.needs_rehash:
service_user.user.password = password
db.session.commit()
return jsonify(generate_user_dict(service_user))
def generate_mail_dict(mail):
return {
'name': mail.uid,
'receive_addresses': list(mail.receivers),
'destination_addresses': list(mail.destinations)
}
@bp.route('/getmails', methods=['GET', 'POST'])
@apikey_required('mail_aliases')
def getmails():
if len(request.values) > 1:
abort(400)
key = (list(request.values.keys()) or [None])[0]
values = request.values.getlist(key)
query = Mail.query
if key is None:
pass
elif key == 'name' and len(values) == 1:
query = query.filter_by(uid=values[0])
elif key == 'receive_address' and len(values) == 1:
query = query.filter(Mail.receivers.any(MailReceiveAddress.address==values[0].lower()))
elif key == 'destination_address' and len(values) == 1:
query = query.filter(Mail.destinations.any(MailDestinationAddress.address==values[0]))
else:
abort(400)
return jsonify([generate_mail_dict(mail) for mail in query])
@bp.route('/resolve-remailer', methods=['GET', 'POST'])
@apikey_required('remailer')
def resolve_remailer():
if list(request.values.keys()) != ['orig_address']:
abort(400)
values = request.values.getlist('orig_address')
if len(values) != 1:
abort(400)
service_user = ServiceUser.get_by_remailer_email(values[0])
if not service_user:
return jsonify(address=None)
return jsonify(address=service_user.real_email)
@bp.route('/metrics_prometheus', methods=['GET'])
@apikey_required('metrics')
def prometheus_metrics():
import pkg_resources #pylint: disable=import-outside-toplevel
from prometheus_client.core import CollectorRegistry, CounterMetricFamily, InfoMetricFamily #pylint: disable=import-outside-toplevel
from prometheus_client import PLATFORM_COLLECTOR, generate_latest, CONTENT_TYPE_LATEST #pylint: disable=import-outside-toplevel
class UffdCollector():
def collect(self):
try:
uffd_version = str(pkg_resources.get_distribution('uffd').version)
except pkg_resources.DistributionNotFound:
uffd_version = "unknown"
yield InfoMetricFamily('uffd_version', 'Various version infos', value={"version": uffd_version})
user_metric = CounterMetricFamily('uffd_users_total', 'Number of users', labels=['user_type'])
user_metric.add_metric(['regular'], value=User.query.filter_by(is_service_user=False).count())
user_metric.add_metric(['service'], User.query.filter_by(is_service_user=True).count())
yield user_metric
mfa_auth_metric = CounterMetricFamily('uffd_users_auth_mfa_total', 'mfa stats', labels=['mfa_type'])
mfa_auth_metric.add_metric(['recoverycode'], value=RecoveryCodeMethod.query.count())
mfa_auth_metric.add_metric(['totp'], value=TOTPMethod.query.count())
mfa_auth_metric.add_metric(['webauthn'], value=WebauthnMethod.query.count())
yield mfa_auth_metric
yield CounterMetricFamily('uffd_roles_total', 'Number of roles', value=Role.query.count())
role_members_metric = CounterMetricFamily('uffd_role_members_total', 'Members of a role', labels=['role_name'])
for role in Role.query.all():
role_members_metric.add_metric([role.name], value=len(role.members))
yield role_members_metric
group_metric = CounterMetricFamily('uffd_groups_total', 'Total number of groups', value=Group.query.count())
yield group_metric
invite_metric = CounterMetricFamily('uffd_invites_total', 'Number of invites', labels=['invite_state'])
invite_metric.add_metric(['used'], value=Invite.query.filter_by(used=True).count())
invite_metric.add_metric(['expired'], value=Invite.query.filter_by(expired=True).count())
invite_metric.add_metric(['disabled'], value=Invite.query.filter_by(disabled=True).count())
invite_metric.add_metric(['voided'], value=Invite.query.filter_by(voided=True).count())
invite_metric.add_metric([], value=Invite.query.count())
yield invite_metric
yield CounterMetricFamily('uffd_services_total', 'Number of services', value=Service.query.count())
registry = CollectorRegistry(auto_describe=True)
registry.register(PLATFORM_COLLECTOR)
registry.register(UffdCollector())
return Response(response=generate_latest(registry=registry),content_type=CONTENT_TYPE_LATEST)
from flask import Blueprint, render_template, current_app, request, flash, redirect, url_for
from flask_babel import lazy_gettext, gettext as _
import sqlalchemy
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import Group
from .session import login_required
bp = Blueprint("group", __name__, template_folder='templates', url_prefix='/group/')
def group_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(group_acl_check)
def group_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Groups'), icon='layer-group', blueprint=bp, visible=group_acl_check)
def index():
return render_template('group/list.html', groups=Group.query.all())
@bp.route("/<int:id>")
@bp.route("/new")
def show(id=None):
group = Group() if id is None else Group.query.get_or_404(id)
return render_template('group/show.html', group=group)
@bp.route("/<int:id>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id=None):
if id is None:
group = Group()
if request.form['unix_gid']:
try:
group.unix_gid = int(request.form['unix_gid'])
except ValueError:
flash(_('GID is already in use or was used in the past'))
return render_template('group/show.html', group=group), 400
if not group.set_name(request.form['name']):
flash(_('Invalid name'))
return render_template('group/show.html', group=group), 400
else:
group = Group.query.get_or_404(id)
group.description = request.form['description']
db.session.add(group)
if id is None:
try:
db.session.commit()
except sqlalchemy.exc.IntegrityError:
db.session.rollback()
flash(_('Group with this name or id already exists'))
return render_template('group/show.html', group=group), 400
else:
db.session.commit()
if id is None:
flash(_('Group created'))
else:
flash(_('Group updated'))
return redirect(url_for('group.show', id=group.id))
@bp.route("/<int:id>/delete")
@csrf_protect(blueprint=bp)
def delete(id):
group = Group.query.get_or_404(id)
db.session.delete(group)
db.session.commit()
flash(_('Deleted group'))
return redirect(url_for('group.index'))
import datetime
import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify, abort
from flask_babel import gettext as _, lazy_gettext, to_utc
import sqlalchemy
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.ldap import ldap
from uffd.session import get_current_user, login_required, is_valid_session
from uffd.role.models import Role
from uffd.invite.models import Invite, InviteSignup, InviteGrant
from uffd.user.models import User
from uffd.sendmail import sendmail
from uffd.navbar import register_navbar
from uffd.ratelimit import host_ratelimit, format_delay
from uffd.signup.views import signup_ratelimit
from uffd.database import db
from uffd.models import Role, User, Group, Invite, InviteSignup, InviteGrant, host_ratelimit, format_delay
from .session import login_required
from .signup import signup_ratelimit
from .selfservice import selfservice_acl_check
bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/')
def invite_acl():
return is_valid_session() and get_current_user().is_in_group(current_app.config['ACL_ADMIN_GROUP'])
def invite_acl_required(func):
@functools.wraps(func)
@login_required()
def decorator(*args, **kwargs):
if not invite_acl():
flash('Access denied')
return redirect(url_for('index'))
return func(*args, **kwargs)
return decorator
def invite_acl_check():
if not request.user:
return False
if request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
return True
if request.user.is_in_group(current_app.config['ACL_SIGNUP_GROUP']):
return True
if Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).count():
return True
return False
def view_acl_filter(user):
if user.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
return sqlalchemy.true()
creator_filter = (Invite.creator == user)
rolemod_filter = Invite.roles.any(Role.moderator_group.has(Group.id.in_([group.id for group in user.groups])))
return creator_filter | rolemod_filter
def reset_acl_filter(user):
if user.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
return sqlalchemy.true()
return Invite.creator == user
@bp.route('/')
@register_navbar('Invites', icon='link', blueprint=bp, visible=invite_acl)
@invite_acl_required
@register_navbar(lazy_gettext('Invites'), icon='link', blueprint=bp, visible=invite_acl_check)
@login_required(invite_acl_check)
def index():
return render_template('invite/list.html', invites=Invite.query.all())
invites = Invite.query.filter(view_acl_filter(request.user)).all()
return render_template('invite/list.html', invites=invites)
@bp.route('/new')
@invite_acl_required
@login_required(invite_acl_check)
def new():
return render_template('invite/new.html', roles=Role.query.all())
if request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
allow_signup = True
roles = Role.query.all()
else:
allow_signup = request.user.is_in_group(current_app.config['ACL_SIGNUP_GROUP'])
roles = Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).all()
return render_template('invite/new.html', roles=roles, allow_signup=allow_signup)
def parse_datetime_local_input(value):
return to_utc(datetime.datetime.fromisoformat(value))
@bp.route('/new', methods=['POST'])
@invite_acl_required
@login_required(invite_acl_check)
@csrf_protect(blueprint=bp)
def new_submit():
invite = Invite(single_use=(request.values['single-use'] == '1'),
valid_until=datetime.datetime.fromisoformat(request.values['valid-until']),
allow_signup=(request.values['allow-signup'] == '1'))
invite = Invite(creator=request.user,
single_use=(request.values['single-use'] == '1'),
valid_until=parse_datetime_local_input(request.values['valid-until']),
allow_signup=(request.values.get('allow-signup', '0') == '1'))
for key, value in request.values.items():
if key.startswith('role-') and value == '1':
role = Role.query.get(key[5:])
invite.roles.append(role)
invite.roles.append(Role.query.get(key[5:]))
if invite.valid_until > datetime.datetime.utcnow() + datetime.timedelta(days=current_app.config['INVITE_MAX_VALID_DAYS']):
flash(_('The "Expires After" date is too far in the future'))
return new()
if not invite.permitted:
flash(_('You are not allowed to create invite links with these permissions'))
return new()
if not invite.allow_signup and not invite.roles:
flash(_('Invite link must either allow signup or grant at least one role'))
return new()
db.session.add(invite)
db.session.commit()
return redirect(url_for('invite.index'))
@bp.route('/<token>/disable', methods=['POST'])
@invite_acl_required
@bp.route('/<int:invite_id>/disable', methods=['POST'])
@login_required(invite_acl_check)
@csrf_protect(blueprint=bp)
def disable(token):
Invite.query.get_or_404(token).disable()
def disable(invite_id):
invite = Invite.query.filter(view_acl_filter(request.user)).filter_by(id=invite_id).first_or_404()
invite.disable()
db.session.commit()
return redirect(url_for('.index'))
@bp.route('/<token>/reset', methods=['POST'])
@invite_acl_required
@bp.route('/<int:invite_id>/reset', methods=['POST'])
@login_required(invite_acl_check)
@csrf_protect(blueprint=bp)
def reset(token):
Invite.query.get_or_404(token).reset()
def reset(invite_id):
invite = Invite.query.filter(reset_acl_filter(request.user)).filter_by(id=invite_id).first_or_404()
invite.reset()
db.session.commit()
return redirect(url_for('.index'))
@bp.route('/<token>')
def use(token):
invite = Invite.query.get_or_404(token)
@bp.route('/<int:invite_id>/<token>')
def use(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active:
flash('Invalid invite link')
flash(_('Invalid invite link'))
return redirect('/')
return render_template('invite/use.html', invite=invite)
@bp.route('/<token>/grant', methods=['POST'])
@login_required()
def grant(token):
invite = Invite.query.get_or_404(token)
invite_grant = InviteGrant(invite=invite, user=get_current_user())
@bp.route('/<int:invite_id>/<token>/grant', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def grant(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
invite_grant = InviteGrant(invite=invite, user=request.user)
db.session.add(invite_grant)
success, msg = invite_grant.apply()
if not success:
flash(msg)
return redirect(url_for('selfservice.index'))
ldap.session.commit()
db.session.commit()
flash('Roles successfully updated')
flash(_('Roles successfully updated'))
return redirect(url_for('selfservice.index'))
@bp.url_defaults
def inject_invite_token(endpoint, values):
if endpoint in ['invite.signup_submit', 'invite.signup_check'] and 'token' in request.view_args:
values['token'] = request.view_args['token']
@bp.route('/<token>/signup')
def signup_start(token):
invite = Invite.query.get_or_404(token)
if endpoint in ['invite.signup_submit', 'invite.signup_check']:
if 'invite_id' in request.view_args:
values['invite_id'] = request.view_args['invite_id']
if 'token' in request.view_args:
values['token'] = request.view_args['token']
@bp.route('/<int:invite_id>/<token>/signup')
def signup_start(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active:
flash('Invalid invite link')
flash(_('Invalid invite link'))
return redirect('/')
if not invite.allow_signup:
flash('Invite link does not allow signup')
flash(_('Invite link does not allow signup'))
return redirect('/')
return render_template('signup/start.html')
@bp.route('/<token>/signupcheck', methods=['POST'])
def signup_check(token):
@bp.route('/<int:invite_id>/<token>/signupcheck', methods=['POST'])
def signup_check(invite_id, token):
if host_ratelimit.get_delay():
return jsonify({'status': 'ratelimited'})
host_ratelimit.log()
invite = Invite.query.get_or_404(token)
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if not invite.active or not invite.allow_signup:
return jsonify({'status': 'error'}), 403
if not User().set_loginname(request.form['loginname']):
......@@ -125,17 +166,23 @@ def signup_check(token):
return jsonify({'status': 'exists'})
return jsonify({'status': 'ok'})
@bp.route('/<token>/signup', methods=['POST'])
def signup_submit(token):
invite = Invite.query.get_or_404(token)
@bp.route('/<int:invite_id>/<token>/signup', methods=['POST'])
def signup_submit(invite_id, token):
invite = Invite.query.get(invite_id)
if not invite or not secrets.compare_digest(invite.token, token):
abort(404)
if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error='Passwords do not match')
flash(_('Passwords do not match'), 'error')
return render_template('signup/start.html')
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
host_delay = host_ratelimit.get_delay()
if signup_delay and signup_delay > host_delay:
return render_template('signup/start.html', error='Too many signup requests with this mail address! Please wait %s.'%format_delay(signup_delay))
flash(_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)), 'error')
return render_template('signup/start.html')
if host_delay:
return render_template('signup/start.html', error='Too many requests! Please wait %s.'%format_delay(host_delay))
flash(_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)), 'error')
return render_template('signup/start.html')
host_ratelimit.log()
signup = InviteSignup(invite=invite, loginname=request.form['loginname'],
displayname=request.form['displayname'],
......@@ -143,11 +190,13 @@ def signup_submit(token):
password=request.form['password1'])
valid, msg = signup.validate()
if not valid:
return render_template('signup/start.html', error=msg)
flash(msg, 'error')
return render_template('signup/start.html')
db.session.add(signup)
db.session.commit()
sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup)
if not sent:
return render_template('signup/start.html', error='Cound not send mail')
flash(_('Could not send mail'), 'error')
return render_template('signup/start.html')
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.ldap import ldap
from uffd.session import login_required, is_valid_session, get_current_user
from uffd.mail.models import Mail
from uffd.database import db
from uffd.models import Mail
from .session import login_required
bp = Blueprint("mail", __name__, template_folder='templates', url_prefix='/mail/')
@bp.before_request
@login_required()
def mail_acl(): #pylint: disable=inconsistent-return-statements
if not mail_acl_check():
flash('Access denied')
return redirect(url_for('index'))
def mail_acl_check():
return is_valid_session() and get_current_user().is_in_group(current_app.config['ACL_ADMIN_GROUP'])
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(mail_acl_check)
def mail_acl():
pass
@bp.route("/")
@register_navbar('Mail', icon='envelope', blueprint=bp, visible=mail_acl_check)
@register_navbar(lazy_gettext('Forwardings'), icon='envelope', blueprint=bp, visible=mail_acl_check)
def index():
return render_template('mail_list.html', mails=Mail.query.all())
return render_template('mail/list.html', mails=Mail.query.all())
@bp.route("/<uid>")
@bp.route("/<int:mail_id>")
@bp.route("/new")
def show(uid=None):
mail = Mail()
if uid is not None:
mail = Mail.query.filter_by(uid=uid).first_or_404()
return render_template('mail.html', mail=mail)
def show(mail_id=None):
if mail_id is not None:
mail = Mail.query.get_or_404(mail_id)
else:
mail = Mail()
return render_template('mail/show.html', mail=mail)
@bp.route("/<uid>/update", methods=['POST'])
@bp.route("/<int:mail_id>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(uid=None):
if uid is not None:
mail = Mail.query.filter_by(uid=uid).first_or_404()
def update(mail_id=None):
if mail_id is not None:
mail = Mail.query.get_or_404(mail_id)
else:
mail = Mail(uid=request.form.get('mail-uid'))
mail.receivers = request.form.get('mail-receivers', '').splitlines()
mail.destinations = request.form.get('mail-destinations', '').splitlines()
ldap.session.add(mail)
ldap.session.commit()
flash('Mail mapping updated.')
return redirect(url_for('mail.show', uid=mail.uid))
@bp.route("/<uid>/del")
if mail.invalid_receivers:
for addr in mail.invalid_receivers:
flash(_('Invalid receive address: %(mail_address)s', mail_address=addr))
return render_template('mail/show.html', mail=mail)
db.session.add(mail)
db.session.commit()
flash(_('Mail mapping updated.'))
return redirect(url_for('mail.show', mail_id=mail.id))
@bp.route("/<int:mail_id>/del")
@csrf_protect(blueprint=bp)
def delete(uid):
mail = Mail.query.filter_by(uid=uid).first_or_404()
ldap.session.delete(mail)
ldap.session.commit()
flash('Deleted mail mapping.')
def delete(mail_id):
mail = Mail.query.get_or_404(mail_id)
db.session.delete(mail)
db.session.commit()
flash(_('Deleted mail mapping.'))
return redirect(url_for('mail.index'))
import urllib.parse
import time
import json
from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash, abort
from flask_babel import gettext as _
from sqlalchemy.exc import IntegrityError
import jwt
from uffd.secure_redirect import secure_local_redirect
from uffd.database import db
from uffd.models import (
DeviceLoginConfirmation, OAuth2Client, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation,
host_ratelimit, format_delay, OAuth2Key,
)
def get_issuer():
return request.host_url.rstrip('/')
OIDC_SCOPES = {
# From https://openid.net/specs/openid-connect-core-1_0.html
'openid': {
# "The sub (subject) Claim MUST always be returned in the UserInfo Response."
'sub': None,
},
'profile': {
'name': None,
'family_name': None,
'given_name': None,
'middle_name': None,
'nickname': None,
'preferred_username': None,
'profile': None,
'picture': None,
'website': None,
'gender': None,
'birthdate': None,
'zoneinfo': None,
'locale': None,
'updated_at': None,
},
'email': {
'email': None,
'email_verified': None,
},
# Custom scopes
'groups': {
'groups': None,
},
}
OIDC_CLAIMS = {
'sub': lambda service_user: str(service_user.user.unix_uid),
'name': lambda service_user: service_user.user.displayname,
'preferred_username': lambda service_user: service_user.user.loginname,
'email': lambda service_user: service_user.email,
'email_verified': lambda service_user: service_user.email_verified,
# RFC 9068 registers the "groups" claim with a syntax taken from SCIM (RFC 7643)
# that is different from what we use here. The plain list of names syntax we use
# is far more common in the context of id_token/userinfo claims.
'groups': lambda service_user: [group.name for group in service_user.user.groups]
}
def render_claims(scopes, claims, service_user):
claims = dict(claims)
for scope in scopes:
claims.update(OIDC_SCOPES.get(scope, {}))
# This would be a good place to enforce permissions on available claims
res = {}
for claim, func in OIDC_CLAIMS.items():
if claim in claims:
res[claim] = func(service_user=service_user)
return res
bp = Blueprint('oauth2', __name__, template_folder='templates')
@bp.route('/.well-known/openid-configuration')
def discover():
return jsonify({
'issuer': get_issuer(),
'authorization_endpoint': url_for('oauth2.authorize', _external=True),
'token_endpoint': url_for('oauth2.token', _external=True),
'userinfo_endpoint': url_for('oauth2.userinfo', _external=True),
'jwks_uri': url_for('oauth2.keys', _external=True),
'scopes_supported': sorted(OIDC_SCOPES.keys()),
'response_types_supported': ['code'],
'grant_types_supported': ['authorization_code'],
'id_token_signing_alg_values_supported': OAuth2Key.get_available_algorithms(),
'subject_types_supported': ['public'],
'token_endpoint_auth_methods_supported': ['client_secret_basic', 'client_secret_post'],
'claims_supported': sorted(['iat', 'exp', 'aud', 'iss'] + list(OIDC_CLAIMS.keys())),
'claims_parameter_supported': True,
'request_uri_parameter_supported': False, # default is True
})
@bp.route('/oauth2/keys')
def keys():
return jsonify({
'keys': [key.public_key_jwks_dict for key in OAuth2Key.query.filter_by(active=True)],
}), 200, {'Cache-Control': ['max-age=86400, public, must-revalidate, no-transform=true']}
def oauth2_redirect(**extra_args):
urlparts = urllib.parse.urlparse(request.oauth2_redirect_uri)
args = urllib.parse.parse_qs(urlparts.query)
if 'state' in request.args:
args['state'] = request.args['state']
for key, value in extra_args.items():
if value is not None:
args[key] = [value]
return redirect(urlparts._replace(query=urllib.parse.urlencode(args, doseq=True)).geturl())
class OAuth2Error(Exception):
ERROR: str
def __init__(self, error_description=None):
self.error_description = error_description
@property
def params(self):
res = {'error': self.ERROR}
if self.error_description:
res['error_description'] = self.error_description
return res
# RFC 6749: OAuth 2.0
class InvalidRequestError(OAuth2Error):
ERROR = 'invalid_request'
class UnsupportedResponseTypeError(OAuth2Error):
ERROR = 'unsupported_response_type'
class InvalidScopeError(OAuth2Error):
ERROR = 'invalid_scope'
class InvalidClientError(OAuth2Error):
ERROR = 'invalid_client'
class UnsupportedGrantTypeError(OAuth2Error):
ERROR = 'unsupported_grant_type'
class InvalidGrantError(OAuth2Error):
ERROR = 'invalid_grant'
class AccessDeniedError(OAuth2Error):
ERROR = 'access_denied'
def __init__(self, flash_message=None, **kwargs):
self.flash_message = flash_message
super().__init__(**kwargs)
# RFC 6750: OAuth 2.0 Bearer Token Usage
class InvalidTokenError(OAuth2Error):
ERROR = 'invalid_token'
# OpenID Connect Core 1.0
class LoginRequiredError(OAuth2Error):
ERROR = 'login_required'
def __init__(self, response=None, flash_message=None, **kwargs):
self.response = response
self.flash_message = flash_message
super().__init__(**kwargs)
class RequestNotSupportedError(OAuth2Error):
ERROR = 'request_not_supported'
class RequestURINotSupportedError(OAuth2Error):
ERROR = 'request_uri_not_supported'
def authorize_validate_request():
request.oauth2_redirect_uri = None
for param in request.args:
if len(request.args.getlist(param)) > 1:
raise InvalidRequestError(error_description=f'Duplicate parameter {param}')
if 'client_id' not in request.args:
raise InvalidRequestError(error_description='Required parameter client_id missing')
client_id = request.args['client_id']
client = OAuth2Client.query.filter_by(client_id=client_id).one_or_none()
if not client:
raise InvalidRequestError(error_description=f'Unknown client {client_id}')
redirect_uri = request.args.get('redirect_uri')
if redirect_uri and redirect_uri not in client.redirect_uris:
raise InvalidRequestError(error_description='Invalid redirect_uri')
request.oauth2_redirect_uri = redirect_uri or client.default_redirect_uri
if not request.oauth2_redirect_uri:
raise InvalidRequestError(error_description='Parameter redirect_uri required')
if 'response_type' not in request.args:
raise InvalidRequestError(error_description='Required parameter response_type missing')
response_type = request.args['response_type']
if response_type != 'code':
raise UnsupportedResponseTypeError(error_description='Unsupported response type')
scopes = {scope for scope in request.args.get('scope', '').split(' ') if scope} or {'profile'}
if scopes == {'profile'}:
pass # valid plain OAuth2 scopes
elif 'openid' in scopes:
# OIDC core spec: "Scope values used that are not understood by an implementation SHOULD be ignored."
# Since we don't support some of the optional scope values defined by the
# spec (phone, address, offline_access), it's probably best to ignore all
# unknown scopes.
pass # valid OIDC scopes
else:
raise InvalidScopeError(error_description='Unknown scope')
return OAuth2Grant(
client=client,
# redirect_uri is None if not present in request! This affects token request validation.
redirect_uri=redirect_uri,
scopes=scopes,
)
def authorize_validate_request_oidc(grant):
nonce = request.args.get('nonce')
claims = json.loads(request.args['claims']) if 'claims' in request.args else None
if 'request' in request.args:
raise RequestNotSupportedError()
if 'request_uri' in request.args:
raise RequestURINotSupportedError()
prompt_values = {value for value in request.args.get('prompt', '').split(' ') if value}
if 'none' in prompt_values and prompt_values != {'none'}:
raise InvalidRequestError(error_description='Invalid usage of none prompt parameter value')
sub_value = None
if claims and claims.get('id_token', {}).get('sub', {}).get('value') is not None:
sub_value = claims['id_token']['sub']['value']
if 'id_token_hint' in request.args:
try:
id_token = OAuth2Key.decode_jwt(
request.args['id_token_hint'],
issuer=get_issuer(),
options={'verify_exp': False, 'verify_aud': False}
)
except (jwt.exceptions.InvalidTokenError, jwt.exceptions.InvalidKeyError) as err:
raise InvalidRequestError(error_description='Invalid id_token_hint value') from err
if sub_value is not None and id_token['sub'] != sub_value:
raise InvalidRequestError(error_description='Ambiguous sub values in claims and id_token_hint')
sub_value = id_token['sub']
# We "MUST only send a positive response if the End-User identified by that
# sub value has an active session with the Authorization Server or has been
# Authenticated as a result of the request". However, we currently cannot
# display the login page if there is already a valid session. So we can only
# support sub_value in combination with prompt=none for now.
if sub_value is not None and 'none' not in prompt_values:
raise InvalidRequestError(error_description='id_token_hint or sub claim value not supported without prompt=none')
grant.nonce = nonce
grant.claims = claims
return grant, sub_value, prompt_values
def authorize_user(client):
if request.session:
return request.session
if 'devicelogin_started' in session:
del session['devicelogin_started']
host_delay = host_ratelimit.get_delay()
if host_delay:
raise LoginRequiredError(
flash_message=_(
'We received too many requests from your ip address/network! Please wait at least %(delay)s.',
delay=format_delay(host_delay)
),
response=redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
)
host_ratelimit.log()
initiation = OAuth2DeviceLoginInitiation(client=client)
db.session.add(initiation)
try:
db.session.commit()
except IntegrityError as err:
raise LoginRequiredError(
flash_message=_('Device login is currently not available. Try again later!'),
response=redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
) from err
session['devicelogin_id'] = initiation.id
session['devicelogin_secret'] = initiation.secret
raise LoginRequiredError(response=redirect(url_for('session.devicelogin', ref=request.full_path)))
if 'devicelogin_id' in session and 'devicelogin_secret' in session and 'devicelogin_confirmation' in session:
initiation = OAuth2DeviceLoginInitiation.query.filter_by(
id=session['devicelogin_id'],
secret=session['devicelogin_secret'],
client=client
).one_or_none()
confirmation = DeviceLoginConfirmation.query.get(session['devicelogin_confirmation'])
del session['devicelogin_id']
del session['devicelogin_secret']
del session['devicelogin_confirmation']
if not initiation or initiation.expired or not confirmation or confirmation.session.user.is_deactivated:
raise LoginRequiredError(
flash_message=_('Device login failed'),
response=redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
)
db.session.delete(initiation)
db.session.commit()
return confirmation.session
raise LoginRequiredError(
flash_message=_('You need to login to access this service'),
response=redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
)
@bp.route('/oauth2/authorize')
def authorize():
is_oidc = 'openid' in request.args.get('scope', '').split(' ')
try:
grant = authorize_validate_request()
sub_value, prompt_values = None, []
if is_oidc:
grant, sub_value, prompt_values = authorize_validate_request_oidc(grant)
except OAuth2Error as err:
# Correct OAuth2/OIDC error handling would be to redirect back to the
# client with an error paramter, unless client_id or redirect_uri is
# invalid. However, uffd never did that before adding OIDC support and
# many applications fail to correctly handle this case. As a compromise
# we report errors correctly in OIDC mode and don't in plain OAuth2 mode.
if is_oidc and request.oauth2_redirect_uri:
return oauth2_redirect(**err.params)
return render_template('oauth2/error.html', **err.params), 400
try:
_session = authorize_user(grant.client)
if sub_value is not None and str(_session.user.unix_uid) != sub_value:
# We only reach this point in OIDC requests with prompt=none, see
# authorize_validate_request_oidc. So this LoginRequiredError is
# always returned as a redirect back to the client.
raise LoginRequiredError()
if not grant.client.access_allowed(_session.user):
raise AccessDeniedError(flash_message=_(
"You don't have the permission to access the service <b>%(service_name)s</b>.",
service_name=grant.client.service.name
))
grant.session = _session
except LoginRequiredError as err:
# We abuse LoginRequiredError to signal a redirect to the login page
if is_oidc and 'none' in prompt_values:
err.error_description = 'Login required but prompt value set to none'
return oauth2_redirect(**err.params)
if err.flash_message:
flash(err.flash_message)
return err.response
except AccessDeniedError as err:
if is_oidc and request.oauth2_redirect_uri:
return oauth2_redirect(**err.params)
abort(403, description=err.flash_message)
db.session.add(grant)
db.session.commit()
return oauth2_redirect(code=grant.code)
def token_authenticate_client():
for param in ('client_id', 'client_secret'):
if len(request.form.getlist(param)) > 1:
raise InvalidRequestError(error_description=f'Duplicate parameter {param}')
if request.authorization:
client_id = urllib.parse.unquote(request.authorization.username)
client_secret = urllib.parse.unquote(request.authorization.password)
if request.form.get('client_id', client_id) != client_id:
raise InvalidRequestError(error_description='Ambiguous parameter client_id')
if 'client_secret' in request.form:
raise InvalidRequestError(error_description='Ambiguous parameter client_secret')
elif 'client_id' in request.form and 'client_secret' in request.form:
client_id = request.form['client_id']
client_secret = request.form['client_secret']
else:
raise InvalidClientError()
client = OAuth2Client.query.filter_by(client_id=client_id).one_or_none()
if client is None or not client.client_secret.verify(client_secret):
raise InvalidClientError()
if client.client_secret.needs_rehash:
client.client_secret = client_secret
db.session.commit()
return client
def token_validate_request(client):
for param in ('grant_type', 'code', 'redirect_uri'):
if len(request.form.getlist(param)) > 1:
raise InvalidRequestError(error_description=f'Duplicate parameter {param}')
if 'grant_type' not in request.form:
raise InvalidRequestError(error_description='Parameter grant_type missing')
grant_type = request.form['grant_type']
if grant_type != 'authorization_code':
raise UnsupportedGrantTypeError()
if 'code' not in request.form:
raise InvalidRequestError(error_description='Parameter code missing')
code = request.form['code']
grant = OAuth2Grant.get_by_authorization_code(code)
if not grant or grant.client != client:
raise InvalidGrantError()
if grant.redirect_uri and grant.redirect_uri != request.form.get('redirect_uri'):
raise InvalidRequestError(error_description='Parameter redirect_uri missing or invalid')
return grant
@bp.route('/oauth2/token', methods=['POST'])
def token():
try:
client = token_authenticate_client()
grant = token_validate_request(client)
except InvalidClientError as err:
return jsonify(err.params), 401, {'WWW-Authenticate': ['Basic realm="oauth2"']}
except OAuth2Error as err:
return jsonify(err.params), 400
tok = grant.make_token()
db.session.add(tok)
db.session.delete(grant)
db.session.commit()
resp = {
'token_type': 'Bearer',
'access_token': tok.access_token,
'expires_in': tok.EXPIRES_IN,
'scope': ' '.join(tok.scopes),
}
if 'openid' in tok.scopes:
key = OAuth2Key.get_preferred_key()
id_token = render_claims(['openid'], (grant.claims or {}).get('id_token', {}), tok.service_user)
id_token['iss'] = get_issuer()
id_token['aud'] = tok.client.client_id
id_token['iat'] = int(time.time())
id_token['at_hash'] = key.oidc_hash(tok.access_token.encode('ascii'))
id_token['exp'] = id_token['iat'] + tok.EXPIRES_IN
if grant.nonce:
id_token['nonce'] = grant.nonce
resp['id_token'] = OAuth2Key.get_preferred_key().encode_jwt(id_token)
else:
# We don't support the refresh_token grant type. Due to limitations of
# oauthlib we always returned (disfunctional) refresh tokens in the past.
# We still do that for non-OIDC clients to not change behavour for
# existing clients.
resp['refresh_token'] = tok.refresh_token
return jsonify(resp), 200, {'Cache-Control': ['no-store']}
def validate_access_token():
if len(request.headers.getlist('Authorization')) == 1 and 'access_token' not in request.values:
auth_type, auth_value = (request.headers['Authorization'].split(' ', 1) + [''])[:2]
if auth_type.lower() != 'bearer':
raise InvalidRequestError()
access_token = auth_value
elif len(request.values.getlist('access_token')) == 1 and 'Authorization' not in request.headers:
access_token = request.values['access_token']
else:
raise InvalidClientError()
tok = OAuth2Token.get_by_access_token(access_token)
if not tok:
raise InvalidTokenError()
return tok
@bp.route('/oauth2/userinfo', methods=['GET', 'POST'])
def userinfo():
try:
tok = validate_access_token()
except OAuth2Error as err:
# RFC 6750:
# If the request lacks any authentication information (e.g., the client
# was unaware that authentication is necessary or attempted using an
# unsupported authentication method), the resource server SHOULD NOT
# include an error code or other error information.
header = 'Bearer'
if request.headers.get('Authorization', '').lower().startswith('bearer') or 'access_token' in request.values:
header += f' error="{err.ERROR}"'
return '', 401, {'WWW-Authenticate': [header]}
service_user = tok.service_user
if 'openid' in tok.scopes:
resp = render_claims(tok.scopes, (tok.claims or {}).get('userinfo', {}), service_user)
else:
resp = {
'id': service_user.user.unix_uid,
'name': service_user.user.displayname,
'nickname': service_user.user.loginname,
'email': service_user.email,
'groups': [group.name for group in service_user.user.groups],
}
return jsonify(resp), 200, {'Cache-Control': ['private']}
@bp.app_url_defaults
def inject_logout_params(endpoint, values):
if endpoint != 'oauth2.logout' or not request.session:
return
client_ids = set(token.client.client_id for token in request.session.oauth2_tokens)
if client_ids:
values['client_ids'] = ','.join(client_ids)
@bp.route('/oauth2/logout')
def logout():
if not request.values.get('client_ids'):
return secure_local_redirect(request.values.get('ref', '/'))
client_ids = request.values['client_ids'].split(',')
clients = [OAuth2Client.query.filter_by(client_id=client_id).one() for client_id in client_ids]
return render_template('oauth2/logout.html', clients=clients)
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import Role, RoleGroup, Group
from .session import login_required
bp = Blueprint("role", __name__, template_folder='templates', url_prefix='/role/')
def role_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(role_acl_check)
def role_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Roles'), icon='key', blueprint=bp, visible=role_acl_check)
def index():
return render_template('role/list.html', roles=Role.query.all())
@bp.route("/new")
def new():
return render_template('role/show.html', role=Role(), groups=Group.query.all(), roles=Role.query.all())
@bp.route("/<int:roleid>")
def show(roleid=None):
role = Role.query.get(roleid)
return render_template('role/show.html', role=role, groups=Group.query.all(), roles=Role.query.all())
@bp.route("/<int:roleid>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(roleid=None):
if roleid is None:
role = Role()
db.session.add(role)
else:
role = Role.query.get(roleid)
role.description = request.values['description']
if not role.locked:
role.name = request.values['name']
if not request.values['moderator-group']:
role.moderator_group = None
else:
role.moderator_group = Group.query.get(request.values['moderator-group'])
for included_role in Role.query.all():
if included_role != role and request.values.get('include-role-{}'.format(included_role.id)):
role.included_roles.append(included_role)
elif included_role in role.included_roles:
role.included_roles.remove(included_role)
role.groups.clear()
for group in Group.query.all():
if request.values.get(f'group-{group.id}', False):
role.groups[group] = RoleGroup(requires_mfa=bool(request.values.get(f'group-mfa-{group.id}', '')))
role.update_member_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/del")
@csrf_protect(blueprint=bp)
def delete(roleid):
role = Role.query.get(roleid)
if role.locked:
flash(_('Locked roles cannot be deleted'))
return redirect(url_for('role.show', roleid=role.id))
old_members = set(role.members_effective)
role.members.clear()
db.session.delete(role)
for user in old_members:
user.update_groups()
db.session.commit()
return redirect(url_for('role.index'))
@bp.route("/<int:roleid>/unlock")
@csrf_protect(blueprint=bp)
def unlock(roleid):
role = Role.query.get(roleid)
role.locked = False
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/setdefault")
@csrf_protect(blueprint=bp)
def set_default(roleid):
role = Role.query.get(roleid)
if role.is_default:
return redirect(url_for('role.show', roleid=role.id))
role.is_default = True
for user in set(role.members):
if not user.is_service_user:
role.members.remove(user)
role.update_member_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/unsetdefault")
@csrf_protect(blueprint=bp)
def unset_default(roleid):
role = Role.query.get(roleid)
if not role.is_default:
return redirect(url_for('role.show', roleid=role.id))
old_members = set(role.members_effective)
role.is_default = False
for user in old_members:
if not user.is_service_user:
user.update_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
from flask import Blueprint, render_template, request, url_for, redirect, flash, abort
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import Role, User, Group
from .session import login_required
bp = Blueprint('rolemod', __name__, template_folder='templates', url_prefix='/rolemod/')
def user_is_rolemod():
return request.user and Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).count()
@bp.before_request
@login_required()
def acl_check():
if not user_is_rolemod():
abort(403)
@bp.route("/")
@register_navbar(lazy_gettext('Moderation'), icon='user-lock', blueprint=bp, visible=user_is_rolemod)
def index():
roles = Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).all()
return render_template('rolemod/list.html', roles=roles)
@bp.route("/<int:role_id>")
def show(role_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
return render_template('rolemod/show.html', role=role)
@bp.route("/<int:role_id>", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(role_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
if request.form['description'] != role.description:
if len(request.form['description']) > 256:
flash(_('Description too long'))
return redirect(url_for('.show', role_id=role.id))
role.description = request.form['description']
db.session.commit()
return redirect(url_for('.show', role_id=role.id))
@bp.route("/<int:role_id>/delete_member/<int:member_id>")
@csrf_protect(blueprint=bp)
def delete_member(role_id, member_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
member = User.query.get_or_404(member_id)
if member in role.members:
role.members.remove(member)
member.update_groups()
db.session.commit()
flash(_('Member removed'))
return redirect(url_for('.show', role_id=role.id))
import secrets
from flask import Blueprint, render_template, session, request, url_for, redirect, flash, current_app, abort
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.sendmail import sendmail
from uffd.database import db
from uffd.models import (
User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay,
Session, MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod,
)
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
from .session import login_required
bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/')
reset_ratelimit = Ratelimit('passwordreset', 1*60*60, 3)
def selfservice_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP'])
@bp.route("/")
@register_navbar(lazy_gettext('Selfservice'), icon='portrait', blueprint=bp, visible=selfservice_acl_check)
@login_required(selfservice_acl_check)
def index():
return render_template('selfservice/self.html', user=request.user)
@bp.route("/updateprofile", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def update_profile():
if request.values['displayname'] != request.user.displayname:
if request.user.set_displayname(request.values['displayname']):
flash(_('Display name changed.'))
else:
flash(_('Display name is not valid.'))
db.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/changepassword", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def change_password():
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match'))
else:
if request.user.set_password(request.values['password1']):
flash(_('Password changed'))
else:
flash(_('Invalid password'))
db.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST']))
def forgot_password():
if request.method == 'GET':
return render_template('selfservice/forgot_password.html')
loginname = request.values['loginname'].lower()
mail = request.values['mail']
reset_delay = reset_ratelimit.get_delay(loginname+'/'+mail)
host_delay = host_ratelimit.get_delay()
if reset_delay or host_delay:
if reset_delay > host_delay:
flash(_('We received too many password reset requests for this user! Please wait at least %(delay)s.', delay=format_delay(reset_delay)))
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return redirect(url_for('.forgot_password'))
reset_ratelimit.log(loginname+'/'+mail)
host_ratelimit.log()
flash(_("We sent a mail to this user's mail address if you entered the correct mail and login name combination"))
user = User.query.filter_by(loginname=loginname, is_deactivated=False).one_or_none()
if not user:
return redirect(url_for('session.login'))
matches = any(map(lambda email: secrets.compare_digest(email.address, mail), user.verified_emails))
if not matches:
return redirect(url_for('session.login'))
recovery_email = user.recovery_email or user.primary_email
if recovery_email.address == mail and user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
send_passwordreset(user)
return redirect(url_for('session.login'))
@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
def token_password(token_id, token):
dbtoken = PasswordToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.expired:
flash(_('Link invalid or expired'))
return redirect(url_for('session.login'))
if request.method == 'GET':
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1']:
flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
if not dbtoken.user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
abort(403)
if not dbtoken.user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
db.session.delete(dbtoken)
db.session.commit()
flash(_('New password set'))
return redirect(url_for('session.login'))
@bp.route("/email/new", methods=['POST'])
@login_required(selfservice_acl_check)
def add_email():
email = UserEmail(user=request.user)
if not email.set_address(request.form['address']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('selfservice.index'))
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists'))
return redirect(url_for('selfservice.index'))
secret = email.start_verification()
db.session.add(email)
db.session.commit()
if not sendmail(email.address, 'Mail verification', 'selfservice/mailverification.mail.txt', user=request.user, email=email, secret=secret):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
else:
flash(_('We sent you an email, please verify your mail address.'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/verify/<secret>")
@bp.route("/token/mail_verification/<int:legacy_id>/<secret>")
@login_required(selfservice_acl_check)
def verify_email(secret, email_id=None, legacy_id=None):
if email_id is not None:
email = UserEmail.query.get(email_id)
else:
email = UserEmail.query.filter_by(verification_legacy_id=legacy_id).one()
if not email or email.verification_expired:
flash(_('Link invalid or expired'))
return redirect(url_for('selfservice.index'))
if email.user != request.user:
abort(403, description=_('This link was generated for another user. Login as the correct user to continue.'))
if not email.finish_verification(secret):
flash(_('Link invalid or expired'))
return redirect(url_for('selfservice.index'))
if legacy_id is not None:
request.user.primary_email = email
try:
db.session.commit()
except IntegrityError:
flash(_('E-Mail address is already used by another account'))
return redirect(url_for('selfservice.index'))
flash(_('E-Mail address verified'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/retry")
@login_required(selfservice_acl_check)
def retry_email_verification(email_id):
email = UserEmail.query.filter_by(id=email_id, user=request.user, verified=False).first_or_404()
secret = email.start_verification()
db.session.commit()
if not sendmail(email.address, 'E-Mail verification', 'selfservice/mailverification.mail.txt', user=request.user, email=email, secret=secret):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
else:
flash(_('We sent you an email, please verify your mail address.'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/delete", methods=['POST', 'GET'])
@login_required(selfservice_acl_check)
def delete_email(email_id):
email = UserEmail.query.filter_by(id=email_id, user=request.user).first_or_404()
try:
db.session.delete(email)
db.session.commit()
except IntegrityError:
flash(_('Cannot delete primary e-mail address'))
return redirect(url_for('selfservice.index'))
flash(_('E-Mail address deleted'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/preferences", methods=['POST'])
@login_required(selfservice_acl_check)
def update_email_preferences():
verified_emails = UserEmail.query.filter_by(user=request.user, verified=True)
request.user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
request.user.recovery_email = None
else:
request.user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in request.user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
db.session.commit()
flash(_('E-Mail preferences updated'))
return redirect(url_for('selfservice.index'))
@bp.route("/session/<int:session_id>/revoke", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def revoke_session(session_id):
_session = Session.query.filter_by(id=session_id, user=request.user).first_or_404()
db.session.delete(_session)
db.session.commit()
flash(_('Session revoked'))
return redirect(url_for('selfservice.index'))
@bp.route("/leaverole/<int:roleid>", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def leave_role(roleid):
role = Role.query.get_or_404(roleid)
role.members.remove(request.user)
request.user.update_groups()
db.session.commit()
flash(_('You left role %(role_name)s', role_name=role.name))
return redirect(url_for('selfservice.index'))
def send_passwordreset(user, new=False):
PasswordToken.query.filter(PasswordToken.user == user).delete()
token = PasswordToken(user=user)
db.session.add(token)
db.session.commit()
if new:
template = 'selfservice/newuser.mail.txt'
subject = 'Welcome to the %s infrastructure'%current_app.config.get('ORGANISATION_NAME', '')
else:
template = 'selfservice/passwordreset.mail.txt'
subject = 'Password reset'
email = user.recovery_email or user.primary_email
if not sendmail(email.address, subject, template, user=user, token=token):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
@bp.route('/mfa/', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa():
return render_template('selfservice/setup_mfa.html')
@bp.route('/mfa/setup/disable', methods=['GET'])
@login_required(selfservice_acl_check)
def disable_mfa():
return render_template('selfservice/disable_mfa.html')
@bp.route('/mfa/setup/disable', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def disable_mfa_confirm():
MFAMethod.query.filter_by(user=request.user).delete()
db.session.commit()
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
@bp.route('/mfa/setup/recovery', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_recovery():
for method in RecoveryCodeMethod.query.filter_by(user=request.user).all():
db.session.delete(method)
methods = []
for _ in range(10):
method = RecoveryCodeMethod(request.user)
methods.append(method)
db.session.add(method)
db.session.commit()
return render_template('selfservice/setup_mfa_recovery.html', methods=methods)
@bp.route('/mfa/setup/totp', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa_totp():
method = TOTPMethod(request.user)
session['mfa_totp_key'] = method.key
return render_template('selfservice/setup_mfa_totp.html', method=method, name=request.values['name'])
@bp.route('/mfa/setup/totp', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_totp_finish():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
flash(_('Generate recovery codes first!'))
return redirect(url_for('selfservice.setup_mfa'))
method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key'))
if method.verify(request.form['code']):
db.session.add(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
flash(_('Code is invalid'))
return redirect(url_for('selfservice.setup_mfa_totp', name=request.values['name']))
@bp.route('/mfa/setup/totp/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_totp(id): #pylint: disable=redefined-builtin
method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported')
if WEBAUTHN_SUPPORTED:
@bp.route('/mfa/setup/webauthn/begin', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_begin():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
abort(403)
methods = WebauthnMethod.query.filter_by(user=request.user).all()
creds = [method.cred for method in methods]
server = get_webauthn_server()
registration_data, state = server.register_begin(
{
"id": str(request.user.id).encode(),
"name": request.user.loginname,
"displayName": request.user.displayname,
},
creds,
user_verification='discouraged',
)
session["webauthn-state"] = state
return cbor.encode(registration_data)
@bp.route('/mfa/setup/webauthn/complete', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_complete():
server = get_webauthn_server()
data = cbor.decode(request.get_data())
client_data = ClientData(data["clientDataJSON"])
att_obj = AttestationObject(data["attestationObject"])
auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj)
method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name'])
db.session.add(method)
request.user.update_groups()
db.session.commit()
return cbor.encode({"status": "OK"})
@bp.route('/mfa/setup/webauthn/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_webauthn(id): #pylint: disable=redefined-builtin
method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
import functools
from flask import Blueprint, render_template, request, url_for, redirect, current_app, abort
from flask_babel import lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import User, Service, ServiceUser, get_services, Group, OAuth2Client, OAuth2LogoutURI, APIClient, RemailerMode
from .session import login_required
bp = Blueprint('service', __name__, template_folder='templates')
bp.add_app_template_global(RemailerMode, 'RemailerMode')
def admin_acl():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
def overview_login_maybe_required(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SERVICES']:
return login_required(admin_acl)(func)(*args, **kwargs)
if not current_app.config['SERVICES_PUBLIC']:
return login_required()(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorator
def overview_navbar_visible():
return get_services(request.user) or admin_acl()
@bp.route('/services/')
@register_navbar(lazy_gettext('Services'), icon='sitemap', blueprint=bp, visible=overview_navbar_visible)
@overview_login_maybe_required
def overview():
services = get_services(request.user)
banner = ''
if request.user or current_app.config['SERVICES_BANNER_PUBLIC']:
banner = current_app.config['SERVICES_BANNER']
return render_template('service/overview.html', services=services, banner=banner)
@bp.route('/service/admin')
@login_required(admin_acl)
def index():
return render_template('service/index.html', services=Service.query.all())
@bp.route('/service/new')
@bp.route('/service/<int:id>')
@login_required(admin_acl)
def show(id=None):
service = Service() if id is None else Service.query.get_or_404(id)
remailer_overwrites = []
if id is not None:
# pylint: disable=singleton-comparison
remailer_overwrites = ServiceUser.query.filter(
ServiceUser.service_id == id,
ServiceUser.remailer_overwrite_mode != None
).all()
all_groups = Group.query.all()
return render_template('service/show.html', service=service, all_groups=all_groups, remailer_overwrites=remailer_overwrites)
@bp.route('/service/new', methods=['POST'])
@bp.route('/service/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def edit_submit(id=None):
if id is None:
service = Service()
db.session.add(service)
else:
service = Service.query.get_or_404(id)
service.name = request.form['name']
if not request.form['access-group']:
service.limit_access = True
service.access_group = None
elif request.form['access-group'] == 'all':
service.limit_access = False
service.access_group = None
else:
service.limit_access = True
service.access_group = Group.query.get(request.form['access-group'])
service.hide_deactivated_users = request.form.get('hide_deactivated_users') == '1'
service.enable_email_preferences = request.form.get('enable_email_preferences') == '1'
service.remailer_mode = RemailerMode[request.form['remailer-mode']]
remailer_overwrite_mode = RemailerMode[request.form['remailer-overwrite-mode']]
remailer_overwrite_user_ids = [
User.query.filter_by(loginname=loginname.strip()).one().id
for loginname in request.form['remailer-overwrite-users'].split(',') if loginname.strip()
]
# pylint: disable=singleton-comparison
service_users = ServiceUser.query.filter(
ServiceUser.service == service,
db.or_(
ServiceUser.user_id.in_(remailer_overwrite_user_ids),
ServiceUser.remailer_overwrite_mode != None,
)
)
for service_user in service_users:
if service_user.user_id in remailer_overwrite_user_ids:
service_user.remailer_overwrite_mode = remailer_overwrite_mode
else:
service_user.remailer_overwrite_mode = None
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def delete(id):
service = Service.query.get_or_404(id)
db.session.delete(service)
db.session.commit()
return redirect(url_for('service.index'))
@bp.route('/service/<int:service_id>/oauth2/new')
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>')
@login_required(admin_acl)
def oauth2_show(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client() if db_id is None else OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
return render_template('service/oauth2.html', service=service, client=client)
@bp.route('/service/<int:service_id>/oauth2/new', methods=['POST'])
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_submit(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
if db_id is None:
client = OAuth2Client(service=service)
db.session.add(client)
else:
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
client.client_id = request.form['client_id']
if request.form['client_secret']:
client.client_secret = request.form['client_secret']
if not client.client_secret:
abort(400)
client.redirect_uris = [x.strip() for x in request.form['redirect_uris'].split('\n') if x.strip()]
client.logout_uris = []
for line in request.form['logout_uris'].split('\n'):
line = line.strip()
if not line:
continue
method, uri = line.split(' ', 2)
client.logout_uris.append(OAuth2LogoutURI(method=method, uri=uri))
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_delete(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/new')
@bp.route('/service/<int:service_id>/api/<int:id>')
@login_required(admin_acl)
def api_show(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient() if id is None else APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
return render_template('service/api.html', service=service, client=client)
@bp.route('/service/<int:service_id>/api/new', methods=['POST'])
@bp.route('/service/<int:service_id>/api/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_submit(service_id, id=None):
service = Service.query.get_or_404(service_id)
if id is None:
client = APIClient(service=service)
db.session.add(client)
else:
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
client.auth_username = request.form['auth_username']
if request.form['auth_password']:
client.auth_password = request.form['auth_password']
if not client.auth_password:
abort(400)
client.perm_users = request.form.get('perm_users') == '1'
client.perm_checkpassword = request.form.get('perm_checkpassword') == '1'
client.perm_mail_aliases = request.form.get('perm_mail_aliases') == '1'
client.perm_remailer = request.form.get('perm_remailer') == '1'
client.perm_metrics = request.form.get('perm_metrics') == '1'
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_delete(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
import datetime
import secrets
import functools
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
from flask_babel import gettext as _
from uffd.database import db
from uffd.csrf import csrf_protect
from uffd.secure_redirect import secure_local_redirect
from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay, Session
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
login_ratelimit = Ratelimit('login', 1*60, 3)
mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.before_app_request
def set_request_user():
request.user = None
request.user_pre_mfa = None
request.session = None
request.session_pre_mfa = None
if 'id' not in session:
return
if 'secret' not in session:
return
_session = Session.query.get(session['id'])
if _session is None or not _session.secret.verify(session['secret']) or _session.expired:
return
if _session.last_used <= datetime.datetime.utcnow() - datetime.timedelta(seconds=60):
_session.last_used = datetime.datetime.utcnow()
_session.ip_address = request.remote_addr
_session.user_agent = request.user_agent.string
db.session.commit()
if _session.user.is_deactivated or not _session.user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
return
request.session_pre_mfa = _session
request.user_pre_mfa = _session.user
if _session.mfa_done:
request.session = _session
request.user = _session.user
@bp.route("/logout")
def logout():
# The oauth2 module takes data from `session` and injects it into the url,
# so we need to build the url BEFORE we clear the session!
resp = redirect(url_for('oauth2.logout', ref=request.values.get('ref', url_for('.login'))))
if request.session_pre_mfa:
db.session.delete(request.session_pre_mfa)
db.session.commit()
session.clear()
return resp
def set_session(user, skip_mfa=False):
session.clear()
session.permanent = True
secret = secrets.token_hex(128)
_session = Session(
user=user,
secret=secret,
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
)
if skip_mfa:
_session.mfa_done = True
db.session.add(_session)
db.session.commit()
session['id'] = _session.id
session['secret'] = secret
session['_csrf_token'] = secrets.token_hex(128)
@bp.route("/login", methods=('GET', 'POST'))
def login():
# pylint: disable=too-many-return-statements
if request.user_pre_mfa:
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
if request.method == 'GET':
return render_template('session/login.html', ref=request.values.get('ref'))
username = request.form['loginname'].lower()
password = request.form['password']
login_delay = login_ratelimit.get_delay(username)
host_delay = host_ratelimit.get_delay()
if login_delay or host_delay:
if login_delay > host_delay:
flash(_('We received too many invalid login attempts for this user! Please wait at least %(delay)s.', delay=format_delay(login_delay)))
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return render_template('session/login.html', ref=request.values.get('ref'))
user = User.query.filter_by(loginname=username).one_or_none()
if user is None or not user.password.verify(password):
login_ratelimit.log(username)
host_ratelimit.log()
flash(_('Login name or password is wrong'))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.is_deactivated:
flash(_('Your account is deactivated. Contact %(contact_email)s for details.', contact_email=current_app.config['ORGANISATION_CONTACT']))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.password.needs_rehash:
user.password = password
db.session.commit()
if not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
flash(_('You do not have access to this service'))
return render_template('session/login.html', ref=request.values.get('ref'))
set_session(user)
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
def login_required_pre_mfa(no_redirect=False):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not request.user_pre_mfa:
if no_redirect:
abort(403)
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
return func(*args, **kwargs)
return decorator
return wrapper
def login_required(permission_check=lambda: True):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not request.user_pre_mfa:
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
if not request.user:
return redirect(url_for('session.mfa_auth', ref=request.full_path))
if not permission_check():
abort(403)
return func(*args, **kwargs)
return decorator
return wrapper
@bp.route('/mfa/auth', methods=['GET'])
@login_required_pre_mfa()
def mfa_auth():
if not request.user_pre_mfa.mfa_enabled:
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if request.session_pre_mfa.mfa_done:
return secure_local_redirect(request.values.get('ref', url_for('index')))
return render_template('session/mfa_auth.html', ref=request.values.get('ref'))
@bp.route('/mfa/auth', methods=['POST'])
@login_required_pre_mfa()
def mfa_auth_finish():
delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id)
if delay:
flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return secure_local_redirect(request.values.get('ref', url_for('index')))
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash(_('You have exhausted your recovery codes. Please generate new ones now!'))
return redirect(url_for('selfservice.setup_mfa'))
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.'))
return redirect(url_for('selfservice.setup_mfa'))
return secure_local_redirect(request.values.get('ref', url_for('index')))
mfa_ratelimit.log(request.user_pre_mfa.id)
flash(_('Two-factor authentication failed'))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
if WEBAUTHN_SUPPORTED:
@bp.route("/mfa/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_begin():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
session["webauthn-state"] = state
return cbor.encode(auth_data)
@bp.route("/mfa/auth/webauthn/complete", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_complete():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.decode(request.get_data())
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
# authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster)
# does not check signCount, although the spec recommends it
server.authenticate_complete(
session.pop("webauthn-state"),
creds,
credential_id,
client_data,
auth_data,
signature,
)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return cbor.encode({"status": "OK"})
@bp.route("/login/device/start")
def devicelogin_start():
session['devicelogin_started'] = True
return secure_local_redirect(request.values['ref'])
@bp.route("/login/device")
def devicelogin():
if 'devicelogin_id' not in session or 'devicelogin_secret' not in session:
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
initiation = DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret']).one_or_none()
if not initiation or initiation.expired:
flash(_('Initiation code is no longer valid'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
return render_template('session/devicelogin.html', ref=request.values.get('ref'), initiation=initiation)
@bp.route("/login/device", methods=['POST'])
def devicelogin_submit():
if 'devicelogin_id' not in session or 'devicelogin_secret' not in session:
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
initiation = DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret']).one_or_none()
if not initiation or initiation.expired:
flash(_('Initiation code is no longer valid'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
confirmation = DeviceLoginConfirmation.query.filter_by(initiation=initiation, code=request.form['confirmation-code']).one_or_none()
if confirmation is None:
flash(_('Invalid confirmation code'))
return render_template('session/devicelogin.html', ref=request.values.get('ref'), initiation=initiation)
session['devicelogin_confirmation'] = confirmation.id
return secure_local_redirect(request.values['ref'])
@bp.route("/device")
@login_required()
def deviceauth():
if 'initiation-code' not in request.values:
return render_template('session/deviceauth.html')
initiation = DeviceLoginInitiation.query.filter_by(code=request.values['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
return render_template('session/deviceauth.html', initiation=initiation)
@bp.route("/device", methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def deviceauth_submit():
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
initiation = DeviceLoginInitiation.query.filter_by(code=request.form['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
confirmation = DeviceLoginConfirmation(session=request.session, initiation=initiation)
db.session.add(confirmation)
db.session.commit()
return render_template('session/deviceauth.html', initiation=initiation, confirmation=confirmation)
@bp.route("/device/finish", methods=['GET', 'POST'])
@login_required()
def deviceauth_finish():
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
db.session.commit()
return redirect(url_for('index'))
import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask_babel import gettext as _
from uffd.database import db
from uffd.ldap import ldap
from uffd.session import set_session
from uffd.user.models import User
from uffd.sendmail import sendmail
from uffd.signup.models import Signup
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
from uffd.database import db
from uffd.models import User, Signup, Ratelimit, host_ratelimit, format_delay
from .session import set_session
bp = Blueprint('signup', __name__, template_folder='templates', url_prefix='/signup/')
......@@ -19,7 +18,7 @@ def signup_enabled(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SELF_SIGNUP']:
flash('Singup not enabled')
flash(_('Signup not enabled'))
return redirect(url_for('index'))
return func(*args, **kwargs)
return decorator
......@@ -45,58 +44,71 @@ def signup_check():
@signup_enabled
def signup_submit():
if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error='Passwords do not match')
flash(_('Passwords do not match'), 'error')
return render_template('signup/start.html')
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
host_delay = host_ratelimit.get_delay()
if signup_delay and signup_delay > host_delay:
return render_template('signup/start.html', error='Too many signup requests with this mail address! Please wait %s.'%format_delay(signup_delay))
flash(_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)), 'error')
return render_template('signup/start.html')
if host_delay:
return render_template('signup/start.html', error='Too many requests! Please wait %s.'%format_delay(host_delay))
flash(_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)), 'error')
return render_template('signup/start.html')
host_ratelimit.log()
signup = Signup(loginname=request.form['loginname'],
displayname=request.form['displayname'],
mail=request.form['mail'], password=request.form['password1'])
mail=request.form['mail'])
# If the password is invalid, signup.set_password returns False and does not
# set signup.password. We don't need to check the return value here, because
# we call signup.verify next and that checks if signup.password is set.
signup.set_password(request.form['password1'])
valid, msg = signup.validate()
if not valid:
return render_template('signup/start.html', error=msg)
flash(msg, 'error')
return render_template('signup/start.html')
db.session.add(signup)
db.session.commit()
sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup)
if not sent:
return render_template('signup/start.html', error='Cound not send mail')
flash(_('Could not send mail'), 'error')
return render_template('signup/start.html')
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
@bp.route('/confirm/<token>')
def signup_confirm(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
flash('Invalid signup link')
@bp.route('/confirm/<int:signup_id>/<token>')
def signup_confirm(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
return render_template('signup/confirm.html', signup=signup)
@bp.route('/confirm/<token>', methods=['POST'])
def signup_confirm_submit(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
flash('Invalid signup link')
@bp.route('/confirm/<int:signup_id>/<token>', methods=['POST'])
def signup_confirm_submit(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
confirm_delay = confirm_ratelimit.get_delay(token)
host_delay = host_ratelimit.get_delay()
if confirm_delay and confirm_delay > host_delay:
return render_template('signup/confirm.html', signup=signup, error='Too many failed attempts! Please wait %s.'%format_delay(confirm_delay))
flash(_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)), 'error')
return render_template('signup/confirm.html', signup=signup)
if host_delay:
return render_template('signup/confirm.html', signup=signup, error='Too many requests! Please wait %s.'%format_delay(host_delay))
if not signup.check_password(request.form['password']):
return render_template('signup/confirm.html', signup=signup)
if not signup.password.verify(request.form['password']):
host_ratelimit.log()
confirm_ratelimit.log(token)
return render_template('signup/confirm.html', signup=signup, error='Wrong password')
flash(_('Wrong password'), 'error')
return render_template('signup/confirm.html', signup=signup)
user, msg = signup.finish(request.form['password'])
if user is None:
return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.rollback()
flash(msg, 'error')
return render_template('signup/confirm.html', signup=signup)
db.session.commit()
ldap.session.commit()
set_session(user, skip_mfa=True)
flash('Your account was successfully created')
return redirect(url_for('selfservice.index'))
flash(_('Your account was successfully created'))
return redirect(url_for('index'))
import csv
import io
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.remailer import remailer
from uffd.database import db
from uffd.models import User, UserEmail, Role, MFAMethod
from .selfservice import send_passwordreset
from .session import login_required
bp = Blueprint("user", __name__, template_folder='templates', url_prefix='/user/')
bp.add_app_template_global(User, 'User')
bp.add_app_template_global(remailer, 'remailer')
def user_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(user_acl_check)
def user_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Users'), icon='users', blueprint=bp, visible=user_acl_check)
def index():
return render_template('user/list.html', users=User.query.all())
@bp.route("/<int:id>")
@bp.route("/new")
def show(id=None):
user = User() if id is None else User.query.get_or_404(id)
return render_template('user/show.html', user=user, roles=Role.query.all())
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def create():
user = User()
if request.form.get('serviceaccount'):
user.is_service_user = True
ignore_blocklist = request.form.get('ignore-loginname-blocklist', False)
if not user.set_loginname(request.form['loginname'], ignore_blocklist=ignore_blocklist):
flash(_('Login name does not meet requirements'))
return redirect(url_for('user.show'))
if not user.set_primary_email_address(request.form['email']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show'))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show'))
db.session.add(user)
try:
db.session.flush()
except IntegrityError:
flash(_('Login name or e-mail address is already in use'))
return redirect(url_for('user.show'))
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by e-mail'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/update", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id):
# pylint: disable=too-many-branches,too-many-statements
user = User.query.get_or_404(id)
for email in user.all_emails:
if f'email-{email.id}-present' in request.form:
email.verified = email.verified or (request.form.get(f'email-{email.id}-verified') == '1')
for key, value in request.form.items():
parts = key.split('-')
if not parts[0] == 'newemail' or not parts[2] == 'address' or not value:
continue
tmp_id = parts[1]
email = UserEmail(
user=user,
verified=(request.form.get(f'newemail-{tmp_id}-verified') == '1'),
)
if not email.set_address(value):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show', id=id))
db.session.add(email)
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists or is used by another account'))
return redirect(url_for('user.show', id=id))
verified_emails = UserEmail.query.filter_by(user=user, verified=True)
user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
user.recovery_email = None
else:
user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
for email in user.all_emails:
if request.form.get(f'email-{email.id}-delete') == '1':
db.session.delete(email)
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show', id=id))
new_password = request.form.get('password')
if new_password:
if not user.set_password(new_password):
flash(_('Password is invalid'))
return redirect(url_for('user.show', id=id))
user.roles.clear()
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
flash(_('User updated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/deactivate')
@csrf_protect(blueprint=bp)
def deactivate(id):
user = User.query.get_or_404(id)
user.is_deactivated = True
db.session.commit()
flash(_('User deactivated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/activate')
@csrf_protect(blueprint=bp)
def activate(id):
user = User.query.get_or_404(id)
user.is_deactivated = False
db.session.commit()
flash(_('User activated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/mfa/disable')
@csrf_protect(blueprint=bp)
def disable_mfa(id):
user = User.query.get_or_404(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/<int:id>/sessions/revoke')
@csrf_protect(blueprint=bp)
def revoke_sessions(id):
user = User.query.get_or_404(id)
user.sessions.clear()
db.session.commit()
flash(_('Sessions revoked'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/del")
@csrf_protect(blueprint=bp)
def delete(id):
user = User.query.get_or_404(id)
user.roles.clear()
db.session.delete(user)
db.session.commit()
flash(_('Deleted user'))
return redirect(url_for('user.index'))
@bp.route("/csv", methods=['POST'])
@csrf_protect(blueprint=bp)
def csvimport():
csvdata = request.values.get('csv')
if not csvdata:
flash('No data for csv import!')
return redirect(url_for('user.index'))
ignore_blocklist = request.values.get('ignore-loginname-blocklist', False)
roles = Role.query.filter_by(is_default=False).all()
usersadded = 0
with io.StringIO(initial_value=csvdata) as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
if not len(row) == 3:
flash("invalid line, ignored : {}".format(row))
continue
newuser = User()
if not newuser.set_loginname(row[0], ignore_blocklist=ignore_blocklist) or not newuser.set_displayname(row[0]):
flash("invalid login name, skipped : {}".format(row))
continue
if not newuser.set_primary_email_address(row[1]):
flash("invalid e-mail address, skipped : {}".format(row))
continue
db.session.add(newuser)
for role in roles:
if str(role.id) in row[2].split(';'):
role.members.append(newuser)
newuser.update_groups()
try:
db.session.commit()
except IntegrityError:
flash('Error adding user {}'.format(row[0]))
db.session.rollback()
continue
send_passwordreset(newuser, new=True)
usersadded += 1
flash('Added {} new users'.format(usersadded))
return redirect(url_for('user.index'))
#!/bin/sh
set -e
pybabel extract -F uffd/babel.cfg -k lazy_gettext -o messages.pot uffd
# If you want to initialize a new message, use:
# pybabel init -i messages.pot -d uffd/translations -l fr
# Complete Documentation of Flask-Babel: https://flask-babel.tkte.ch
pybabel update -i messages.pot -d uffd/translations
pybabel compile -d uffd/translations
if [ -n "$1" ]; then
NUM_EMPTY="$(tr '\n' '|' < uffd/translations/$1/LC_MESSAGES/messages.po | sed 's/msgstr ""|/empty/g' | tr '|' '\n' | grep '^empty$' | wc -l)"
NUM_TOTAL="$(grep '^msgid' uffd/translations/$1/LC_MESSAGES/messages.po | wc -l)"
# Emulate python-coverage output
echo "TOTAL $NUM_TOTAL $(( $NUM_TOTAL - $NUM_EMPTY )) $(( 100 * ($NUM_TOTAL - $NUM_EMPTY) / $NUM_TOTAL ))%"
fi
[uwsgi]
plugin = python3
manage-script-name = true
uid = uffd
gid = uffd
vacuum = true
master = true
process = 2
threads = 4
buffer-size = 8192 ; this buffer is used for http headers and defaults to 4k
single-interpreter = true
need-app = true
env = PYTHONIOENCODING=UTF-8
env = LANG=en_GB.utf8
env = TZ=Europe/Berlin
env = CONFIG_PATH=/etc/uffd/uffd.cfg
chdir = /usr/share/uffd
module = uffd:create_app()
hook-pre-app = exec:FLASK_APP=uffd flask db upgrade