Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import Role, RoleGroup, Group
from .session import login_required
bp = Blueprint("role", __name__, template_folder='templates', url_prefix='/role/')
def role_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(role_acl_check)
def role_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Roles'), icon='key', blueprint=bp, visible=role_acl_check)
def index():
return render_template('role/list.html', roles=Role.query.all())
@bp.route("/new")
def new():
return render_template('role/show.html', role=Role(), groups=Group.query.all(), roles=Role.query.all())
@bp.route("/<int:roleid>")
def show(roleid=None):
role = Role.query.get(roleid)
return render_template('role/show.html', role=role, groups=Group.query.all(), roles=Role.query.all())
@bp.route("/<int:roleid>/update", methods=['POST'])
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(roleid=None):
if roleid is None:
role = Role()
db.session.add(role)
else:
role = Role.query.get(roleid)
role.description = request.values['description']
if not role.locked:
role.name = request.values['name']
if not request.values['moderator-group']:
role.moderator_group = None
else:
role.moderator_group = Group.query.get(request.values['moderator-group'])
for included_role in Role.query.all():
if included_role != role and request.values.get('include-role-{}'.format(included_role.id)):
role.included_roles.append(included_role)
elif included_role in role.included_roles:
role.included_roles.remove(included_role)
role.groups.clear()
for group in Group.query.all():
if request.values.get(f'group-{group.id}', False):
role.groups[group] = RoleGroup(requires_mfa=bool(request.values.get(f'group-mfa-{group.id}', '')))
role.update_member_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/del")
@csrf_protect(blueprint=bp)
def delete(roleid):
role = Role.query.get(roleid)
if role.locked:
flash(_('Locked roles cannot be deleted'))
return redirect(url_for('role.show', roleid=role.id))
old_members = set(role.members_effective)
role.members.clear()
db.session.delete(role)
for user in old_members:
user.update_groups()
db.session.commit()
return redirect(url_for('role.index'))
@bp.route("/<int:roleid>/unlock")
@csrf_protect(blueprint=bp)
def unlock(roleid):
role = Role.query.get(roleid)
role.locked = False
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/setdefault")
@csrf_protect(blueprint=bp)
def set_default(roleid):
role = Role.query.get(roleid)
if role.is_default:
return redirect(url_for('role.show', roleid=role.id))
role.is_default = True
for user in set(role.members):
if not user.is_service_user:
role.members.remove(user)
role.update_member_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
@bp.route("/<int:roleid>/unsetdefault")
@csrf_protect(blueprint=bp)
def unset_default(roleid):
role = Role.query.get(roleid)
if not role.is_default:
return redirect(url_for('role.show', roleid=role.id))
old_members = set(role.members_effective)
role.is_default = False
for user in old_members:
if not user.is_service_user:
user.update_groups()
db.session.commit()
return redirect(url_for('role.show', roleid=role.id))
from flask import Blueprint, render_template, request, url_for, redirect, flash, abort
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import Role, User, Group
from .session import login_required
bp = Blueprint('rolemod', __name__, template_folder='templates', url_prefix='/rolemod/')
def user_is_rolemod():
return request.user and Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).count()
@bp.before_request
@login_required()
def acl_check():
if not user_is_rolemod():
abort(403)
@bp.route("/")
@register_navbar(lazy_gettext('Moderation'), icon='user-lock', blueprint=bp, visible=user_is_rolemod)
def index():
roles = Role.query.join(Role.moderator_group).join(Group.members).filter(User.id==request.user.id).all()
return render_template('rolemod/list.html', roles=roles)
@bp.route("/<int:role_id>")
def show(role_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
return render_template('rolemod/show.html', role=role)
@bp.route("/<int:role_id>", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(role_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
if request.form['description'] != role.description:
if len(request.form['description']) > 256:
flash(_('Description too long'))
return redirect(url_for('.show', role_id=role.id))
role.description = request.form['description']
db.session.commit()
return redirect(url_for('.show', role_id=role.id))
@bp.route("/<int:role_id>/delete_member/<int:member_id>")
@csrf_protect(blueprint=bp)
def delete_member(role_id, member_id):
role = Role.query.get_or_404(role_id)
if role.moderator_group not in request.user.groups:
abort(403)
member = User.query.get_or_404(member_id)
if member in role.members:
role.members.remove(member)
member.update_groups()
db.session.commit()
flash(_('Member removed'))
return redirect(url_for('.show', role_id=role.id))
import secrets
from flask import Blueprint, render_template, session, request, url_for, redirect, flash, current_app, abort
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.sendmail import sendmail
from uffd.database import db
from uffd.models import (
User, UserEmail, PasswordToken, Role, host_ratelimit, Ratelimit, format_delay,
Session, MFAMethod, TOTPMethod, WebauthnMethod, RecoveryCodeMethod,
)
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
from .session import login_required
bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/')
reset_ratelimit = Ratelimit('passwordreset', 1*60*60, 3)
def selfservice_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP'])
@bp.route("/")
@register_navbar(lazy_gettext('Selfservice'), icon='portrait', blueprint=bp, visible=selfservice_acl_check)
@login_required(selfservice_acl_check)
def index():
return render_template('selfservice/self.html', user=request.user)
@bp.route("/updateprofile", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def update_profile():
if request.values['displayname'] != request.user.displayname:
if request.user.set_displayname(request.values['displayname']):
flash(_('Display name changed.'))
else:
flash(_('Display name is not valid.'))
db.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/changepassword", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def change_password():
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match'))
else:
if request.user.set_password(request.values['password1']):
flash(_('Password changed'))
else:
flash(_('Invalid password'))
db.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST']))
def forgot_password():
if request.method == 'GET':
return render_template('selfservice/forgot_password.html')
loginname = request.values['loginname'].lower()
mail = request.values['mail']
reset_delay = reset_ratelimit.get_delay(loginname+'/'+mail)
host_delay = host_ratelimit.get_delay()
if reset_delay or host_delay:
if reset_delay > host_delay:
flash(_('We received too many password reset requests for this user! Please wait at least %(delay)s.', delay=format_delay(reset_delay)))
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return redirect(url_for('.forgot_password'))
reset_ratelimit.log(loginname+'/'+mail)
host_ratelimit.log()
flash(_("We sent a mail to this user's mail address if you entered the correct mail and login name combination"))
user = User.query.filter_by(loginname=loginname, is_deactivated=False).one_or_none()
if not user:
return redirect(url_for('session.login'))
matches = any(map(lambda email: secrets.compare_digest(email.address, mail), user.verified_emails))
if not matches:
return redirect(url_for('session.login'))
recovery_email = user.recovery_email or user.primary_email
if recovery_email.address == mail and user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
send_passwordreset(user)
return redirect(url_for('session.login'))
@bp.route("/token/password/<int:token_id>/<token>", methods=(['POST', 'GET']))
def token_password(token_id, token):
dbtoken = PasswordToken.query.get(token_id)
if not dbtoken or not secrets.compare_digest(dbtoken.token, token) or \
dbtoken.expired:
flash(_('Link invalid or expired'))
return redirect(url_for('session.login'))
if request.method == 'GET':
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1']:
flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
if not dbtoken.user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
abort(403)
if not dbtoken.user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=dbtoken)
db.session.delete(dbtoken)
db.session.commit()
flash(_('New password set'))
return redirect(url_for('session.login'))
@bp.route("/email/new", methods=['POST'])
@login_required(selfservice_acl_check)
def add_email():
email = UserEmail(user=request.user)
if not email.set_address(request.form['address']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('selfservice.index'))
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists'))
return redirect(url_for('selfservice.index'))
secret = email.start_verification()
db.session.add(email)
db.session.commit()
if not sendmail(email.address, 'Mail verification', 'selfservice/mailverification.mail.txt', user=request.user, email=email, secret=secret):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
else:
flash(_('We sent you an email, please verify your mail address.'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/verify/<secret>")
@bp.route("/token/mail_verification/<int:legacy_id>/<secret>")
@login_required(selfservice_acl_check)
def verify_email(secret, email_id=None, legacy_id=None):
if email_id is not None:
email = UserEmail.query.get(email_id)
else:
email = UserEmail.query.filter_by(verification_legacy_id=legacy_id).one()
if not email or email.verification_expired:
flash(_('Link invalid or expired'))
return redirect(url_for('selfservice.index'))
if email.user != request.user:
abort(403, description=_('This link was generated for another user. Login as the correct user to continue.'))
if not email.finish_verification(secret):
flash(_('Link invalid or expired'))
return redirect(url_for('selfservice.index'))
if legacy_id is not None:
request.user.primary_email = email
try:
db.session.commit()
except IntegrityError:
flash(_('E-Mail address is already used by another account'))
return redirect(url_for('selfservice.index'))
flash(_('E-Mail address verified'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/retry")
@login_required(selfservice_acl_check)
def retry_email_verification(email_id):
email = UserEmail.query.filter_by(id=email_id, user=request.user, verified=False).first_or_404()
secret = email.start_verification()
db.session.commit()
if not sendmail(email.address, 'E-Mail verification', 'selfservice/mailverification.mail.txt', user=request.user, email=email, secret=secret):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
else:
flash(_('We sent you an email, please verify your mail address.'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/<int:email_id>/delete", methods=['POST', 'GET'])
@login_required(selfservice_acl_check)
def delete_email(email_id):
email = UserEmail.query.filter_by(id=email_id, user=request.user).first_or_404()
try:
db.session.delete(email)
db.session.commit()
except IntegrityError:
flash(_('Cannot delete primary e-mail address'))
return redirect(url_for('selfservice.index'))
flash(_('E-Mail address deleted'))
return redirect(url_for('selfservice.index'))
@bp.route("/email/preferences", methods=['POST'])
@login_required(selfservice_acl_check)
def update_email_preferences():
verified_emails = UserEmail.query.filter_by(user=request.user, verified=True)
request.user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
request.user.recovery_email = None
else:
request.user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in request.user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
db.session.commit()
flash(_('E-Mail preferences updated'))
return redirect(url_for('selfservice.index'))
@bp.route("/session/<int:session_id>/revoke", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def revoke_session(session_id):
_session = Session.query.filter_by(id=session_id, user=request.user).first_or_404()
db.session.delete(_session)
db.session.commit()
flash(_('Session revoked'))
return redirect(url_for('selfservice.index'))
@bp.route("/leaverole/<int:roleid>", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required(selfservice_acl_check)
def leave_role(roleid):
role = Role.query.get_or_404(roleid)
role.members.remove(request.user)
request.user.update_groups()
db.session.commit()
flash(_('You left role %(role_name)s', role_name=role.name))
return redirect(url_for('selfservice.index'))
def send_passwordreset(user, new=False):
PasswordToken.query.filter(PasswordToken.user == user).delete()
token = PasswordToken(user=user)
db.session.add(token)
db.session.commit()
if new:
template = 'selfservice/newuser.mail.txt'
subject = 'Welcome to the %s infrastructure'%current_app.config.get('ORGANISATION_NAME', '')
else:
template = 'selfservice/passwordreset.mail.txt'
subject = 'Password reset'
email = user.recovery_email or user.primary_email
if not sendmail(email.address, subject, template, user=user, token=token):
flash(_('E-Mail to "%(mail_address)s" could not be sent!', mail_address=email.address))
@bp.route('/mfa/', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa():
return render_template('selfservice/setup_mfa.html')
@bp.route('/mfa/setup/disable', methods=['GET'])
@login_required(selfservice_acl_check)
def disable_mfa():
return render_template('selfservice/disable_mfa.html')
@bp.route('/mfa/setup/disable', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def disable_mfa_confirm():
MFAMethod.query.filter_by(user=request.user).delete()
db.session.commit()
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
@bp.route('/mfa/setup/recovery', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_recovery():
for method in RecoveryCodeMethod.query.filter_by(user=request.user).all():
db.session.delete(method)
methods = []
for _ in range(10):
method = RecoveryCodeMethod(request.user)
methods.append(method)
db.session.add(method)
db.session.commit()
return render_template('selfservice/setup_mfa_recovery.html', methods=methods)
@bp.route('/mfa/setup/totp', methods=['GET'])
@login_required(selfservice_acl_check)
def setup_mfa_totp():
method = TOTPMethod(request.user)
session['mfa_totp_key'] = method.key
return render_template('selfservice/setup_mfa_totp.html', method=method, name=request.values['name'])
@bp.route('/mfa/setup/totp', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_totp_finish():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
flash(_('Generate recovery codes first!'))
return redirect(url_for('selfservice.setup_mfa'))
method = TOTPMethod(request.user, name=request.values['name'], key=session.pop('mfa_totp_key'))
if method.verify(request.form['code']):
db.session.add(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
flash(_('Code is invalid'))
return redirect(url_for('selfservice.setup_mfa_totp', name=request.values['name']))
@bp.route('/mfa/setup/totp/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_totp(id): #pylint: disable=redefined-builtin
method = TOTPMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
bp.add_app_template_global(WEBAUTHN_SUPPORTED, name='webauthn_supported')
if WEBAUTHN_SUPPORTED:
@bp.route('/mfa/setup/webauthn/begin', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_begin():
if not RecoveryCodeMethod.query.filter_by(user=request.user).all():
abort(403)
methods = WebauthnMethod.query.filter_by(user=request.user).all()
creds = [method.cred for method in methods]
server = get_webauthn_server()
registration_data, state = server.register_begin(
{
"id": str(request.user.id).encode(),
"name": request.user.loginname,
"displayName": request.user.displayname,
},
creds,
user_verification='discouraged',
)
session["webauthn-state"] = state
return cbor.encode(registration_data)
@bp.route('/mfa/setup/webauthn/complete', methods=['POST'])
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def setup_mfa_webauthn_complete():
server = get_webauthn_server()
data = cbor.decode(request.get_data())
client_data = ClientData(data["clientDataJSON"])
att_obj = AttestationObject(data["attestationObject"])
auth_data = server.register_complete(session["webauthn-state"], client_data, att_obj)
method = WebauthnMethod(request.user, auth_data.credential_data, name=data['name'])
db.session.add(method)
request.user.update_groups()
db.session.commit()
return cbor.encode({"status": "OK"})
@bp.route('/mfa/setup/webauthn/<int:id>/delete')
@login_required(selfservice_acl_check)
@csrf_protect(blueprint=bp)
def delete_mfa_webauthn(id): #pylint: disable=redefined-builtin
method = WebauthnMethod.query.filter_by(user=request.user, id=id).first_or_404()
db.session.delete(method)
request.user.update_groups()
db.session.commit()
return redirect(url_for('selfservice.setup_mfa'))
import functools
from flask import Blueprint, render_template, request, url_for, redirect, current_app, abort
from flask_babel import lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.database import db
from uffd.models import User, Service, ServiceUser, get_services, Group, OAuth2Client, OAuth2LogoutURI, APIClient, RemailerMode
from .session import login_required
bp = Blueprint('service', __name__, template_folder='templates')
bp.add_app_template_global(RemailerMode, 'RemailerMode')
def admin_acl():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
def overview_login_maybe_required(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SERVICES']:
return login_required(admin_acl)(func)(*args, **kwargs)
if not current_app.config['SERVICES_PUBLIC']:
return login_required()(func)(*args, **kwargs)
return func(*args, **kwargs)
return decorator
def overview_navbar_visible():
return get_services(request.user) or admin_acl()
@bp.route('/services/')
@register_navbar(lazy_gettext('Services'), icon='sitemap', blueprint=bp, visible=overview_navbar_visible)
@overview_login_maybe_required
def overview():
services = get_services(request.user)
banner = ''
if request.user or current_app.config['SERVICES_BANNER_PUBLIC']:
banner = current_app.config['SERVICES_BANNER']
return render_template('service/overview.html', services=services, banner=banner)
@bp.route('/service/admin')
@login_required(admin_acl)
def index():
return render_template('service/index.html', services=Service.query.all())
@bp.route('/service/new')
@bp.route('/service/<int:id>')
@login_required(admin_acl)
def show(id=None):
service = Service() if id is None else Service.query.get_or_404(id)
remailer_overwrites = []
if id is not None:
# pylint: disable=singleton-comparison
remailer_overwrites = ServiceUser.query.filter(
ServiceUser.service_id == id,
ServiceUser.remailer_overwrite_mode != None
).all()
all_groups = Group.query.all()
return render_template('service/show.html', service=service, all_groups=all_groups, remailer_overwrites=remailer_overwrites)
@bp.route('/service/new', methods=['POST'])
@bp.route('/service/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def edit_submit(id=None):
if id is None:
service = Service()
db.session.add(service)
else:
service = Service.query.get_or_404(id)
service.name = request.form['name']
if not request.form['access-group']:
service.limit_access = True
service.access_group = None
elif request.form['access-group'] == 'all':
service.limit_access = False
service.access_group = None
else:
service.limit_access = True
service.access_group = Group.query.get(request.form['access-group'])
service.hide_deactivated_users = request.form.get('hide_deactivated_users') == '1'
service.enable_email_preferences = request.form.get('enable_email_preferences') == '1'
service.remailer_mode = RemailerMode[request.form['remailer-mode']]
remailer_overwrite_mode = RemailerMode[request.form['remailer-overwrite-mode']]
remailer_overwrite_user_ids = [
User.query.filter_by(loginname=loginname.strip()).one().id
for loginname in request.form['remailer-overwrite-users'].split(',') if loginname.strip()
]
# pylint: disable=singleton-comparison
service_users = ServiceUser.query.filter(
ServiceUser.service == service,
db.or_(
ServiceUser.user_id.in_(remailer_overwrite_user_ids),
ServiceUser.remailer_overwrite_mode != None,
)
)
for service_user in service_users:
if service_user.user_id in remailer_overwrite_user_ids:
service_user.remailer_overwrite_mode = remailer_overwrite_mode
else:
service_user.remailer_overwrite_mode = None
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def delete(id):
service = Service.query.get_or_404(id)
db.session.delete(service)
db.session.commit()
return redirect(url_for('service.index'))
@bp.route('/service/<int:service_id>/oauth2/new')
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>')
@login_required(admin_acl)
def oauth2_show(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client() if db_id is None else OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
return render_template('service/oauth2.html', service=service, client=client)
@bp.route('/service/<int:service_id>/oauth2/new', methods=['POST'])
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_submit(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
if db_id is None:
client = OAuth2Client(service=service)
db.session.add(client)
else:
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
client.client_id = request.form['client_id']
if request.form['client_secret']:
client.client_secret = request.form['client_secret']
if not client.client_secret:
abort(400)
client.redirect_uris = [x.strip() for x in request.form['redirect_uris'].split('\n') if x.strip()]
client.logout_uris = []
for line in request.form['logout_uris'].split('\n'):
line = line.strip()
if not line:
continue
method, uri = line.split(' ', 2)
client.logout_uris.append(OAuth2LogoutURI(method=method, uri=uri))
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/oauth2/<int:db_id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def oauth2_delete(service_id, db_id=None):
service = Service.query.get_or_404(service_id)
client = OAuth2Client.query.filter_by(service_id=service_id, db_id=db_id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/new')
@bp.route('/service/<int:service_id>/api/<int:id>')
@login_required(admin_acl)
def api_show(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient() if id is None else APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
return render_template('service/api.html', service=service, client=client)
@bp.route('/service/<int:service_id>/api/new', methods=['POST'])
@bp.route('/service/<int:service_id>/api/<int:id>', methods=['POST'])
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_submit(service_id, id=None):
service = Service.query.get_or_404(service_id)
if id is None:
client = APIClient(service=service)
db.session.add(client)
else:
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
client.auth_username = request.form['auth_username']
if request.form['auth_password']:
client.auth_password = request.form['auth_password']
if not client.auth_password:
abort(400)
client.perm_users = request.form.get('perm_users') == '1'
client.perm_checkpassword = request.form.get('perm_checkpassword') == '1'
client.perm_mail_aliases = request.form.get('perm_mail_aliases') == '1'
client.perm_remailer = request.form.get('perm_remailer') == '1'
client.perm_metrics = request.form.get('perm_metrics') == '1'
db.session.commit()
return redirect(url_for('service.show', id=service.id))
@bp.route('/service/<int:service_id>/api/<int:id>/delete')
@csrf_protect(blueprint=bp)
@login_required(admin_acl)
def api_delete(service_id, id=None):
service = Service.query.get_or_404(service_id)
client = APIClient.query.filter_by(service_id=service_id, id=id).first_or_404()
db.session.delete(client)
db.session.commit()
return redirect(url_for('service.show', id=service.id))
import datetime
import secrets
import functools
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
from flask_babel import gettext as _
from uffd.database import db
from uffd.csrf import csrf_protect
from uffd.secure_redirect import secure_local_redirect
from uffd.models import User, DeviceLoginInitiation, DeviceLoginConfirmation, Ratelimit, host_ratelimit, format_delay, Session
from uffd.fido2_compat import * # pylint: disable=wildcard-import,unused-wildcard-import
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
login_ratelimit = Ratelimit('login', 1*60, 3)
mfa_ratelimit = Ratelimit('mfa', 1*60, 3)
@bp.before_app_request
def set_request_user():
request.user = None
request.user_pre_mfa = None
request.session = None
request.session_pre_mfa = None
if 'id' not in session:
return
if 'secret' not in session:
return
_session = Session.query.get(session['id'])
if _session is None or not _session.secret.verify(session['secret']) or _session.expired:
return
if _session.last_used <= datetime.datetime.utcnow() - datetime.timedelta(seconds=60):
_session.last_used = datetime.datetime.utcnow()
_session.ip_address = request.remote_addr
_session.user_agent = request.user_agent.string
db.session.commit()
if _session.user.is_deactivated or not _session.user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
return
request.session_pre_mfa = _session
request.user_pre_mfa = _session.user
if _session.mfa_done:
request.session = _session
request.user = _session.user
@bp.route("/logout")
def logout():
# The oauth2 module takes data from `session` and injects it into the url,
# so we need to build the url BEFORE we clear the session!
resp = redirect(url_for('oauth2.logout', ref=request.values.get('ref', url_for('.login'))))
if request.session_pre_mfa:
db.session.delete(request.session_pre_mfa)
db.session.commit()
session.clear()
return resp
def set_session(user, skip_mfa=False):
session.clear()
session.permanent = True
secret = secrets.token_hex(128)
_session = Session(
user=user,
secret=secret,
ip_address=request.remote_addr,
user_agent=request.user_agent.string,
)
if skip_mfa:
_session.mfa_done = True
db.session.add(_session)
db.session.commit()
session['id'] = _session.id
session['secret'] = secret
session['_csrf_token'] = secrets.token_hex(128)
@bp.route("/login", methods=('GET', 'POST'))
def login():
# pylint: disable=too-many-return-statements
if request.user_pre_mfa:
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
if request.method == 'GET':
return render_template('session/login.html', ref=request.values.get('ref'))
username = request.form['loginname'].lower()
password = request.form['password']
login_delay = login_ratelimit.get_delay(username)
host_delay = host_ratelimit.get_delay()
if login_delay or host_delay:
if login_delay > host_delay:
flash(_('We received too many invalid login attempts for this user! Please wait at least %(delay)s.', delay=format_delay(login_delay)))
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return render_template('session/login.html', ref=request.values.get('ref'))
user = User.query.filter_by(loginname=username).one_or_none()
if user is None or not user.password.verify(password):
login_ratelimit.log(username)
host_ratelimit.log()
flash(_('Login name or password is wrong'))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.is_deactivated:
flash(_('Your account is deactivated. Contact %(contact_email)s for details.', contact_email=current_app.config['ORGANISATION_CONTACT']))
return render_template('session/login.html', ref=request.values.get('ref'))
if user.password.needs_rehash:
user.password = password
db.session.commit()
if not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']):
flash(_('You do not have access to this service'))
return render_template('session/login.html', ref=request.values.get('ref'))
set_session(user)
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref', url_for('index'))))
def login_required_pre_mfa(no_redirect=False):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not request.user_pre_mfa:
if no_redirect:
abort(403)
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
return func(*args, **kwargs)
return decorator
return wrapper
def login_required(permission_check=lambda: True):
def wrapper(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not request.user_pre_mfa:
flash(_('You need to login first'))
return redirect(url_for('session.login', ref=request.full_path))
if not request.user:
return redirect(url_for('session.mfa_auth', ref=request.full_path))
if not permission_check():
abort(403)
return func(*args, **kwargs)
return decorator
return wrapper
@bp.route('/mfa/auth', methods=['GET'])
@login_required_pre_mfa()
def mfa_auth():
if not request.user_pre_mfa.mfa_enabled:
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if request.session_pre_mfa.mfa_done:
return secure_local_redirect(request.values.get('ref', url_for('index')))
return render_template('session/mfa_auth.html', ref=request.values.get('ref'))
@bp.route('/mfa/auth', methods=['POST'])
@login_required_pre_mfa()
def mfa_auth_finish():
delay = mfa_ratelimit.get_delay(request.user_pre_mfa.id)
if delay:
flash(_('We received too many invalid attempts! Please wait at least %s.')%format_delay(delay))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
for method in request.user_pre_mfa.mfa_totp_methods:
if method.verify(request.form['code']):
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return secure_local_redirect(request.values.get('ref', url_for('index')))
for method in request.user_pre_mfa.mfa_recovery_codes:
if method.verify(request.form['code']):
db.session.delete(method)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
if len(request.user_pre_mfa.mfa_recovery_codes) <= 1:
flash(_('You have exhausted your recovery codes. Please generate new ones now!'))
return redirect(url_for('selfservice.setup_mfa'))
if len(request.user_pre_mfa.mfa_recovery_codes) <= 5:
flash(_('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.'))
return redirect(url_for('selfservice.setup_mfa'))
return secure_local_redirect(request.values.get('ref', url_for('index')))
mfa_ratelimit.log(request.user_pre_mfa.id)
flash(_('Two-factor authentication failed'))
return redirect(url_for('session.mfa_auth', ref=request.values.get('ref')))
if WEBAUTHN_SUPPORTED:
@bp.route("/mfa/auth/webauthn/begin", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_begin():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
auth_data, state = server.authenticate_begin(creds, user_verification='discouraged')
session["webauthn-state"] = state
return cbor.encode(auth_data)
@bp.route("/mfa/auth/webauthn/complete", methods=["POST"])
@login_required_pre_mfa(no_redirect=True)
def mfa_auth_webauthn_complete():
server = get_webauthn_server()
creds = [method.cred for method in request.user_pre_mfa.mfa_webauthn_methods]
if not creds:
abort(404)
data = cbor.decode(request.get_data())
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
# authenticate_complete() (as of python-fido2 v0.5.0, the version in Debian Buster)
# does not check signCount, although the spec recommends it
server.authenticate_complete(
session.pop("webauthn-state"),
creds,
credential_id,
client_data,
auth_data,
signature,
)
request.session_pre_mfa.mfa_done = True
db.session.commit()
set_request_user()
return cbor.encode({"status": "OK"})
@bp.route("/login/device/start")
def devicelogin_start():
session['devicelogin_started'] = True
return secure_local_redirect(request.values['ref'])
@bp.route("/login/device")
def devicelogin():
if 'devicelogin_id' not in session or 'devicelogin_secret' not in session:
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
initiation = DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret']).one_or_none()
if not initiation or initiation.expired:
flash(_('Initiation code is no longer valid'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
return render_template('session/devicelogin.html', ref=request.values.get('ref'), initiation=initiation)
@bp.route("/login/device", methods=['POST'])
def devicelogin_submit():
if 'devicelogin_id' not in session or 'devicelogin_secret' not in session:
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
initiation = DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret']).one_or_none()
if not initiation or initiation.expired:
flash(_('Initiation code is no longer valid'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
confirmation = DeviceLoginConfirmation.query.filter_by(initiation=initiation, code=request.form['confirmation-code']).one_or_none()
if confirmation is None:
flash(_('Invalid confirmation code'))
return render_template('session/devicelogin.html', ref=request.values.get('ref'), initiation=initiation)
session['devicelogin_confirmation'] = confirmation.id
return secure_local_redirect(request.values['ref'])
@bp.route("/device")
@login_required()
def deviceauth():
if 'initiation-code' not in request.values:
return render_template('session/deviceauth.html')
initiation = DeviceLoginInitiation.query.filter_by(code=request.values['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
return render_template('session/deviceauth.html', initiation=initiation)
@bp.route("/device", methods=['POST'])
@login_required()
@csrf_protect(blueprint=bp)
def deviceauth_submit():
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
initiation = DeviceLoginInitiation.query.filter_by(code=request.form['initiation-code']).one_or_none()
if initiation is None or initiation.expired:
flash(_('Invalid initiation code'))
return redirect(url_for('session.deviceauth'))
confirmation = DeviceLoginConfirmation(session=request.session, initiation=initiation)
db.session.add(confirmation)
db.session.commit()
return render_template('session/deviceauth.html', initiation=initiation, confirmation=confirmation)
@bp.route("/device/finish", methods=['GET', 'POST'])
@login_required()
def deviceauth_finish():
DeviceLoginConfirmation.query.filter_by(session=request.session).delete()
db.session.commit()
return redirect(url_for('index'))
import functools
import secrets
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify
from flask_babel import gettext as _
from uffd.sendmail import sendmail
from uffd.database import db
from uffd.models import User, Signup, Ratelimit, host_ratelimit, format_delay
from .session import set_session
bp = Blueprint('signup', __name__, template_folder='templates', url_prefix='/signup/')
signup_ratelimit = Ratelimit('signup', 24*60, 3)
confirm_ratelimit = Ratelimit('signup_confirm', 10*60, 3)
def signup_enabled(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
if not current_app.config['SELF_SIGNUP']:
flash(_('Signup not enabled'))
return redirect(url_for('index'))
return func(*args, **kwargs)
return decorator
@bp.route('/')
@signup_enabled
def signup_start():
return render_template('signup/start.html')
@bp.route('/check', methods=['POST'])
@signup_enabled
def signup_check():
if host_ratelimit.get_delay():
return jsonify({'status': 'ratelimited'})
host_ratelimit.log()
if not User().set_loginname(request.form['loginname']):
return jsonify({'status': 'invalid'})
if User.query.filter_by(loginname=request.form['loginname']).all():
return jsonify({'status': 'exists'})
return jsonify({'status': 'ok'})
@bp.route('/', methods=['POST'])
@signup_enabled
def signup_submit():
if request.form['password1'] != request.form['password2']:
flash(_('Passwords do not match'), 'error')
return render_template('signup/start.html')
signup_delay = signup_ratelimit.get_delay(request.form['mail'])
host_delay = host_ratelimit.get_delay()
if signup_delay and signup_delay > host_delay:
flash(_('Too many signup requests with this mail address! Please wait %(delay)s.',
delay=format_delay(signup_delay)), 'error')
return render_template('signup/start.html')
if host_delay:
flash(_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay)), 'error')
return render_template('signup/start.html')
host_ratelimit.log()
signup = Signup(loginname=request.form['loginname'],
displayname=request.form['displayname'],
mail=request.form['mail'])
# If the password is invalid, signup.set_password returns False and does not
# set signup.password. We don't need to check the return value here, because
# we call signup.verify next and that checks if signup.password is set.
signup.set_password(request.form['password1'])
valid, msg = signup.validate()
if not valid:
flash(msg, 'error')
return render_template('signup/start.html')
db.session.add(signup)
db.session.commit()
sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup)
if not sent:
flash(_('Could not send mail'), 'error')
return render_template('signup/start.html')
signup_ratelimit.log(request.form['mail'])
return render_template('signup/submitted.html', signup=signup)
# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them
@bp.route('/confirm/<int:signup_id>/<token>')
def signup_confirm(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
return render_template('signup/confirm.html', signup=signup)
@bp.route('/confirm/<int:signup_id>/<token>', methods=['POST'])
def signup_confirm_submit(signup_id, token):
signup = Signup.query.get(signup_id)
if not signup or not secrets.compare_digest(signup.token, token) or signup.expired or signup.completed:
flash(_('Invalid signup link'))
return redirect(url_for('index'))
confirm_delay = confirm_ratelimit.get_delay(token)
host_delay = host_ratelimit.get_delay()
if confirm_delay and confirm_delay > host_delay:
flash(_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay)), 'error')
return render_template('signup/confirm.html', signup=signup)
if host_delay:
return render_template('signup/confirm.html', signup=signup)
if not signup.password.verify(request.form['password']):
host_ratelimit.log()
confirm_ratelimit.log(token)
flash(_('Wrong password'), 'error')
return render_template('signup/confirm.html', signup=signup)
user, msg = signup.finish(request.form['password'])
if user is None:
db.session.rollback()
flash(msg, 'error')
return render_template('signup/confirm.html', signup=signup)
db.session.commit()
set_session(user, skip_mfa=True)
flash(_('Your account was successfully created'))
return redirect(url_for('index'))
import csv
import io
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask_babel import gettext as _, lazy_gettext
from sqlalchemy.exc import IntegrityError
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.remailer import remailer
from uffd.database import db
from uffd.models import User, UserEmail, Role, MFAMethod
from .selfservice import send_passwordreset
from .session import login_required
bp = Blueprint("user", __name__, template_folder='templates', url_prefix='/user/')
bp.add_app_template_global(User, 'User')
bp.add_app_template_global(remailer, 'remailer')
def user_acl_check():
return request.user and request.user.is_in_group(current_app.config['ACL_ADMIN_GROUP'])
@bp.before_request
@login_required(user_acl_check)
def user_acl():
pass
@bp.route("/")
@register_navbar(lazy_gettext('Users'), icon='users', blueprint=bp, visible=user_acl_check)
def index():
return render_template('user/list.html', users=User.query.all())
@bp.route("/<int:id>")
@bp.route("/new")
def show(id=None):
user = User() if id is None else User.query.get_or_404(id)
return render_template('user/show.html', user=user, roles=Role.query.all())
@bp.route("/new", methods=['POST'])
@csrf_protect(blueprint=bp)
def create():
user = User()
if request.form.get('serviceaccount'):
user.is_service_user = True
ignore_blocklist = request.form.get('ignore-loginname-blocklist', False)
if not user.set_loginname(request.form['loginname'], ignore_blocklist=ignore_blocklist):
flash(_('Login name does not meet requirements'))
return redirect(url_for('user.show'))
if not user.set_primary_email_address(request.form['email']):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show'))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show'))
db.session.add(user)
try:
db.session.flush()
except IntegrityError:
flash(_('Login name or e-mail address is already in use'))
return redirect(url_for('user.show'))
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
if user.is_service_user:
flash(_('Service user created'))
else:
send_passwordreset(user, new=True)
flash(_('User created. We sent the user a password reset link by e-mail'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/update", methods=['POST'])
@csrf_protect(blueprint=bp)
def update(id):
# pylint: disable=too-many-branches,too-many-statements
user = User.query.get_or_404(id)
for email in user.all_emails:
if f'email-{email.id}-present' in request.form:
email.verified = email.verified or (request.form.get(f'email-{email.id}-verified') == '1')
for key, value in request.form.items():
parts = key.split('-')
if not parts[0] == 'newemail' or not parts[2] == 'address' or not value:
continue
tmp_id = parts[1]
email = UserEmail(
user=user,
verified=(request.form.get(f'newemail-{tmp_id}-verified') == '1'),
)
if not email.set_address(value):
flash(_('E-Mail address is invalid'))
return redirect(url_for('user.show', id=id))
db.session.add(email)
try:
db.session.flush()
except IntegrityError:
flash(_('E-Mail address already exists or is used by another account'))
return redirect(url_for('user.show', id=id))
verified_emails = UserEmail.query.filter_by(user=user, verified=True)
user.primary_email = verified_emails.filter_by(id=request.form['primary_email']).one()
if request.form['recovery_email'] == 'primary':
user.recovery_email = None
else:
user.recovery_email = verified_emails.filter_by(id=request.form['recovery_email']).one()
for service_user in user.service_users:
if not service_user.has_email_preferences:
continue
value = request.form.get(f'service_{service_user.service.id}_email', 'primary')
if value == 'primary':
service_user.service_email = None
else:
service_user.service_email = verified_emails.filter_by(id=value).one()
for email in user.all_emails:
if request.form.get(f'email-{email.id}-delete') == '1':
db.session.delete(email)
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash(_('Display name does not meet requirements'))
return redirect(url_for('user.show', id=id))
new_password = request.form.get('password')
if new_password:
if not user.set_password(new_password):
flash(_('Password is invalid'))
return redirect(url_for('user.show', id=id))
user.roles.clear()
for role in Role.query.all():
if not user.is_service_user and role.is_default:
continue
if request.values.get('role-{}'.format(role.id), False):
user.roles.append(role)
user.update_groups()
db.session.commit()
flash(_('User updated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/deactivate')
@csrf_protect(blueprint=bp)
def deactivate(id):
user = User.query.get_or_404(id)
user.is_deactivated = True
db.session.commit()
flash(_('User deactivated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/activate')
@csrf_protect(blueprint=bp)
def activate(id):
user = User.query.get_or_404(id)
user.is_deactivated = False
db.session.commit()
flash(_('User activated'))
return redirect(url_for('user.show', id=user.id))
@bp.route('/<int:id>/mfa/disable')
@csrf_protect(blueprint=bp)
def disable_mfa(id):
user = User.query.get_or_404(id)
MFAMethod.query.filter_by(user=user).delete()
user.update_groups()
db.session.commit()
flash(_('Two-factor authentication was reset'))
return redirect(url_for('user.show', id=id))
@bp.route('/<int:id>/sessions/revoke')
@csrf_protect(blueprint=bp)
def revoke_sessions(id):
user = User.query.get_or_404(id)
user.sessions.clear()
db.session.commit()
flash(_('Sessions revoked'))
return redirect(url_for('user.show', id=user.id))
@bp.route("/<int:id>/del")
@csrf_protect(blueprint=bp)
def delete(id):
user = User.query.get_or_404(id)
user.roles.clear()
db.session.delete(user)
db.session.commit()
flash(_('Deleted user'))
return redirect(url_for('user.index'))
@bp.route("/csv", methods=['POST'])
@csrf_protect(blueprint=bp)
def csvimport():
csvdata = request.values.get('csv')
if not csvdata:
flash('No data for csv import!')
return redirect(url_for('user.index'))
ignore_blocklist = request.values.get('ignore-loginname-blocklist', False)
roles = Role.query.filter_by(is_default=False).all()
usersadded = 0
with io.StringIO(initial_value=csvdata) as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
if not len(row) == 3:
flash("invalid line, ignored : {}".format(row))
continue
newuser = User()
if not newuser.set_loginname(row[0], ignore_blocklist=ignore_blocklist) or not newuser.set_displayname(row[0]):
flash("invalid login name, skipped : {}".format(row))
continue
if not newuser.set_primary_email_address(row[1]):
flash("invalid e-mail address, skipped : {}".format(row))
continue
db.session.add(newuser)
for role in roles:
if str(role.id) in row[2].split(';'):
role.members.append(newuser)
newuser.update_groups()
try:
db.session.commit()
except IntegrityError:
flash('Error adding user {}'.format(row[0]))
db.session.rollback()
continue
send_passwordreset(newuser, new=True)
usersadded += 1
flash('Added {} new users'.format(usersadded))
return redirect(url_for('user.index'))
#!/bin/sh
set -e
pybabel extract -F uffd/babel.cfg -k lazy_gettext -o messages.pot uffd
# If you want to initialize a new message, use:
# pybabel init -i messages.pot -d uffd/translations -l fr
# Complete Documentation of Flask-Babel: https://flask-babel.tkte.ch
pybabel update -i messages.pot -d uffd/translations
pybabel compile -d uffd/translations
if [ -n "$1" ]; then
NUM_EMPTY="$(tr '\n' '|' < uffd/translations/$1/LC_MESSAGES/messages.po | sed 's/msgstr ""|/empty/g' | tr '|' '\n' | grep '^empty$' | wc -l)"
NUM_TOTAL="$(grep '^msgid' uffd/translations/$1/LC_MESSAGES/messages.po | wc -l)"
# Emulate python-coverage output
echo "TOTAL $NUM_TOTAL $(( $NUM_TOTAL - $NUM_EMPTY )) $(( 100 * ($NUM_TOTAL - $NUM_EMPTY) / $NUM_TOTAL ))%"
fi
[uwsgi]
plugin = python3
manage-script-name = true
uid = uffd
gid = uffd
vacuum = true
master = true
process = 2
threads = 4
buffer-size = 8192 ; this buffer is used for http headers and defaults to 4k
single-interpreter = true
need-app = true
env = PYTHONIOENCODING=UTF-8
env = LANG=en_GB.utf8
env = TZ=Europe/Berlin
env = CONFIG_PATH=/etc/uffd/uffd.cfg
chdir = /usr/share/uffd
module = uffd:create_app()
hook-pre-app = exec:FLASK_APP=uffd flask db upgrade