Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask_babel import gettext as _
from uffd.database import db
from uffd.ldap import ldap
from uffd.session import set_session
from uffd.user.models import User
from uffd.sendmail import sendmail
from uffd.signup.models import Signup
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
from uffd.database import db
from uffd.models import User, Signup, Ratelimit, host_ratelimit, format_delay
from .session import set_session
bp = Blueprint('signup', __name__, template_folder='templates', url_prefix='/signup/')
......@@ -20,7 +18,7 @@ def signup_enabled(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SELF_SIGNUP']:
flash(_('Singup not enabled'))
flash(_('Signup not enabled'))
return redirect(url_for('index'))
return func(*args, **kwargs)
return decorator
......@@ -46,59 +44,71 @@ def signup_check():
@signup_enabled
def signup_submit():
if request.form['password1'] != request.form['password2']:
return render_template('signup/start.html', error=_('Passwords do not match'))
flash(_('Passwords do not match'), 'error')
return render_template('signup/start.html')
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
host_delay = host_ratelimit.get_delay()
if signup_delay and signup_delay > host_delay:
return render_template('signup/start.html', error=_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)))
flash(_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)), 'error')
return render_template('signup/start.html')
if host_delay:
return render_template('signup/start.html', error=_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)))
flash(_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)), 'error')
return render_template('signup/start.html')
host_ratelimit.log()
signup = Signup(loginname=request.form['loginname'],
displayname=request.form['displayname'],
mail=request.form['mail'], password=request.form['password1'])
mail=request.form['mail'])
# If the password is invalid, signup.set_password returns False and does not
# set signup.password. We don't need to check the return value here, because
# we call signup.verify next and that checks if signup.password is set.
signup.set_password(request.form['password1'])
valid, msg = signup.validate()
if not valid:
return render_template('signup/start.html', error=msg)
flash(msg, 'error')
return render_template('signup/start.html')
db.session.add(signup)
db.session.commit()
sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup)
if not sent:
return render_template('signup/start.html', error=_('Cound not send mail'))
flash(_('Could not send mail'), 'error')
return render_template('signup/start.html')
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
@bp.route('/confirm/<token>')
def signup_confirm(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
@bp.route('/confirm/<int:signup_id>/<token>')
def signup_confirm(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
return render_template('signup/confirm.html', signup=signup)
@bp.route('/confirm/<token>', methods=['POST'])
def signup_confirm_submit(token):
signup = Signup.query.get(token)
if not signup or signup.expired or signup.completed:
@bp.route('/confirm/<int:signup_id>/<token>', methods=['POST'])
def signup_confirm_submit(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
confirm_delay = confirm_ratelimit.get_delay(token)
host_delay = host_ratelimit.get_delay()
if confirm_delay and confirm_delay > host_delay:
return render_template('signup/confirm.html', signup=signup, error=_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)))
flash(_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)), 'error')
return render_template('signup/confirm.html', signup=signup)
if host_delay:
return render_template('signup/confirm.html', signup=signup, error=_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)))
if not signup.check_password(request.form['password']):
return render_template('signup/confirm.html', signup=signup)
if not signup.password.verify(request.form['password']):
host_ratelimit.log()
confirm_ratelimit.log(token)
return render_template('signup/confirm.html', signup=signup, error=_('Wrong password'))
flash(_('Wrong password'), 'error')
return render_template('signup/confirm.html', signup=signup)
user, msg = signup.finish(request.form['password'])
if user is None:
return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.rollback()
flash(msg, 'error')
return render_template('signup/confirm.html', signup=signup)
db.session.commit()
ldap.session.commit()
set_session(user, password=request.form['password'], skip_mfa=True)
set_session(user, skip_mfa=True)
flash(_('Your account was successfully created'))
return redirect(url_for('selfservice.index'))
return redirect(url_for('index'))
......@@ -3,92 +3,192 @@ import io
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.selfservice import send_passwordreset
from uffd.session import login_required
from uffd.role.models import Role
from uffd.remailer import remailer
from uffd.database import db
from uffd.ldap import ldap, LDAPCommitError
from .models import User
from uffd.models import User, UserEmail, Role, MFAMethod
from .selfservice import send_passwordreset
from .session import login_required
bp = Blueprint("user", __name__, template_folder='templates', url_prefix='/user/')
@bp.before_request
@login_required()
def user_acl(): #pylint: disable=inconsistent-return-statements
if not user_acl_check():
flash(_('Access denied'))
return redirect(url_for('index'))
bp.add_app_template_global(User, 'User')
bp.add_app_template_global(remailer, 'remailer')
def user_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(user_acl_check)
def user_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Users'), icon='users', blueprint=bp, visible=user_acl_check)
def index():
return render_template('user/list.html', users=User.query.all())
@bp.route("/<int:uid>")
@bp.route("/<int:id>")
@bp.route("/new")
def show(uid=None):
user = User() if uid is None else User.query.filter_by(uid=uid).first_or_404()
def show(id=None):
user = User() if id is None else User.query.get_or_404(id)
return render_template('user/show.html', user=user, roles=Role.query.all())
@bp.route("/<int:uid>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(uid=None):
if uid is None:
user = User()
ignore_blacklist = request.form.get('ignore-loginname-blacklist', False)
if request.form.get('serviceaccount'):
user.is_service_user = True
if not user.set_loginname(request.form['loginname'], ignore_blacklist=ignore_blacklist):
flash(_('Login name does not meet requirements'))
return redirect(url_for('user.show'))
def create():
user = User()
if request.form.get('serviceaccount'):
user.is_service_user = True
ignore_blocklist = request.form.get('ignore-loginname-blocklist', False)
if not user.set_loginname(request.form['loginname'], ignore_blocklist=ignore_blocklist):
flash(_('Login name does not meet requirements'))
return redirect(url_for('user.show'))
if not user.set_primary_email_address(request.form['email']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show'))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show'))
db.session.add(user)
try:
db.session.flush()
except IntegrityError:
flash(_('Login name or e-mail address is already in use'))
return redirect(url_for('user.show'))
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by e-mail'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/update", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id):
# pylint: disable=too-many-branches,too-many-statements
user = User.query.get_or_404(id)
for email in user.all_emails:
if f'email-{email.id}-present' in request.form:
email.verified = email.verified or (request.form.get(f'email-{email.id}-verified') == '1')
for key, value in request.form.items():
parts = key.split('-')
if not parts[0] == 'newemail' or not parts[2] == 'address' or not value:
continue
tmp_id = parts[1]
email = UserEmail(
user=user,
verified=(request.form.get(f'newemail-{tmp_id}-verified') == '1'),
)
if not email.set_address(value):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show', id=id))
db.session.add(email)
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists or is used by another account'))
return redirect(url_for('user.show', id=id))
verified_emails = UserEmail.query.filter_by(user=user, verified=True)
user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
user.recovery_email = None
else:
user = User.query.filter_by(uid=uid).first_or_404()
if user.mail != request.form['mail'] and not user.set_mail(request.form['mail']):
flash(_('Mail is invalid'))
return redirect(url_for('user.show', uid=uid))
user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
for email in user.all_emails:
if request.form.get(f'email-{email.id}-delete') == '1':
db.session.delete(email)
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show', uid=uid))
return redirect(url_for('user.show', id=id))
new_password = request.form.get('password')
if uid is not None and new_password:
if new_password:
if not user.set_password(new_password):
flash(_('Password is invalid'))
return redirect(url_for('user.show', uid=uid))
ldap.session.add(user)
return redirect(url_for('user.show', id=id))
user.roles.clear()
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.add(role)
user.roles.append(role)
user.update_groups()
ldap.session.commit()
db.session.commit()
if uid is None:
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by mail'))
else:
flash(_('User updated'))
return redirect(url_for('user.show', uid=user.uid))
flash(_('User updated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/deactivate')
@csrf_protect(blueprint=bp)
def deactivate(id):
user = User.query.get_or_404(id)
user.is_deactivated = True
db.session.commit()
flash(_('User deactivated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/activate')
@csrf_protect(blueprint=bp)
def activate(id):
user = User.query.get_or_404(id)
user.is_deactivated = False
db.session.commit()
flash(_('User activated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/mfa/disable')
@csrf_protect(blueprint=bp)
def disable_mfa(id):
user = User.query.get_or_404(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/<int:id>/sessions/revoke')
@csrf_protect(blueprint=bp)
def revoke_sessions(id):
user = User.query.get_or_404(id)
user.sessions.clear()
db.session.commit()
flash(_('Sessions revoked'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:uid>/del")
@bp.route("/<int:id>/del")
@csrf_protect(blueprint=bp)
def delete(uid):
user = User.query.filter_by(uid=uid).first_or_404()
def delete(id):
user = User.query.get_or_404(id)
user.roles.clear()
ldap.session.delete(user)
ldap.session.commit()
db.session.delete(user)
db.session.commit()
flash(_('Deleted user'))
return redirect(url_for('user.index'))
......@@ -101,7 +201,7 @@ def csvimport():
flash('No data for csv import!')
return redirect(url_for('user.index'))
ignore_blacklist = request.values.get('ignore-loginname-blacklist', False)
ignore_blocklist = request.values.get('ignore-loginname-blocklist', False)
roles = Role.query.filter_by(is_default=False).all()
usersadded = 0
......@@ -112,23 +212,21 @@ def csvimport():
flash("invalid line, ignored : {}".format(row))
continue
newuser = User()
if not newuser.set_loginname(row[0], ignore_blacklist=ignore_blacklist) or not newuser.set_displayname(row[0]):
if not newuser.set_loginname(row[0], ignore_blocklist=ignore_blocklist) or not newuser.set_displayname(row[0]):
flash("invalid login name, skipped : {}".format(row))
continue
if not newuser.set_mail(row[1]):
flash("invalid mail address, skipped : {}".format(row))
if not newuser.set_primary_email_address(row[1]):
flash("invalid e-mail address, skipped : {}".format(row))
continue
ldap.session.add(newuser)
db.session.add(newuser)
for role in roles:
if str(role.id) in row[2].split(';'):
role.members.add(newuser)
role.members.append(newuser)
newuser.update_groups()
try:
ldap.session.commit()
db.session.commit()
except LDAPCommitError:
except IntegrityError:
flash('Error adding user {}'.format(row[0]))
ldap.session.rollback()
db.session.rollback()
continue
send_passwordreset(newuser, new=True)
......
......@@ -2,6 +2,11 @@
set -e
pybabel extract -F uffd/babel.cfg -k lazy_gettext -o messages.pot uffd
# If you want to initialize a new message, use:
# pybabel init -i messages.pot -d uffd/translations -l fr
# Complete Documentation of Flask-Babel: https://flask-babel.tkte.ch
pybabel update -i messages.pot -d uffd/translations
pybabel compile -d uffd/translations
......
[uwsgi]
plugin = python3
env = PYTHONIOENCODING=UTF-8
env = LANG=en_GB.utf8
env = TZ=Europe/Berlin
env = CONFIG_FILENAME=/etc/uffd/uffd.cfg
manage-script-name = true
chdir = /usr/share/uffd
module = uffd:create_app()
uid = uffd
gid = uffd
vacuum = true
master = true
process = 2
threads = 4
buffer-size = 8192 ; this buffer is used for http headers and defaults to 4k
single-interpreter = true
need-app = true
env = PYTHONIOENCODING=UTF-8
env = LANG=en_GB.utf8
env = TZ=Europe/Berlin
env = CONFIG_PATH=/etc/uffd/uffd.cfg
chdir = /usr/share/uffd
module = uffd:create_app()
hook-pre-app = exec:FLASK_APP=uffd flask db upgrade