Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Showing
with 2076 additions and 173 deletions
"""Migrate oauth2 state from user to session
Revision ID: e71e29cc605a
Revises: 99df71f0f4a0
Create Date: 2024-05-18 21:59:20.435912
"""
from alembic import op
import sqlalchemy as sa
revision = 'e71e29cc605a'
down_revision = '99df71f0f4a0'
branch_labels = None
depends_on = None
def upgrade():
op.drop_table('oauth2grant')
op.drop_table('oauth2token')
op.create_table('oauth2grant',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('session_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=255), nullable=False),
sa.Column('redirect_uri', sa.String(length=255), nullable=True),
sa.Column('nonce', sa.Text(), nullable=True),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['session_id'], ['session.id'], name=op.f('fk_oauth2grant_session_id_session'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
)
op.create_table('oauth2token',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('session_id', sa.Integer(), nullable=False),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('token_type', sa.String(length=40), nullable=False),
sa.Column('access_token', sa.String(length=255), nullable=False),
sa.Column('refresh_token', sa.String(length=255), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['session_id'], ['session.id'], name=op.f('fk_oauth2token_session_id_session'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
)
def downgrade():
# We don't drop and recreate the table here to improve fuzzy migration test coverage
meta = sa.MetaData(bind=op.get_bind())
session = sa.table('session',
sa.column('id', sa.Integer),
sa.column('user_id', sa.Integer()),
)
with op.batch_alter_table('oauth2token', schema=None) as batch_op:
batch_op.add_column(sa.Column('user_id', sa.INTEGER(), nullable=True))
oauth2token = sa.Table('oauth2token', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('session_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('token_type', sa.String(length=40), nullable=False),
sa.Column('access_token', sa.String(length=255), nullable=False),
sa.Column('refresh_token', sa.String(length=255), nullable=False),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['session_id'], ['session.id'], name=op.f('fk_oauth2token_session_id_session'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
)
op.execute(oauth2token.update().values(user_id=sa.select([session.c.user_id]).where(oauth2token.c.session_id==session.c.id).as_scalar()))
op.execute(oauth2token.delete().where(oauth2token.c.user_id==None))
with op.batch_alter_table('oauth2token', copy_from=oauth2token) as batch_op:
batch_op.alter_column('user_id', nullable=False, existing_type=sa.Integer())
batch_op.create_foreign_key('fk_oauth2token_user_id_user', 'user', ['user_id'], ['id'], onupdate='CASCADE', ondelete='CASCADE')
batch_op.drop_constraint(batch_op.f('fk_oauth2token_session_id_session'), type_='foreignkey')
batch_op.drop_column('session_id')
with op.batch_alter_table('oauth2grant', schema=None) as batch_op:
batch_op.add_column(sa.Column('user_id', sa.INTEGER(), nullable=True))
oauth2grant = sa.Table('oauth2grant', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('session_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('client_db_id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=255), nullable=False),
sa.Column('redirect_uri', sa.String(length=255), nullable=True),
sa.Column('nonce', sa.Text(), nullable=True),
sa.Column('_scopes', sa.Text(), nullable=False),
sa.Column('claims', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['session_id'], ['session.id'], name=op.f('fk_oauth2grant_session_id_session'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
)
op.execute(oauth2grant.update().values(user_id=sa.select([session.c.user_id]).where(oauth2grant.c.session_id==session.c.id).as_scalar()))
op.execute(oauth2grant.delete().where(oauth2grant.c.user_id==None))
with op.batch_alter_table('oauth2grant', copy_from=oauth2grant) as batch_op:
batch_op.alter_column('user_id', nullable=False, existing_type=sa.Integer())
batch_op.create_foreign_key('fk_oauth2grant_user_id_user', 'user', ['user_id'], ['id'], onupdate='CASCADE', ondelete='CASCADE')
batch_op.drop_constraint(batch_op.f('fk_oauth2grant_session_id_session'), type_='foreignkey')
batch_op.drop_column('session_id')
"""Add id to selfservice tokens
Revision ID: e9a67175e179
Revises: a8c6b6e91c28
Create Date: 2021-09-06 22:04:46.741233
"""
from alembic import op
import sqlalchemy as sa
revision = 'e9a67175e179'
down_revision = 'a8c6b6e91c28'
branch_labels = None
depends_on = None
def upgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
batch_op.create_primary_key('pk_mailToken', ['id'])
batch_op.alter_column('id', autoincrement=True, nullable=False, existing_type=sa.Integer())
table = sa.Table('passwordToken', meta,
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
batch_op.create_primary_key('pk_passwordToken', ['id'])
batch_op.alter_column('id', autoincrement=True, nullable=False, existing_type=sa.Integer())
def downgrade():
meta = sa.MetaData(bind=op.get_bind())
table = sa.Table('mailToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.Column('newmail', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_mailToken'))
)
with op.batch_alter_table(table.name, copy_from=table) as batch_op:
batch_op.alter_column('id', autoincrement=False, existing_type=sa.Integer())
batch_op.drop_constraint('pk_mailToken', 'primary')
batch_op.create_primary_key('pk_mailToken', ['token'])
batch_op.drop_column('id')
table = sa.Table('passwordToken', meta,
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=128), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('loginname', sa.String(length=32), nullable=True),
sa.PrimaryKeyConstraint('token', name=op.f('pk_passwordToken'))
)
with op.batch_alter_table(table.name, copy_from=table) as batch_op:
batch_op.alter_column('id', autoincrement=False, existing_type=sa.Integer())
batch_op.drop_constraint('pk_passwordToken', 'primary')
batch_op.create_primary_key('pk_passwordToken', ['token'])
batch_op.drop_column('id')
"""Add ServiceUser
Revision ID: f2eb2c52a61f
Revises: 9f824f61d8ac
Create Date: 2022-08-21 00:42:37.896970
"""
from alembic import op
import sqlalchemy as sa
revision = 'f2eb2c52a61f'
down_revision = '9f824f61d8ac'
branch_labels = None
depends_on = None
def upgrade():
service_user = op.create_table('service_user',
sa.Column('service_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['service_id'], ['service.id'], name=op.f('fk_service_user_service_id_service'), onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_service_user_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('service_id', 'user_id', name=op.f('pk_service_user'))
)
service = sa.table('service', sa.column('id'))
user = sa.table('user', sa.column('id'))
op.execute(service_user.insert().from_select(
['service_id', 'user_id'],
sa.select([service.c.id, user.c.id]).select_from(sa.join(service, user, sa.true()))
))
def downgrade():
op.drop_table('service_user')
from .api import APIClient
from .invite import Invite, InviteGrant, InviteSignup
from .mail import Mail, MailReceiveAddress, MailDestinationAddress
from .mfa import MFAType, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from .oauth2 import OAuth2Client, OAuth2RedirectURI, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation, OAuth2Key
from .role import Role, RoleGroup, RoleGroupMap
from .selfservice import PasswordToken
from .service import RemailerMode, Service, ServiceUser, get_services
from .session import Session, DeviceLoginType, DeviceLoginInitiation, DeviceLoginConfirmation
from .signup import Signup
from .user import User, UserEmail, Group, IDAllocator, IDRangeExhaustedError, IDAlreadyAllocatedError
from .ratelimit import RatelimitEvent, Ratelimit, HostRatelimit, host_ratelimit, format_delay
from .misc import FeatureFlag, Lock
__all__ = [
'APIClient',
'Invite', 'InviteGrant', 'InviteSignup',
'Mail', 'MailReceiveAddress', 'MailDestinationAddress',
'MFAType', 'MFAMethod', 'RecoveryCodeMethod', 'TOTPMethod', 'WebauthnMethod',
'OAuth2Client', 'OAuth2RedirectURI', 'OAuth2LogoutURI', 'OAuth2Grant', 'OAuth2Token', 'OAuth2DeviceLoginInitiation',
'Role', 'RoleGroup', 'RoleGroupMap',
'PasswordToken',
'RemailerMode', 'Service', 'ServiceUser', 'get_services',
'DeviceLoginType', 'DeviceLoginInitiation', 'DeviceLoginConfirmation',
'Signup',
'User', 'UserEmail', 'Group', 'IDAllocator', 'IDRangeExhaustedError', 'IDAlreadyAllocatedError',
'RatelimitEvent', 'Ratelimit', 'HostRatelimit', 'host_ratelimit', 'format_delay',
'FeatureFlag', 'Lock',
]
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Text
from sqlalchemy.orm import relationship
from uffd.database import db
from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash
class APIClient(db.Model):
__tablename__ = 'api_client'
id = Column(Integer, primary_key=True, autoincrement=True)
service_id = Column(Integer, ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
service = relationship('Service', back_populates='api_clients')
auth_username = Column(String(40), unique=True, nullable=False)
_auth_password = Column('auth_password', Text(), nullable=False)
auth_password = PasswordHashAttribute('_auth_password', HighEntropyPasswordHash)
# Permissions are defined by adding an attribute named "perm_NAME"
perm_users = Column(Boolean(create_constraint=True), default=False, nullable=False)
perm_checkpassword = Column(Boolean(create_constraint=True), default=False, nullable=False)
perm_mail_aliases = Column(Boolean(create_constraint=True), default=False, nullable=False)
perm_remailer = Column(Boolean(create_constraint=True), default=False, nullable=False)
perm_metrics = Column(Boolean(create_constraint=True), default=False, nullable=False)
@classmethod
def permission_exists(cls, name):
return hasattr(cls, 'perm_'+name)
def has_permission(self, name):
return getattr(self, 'perm_' + name)
import secrets
import datetime import datetime
from flask_babel import gettext as _
from flask import current_app
from sqlalchemy import Column, String, Integer, ForeignKey, DateTime, Boolean from sqlalchemy import Column, String, Integer, ForeignKey, DateTime, Boolean
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from ldapalchemy.dbutils import DBRelationship
from uffd.utils import token_urlfriendly
from uffd.database import db from uffd.database import db
from uffd.user.models import User from .signup import Signup
from uffd.signup.models import Signup
# pylint: disable=E1101
invite_roles = db.Table('invite_roles', invite_roles = db.Table('invite_roles',
Column('invite_token', String(128), ForeignKey('invite.token'), primary_key=True), Column('invite_id', Integer(), ForeignKey('invite.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
Column('role_id', Integer, ForeignKey('role.id'), primary_key=True) Column('role_id', Integer, ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
) )
class Invite(db.Model): class Invite(db.Model):
__tablename__ = 'invite' __tablename__ = 'invite'
token = Column(String(128), primary_key=True, default=lambda: secrets.token_hex(20)) id = Column(Integer(), primary_key=True, autoincrement=True)
created = Column(DateTime, default=datetime.datetime.now, nullable=False) token = Column(String(128), unique=True, nullable=False, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
creator_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE'), nullable=True)
creator = relationship('User')
valid_until = Column(DateTime, nullable=False) valid_until = Column(DateTime, nullable=False)
single_use = Column(Boolean, default=True, nullable=False) single_use = Column(Boolean(create_constraint=True), default=True, nullable=False)
allow_signup = Column(Boolean, default=True, nullable=False) allow_signup = Column(Boolean(create_constraint=True), default=True, nullable=False)
used = Column(Boolean, default=False, nullable=False) used = Column(Boolean(create_constraint=True), default=False, nullable=False)
disabled = Column(Boolean, default=False, nullable=False) disabled = Column(Boolean(create_constraint=True), default=False, nullable=False)
roles = relationship('Role', secondary=invite_roles) roles = relationship('Role', secondary=invite_roles)
signups = relationship('InviteSignup', back_populates='invite', lazy=True) signups = relationship('InviteSignup', back_populates='invite', lazy=True, cascade='all, delete-orphan')
grants = relationship('InviteGrant', backref='invite', lazy=True) grants = relationship('InviteGrant', back_populates='invite', lazy=True, cascade='all, delete-orphan')
@property @hybrid_property
def expired(self): def expired(self):
return datetime.datetime.now().replace(second=0, microsecond=0) > self.valid_until return self.valid_until < datetime.datetime.utcnow().replace(second=0, microsecond=0)
@property @hybrid_property
def voided(self): def voided(self):
return self.single_use and self.used return self.single_use and self.used
@property
def permitted(self):
if self.creator is None:
return False # Creator does not exist (anymore)
if self.creator.is_deactivated:
return False
if self.creator.is_in_group(current_app.config['ACL_ADMIN_GROUP']):
return True
if self.allow_signup and not self.creator.is_in_group(current_app.config['ACL_SIGNUP_GROUP']):
return False
for role in self.roles:
if role.moderator_group is None or role.moderator_group not in self.creator.groups:
return False
return True
@property @property
def active(self): def active(self):
return not self.disabled and not self.voided and not self.expired return not self.disabled and not self.voided and not self.expired and self.permitted
@property
def short_token(self):
if len(self.token) < 30:
return '<too short>'
return self.token[:10] + ''
def disable(self): def disable(self):
self.disabled = True self.disabled = True
...@@ -50,27 +74,28 @@ class Invite(db.Model): ...@@ -50,27 +74,28 @@ class Invite(db.Model):
class InviteGrant(db.Model): class InviteGrant(db.Model):
__tablename__ = 'invite_grant' __tablename__ = 'invite_grant'
id = Column(Integer(), primary_key=True, autoincrement=True) id = Column(Integer(), primary_key=True, autoincrement=True)
invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) invite_id = Column(Integer(), ForeignKey('invite.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user_dn = Column(String(128), nullable=False) invite = relationship('Invite', back_populates='grants')
user = DBRelationship('user_dn', User) user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User')
def apply(self): def apply(self):
if not self.invite.active: if not self.invite.active:
return False, 'Invite link is invalid' return False, _('Invite link is invalid')
if not self.invite.roles: if not self.invite.roles:
return False, 'Invite link does not grant any roles' return False, _('Invite link does not grant any roles')
if set(self.invite.roles).issubset(self.user.roles): if set(self.invite.roles).issubset(self.user.roles):
return False, 'Invite link does not grant any new roles' return False, _('Invite link does not grant any new roles')
for role in self.invite.roles: for role in self.invite.roles:
self.user.roles.add(role) self.user.roles.append(role)
self.user.update_groups() self.user.update_groups()
self.invite.used = True self.invite.used = True
return True, 'Success' return True, _('Success')
class InviteSignup(Signup): class InviteSignup(Signup):
__tablename__ = 'invite_signup' __tablename__ = 'invite_signup'
token = Column(String(128), ForeignKey('signup.token'), primary_key=True) id = Column(Integer(), ForeignKey('signup.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
invite_token = Column(String(128), ForeignKey('invite.token'), nullable=False) invite_id = Column(Integer(), ForeignKey('invite.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
invite = relationship('Invite', back_populates='signups') invite = relationship('Invite', back_populates='signups')
__mapper_args__ = { __mapper_args__ = {
...@@ -79,17 +104,16 @@ class InviteSignup(Signup): ...@@ -79,17 +104,16 @@ class InviteSignup(Signup):
def validate(self): def validate(self):
if not self.invite.active or not self.invite.allow_signup: if not self.invite.active or not self.invite.allow_signup:
return False, 'Invite link is invalid' return False, _('Invite link is invalid')
return super().validate() return super().validate()
def finish(self, password): def finish(self, password):
if not self.invite.active or not self.invite.allow_signup: if not self.invite.active or not self.invite.allow_signup:
return None, 'Invite link is invalid' return None, _('Invite link is invalid')
user, msg = super().finish(password) user, msg = super().finish(password)
if user is not None: if user is not None:
# super().finish() already added ROLES_BASEROLES
for role in self.invite.roles: for role in self.invite.roles:
user.roles.add(role) user.roles.append(role)
user.update_groups() user.update_groups()
self.invite.used = True self.invite.used = True
return user, msg return user, msg
import re
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.associationproxy import association_proxy
from uffd.database import db
class Mail(db.Model):
# Aliases are looked up by receiver addresses with api.getmails. To emulate
# the pre-v2/LDAP behaviour, the lookup needs to be case-insensitive. To not
# rely on database-specific behaviour, we ensure that all receiver addresses
# are stored lower-case and convert incoming addresses in api.getmails to
# lower-case. Note that full emulation of LDAP behaviour would also require
# whitespace normalization. Instead we disallow spaces in receiver addresses.
# Match ASCII code points 33 (!) to 64 (@) and 91 ([) to 126 (~), i.e. any
# number of lower-case ASCII letters, digits, symbols
RECEIVER_REGEX = '[!-@[-~]*'
RECEIVER_REGEX_COMPILED = re.compile(RECEIVER_REGEX)
__tablename__ = 'mail'
id = Column(Integer(), primary_key=True, autoincrement=True)
uid = Column(String(32), unique=True, nullable=False)
_receivers = relationship('MailReceiveAddress', cascade='all, delete-orphan')
receivers = association_proxy('_receivers', 'address')
_destinations = relationship('MailDestinationAddress', cascade='all, delete-orphan')
destinations = association_proxy('_destinations', 'address')
@property
def invalid_receivers(self):
return [addr for addr in self.receivers if not re.fullmatch(self.RECEIVER_REGEX_COMPILED, addr)]
class MailReceiveAddress(db.Model):
__tablename__ = 'mail_receive_address'
id = Column(Integer(), primary_key=True, autoincrement=True)
mail_id = Column(Integer(), ForeignKey('mail.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
address = Column(String(128), nullable=False)
def __init__(self, address):
self.address = address
class MailDestinationAddress(db.Model):
__tablename__ = 'mail_destination_address'
id = Column(Integer(), primary_key=True, autoincrement=True)
mail_id = Column(Integer(), ForeignKey('mail.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
address = Column(String(128), nullable=False)
def __init__(self, address):
self.address = address
...@@ -8,15 +8,19 @@ import hmac ...@@ -8,15 +8,19 @@ import hmac
import hashlib import hashlib
import base64 import base64
import urllib.parse import urllib.parse
# imports for recovery codes
import crypt
from flask import request, current_app from flask import request, current_app
from sqlalchemy import Column, Integer, Enum, String, DateTime, Text from sqlalchemy import Column, Integer, Enum, String, DateTime, Text, ForeignKey
from ldapalchemy.dbutils import DBRelationship from sqlalchemy.orm import relationship, backref
from uffd.utils import nopad_b32decode, nopad_b32encode
from uffd.password_hash import PasswordHashAttribute, CryptPasswordHash
from uffd.database import db from uffd.database import db
from uffd.user.models import User from .user import User
User.mfa_recovery_codes = relationship('RecoveryCodeMethod', viewonly=True)
User.mfa_totp_methods = relationship('TOTPMethod', viewonly=True)
User.mfa_webauthn_methods = relationship('WebauthnMethod', viewonly=True)
User.mfa_enabled = property(lambda user: bool(user.mfa_totp_methods or user.mfa_webauthn_methods))
class MFAType(enum.Enum): class MFAType(enum.Enum):
RECOVERY_CODE = 0 RECOVERY_CODE = 0
...@@ -26,11 +30,11 @@ class MFAType(enum.Enum): ...@@ -26,11 +30,11 @@ class MFAType(enum.Enum):
class MFAMethod(db.Model): class MFAMethod(db.Model):
__tablename__ = 'mfa_method' __tablename__ = 'mfa_method'
id = Column(Integer(), primary_key=True, autoincrement=True) id = Column(Integer(), primary_key=True, autoincrement=True)
type = Column(Enum(MFAType)) type = Column(Enum(MFAType, create_constraint=True), nullable=False)
created = Column(DateTime()) created = Column(DateTime(), nullable=False, default=datetime.datetime.utcnow)
name = Column(String(128)) name = Column(String(128))
dn = Column(String(128)) user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = DBRelationship('dn', User, backref='mfa_methods') user = relationship('User', backref=backref('mfa_methods', cascade='all, delete-orphan'))
__mapper_args__ = { __mapper_args__ = {
'polymorphic_on': type, 'polymorphic_on': type,
...@@ -39,12 +43,11 @@ class MFAMethod(db.Model): ...@@ -39,12 +43,11 @@ class MFAMethod(db.Model):
def __init__(self, user, name=None): def __init__(self, user, name=None):
self.user = user self.user = user
self.name = name self.name = name
self.created = datetime.datetime.now() self.created = datetime.datetime.utcnow()
class RecoveryCodeMethod(MFAMethod): class RecoveryCodeMethod(MFAMethod):
code_salt = Column('recovery_salt', String(64)) _code = Column('recovery_hash', String(256))
code_hash = Column('recovery_hash', String(256)) code = PasswordHashAttribute('_code', CryptPasswordHash)
user = DBRelationship('dn', User, backref='mfa_recovery_codes')
__mapper_args__ = { __mapper_args__ = {
'polymorphic_identity': MFAType.RECOVERY_CODE 'polymorphic_identity': MFAType.RECOVERY_CODE
...@@ -52,14 +55,11 @@ class RecoveryCodeMethod(MFAMethod): ...@@ -52,14 +55,11 @@ class RecoveryCodeMethod(MFAMethod):
def __init__(self, user): def __init__(self, user):
super().__init__(user, None) super().__init__(user, None)
# The code attribute is only available in newly created objects as only # self.code_value is not stored and only available on freshly initiated objects
# it's hash is stored in the database self.code = self.code_value = secrets.token_hex(8).replace(' ', '').lower()
self.code = secrets.token_hex(8).replace(' ', '').lower()
self.code_hash = crypt.crypt(self.code)
def verify(self, code): def verify(self, code):
code = code.replace(' ', '').lower() return self.code.verify(code.replace(' ', '').lower())
return crypt.crypt(code, self.code_hash) == self.code_hash
def _hotp(counter, key, digits=6): def _hotp(counter, key, digits=6):
'''Generates HMAC-based one-time password according to RFC4226 '''Generates HMAC-based one-time password according to RFC4226
...@@ -77,7 +77,7 @@ def _hotp(counter, key, digits=6): ...@@ -77,7 +77,7 @@ def _hotp(counter, key, digits=6):
class TOTPMethod(MFAMethod): class TOTPMethod(MFAMethod):
key = Column('totp_key', String(64)) key = Column('totp_key', String(64))
user = DBRelationship('dn', User, backref='mfa_totp_methods') last_counter = Column('totp_last_counter', Integer())
__mapper_args__ = { __mapper_args__ = {
'polymorphic_identity': MFAType.TOTP 'polymorphic_identity': MFAType.TOTP
...@@ -86,13 +86,12 @@ class TOTPMethod(MFAMethod): ...@@ -86,13 +86,12 @@ class TOTPMethod(MFAMethod):
def __init__(self, user, name=None, key=None): def __init__(self, user, name=None, key=None):
super().__init__(user, name) super().__init__(user, name)
if key is None: if key is None:
key = base64.b32encode(secrets.token_bytes(16)).rstrip(b'=').decode() key = nopad_b32encode(secrets.token_bytes(16)).decode()
self.key = key self.key = key
@property @property
def raw_key(self): def raw_key(self):
tmp = self.key + '='*(8 - (len(self.key) % 8)) return nopad_b32decode(self.key)
return base64.b32decode(tmp.encode())
@property @property
def issuer(self): def issuer(self):
...@@ -119,14 +118,17 @@ class TOTPMethod(MFAMethod): ...@@ -119,14 +118,17 @@ class TOTPMethod(MFAMethod):
:param code: String of digits (as entered by the user) :param code: String of digits (as entered by the user)
:returns: True if code is valid, False otherwise''' :returns: True if code is valid, False otherwise'''
counter = int(time.time()/30) current_counter = int(time.time()/30)
if _hotp(counter-1, self.raw_key) == code or _hotp(counter, self.raw_key) == code: for counter in (current_counter - 1, current_counter):
return True if counter > (self.last_counter or 0):
valid_code = _hotp(counter, self.raw_key)
if secrets.compare_digest(code, valid_code):
self.last_counter = counter
return True
return False return False
class WebauthnMethod(MFAMethod): class WebauthnMethod(MFAMethod):
_cred = Column('webauthn_cred', Text()) _cred = Column('webauthn_cred', Text())
user = DBRelationship('dn', User, backref='mfa_webauthn_methods')
__mapper_args__ = { __mapper_args__ = {
'polymorphic_identity': MFAType.WEBAUTHN 'polymorphic_identity': MFAType.WEBAUTHN
...@@ -138,7 +140,7 @@ class WebauthnMethod(MFAMethod): ...@@ -138,7 +140,7 @@ class WebauthnMethod(MFAMethod):
@property @property
def cred(self): def cred(self):
from fido2.ctap2 import AttestedCredentialData #pylint: disable=import-outside-toplevel from uffd.fido2_compat import AttestedCredentialData #pylint: disable=import-outside-toplevel
return AttestedCredentialData(base64.b64decode(self._cred)) return AttestedCredentialData(base64.b64decode(self._cred))
@cred.setter @cred.setter
......
from uffd.database import db
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
feature_flag_table = db.Table('feature_flag',
db.Column('name', db.String(32), primary_key=True),
)
class FeatureFlag:
def __init__(self, name):
self.name = name
self.enable_hooks = []
self.disable_hooks = []
@property
def expr(self):
return db.exists().where(feature_flag_table.c.name == self.name)
def __bool__(self):
return db.session.execute(db.select([self.expr])).scalar()
def enable_hook(self, func):
self.enable_hooks.append(func)
return func
def enable(self):
db.session.execute(db.insert(feature_flag_table).values(name=self.name))
for func in self.enable_hooks:
func()
def disable_hook(self, func):
self.disable_hooks.append(func)
return func
def disable(self):
db.session.execute(db.delete(feature_flag_table).where(feature_flag_table.c.name == self.name))
for func in self.disable_hooks:
func()
FeatureFlag.unique_email_addresses = FeatureFlag('unique-email-addresses')
lock_table = db.Table('lock',
db.Column('name', db.String(32), primary_key=True),
)
class Lock:
ALL_LOCKS = set()
def __init__(self, name):
self.name = name
assert name not in self.ALL_LOCKS
self.ALL_LOCKS.add(name)
def acquire(self):
'''Acquire the lock until the end of the current transaction
Calling acquire while the specific lock is already held has no effect.'''
if db.engine.name == 'sqlite':
# SQLite does not support with_for_update, but we can lock the whole DB
# with any write operation. So we do a dummy update.
db.session.execute(db.update(lock_table).where(False).values(name=None))
elif db.engine.name in ('mysql', 'mariadb'):
result = db.session.execute(db.select([lock_table.c.name]).where(lock_table.c.name == self.name).with_for_update()).scalar()
if result is not None:
return
# We add all lock rows with migrations so we should never end up here
raise Exception(f'Lock "{self.name}" is missing')
else:
raise NotImplementedError()
# Only executed when lock_table is created with db.create/db.create_all (e.g.
# during testing). Otherwise the rows are inserted with migrations.
@db.event.listens_for(lock_table, 'after_create') # pylint: disable=no-member
def insert_lock_rows(target, connection, **kwargs): # pylint: disable=unused-argument
for name in Lock.ALL_LOCKS:
db.session.execute(db.insert(lock_table).values(name=name))
db.session.commit()
import datetime
import json
import secrets
import base64
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.associationproxy import association_proxy
import jwt
from uffd.database import db, CommaSeparatedList
from uffd.tasks import cleanup_task
from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash
from uffd.utils import token_urlfriendly
from .session import DeviceLoginInitiation, DeviceLoginType
from .service import ServiceUser
# pyjwt v1.7.x compat (Buster/Bullseye)
if not hasattr(jwt, 'get_algorithm_by_name'):
jwt.get_algorithm_by_name = lambda name: jwt.algorithms.get_default_algorithms()[name]
class OAuth2Client(db.Model):
__tablename__ = 'oauth2client'
# Inconsistently named "db_id" instead of "id" because of the naming conflict
# with "client_id" in the OAuth2 standard
db_id = Column(Integer, primary_key=True, autoincrement=True)
service_id = Column(Integer, ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
service = relationship('Service', back_populates='oauth2_clients')
client_id = Column(String(40), unique=True, nullable=False)
_client_secret = Column('client_secret', Text(), nullable=False)
client_secret = PasswordHashAttribute('_client_secret', HighEntropyPasswordHash)
_redirect_uris = relationship('OAuth2RedirectURI', cascade='all, delete-orphan')
redirect_uris = association_proxy('_redirect_uris', 'uri')
logout_uris = relationship('OAuth2LogoutURI', cascade='all, delete-orphan')
@property
def default_redirect_uri(self):
return self.redirect_uris[0] if len(self.redirect_uris) == 1 else None
def access_allowed(self, user):
service_user = ServiceUser.query.get((self.service_id, user.id))
return service_user and service_user.has_access
@property
def logout_uris_json(self):
return json.dumps([[item.method, item.uri] for item in self.logout_uris])
class OAuth2RedirectURI(db.Model):
__tablename__ = 'oauth2redirect_uri'
id = Column(Integer, primary_key=True, autoincrement=True)
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
uri = Column(String(255), nullable=False)
def __init__(self, uri):
self.uri = uri
class OAuth2LogoutURI(db.Model):
__tablename__ = 'oauth2logout_uri'
id = Column(Integer, primary_key=True, autoincrement=True)
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
method = Column(String(40), nullable=False, default='GET')
uri = Column(String(255), nullable=False)
@cleanup_task.delete_by_attribute('expired')
class OAuth2Grant(db.Model):
__tablename__ = 'oauth2grant'
id = Column(Integer, primary_key=True, autoincrement=True)
EXPIRES_IN = 100
expires = Column(DateTime, nullable=False, default=lambda: datetime.datetime.utcnow() + datetime.timedelta(seconds=OAuth2Grant.EXPIRES_IN))
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
session = relationship('Session')
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
client = relationship('OAuth2Client')
_code = Column('code', String(255), nullable=False, default=token_urlfriendly)
code = property(lambda self: f'{self.id}-{self._code}')
redirect_uri = Column(String(255), nullable=True)
nonce = Column(Text(), nullable=True)
scopes = Column('_scopes', CommaSeparatedList(), nullable=False, default=tuple())
_claims = Column('claims', Text(), nullable=True)
@property
def claims(self):
return json.loads(self._claims) if self._claims is not None else None
@claims.setter
def claims(self, value):
self._claims = json.dumps(value) if value is not None else None
@property
def service_user(self):
return ServiceUser.query.get((self.client.service_id, self.session.user_id))
@hybrid_property
def expired(self):
if self.expires is None:
return False
return self.expires < datetime.datetime.utcnow()
@classmethod
def get_by_authorization_code(cls, code):
# pylint: disable=protected-access
if '-' not in code:
return None
grant_id, grant_code = code.split('-', 2)
grant = cls.query.filter_by(id=grant_id, expired=False).first()
if not grant or not secrets.compare_digest(grant._code, grant_code):
return None
if grant.session.expired or grant.session.user.is_deactivated:
return None
if not grant.service_user or not grant.service_user.has_access:
return None
return grant
def make_token(self, **kwargs):
return OAuth2Token(
session=self.session,
client=self.client,
scopes=self.scopes,
claims=self.claims,
**kwargs
)
# OAuth2Token objects are cleaned-up when the session expires and is
# auto-deleted (or the user manually revokes it).
class OAuth2Token(db.Model):
__tablename__ = 'oauth2token'
id = Column(Integer, primary_key=True, autoincrement=True)
EXPIRES_IN = 3600
expires = Column(DateTime, nullable=False, default=lambda: datetime.datetime.utcnow() + datetime.timedelta(seconds=OAuth2Token.EXPIRES_IN))
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
session = relationship('Session')
client_db_id = Column(Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
client = relationship('OAuth2Client')
# currently only bearer is supported
token_type = Column(String(40), nullable=False, default='bearer')
_access_token = Column('access_token', String(255), unique=True, nullable=False, default=token_urlfriendly)
access_token = property(lambda self: f'{self.id}-{self._access_token}')
_refresh_token = Column('refresh_token', String(255), unique=True, nullable=False, default=token_urlfriendly)
refresh_token = property(lambda self: f'{self.id}-{self._refresh_token}')
scopes = Column('_scopes', CommaSeparatedList(), nullable=False, default=tuple())
_claims = Column('claims', Text(), nullable=True)
@property
def claims(self):
return json.loads(self._claims) if self._claims is not None else None
@claims.setter
def claims(self, value):
self._claims = json.dumps(value) if value is not None else None
@property
def service_user(self):
return ServiceUser.query.get((self.client.service_id, self.session.user_id))
@hybrid_property
def expired(self):
return self.expires < datetime.datetime.utcnow()
@classmethod
def get_by_access_token(cls, access_token):
# pylint: disable=protected-access
if '-' not in access_token:
return None
token_id, token_secret = access_token.split('-', 2)
token = cls.query.filter_by(id=token_id, expired=False).first()
if not token or not secrets.compare_digest(token._access_token, token_secret):
return None
if token.session.expired or token.session.user.is_deactivated:
return None
if not token.service_user or not token.service_user.has_access:
return None
return token
class OAuth2DeviceLoginInitiation(DeviceLoginInitiation):
__mapper_args__ = {
'polymorphic_identity': DeviceLoginType.OAUTH2
}
client_db_id = Column('oauth2_client_db_id', Integer, ForeignKey('oauth2client.db_id', onupdate='CASCADE', ondelete='CASCADE'))
client = relationship('OAuth2Client')
@property
def description(self):
return self.client.service.name
class OAuth2Key(db.Model):
__tablename__ = 'oauth2_key'
id = Column(String(64), primary_key=True, default=token_urlfriendly)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
active = Column(Boolean(create_constraint=False), default=True, nullable=False)
algorithm = Column(String(32), nullable=False)
private_key_jwk = Column(Text(), nullable=False)
public_key_jwk = Column(Text(), nullable=False)
def __init__(self, **kwargs):
if kwargs.get('algorithm') and kwargs.get('private_key') \
and not kwargs.get('private_key_jwk') \
and not kwargs.get('public_key_jwk'):
algorithm = jwt.get_algorithm_by_name(kwargs['algorithm'])
private_key = kwargs.pop('private_key')
kwargs['private_key_jwk'] = algorithm.to_jwk(private_key)
kwargs['public_key_jwk'] = algorithm.to_jwk(private_key.public_key())
super().__init__(**kwargs)
@property
def private_key(self):
# pylint: disable=protected-access,import-outside-toplevel
# cryptography performs expensive checks when loading RSA private keys.
# Since we only load keys we generated ourselves with help of cryptography,
# these checks are unnecessary.
import cryptography.hazmat.backends.openssl
cryptography.hazmat.backends.openssl.backend._rsa_skip_check_key = True
res = jwt.get_algorithm_by_name(self.algorithm).from_jwk(self.private_key_jwk)
cryptography.hazmat.backends.openssl.backend._rsa_skip_check_key = False
return res
@property
def public_key(self):
return jwt.get_algorithm_by_name(self.algorithm).from_jwk(self.public_key_jwk)
@property
def public_key_jwks_dict(self):
res = json.loads(self.public_key_jwk)
res['kid'] = self.id
res['alg'] = self.algorithm
res['use'] = 'sig'
# RFC7517 4.3 "The "use" and "key_ops" JWK members SHOULD NOT be used together [...]"
res.pop('key_ops', None)
return res
def encode_jwt(self, payload):
if not self.active:
raise jwt.exceptions.InvalidKeyError(f'Key {self.id} not active')
res = jwt.encode(payload, key=self.private_key, algorithm=self.algorithm, headers={'kid': self.id})
# pyjwt pre-v2 compat (Buster/Bullseye)
if isinstance(res, bytes):
res = res.decode()
return res
# Hash algorithm for at_hash/c_hash from OpenID Connect Core 1.0 section 3.1.3.6
def oidc_hash(self, value):
# pylint: disable=import-outside-toplevel
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend # Only required for Buster
hash_alg = jwt.get_algorithm_by_name(self.algorithm).hash_alg
digest = hashes.Hash(hash_alg(), backend=default_backend())
digest.update(value)
return base64.urlsafe_b64encode(
digest.finalize()[:hash_alg.digest_size // 2]
).decode('ascii').rstrip('=')
@classmethod
def get_preferred_key(cls, algorithm='RS256'):
return cls.query.filter_by(active=True, algorithm=algorithm).order_by(OAuth2Key.created.desc()).first()
@classmethod
def get_available_algorithms(cls):
return ['RS256']
@classmethod
def decode_jwt(cls, data, algorithms=('RS256',), **kwargs):
headers = jwt.get_unverified_header(data)
if 'kid' not in headers:
raise jwt.exceptions.InvalidKeyError('JWT without kid')
kid = headers['kid']
key = cls.query.get(kid)
if not key:
raise jwt.exceptions.InvalidKeyError(f'Key {kid} not found')
if not key.active:
raise jwt.exceptions.InvalidKeyError(f'Key {kid} not active')
return jwt.decode(data, key=key.public_key, algorithms=algorithms, **kwargs)
@classmethod
def generate_rsa_key(cls, public_exponent=65537, key_size=3072):
# pylint: disable=import-outside-toplevel
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend # Only required for Buster
return cls(algorithm='RS256', private_key=rsa.generate_private_key(public_exponent=public_exponent, key_size=key_size, backend=default_backend()))
...@@ -3,17 +3,26 @@ import ipaddress ...@@ -3,17 +3,26 @@ import ipaddress
import math import math
from flask import request from flask import request
from flask_babel import gettext as _
from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.tasks import cleanup_task
from uffd.database import db from uffd.database import db
@cleanup_task.delete_by_attribute('expired')
class RatelimitEvent(db.Model): class RatelimitEvent(db.Model):
__tablename__ = 'ratelimit_event' __tablename__ = 'ratelimit_event'
id = Column(Integer(), primary_key=True, autoincrement=True) id = Column(Integer(), primary_key=True, autoincrement=True)
timestamp = Column(DateTime(), default=datetime.datetime.now) timestamp = Column(DateTime(), default=datetime.datetime.utcnow, nullable=False)
name = Column(String(128)) expires = Column(DateTime(), nullable=False)
name = Column(String(128), nullable=False)
key = Column(String(128)) key = Column(String(128))
@hybrid_property
def expired(self):
return self.expires < datetime.datetime.utcnow()
class Ratelimit: class Ratelimit:
def __init__(self, name, interval, limit): def __init__(self, name, interval, limit):
self.name = name self.name = name
...@@ -21,25 +30,23 @@ class Ratelimit: ...@@ -21,25 +30,23 @@ class Ratelimit:
self.limit = limit self.limit = limit
self.base = interval**(1/limit) self.base = interval**(1/limit)
def cleanup(self):
limit = datetime.datetime.now() - datetime.timedelta(seconds=self.interval)
RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.timestamp <= limit).delete()
db.session.commit()
def log(self, key=None): def log(self, key=None):
db.session.add(RatelimitEvent(name=self.name, key=key)) db.session.add(RatelimitEvent(name=self.name, key=key, expires=datetime.datetime.utcnow() + datetime.timedelta(seconds=self.interval)))
db.session.commit() db.session.commit()
def get_delay(self, key=None): def get_delay(self, key=None):
self.cleanup() events = RatelimitEvent.query\
events = RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.key == key).all() .filter(db.not_(RatelimitEvent.expired))\
.filter_by(name=self.name, key=key)\
.order_by(RatelimitEvent.timestamp)\
.all()
if not events: if not events:
return 0 return 0
delay = math.ceil(self.base**len(events)) delay = math.ceil(self.base**len(events))
if delay < 5: if delay < 5:
delay = 0 delay = 0
delay = min(delay, 365*24*60*60) # prevent overflow of datetime objetcs delay = min(delay, 365*24*60*60) # prevent overflow of datetime objects
remaining = events[0].timestamp + datetime.timedelta(seconds=delay) - datetime.datetime.now() remaining = events[0].timestamp + datetime.timedelta(seconds=delay) - datetime.datetime.utcnow()
return max(0, math.ceil(remaining.total_seconds())) return max(0, math.ceil(remaining.total_seconds()))
def get_addrkey(addr=None): def get_addrkey(addr=None):
...@@ -66,16 +73,16 @@ class HostRatelimit(Ratelimit): ...@@ -66,16 +73,16 @@ class HostRatelimit(Ratelimit):
def format_delay(seconds): def format_delay(seconds):
if seconds <= 15: if seconds <= 15:
return 'a few seconds' return _('a few seconds')
if seconds <= 30: if seconds <= 30:
return '30 seconds' return _('30 seconds')
if seconds <= 60: if seconds <= 60:
return 'one minute' return _('one minute')
if seconds < 3000: if seconds < 3000:
return '%d minutes'%(math.ceil(seconds/60)+1) return _('%(minutes)d minutes', minutes=(math.ceil(seconds/60)+1))
if seconds <= 3600: if seconds <= 3600:
return 'one hour' return _('one hour')
return '%d hours'%math.ceil(seconds/3600) return _('%(hours)d hours', hours=math.ceil(seconds/3600))
# Global host-based ratelimit # Global host-based ratelimit
host_ratelimit = HostRatelimit('host', 1*60*60, 25) host_ratelimit = HostRatelimit('host', 1*60*60, 25)
from sqlalchemy import Column, String, Integer, Text, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.orm.collections import MappedCollection, collection
from uffd.database import db
from .user import User
class RoleGroup(db.Model):
__tablename__ = 'role_groups'
role_id = Column(Integer(), ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
role = relationship('Role', back_populates='groups')
group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
group = relationship('Group')
requires_mfa = Column(Boolean(create_constraint=True), default=False, nullable=False)
# pylint: disable=E1101
role_members = db.Table('role_members',
db.Column('role_id', db.Integer(), db.ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
db.Column('user_id', db.Integer(), db.ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
# pylint: disable=E1101
role_inclusion = db.Table('role-inclusion',
Column('role_id', Integer, ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
Column('included_role_id', Integer, ForeignKey('role.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
def flatten_recursive(objs, attr):
'''Returns a set of objects and all objects included in object.`attr` recursivly while avoiding loops'''
objs = set(objs)
new_objs = set(objs)
while new_objs:
for obj in getattr(new_objs.pop(), attr):
if obj not in objs:
objs.add(obj)
new_objs.add(obj)
return objs
def get_user_roles_effective(user):
base = set(user.roles)
if not user.is_service_user:
base.update(Role.query.filter_by(is_default=True))
return flatten_recursive(base, 'included_roles')
User.roles_effective = property(get_user_roles_effective)
def compute_user_groups(user, ignore_mfa=False):
groups = set()
for role in user.roles_effective:
for group in role.groups:
if ignore_mfa or not role.groups[group].requires_mfa or user.mfa_enabled:
groups.add(group)
return groups
User.compute_groups = compute_user_groups
def update_user_groups(user):
current_groups = set(user.groups)
groups = user.compute_groups()
if groups == current_groups:
return set(), set()
groups_added = groups - current_groups
groups_removed = current_groups - groups
for group in groups_removed:
user.groups.remove(group)
for group in groups_added:
user.groups.append(group)
return groups_added, groups_removed
User.update_groups = update_user_groups
class RoleGroupMap(MappedCollection):
def __init__(self):
super().__init__(keyfunc=lambda rolegroup: rolegroup.group)
@collection.internally_instrumented
def __setitem__(self, key, value, _sa_initiator=None):
value.group = key
super().__setitem__(key, value, _sa_initiator)
class Role(db.Model):
__tablename__ = 'role'
id = Column(Integer(), primary_key=True, autoincrement=True)
name = Column(String(32), unique=True, nullable=False)
description = Column(Text(), default='', nullable=False)
included_roles = relationship('Role', secondary=role_inclusion,
primaryjoin=id == role_inclusion.c.role_id,
secondaryjoin=id == role_inclusion.c.included_role_id,
backref='including_roles')
including_roles = [] # overwritten by backref
moderator_group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='SET NULL'), nullable=True)
moderator_group = relationship('Group')
members = relationship('User', secondary='role_members', back_populates='roles')
groups = relationship('RoleGroup', collection_class=RoleGroupMap, cascade='all, delete-orphan', back_populates='role')
# Roles that are managed externally (e.g. by Ansible) can be locked to
# prevent accidental editing of name, moderator group, included roles
# and groups as well as deletion in the web interface.
locked = Column(Boolean(create_constraint=True), default=False, nullable=False)
is_default = Column(Boolean(create_constraint=True), default=False, nullable=False)
@property
def members_effective(self):
members = set()
for role in flatten_recursive([self], 'including_roles'):
members.update(role.members)
if role.is_default:
members.update([user for user in User.query.all() if not user.is_service_user])
return members
@property
def included_roles_recursive(self):
return flatten_recursive(self.included_roles, 'included_roles')
@property
def groups_effective(self):
groups = set(self.groups)
for role in self.included_roles_recursive:
groups.update(role.groups)
return groups
def update_member_groups(self):
for user in self.members_effective:
user.update_groups()
import datetime
from sqlalchemy import Column, String, DateTime, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.database import db
from uffd.utils import token_urlfriendly
from uffd.tasks import cleanup_task
@cleanup_task.delete_by_attribute('expired')
class PasswordToken(db.Model):
__tablename__ = 'passwordToken'
id = Column(Integer(), primary_key=True, autoincrement=True)
token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User')
@hybrid_property
def expired(self):
if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(days=2)
import enum
from flask import current_app
from flask_babel import get_locale
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum
from sqlalchemy.orm import relationship, validates
from uffd.database import db
from uffd.remailer import remailer
from uffd.tasks import cleanup_task
from .user import User, UserEmail, user_groups
class RemailerMode(enum.Enum):
DISABLED = 0
ENABLED_V1 = 1
ENABLED_V2 = 2
class Service(db.Model):
__tablename__ = 'service'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), unique=True, nullable=False)
# If limit_access is False, all users have access and access_group is
# ignored. This attribute exists for legacy API and OAuth2 clients that
# were migrated from config definitions where a missing "required_group"
# parameter meant no access restrictions. Representing this state by
# setting access_group_id to NULL would lead to a bad/unintuitive ondelete
# behaviour.
limit_access = Column(Boolean(create_constraint=True), default=True, nullable=False)
access_group_id = Column(Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='SET NULL'), nullable=True)
access_group = relationship('Group')
oauth2_clients = relationship('OAuth2Client', back_populates='service', cascade='all, delete-orphan')
api_clients = relationship('APIClient', back_populates='service', cascade='all, delete-orphan')
remailer_mode = Column(Enum(RemailerMode, create_constraint=True), default=RemailerMode.DISABLED, nullable=False)
enable_email_preferences = Column(Boolean(create_constraint=True), default=False, nullable=False)
hide_deactivated_users = Column(Boolean(create_constraint=True), default=False, nullable=False)
class ServiceUser(db.Model):
'''Service-related configuration and state for a user
ServiceUser objects are auto-created whenever a new User or Service is
created, so there one for for every (Service, User) pair.
Service- or User-related code should always use ServiceUser in queries
instead of User/Service.'''
__tablename__ = 'service_user'
__table_args__ = (
db.PrimaryKeyConstraint('service_id', 'user_id'),
)
service_id = Column(Integer(), ForeignKey('service.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
service = relationship('Service', viewonly=True)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', viewonly=True)
@property
def has_access(self):
return not self.service.limit_access or self.service.access_group in self.user.groups
@property
def has_email_preferences(self):
return self.has_access and self.service.enable_email_preferences
remailer_overwrite_mode = Column(Enum(RemailerMode, create_constraint=True), default=None, nullable=True)
@property
def effective_remailer_mode(self):
if not remailer.configured:
return RemailerMode.DISABLED
if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None:
if self.user.loginname not in current_app.config['REMAILER_LIMIT_TO_USERS']:
return RemailerMode.DISABLED
if self.remailer_overwrite_mode is not None:
return self.remailer_overwrite_mode
return self.service.remailer_mode
service_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE', ondelete='SET NULL'))
service_email = relationship('UserEmail')
@validates('service_email')
def validate_service_email(self, key, value): # pylint: disable=unused-argument
if value is not None:
if not value.user:
value.user = self.user
if value.user != self.user:
raise ValueError('UserEmail assigned to ServiceUser.service_email is not associated with user')
if not value.verified:
raise ValueError('UserEmail assigned to serviceUser.service_email is not verified')
return value
# Actual e-mail address that mails from the service are sent to
@property
def real_email(self):
if self.has_email_preferences and self.service_email:
return self.service_email.address
return self.user.primary_email.address
@classmethod
def get_by_remailer_email(cls, address):
if not remailer.configured:
return None
result = remailer.parse_address(address)
if result is None:
return None
# result is (service_id, user_id), i.e. our primary key
return cls.query.get(result)
# E-Mail address as seen by the service
@property
def email(self):
if self.effective_remailer_mode == RemailerMode.ENABLED_V1:
return remailer.build_v1_address(self.service_id, self.user_id)
if self.effective_remailer_mode == RemailerMode.ENABLED_V2:
return remailer.build_v2_address(self.service_id, self.user_id)
return self.real_email
# User.primary_email and ServiceUser.service_email can only be set to
# verified addresses, so this should always return True
@property
def email_verified(self):
if self.effective_remailer_mode != RemailerMode.DISABLED:
return True
if self.has_email_preferences and self.service_email:
return self.service_email.verified
return self.user.primary_email.verified
@classmethod
def filter_query_by_email(cls, query, email):
'''Filter query of ServiceUser by ServiceUser.email'''
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member,invalid-name,singleton-comparison
service_user = cls.get_by_remailer_email(email)
if service_user and service_user.email == email:
return query.filter(cls.user_id == service_user.user_id, cls.service_id == service_user.service_id)
AliasedUser = db.aliased(User)
AliasedPrimaryEmail = db.aliased(UserEmail)
AliasedServiceEmail = db.aliased(UserEmail)
AliasedService = db.aliased(Service)
aliased_user_groups = db.aliased(user_groups)
query = query.join(cls.user.of_type(AliasedUser))
query = query.join(AliasedUser.primary_email.of_type(AliasedPrimaryEmail))
query = query.outerjoin(cls.service_email.of_type(AliasedServiceEmail))
query = query.join(cls.service.of_type(AliasedService))
remailer_enabled = db.case(
whens=[
(db.not_(remailer.configured), False),
(
db.not_(AliasedUser.loginname.in_(current_app.config['REMAILER_LIMIT_TO_USERS']))
if current_app.config['REMAILER_LIMIT_TO_USERS'] is not None else db.and_(False),
False
),
(cls.remailer_overwrite_mode != None, cls.remailer_overwrite_mode != RemailerMode.DISABLED)
],
else_=(AliasedService.remailer_mode != RemailerMode.DISABLED)
)
has_access = db.or_(
db.not_(AliasedService.limit_access),
db.exists().where(db.and_(
aliased_user_groups.c.user_id == AliasedUser.id,
aliased_user_groups.c.group_id == AliasedService.access_group_id,
))
)
has_email_preferences = db.and_(
has_access,
AliasedService.enable_email_preferences,
)
real_email_matches = db.case(
whens=[
# pylint: disable=singleton-comparison
(db.and_(has_email_preferences, cls.service_email != None), AliasedServiceEmail.address == email),
],
else_=(AliasedPrimaryEmail.address == email)
)
return query.filter(db.and_(db.not_(remailer_enabled), real_email_matches))
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def create_service_users(session, flush_context): # pylint: disable=unused-argument
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
new_user_ids = [user.id for user in session.new if isinstance(user, User)]
new_service_ids = [service.id for service in session.new if isinstance(service, Service)]
if not new_user_ids and not new_service_ids:
return
db.session.execute(db.insert(ServiceUser).from_select(
['service_id', 'user_id'],
db.select([Service.id, User.id]).select_from(db.join(Service, User, db.true())).where(db.or_(
Service.id.in_(new_service_ids),
User.id.in_(new_user_ids),
))
))
# On databases with write concurrency (i.e. everything but SQLite), the
# after_flush handler above is racy. So in rare cases ServiceUser objects
# might be missing.
@cleanup_task.handler
def create_missing_service_users():
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
db.session.execute(db.insert(ServiceUser).from_select(
['service_id', 'user_id'],
db.select([Service.id, User.id]).select_from(db.join(Service, User, db.true())).where(db.not_(
ServiceUser.query.filter(
ServiceUser.service_id == Service.id,
ServiceUser.user_id == User.id
).exists()
))
))
# The user-visible services show on the service overview page are read from
# the SERVICES config key. It is planned to gradually extend the Service model
# in order to finally replace the config-defined services.
def get_language_specific(data, field_name, default =''):
return data.get(field_name + '_' + get_locale().language, data.get(field_name, default))
# pylint: disable=too-many-branches
def get_services(user=None):
if not user and not current_app.config['SERVICES_PUBLIC']:
return []
services = []
for service_data in current_app.config['SERVICES']:
service_title = get_language_specific(service_data, 'title')
if not service_title:
continue
service_description = get_language_specific(service_data, 'description')
service = {
'title': service_title,
'subtitle': service_data.get('subtitle', ''),
'description': service_description,
'url': service_data.get('url', ''),
'logo_url': service_data.get('logo_url', ''),
'has_access': True,
'permission': '',
'groups': [],
'infos': [],
'links': [],
}
if service_data.get('required_group'):
if not user or not user.has_permission(service_data['required_group']):
service['has_access'] = False
for permission_data in service_data.get('permission_levels', []):
if permission_data.get('required_group'):
if not user or not user.has_permission(permission_data['required_group']):
continue
if not permission_data.get('name'):
continue
service['has_access'] = True
service['permission'] = permission_data['name']
if service_data.get('confidential', False) and not service['has_access']:
continue
for group_data in service_data.get('groups', []):
if group_data.get('required_group'):
if not user or not user.has_permission(group_data['required_group']):
continue
if not group_data.get('name'):
continue
service['groups'].append(group_data)
for info_data in service_data.get('infos', []):
if info_data.get('required_group'):
if not user or not user.has_permission(info_data['required_group']):
continue
info_title = get_language_specific(info_data, 'title')
info_html = get_language_specific(info_data, 'html')
if not info_title or not info_html:
continue
info_button_text = get_language_specific(info_data, 'button_text', info_title)
info = {
'title': info_title,
'button_text': info_button_text,
'html': info_html,
'id': '%d-%d'%(len(services), len(service['infos'])),
}
service['infos'].append(info)
for link_data in service_data.get('links', []):
if link_data.get('required_group'):
if not user or not user.has_permission(link_data['required_group']):
continue
if not link_data.get('url') or not link_data.get('title'):
continue
service['links'].append(link_data)
services.append(service)
return services
import datetime
import secrets
import enum
from flask import current_app
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum, Text, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from flask_babel import gettext as _
try:
from ua_parser import user_agent_parser
USER_AGENT_PARSER_SUPPORTED = True
except ImportError:
USER_AGENT_PARSER_SUPPORTED = False
from uffd.database import db
from uffd.utils import token_typeable
from uffd.tasks import cleanup_task
from uffd.password_hash import PasswordHashAttribute, HighEntropyPasswordHash
@cleanup_task.delete_by_attribute('expired')
class Session(db.Model):
__tablename__ = 'session'
id = Column(Integer(), primary_key=True, autoincrement=True)
_secret = Column('secret', Text)
secret = PasswordHashAttribute('_secret', HighEntropyPasswordHash)
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', back_populates='sessions')
oauth2_grants = relationship('OAuth2Grant', back_populates='session', cascade='all, delete-orphan')
oauth2_tokens = relationship('OAuth2Token', back_populates='session', cascade='all, delete-orphan')
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
last_used = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
user_agent = Column(Text, nullable=False, default='')
ip_address = Column(Text)
mfa_done = Column(Boolean(create_constraint=True), default=False, nullable=False)
@hybrid_property
def expired(self):
if self.created is None or self.last_used is None:
return False
if self.created < datetime.datetime.utcnow() - datetime.timedelta(seconds=current_app.config['SESSION_LIFETIME_SECONDS']):
return True
if self.last_used < datetime.datetime.utcnow() - current_app.permanent_session_lifetime:
return True
return False
@expired.expression
def expired(cls): # pylint: disable=no-self-argument
return db.or_(
cls.created < datetime.datetime.utcnow() - datetime.timedelta(seconds=current_app.config['SESSION_LIFETIME_SECONDS']),
cls.last_used < datetime.datetime.utcnow() - current_app.permanent_session_lifetime,
)
@property
def user_agent_browser(self):
# pylint: disable=too-many-return-statements
if USER_AGENT_PARSER_SUPPORTED and not getattr(self, 'DISABLE_USER_AGENT_PARSER', False):
family = user_agent_parser.ParseUserAgent(self.user_agent)['family']
return family if family != 'Other' else _('Unknown')
if ' OPR/' in self.user_agent:
return 'Opera'
if ' Edg/' in self.user_agent:
return 'Microsoft Edge'
if ' Safari/' in self.user_agent and ' Chrome/' not in self.user_agent:
return 'Safari'
if ' Chrome/' in self.user_agent:
return 'Chrome'
if ' Firefox/' in self.user_agent:
return 'Firefox'
return _('Unknown')
@property
def user_agent_platform(self):
if USER_AGENT_PARSER_SUPPORTED and not getattr(self, 'DISABLE_USER_AGENT_PARSER', False):
family = user_agent_parser.ParseOS(self.user_agent)['family']
return family if family != 'Other' else _('Unknown')
sysinfo = ([''] + self.user_agent.split('(', 1))[-1].split(')', 0)[0]
platforms = [
'Android', 'Linux', 'OpenBSD', 'FreeBSD', 'NetBSD', 'Windows', 'iPhone',
'iPad', 'Macintosh',
]
for platform in platforms:
if platform in sysinfo:
return platform
return _('Unknown')
# Device login provides a convenient and secure way to log into SSO-enabled
# services on a secondary device without entering the user password or
# completing 2FA challenges.
#
# Use-cases:
# * A user wants to log into a single OAuth2-enabled web service on his
# mobile phone without trusting the device enough to expose his full
# credentials.
# * A user wants to log into an OAuth2-enabled web service on a secondary
# device at a busy event location with too little privacy to securly enter
# his credentials but already has a login session on his laptop.
# * A user wants to log into an OAuth2-enabled service via the web browser
# on a native mobile app on his phone and cannot use his 2FA method on that
# device (e.g. FIDO2 token with USB-A) or in the app's web view.
# The mechanism uses two random codes: When the user attempts to authenticate
# with an SSO-enabled service and chooses the "Device Login" option on the SSO
# login page, the SSO generates and displays an initiation code. That code is
# securly bound to the browser session that is used to request it. The user
# logs into the SSO on another device using his credentials and 2FA methods and
# opens a page to authorize the device login attempt. There he enteres the
# initiation code. The SSO displays the details of the device login attempt
# (i.e. the name of the service to log into). Once the user authorizes the
# login attempt, the SSO generates a confirmation code and displays it to the
# user. The user enters the confirmation code on the device he wants to log
# in with and proceeds with the authentication.
#
# An attacker might
# * generate initiation codes,
# * observe the displayed/entered initiation code,
# * observe the displayed/entered confirmation code and
# * possibly divert the victims attention and provoke typing errors.
#
# An attacker must not be able to
# * authenticate with an SSO-enabled service as another user or
# * trick a user to authenticate with an SSO-enabled service as the attacker.
#
# An example for the second case would be the Nextcloud mobile app: The app
# integrates closely with the phone's OS and provides features like
# auto-upload of photos, contacts and more. If the app would authenticate
# with an attacker-controlled account, the attacker would have access to
# this data.
class DeviceLoginType(enum.Enum):
OAUTH2 = 0
@cleanup_task.delete_by_attribute('expired')
class DeviceLoginInitiation(db.Model):
'''Abstract initiation code class
An initiation code is generated and displayed when a user chooses
"Device Login" on the login page. Instances are always bound to a
specific service, e.g. a client id in case of OAuth2.
The code attribute is formed out of two indepentently unique parts
to ensure that at any time all existing codes differ in at least two
characters (i.e. mistyping one character can not result in another
existing and possibly attacker-controlled code).
An initiation code is securly bound to the session that it was created
with by storing both id and secret in the authenticated session cookie.'''
__tablename__ = 'device_login_initiation'
id = Column(Integer(), primary_key=True, autoincrement=True)
type = Column(Enum(DeviceLoginType, create_constraint=True), nullable=False)
code0 = Column(String(32), unique=True, nullable=False, default=lambda: token_typeable(3))
code1 = Column(String(32), unique=True, nullable=False, default=lambda: token_typeable(3))
secret = Column(String(128), nullable=False, default=lambda: secrets.token_hex(64))
confirmations = relationship('DeviceLoginConfirmation', back_populates='initiation', cascade='all, delete-orphan')
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
__mapper_args__ = {
'polymorphic_on': type,
}
@hybrid_property
def code(self):
# Split into two parts, each unique, to ensure that every code differs
# in more than one character from other existing codes.
return self.code0 + self.code1
@hybrid_property
def expired(self):
if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
@property
def description(self):
raise NotImplementedError()
class DeviceLoginConfirmation(db.Model):
'''Confirmation code class
A confirmation code is generated and displayed when an authenticated user
enters an initiation code and confirms the device login attempt. Every
instance is bound to both an initiation code and a login session.
The code attribute is formed out of two indepentently unique parts
to ensure that at any time all existing codes differ in at least two
characters (i.e. mistyping one character can not result in another
existing and possibly attacker-controlled code).'''
__tablename__ = 'device_login_confirmation'
id = Column(Integer(), primary_key=True, autoincrement=True)
initiation_id = Column(Integer(), ForeignKey('device_login_initiation.id',
name='fk_device_login_confirmation_initiation_id_',
onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
initiation = relationship('DeviceLoginInitiation', back_populates='confirmations')
session_id = Column(Integer(), ForeignKey('session.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False, unique=True)
session = relationship('Session')
code0 = Column(String(32), nullable=False, default=lambda: token_typeable(1))
code1 = Column(String(32), nullable=False, default=lambda: token_typeable(1))
__table_args__ = (
db.UniqueConstraint('initiation_id', 'code0', name='uq_device_login_confirmation_initiation_id_code0'),
db.UniqueConstraint('initiation_id', 'code1', name='uq_device_login_confirmation_initiation_id_code1'),
)
@hybrid_property
def code(self):
# Split into two parts, each unique, to ensure that every code differs
# in more than one character from other existing codes.
return self.code0 + self.code1
import secrets
import datetime import datetime
from crypt import crypt
from flask import current_app from flask_babel import gettext as _
from sqlalchemy import Column, String, Text, DateTime from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey
from ldapalchemy.dbutils import DBRelationship from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.tasks import cleanup_task
from uffd.utils import token_urlfriendly
from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash
from uffd.database import db from uffd.database import db
from uffd.ldap import ldap from .user import User
from uffd.user.models import User
from uffd.role.models import Role
@cleanup_task.delete_by_attribute('expired_and_not_completed')
class Signup(db.Model): class Signup(db.Model):
'''Model that represents a self-signup request '''Model that represents a self-signup request
...@@ -25,17 +27,19 @@ class Signup(db.Model): ...@@ -25,17 +27,19 @@ class Signup(db.Model):
address does not allow a third party to complete the signup procedure and address does not allow a third party to complete the signup procedure and
set a new password with the (also mail-based) password reset functionality. set a new password with the (also mail-based) password reset functionality.
As long as they are not completed, signup requests have no effect each other As long as they are not completed, signup requests have no effect on each
or different parts of the application.''' other or different parts of the application.'''
__tablename__ = 'signup' __tablename__ = 'signup'
token = Column(String(128), primary_key=True, default=lambda: secrets.token_hex(20)) id = Column(Integer(), primary_key=True, autoincrement=True)
created = Column(DateTime, default=datetime.datetime.now, nullable=False) token = Column(String(128), default=token_urlfriendly, nullable=False)
created = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
loginname = Column(Text) loginname = Column(Text)
displayname = Column(Text) displayname = Column(Text)
mail = Column(Text) mail = Column(Text)
pwhash = Column(Text) _password = Column('pwhash', Text)
user_dn = Column(String(128)) # Set after successful confirmation password = PasswordHashAttribute('_password', LowEntropyPasswordHash)
user = DBRelationship('user_dn', User) user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=True, unique=True)
user = relationship('User', backref=backref('signups', cascade='all, delete-orphan'))
type = Column(String(50)) type = Column(String(50))
__mapper_args__ = { __mapper_args__ = {
...@@ -43,23 +47,26 @@ class Signup(db.Model): ...@@ -43,23 +47,26 @@ class Signup(db.Model):
'polymorphic_on': type 'polymorphic_on': type
} }
# Write-only property def set_password(self, value):
def password(self, value):
if not User().set_password(value): if not User().set_password(value):
return return False
self.pwhash = crypt(value) self.password = value
password = property(fset=password) return True
def check_password(self, value): @hybrid_property
return self.pwhash is not None and crypt(value, self.pwhash) == self.pwhash
@property
def expired(self): def expired(self):
return self.created is not None and datetime.datetime.now() >= self.created + datetime.timedelta(hours=48) if self.created is None:
return False
return self.created < datetime.datetime.utcnow() - datetime.timedelta(hours=48)
@property @hybrid_property
def completed(self): def completed(self):
return self.user_dn is not None # pylint: disable=singleton-comparison
return self.user_id != None
@hybrid_property
def expired_and_not_completed(self):
return db.and_(self.expired, db.not_(self.completed))
def validate(self): # pylint: disable=too-many-return-statements def validate(self): # pylint: disable=too-many-return-statements
'''Return whether the signup request is valid and Signup.finish is likely to succeed '''Return whether the signup request is valid and Signup.finish is likely to succeed
...@@ -68,18 +75,18 @@ class Signup(db.Model): ...@@ -68,18 +75,18 @@ class Signup(db.Model):
is False and `errmsg` contains a string describing why. Otherwise is False and `errmsg` contains a string describing why. Otherwise
`valid` is True.''' `valid` is True.'''
if self.completed or self.expired: if self.completed or self.expired:
return False, 'Invalid signup request' return False, _('Invalid signup request')
if not User().set_loginname(self.loginname): if not User().set_loginname(self.loginname):
return False, 'Login name is invalid' return False, _('Login name is invalid')
if not User().set_displayname(self.displayname): if not User().set_displayname(self.displayname):
return False, 'Display name is invalid' return False, _('Display name is invalid')
if not User().set_mail(self.mail): if not User().set_primary_email_address(self.mail):
return False, 'Mail address is invalid' return False, _('E-Mail address is invalid')
if self.pwhash is None: if not self.password:
return False, 'Invalid password' return False, _('Invalid password')
if User.query.filter_by(loginname=self.loginname).all(): if User.query.filter_by(loginname=self.loginname).all():
return False, 'A user with this login name already exists' return False, _('A user with this login name already exists')
return True, 'Valid' return True, _('Valid')
def finish(self, password): def finish(self, password):
'''Complete the signup procedure and return the new user '''Complete the signup procedure and return the new user
...@@ -93,20 +100,23 @@ class Signup(db.Model): ...@@ -93,20 +100,23 @@ class Signup(db.Model):
`errmsg` contains a string describing why. Otherwise `user` is a `errmsg` contains a string describing why. Otherwise `user` is a
User object.''' User object.'''
if self.completed or self.expired: if self.completed or self.expired:
return None, 'Invalid signup request' return None, _('Invalid signup request')
if not self.check_password(password): if not self.password.verify(password):
return None, 'Wrong password' return None, _('Wrong password')
if User.query.filter_by(loginname=self.loginname).all(): if User.query.filter_by(loginname=self.loginname).all():
return None, 'A user with this login name already exists' return None, _('A user with this login name already exists')
user = User(loginname=self.loginname, displayname=self.displayname, mail=self.mail, password=password) # Flush to make sure the flush below does not catch unrelated errors
ldap.session.add(user) db.session.flush()
for name in current_app.config['ROLES_BASEROLES']: user = User(loginname=self.loginname, displayname=self.displayname, primary_email_address=self.mail, password=self.password)
for role in Role.query.filter_by(name=name).all(): db.session.add(user)
user.roles.add(role) try:
user.update_groups() db.session.flush()
except IntegrityError:
return None, _('Login name or e-mail address is already in use')
user.update_groups() # pylint: disable=no-member
self.user = user self.user = user
self.loginname = None self.loginname = None
self.displayname = None self.displayname = None
self.mail = None self.mail = None
self.pwhash = None self.password = None
return user, 'Success' return user, _('Success')
import string
import re
import datetime
import unicodedata
from flask import current_app, escape
from flask_babel import lazy_gettext
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Text, DateTime
from sqlalchemy.orm import relationship, validates
from sqlalchemy.ext.hybrid import hybrid_property
from uffd.database import db
from uffd.remailer import remailer
from uffd.utils import token_urlfriendly
from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash, HighEntropyPasswordHash
from .misc import FeatureFlag, Lock
class IDRangeExhaustedError(Exception):
pass
class IDAlreadyAllocatedError(ValueError):
pass
# Helper class for UID/GID allocation that prevents reuse even if
# users/groups are deleted.
#
# To keep track of formerly used UIDs/GIDs, they are always also added to
# uid/gid allocation tables. Rows in these tables are never deleted.
# User/group tables have foreign key constraints to ensure that there can
# only ever be three cases for a given ID:
#
# 1. The ID was never used (does not exist in either user/group or allocation
# table)
# 2. The ID was used, but the user/group was deleted (it does not exist in
# user/group table, but it exists in the allocation table)
# 3. The ID is in use (it exists in both the user/group and the allocation
# table)
#
# For auto-allocation, there are a few edge cases to consider:
#
# 1. GIDs can be chosen freely in the web interface, e.g. one could easily
# create a group with the last GID in range.
# 2. For UIDs there are two ranges (for regular users and for service users).
# The ranges may either be the same or they may be different but
# non-overlapping.
# 3. ID ranges can be changed (e.g. extended to either side if the old range
# is exhausted). Existing IDs should not change.
#
# The approach we use here is to always auto-allocate the first unused id
# in range. This approach handles the three edge cases well and even behaves
# sanely in unsupported configurations like different but overlapping UID
# ranges.
class IDAllocator:
# pylint completely fails to understand SQLAlchemy's query functions
# pylint: disable=no-member
def __init__(self, name):
self.name = name
self.lock = Lock(f'{name}_allocation')
self.allocation_table = db.Table(f'{name}_allocation', db.Column('id', db.Integer(), primary_key=True))
def allocate(self, id):
self.lock.acquire()
result = db.session.execute(
db.select([self.allocation_table.c.id])
.where(self.allocation_table.c.id == id)
).scalar()
if result is not None:
raise IDAlreadyAllocatedError(f'Cannot allocate {self.name}: {id} is in use or was used in the past')
db.session.execute(db.insert(self.allocation_table).values(id=id))
def auto(self, min_id, max_id):
'''Auto-allocate and return an unused id in range'''
self.lock.acquire()
# We cannot easily iterate through a large range of numbers with generic
# SQL statements looking for unused ids. So to find the first unused id in
# range, we look for the first used id in range that is followed by an
# unused id. This does not work if there are no used ids in range (returns
# NULL) or if min_id is unused (returns higher id while it should return
# min_id). To fix this we also check if min_id is used or not.
tmp = db.aliased(self.allocation_table)
first_unused_id = db.session.execute(
db.select([db.func.min(self.allocation_table.c.id + 1)])
.where(self.allocation_table.c.id >= min_id)
.where(db.not_(db.exists().where(tmp.c.id == self.allocation_table.c.id + 1)))
).scalar()
min_id_used = db.session.execute(
db.select([db.exists()
.where(self.allocation_table.c.id == min_id)])
).scalar()
if not min_id_used:
first_unused_id = min_id
if first_unused_id > max_id:
raise IDRangeExhaustedError(f'Cannot auto-allocate {self.name}: Range is exhausted')
db.session.execute(db.insert(self.allocation_table).values(id=first_unused_id))
return first_unused_id
def user_unix_uid_default(context):
if context.get_current_parameters()['is_service_user']:
min_uid = current_app.config['USER_SERVICE_MIN_UID']
max_uid = current_app.config['USER_SERVICE_MAX_UID']
else:
min_uid = current_app.config['USER_MIN_UID']
max_uid = current_app.config['USER_MAX_UID']
return User.unix_uid_allocator.auto(min_uid, max_uid)
class User(db.Model):
# Allows 8 to 256 ASCII letters (lower and upper case), digits, spaces and
# symbols/punctuation characters. It disallows control characters and
# non-ASCII characters to prevent setting passwords considered invalid by
# SASLprep.
#
# This REGEX ist used both in Python and JS.
PASSWORD_REGEX = '[ -~]*'
PASSWORD_MINLEN = 8
PASSWORD_MAXLEN = 256
PASSWORD_DESCRIPTION = lazy_gettext('At least %(minlen)d and at most %(maxlen)d characters. ' + \
'Only letters, digits, spaces and some symbols (<code>%(symbols)s</code>) allowed. ' + \
'Please use a password manager.',
minlen=PASSWORD_MINLEN, maxlen=PASSWORD_MAXLEN, symbols=escape('!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'))
__tablename__ = 'user'
id = Column(Integer(), primary_key=True, autoincrement=True)
unix_uid_allocator = IDAllocator('uid')
unix_uid = Column(Integer(), ForeignKey('uid_allocation.id'), unique=True, nullable=False, default=user_unix_uid_default)
@validates('unix_uid')
def validate_unix_uid(self, key, value): # pylint: disable=unused-argument
if self.unix_uid != value and value is not None:
self.unix_uid_allocator.allocate(value)
return value
loginname = Column(String(32), unique=True, nullable=False)
displayname = Column(String(128), nullable=False)
all_emails = relationship(
'UserEmail',
foreign_keys='UserEmail.user_id',
cascade='all, delete-orphan',
back_populates='user',
post_update=True,
)
verified_emails = relationship(
'UserEmail',
foreign_keys='UserEmail.user_id',
viewonly=True,
primaryjoin='and_(User.id == UserEmail.user_id, UserEmail.verified)',
)
primary_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE'), nullable=False)
primary_email = relationship('UserEmail', foreign_keys='User.primary_email_id')
# recovery_email_id == NULL -> use primary email
recovery_email_id = Column(Integer(), ForeignKey('user_email.id', onupdate='CASCADE', ondelete='SET NULL'))
recovery_email = relationship('UserEmail', foreign_keys='User.recovery_email_id')
@validates('primary_email', 'recovery_email')
def validate_email(self, key, value):
if value is not None:
if not value.user:
value.user = self
if value.user != self:
raise ValueError(f'UserEmail assigned to User.{key} is not associated with user')
if not value.verified:
raise ValueError(f'UserEmail assigned to User.{key} is not verified')
return value
_password = Column('pwhash', Text(), nullable=True)
password = PasswordHashAttribute('_password', LowEntropyPasswordHash)
is_service_user = Column(Boolean(create_constraint=True), default=False, nullable=False)
is_deactivated = Column(Boolean(create_constraint=True), default=False, nullable=False)
groups = relationship('Group', secondary='user_groups', back_populates='members')
roles = relationship('Role', secondary='role_members', back_populates='members')
service_users = relationship('ServiceUser', viewonly=True)
sessions = relationship('Session', back_populates='user', cascade='all, delete-orphan')
def __init__(self, primary_email_address=None, **kwargs):
super().__init__(**kwargs)
if primary_email_address is not None:
self.primary_email = UserEmail(address=primary_email_address, verified=True)
@property
def unix_gid(self):
return current_app.config['USER_GID']
def is_in_group(self, name):
if not name:
return True
for group in self.groups:
if group.name == name:
return True
return False
def has_permission(self, required_group=None):
if not required_group:
return True
group_names = {group.name for group in self.groups}
group_sets = required_group
if isinstance(group_sets, str):
group_sets = [group_sets]
for group_set in group_sets:
if isinstance(group_set, str):
group_set = [group_set]
if set(group_set) - group_names == set():
return True
return False
def set_loginname(self, value, ignore_blocklist=False):
if len(value) > 32 or len(value) < 1:
return False
for char in value:
if not char in string.ascii_lowercase + string.digits + '_-':
return False
if not ignore_blocklist:
for expr in current_app.config['LOGINNAME_BLOCKLIST']:
if re.match(expr, value):
return False
self.loginname = value
return True
def set_displayname(self, value):
if len(value) > 128 or len(value) < 1:
return False
self.displayname = value
return True
def set_password(self, value):
if len(value) < self.PASSWORD_MINLEN or len(value) > self.PASSWORD_MAXLEN or not re.fullmatch(self.PASSWORD_REGEX, value):
return False
self.password = value
return True
def set_primary_email_address(self, address):
# UserEmail.query.filter_by(user=self, address=address).first() would cause
# a flush, so we do this in python. A flush would cause an IntegrityError if
# this method is used a new User object, since primary_email_id is not
# nullable.
email = ([item for item in self.all_emails if item.address == address] or [None])[0]
if not email:
email = UserEmail()
if not email.set_address(address):
return False
email.verified = True
self.primary_email = email
return True
# Somehow pylint non-deterministically fails to detect that .update_groups is set in role.models
def update_groups(self):
pass
class UserEmail(db.Model):
__tablename__ = 'user_email'
id = Column(Integer(), primary_key=True, autoincrement=True)
# We have a cyclic dependency between User.primary_email and UserEmail.user.
# To solve this, we make UserEmail.user nullable, add validators, and set
# post_update=True here and for the backref.
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE', use_alter=True))
user = relationship('User', foreign_keys='UserEmail.user_id', back_populates='all_emails', post_update=True)
@validates('user')
def validate_user(self, key, value): # pylint: disable=unused-argument
if self.user is not None and self.user != value:
raise ValueError('UserEmail.user cannot be changed once set')
return value
@classmethod
def normalize_address(cls, value):
return unicodedata.normalize('NFKC', value).lower().strip()
address = Column(String(128), nullable=False)
address_normalized = Column(String(128), nullable=False)
@validates('address')
def validate_address(self, key, value): # pylint: disable=unused-argument
if self.address is not None and self.address != value:
raise ValueError('UserEmail.address cannot be changed once set')
self.address_normalized = self.normalize_address(value)
return value
# True or None/NULL (not False, see constraints below)
_verified = Column('verified', Boolean(create_constraint=True), nullable=True)
@hybrid_property
def verified(self):
# pylint: disable=singleton-comparison
return self._verified != None
@verified.setter
def verified(self, value):
if self._verified and not value:
raise ValueError('UserEmail cannot be unverified once verified')
self._verified = True if value else None
verification_legacy_id = Column(Integer()) # id of old MailToken
_verification_secret = Column('verification_secret', Text())
verification_secret = PasswordHashAttribute('_verification_secret', HighEntropyPasswordHash)
verification_expires = Column(DateTime)
# Until uffd v3, we make the stricter unique constraints optional, by having
# enable_strict_constraints act as a switch to enable/disable the constraints
# on a per-row basis.
# True or None/NULL if disabled (not False, see constraints below)
enable_strict_constraints = Column(
Boolean(create_constraint=True),
nullable=True,
default=db.select([db.case([(FeatureFlag.unique_email_addresses.expr, True)], else_=None)])
)
# The unique constraints rely on the common interpretation of SQL92, that if
# any column in a unique constraint is NULL, the unique constraint essentially
# does not apply to the row. This is how SQLite, MySQL/MariaDB, PostgreSQL and
# other common databases behave. A few others like Microsoft SQL Server do not
# follow this, but we don't support them anyway.
__table_args__ = (
# A user cannot have the same address more than once, regardless of verification status
db.UniqueConstraint('user_id', 'address', name='uq_user_email_user_id_address'), # Legacy, to be removed in v3
# Same unique constraint as uq_user_email_user_id_address, but with
# address_normalized instead of address. Only active if
# enable_strict_constraints is not NULL.
db.UniqueConstraint('user_id', 'address_normalized', 'enable_strict_constraints',
name='uq_user_email_user_id_address_normalized'),
# The same verified address can only exist once. Only active if
# enable_strict_constraints is not NULL. Unverified addresses are ignored,
# since verified is NULL in that case.
db.UniqueConstraint('address_normalized', 'verified', 'enable_strict_constraints',
name='uq_user_email_address_normalized_verified'),
)
def set_address(self, value):
if len(value) < 3 or '@' not in value:
return False
domain = value.rsplit('@', 1)[-1]
if remailer.is_remailer_domain(domain):
return False
self.address = value
return True
def start_verification(self):
if self.verified:
raise Exception('UserEmail.start_verification must not be called if address is already verified')
self.verification_legacy_id = None
secret = token_urlfriendly()
self.verification_secret = secret
self.verification_expires = datetime.datetime.utcnow() + datetime.timedelta(days=2)
return secret
@hybrid_property
def verification_expired(self):
if self.verification_expires is None:
return True
return self.verification_expires < datetime.datetime.utcnow()
def finish_verification(self, secret):
# pylint: disable=using-constant-test,no-member
if self.verification_expired:
return False
if not self.verification_secret.verify(secret):
return False
self.verification_legacy_id = None
self.verification_secret = None
self.verification_expires = None
self.verified = True
return True
@FeatureFlag.unique_email_addresses.enable_hook
def enable_unique_email_addresses():
UserEmail.query.update({UserEmail.enable_strict_constraints: True})
@FeatureFlag.unique_email_addresses.disable_hook
def disable_unique_email_addresses():
UserEmail.query.update({UserEmail.enable_strict_constraints: None})
# pylint: disable=E1101
user_groups = db.Table('user_groups',
Column('user_id', Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True),
Column('group_id', Integer(), ForeignKey('group.id', onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
)
def group_unix_gid_default():
return Group.unix_gid_allocator.auto(current_app.config['GROUP_MIN_GID'], current_app.config['GROUP_MAX_GID'])
class Group(db.Model):
__tablename__ = 'group'
id = Column(Integer(), primary_key=True, autoincrement=True)
unix_gid_allocator = IDAllocator('gid')
unix_gid = Column(Integer(), ForeignKey('gid_allocation.id'), unique=True, nullable=False, default=group_unix_gid_default)
@validates('unix_gid')
def validate_unix_gid(self, key, value): # pylint: disable=unused-argument
if self.unix_gid != value and value is not None:
self.unix_gid_allocator.allocate(value)
return value
name = Column(String(32), unique=True, nullable=False)
description = Column(String(128), nullable=False, default='')
members = relationship('User', secondary='user_groups', back_populates='groups')
def set_name(self, value):
if len(value) > 32 or len(value) < 1:
return False
for char in value:
if not char in string.ascii_lowercase + string.digits + '_-':
return False
self.name = value
return True
# pylint: disable=invalid-name def setup_navbar(app, positions):
navbarList = [] app.navbarPositions = positions
# pylint: enable=invalid-name app.navbarList = []
app.jinja_env.globals['getnavbar'] = lambda: [n for n in app.navbarList if n['visible']()]
def setup_navbar(app):
app.jinja_env.globals['getnavbar'] = lambda: [n for n in navbarList if n['visible']()]
# iconlib can be 'bootstrap' # iconlib can be 'bootstrap'
# ( see: http://getbootstrap.com/components/#glyphicons ) # ( see: http://getbootstrap.com/components/#glyphicons )
# or 'fa' # or 'fa'
# ( see: http://fontawesome.io/icons/ ) # ( see: http://fontawesome.io/icons/ )
# visible is a function that returns "True" if this icon should be visible in the calling context # visible is a function that returns "True" if this icon should be visible in the calling context
# pylint: disable=too-many-arguments
def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None): def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None):
def wrapper(func): def wrapper(func):
urlendpoint = endpoint def deferred_call(state):
if not endpoint: assert blueprint
# pylint: disable=protected-access urlendpoint = endpoint
if blueprint: if not endpoint:
urlendpoint = "{}.{}".format(blueprint.name, func.__name__) # pylint: disable=protected-access
else: if blueprint:
urlendpoint = func.__name_ urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
else:
urlendpoint = func.__name_
# pylint: enable=protected-access # pylint: enable=protected-access
item = {} item = {}
item['iconlib'] = iconlib item['iconlib'] = iconlib
item['icon'] = icon item['icon'] = icon
item['group'] = group item['group'] = group
item['endpoint'] = urlendpoint item['endpoint'] = urlendpoint
item['name'] = name item['name'] = name
item['blueprint'] = blueprint item['blueprint'] = blueprint
item['visible'] = visible or (lambda: True) item['visible'] = visible or (lambda: True)
navbarList.append(item) item['position'] = 99
if blueprint.name in state.app.navbarPositions:
item['position'] = state.app.navbarPositions.index(blueprint.name)
else:
item['visible'] = lambda: False
state.app.navbarList.append(item)
state.app.navbarList.sort(key=lambda item: item['position'])
blueprint.record_once(deferred_call)
return func return func
return wrapper return wrapper
from .views import bp as _bp
bp = [_bp]
from flask import current_app
from sqlalchemy import Column, Integer, String, DateTime, Text
from ldapalchemy.dbutils import DBRelationship
from uffd.database import db
from uffd.user.models import User
class OAuth2Client:
def __init__(self, client_id, client_secret, redirect_uris, required_group=None, logout_urls=None):
self.client_id = client_id
self.client_secret = client_secret
# We only support the Authorization Code Flow for confidential (server-side) clients
self.client_type = 'confidential'
self.redirect_uris = redirect_uris
self.default_scopes = ['profile']
self.required_group = required_group
self.logout_urls = []
for url in (logout_urls or []):
if isinstance(url, str):
self.logout_urls.append(['GET', url])
else:
self.logout_urls.append(url)
@classmethod
def from_id(cls, client_id):
return OAuth2Client(client_id, **current_app.config['OAUTH2_CLIENTS'][client_id])
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
def access_allowed(self, user):
return user.has_permission(self.required_group)
class OAuth2Grant(db.Model):
__tablename__ = 'oauth2grant'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_grants')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
code = Column(String(255), index=True, nullable=False)
redirect_uri = Column(String(255))
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self
class OAuth2Token(db.Model):
__tablename__ = 'oauth2token'
id = Column(Integer, primary_key=True)
user_dn = Column(String(128))
user = DBRelationship('user_dn', User, backref='oauth2_tokens')
client_id = Column(String(40))
@property
def client(self):
return OAuth2Client.from_id(self.client_id)
@client.setter
def client(self, newclient):
self.client_id = newclient.client_id
# currently only bearer is supported
token_type = Column(String(40))
access_token = Column(String(255), unique=True)
refresh_token = Column(String(255), unique=True)
expires = Column(DateTime)
_scopes = Column(Text)
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def delete(self):
db.session.delete(self)
db.session.commit()
return self