Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
  • Dockerfile
  • claims-in-idtoke
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • recovery-code-pwhash
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
32 results
Show changes
Showing
with 1731 additions and 430 deletions
from uffd.database import db
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
feature_flag_table = db.Table('feature_flag',
db.Column('name', db.String(32), primary_key=True),
)
class FeatureFlag:
def __init__(self, name):
self.name = name
self.enable_hooks = []
self.disable_hooks = []
@property
def expr(self):
return db.exists().where(feature_flag_table.c.name == self.name)
def __bool__(self):
return db.session.execute(db.select([self.expr])).scalar()
def enable_hook(self, func):
self.enable_hooks.append(func)
return func
def enable(self):
db.session.execute(db.insert(feature_flag_table).values(name=self.name))
for func in self.enable_hooks:
func()
def disable_hook(self, func):
self.disable_hooks.append(func)
return func
def disable(self):
db.session.execute(db.delete(feature_flag_table).where(feature_flag_table.c.name == self.name))
for func in self.disable_hooks:
func()
FeatureFlag.unique_email_addresses = FeatureFlag('unique-email-addresses')
lock_table = db.Table('lock',
db.Column('name', db.String(32), primary_key=True),
)
class Lock:
ALL_LOCKS = set()
def __init__(self, name):
self.name = name
assert name not in self.ALL_LOCKS
self.ALL_LOCKS.add(name)
def acquire(self):
'''Acquire the lock until the end of the current transaction
Calling acquire while the specific lock is already held has no effect.'''
if db.engine.name == 'sqlite':
# SQLite does not support with_for_update, but we can lock the whole DB
# with any write operation. So we do a dummy update.
db.session.execute(db.update(lock_table).where(False).values(name=None))
elif db.engine.name in ('mysql', 'mariadb'):
result = db.session.execute(db.select([lock_table.c.name]).where(lock_table.c.name == self.name).with_for_update()).scalar()
if result is not None:
return
# We add all lock rows with migrations so we should never end up here
raise Exception(f'Lock "{self.name}" is missing')
else:
raise NotImplementedError()
# Only executed when lock_table is created with db.create/db.create_all (e.g.
# during testing). Otherwise the rows are inserted with migrations.
@db.event.listens_for(lock_table, 'after_create') # pylint: disable=no-member
def insert_lock_rows(target, connection, **kwargs): # pylint: disable=unused-argument
for name in Lock.ALL_LOCKS:
db.session.execute(db.insert(lock_table).values(name=name))
db.session.commit()
import datetime
import json
import secrets
import base64
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.associationproxy import association_proxy
import jwt
from uffd.database import db, CommaSeparatedList
from uffd.tasks import cleanup_task
from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash
from uffd.utils import token_urlfriendly
from .session import DeviceLoginInitiation, DeviceLoginType
from .service import ServiceUser
# pyjwt v1.7.x compat (Buster/Bullseye)
if not hasattr(jwt, 'get_algorithm_by_name'):
jwt.get_algorithm_by_name = lambda name: jwt.algorithms.get_default_algorithms()[name]
class OAuth2Client(db.Model):
__tablename__ = 'oauth2client'
# Inconsistently named "db_id" instead of "id" because of the naming conflict
# with "client_id" in the OAuth2 standard
db_id = Column(Integer, primary_key=True, autoincrement=True)
service_id = Column(Integer, ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
service = relationship('Service', back_populates='oauth2_clients')
client_id = Column(String(40), unique=True, nullable=False)
_client_secret = Column('client_secret', Text(), nullable=False)
client_secret = PasswordHashAttribute('_client_secret', HighEntropyPasswordHash)
_redirect_uris = relationship('OAuth2RedirectURI', cascade='all, delete-orphan')
redirect_uris = association_proxy('_redirect_uris', 'uri')
logout_uris = relationship('OAuth2LogoutURI', cascade='all, delete-orphan')
@property
def default_redirect_uri(self):
return self.redirect_uris[0] if len(self.redirect_uris) == 1 else None
def access_allowed(self, user):
service_user = ServiceUser.query.get((self.service_id, user.id))
return service_user and service_user.has_access
@property
def logout_uris_json(self):
return json.dumps([[item.method, item.uri] for item in self.logout_uris])
class OAuth2RedirectURI(db.Model):
__tablename__ = 'oauth2redirect_uri'
id = Column(Integer, primary_key=True, autoincrement=True)
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
uri = Column(String(255), nullable=False)
def __init__(self, uri):
self.uri = uri
class OAuth2LogoutURI(db.Model):
__tablename__ = 'oauth2logout_uri'
id = Column(Integer, primary_key=True, autoincrement=True)
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
method = Column(String(40), nullable=False, default='GET')
uri = Column(String(255), nullable=False)
@cleanup_task.delete_by_attribute('expired')
class OAuth2Grant(db.Model):
__tablename__ = 'oauth2grant'
id = Column(Integer, primary_key=True, autoincrement=True)
EXPIRES_IN = 100
expires = Column(DateTime, nullable=False, default=lambda: datetime.datetime.utcnow() + datetime.timedelta(seconds=OAuth2Grant.EXPIRES_IN))
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
session = relationship('Session')
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
client = relationship('OAuth2Client')
_code = Column('code', String(255), nullable=False, default=token_urlfriendly)
code = property(lambda self: f'{self.id}-{self._code}')
redirect_uri = Column(String(255), nullable=True)
nonce = Column(Text(), nullable=True)
scopes = Column('_scopes', CommaSeparatedList(), nullable=False, default=tuple())
_claims = Column('claims', Text(), nullable=True)
@property
def claims(self):
return json.loads(self._claims) if self._claims is not None else None
@claims.setter
def claims(self, value):
self._claims = json.dumps(value) if value is not None else None
@property
def service_user(self):
return ServiceUser.query.get((self.client.service_id, self.session.user_id))
@hybrid_property
def expired(self):
if self.expires is None:
return False
return self.expires < datetime.datetime.utcnow()
@classmethod
def get_by_authorization_code(cls, code):
# pylint: disable=protected-access
if '-' not in code:
return None
grant_id, grant_code = code.split('-', 2)
grant = cls.query.filter_by(id=grant_id, expired=False).first()
if not grant or not secrets.compare_digest(grant._code, grant_code):
return None
if grant.session.expired or grant.session.user.is_deactivated:
return None
if not grant.service_user or not grant.service_user.has_access:
return None
return grant
def make_token(self, **kwargs):
return OAuth2Token(
session=self.session,
client=self.client,
scopes=self.scopes,
claims=self.claims,
**kwargs
)
# OAuth2Token objects are cleaned-up when the session expires and is
# auto-deleted (or the user manually revokes it).
class OAuth2Token(db.Model):
__tablename__ = 'oauth2token'
id = Column(Integer, primary_key=True, autoincrement=True)
EXPIRES_IN = 3600
expires = Column(DateTime, nullable=False, default=lambda: datetime.datetime.utcnow() + datetime.timedelta(seconds=OAuth2Token.EXPIRES_IN))
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
session = relationship('Session')
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
client = relationship('OAuth2Client')
# currently only bearer is supported
token_type = Column(String(40), nullable=False, default='bearer')
_access_token = Column('access_token', String(255), unique=True, nullable=False, default=token_urlfriendly)
access_token = property(lambda self: f'{self.id}-{self._access_token}')
_refresh_token = Column('refresh_token', String(255), unique=True, nullable=False, default=token_urlfriendly)
refresh_token = property(lambda self: f'{self.id}-{self._refresh_token}')
scopes = Column('_scopes', CommaSeparatedList(), nullable=False, default=tuple())
_claims = Column('claims', Text(), nullable=True)
@property
def claims(self):
return json.loads(self._claims) if self._claims is not None else None
@claims.setter
def claims(self, value):
self._claims = json.dumps(value) if value is not None else None
@property
def service_user(self):
return ServiceUser.query.get((self.client.service_id, self.session.user_id))
@hybrid_property
def expired(self):
return self.expires < datetime.datetime.utcnow()
@classmethod
def get_by_access_token(cls, access_token):
# pylint: disable=protected-access
if '-' not in access_token:
return None
token_id, token_secret = access_token.split('-', 2)
token = cls.query.filter_by(id=token_id, expired=False).first()
if not token or not secrets.compare_digest(token._access_token, token_secret):
return None
if token.session.expired or token.session.user.is_deactivated:
return None
if not token.service_user or not token.service_user.has_access:
return None
return token
class OAuth2DeviceLoginInitiation(DeviceLoginInitiation):
__mapper_args__ = {
'polymorphic_identity': DeviceLoginType.OAUTH2
}
client_db_id = Column('oauth2_client_db_id', Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'))
client = relationship('OAuth2Client')
@property
def description(self):
return self.client.service.name
class OAuth2Key(db.Model):
__tablename__ = 'oauth2_key'
id = Column(String(64), primary_key=True, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
active = Column(Boolean(create_constraint=False), default=True, nullable=False)
algorithm = Column(String(32), nullable=False)
private_key_jwk = Column(Text(), nullable=False)
public_key_jwk = Column(Text(), nullable=False)
def __init__(self, **kwargs):
if kwargs.get('algorithm') and kwargs.get('private_key') \
and not kwargs.get('private_key_jwk') \
and not kwargs.get('public_key_jwk'):
algorithm = jwt.get_algorithm_by_name(kwargs['algorithm'])
private_key = kwargs.pop('private_key')
kwargs['private_key_jwk'] = algorithm.to_jwk(private_key)
kwargs['public_key_jwk'] = algorithm.to_jwk(private_key.public_key())
super().__init__(**kwargs)
@property
def private_key(self):
# pylint: disable=protected-access,import-outside-toplevel
# cryptography performs expensive checks when loading RSA private keys.
# Since we only load keys we generated ourselves with help of cryptography,
# these checks are unnecessary.
import cryptography.hazmat.backends.openssl
cryptography.hazmat.backends.openssl.backend._rsa_skip_check_key = True
res = jwt.get_algorithm_by_name(self.algorithm).from_jwk(self.private_key_jwk)
cryptography.hazmat.backends.openssl.backend._rsa_skip_check_key = False
return res
@property
def public_key(self):
return jwt.get_algorithm_by_name(self.algorithm).from_jwk(self.public_key_jwk)
@property
def public_key_jwks_dict(self):
res = json.loads(self.public_key_jwk)
res['kid'] = self.id
res['alg'] = self.algorithm
res['use'] = 'sig'
# RFC7517 4.3 "The "use" and "key_ops" JWK members SHOULD NOT be used together [...]"
res.pop('key_ops', None)
return res
def encode_jwt(self, payload):
if not self.active:
raise jwt.exceptions.InvalidKeyError(f'Key {self.id} not active')
res = jwt.encode(payload, key=self.private_key, algorithm=self.algorithm, headers={'kid': self.id})
# pyjwt pre-v2 compat (Buster/Bullseye)
if isinstance(res, bytes):
res = res.decode()
return res
# Hash algorithm for at_hash/c_hash from OpenID Connect Core 1.0 section 3.1.3.6
def oidc_hash(self, value):
# pylint: disable=import-outside-toplevel
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend # Only required for Buster
hash_alg = jwt.get_algorithm_by_name(self.algorithm).hash_alg
digest = hashes.Hash(hash_alg(), backend=default_backend())
digest.update(value)
return base64.urlsafe_b64encode(
digest.finalize()[:hash_alg.digest_size // 2]
).decode('ascii').rstrip('=')
@classmethod
def get_preferred_key(cls, algorithm='RS256'):
return cls.query.filter_by(active=True, algorithm=algorithm).order_by(OAuth2Key.created.desc()).first()
@classmethod
def get_available_algorithms(cls):
return ['RS256']
@classmethod
def decode_jwt(cls, data, algorithms=('RS256',), **kwargs):
headers = jwt.get_unverified_header(data)
if 'kid' not in headers:
raise jwt.exceptions.InvalidKeyError('JWT without kid')
kid = headers['kid']
key = cls.query.get(kid)
if not key:
raise jwt.exceptions.InvalidKeyError(f'Key {kid} not found')
if not key.active:
raise jwt.exceptions.InvalidKeyError(f'Key {kid} not active')
return jwt.decode(data, key=key.public_key, algorithms=algorithms, **kwargs)
@classmethod
def generate_rsa_key(cls, public_exponent=65537, key_size=3072):
# pylint: disable=import-outside-toplevel
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend # Only required for Buster
return cls(algorithm='RS256', private_key=rsa.generate_private_key(public_exponent=public_exponent, key_size=key_size, backend=default_backend()))
......@@ -5,16 +5,24 @@ import math
from flask import request
from flask_babel import gettext as _
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.tasks import cleanup_task
from uffd.database import db
@cleanup_task.delete_by_attribute('expired')
class RatelimitEvent(db.Model):
__tablename__ = 'ratelimit_event'
id = Column(Integer(), primary_key=True, autoincrement=True)
timestamp = Column(DateTime(), default=datetime.datetime.now)
name = Column(String(128))
timestamp = Column(DateTime(), default=datetime.datetime.utcnow, nullable=False)
expires = Column(DateTime(), nullable=False)
name = Column(String(128), nullable=False)
key = Column(String(128))
@hybrid_property
def expired(self):
return self.expires < datetime.datetime.utcnow()
class Ratelimit:
def __init__(self, name, interval, limit):
self.name = name
......@@ -22,25 +30,23 @@ class Ratelimit:
self.limit = limit
self.base = interval**(1/limit)
def cleanup(self):
limit = datetime.datetime.now() - datetime.timedelta(seconds=self.interval)
RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.timestamp <= limit).delete()
db.session.commit()
def log(self, key=None):
db.session.add(RatelimitEvent(name=self.name, key=key))
db.session.add(RatelimitEvent(name=self.name, key=key, expires=datetime.datetime.utcnow() + datetime.timedelta(seconds=self.interval)))
db.session.commit()
def get_delay(self, key=None):
self.cleanup()
events = RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.key == key).all()
events = RatelimitEvent.query\
.filter(db.not_(RatelimitEvent.expired))\
.filter_by(name=self.name, key=key)\
.order_by(RatelimitEvent.timestamp)\
.all()
if not events:
return 0
delay = math.ceil(self.base**len(events))
if delay < 5:
delay = 0
delay = min(delay, 365*24*60*60) # prevent overflow of datetime objetcs
remaining = events[0].timestamp + datetime.timedelta(seconds=delay) - datetime.datetime.now()
delay = min(delay, 365*24*60*60) # prevent overflow of datetime objects
remaining = events[0].timestamp + datetime.timedelta(seconds=delay) - datetime.datetime.utcnow()
return max(0, math.ceil(remaining.total_seconds()))
def get_addrkey(addr=None):
......
from sqlalchemy import Column, String, Integer, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.orm.collections import MappedCollection, collection
from sqlalchemy.ext.declarative import declared_attr
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User, Group
from .user import User
class RoleGroup(db.Model):
__tablename__ = 'role-group'
role_id = Column(Integer(), ForeignKey('role.id'), primary_key=True)
group_dn = Column(String(128), primary_key=True)
requires_mfa = Column(Boolean(), default=False, nullable=False)
__tablename__ = 'role_groups'
role_id = Column(Integer(), ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
role = relationship('Role', back_populates='groups')
group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
group = relationship('Group')
requires_mfa = Column(Boolean(create_constraint=True), default=False, nullable=False)
role = relationship('Role')
group = DBRelationship('group_dn', Group)
class RoleUser(db.Model):
__tablename__ = 'role-user'
__table_args__ = (
db.UniqueConstraint('dn', 'role_id'),
)
id = Column(Integer(), primary_key=True, autoincrement=True)
dn = Column(String(128))
@declared_attr
def role_id(self):
return Column(ForeignKey('role.id'))
# pylint: disable=E1101
role_members = db.Table('role_members',
db.Column('role_id', db.Integer(), db.ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
db.Column('user_id', db.Integer(), db.ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
# pylint: disable=E1101
role_inclusion = db.Table('role-inclusion',
Column('role_id', Integer, ForeignKey('role.id'), primary_key=True),
Column('included_role_id', Integer, ForeignKey('role.id'), primary_key=True)
Column('role_id', Integer, ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
Column('included_role_id', Integer, ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
def flatten_recursive(objs, attr):
......@@ -72,8 +62,9 @@ def update_user_groups(user):
groups_added = groups - current_groups
groups_removed = current_groups - groups
for group in groups_removed:
user.groups.discard(group)
user.groups.update(groups_added)
user.groups.remove(group)
for group in groups_added:
user.groups.append(group)
return groups_added, groups_removed
User.update_groups = update_user_groups
......@@ -90,28 +81,27 @@ class RoleGroupMap(MappedCollection):
class Role(db.Model):
__tablename__ = 'role'
id = Column(Integer(), primary_key=True, autoincrement=True)
name = Column(String(32), unique=True)
description = Column(Text(), default='')
name = Column(String(32), unique=True, nullable=False)
description = Column(Text(), default='', nullable=False)
included_roles = relationship('Role', secondary=role_inclusion,
primaryjoin=id == role_inclusion.c.role_id,
secondaryjoin=id == role_inclusion.c.included_role_id,
backref='including_roles')
including_roles = [] # overwritten by backref
moderator_group_dn = Column(String(128), nullable=True)
moderator_group = DBRelationship('moderator_group_dn', Group)
moderator_group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='SET NULL'), nullable=True)
moderator_group = relationship('Group')
db_members = relationship("RoleUser", backref="role", cascade="all, delete-orphan")
members = DBRelationship('db_members', User, RoleUser, backattr='role', backref='roles')
members = relationship('User', secondary='role_members', back_populates='roles')
groups = relationship('RoleGroup', collection_class=RoleGroupMap, cascade='all, delete-orphan')
groups = relationship('RoleGroup', collection_class=RoleGroupMap, cascade='all, delete-orphan', back_populates='role')
# Roles that are managed externally (e.g. by Ansible) can be locked to
# prevent accidental editing of name, moderator group, included roles
# and groups as well as deletion in the web interface.
locked = Column(Boolean(), default=False, nullable=False)
locked = Column(Boolean(create_constraint=True), default=False, nullable=False)
is_default = Column(Boolean(), default=False, nullable=False)
is_default = Column(Boolean(create_constraint=True), default=False, nullable=False)
@property
def members_effective(self):
......
import datetime
from sqlalchemy import Column, String, DateTime, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.database import db
from uffd.utils import token_urlfriendly
from uffd.tasks import cleanup_task
@cleanup_task.delete_by_attribute('expired')
class PasswordToken(db.Model):
__tablename__ = 'passwordToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User')
@hybrid_property
def expired(self):
if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(days=2)
import enum
from flask import current_app
from flask_babel import get_locale
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum
from sqlalchemy.orm import relationship, validates
from uffd.database import db
from uffd.remailer import remailer
from uffd.tasks import cleanup_task
from .user import User, UserEmail, user_groups
class RemailerMode(enum.Enum):
DISABLED = 0
ENABLED_V1 = 1
ENABLED_V2 = 2
class Service(db.Model):
__tablename__ = 'service'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), unique=True, nullable=False)
# If limit_access is False, all users have access and access_group is
# ignored. This attribute exists for legacy API and OAuth2 clients that
# were migrated from config definitions where a missing "required_group"
# parameter meant no access restrictions. Representing this state by
# setting access_group_id to NULL would lead to a bad/unintuitive ondelete
# behaviour.
limit_access = Column(Boolean(create_constraint=True), default=True, nullable=False)
access_group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='SET NULL'), nullable=True)
access_group = relationship('Group')
oauth2_clients = relationship('OAuth2Client', back_populates='service', cascade='all, delete-orphan')
api_clients = relationship('APIClient', back_populates='service', cascade='all, delete-orphan')
remailer_mode = Column(Enum(RemailerMode, create_constraint=True), default=RemailerMode.DISABLED, nullable=False)
enable_email_preferences = Column(Boolean(create_constraint=True), default=False, nullable=False)
hide_deactivated_users = Column(Boolean(create_constraint=True), default=False, nullable=False)
class ServiceUser(db.Model):
'''Service-related configuration and state for a user
ServiceUser objects are auto-created whenever a new User or Service is
created, so there one for for every (Service, User) pair.
Service- or User-related code should always use ServiceUser in queries
instead of User/Service.'''
__tablename__ = 'service_user'
__table_args__ = (
db.PrimaryKeyConstraint('service_id', 'user_id'),
)
service_id = Column(Integer(), ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
service = relationship('Service', viewonly=True)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', viewonly=True)
@property
def has_access(self):
return not self.service.limit_access or self.service.access_group in self.user.groups
@property
def has_email_preferences(self):
return self.has_access and self.service.enable_email_preferences
remailer_overwrite_mode = Column(Enum(RemailerMode, create_constraint=True), default=None, nullable=True)
@property
def effective_remailer_mode(self):
if not remailer.configured:
return RemailerMode.DISABLED
if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None:
if self.user.loginname not in current_app.config['REMAILER_LIMIT_TO_USERS']:
return RemailerMode.DISABLED
if self.remailer_overwrite_mode is not None:
return self.remailer_overwrite_mode
return self.service.remailer_mode
service_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE', ondelete='SET NULL'))
service_email = relationship('UserEmail')
@validates('service_email')
def validate_service_email(self, key, value): # pylint: disable=unused-argument
if value is not None:
if not value.user:
value.user = self.user
if value.user != self.user:
raise ValueError('UserEmail assigned to ServiceUser.service_email is not associated with user')
if not value.verified:
raise ValueError('UserEmail assigned to serviceUser.service_email is not verified')
return value
# Actual e-mail address that mails from the service are sent to
@property
def real_email(self):
if self.has_email_preferences and self.service_email:
return self.service_email.address
return self.user.primary_email.address
@classmethod
def get_by_remailer_email(cls, address):
if not remailer.configured:
return None
result = remailer.parse_address(address)
if result is None:
return None
# result is (service_id, user_id), i.e. our primary key
return cls.query.get(result)
# E-Mail address as seen by the service
@property
def email(self):
if self.effective_remailer_mode == RemailerMode.ENABLED_V1:
return remailer.build_v1_address(self.service_id, self.user_id)
if self.effective_remailer_mode == RemailerMode.ENABLED_V2:
return remailer.build_v2_address(self.service_id, self.user_id)
return self.real_email
# User.primary_email and ServiceUser.service_email can only be set to
# verified addresses, so this should always return True
@property
def email_verified(self):
if self.effective_remailer_mode != RemailerMode.DISABLED:
return True
if self.has_email_preferences and self.service_email:
return self.service_email.verified
return self.user.primary_email.verified
@classmethod
def filter_query_by_email(cls, query, email):
'''Filter query of ServiceUser by ServiceUser.email'''
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member,invalid-name,singleton-comparison
service_user = cls.get_by_remailer_email(email)
if service_user and service_user.email == email:
return query.filter(cls.user_id == service_user.user_id, cls.service_id == service_user.service_id)
AliasedUser = db.aliased(User)
AliasedPrimaryEmail = db.aliased(UserEmail)
AliasedServiceEmail = db.aliased(UserEmail)
AliasedService = db.aliased(Service)
aliased_user_groups = db.aliased(user_groups)
query = query.join(cls.user.of_type(AliasedUser))
query = query.join(AliasedUser.primary_email.of_type(AliasedPrimaryEmail))
query = query.outerjoin(cls.service_email.of_type(AliasedServiceEmail))
query = query.join(cls.service.of_type(AliasedService))
remailer_enabled = db.case(
whens=[
(db.not_(remailer.configured), False),
(
db.not_(AliasedUser.loginname.in_(current_app.config['REMAILER_LIMIT_TO_USERS']))
if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None else db.and_(False),
False
),
(cls.remailer_overwrite_mode != None, cls.remailer_overwrite_mode != RemailerMode.DISABLED)
],
else_=(AliasedService.remailer_mode != RemailerMode.DISABLED)
)
has_access = db.or_(
db.not_(AliasedService.limit_access),
db.exists().where(db.and_(
aliased_user_groups.c.user_id == AliasedUser.id,
aliased_user_groups.c.group_id == AliasedService.access_group_id,
))
)
has_email_preferences = db.and_(
has_access,
AliasedService.enable_email_preferences,
)
real_email_matches = db.case(
whens=[
# pylint: disable=singleton-comparison
(db.and_(has_email_preferences, cls.service_email != None), AliasedServiceEmail.address == email),
],
else_=(AliasedPrimaryEmail.address == email)
)
return query.filter(db.and_(db.not_(remailer_enabled), real_email_matches))
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def create_service_users(session, flush_context): # pylint: disable=unused-argument
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
new_user_ids = [user.id for user in session.new if isinstance(user, User)]
new_service_ids = [service.id for service in session.new if isinstance(service, Service)]
if not new_user_ids and not new_service_ids:
return
db.session.execute(db.insert(ServiceUser).from_select(
['service_id', 'user_id'],
db.select([Service.id, User.id]).select_from(db.join(Service, User, db.true())).where(db.or_(
Service.id.in_(new_service_ids),
User.id.in_(new_user_ids),
))
))
# On databases with write concurrency (i.e. everything but SQLite), the
# after_flush handler above is racy. So in rare cases ServiceUser objects
# might be missing.
@cleanup_task.handler
def create_missing_service_users():
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
db.session.execute(db.insert(ServiceUser).from_select(
['service_id', 'user_id'],
db.select([Service.id, User.id]).select_from(db.join(Service, User, db.true())).where(db.not_(
ServiceUser.query.filter(
ServiceUser.service_id == Service.id,
ServiceUser.user_id == User.id
).exists()
))
))
# The user-visible services show on the service overview page are read from
# the SERVICES config key. It is planned to gradually extend the Service model
# in order to finally replace the config-defined services.
def get_language_specific(data, field_name, default =''):
return data.get(field_name + '_' + get_locale().language, data.get(field_name, default))
# pylint: disable=too-many-branches
def get_services(user=None):
if not user and not current_app.config['SERVICES_PUBLIC']:
return []
services = []
for service_data in current_app.config['SERVICES']:
service_title = get_language_specific(service_data, 'title')
if not service_title:
continue
service_description = get_language_specific(service_data, 'description')
service = {
'title': service_title,
'subtitle': service_data.get('subtitle', ''),
'description': service_description,
'url': service_data.get('url', ''),
'logo_url': service_data.get('logo_url', ''),
'has_access': True,
'permission': '',
'groups': [],
'infos': [],
'links': [],
}
if service_data.get('required_group'):
if not user or not user.has_permission(service_data['required_group']):
service['has_access'] = False
for permission_data in service_data.get('permission_levels', []):
if permission_data.get('required_group'):
if not user or not user.has_permission(permission_data['required_group']):
continue
if not permission_data.get('name'):
continue
service['has_access'] = True
service['permission'] = permission_data['name']
if service_data.get('confidential', False) and not service['has_access']:
continue
for group_data in service_data.get('groups', []):
if group_data.get('required_group'):
if not user or not user.has_permission(group_data['required_group']):
continue
if not group_data.get('name'):
continue
service['groups'].append(group_data)
for info_data in service_data.get('infos', []):
if info_data.get('required_group'):
if not user or not user.has_permission(info_data['required_group']):
continue
info_title = get_language_specific(info_data, 'title')
info_html = get_language_specific(info_data, 'html')
if not info_title or not info_html:
continue
info_button_text = get_language_specific(info_data, 'button_text', info_title)
info = {
'title': info_title,
'button_text': info_button_text,
'html': info_html,
'id': '%d-%d'%(len(services), len(service['infos'])),
}
service['infos'].append(info)
for link_data in service_data.get('links', []):
if link_data.get('required_group'):
if not user or not user.has_permission(link_data['required_group']):
continue
if not link_data.get('url') or not link_data.get('title'):
continue
service['links'].append(link_data)
services.append(service)
return services
import datetime
import secrets
import math
import enum
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum
from flask import current_app
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum, Text, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from flask_babel import gettext as _
try:
from ua_parser import user_agent_parser
USER_AGENT_PARSER_SUPPORTED = True
except ImportError:
USER_AGENT_PARSER_SUPPORTED = False
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User
from uffd.utils import token_typeable
from uffd.tasks import cleanup_task
from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash
@cleanup_task.delete_by_attribute('expired')
class Session(db.Model):
__tablename__ = 'session'
id = Column(Integer(), primary_key=True, autoincrement=True)
_secret = Column('secret', Text)
secret = PasswordHashAttribute('_secret', HighEntropyPasswordHash)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', back_populates='sessions')
oauth2_grants = relationship('OAuth2Grant', back_populates='session', cascade='all, delete-orphan')
oauth2_tokens = relationship('OAuth2Token', back_populates='session', cascade='all, delete-orphan')
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
last_used = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
user_agent = Column(Text, nullable=False, default='')
ip_address = Column(Text)
mfa_done = Column(Boolean(create_constraint=True), default=False, nullable=False)
def token_typeable(nbytes=None):
'''Return random text token that is easy to type (on mobile)'''
alphabet = '123456789abcdefghkmnopqrstuvwx' # No '0ijlyz'
if nbytes is None:
nbytes = secrets.DEFAULT_ENTROPY
nbytes_per_char = math.log(len(alphabet), 256)
nchars = math.ceil(nbytes / nbytes_per_char)
return ''.join([secrets.choice(alphabet) for _ in range(nchars)])
@hybrid_property
def expired(self):
if self.created is None or self.last_used is None:
return False
if self.created < datetime.datetime.utcnow() - datetime.timedelta(seconds=current_app.config['SESSION_LIFETIME_SECONDS']):
return True
if self.last_used < datetime.datetime.utcnow() - current_app.permanent_session_lifetime:
return True
return False
@expired.expression
def expired(cls): # pylint: disable=no-self-argument
return db.or_(
cls.created < datetime.datetime.utcnow() - datetime.timedelta(seconds=current_app.config['SESSION_LIFETIME_SECONDS']),
cls.last_used < datetime.datetime.utcnow() - current_app.permanent_session_lifetime,
)
@property
def user_agent_browser(self):
# pylint: disable=too-many-return-statements
if USER_AGENT_PARSER_SUPPORTED and not getattr(self, 'DISABLE_USER_AGENT_PARSER', False):
family = user_agent_parser.ParseUserAgent(self.user_agent)['family']
return family if family != 'Other' else _('Unknown')
if ' OPR/' in self.user_agent:
return 'Opera'
if ' Edg/' in self.user_agent:
return 'Microsoft Edge'
if ' Safari/' in self.user_agent and ' Chrome/' not in self.user_agent:
return 'Safari'
if ' Chrome/' in self.user_agent:
return 'Chrome'
if ' Firefox/' in self.user_agent:
return 'Firefox'
return _('Unknown')
@property
def user_agent_platform(self):
if USER_AGENT_PARSER_SUPPORTED and not getattr(self, 'DISABLE_USER_AGENT_PARSER', False):
family = user_agent_parser.ParseOS(self.user_agent)['family']
return family if family != 'Other' else _('Unknown')
sysinfo = ([''] + self.user_agent.split('(', 1))[-1].split(')', 0)[0]
platforms = [
'Android', 'Linux', 'OpenBSD', 'FreeBSD', 'NetBSD', 'Windows', 'iPhone',
'iPad', 'Macintosh',
]
for platform in platforms:
if platform in sysinfo:
return platform
return _('Unknown')
# Device login provides a convenient and secure way to log into SSO-enabled
# services on a secondary device without entering the user password or
......@@ -66,6 +137,7 @@ def token_typeable(nbytes=None):
class DeviceLoginType(enum.Enum):
OAUTH2 = 0
@cleanup_task.delete_by_attribute('expired')
class DeviceLoginInitiation(db.Model):
'''Abstract initiation code class
......@@ -79,17 +151,16 @@ class DeviceLoginInitiation(db.Model):
existing and possibly attacker-controlled code).
An initiation code is securly bound to the session that it was created
with by storing both id and secret in the encrypted and authenticated
session cookie.'''
with by storing both id and secret in the authenticated session cookie.'''
__tablename__ = 'device_login_initiation'
id = Column(Integer(), primary_key=True, autoincrement=True)
type = Column(Enum(DeviceLoginType), nullable=False)
type = Column(Enum(DeviceLoginType, create_constraint=True), nullable=False)
code0 = Column(String(32), unique=True, nullable=False, default=lambda: token_typeable(3))
code1 = Column(String(32), unique=True, nullable=False, default=lambda: token_typeable(3))
secret = Column(String(128), nullable=False, default=lambda: secrets.token_hex(64))
confirmations = relationship('DeviceLoginConfirmation', back_populates='initiation', cascade='all, delete-orphan')
created = Column(DateTime, default=datetime.datetime.now, nullable=False)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
__mapper_args__ = {
'polymorphic_on': type,
......@@ -103,7 +174,9 @@ class DeviceLoginInitiation(db.Model):
@hybrid_property
def expired(self):
return self.created < datetime.datetime.now() - datetime.timedelta(minutes=30)
if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
@property
def description(self):
......@@ -114,7 +187,7 @@ class DeviceLoginConfirmation(db.Model):
A confirmation code is generated and displayed when an authenticated user
enters an initiation code and confirms the device login attempt. Every
instance is bound to both an initiation code and a user.
instance is bound to both an initiation code and a login session.
The code attribute is formed out of two indepentently unique parts
to ensure that at any time all existing codes differ in at least two
......@@ -123,10 +196,12 @@ class DeviceLoginConfirmation(db.Model):
__tablename__ = 'device_login_confirmation'
id = Column(Integer(), primary_key=True, autoincrement=True)
initiation_id = Column(Integer(), ForeignKey('device_login_initiation.id'), nullable=False)
initiation_id = Column(Integer(), ForeignKey('device_login_initiation.id',
name='fk_device_login_confirmation_initiation_id_',
onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
initiation = relationship('DeviceLoginInitiation', back_populates='confirmations')
user_dn = Column(String(128), nullable=False, unique=True)
user = DBRelationship('user_dn', User)
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False, unique=True)
session = relationship('Session')
code0 = Column(String(32), nullable=False, default=lambda: token_typeable(1))
code1 = Column(String(32), nullable=False, default=lambda: token_typeable(1))
......
import secrets
import datetime
from crypt import crypt
from sqlalchemy import Column, String, Text, DateTime
from flask_babel import gettext as _
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.tasks import cleanup_task
from uffd.utils import token_urlfriendly
from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash
from uffd.database import db
from uffd.ldap import ldap
from uffd.user.models import User
from .user import User
@cleanup_task.delete_by_attribute('expired_and_not_completed')
class Signup(db.Model):
'''Model that represents a self-signup request
......@@ -23,17 +27,19 @@ class Signup(db.Model):
address does not allow a third party to complete the signup procedure and
set a new password with the (also mail-based) password reset functionality.
As long as they are not completed, signup requests have no effect each other
or different parts of the application.'''
As long as they are not completed, signup requests have no effect on each
other or different parts of the application.'''
__tablename__ = 'signup'
token = Column(String(128), primary_key=True, default=lambda: secrets.token_urlsafe(48))
created = Column(DateTime, default=datetime.datetime.now, nullable=False)
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
loginname = Column(Text)
displayname = Column(Text)
mail = Column(Text)
pwhash = Column(Text)
user_dn = Column(String(128)) # Set after successful confirmation
user = DBRelationship('user_dn', User, backref='signups')
_password = Column('pwhash', Text)
password = PasswordHashAttribute('_password', LowEntropyPasswordHash)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=True, unique=True)
user = relationship('User', backref=backref('signups', cascade='all, delete-orphan'))
type = Column(String(50))
__mapper_args__ = {
......@@ -41,23 +47,26 @@ class Signup(db.Model):
'polymorphic_on': type
}
# Write-only property
def password(self, value):
def set_password(self, value):
if not User().set_password(value):
return
self.pwhash = crypt(value)
password = property(fset=password)
return False
self.password = value
return True
def check_password(self, value):
return self.pwhash is not None and crypt(value, self.pwhash) == self.pwhash
@property
@hybrid_property
def expired(self):
return self.created is not None and datetime.datetime.now() >= self.created + datetime.timedelta(hours=48)
if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(hours=48)
@property
@hybrid_property
def completed(self):
return self.user_dn is not None
# pylint: disable=singleton-comparison
return self.user_id != None
@hybrid_property
def expired_and_not_completed(self):
return db.and_(self.expired, db.not_(self.completed))
def validate(self): # pylint: disable=too-many-return-statements
'''Return whether the signup request is valid and Signup.finish is likely to succeed
......@@ -66,18 +75,18 @@ class Signup(db.Model):
is False and `errmsg` contains a string describing why. Otherwise
`valid` is True.'''
if self.completed or self.expired:
return False, 'Invalid signup request'
return False, _('Invalid signup request')
if not User().set_loginname(self.loginname):
return False, 'Login name is invalid'
return False, _('Login name is invalid')
if not User().set_displayname(self.displayname):
return False, 'Display name is invalid'
if not User().set_mail(self.mail):
return False, 'Mail address is invalid'
if self.pwhash is None:
return False, 'Invalid password'
return False, _('Display name is invalid')
if not User().set_primary_email_address(self.mail):
return False, _('E-Mail address is invalid')
if not self.password:
return False, _('Invalid password')
if User.query.filter_by(loginname=self.loginname).all():
return False, 'A user with this login name already exists'
return True, 'Valid'
return False, _('A user with this login name already exists')
return True, _('Valid')
def finish(self, password):
'''Complete the signup procedure and return the new user
......@@ -91,17 +100,23 @@ class Signup(db.Model):
`errmsg` contains a string describing why. Otherwise `user` is a
User object.'''
if self.completed or self.expired:
return None, 'Invalid signup request'
if not self.check_password(password):
return None, 'Wrong password'
return None, _('Invalid signup request')
if not self.password.verify(password):
return None, _('Wrong password')
if User.query.filter_by(loginname=self.loginname).all():
return None, 'A user with this login name already exists'
user = User(loginname=self.loginname, displayname=self.displayname, mail=self.mail, password=password)
ldap.session.add(user)
user.update_groups()
return None, _('A user with this login name already exists')
# Flush to make sure the flush below does not catch unrelated errors
db.session.flush()
user = User(loginname=self.loginname, displayname=self.displayname, primary_email_address=self.mail, password=self.password)
db.session.add(user)
try:
db.session.flush()
except IntegrityError:
return None, _('Login name or e-mail address is already in use')
user.update_groups() # pylint: disable=no-member
self.user = user
self.loginname = None
self.displayname = None
self.mail = None
self.pwhash = None
return user, 'Success'
self.password = None
return user, _('Success')
import string
import re
import datetime
import unicodedata
from flask import current_app, escape
from flask_babel import lazy_gettext
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Text, DateTime
from sqlalchemy.orm import relationship, validates
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.database import db
from uffd.remailer import remailer
from uffd.utils import token_urlfriendly
from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash, HighEntropyPasswordHash
from .misc import FeatureFlag, Lock
class IDRangeExhaustedError(Exception):
pass
class IDAlreadyAllocatedError(ValueError):
pass
# Helper class for UID/GID allocation that prevents reuse even if
# users/groups are deleted.
#
# To keep track of formerly used UIDs/GIDs, they are always also added to
# uid/gid allocation tables. Rows in these tables are never deleted.
# User/group tables have foreign key constraints to ensure that there can
# only ever be three cases for a given ID:
#
# 1. The ID was never used (does not exist in either user/group or allocation
# table)
# 2. The ID was used, but the user/group was deleted (it does not exist in
# user/group table, but it exists in the allocation table)
# 3. The ID is in use (it exists in both the user/group and the allocation
# table)
#
# For auto-allocation, there are a few edge cases to consider:
#
# 1. GIDs can be chosen freely in the web interface, e.g. one could easily
# create a group with the last GID in range.
# 2. For UIDs there are two ranges (for regular users and for service users).
# The ranges may either be the same or they may be different but
# non-overlapping.
# 3. ID ranges can be changed (e.g. extended to either side if the old range
# is exhausted). Existing IDs should not change.
#
# The approach we use here is to always auto-allocate the first unused id
# in range. This approach handles the three edge cases well and even behaves
# sanely in unsupported configurations like different but overlapping UID
# ranges.
class IDAllocator:
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
def __init__(self, name):
self.name = name
self.lock = Lock(f'{name}_allocation')
self.allocation_table = db.Table(f'{name}_allocation', db.Column('id', db.Integer(), primary_key=True))
def allocate(self, id):
self.lock.acquire()
result = db.session.execute(
db.select([self.allocation_table.c.id])
.where(self.allocation_table.c.id == id)
).scalar()
if result is not None:
raise IDAlreadyAllocatedError(f'Cannot allocate {self.name}: {id} is in use or was used in the past')
db.session.execute(db.insert(self.allocation_table).values(id=id))
def auto(self, min_id, max_id):
'''Auto-allocate and return an unused id in range'''
self.lock.acquire()
# We cannot easily iterate through a large range of numbers with generic
# SQL statements looking for unused ids. So to find the first unused id in
# range, we look for the first used id in range that is followed by an
# unused id. This does not work if there are no used ids in range (returns
# NULL) or if min_id is unused (returns higher id while it should return
# min_id). To fix this we also check if min_id is used or not.
tmp = db.aliased(self.allocation_table)
first_unused_id = db.session.execute(
db.select([db.func.min(self.allocation_table.c.id + 1)])
.where(self.allocation_table.c.id >= min_id)
.where(db.not_(db.exists().where(tmp.c.id == self.allocation_table.c.id + 1)))
).scalar()
min_id_used = db.session.execute(
db.select([db.exists()
.where(self.allocation_table.c.id == min_id)])
).scalar()
if not min_id_used:
first_unused_id = min_id
if first_unused_id > max_id:
raise IDRangeExhaustedError(f'Cannot auto-allocate {self.name}: Range is exhausted')
db.session.execute(db.insert(self.allocation_table).values(id=first_unused_id))
return first_unused_id
def user_unix_uid_default(context):
if context.get_current_parameters()['is_service_user']:
min_uid = current_app.config['USER_SERVICE_MIN_UID']
max_uid = current_app.config['USER_SERVICE_MAX_UID']
else:
min_uid = current_app.config['USER_MIN_UID']
max_uid = current_app.config['USER_MAX_UID']
return User.unix_uid_allocator.auto(min_uid, max_uid)
class User(db.Model):
# Allows 8 to 256 ASCII letters (lower and upper case), digits, spaces and
# symbols/punctuation characters. It disallows control characters and
# non-ASCII characters to prevent setting passwords considered invalid by
# SASLprep.
#
# This REGEX ist used both in Python and JS.
PASSWORD_REGEX = '[ -~]*'
PASSWORD_MINLEN = 8
PASSWORD_MAXLEN = 256
PASSWORD_DESCRIPTION = lazy_gettext('At least %(minlen)d and at most %(maxlen)d characters. ' + \
'Only letters, digits, spaces and some symbols (<code>%(symbols)s</code>) allowed. ' + \
'Please use a password manager.',
minlen=PASSWORD_MINLEN, maxlen=PASSWORD_MAXLEN, symbols=escape('!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'))
__tablename__ = 'user'
id = Column(Integer(), primary_key=True, autoincrement=True)
unix_uid_allocator = IDAllocator('uid')
unix_uid = Column(Integer(), ForeignKey('uid_allocation.id'), unique=True, nullable=False, default=user_unix_uid_default)
@validates('unix_uid')
def validate_unix_uid(self, key, value): # pylint: disable=unused-argument
if self.unix_uid != value and value is not None:
self.unix_uid_allocator.allocate(value)
return value
loginname = Column(String(32), unique=True, nullable=False)
displayname = Column(String(128), nullable=False)
all_emails = relationship(
'UserEmail',
foreign_keys='UserEmail.user_id',
cascade='all, delete-orphan',
back_populates='user',
post_update=True,
)
verified_emails = relationship(
'UserEmail',
foreign_keys='UserEmail.user_id',
viewonly=True,
primaryjoin='and_(User.id == UserEmail.user_id, UserEmail.verified)',
)
primary_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE'), nullable=False)
primary_email = relationship('UserEmail', foreign_keys='User.primary_email_id')
# recovery_email_id == NULL -> use primary email
recovery_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE', ondelete='SET NULL'))
recovery_email = relationship('UserEmail', foreign_keys='User.recovery_email_id')
@validates('primary_email', 'recovery_email')
def validate_email(self, key, value):
if value is not None:
if not value.user:
value.user = self
if value.user != self:
raise ValueError(f'UserEmail assigned to User.{key} is not associated with user')
if not value.verified:
raise ValueError(f'UserEmail assigned to User.{key} is not verified')
return value
_password = Column('pwhash', Text(), nullable=True)
password = PasswordHashAttribute('_password', LowEntropyPasswordHash)
is_service_user = Column(Boolean(create_constraint=True), default=False, nullable=False)
is_deactivated = Column(Boolean(create_constraint=True), default=False, nullable=False)
groups = relationship('Group', secondary='user_groups', back_populates='members')
roles = relationship('Role', secondary='role_members', back_populates='members')
service_users = relationship('ServiceUser', viewonly=True)
sessions = relationship('Session', back_populates='user', cascade='all, delete-orphan')
def __init__(self, primary_email_address=None, **kwargs):
super().__init__(**kwargs)
if primary_email_address is not None:
self.primary_email = UserEmail(address=primary_email_address, verified=True)
@property
def unix_gid(self):
return current_app.config['USER_GID']
def is_in_group(self, name):
if not name:
return True
for group in self.groups:
if group.name == name:
return True
return False
def has_permission(self, required_group=None):
if not required_group:
return True
group_names = {group.name for group in self.groups}
group_sets = required_group
if isinstance(group_sets, str):
group_sets = [group_sets]
for group_set in group_sets:
if isinstance(group_set, str):
group_set = [group_set]
if set(group_set) - group_names == set():
return True
return False
def set_loginname(self, value, ignore_blocklist=False):
if len(value) > 32 or len(value) < 1:
return False
for char in value:
if not char in string.ascii_lowercase + string.digits + '_-':
return False
if not ignore_blocklist:
for expr in current_app.config['LOGINNAME_BLOCKLIST']:
if re.match(expr, value):
return False
self.loginname = value
return True
def set_displayname(self, value):
if len(value) > 128 or len(value) < 1:
return False
self.displayname = value
return True
def set_password(self, value):
if len(value) < self.PASSWORD_MINLEN or len(value) > self.PASSWORD_MAXLEN or not re.fullmatch(self.PASSWORD_REGEX, value):
return False
self.password = value
return True
def set_primary_email_address(self, address):
# UserEmail.query.filter_by(user=self, address=address).first() would cause
# a flush, so we do this in python. A flush would cause an IntegrityError if
# this method is used a new User object, since primary_email_id is not
# nullable.
email = ([item for item in self.all_emails if item.address == address] or [None])[0]
if not email:
email = UserEmail()
if not email.set_address(address):
return False
email.verified = True
self.primary_email = email
return True
# Somehow pylint non-deterministically fails to detect that .update_groups is set in role.models
def update_groups(self):
pass
class UserEmail(db.Model):
__tablename__ = 'user_email'
id = Column(Integer(), primary_key=True, autoincrement=True)
# We have a cyclic dependency between User.primary_email and UserEmail.user.
# To solve this, we make UserEmail.user nullable, add validators, and set
# post_update=True here and for the backref.
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE', use_alter=True))
user = relationship('User', foreign_keys='UserEmail.user_id', back_populates='all_emails', post_update=True)
@validates('user')
def validate_user(self, key, value): # pylint: disable=unused-argument
if self.user is not None and self.user != value:
raise ValueError('UserEmail.user cannot be changed once set')
return value
@classmethod
def normalize_address(cls, value):
return unicodedata.normalize('NFKC', value).lower().strip()
address = Column(String(128), nullable=False)
address_normalized = Column(String(128), nullable=False)
@validates('address')
def validate_address(self, key, value): # pylint: disable=unused-argument
if self.address is not None and self.address != value:
raise ValueError('UserEmail.address cannot be changed once set')
self.address_normalized = self.normalize_address(value)
return value
# True or None/NULL (not False, see constraints below)
_verified = Column('verified', Boolean(create_constraint=True), nullable=True)
@hybrid_property
def verified(self):
# pylint: disable=singleton-comparison
return self._verified != None
@verified.setter
def verified(self, value):
if self._verified and not value:
raise ValueError('UserEmail cannot be unverified once verified')
self._verified = True if value else None
verification_legacy_id = Column(Integer()) # id of old MailToken
_verification_secret = Column('verification_secret', Text())
verification_secret = PasswordHashAttribute('_verification_secret', HighEntropyPasswordHash)
verification_expires = Column(DateTime)
# Until uffd v3, we make the stricter unique constraints optional, by having
# enable_strict_constraints act as a switch to enable/disable the constraints
# on a per-row basis.
# True or None/NULL if disabled (not False, see constraints below)
enable_strict_constraints = Column(
Boolean(create_constraint=True),
nullable=True,
default=db.select([db.case([(FeatureFlag.unique_email_addresses.expr, True)], else_=None)])
)
# The unique constraints rely on the common interpretation of SQL92, that if
# any column in a unique constraint is NULL, the unique constraint essentially
# does not apply to the row. This is how SQLite, MySQL/MariaDB, PostgreSQL and
# other common databases behave. A few others like Microsoft SQL Server do not
# follow this, but we don't support them anyway.
__table_args__ = (
# A user cannot have the same address more than once, regardless of verification status
db.UniqueConstraint('user_id', 'address', name='uq_user_email_user_id_address'), # Legacy, to be removed in v3
# Same unique constraint as uq_user_email_user_id_address, but with
# address_normalized instead of address. Only active if
# enable_strict_constraints is not NULL.
db.UniqueConstraint('user_id', 'address_normalized', 'enable_strict_constraints',
name='uq_user_email_user_id_address_normalized'),
# The same verified address can only exist once. Only active if
# enable_strict_constraints is not NULL. Unverified addresses are ignored,
# since verified is NULL in that case.
db.UniqueConstraint('address_normalized', 'verified', 'enable_strict_constraints',
name='uq_user_email_address_normalized_verified'),
)
def set_address(self, value):
if len(value) < 3 or '@' not in value:
return False
domain = value.rsplit('@', 1)[-1]
if remailer.is_remailer_domain(domain):
return False
self.address = value
return True
def start_verification(self):
if self.verified:
raise Exception('UserEmail.start_verification must not be called if address is already verified')
self.verification_legacy_id = None
secret = token_urlfriendly()
self.verification_secret = secret
self.verification_expires = datetime.datetime.utcnow() + datetime.timedelta(days=2)
return secret
@hybrid_property
def verification_expired(self):
if self.verification_expires is None:
return True
return self.verification_expires < datetime.datetime.utcnow()
def finish_verification(self, secret):
# pylint: disable=using-constant-test,no-member
if self.verification_expired:
return False
if not self.verification_secret.verify(secret):
return False
self.verification_legacy_id = None
self.verification_secret = None
self.verification_expires = None
self.verified = True
return True
@FeatureFlag.unique_email_addresses.enable_hook
def enable_unique_email_addresses():
UserEmail.query.update({UserEmail.enable_strict_constraints: True})
@FeatureFlag.unique_email_addresses.disable_hook
def disable_unique_email_addresses():
UserEmail.query.update({UserEmail.enable_strict_constraints: None})
# pylint: disable=E1101
user_groups = db.Table('user_groups',
Column('user_id', Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
Column('group_id', Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
def group_unix_gid_default():
return Group.unix_gid_allocator.auto(current_app.config['GROUP_MIN_GID'], current_app.config['GROUP_MAX_GID'])
class Group(db.Model):
__tablename__ = 'group'
id = Column(Integer(), primary_key=True, autoincrement=True)
unix_gid_allocator = IDAllocator('gid')
unix_gid = Column(Integer(), ForeignKey('gid_allocation.id'), unique=True, nullable=False, default=group_unix_gid_default)
@validates('unix_gid')
def validate_unix_gid(self, key, value): # pylint: disable=unused-argument
if self.unix_gid != value and value is not None:
self.unix_gid_allocator.allocate(value)
return value
name = Column(String(32), unique=True, nullable=False)
description = Column(String(128), nullable=False, default='')
members = relationship('User', secondary='user_groups', back_populates='groups')
def set_name(self, value):
if len(value) > 32 or len(value) < 1:
return False
for char in value:
if not char in string.ascii_lowercase + string.digits + '_-':
return False
self.name = value
return True
def setup_navbar(app):
def setup_navbar(app, positions):
app.navbarPositions = positions
app.navbarList = []
app.jinja_env.globals['getnavbar'] = lambda: [n for n in app.navbarList if n['visible']()]
......@@ -7,9 +8,11 @@ def setup_navbar(app):
# or 'fa'
# ( see: http://fontawesome.io/icons/ )
# visible is a function that returns "True" if this icon should be visible in the calling context
# pylint: disable=too-many-arguments
def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None):
def wrapper(func):
def deferred_call(state):
assert blueprint
urlendpoint = endpoint
if not endpoint:
# pylint: disable=protected-access
......@@ -26,20 +29,14 @@ def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, bl
item['name'] = name
item['blueprint'] = blueprint
item['visible'] = visible or (lambda: True)
item['position'] = 99
if blueprint.name in state.app.navbarPositions:
item['position'] = state.app.navbarPositions.index(blueprint.name)
else:
item['visible'] = lambda: False
state.app.navbarList.append(item)
if blueprint:
blueprint.record_once(deferred_call)
else:
class StateMock:
def __init__(self, app):
self.app = app
# pylint: disable=C0415
from flask import current_app
# pylint: enable=C0415
deferred_call(StateMock(current_app))
state.app.navbarList.sort(key=lambda item: item['position'])
blueprint.record_once(deferred_call)
return func
return wrapper
from .views import bp as _bp
bp = [_bp]
from flask import current_app
from sqlalchemy import Column, Integer, String, DateTime, Text
from uffd.ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User
from uffd.session.models import DeviceLoginInitiation, DeviceLoginType
class OAuth2Client:
def __init__(self, client_id, client_secret, redirect_uris, required_group=None, logout_urls=None):
self.client_id = client_id
self.client_secret = client_secret
# We only support the Authorization Code Flow for confidential (server-side) clients
self.client_type = 'confidential'
self.redirect_uris = redirect_uris
self.default_scopes = ['profile']
self.required_group = required_group
self.logout_urls = []
for url in (logout_urls or []):
if isinstance(url, str):
self.logout_urls.append(['GET', url])
else:
self.logout_urls.append(url)
@classmethod
def from_id(cls, client_id):
return OAuth2Client(client_id, **current_app.config['OAUTH2_CLIENTS'][client_id])
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
def access_allowed(self, user):
return user.has_permission(self.required_group)
class OAuth2Grant(db.Model):
__tablename__ = 'oauth2grant'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_grants')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
code = Column(String(255), index=True, nullable=False)
redirect_uri = Column(String(255))
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
class OAuth2Token(db.Model):
__tablename__ = 'oauth2token'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_tokens')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
# currently only bearer is supported
token_type = Column(String(40))
access_token = Column(String(255), unique=True)
refresh_token = Column(String(255), unique=True)
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
class OAuth2DeviceLoginInitiation(DeviceLoginInitiation):
__mapper_args__ = {
'polymorphic_identity': DeviceLoginType.OAUTH2
}
oauth2_client_id = Column(String(40))
@property
def oauth2_client(self):
return OAuth2Client.from_id(self.oauth2_client_id)
@property
def description(self):
return self.oauth2_client.client_id
import datetime
import functools
import urllib.parse
from flask import Blueprint, request, jsonify, render_template, session, redirect, url_for, flash
from flask_oauthlib.provider import OAuth2Provider
from flask_babel import gettext as _
from sqlalchemy.exc import IntegrityError
from uffd.ratelimit import host_ratelimit, format_delay
from uffd.database import db
from uffd.secure_redirect import secure_local_redirect
from uffd.session.models import DeviceLoginConfirmation
from .models import OAuth2Client, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation
oauth = OAuth2Provider()
@oauth.clientgetter
def load_client(client_id):
return OAuth2Client.from_id(client_id)
@oauth.grantgetter
def load_grant(client_id, code):
return OAuth2Grant.query.filter_by(client_id=client_id, code=code).first()
@oauth.grantsetter
def save_grant(client_id, code, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=100)
grant = OAuth2Grant(user_dn=request.oauth2_user.dn, client_id=client_id,
code=code['code'], redirect_uri=oauthreq.redirect_uri, expires=expires, _scopes=' '.join(oauthreq.scopes))
db.session.add(grant)
db.session.commit()
return grant
@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
if access_token:
return OAuth2Token.query.filter_by(access_token=access_token).first()
if refresh_token:
return OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
return None
@oauth.tokensetter
def save_token(token_data, oauthreq, *args, **kwargs): # pylint: disable=unused-argument
OAuth2Token.query.filter_by(client_id=oauthreq.client.client_id, user_dn=oauthreq.user.dn).delete()
expires_in = token_data.get('expires_in')
expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in)
tok = OAuth2Token(
user_dn=oauthreq.user.dn,
client_id=oauthreq.client.client_id,
token_type=token_data['token_type'],
access_token=token_data['access_token'],
refresh_token=token_data['refresh_token'],
expires=expires,
_scopes=' '.join(oauthreq.scopes)
)
db.session.add(tok)
db.session.commit()
return tok
bp = Blueprint('oauth2', __name__, url_prefix='/oauth2/', template_folder='templates')
@bp.record
def init(state):
state.app.config.setdefault('OAUTH2_PROVIDER_ERROR_ENDPOINT', 'oauth2.error')
oauth.init_app(state.app)
# flask-oauthlib has the bug to require the scope parameter for authorize
# requests, which is actually optional according to the OAuth2.0 spec.
# We don't really use scopes and this requirement just complicates the
# configuration of clients.
# See also: https://github.com/lepture/flask-oauthlib/pull/320
def inject_scope(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
args = request.args.to_dict()
if not args.get('scope'):
args['scope'] = 'profile'
return redirect(request.base_url+'?'+urllib.parse.urlencode(args))
return func(*args, **kwargs)
return decorator
@bp.route('/authorize', methods=['GET', 'POST'])
@inject_scope
@oauth.authorize_handler
def authorize(*args, **kwargs): # pylint: disable=unused-argument
client = kwargs['request'].client
request.oauth2_user = None
if request.user:
request.oauth2_user = request.user
elif 'devicelogin_started' in session:
del session['devicelogin_started']
host_delay = host_ratelimit.get_delay()
if host_delay:
flash(_('We received too many requests from your ip address/network! Please wait at least %(delay)s.', delay=format_delay(host_delay)))
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
host_ratelimit.log()
initiation = OAuth2DeviceLoginInitiation(oauth2_client_id=client.client_id)
db.session.add(initiation)
try:
db.session.commit()
except IntegrityError:
flash(_('Device login is currently not available. Try again later!'))
return redirect(url_for('session.login', ref=request.values['ref'], devicelogin=True))
session['devicelogin_id'] = initiation.id
session['devicelogin_secret'] = initiation.secret
return redirect(url_for('session.devicelogin', ref=request.full_path))
elif 'devicelogin_id' in session and 'devicelogin_secret' in session and 'devicelogin_confirmation' in session:
initiation = OAuth2DeviceLoginInitiation.query.filter_by(id=session['devicelogin_id'], secret=session['devicelogin_secret'],
oauth2_client_id=client.client_id).one_or_none()
confirmation = DeviceLoginConfirmation.query.get(session['devicelogin_confirmation'])
del session['devicelogin_id']
del session['devicelogin_secret']
del session['devicelogin_confirmation']
if not initiation or initiation.expired or not confirmation:
flash('Device login failed')
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
request.oauth2_user = confirmation.user
db.session.delete(initiation)
db.session.commit()
else:
return redirect(url_for('session.login', ref=request.full_path, devicelogin=True))
# Here we would normally ask the user, if he wants to give the requesting
# service access to his data. Since we only have trusted services (the
# clients defined in the server config), we don't ask for consent.
session['oauth2-clients'] = session.get('oauth2-clients', [])
if client.client_id not in session['oauth2-clients']:
session['oauth2-clients'].append(client.client_id)
return client.access_allowed(request.oauth2_user)
@bp.route('/token', methods=['GET', 'POST'])
@oauth.token_handler
def token():
return None
@bp.route('/userinfo')
@oauth.require_oauth('profile')
def userinfo():
user = request.oauth.user
# We once exposed the entryUUID here as "ldap_uuid" until realising that it
# can (and does!) change randomly and is therefore entirely useless as an
# indentifier.
return jsonify(
id=user.uid,
name=user.displayname,
nickname=user.loginname,
email=user.mail,
ldap_dn=user.dn,
groups=[group.name for group in user.groups]
)
@bp.route('/error')
def error():
args = dict(request.values)
err = args.pop('error', 'unknown')
error_description = args.pop('error_description', '')
return render_template('oauth2/error.html', error=err, error_description=error_description, args=args)
@bp.app_url_defaults
def inject_logout_params(endpoint, values):
if endpoint != 'oauth2.logout' or not session.get('oauth2-clients'):
return
values['client_ids'] = ','.join(session['oauth2-clients'])
@bp.route('/logout')
def logout():
if not request.values.get('client_ids'):
return secure_local_redirect(request.values.get('ref', '/'))
client_ids = request.values['client_ids'].split(',')
clients = [OAuth2Client.from_id(client_id) for client_id in client_ids]
return render_template('oauth2/logout.html', clients=clients)
import secrets
import hashlib
import base64
from crypt import crypt # pylint: disable=deprecated-module
import argon2
def build_value(method_name, data):
return '{' + method_name + '}' + data
def parse_value(value):
if value is not None and value.startswith('{') and '}' in value:
method_name, data = value[1:].split('}', 1)
return method_name.lower(), data
raise ValueError('Invalid password hash')
class PasswordHashRegistry:
'''Factory for creating objects of the correct PasswordHash subclass for a
given password hash value'''
def __init__(self):
self.methods = {}
def register(self, cls):
assert cls.METHOD_NAME not in self.methods
self.methods[cls.METHOD_NAME] = cls
return cls
def parse(self, value, **kwargs):
method_name, _ = parse_value(value)
method_cls = self.methods.get(method_name)
if method_cls is None:
raise ValueError(f'Unknown password hash method {method_name}')
return method_cls(value, **kwargs)
registry = PasswordHashRegistry()
class PasswordHash:
'''OpenLDAP-/NIS-style password hash
Instances wrap password hash strings in the form "{METHOD_NAME}DATA".
Allows gradual migration of password hashing methods by checking
PasswordHash.needs_rehash every time the password is processed and rehashing
it with PasswordHash.from_password if needed. For PasswordHash.needs_rehash
to work, the PasswordHash subclass for the current password hashing method
is instantiated with target_cls set to the PasswordHash subclass of the
intended hashing method.
Instances should be created with PasswordHashRegistry.parse to get the
appropriate subclass based on the method name in a value.'''
METHOD_NAME = None
def __init__(self, value, target_cls=None):
method_name, data = parse_value(value)
if method_name != self.METHOD_NAME:
raise ValueError('Invalid password hash')
self.value = value
self.data = data
self.target_cls = target_cls or type(self)
@classmethod
def from_password(cls, password):
raise NotImplementedError()
def verify(self, password):
raise NotImplementedError()
@property
def needs_rehash(self):
return not isinstance(self, self.target_cls)
@registry.register
class PlaintextPasswordHash(PasswordHash):
'''Pseudo password hash for passwords stored without hashing
Should only be used for migration of existing plaintext passwords. Add the
prefix "{plain}" for this.'''
METHOD_NAME = 'plain'
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, password))
def verify(self, password):
return secrets.compare_digest(self.data, password)
class HashlibPasswordHash(PasswordHash):
HASHLIB_ALGORITHM = None
@classmethod
def from_password(cls, password):
ctx = hashlib.new(cls.HASHLIB_ALGORITHM, password.encode())
return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()).decode()))
def verify(self, password):
digest = base64.b64decode(self.data.encode())
ctx = hashlib.new(self.HASHLIB_ALGORITHM, password.encode())
return secrets.compare_digest(digest, ctx.digest())
class SaltedHashlibPasswordHash(PasswordHash):
HASHLIB_ALGORITHM = None
@classmethod
def from_password(cls, password):
salt = secrets.token_bytes(8)
ctx = hashlib.new(cls.HASHLIB_ALGORITHM)
ctx.update(password.encode())
ctx.update(salt)
return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()+salt).decode()))
def verify(self, password):
data = base64.b64decode(self.data.encode())
ctx = hashlib.new(self.HASHLIB_ALGORITHM)
digest = data[:ctx.digest_size]
salt = data[ctx.digest_size:]
ctx.update(password.encode())
ctx.update(salt)
return secrets.compare_digest(digest, ctx.digest())
@registry.register
class MD5PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'md5'
HASHLIB_ALGORITHM = 'md5'
@registry.register
class SaltedMD5PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'smd5'
HASHLIB_ALGORITHM = 'md5'
@registry.register
class SHA1PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha'
HASHLIB_ALGORITHM = 'sha1'
@registry.register
class SaltedSHA1PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha'
HASHLIB_ALGORITHM = 'sha1'
@registry.register
class SHA256PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha256'
HASHLIB_ALGORITHM = 'sha256'
@registry.register
class SaltedSHA256PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha256'
HASHLIB_ALGORITHM = 'sha256'
@registry.register
class SHA384PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha384'
HASHLIB_ALGORITHM = 'sha384'
@registry.register
class SaltedSHA384PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha384'
HASHLIB_ALGORITHM = 'sha384'
@registry.register
class SHA512PasswordHash(HashlibPasswordHash):
METHOD_NAME = 'sha512'
HASHLIB_ALGORITHM = 'sha512'
@registry.register
class SaltedSHA512PasswordHash(SaltedHashlibPasswordHash):
METHOD_NAME = 'ssha512'
HASHLIB_ALGORITHM = 'sha512'
@registry.register
class CryptPasswordHash(PasswordHash):
METHOD_NAME = 'crypt'
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, crypt(password)))
def verify(self, password):
return secrets.compare_digest(crypt(password, self.data), self.data)
@registry.register
class Argon2PasswordHash(PasswordHash):
METHOD_NAME = 'argon2'
hasher = argon2.PasswordHasher()
@classmethod
def from_password(cls, password):
return cls(build_value(cls.METHOD_NAME, cls.hasher.hash(password)))
def verify(self, password):
try:
return self.hasher.verify(self.data, password)
except argon2.exceptions.Argon2Error:
return False
except argon2.exceptions.InvalidHash:
return False
@property
def needs_rehash(self):
return super().needs_rehash or self.hasher.check_needs_rehash(self.data)
class InvalidPasswordHash:
def __init__(self, value=None):
self.value = value
# pylint: disable=unused-argument
def verify(self, password):
return False
@property
def needs_rehash(self):
return True
def __bool__(self):
return False
# An alternative approach for the behaviour of PasswordHashAttribute would be
# to use sqlalchemy.TypeDecorator. A type decorator allows custom encoding and
# decoding of values coming from the database (when query results are loaded)
# and going into the database (when statements are executed).
#
# This has one downside: After setting e.g. user.password to a string value it
# remains a string value until the change is flushed. It is not possible to
# coerce values to PasswordHash objects as soon as they are set.
#
# This is too inconsistent. Code should be able to rely on user.password to
# always behave like a PasswordHash object.
class PasswordHashAttribute:
'''Descriptor for wrapping an attribute storing a password hash string
Usage example:
>>> class User:
... # Could e.g. be an SQLAlchemy.Column or just a simple attribute
... _passord_hash = None
... password = PasswordHashAttribute('_passord_hash', SHA512PasswordHash)
...
>>> user = User()
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
>>>
>>> user._password_hash = '{plain}my_password'
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
>>> user.password.needs_rehash
True
>>>
>>> user.password = 'my_password'
>>> user._passord_hash
'{sha512}3ajDRohg3LJOIoq47kQgjUPrL1/So6U4uvvTnbT/EUyYKaZL0aRxDgwCH4pBNLai+LF+zMh//nnYRZ4t8pT7AQ=='
>>> type(user.password)
<class 'uffd.password_hash.SHA512PasswordHash'>
>>>
>>> user.password = None
>>> user._passord_hash is None
True
>>> type(user.password)
<class 'uffd.password_hash.InvalidPasswordHash'>
When set to a (plaintext) password the underlying attribute is set to a hash
value for the password. When set to None the underlying attribute is also set
to None.'''
def __init__(self, attribute_name, method_cls):
self.attribute_name = attribute_name
self.method_cls = method_cls
def __get__(self, obj, objtype=None):
if obj is None:
return self
value = getattr(obj, self.attribute_name)
try:
return registry.parse(value, target_cls=self.method_cls)
except ValueError:
return InvalidPasswordHash(value)
def __set__(self, obj, value):
if value is None:
value = InvalidPasswordHash()
elif isinstance(value, str):
value = self.method_cls.from_password(value)
setattr(obj, self.attribute_name, value.value)
# Hashing method for (potentially) low entropy secrets like user passwords. Is
# usually slow and uses salting to make dictionary attacks difficult.
LowEntropyPasswordHash = Argon2PasswordHash
# Hashing method for high entropy secrets like API keys. The secrets are
# generated instead of user-selected to ensure a high level of entropy. Is
# fast and does not need salting, since dictionary attacks are not feasable
# due to high entropy.
HighEntropyPasswordHash = SHA512PasswordHash
from flask import current_app
import itsdangerous
from uffd.utils import nopad_b32decode, nopad_b32encode, nopad_urlsafe_b64decode, nopad_urlsafe_b64encode
class Remailer:
'''The remailer feature improves user privacy by hiding real mail addresses
from services and instead providing them with autogenerated pseudonymous
remailer addresses. If a service sends a mail to a remailer address, the mail
service uses an uffd API endpoint to get the real mail address and rewrites
the remailer address with it. In case of a leak of user data from a service,
the remailer addresses are useless for third-parties.
Version 2 of the remailer address format is tolerant to case conversions at
the cost of being slightly longer.'''
@property
def configured(self):
return bool(current_app.config['REMAILER_DOMAIN'])
def get_serializer(self):
secret = current_app.config['REMAILER_SECRET_KEY'] or current_app.secret_key
return itsdangerous.URLSafeSerializer(secret, salt='remailer_address_v1')
def build_v1_address(self, service_id, user_id):
payload = self.get_serializer().dumps([service_id, user_id])
return 'v1-' + payload + '@' + current_app.config['REMAILER_DOMAIN']
def build_v2_address(self, service_id, user_id):
data, sign = self.get_serializer().dumps([service_id, user_id]).split('.', 1)
data = nopad_b32encode(nopad_urlsafe_b64decode(data)).decode().lower()
sign = nopad_b32encode(nopad_urlsafe_b64decode(sign)).decode().lower()
payload = data + '-' + sign
return 'v2-' + payload + '@' + current_app.config['REMAILER_DOMAIN']
def is_remailer_domain(self, domain):
domains = {domain.lower().strip() for domain in current_app.config['REMAILER_OLD_DOMAINS']}
if current_app.config['REMAILER_DOMAIN']:
domains.add(current_app.config['REMAILER_DOMAIN'].lower().strip())
return domain.lower().strip() in domains
def parse_v1_payload(self, payload):
try:
service_id, user_id = self.get_serializer().loads(payload)
except itsdangerous.BadData:
return None
return (service_id, user_id)
def parse_v2_payload(self, payload):
data, sign = (payload.split('-', 1) + [''])[:2]
try:
data = nopad_urlsafe_b64encode(nopad_b32decode(data.upper())).decode()
sign = nopad_urlsafe_b64encode(nopad_b32decode(sign.upper())).decode()
except ValueError:
return None
payload = data + '.' + sign
try:
service_id, user_id = self.get_serializer().loads(payload)
except itsdangerous.BadData:
return None
return (service_id, user_id)
def parse_address(self, address):
if '@' not in address:
return None
local_part, domain = address.rsplit('@', 1)
if not self.is_remailer_domain(domain):
return None
prefix, payload = (local_part.strip().split('-', 1) + [''])[:2]
if prefix == 'v1':
return self.parse_v1_payload(payload)
if prefix.lower() == 'v2':
return self.parse_v2_payload(payload)
return None
remailer = Remailer()
from .views import bp as bp_ui
bp = [bp_ui]
from .views import bp as bp_ui
bp = [bp_ui]
from .views import bp as bp_ui, send_passwordreset
bp = [bp_ui]
import datetime
import secrets
from sqlalchemy import Column, String, DateTime
from uffd.database import db
def random_token():
return secrets.token_hex(128)
class Token():
token = Column(String(128), primary_key=True, default=random_token)
created = Column(DateTime, default=datetime.datetime.now)
class PasswordToken(Token, db.Model):
__tablename__ = 'passwordToken'
loginname = Column(String(32))
class MailToken(Token, db.Model):
__tablename__ = 'mailToken'
loginname = Column(String(32))
newmail = Column(String(255))
{% extends 'base.html' %}
{% block body %}
<form action="{{ url_for("selfservice.forgot_password") }}" method="POST">
<div class="row mt-2 justify-content-center">
<div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;">
<div class="text-center">
<img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" >
</div>
<div class="col-12">
<h2 class="text-center">{{_("Forgot password")}}</h2>
</div>
<div class="form-group col-12">
<label for="user-loginname">{{_("Login Name")}}</label>
<input type="text" class="form-control" id="user-loginname" name="loginname" required="required" tabindex = "1">
</div>
<div class="form-group col-12">
<label for="user-mail">{{_("Mail Address")}}</label>
<input type="text" class="form-control" id="user-mail" name="mail" required="required" tabindex = "2">
</div>
<div class="form-group col-12">
<button type="submit" class="btn btn-primary btn-block" tabindex = "3">{{_("Send password reset mail")}}</button>
</div>
</div>
</div>
</form>
{% endblock %}