Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
  • Dockerfile
  • claims-in-idtoke
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • recovery-code-pwhash
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
32 results
Show changes
import functools
from flask import Blueprint, render_template, request, url_for, redirect, current_app, abort
from flask_babel import lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import User, Service, ServiceUser, get_services, Group, OAuth2Client, OAuth2LogoutURI, APIClient, RemailerMode
from .session import login_required
bp = Blueprint('service', __name__, template_folder='templates')
bp.add_app_template_global(RemailerMode, 'RemailerMode')
def admin_acl():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
def overview_login_maybe_required(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SERVICES']:
return login_required(admin_acl)(func)(*args, **kwargs)
if not current_app.config['SERVICES_PUBLIC']:
return login_required()(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorator
def overview_navbar_visible():
return get_services(request.user) or admin_acl()
@bp.route('/services/')
@register_navbar(lazy_gettext('Services'), icon='sitemap', blueprint=bp, visible=overview_navbar_visible)
@overview_login_maybe_required
def overview():
services = get_services(request.user)
banner = ''
if request.user or current_app.config['SERVICES_BANNER_PUBLIC']:
banner = current_app.config['SERVICES_BANNER']
return render_template('service/overview.html', services=services, banner=banner)
@bp.route('/service/admin')
@login_required(admin_acl)
def index():
return render_template('service/index.html', services=Service.query.all())
@bp.route('/service/new')
@bp.route('/service/<int:id>')
@login_required(admin_acl)
def show(id=None):
service = Service() if id is None else Service.query.get_or_404(id)
remailer_overwrites = []
if id is not None:
# pylint: disable=singleton-comparison
remailer_overwrites = ServiceUser.query.filter(
ServiceUser.service_id == id,
ServiceUser.remailer_overwrite_mode != None
).all()
all_groups = Group.query.all()
return render_template('service/show.html', service=service, all_groups=all_groups, remailer_overwrites=remailer_overwrites)
@bp.route('/service/new', methods=['POST'])
@bp.route('/service/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def edit_submit(id=None):
if id is None:
service = Service()
db.session.add(service)
else:
service = Service.query.get_or_404(id)
service.name = request.form['name']
if not request.form['access-group']:
service.limit_access = True
service.access_group = None
elif request.form['access-group'] == 'all':
service.limit_access = False
service.access_group = None
else:
service.limit_access = True
service.access_group = Group.query.get(request.form['access-group'])
service.hide_deactivated_users = request.form.get('hide_deactivated_users') == '1'
service.enable_email_preferences = request.form.get('enable_email_preferences') == '1'
service.remailer_mode = RemailerMode[request.form['remailer-mode']]
remailer_overwrite_mode = RemailerMode[request.form['remailer-overwrite-mode']]
remailer_overwrite_user_ids = [
User.query.filter_by(loginname=loginname.strip()).one().id
for loginname in request.form['remailer-overwrite-users'].split(',') if loginname.strip()
]
# pylint: disable=singleton-comparison
service_users = ServiceUser.query.filter(
ServiceUser.service == service,
db.or_(
ServiceUser.user_id.in_(remailer_overwrite_user_ids),
ServiceUser.remailer_overwrite_mode != None,
)
)
for service_user in service_users:
if service_user.user_id in remailer_overwrite_user_ids:
service_user.remailer_overwrite_mode = remailer_overwrite_mode
else:
service_user.remailer_overwrite_mode = None
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def delete(id):
service = Service.query.get_or_404(id)
db.session.delete(service)
db.session.commit()
return redirect(url_for('service.index'))
@bp.route('/service/<int:service_id>/oauth2/new')
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>')
@login_required(admin_acl)
def oauth2_show(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client() if db_id is None else OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
return render_template('service/oauth2.html', service=service, client=client)
@bp.route('/service/<int:service_id>/oauth2/new', methods=['POST'])
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_submit(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
if db_id is None:
client = OAuth2Client(service=service)
db.session.add(client)
else:
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
client.client_id = request.form['client_id']
if request.form['client_secret']:
client.client_secret = request.form['client_secret']
if not client.client_secret:
abort(400)
client.redirect_uris = [x.strip() for x in request.form['redirect_uris'].split('\n') if x.strip()]
client.logout_uris = []
for line in request.form['logout_uris'].split('\n'):
line = line.strip()
if not line:
continue
method, uri = line.split(' ', 2)
client.logout_uris.append(OAuth2LogoutURI(method=method, uri=uri))
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_delete(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/new')
@bp.route('/service/<int:service_id>/api/<int:id>')
@login_required(admin_acl)
def api_show(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient() if id is None else APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
return render_template('service/api.html', service=service, client=client)
@bp.route('/service/<int:service_id>/api/new', methods=['POST'])
@bp.route('/service/<int:service_id>/api/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_submit(service_id, id=None):
service = Service.query.get_or_404(service_id)
if id is None:
client = APIClient(service=service)
db.session.add(client)
else:
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
client.auth_username = request.form['auth_username']
if request.form['auth_password']:
client.auth_password = request.form['auth_password']
if not client.auth_password:
abort(400)
client.perm_users = request.form.get('perm_users') == '1'
client.perm_checkpassword = request.form.get('perm_checkpassword') == '1'
client.perm_mail_aliases = request.form.get('perm_mail_aliases') == '1'
client.perm_remailer = request.form.get('perm_remailer') == '1'
client.perm_metrics = request.form.get('perm_metrics') == '1'
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_delete(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
......@@ -8,57 +8,74 @@ from flask_babel import gettext as _
from uffd.database import db
from uffd.csrf import csrf_protect
from uffd.secure_redirect import secure_local_redirect
from uffd.user.models import User
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
from uffd.session.models import DeviceLoginInitiation, DeviceLoginConfirmation
from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay, Session
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
login_ratelimit = Ratelimit('login', 1*60, 3)
mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.before_app_request
def set_request_user():
request.user = None
request.user_pre_mfa = None
if 'user_id' not in session:
request.session = None
request.session_pre_mfa = None
if 'id' not in session:
return
if 'logintime' not in session:
if 'secret' not in session:
return
if datetime.datetime.now().timestamp() > session['logintime'] + current_app.config['SESSION_LIFETIME_SECONDS']:
_session = Session.query.get(session['id'])
if _session is None or not _session.secret.verify(session['secret']) or _session.expired:
return
user = User.query.get(session['user_id'])
if not user or not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
if _session.last_used <= datetime.datetime.utcnow() - datetime.timedelta(seconds=60):
_session.last_used = datetime.datetime.utcnow()
_session.ip_address = request.remote_addr
_session.user_agent = request.user_agent.string
db.session.commit()
if _session.user.is_deactivated or not _session.user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
return
request.user_pre_mfa = user
if session.get('user_mfa'):
request.user = user
def login_get_user(loginname, password):
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None or not user.check_password(password):
return None
return user
request.session_pre_mfa = _session
request.user_pre_mfa = _session.user
if _session.mfa_done:
request.session = _session
request.user = _session.user
@bp.route("/logout")
def logout():
# The oauth2 module takes data from `session` and injects it into the url,
# so we need to build the url BEFORE we clear the session!
resp = redirect(url_for('oauth2.logout', ref=request.values.get('ref', url_for('.login'))))
if request.session_pre_mfa:
db.session.delete(request.session_pre_mfa)
db.session.commit()
session.clear()
return resp
def set_session(user, skip_mfa=False):
session.clear()
session['user_id'] = user.id
session['logintime'] = datetime.datetime.now().timestamp()
session['_csrf_token'] = secrets.token_hex(128)
session.permanent = True
secret = secrets.token_hex(128)
_session = Session(
user=user,
secret=secret,
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
)
if skip_mfa:
session['user_mfa'] = True
_session.mfa_done = True
db.session.add(_session)
db.session.commit()
session['id'] = _session.id
session['secret'] = secret
session['_csrf_token'] = secrets.token_hex(128)
@bp.route("/login", methods=('GET', 'POST'))
def login():
# pylint: disable=too-many-return-statements
if request.user_pre_mfa:
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
if request.method == 'GET':
return render_template('session/login.html', ref=request.values.get('ref'))
......@@ -72,17 +89,24 @@ def login():
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return render_template('session/login.html', ref=request.values.get('ref'))
user = login_get_user(username, password)
if user is None:
user = User.query.filter_by(loginname=username).one_or_none()
if user is None or not user.password.verify(password):
login_ratelimit.log(username)
host_ratelimit.log()
flash(_('Login name or password is wrong'))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.is_deactivated:
flash(_('Your account is deactivated. Contact %(contact_email)s for details.', contact_email=current_app.config['ORGANISATION_CONTACT']))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.password.needs_rehash:
user.password = password
db.session.commit()
if not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
flash(_('You do not have access to this service'))
return render_template('session/login.html', ref=request.values.get('ref'))
set_session(user)
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
def login_required_pre_mfa(no_redirect=False):
def wrapper(func):
......@@ -105,13 +129,93 @@ def login_required(permission_check=lambda: True):
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
if not request.user:
return redirect(url_for('mfa.auth', ref=request.full_path))
return redirect(url_for('session.mfa_auth', ref=request.full_path))
if not permission_check():
abort(403)
return func(*args, **kwargs)
return decorator
return wrapper
@bp.route('/mfa/auth', methods=['GET'])
@login_required_pre_mfa()
def mfa_auth():
if not request.user_pre_mfa.mfa_enabled:
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if request.session_pre_mfa.mfa_done:
return secure_local_redirect(request.values.get('ref', url_for('index')))
return render_template('session/mfa_auth.html', ref=request.values.get('ref'))
@bp.route('/mfa/auth', methods=['POST'])
@login_required_pre_mfa()
def mfa_auth_finish():
delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id)
if delay:
flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return secure_local_redirect(request.values.get('ref', url_for('index')))
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash(_('You have exhausted your recovery codes. Please generate new ones now!'))
return redirect(url_for('selfservice.setup_mfa'))
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.'))
return redirect(url_for('selfservice.setup_mfa'))
return secure_local_redirect(request.values.get('ref', url_for('index')))
mfa_ratelimit.log(request.user_pre_mfa.id)
flash(_('Two-factor authentication failed'))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
if WEBAUTHN_SUPPORTED:
@bp.route("/mfa/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_begin():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
session["webauthn-state"] = state
return cbor.encode(auth_data)
@bp.route("/mfa/auth/webauthn/complete", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_complete():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.decode(request.get_data())
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
# authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster)
# does not check signCount, although the spec recommends it
server.authenticate_complete(
session.pop("webauthn-state"),
creds,
credential_id,
client_data,
auth_data,
signature,
)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return cbor.encode({"status": "OK"})
@bp.route("/login/device/start")
def devicelogin_start():
session['devicelogin_started'] = True
......@@ -157,12 +261,12 @@ def deviceauth():
@login_required()
@csrf_protect(blueprint=bp)
def deviceauth_submit():
DeviceLoginConfirmation.query.filter_by(user=request.user).delete()
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
initiation = DeviceLoginInitiation.query.filter_by(code=request.form['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
confirmation = DeviceLoginConfirmation(user=request.user, initiation=initiation)
confirmation = DeviceLoginConfirmation(session=request.session, initiation=initiation)
db.session.add(confirmation)
db.session.commit()
return render_template('session/deviceauth.html', initiation=initiation, confirmation=confirmation)
......@@ -170,6 +274,6 @@ def deviceauth_submit():
@bp.route("/device/finish", methods=['GET', 'POST'])
@login_required()
def deviceauth_finish():
DeviceLoginConfirmation.query.filter_by(user=request.user).delete()
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
db.session.commit()
return redirect(url_for('index'))
import functools
import secrets
import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask_babel import gettext as _
from uffd.database import db
from uffd.session import set_session
from uffd.user.models import User
from uffd.sendmail import sendmail
from uffd.signup.models import Signup
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
from uffd.database import db
from uffd.models import User, Signup, Ratelimit, host_ratelimit, format_delay
from .session import set_session
bp = Blueprint('signup', __name__, template_folder='templates', url_prefix='/signup/')
......@@ -21,7 +18,7 @@ def signup_enabled(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SELF_SIGNUP']:
flash(_('Singup not enabled'))
flash(_('Signup not enabled'))
return redirect(url_for('index'))
return func(*args, **kwargs)
return decorator
......@@ -47,42 +44,38 @@ def signup_check():
@signup_enabled
def signup_submit():
if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error=_('Passwords do not match'))
flash(_('Passwords do not match'), 'error')
return render_template('signup/start.html')
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
host_delay = host_ratelimit.get_delay()
if signup_delay and signup_delay > host_delay:
return render_template('signup/start.html', error=_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)))
flash(_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)), 'error')
return render_template('signup/start.html')
if host_delay:
return render_template('signup/start.html', error=_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)))
flash(_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)), 'error')
return render_template('signup/start.html')
host_ratelimit.log()
signup = Signup(loginname=request.form['loginname'],
displayname=request.form['displayname'],
mail=request.form['mail'], password=request.form['password1'])
mail=request.form['mail'])
# If the password is invalid, signup.set_password returns False and does not
# set signup.password. We don't need to check the return value here, because
# we call signup.verify next and that checks if signup.password is set.
signup.set_password(request.form['password1'])
valid, msg = signup.validate()
if not valid:
return render_template('signup/start.html', error=msg)
flash(msg, 'error')
return render_template('signup/start.html')
db.session.add(signup)
db.session.commit()
sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup)
if not sent:
return render_template('signup/start.html', error=_('Cound not send mail'))
flash(_('Could not send mail'), 'error')
return render_template('signup/start.html')
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
# Deprecated
@bp.route('/confirm/<token>')
def signup_confirm_legacy(token):
matching_signup = None
filter_expr = Signup.created >= (datetime.datetime.now() - datetime.timedelta(hours=48))
for signup in Signup.query.filter(filter_expr):
if secrets.compare_digest(signup.token, token):
matching_signup = signup
if not matching_signup:
flash(_('Invalid signup link'))
return redirect(url_for('session.login'))
return redirect(url_for('signup.signup_confirm', signup_id=matching_signup.id, token=token))
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
@bp.route('/confirm/<int:signup_id>/<token>')
def signup_confirm(signup_id, token):
......@@ -101,17 +94,21 @@ def signup_confirm_submit(signup_id, token):
confirm_delay = confirm_ratelimit.get_delay(token)
host_delay = host_ratelimit.get_delay()
if confirm_delay and confirm_delay > host_delay:
return render_template('signup/confirm.html', signup=signup, error=_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)))
flash(_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)), 'error')
return render_template('signup/confirm.html', signup=signup)
if host_delay:
return render_template('signup/confirm.html', signup=signup, error=_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)))
if not signup.check_password(request.form['password']):
return render_template('signup/confirm.html', signup=signup)
if not signup.password.verify(request.form['password']):
host_ratelimit.log()
confirm_ratelimit.log(token)
return render_template('signup/confirm.html', signup=signup, error=_('Wrong password'))
flash(_('Wrong password'), 'error')
return render_template('signup/confirm.html', signup=signup)
user, msg = signup.finish(request.form['password'])
if user is None:
return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.rollback()
flash(msg, 'error')
return render_template('signup/confirm.html', signup=signup)
db.session.commit()
set_session(user, skip_mfa=True)
flash(_('Your account was successfully created'))
return redirect(url_for('selfservice.index'))
return redirect(url_for('index'))
......@@ -7,16 +7,16 @@ from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.selfservice import send_passwordreset
from uffd.session import login_required
from uffd.role.models import Role
from uffd.remailer import remailer
from uffd.database import db
from .models import User
from uffd.models import User, UserEmail, Role, MFAMethod
from .selfservice import send_passwordreset
from .session import login_required
bp = Blueprint("user", __name__, template_folder='templates', url_prefix='/user/')
bp.add_app_template_global(User, 'User')
bp.add_app_template_global(remailer, 'remailer')
def user_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
......@@ -27,7 +27,7 @@ def user_acl():
pass
@bp.route("/")
@register_navbar(21, lazy_gettext('Users'), icon='users', blueprint=bp, visible=user_acl_check)
@register_navbar(lazy_gettext('Users'), icon='users', blueprint=bp, visible=user_acl_check)
def index():
return render_template('user/list.html', users=User.query.all())
......@@ -37,33 +37,103 @@ def show(id=None):
user = User() if id is None else User.query.get_or_404(id)
return render_template('user/show.html', user=user, roles=Role.query.all())
@bp.route("/<int:id>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id=None):
if id is None:
def create():
user = User()
ignore_blocklist = request.form.get('ignore-loginname-blocklist', False)
if request.form.get('serviceaccount'):
user.is_service_user = True
ignore_blocklist = request.form.get('ignore-loginname-blocklist', False)
if not user.set_loginname(request.form['loginname'], ignore_blocklist=ignore_blocklist):
flash(_('Login name does not meet requirements'))
return redirect(url_for('user.show'))
if not user.set_primary_email_address(request.form['email']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show'))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show'))
db.session.add(user)
try:
db.session.flush()
except IntegrityError:
flash(_('Login name or e-mail address is already in use'))
return redirect(url_for('user.show'))
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by e-mail'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/update", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id):
# pylint: disable=too-many-branches,too-many-statements
user = User.query.get_or_404(id)
if user.mail != request.form['mail'] and not user.set_mail(request.form['mail']):
flash(_('Mail is invalid'))
for email in user.all_emails:
if f'email-{email.id}-present' in request.form:
email.verified = email.verified or (request.form.get(f'email-{email.id}-verified') == '1')
for key, value in request.form.items():
parts = key.split('-')
if not parts[0] == 'newemail' or not parts[2] == 'address' or not value:
continue
tmp_id = parts[1]
email = UserEmail(
user=user,
verified=(request.form.get(f'newemail-{tmp_id}-verified') == '1'),
)
if not email.set_address(value):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show', id=id))
db.session.add(email)
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists or is used by another account'))
return redirect(url_for('user.show', id=id))
verified_emails = UserEmail.query.filter_by(user=user, verified=True)
user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
user.recovery_email = None
else:
user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
for email in user.all_emails:
if request.form.get(f'email-{email.id}-delete') == '1':
db.session.delete(email)
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show', id=id))
new_password = request.form.get('password')
if id is not None and new_password:
if new_password:
if not user.set_password(new_password):
flash(_('Password is invalid'))
return redirect(url_for('user.show', id=id))
db.session.add(user)
user.roles.clear()
for role in Role.query.all():
if not user.is_service_user and role.is_default:
......@@ -71,17 +141,48 @@ def update(id=None):
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
if id is None:
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by mail'))
else:
flash(_('User updated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/deactivate')
@csrf_protect(blueprint=bp)
def deactivate(id):
user = User.query.get_or_404(id)
user.is_deactivated = True
db.session.commit()
flash(_('User deactivated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/activate')
@csrf_protect(blueprint=bp)
def activate(id):
user = User.query.get_or_404(id)
user.is_deactivated = False
db.session.commit()
flash(_('User activated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/mfa/disable')
@csrf_protect(blueprint=bp)
def disable_mfa(id):
user = User.query.get_or_404(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/<int:id>/sessions/revoke')
@csrf_protect(blueprint=bp)
def revoke_sessions(id):
user = User.query.get_or_404(id)
user.sessions.clear()
db.session.commit()
flash(_('Sessions revoked'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/del")
@csrf_protect(blueprint=bp)
def delete(id):
......@@ -114,8 +215,8 @@ def csvimport():
if not newuser.set_loginname(row[0], ignore_blocklist=ignore_blocklist) or not newuser.set_displayname(row[0]):
flash("invalid login name, skipped : {}".format(row))
continue
if not newuser.set_mail(row[1]):
flash("invalid mail address, skipped : {}".format(row))
if not newuser.set_primary_email_address(row[1]):
flash("invalid e-mail address, skipped : {}".format(row))
continue
db.session.add(newuser)
for role in roles:
......
[uwsgi]
plugin = python3
manage-script-name = true
workers = %k*2
uid = uffd
gid = uffd
vacuum = true
master = true
process = 2
threads = 4
buffer-size = 8192 ; this buffer is used for http headers and defaults to 4k
single-interpreter = true
need-app = true
env = PYTHONIOENCODING=UTF-8
env = LANG=en_GB.utf8
env = TZ=Europe/Berlin
env = CONFIG_FILENAME=/etc/uffd/uffd.cfg
env = CONFIG_PATH=/etc/uffd/uffd.cfg
chdir = /usr/share/uffd
module = uffd:create_app()
......