Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
  • Dockerfile
  • claims-in-idtoke
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • recovery-code-pwhash
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
32 results
Show changes
import secrets
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, abort
from flask import Blueprint, render_template, session, request, url_for, redirect, flash, current_app, abort
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
......@@ -8,7 +8,12 @@ from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.sendmail import sendmail
from uffd.database import db
from uffd.models import User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay
from uffd.models import (
User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay,
Session, MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod,
)
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
from .session import login_required
bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/')
......@@ -68,7 +73,7 @@ def forgot_password():
reset_ratelimit.log(loginname+'/'+mail)
host_ratelimit.log()
flash(_("We sent a mail to this user's mail address if you entered the correct mail and login name combination"))
user = User.query.filter_by(loginname=loginname).one_or_none()
user = User.query.filter_by(loginname=loginname, is_deactivated=False).one_or_none()
if not user:
return redirect(url_for('session.login'))
matches = any(map(lambda email: secrets.compare_digest(email.address, mail), user.verified_emails))
......@@ -198,6 +203,16 @@ def update_email_preferences():
flash(_('E-Mail preferences updated'))
return redirect(url_for('selfservice.index'))
@bp.route("/session/<int:session_id>/revoke", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def revoke_session(session_id):
_session = Session.query.filter_by(id=session_id, user=request.user).first_or_404()
db.session.delete(_session)
db.session.commit()
flash(_('Session revoked'))
return redirect(url_for('selfservice.index'))
@bp.route("/leaverole/<int:roleid>", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
......@@ -225,3 +240,119 @@ def send_passwordreset(user, new=False):
email = user.recovery_email or user.primary_email
if not sendmail(email.address, subject, template, user=user, token=token):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
@bp.route('/mfa/', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa():
return render_template('selfservice/setup_mfa.html')
@bp.route('/mfa/setup/disable', methods=['GET'])
@login_required(selfservice_acl_check)
def disable_mfa():
return render_template('selfservice/disable_mfa.html')
@bp.route('/mfa/setup/disable', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def disable_mfa_confirm():
MFAMethod.query.filter_by(user=request.user).delete()
db.session.commit()
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
@bp.route('/mfa/setup/recovery', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_recovery():
for method in RecoveryCodeMethod.query.filter_by(user=request.user).all():
db.session.delete(method)
methods = []
for _ in range(10):
method = RecoveryCodeMethod(request.user)
methods.append(method)
db.session.add(method)
db.session.commit()
return render_template('selfservice/setup_mfa_recovery.html', methods=methods)
@bp.route('/mfa/setup/totp', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa_totp():
method = TOTPMethod(request.user)
session['mfa_totp_key'] = method.key
return render_template('selfservice/setup_mfa_totp.html', method=method, name=request.values['name'])
@bp.route('/mfa/setup/totp', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_totp_finish():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
flash(_('Generate recovery codes first!'))
return redirect(url_for('selfservice.setup_mfa'))
method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key'))
if method.verify(request.form['code']):
db.session.add(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
flash(_('Code is invalid'))
return redirect(url_for('selfservice.setup_mfa_totp', name=request.values['name']))
@bp.route('/mfa/setup/totp/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_totp(id): #pylint: disable=redefined-builtin
method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported')
if WEBAUTHN_SUPPORTED:
@bp.route('/mfa/setup/webauthn/begin', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_begin():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
abort(403)
methods = WebauthnMethod.query.filter_by(user=request.user).all()
creds = [method.cred for method in methods]
server = get_webauthn_server()
registration_data, state = server.register_begin(
{
"id": str(request.user.id).encode(),
"name": request.user.loginname,
"displayName": request.user.displayname,
},
creds,
user_verification='discouraged',
)
session["webauthn-state"] = state
return cbor.encode(registration_data)
@bp.route('/mfa/setup/webauthn/complete', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_complete():
server = get_webauthn_server()
data = cbor.decode(request.get_data())
client_data = ClientData(data["clientDataJSON"])
att_obj = AttestationObject(data["attestationObject"])
auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj)
method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name'])
db.session.add(method)
request.user.update_groups()
db.session.commit()
return cbor.encode({"status": "OK"})
@bp.route('/mfa/setup/webauthn/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_webauthn(id): #pylint: disable=redefined-builtin
method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
......@@ -28,7 +28,7 @@ def overview_login_maybe_required(func):
return decorator
def overview_navbar_visible():
return get_services(request.user) != [] or admin_acl()
return get_services(request.user) or admin_acl()
@bp.route('/services/')
@register_navbar(lazy_gettext('Services'), icon='sitemap', blueprint=bp, visible=overview_navbar_visible)
......@@ -80,6 +80,7 @@ def edit_submit(id=None):
else:
service.limit_access = True
service.access_group = Group.query.get(request.form['access-group'])
service.hide_deactivated_users = request.form.get('hide_deactivated_users') == '1'
service.enable_email_preferences = request.form.get('enable_email_preferences') == '1'
service.remailer_mode = RemailerMode[request.form['remailer-mode']]
remailer_overwrite_mode = RemailerMode[request.form['remailer-overwrite-mode']]
......
......@@ -8,56 +8,74 @@ from flask_babel import gettext as _
from uffd.database import db
from uffd.csrf import csrf_protect
from uffd.secure_redirect import secure_local_redirect
from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay
from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay, Session
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
login_ratelimit = Ratelimit('login', 1*60, 3)
mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.before_app_request
def set_request_user():
request.user = None
request.user_pre_mfa = None
if 'user_id' not in session:
request.session = None
request.session_pre_mfa = None
if 'id' not in session:
return
if 'logintime' not in session:
if 'secret' not in session:
return
if datetime.datetime.utcnow().timestamp() > session['logintime'] + current_app.config['SESSION_LIFETIME_SECONDS']:
_session = Session.query.get(session['id'])
if _session is None or not _session.secret.verify(session['secret']) or _session.expired:
return
user = User.query.get(session['user_id'])
if not user or not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
if _session.last_used <= datetime.datetime.utcnow() - datetime.timedelta(seconds=60):
_session.last_used = datetime.datetime.utcnow()
_session.ip_address = request.remote_addr
_session.user_agent = request.user_agent.string
db.session.commit()
if _session.user.is_deactivated or not _session.user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
return
request.user_pre_mfa = user
if session.get('user_mfa'):
request.user = user
def login_get_user(loginname, password):
user = User.query.filter_by(loginname=loginname).one_or_none()
if user is None or not user.password.verify(password):
return None
return user
request.session_pre_mfa = _session
request.user_pre_mfa = _session.user
if _session.mfa_done:
request.session = _session
request.user = _session.user
@bp.route("/logout")
def logout():
# The oauth2 module takes data from `session` and injects it into the url,
# so we need to build the url BEFORE we clear the session!
resp = redirect(url_for('oauth2.logout', ref=request.values.get('ref', url_for('.login'))))
if request.session_pre_mfa:
db.session.delete(request.session_pre_mfa)
db.session.commit()
session.clear()
return resp
def set_session(user, skip_mfa=False):
session.clear()
session.permanent = True
session['user_id'] = user.id
session['logintime'] = datetime.datetime.utcnow().timestamp()
session['_csrf_token'] = secrets.token_hex(128)
secret = secrets.token_hex(128)
_session = Session(
user=user,
secret=secret,
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
)
if skip_mfa:
session['user_mfa'] = True
_session.mfa_done = True
db.session.add(_session)
db.session.commit()
session['id'] = _session.id
session['secret'] = secret
session['_csrf_token'] = secrets.token_hex(128)
@bp.route("/login", methods=('GET', 'POST'))
def login():
# pylint: disable=too-many-return-statements
if request.user_pre_mfa:
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
if request.method == 'GET':
return render_template('session/login.html', ref=request.values.get('ref'))
......@@ -71,12 +89,16 @@ def login():
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return render_template('session/login.html', ref=request.values.get('ref'))
user = login_get_user(username, password)
if user is None:
user = User.query.filter_by(loginname=username).one_or_none()
if user is None or not user.password.verify(password):
login_ratelimit.log(username)
host_ratelimit.log()
flash(_('Login name or password is wrong'))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.is_deactivated:
flash(_('Your account is deactivated. Contact %(contact_email)s for details.', contact_email=current_app.config['ORGANISATION_CONTACT']))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.password.needs_rehash:
user.password = password
db.session.commit()
......@@ -84,7 +106,7 @@ def login():
flash(_('You do not have access to this service'))
return render_template('session/login.html', ref=request.values.get('ref'))
set_session(user)
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
def login_required_pre_mfa(no_redirect=False):
def wrapper(func):
......@@ -107,13 +129,93 @@ def login_required(permission_check=lambda: True):
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
if not request.user:
return redirect(url_for('mfa.auth', ref=request.full_path))
return redirect(url_for('session.mfa_auth', ref=request.full_path))
if not permission_check():
abort(403)
return func(*args, **kwargs)
return decorator
return wrapper
@bp.route('/mfa/auth', methods=['GET'])
@login_required_pre_mfa()
def mfa_auth():
if not request.user_pre_mfa.mfa_enabled:
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if request.session_pre_mfa.mfa_done:
return secure_local_redirect(request.values.get('ref', url_for('index')))
return render_template('session/mfa_auth.html', ref=request.values.get('ref'))
@bp.route('/mfa/auth', methods=['POST'])
@login_required_pre_mfa()
def mfa_auth_finish():
delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id)
if delay:
flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return secure_local_redirect(request.values.get('ref', url_for('index')))
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash(_('You have exhausted your recovery codes. Please generate new ones now!'))
return redirect(url_for('selfservice.setup_mfa'))
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.'))
return redirect(url_for('selfservice.setup_mfa'))
return secure_local_redirect(request.values.get('ref', url_for('index')))
mfa_ratelimit.log(request.user_pre_mfa.id)
flash(_('Two-factor authentication failed'))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
if WEBAUTHN_SUPPORTED:
@bp.route("/mfa/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_begin():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
session["webauthn-state"] = state
return cbor.encode(auth_data)
@bp.route("/mfa/auth/webauthn/complete", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_complete():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.decode(request.get_data())
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
# authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster)
# does not check signCount, although the spec recommends it
server.authenticate_complete(
session.pop("webauthn-state"),
creds,
credential_id,
client_data,
auth_data,
signature,
)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return cbor.encode({"status": "OK"})
@bp.route("/login/device/start")
def devicelogin_start():
session['devicelogin_started'] = True
......@@ -159,12 +261,12 @@ def deviceauth():
@login_required()
@csrf_protect(blueprint=bp)
def deviceauth_submit():
DeviceLoginConfirmation.query.filter_by(user=request.user).delete()
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
initiation = DeviceLoginInitiation.query.filter_by(code=request.form['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
confirmation = DeviceLoginConfirmation(user=request.user, initiation=initiation)
confirmation = DeviceLoginConfirmation(session=request.session, initiation=initiation)
db.session.add(confirmation)
db.session.commit()
return render_template('session/deviceauth.html', initiation=initiation, confirmation=confirmation)
......@@ -172,6 +274,6 @@ def deviceauth_submit():
@bp.route("/device/finish", methods=['GET', 'POST'])
@login_required()
def deviceauth_finish():
DeviceLoginConfirmation.query.filter_by(user=request.user).delete()
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
db.session.commit()
return redirect(url_for('index'))
......@@ -9,7 +9,7 @@ from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.remailer import remailer
from uffd.database import db
from uffd.models import User, UserEmail, Role
from uffd.models import User, UserEmail, Role, MFAMethod
from .selfservice import send_passwordreset
from .session import login_required
......@@ -146,6 +146,43 @@ def update(id):
flash(_('User updated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/deactivate')
@csrf_protect(blueprint=bp)
def deactivate(id):
user = User.query.get_or_404(id)
user.is_deactivated = True
db.session.commit()
flash(_('User deactivated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/activate')
@csrf_protect(blueprint=bp)
def activate(id):
user = User.query.get_or_404(id)
user.is_deactivated = False
db.session.commit()
flash(_('User activated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/mfa/disable')
@csrf_protect(blueprint=bp)
def disable_mfa(id):
user = User.query.get_or_404(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/<int:id>/sessions/revoke')
@csrf_protect(blueprint=bp)
def revoke_sessions(id):
user = User.query.get_or_404(id)
user.sessions.clear()
db.session.commit()
flash(_('Sessions revoked'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/del")
@csrf_protect(blueprint=bp)
def delete(id):
......