Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Showing
with 373 additions and 842 deletions
from .views import bp as _bp
bp = [_bp]
from flask import current_app
from sqlalchemy import Column, Integer, String, DateTime, Text
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User
from uffd.session.models import DeviceLoginInitiation, DeviceLoginType
class OAuth2Client:
def __init__(self, client_id, client_secret, redirect_uris, required_group=None, logout_urls=None):
self.client_id = client_id
self.client_secret = client_secret
# We only support the Authorization Code Flow for confidential (server-side) clients
self.client_type = 'confidential'
self.redirect_uris = redirect_uris
self.default_scopes = ['profile']
self.required_group = required_group
self.logout_urls = []
for url in (logout_urls or []):
if isinstance(url, str):
self.logout_urls.append(['GET', url])
else:
self.logout_urls.append(url)
@classmethod
def from_id(cls, client_id):
return OAuth2Client(client_id, **current_app.config['OAUTH2_CLIENTS'][client_id])
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
def access_allowed(self, user):
return user.has_permission(self.required_group)
class OAuth2Grant(db.Model):
__tablename__ = 'oauth2grant'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_grants')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
code = Column(String(255), index=True, nullable=False)
redirect_uri = Column(String(255))
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
class OAuth2Token(db.Model):
__tablename__ = 'oauth2token'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_tokens')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
# currently only bearer is supported
token_type = Column(String(40))
access_token = Column(String(255), unique=True)
refresh_token = Column(String(255), unique=True)
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
class OAuth2DeviceLoginInitiation(DeviceLoginInitiation):
__mapper_args__ = {
'polymorphic_identity': DeviceLoginType.OAUTH2
}
oauth2_client_id = Column(String(40))
@property
def oauth2_client(self):
return OAuth2Client.from_id(self.oauth2_client_id)
@property
def description(self):
return self.oauth2_client.client_id
import datetime
import functools
import urllib.parse
from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash
from flask_oauthlib.provider import OAuth2Provider
from flask_babel import gettext as _
from sqlalchemy.exc import IntegrityError
from uffd.ratelimit import host_ratelimit, format_delay
from uffd.database import db
from uffd.secure_redirect import secure_local_redirect
from uffd.session.models import DeviceLoginConfirmation
from .models import OAuth2Client, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation
oauth = OAuth2Provider()
@oauth.clientgetter
def load_client(client_id):
return OAuth2Client.from_id(client_id)
@oauth.grantgetter
def load_grant(client_id, code):
return OAuth2Grant.query.filter_by(client_id=client_id, code=code).first()
@oauth.grantsetter
def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=100)
grant = OAuth2Grant(user_dn=request.oauth2_user.dn, client_id=client_id,
code=code['code'], redirect_uri=oauthreq.redirect_uri, expires=expires, _scopes=' '.join(oauthreq.scopes))
db.session.add(grant)
db.session.commit()
return grant
@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
if access_token:
return OAuth2Token.query.filter_by(access_token=access_token).first()
if refresh_token:
return OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
return None
@oauth.tokensetter
def save_token(token_data, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
OAuth2Token.query.filter_by(client_id=oauthreq.client.client_id, user_dn=oauthreq.user.dn).delete()
expires_in = token_data.get('expires_in')
expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in)
tok = OAuth2Token(
user_dn=oauthreq.user.dn,
client_id=oauthreq.client.client_id,
token_type=token_data['token_type'],
access_token=token_data['access_token'],
refresh_token=token_data['refresh_token'],
expires=expires,
_scopes=' '.join(oauthreq.scopes)
)
db.session.add(tok)
db.session.commit()
return tok
bp = Blueprint('oauth2', __name__, url_prefix='/oauth2/', template_folder='templates')
@bp.record
def init(state):
state.app.config.setdefault('OAUTH2_PROVIDER_ERROR_ENDPOINT', 'oauth2.error')
oauth.init_app(state.app)
# flask-oauthlib has the bug to require the scope parameter for authorize
# requests, which is actually optional according to the OAuth2.0 spec.
# We don't really use scopes and this requirement just complicates the
# configuration of clients.
# See also: https://github.com/lepture/flask-oauthlib/pull/320
def inject_scope(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
args = request.args.to_dict()
if not args.get('scope'):
args['scope'] = 'profile'
return redirect(request.base_url+'?'+urllib.parse.urlencode(args))
return func(*args, **kwargs)
return decorator
@bp.route('/authorize', methods=['GET', 'POST'])
@inject_scope
@oauth.authorize_handler
def authorize(*args, **kwargs): # pylint: disable=unused-argument
client = kwargs['request'].client
request.oauth2_user = None
if request.user:
request.oauth2_user = request.user
elif 'devicelogin_started' in session:
del session['devicelogin_started']
host_delay = host_ratelimit.get_delay()
if host_delay:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
host_ratelimit.log()
initiation = OAuth2DeviceLoginInitiation(oauth2_client_id=client.client_id)
db.session.add(initiation)
try:
db.session.commit()
except IntegrityError:
flash(_('Device login is currently not available. Try again later!'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
session['devicelogin_id'] = initiation.id
session['devicelogin_secret'] = initiation.secret
return redirect(url_for('session.devicelogin', ref=request.full_path))
elif 'devicelogin_id' in session and 'devicelogin_secret' in session and 'devicelogin_confirmation' in session:
initiation = OAuth2DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret'],
oauth2_client_id=client.client_id).one_or_none()
confirmation = DeviceLoginConfirmation.query.get(session['devicelogin_confirmation'])
del session['devicelogin_id']
del session['devicelogin_secret']
del session['devicelogin_confirmation']
if not initiation or initiation.expired or not confirmation:
flash('Device login failed')
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
request.oauth2_user = confirmation.user
db.session.delete(initiation)
db.session.commit()
else:
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
# Here we would normally ask the user, if he wants to give the requesting
# service access to his data. Since we only have trusted services (the
# clients defined in the server config), we don't ask for consent.
session['oauth2-clients'] = session.get('oauth2-clients', [])
if client.client_id not in session['oauth2-clients']:
session['oauth2-clients'].append(client.client_id)
return client.access_allowed(request.oauth2_user)
@bp.route('/token', methods=['GET', 'POST'])
@oauth.token_handler
def token():
return None
@bp.route('/userinfo')
@oauth.require_oauth('profile')
def userinfo():
user = request.oauth.user
# We once exposed the entryUUID here as "ldap_uuid" until realising that it
# can (and does!) change randomly and is therefore entirely useless as an
# indentifier.
return jsonify(
id=user.uid,
name=user.displayname,
nickname=user.loginname,
email=user.mail,
ldap_dn=user.dn,
groups=[group.name for group in user.groups]
)
@bp.route('/error')
def error():
args = dict(request.values)
err = args.pop('error', 'unknown')
error_description = args.pop('error_description', '')
return render_template('oauth2/error.html', error=err, error_description=error_description, args=args)
@bp.app_url_defaults
def inject_logout_params(endpoint, values):
if endpoint != 'oauth2.logout' or not session.get('oauth2-clients'):
return
values['client_ids'] = ','.join(session['oauth2-clients'])
@bp.route('/logout')
def logout():
if not request.values.get('client_ids'):
return secure_local_redirect(request.values.get('ref', '/'))
client_ids = request.values['client_ids'].split(',')
clients = [OAuth2Client.from_id(client_id) for client_id in client_ids]
return render_template('oauth2/logout.html', clients=clients)
import secrets
import hashlib
import base64
from crypt import crypt # pylint: disable=deprecated-module
import argon2
def build_value(method_name, data):
return '{' + method_name + '}' + data
def parse_value(value):
if value is not None and value.startswith('{') and '}' in value:
method_name, data = value[1:].split('}', 1)
return method_name.lower(), data
raise ValueError('Invalid password hash')
class PasswordHashRegistry:
'''Factory for creating objects of the correct PasswordHash subclass for a
given password hash value'''
def __init__(self):
self.methods = {}
def register(self, cls):
assert cls.METHOD_NAME not in self.methods
self.methods[cls.METHOD_NAME] = cls
return cls
def parse(self, value, **kwargs):
method_name, _ = parse_value(value)
method_cls = self.methods.get(method_name)
if method_cls is None:
raise ValueError(f'Unknown password hash method {method_name}')
return method_cls(value, **kwargs)
registry = PasswordHashRegistry()
class PasswordHash:
'''OpenLDAP-/NIS-style password hash
Instances wrap password hash strings in the form "{METHOD_NAME}DATA".
Allows gradual migration of password hashing methods by checking
PasswordHash.needs_rehash every time the password is processed and rehashing
it with PasswordHash.from_password if needed. For PasswordHash.needs_rehash
to work, the PasswordHash subclass for the current password hashing method
is instantiated with target_cls set to the PasswordHash subclass of the
intended hashing method.
Instances should be created with PasswordHashRegistry.parse to get the
appropriate subclass based on the method name in a value.'''
METHOD_NAME = None
def __init__(self, value, target_cls=None):
method_name, data = parse_value(value)
if method_name != self.METHOD_NAME:
raise ValueError('Invalid password hash')
self.value = value
self.data = data
self.target_cls = target_cls or type(self)
@classmethod
def from_password(cls, password):
raise NotImplementedError()
def verify(self, password):
raise NotImplementedError()
@property
def needs_rehash(self):
return not isinstance(self, self.target_cls)
@registry.register
class PlaintextPasswordHash(PasswordHash):
'''Pseudo password hash for passwords stored without hashing
Should only be used for migration of existing plaintext passwords. Add the
prefix "{plain}" for this.'''
METHOD_NAME = 'plain'
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, password))
def verify(self, password):
return secrets.compare_digest(self.data, password)
class HashlibPasswordHash(PasswordHash):
HASHLIB_ALGORITHM = None
@classmethod
def from_password(cls, password):
ctx = hashlib.new(cls.HASHLIB_ALGORITHM, password.encode())
return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()).decode()))
def verify(self, password):
digest = base64.b64decode(self.data.encode())
ctx = hashlib.new(self.HASHLIB_ALGORITHM, password.encode())
return secrets.compare_digest(digest, ctx.digest())
class SaltedHashlibPasswordHash(PasswordHash):
HASHLIB_ALGORITHM = None
@classmethod
def from_password(cls, password):
salt = secrets.token_bytes(8)
ctx = hashlib.new(cls.HASHLIB_ALGORITHM)
ctx.update(password.encode())
ctx.update(salt)
return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()+salt).decode()))
def verify(self, password):
data = base64.b64decode(self.data.encode())
ctx = hashlib.new(self.HASHLIB_ALGORITHM)
digest = data[:ctx.digest_size]
salt = data[ctx.digest_size:]
ctx.update(password.encode())
ctx.update(salt)
return secrets.compare_digest(digest, ctx.digest())
@registry.register
class MD5PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'md5'
HASHLIB_ALGORITHM = 'md5'
@registry.register
class SaltedMD5PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'smd5'
HASHLIB_ALGORITHM = 'md5'
@registry.register
class SHA1PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha'
HASHLIB_ALGORITHM = 'sha1'
@registry.register
class SaltedSHA1PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha'
HASHLIB_ALGORITHM = 'sha1'
@registry.register
class SHA256PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha256'
HASHLIB_ALGORITHM = 'sha256'
@registry.register
class SaltedSHA256PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha256'
HASHLIB_ALGORITHM = 'sha256'
@registry.register
class SHA384PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha384'
HASHLIB_ALGORITHM = 'sha384'
@registry.register
class SaltedSHA384PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha384'
HASHLIB_ALGORITHM = 'sha384'
@registry.register
class SHA512PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha512'
HASHLIB_ALGORITHM = 'sha512'
@registry.register
class SaltedSHA512PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha512'
HASHLIB_ALGORITHM = 'sha512'
@registry.register
class CryptPasswordHash(PasswordHash):
METHOD_NAME = 'crypt'
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, crypt(password)))
def verify(self, password):
return secrets.compare_digest(crypt(password, self.data), self.data)
@registry.register
class Argon2PasswordHash(PasswordHash):
METHOD_NAME = 'argon2'
hasher = argon2.PasswordHasher()
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, cls.hasher.hash(password)))
def verify(self, password):
try:
return self.hasher.verify(self.data, password)
except argon2.exceptions.Argon2Error:
return False
except argon2.exceptions.InvalidHash:
return False
@property
def needs_rehash(self):
return super().needs_rehash or self.hasher.check_needs_rehash(self.data)
class InvalidPasswordHash:
def __init__(self, value=None):
self.value = value
# pylint: disable=unused-argument
def verify(self, password):
return False
@property
def needs_rehash(self):
return True
def __bool__(self):
return False
# An alternative approach for the behaviour of PasswordHashAttribute would be
# to use sqlalchemy.TypeDecorator. A type decorator allows custom encoding and
# decoding of values coming from the database (when query results are loaded)
# and going into the database (when statements are executed).
#
# This has one downside: After setting e.g. user.password to a string value it
# remains a string value until the change is flushed. It is not possible to
# coerce values to PasswordHash objects as soon as they are set.
#
# This is too inconsistent. Code should be able to rely on user.password to
# always behave like a PasswordHash object.
class PasswordHashAttribute:
'''Descriptor for wrapping an attribute storing a password hash string
Usage example:
>>> class User:
... # Could e.g. be an SQLAlchemy.Column or just a simple attribute
... _passord_hash = None
... password = PasswordHashAttribute('_passord_hash', SHA512PasswordHash)
...
>>> user = User()
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
>>>
>>> user._password_hash = '{plain}my_password'
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
>>> user.password.needs_rehash
True
>>>
>>> user.password = 'my_password'
>>> user._passord_hash
'{sha512}3ajDRohg3LJOIoq47kQgjUPrL1/So6U4uvvTnbT/EUyYKaZL0aRxDgwCH4pBNLai+LF+zMh//nnYRZ4t8pT7AQ=='
>>> type(user.password)
<class 'uffd.password_hash.SHA512PasswordHash'>
>>>
>>> user.password = None
>>> user._passord_hash is None
True
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
When set to a (plaintext) password the underlying attribute is set to a hash
value for the password. When set to None the underlying attribute is also set
to None.'''
def __init__(self, attribute_name, method_cls):
self.attribute_name = attribute_name
self.method_cls = method_cls
def __get__(self, obj, objtype=None):
if obj is None:
return self
value = getattr(obj, self.attribute_name)
try:
return registry.parse(value, target_cls=self.method_cls)
except ValueError:
return InvalidPasswordHash(value)
def __set__(self, obj, value):
if value is None:
value = InvalidPasswordHash()
elif isinstance(value, str):
value = self.method_cls.from_password(value)
setattr(obj, self.attribute_name, value.value)
# Hashing method for (potentially) low entropy secrets like user passwords. Is
# usually slow and uses salting to make dictionary attacks difficult.
LowEntropyPasswordHash = Argon2PasswordHash
# Hashing method for high entropy secrets like API keys. The secrets are
# generated instead of user-selected to ensure a high level of entropy. Is
# fast and does not need salting, since dictionary attacks are not feasable
# due to high entropy.
HighEntropyPasswordHash = SHA512PasswordHash
from flask import current_app
import itsdangerous
from uffd.utils import nopad_b32decode, nopad_b32encode, nopad_urlsafe_b64decode, nopad_urlsafe_b64encode
class Remailer:
'''The remailer feature improves user privacy by hiding real mail addresses
from services and instead providing them with autogenerated pseudonymous
remailer addresses. If a service sends a mail to a remailer address, the mail
service uses an uffd API endpoint to get the real mail address and rewrites
the remailer address with it. In case of a leak of user data from a service,
the remailer addresses are useless for third-parties.
Version 2 of the remailer address format is tolerant to case conversions at
the cost of being slightly longer.'''
@property
def configured(self):
return bool(current_app.config['REMAILER_DOMAIN'])
def get_serializer(self):
secret = current_app.config['REMAILER_SECRET_KEY'] or current_app.secret_key
return itsdangerous.URLSafeSerializer(secret, salt='remailer_address_v1')
def build_v1_address(self, service_id, user_id):
payload = self.get_serializer().dumps([service_id, user_id])
return 'v1-' + payload + '@' + current_app.config['REMAILER_DOMAIN']
def build_v2_address(self, service_id, user_id):
data, sign = self.get_serializer().dumps([service_id, user_id]).split('.', 1)
data = nopad_b32encode(nopad_urlsafe_b64decode(data)).decode().lower()
sign = nopad_b32encode(nopad_urlsafe_b64decode(sign)).decode().lower()
payload = data + '-' + sign
return 'v2-' + payload + '@' + current_app.config['REMAILER_DOMAIN']
def is_remailer_domain(self, domain):
domains = {domain.lower().strip() for domain in current_app.config['REMAILER_OLD_DOMAINS']}
if current_app.config['REMAILER_DOMAIN']:
domains.add(current_app.config['REMAILER_DOMAIN'].lower().strip())
return domain.lower().strip() in domains
def parse_v1_payload(self, payload):
try:
service_id, user_id = self.get_serializer().loads(payload)
except itsdangerous.BadData:
return None
return (service_id, user_id)
def parse_v2_payload(self, payload):
data, sign = (payload.split('-', 1) + [''])[:2]
try:
data = nopad_urlsafe_b64encode(nopad_b32decode(data.upper())).decode()
sign = nopad_urlsafe_b64encode(nopad_b32decode(sign.upper())).decode()
except ValueError:
return None
payload = data + '.' + sign
try:
service_id, user_id = self.get_serializer().loads(payload)
except itsdangerous.BadData:
return None
return (service_id, user_id)
def parse_address(self, address):
if '@' not in address:
return None
local_part, domain = address.rsplit('@', 1)
if not self.is_remailer_domain(domain):
return None
prefix, payload = (local_part.strip().split('-', 1) + [''])[:2]
if prefix == 'v1':
return self.parse_v1_payload(payload)
if prefix.lower() == 'v2':
return self.parse_v2_payload(payload)
return None
remailer = Remailer()
from .views import bp as bp_ui
bp = [bp_ui]
from .views import bp as bp_ui
bp = [bp_ui]
from .views import bp as bp_ui, send_passwordreset
bp = [bp_ui]
import datetime
import secrets
from sqlalchemy import Column, String, DateTime
from uffd.database import db
def random_token():
return secrets.token_hex(128)
class Token():
token = Column(String(128), primary_key=True, default=random_token)
created = Column(DateTime, default=datetime.datetime.now)
class PasswordToken(Token, db.Model):
__tablename__ = 'passwordToken'
loginname = Column(String(32))
class MailToken(Token, db.Model):
__tablename__ = 'mailToken'
loginname = Column(String(32))
newmail = Column(String(255))
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("selfservice.forgot_password") }}" method="POST">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_("Forgot password")}}</h2>
</div>
<div class="form-group col-12">
<label for="user-loginname">{{_("Login Name")}}</label>
<input type="text" class="form-control" id="user-loginname" name="loginname" required="required" tabindex = "1">
</div>
<div class="form-group col-12">
<label for="user-mail">{{_("Mail Address")}}</label>
<input type="text" class="form-control" id="user-mail" name="mail" required="required" tabindex = "2">
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "3">{{_("Send password reset mail")}}</button>
</div>
</div>
</div>
</form>
{% endblock %}
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("selfservice.token_password", token=token) }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') ">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_("Reset password")}}</h2>
</div>
<div class="form-group col-12">
<label for="user-password1">{{_("New Password")}}</label>
<input type="password" class="form-control" id="user-password1" name="password1" required="required" tabindex = "2">
<small class="form-text text-muted">
{{_("At least 8 and at most 256 characters, no other special requirements. But please don't be stupid, do use a password manager.")}}
</small>
</div>
<div class="form-group col-12">
<label for="user-password2">{{_("Repeat Password")}}</label>
<input type="password" class="form-control" id="user-password2" name="password2" required="required" tabindex = "2">
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "3">{{_("Set password")}}</button>
</div>
</div>
</div>
</form>
{% endblock %}
import datetime
import smtplib
from email.message import EmailMessage
import email.utils
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from flask_babel import gettext as _, lazy_gettext
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
from uffd.user.models import User
from uffd.session import login_required
from uffd.selfservice.models import PasswordToken, MailToken
from uffd.role.models import Role
from uffd.database import db
from uffd.ldap import ldap
from uffd.ratelimit import host_ratelimit, Ratelimit, format_delay
bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/')
reset_ratelimit = Ratelimit('passwordreset', 1*60*60, 3)
@bp.route("/")
@register_navbar(lazy_gettext('Selfservice'), icon='portrait', blueprint=bp, visible=lambda: bool(request.user))
@login_required()
def index():
return render_template('selfservice/self.html', user=request.user)
@bp.route("/updateprofile", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required()
def update_profile():
user = request.user
if request.values['displayname'] != user.displayname:
if user.set_displayname(request.values['displayname']):
flash(_('Display name changed.'))
else:
flash(_('Display name is not valid.'))
if request.values['mail'] != user.mail:
send_mail_verification(user.loginname, request.values['mail'])
flash(_('We sent you an email, please verify your mail address.'))
ldap.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/changepassword", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required()
def change_password():
password_changed = False
user = request.user
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match'))
else:
if user.set_password(request.values['password1']):
flash(_('Password changed'))
password_changed = True
else:
flash(_('Invalid password'))
ldap.session.commit()
# When using a user_connection, update the connection on password-change
if password_changed and current_app.config['LDAP_SERVICE_USER_BIND']:
session['user_pw'] = request.values['password1']
return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST']))
def forgot_password():
if request.method == 'GET':
return render_template('selfservice/forgot_password.html')
loginname = request.values['loginname'].lower()
mail = request.values['mail']
reset_delay = reset_ratelimit.get_delay(loginname+'/'+mail)
host_delay = host_ratelimit.get_delay()
if reset_delay or host_delay:
if reset_delay > host_delay:
flash(_('We received too many password reset requests for this user! Please wait at least %(delay)s.', delay=format_delay(reset_delay)))
else:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return redirect(url_for('.forgot_password'))
reset_ratelimit.log(loginname+'/'+mail)
host_ratelimit.log()
flash(_("We sent a mail to this user's mail address if you entered the correct mail and login name combination"))
user = User.query.filter_by(loginname=loginname).one_or_none()
if user and user.mail == mail:
send_passwordreset(user)
return redirect(url_for('session.login'))
@bp.route("/token/password/<token>", methods=(['POST', 'GET']))
def token_password(token):
dbtoken = PasswordToken.query.get(token)
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.'))
if dbtoken:
db.session.delete(dbtoken)
db.session.commit()
return redirect(url_for('session.login'))
if request.method == 'GET':
return render_template('selfservice/set_password.html', token=token)
if not request.values['password1']:
flash(_('You need to set a password, please try again.'))
return render_template('selfservice/set_password.html', token=token)
if not request.values['password1'] == request.values['password2']:
flash(_('Passwords do not match, please try again.'))
return render_template('selfservice/set_password.html', token=token)
user = User.query.filter_by(loginname=dbtoken.loginname).one()
if not user.set_password(request.values['password1']):
flash(_('Password ist not valid, please try again.'))
return render_template('selfservice/set_password.html', token=token)
db.session.delete(dbtoken)
flash(_('New password set'))
ldap.session.commit()
db.session.commit()
return redirect(url_for('session.login'))
@bp.route("/token/mail_verification/<token>")
@login_required()
def token_mail(token):
dbtoken = MailToken.query.get(token)
if not dbtoken or dbtoken.created < (datetime.datetime.now() - datetime.timedelta(days=2)):
flash(_('Token expired, please try again.'))
if dbtoken:
db.session.delete(dbtoken)
db.session.commit()
return redirect(url_for('selfservice.index'))
user = User.query.filter_by(loginname=dbtoken.loginname).one()
user.set_mail(dbtoken.newmail)
flash(_('New mail set'))
db.session.delete(dbtoken)
ldap.session.commit()
db.session.commit()
return redirect(url_for('selfservice.index'))
@bp.route("/leaverole/<int:roleid>", methods=(['POST']))
@csrf_protect(blueprint=bp)
@login_required()
def leave_role(roleid):
if not current_app.config['ENABLE_ROLESELFSERVICE']:
flash(_('Leaving roles is disabled'))
return redirect(url_for('selfservice.index'))
role = Role.query.get_or_404(roleid)
role.members.discard(request.user)
request.user.update_groups()
ldap.session.commit()
db.session.commit()
flash(_('You left role %(role_name)s', role_name=role.name))
return redirect(url_for('selfservice.index'))
def send_mail_verification(loginname, newmail):
expired_tokens = MailToken.query.filter(MailToken.created < (datetime.datetime.now() - datetime.timedelta(days=2))).all()
duplicate_tokens = MailToken.query.filter(MailToken.loginname == loginname).all()
for i in expired_tokens + duplicate_tokens:
db.session.delete(i)
token = MailToken()
token.loginname = loginname
token.newmail = newmail
db.session.add(token)
db.session.commit()
user = User.query.filter_by(loginname=loginname).one()
msg = EmailMessage()
msg.set_content(render_template('selfservice/mailverification.mail.txt', user=user, token=token.token))
msg['Subject'] = 'Mail verification'
send_mail(newmail, msg)
def send_passwordreset(user, new=False):
expired_tokens = PasswordToken.query.filter(PasswordToken.created < (datetime.datetime.now() - datetime.timedelta(days=2))).all()
duplicate_tokens = PasswordToken.query.filter(PasswordToken.loginname == user.loginname).all()
for i in expired_tokens + duplicate_tokens:
db.session.delete(i)
token = PasswordToken()
token.loginname = user.loginname
db.session.add(token)
db.session.commit()
msg = EmailMessage()
if new:
msg.set_content(render_template('selfservice/newuser.mail.txt', user=user, token=token.token))
msg['Subject'] = 'Welcome to the %s infrastructure'%current_app.config.get('ORGANISATION_NAME', '')
else:
msg.set_content(render_template('selfservice/passwordreset.mail.txt', user=user, token=token.token))
msg['Subject'] = 'Password reset'
send_mail(user.mail, msg)
def send_mail(to_address, msg):
msg['From'] = current_app.config['MAIL_FROM_ADDRESS']
msg['To'] = to_address
msg['Date'] = email.utils.formatdate(localtime=1)
msg['Message-ID'] = email.utils.make_msgid()
try:
if current_app.debug:
current_app.last_mail = None
current_app.logger.debug('Trying to send email to %s:\n'%(to_address)+str(msg))
if current_app.debug and current_app.config.get('MAIL_SKIP_SEND', False):
if current_app.config['MAIL_SKIP_SEND'] == 'fail':
raise smtplib.SMTPException()
current_app.last_mail = msg
return True
server = smtplib.SMTP(host=current_app.config['MAIL_SERVER'], port=current_app.config['MAIL_PORT'])
if current_app.config['MAIL_USE_STARTTLS']:
server.starttls()
server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD'])
server.send_message(msg)
server.quit()
if current_app.debug:
current_app.last_mail = msg
return True
except smtplib.SMTPException:
flash(_('Mail to "%(mail_address)s" could not be sent!', mail_address=to_address))
return False
......@@ -24,7 +24,8 @@ def sendmail(addr, subject, template_name, **kwargs):
server = smtplib.SMTP(host=current_app.config['MAIL_SERVER'], port=current_app.config['MAIL_PORT'])
if current_app.config['MAIL_USE_STARTTLS']:
server.starttls()
server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD'])
if current_app.config['MAIL_USERNAME']:
server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD'])
server.send_message(msg)
server.quit()
if current_app.debug:
......
from .views import bp as _bp
bp = [_bp]
from flask import Blueprint, render_template, current_app, abort, request
from flask_babel import lazy_gettext, get_locale
from uffd.navbar import register_navbar
bp = Blueprint("services", __name__, template_folder='templates', url_prefix='/services')
# pylint: disable=too-many-branches
def get_services(user=None):
if not user and not current_app.config['SERVICES_PUBLIC']:
return []
services = []
for service_data in current_app.config['SERVICES']:
service_title = get_language_specific(service_data, 'title')
if not service_title:
continue
service_description = get_language_specific(service_data, 'description')
service = {
'title': service_title,
'subtitle': service_data.get('subtitle', ''),
'description': service_description,
'url': service_data.get('url', ''),
'logo_url': service_data.get('logo_url', ''),
'has_access': True,
'permission': '',
'groups': [],
'infos': [],
'links': [],
}
if service_data.get('required_group'):
if not user or not user.has_permission(service_data['required_group']):
service['has_access'] = False
for permission_data in service_data.get('permission_levels', []):
if permission_data.get('required_group'):
if not user or not user.has_permission(permission_data['required_group']):
continue
if not permission_data.get('name'):
continue
service['has_access'] = True
service['permission'] = permission_data['name']
if service_data.get('confidential', False) and not service['has_access']:
continue
for group_data in service_data.get('groups', []):
if group_data.get('required_group'):
if not user or not user.has_permission(group_data['required_group']):
continue
if not group_data.get('name'):
continue
service['groups'].append(group_data)
for info_data in service_data.get('infos', []):
if info_data.get('required_group'):
if not user or not user.has_permission(info_data['required_group']):
continue
info_title = get_language_specific(info_data, 'title')
info_html = get_language_specific(info_data, 'html')
if not info_title or not info_html:
continue
info_button_text = get_language_specific(info_data, 'button_text', info_title)
info = {
'title': info_title,
'button_text': info_button_text,
'html': info_html,
'id': '%d-%d'%(len(services), len(service['infos'])),
}
service['infos'].append(info)
for link_data in service_data.get('links', []):
if link_data.get('required_group'):
if not user or not user.has_permission(link_data['required_group']):
continue
if not link_data.get('url') or not link_data.get('title'):
continue
service['links'].append(link_data)
services.append(service)
return services
def get_language_specific(data, field_name, default =''):
return data.get(field_name + '_' + get_locale().language, data.get(field_name, default))
def services_visible():
return len(get_services(request.user)) > 0
@bp.route("/")
@register_navbar(lazy_gettext('Services'), icon='sitemap', blueprint=bp, visible=services_visible)
def index():
services = get_services(request.user)
if not current_app.config['SERVICES']:
abort(404)
banner = current_app.config.get('SERVICES_BANNER')
# Set the banner to None if it is not public and no user is logged in
if not (current_app.config["SERVICES_BANNER_PUBLIC"] or request.user):
banner = None
return render_template('services/overview.html', user=request.user, services=services, banner=banner)
from .views import bp as bp_ui, login_required, set_session
bp = [bp_ui]
{% extends 'base.html' %}
{% block body %}
{% if not initiation %}
<form action="{{ url_for("session.deviceauth") }}">
{% elif not confirmation %}
<form action="{{ url_for("session.deviceauth_submit") }}" method="POST">
{% else %}
<form action="{{ url_for("session.deviceauth_finish") }}" method="POST">
{% endif %}
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_('Authorize Device Login')}}</h2>
</div>
<div class="form-group col-12">
<p>{{_('Log into a service on another device without entering your password.')}}</p>
</div>
<div class="form-group col-12">
<label for="initiation-code">{{_('Initiation Code')}}</label>
{% if not initiation %}
<input type="text" class="form-control" id="initiation-code" name="initiation-code" value="{{ initiation_code or '' }}" required="required" tabindex = "1" autofocus>
{% else %}
<input type="text" class="form-control" id="initiation-code" name="initiation-code" value="{{ initiation.code }}" readonly>
{% endif %}
</div>
{% if confirmation %}
<div class="form-group col-12">
<label for="confirmation-code">{{_('Confirmation Code')}}</label>
<input type="text" class="form-control" id="confirmation-code" name="confirmation-code" value="{{ confirmation.code }}" readonly>
</div>
{% endif %}
{% if not initiation %}
<div class="form-group col-12">
<p>{{_('Start logging into a service on the other device and chose "Device Login" on the login page. Enter the displayed initiation code in the box above.')}}</p>
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "2">{{_('Continue')}}</button>
</div>
<div class="form-group col-12">
<a href="{{ url_for('index') }}" class="btn btn-secondary btn-block" tabindex="0">{{_('Cancel')}}</a>
</div>
{% elif not confirmation %}
<div class="form-group col-12">
<p>{{_('Authorize the login for service <b>%(service_name)s</b>?', service_name=initiation.description|e)|safe}}</p>
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "2">{{_('Authorize Login')}}</button>
</div>
<div class="form-group col-12">
<a href="{{ url_for('index') }}" class="btn btn-secondary btn-block" tabindex="0">{{_('Cancel')}}</a>
</div>
{% else %}
<div class="form-group col-12">
<p>{{_('Enter the confirmation code on the other device and complete the login. Click <em>Finish</em> afterwards.')|safe}}</p>
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "2">{{_('Finish')}}</button>
</div>
{% endif %}
</div>
</div>
</form>
{% endblock %}
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("session.devicelogin_submit", ref=ref) }}" method="POST">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_('Device Login')}}</h2>
</div>
<div class="form-group col-12">
<p>{{_('Use a login session on another device (e.g. your laptop) to log into a service without entering your password.')}}</p>
</div>
{% if initiation %}
<div class="form-group col-12">
<label for="initiation-code">{{_('Initiation Code')}}</label>
<input type="text" class="form-control" id="initiation-code" name="initiation-code" value="{{ initiation.code }}" readonly>
</div>
<input type="hidden" class="form-control" id="initiation-secret" name="initiation-secret" value="{{ initiation.secret }}">
<div class="form-group col-12">
<label for="confirmation-code">{{_('Confirmation Code')}}</label>
<input type="text" class="form-control" id="confirmation-code" name="confirmation-code" required="required" tabindex = "1" autofocus>
</div>
<div class="form-group col-12">
<p>{{_('Open <code><a href="%(deviceauth_url)s">%(deviceauth_url)s</a></code> on the other device and enter the initiation code there. Then enter the confirmation code in the box above.', deviceauth_url=url_for('session.deviceauth', _external=True)|e)|safe}}</p>
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "3">{{_('Continue')}}</button>
</div>
{% endif %}
<div class="form-group col-12">
<a href="{{ url_for('session.login', ref=ref, devicelogin=True) }}" class="btn btn-secondary btn-block" tabindex="0">{{_('Cancel')}}</a>
</div>
</div>
</div>
</form>
{% endblock %}
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("session.login", ref=ref) }}" method="POST">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_("Login")}}</h2>
</div>
<div class="form-group col-12">
<label for="user-loginname">{{_("Login Name")}}</label>
<input type="text" class="form-control" id="user-loginname" name="loginname" required="required" tabindex = "1" autofocus>
</div>
<div class="form-group col-12">
<label for="user-password1">{{_("Password")}}</label>
<input type="password" class="form-control" id="user-password1" name="password" required="required" tabindex = "2">
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "3">{{_("Login")}}</button>
</div>
{% if request.values.get('devicelogin') %}
<div class="text-center text-muted mb-3">{{_("- or -")}}</div>
<div class="form-group col-12">
<a href="{{ url_for('session.devicelogin_start', ref=ref) }}" class="btn btn-primary btn-block" tabindex="0">{{_("Device Login")}}</a>
</div>
{% endif %}
<div class="clearfix col-12">
{% if config['SELF_SIGNUP'] %}
<a href="{{ url_for("signup.signup_start") }}" class="float-left">{{_("Register")}}</a>
{% endif %}
{% if config['ENABLE_PASSWORDRESET'] %}
<a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">{{_("Forgot Password?")}}</a>
{% endif %}
</div>
</div>
</div>
</form>
{% endblock %}
from .views import bp as _bp
bp = [_bp]