from collections import OrderedDict
from sqlalchemy import MetaData, event
from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.ext.mutable import MutableList
from flask_sqlalchemy import SQLAlchemy
from flask.json import JSONEncoder
class SQLAlchemyJSON(JSONEncoder):
    def default(self, o):
        if isinstance(o, db.Model):
            result = OrderedDict()
            for key in o.__mapper__.c.keys():
                result[key] = getattr(o, key)
            return result
        return JSONEncoder.default(self, o)
convention = {
    'ix': 'ix_%(column_0_label)s',
    'uq': 'uq_%(table_name)s_%(column_0_name)s',
    'ck': 'ck_%(table_name)s_%(column_0_name)s',
    'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
    'pk': 'pk_%(table_name)s'
}
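# With this convention every constraint gets a deterministic name, e.g. a
# unique constraint on user.loginname is emitted as "uq_user_loginname".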
metadata = MetaData(naming_convention=convention)
db = SQLAlchemy(metadata=metadata)
def enable_sqlite_foreign_key_support(dbapi_connection, connection_record):
    # pylint: disable=unused-argument
    cursor = dbapi_connection.cursor()
    cursor.execute('PRAGMA foreign_keys=ON')
    cursor.close()
# We want to enable SQLite foreign key support for app and test code, but not
# for migrations.
# The common way to add the handler to the Engine class (so it applies to all
# instances) would also affect the migrations. With flask_sqlalchemy v2.4 and
# newer we could overwrite SQLAlchemy.create_engine and add our handler there.
# However Debian Buster and Bullseye ship v2.1, so we do this here and call
# this function in create_app.
def customize_db_engine(engine):
    if engine.name == 'sqlite':
        event.listen(engine, 'connect', enable_sqlite_foreign_key_support)
    elif engine.name in ('mysql', 'mariadb'):
        @event.listens_for(engine, 'connect')
        def receive_connect(dbapi_connection, connection_record): # pylint: disable=unused-argument
            cursor = dbapi_connection.cursor()
            cursor.execute('SHOW VARIABLES LIKE "character_set_connection"')
            character_set_connection = cursor.fetchone()[1]
            if character_set_connection != 'utf8mb4':
                raise Exception(f'Unsupported connection charset "{character_set_connection}". Make sure to add "?charset=utf8mb4" to SQLALCHEMY_DATABASE_URI!')
            cursor.execute('SHOW VARIABLES LIKE "collation_database"')
            collation_database = cursor.fetchone()[1]
            if collation_database != 'utf8mb4_nopad_bin':
                raise Exception(f'Unsupported database collation "{collation_database}". Create the database with "CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin"!')
            cursor.execute('SET NAMES utf8mb4 COLLATE utf8mb4_nopad_bin')
            cursor.close()
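# A minimal sketch of the intended wiring (the factory below is an assumption
# for illustration, not part of this module):
#
#     def create_app():
#         app = Flask(__name__)
#         app.config.from_pyfile('config.cfg')
#         db.init_app(app)
#         with app.app_context():
#             customize_db_engine(db.engine)
#         return app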
class CommaSeparatedList(TypeDecorator):
    # For some reason TypeDecorator.process_literal_param and
    # TypeEngine.python_type are abstract but not actually required
    # pylint: disable=abstract-method
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        for item in value:
            if ',' in item:
                raise ValueError('Items of comma-separated list must not contain commas')
        return ','.join(value)

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        return MutableList(value.split(','))
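# Usage sketch (model and column names are assumptions for illustration):
#
#     class APIClient(db.Model):
#         __tablename__ = 'api_client'
#         id = db.Column(db.Integer(), primary_key=True)
#         # stored as the TEXT value 'users,checkpassword', loaded back as a
#         # MutableList of strings
#         perms = db.Column(CommaSeparatedList(), nullable=False)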
LDAP_BASE_USER="ou=users,dc=example,dc=com"
LDAP_BASE_GROUPS="ou=groups,dc=example,dc=com"
LDAP_SERVICE_BIND_DN=""
LDAP_SERVICE_BIND_PASSWORD=""
LDAP_SERVICE_URL="ldapi:///"
LDAP_USER_OBJECTCLASSES=["vmailUser", "top", "inetOrgPerson", "organizationalPerson", "person", "posixAccount"]
USER_GID=20001
# Service and non-service users must either share the same UID range or use ranges that do not overlap
USER_MIN_UID=10000
USER_MAX_UID=18999
USER_SERVICE_MIN_UID=19000
USER_SERVICE_MAX_UID=19999
GROUP_MIN_GID=20000
GROUP_MAX_GID=49999
# The period of time that a login lasts for.
SESSION_LIFETIME_SECONDS=3600
# The period of time that the session cookie lasts for. This is refreshed on each page load.
PERMANENT_SESSION_LIFETIME=2678400
# CSRF protection
SESSION_COOKIE_SECURE=True
SESSION_COOKIE_HTTPONLY=True
SESSION_COOKIE_SAMESITE='Strict'
LANGUAGES={
    # Language identifier (see Accept-Language HTTP header) -> Display Name
    "en": "EN",
    "de": "DE",
}
ACL_ADMIN_GROUP="uffd_admin"
# Group required to access selfservice functions (view selfservice, change profile/password/roles)
ACL_SELFSERVICE_GROUP="uffd_access"
# Group required to login
ACL_ACCESS_GROUP="uffd_access"
# Members can create invite links for signup
ACL_SIGNUP_GROUP="uffd_signup"
MAIL_SERVER='' # e.g. example.com
MAIL_PORT=465
MAIL_USERNAME='yourId@example.com' # set to empty string to disable authentication
MAIL_PASSWORD='*****'
MAIL_USE_STARTTLS=True
MAIL_FROM_ADDRESS='foo@bar.com'
# Set to a domain name (e.g. "remailer.example.com") to enable remailer.
# Requires special mail server configuration (see uffd-socketmapd). Can be
# enabled/disabled per-service in the service settings. If enabled, services
# no longer get real user mail addresses but instead special autogenerated
# addresses that are replaced with the real mail addresses by the mail server.
REMAILER_DOMAIN = ''
REMAILER_OLD_DOMAINS = []
# Secret used for construction and verification of remailer addresses.
# If None, the value of SECRET_KEY is used.
REMAILER_SECRET_KEY = None
# Set to a list of user loginnames to limit the remailer to a small set of
# users. Useful for debugging. If None, the remailer is active for all users
# (if configured and enabled for a service). This option is deprecated. Use
# the per-service setting in the web interface instead.
REMAILER_LIMIT_TO_USERS = None
# Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False
INVITE_MAX_VALID_DAYS=21
LOGINNAME_BLOCKLIST=['^admin$', '^root$']
#MFA_ICON_URL = 'https://example.com/logo.png'
#MFA_RP_ID = 'example.com' # If unset, hostname from current request is used
MFA_RP_NAME = 'Uffd Test Service' # Service name passed to U2F/FIDO2 authenticators
SQLALCHEMY_TRACK_MODIFICATIONS=False
FOOTER_LINKS=[{"url": "https://example.com", "title": "example"}]
# The default page after login or clicking the top left home button is the self-service
# page. If you would like it to be the services list instead, set this to True.
DEFAULT_PAGE_SERVICES=False
# Service overview page (disabled if empty)
SERVICES=[
#    # Title is mandatory, all other fields are optional.
#    # For permission_levels/groups/infos/links all fields are mandatory aside from required_group.
#    {
#        'title': 'Service Title',
#        'subtitle': 'Service Subtitle',
#        'description': 'Short description of the service as plain text',
#        'url': 'https://example.com/',
#        'logo_url': 'https://example.com/logo.png',
#        # Basic access group name, service is accessible to everyone if empty
#        'required_group': 'users',
#        # Non-basic permission levels, the last matching entry is selected.
#        # Users with a matching permission level are considered to have
#        # access to the service (as if they have the basic access group).
#        'permission_levels': [
#            {'name': 'Moderator', 'required_group': 'moderators'},
#            {'name': 'Admin', 'required_group': 'uffd_admin'},
#        ],
#        # By default all services are listed publicly (but grayed out for
#        # guests/users without access). Confidential services are only visible
#        # to users with access rights to the service.
#        'confidential': True,
#        # In-service groups, all matching items are visible
#        'groups': [
#            {'name': 'Group "crew_crew"', 'required_group': 'users'},
#            {'name': 'Group "crew_logistik"', 'required_group': 'uffd_admin'},
#        ],
#        # Infos are small/medium amounts of information displayed in a modal
#        # dialog. All matching items are visible.
#        'infos': [
#            {
#                'title': 'uffd',
#                'button_text': 'Documentation', # Defaults to the title if not set
#                'html': '<p>Some information about the service as html</p>',
#                'required_group': 'users',
#            },
#        ],
#        # Links to external sites, all matching items are visible
#        'links': [
#            {'title': 'Link to an external site', 'url': '#', 'required_group': 'users'},
#        ]
#    },
]
# A banner text that will be displayed above the services list
SERVICES_BANNER=''
# If the banner should be shown to users who are not logged in
SERVICES_BANNER_PUBLIC=True
# Enable the service overview page for users who are not logged in
SERVICES_PUBLIC=True
# An optional banner that will be displayed above the login form
#LOGIN_BANNER='Always check the URL. Never enter your SSO password on any other site.'
BRANDING_LOGO_URL='/static/empty.png'
SITE_TITLE='uffd'
# Name and contact mail address are displayed to users in a few places (plain text only!)
ORGANISATION_NAME='Example Organisation'
ORGANISATION_CONTACT='contact@example.com'
# Optional text included in account registration mails (plain text only!)
WELCOME_TEXT='See https://docs.example.com/ for further information.'
# do NOT set in production
#TEMPLATES_AUTO_RELOAD=True
# pylint: skip-file
from flask_babel import gettext as _
from warnings import warn
from flask import request, current_app
import urllib.parse
# WebAuthn support is optional because fido2 has a pretty unstable
# interface and might be difficult to install with the correct version
try:
    import fido2 as __fido2

    if __fido2.__version__.startswith('0.5.'):
        from fido2.client import ClientData
        from fido2.server import Fido2Server, RelyingParty as __PublicKeyCredentialRpEntity
        from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
        from fido2 import cbor
        cbor.encode = cbor.dumps
        cbor.decode = lambda arg: cbor.loads(arg)[0]

        class PublicKeyCredentialRpEntity(__PublicKeyCredentialRpEntity):
            def __init__(self, name, id):
                super().__init__(id, name)
    elif __fido2.__version__.startswith('0.9.'):
        from fido2.client import ClientData
        from fido2.webauthn import PublicKeyCredentialRpEntity
        from fido2.server import Fido2Server
        from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData
        from fido2 import cbor
    elif __fido2.__version__.startswith('1.'):
        from fido2.webauthn import PublicKeyCredentialRpEntity, CollectedClientData as ClientData, AttestationObject, AuthenticatorData, AttestedCredentialData
        from fido2.server import Fido2Server
        from fido2 import cbor
    else:
        raise ImportError(f'Unsupported fido2 version: {__fido2.__version__}')

    def get_webauthn_server():
        hostname = urllib.parse.urlsplit(request.url).hostname
        return Fido2Server(PublicKeyCredentialRpEntity(id=current_app.config.get('MFA_RP_ID', hostname),
                                                       name=current_app.config['MFA_RP_NAME']))

    WEBAUTHN_SUPPORTED = True
except ImportError as err:
    warn(_('2FA WebAuthn support disabled because import of the fido2 module failed (%s)')%err)
    WEBAUTHN_SUPPORTED = False
from .ldap import bp as ldap_bp
from .ldap import get_conn, user_conn, escape_filter_chars, uid_to_dn
from .ldap import loginname_to_dn, get_next_uid, loginname_is_safe
from .ldap import get_ldap_array_attribute_safe, get_ldap_attribute_safe
bp = [ldap_bp]
import string
from flask import Blueprint, current_app
from ldap3.utils.conv import escape_filter_chars
from ldap3.core.exceptions import LDAPBindError, LDAPCursorError
from ldap3 import Server, Connection, ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
bp = Blueprint("ldap", __name__)
def fix_connection(conn):
    old_search = conn.search
    def search(*args, **kwargs):
        kwargs.update({'attributes': [ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]})
        return old_search(*args, **kwargs)
    conn.search = search
    return conn
def service_conn():
    server = Server(current_app.config["LDAP_SERVICE_URL"], get_info=ALL)
    return fix_connection(Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=True))

def user_conn(loginname, password):
    if not loginname_is_safe(loginname):
        return False
    server = Server(current_app.config["LDAP_SERVICE_URL"], get_info=ALL)
    try:
        return fix_connection(Connection(server, loginname_to_dn(loginname), password, auto_bind=True))
    except LDAPBindError:
        return False

def get_conn():
    return service_conn()
def uid_to_dn(uid):
    conn = get_conn()
    conn.search(current_app.config["LDAP_BASE_USER"], '(&(objectclass=person)(uidNumber={}))'.format(escape_filter_chars(uid)))
    if len(conn.entries) != 1:
        return None
    return conn.entries[0].entry_dn

def loginname_to_dn(loginname):
    if loginname_is_safe(loginname):
        return 'uid={},{}'.format(loginname, current_app.config["LDAP_BASE_USER"])
    raise Exception('unsafe login name')
def loginname_is_safe(value):
    if len(value) > 32 or len(value) < 1:
        return False
    for char in value:
        if char not in string.ascii_lowercase + string.digits + '_':
            return False
    return True
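# e.g. loginname_is_safe('jane_doe42') -> True, while 'Jane', 'jane.doe' and ''
# are rejected: only 1 to 32 characters from [a-z0-9_] are allowed.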
def get_next_uid():
    conn = get_conn()
    conn.search(current_app.config["LDAP_BASE_USER"], '(objectclass=person)')
    max_uid = current_app.config["LDAP_USER_MIN_UID"]
    for i in conn.entries:
        # skip out of range entries
        if i['uidNumber'].value > current_app.config["LDAP_USER_MAX_UID"]:
            continue
        if i['uidNumber'].value < current_app.config["LDAP_USER_MIN_UID"]:
            continue
        max_uid = max(i['uidNumber'].value, max_uid)
    next_uid = max_uid + 1
    if uid_to_dn(next_uid):
        raise Exception('No free uid found')
    return next_uid
def get_ldap_attribute_safe(ldapobject, attribute):
    try:
        result = ldapobject[attribute].value if attribute in ldapobject else None
    # we have to catch LDAPCursorError here, because ldap3 in older versions has a broken __contains__ function
    # see https://github.com/cannatag/ldap3/issues/493
    # fixed in version 2.5
    # debian buster ships 2.4.1
    except LDAPCursorError:
        result = None
    return result
def get_ldap_array_attribute_safe(ldapobject, attribute):
    # If the array is empty, the attribute does not exist.
    # If there is only one element, ldap returns a string and not an array
    # with one element. We sanitize this to always be an array.
    result = get_ldap_attribute_safe(ldapobject, attribute)
    if not result:
        result = []
    if isinstance(result, str):
        result = [result]
    return result
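# Usage sketch (the entry and attribute names are assumptions for
# illustration): for an entry with a single "memberUid" value ldap3 yields a
# plain string, so the helper normalizes the result to a list in every case:
#
#     get_ldap_array_attribute_safe(entry, 'memberUid')  # -> ['jane']
#     get_ldap_array_attribute_safe(entry, 'missing')    # -> []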
Database Migrations
===================
While we use Alembic in a single-database configuration, the migration scripts
are compatible with both SQLite and MySQL/MariaDB.

Compatibility with SQLite almost always requires `batch_alter_table` operations
to modify existing tables. These recreate the tables, copy the data and finally
replace the old tables with the newly created ones. Alembic is configured to
auto-generate those operations, but in most cases the generated code fails to
fully reflect all details of the original schema. This way some constraints
(e.g. `CHECK` constraints on Enums) are lost. Define the full table and pass it
with `copy_from` to `batch_alter_table` to prevent this, as in the sketch below.
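A minimal sketch of the pattern (the table and columns are hypothetical; the
real migrations in this repository follow the same shape):

```python
import sqlalchemy as sa
from alembic import op

def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    # Define the full table instead of reflecting it, so constraints that
    # reflection would lose (e.g. the CHECK behind the Boolean) survive the
    # table recreation
    example = sa.Table('example', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('enabled', sa.Boolean(create_constraint=True), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_example'))
    )
    with op.batch_alter_table('example', copy_from=example) as batch_op:
        batch_op.add_column(sa.Column('note', sa.Text(), nullable=True))
```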
Compatibility with MySQL requires special care when changing primary keys and
when dealing with foreign keys. It often helps to temporarily remove foreign
key constraints concerning the table that is subject to change. When adding an
autoincrement id column as the new primary key of a table, recreate the table
with `batch_alter_table` (see the sketch below).
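Only a sketch, reusing the hypothetical table from above:

```python
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    example = sa.Table('example', meta,
        sa.Column('token', sa.String(length=128), nullable=False),
        sa.PrimaryKeyConstraint('token', name=op.f('pk_example'))
    )
    # recreate='always' forces a full table rebuild, which avoids MySQL's
    # restrictions on altering a primary key in place
    with op.batch_alter_table('example', copy_from=example, recreate='always') as batch_op:
        batch_op.drop_constraint(batch_op.f('pk_example'), type_='primary')
        batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
        batch_op.create_primary_key(batch_op.f('pk_example'), ['id'])
        batch_op.alter_column('id', autoincrement=True, nullable=False, existing_type=sa.Integer())
```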
The `check_migrations.py` script verifies that upgrading and downgrading works
with both databases. While it is far from perfect, it catches many common
errors. It runs automatically as part of the CI pipeline. Make sure to update
the script when adding new tables and when making significant changes to
existing tables.
# A generic, single database configuration.
[alembic]
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
import logging
import click
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url',
                       current_app.config.get('SQLALCHEMY_DATABASE_URI'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                directives[:] = []
                logger.info('No changes in schema detected.')

    engine = engine_from_config(config.get_section(config.config_ini_section),
                                prefix='sqlalchemy.',
                                poolclass=pool.NullPool)
    connection = engine.connect()
    context.configure(connection=connection,
                      target_metadata=target_metadata,
                      process_revision_directives=process_revision_directives,
                      **current_app.extensions['migrate'].configure_args)

    if engine.name in ('mysql', 'mariadb'):
        character_set_connection = connection.execute('SHOW VARIABLES LIKE "character_set_connection"').fetchone()[1]
        if character_set_connection != 'utf8mb4':
            raise click.ClickException(f'Unsupported connection charset "{character_set_connection}". Make sure to add "?charset=utf8mb4" to SQLALCHEMY_DATABASE_URI!')
        collation_database = connection.execute('SHOW VARIABLES LIKE "collation_database"').fetchone()[1]
        if collation_database != 'utf8mb4_nopad_bin':
            raise click.ClickException(f'Unsupported database collation "{collation_database}". Create the database with "CHARACTER SET utf8mb4 COLLATE utf8mb4_nopad_bin"!')
        connection.execute('SET NAMES utf8mb4 COLLATE utf8mb4_nopad_bin')

    try:
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
    ${upgrades if upgrades else "pass"}

def downgrade():
    ${downgrades if downgrades else "pass"}
"""OpenID Connect Support
Revision ID: 01fdd7820f29
Revises: a9b449776953
Create Date: 2023-11-09 16:52:20.860871
"""
from alembic import op
import sqlalchemy as sa
import datetime
import secrets
import math
import logging
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend # Only required for Buster
import jwt
# pyjwt v1.7.x compat (Buster/Bullseye)
if not hasattr(jwt, 'get_algorithm_by_name'):
    jwt.get_algorithm_by_name = lambda name: jwt.algorithms.get_default_algorithms()[name]
# revision identifiers, used by Alembic.
revision = '01fdd7820f29'
down_revision = 'a9b449776953'
branch_labels = None
depends_on = None
logger = logging.getLogger('alembic.runtime.migration.01fdd7820f29')
def token_with_alphabet(alphabet, nbytes=None):
    '''Return random text token that consists of characters from `alphabet`'''
    if nbytes is None:
        nbytes = max(secrets.DEFAULT_ENTROPY, 32)
    # bytes of entropy per character: log_256(len(alphabet))
    nbytes_per_char = math.log(len(alphabet), 256)
    # number of characters needed to reach at least `nbytes` of entropy
    nchars = math.ceil(nbytes / nbytes_per_char)
    return ''.join([secrets.choice(alphabet) for _ in range(nchars)])

def token_urlfriendly(nbytes=None):
    '''Return random text token that is urlsafe and works around common parsing bugs'''
    alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    return token_with_alphabet(alphabet, nbytes=nbytes)
def upgrade():
    logger.info('Generating 3072 bit RSA key pair (RS256) for OpenID Connect support ...')
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend())

    meta = sa.MetaData(bind=op.get_bind())
    oauth2_key = op.create_table('oauth2_key',
        sa.Column('id', sa.String(length=64), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('active', sa.Boolean(create_constraint=False), nullable=False),
        sa.Column('algorithm', sa.String(length=32), nullable=False),
        sa.Column('private_key_jwk', sa.Text(), nullable=False),
        sa.Column('public_key_jwk', sa.Text(), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2_key'))
    )
    algorithm = jwt.get_algorithm_by_name('RS256')
    op.bulk_insert(oauth2_key, [{
        'id': token_urlfriendly(),
        'created': datetime.datetime.utcnow(),
        'active': True,
        'algorithm': 'RS256',
        'private_key_jwk': algorithm.to_jwk(private_key),
        'public_key_jwk': algorithm.to_jwk(private_key.public_key()),
    }])

    with op.batch_alter_table('oauth2grant', schema=None) as batch_op:
        batch_op.drop_index(batch_op.f('ix_oauth2grant_code'))
    oauth2grant = sa.Table('oauth2grant', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('client_db_id', sa.Integer(), nullable=False),
        sa.Column('code', sa.String(length=255), nullable=False),
        sa.Column('redirect_uri', sa.String(length=255), nullable=False),
        sa.Column('expires', sa.DateTime(), nullable=False),
        sa.Column('_scopes', sa.Text(), nullable=False),
        sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2grant_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
    )
    with op.batch_alter_table('oauth2grant', copy_from=oauth2grant) as batch_op:
        batch_op.add_column(sa.Column('nonce', sa.Text(), nullable=True))
        batch_op.add_column(sa.Column('claims', sa.Text(), nullable=True))
        batch_op.alter_column('redirect_uri', existing_type=sa.VARCHAR(length=255), nullable=True)

    oauth2token = sa.Table('oauth2token', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('client_db_id', sa.Integer(), nullable=False),
        sa.Column('token_type', sa.String(length=40), nullable=False),
        sa.Column('access_token', sa.String(length=255), nullable=False),
        sa.Column('refresh_token', sa.String(length=255), nullable=False),
        sa.Column('expires', sa.DateTime(), nullable=False),
        sa.Column('_scopes', sa.Text(), nullable=False),
        sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2token_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
        sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
        sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
    )
    with op.batch_alter_table('oauth2token', copy_from=oauth2token) as batch_op:
        batch_op.add_column(sa.Column('claims', sa.Text(), nullable=True))
def downgrade():
    meta = sa.MetaData(bind=op.get_bind())
    oauth2token = sa.Table('oauth2token', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('client_db_id', sa.Integer(), nullable=False),
        sa.Column('token_type', sa.String(length=40), nullable=False),
        sa.Column('access_token', sa.String(length=255), nullable=False),
        sa.Column('refresh_token', sa.String(length=255), nullable=False),
        sa.Column('expires', sa.DateTime(), nullable=False),
        sa.Column('_scopes', sa.Text(), nullable=False),
        sa.Column('claims', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2token_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2token_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2token')),
        sa.UniqueConstraint('access_token', name=op.f('uq_oauth2token_access_token')),
        sa.UniqueConstraint('refresh_token', name=op.f('uq_oauth2token_refresh_token'))
    )
    with op.batch_alter_table('oauth2token', copy_from=oauth2token) as batch_op:
        batch_op.drop_column('claims')

    oauth2grant = sa.Table('oauth2grant', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('client_db_id', sa.Integer(), nullable=False),
        sa.Column('code', sa.String(length=255), nullable=False),
        sa.Column('redirect_uri', sa.String(length=255), nullable=True),
        sa.Column('nonce', sa.Text(), nullable=True),
        sa.Column('expires', sa.DateTime(), nullable=False),
        sa.Column('_scopes', sa.Text(), nullable=False),
        sa.Column('claims', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['client_db_id'], ['oauth2client.db_id'], name=op.f('fk_oauth2grant_client_db_id_oauth2client'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_oauth2grant_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_oauth2grant'))
    )
    with op.batch_alter_table('oauth2grant', copy_from=oauth2grant) as batch_op:
        batch_op.alter_column('redirect_uri', existing_type=sa.VARCHAR(length=255), nullable=False)
        batch_op.drop_column('claims')
        batch_op.drop_column('nonce')
        batch_op.create_index(batch_op.f('ix_oauth2grant_code'), ['code'], unique=False)
    op.drop_table('oauth2_key')
"""lower-case mail receive addresses
Revision ID: 042879d5e3ac
Revises: 878b25c4fae7
Create Date: 2022-02-01 20:37:32.103288
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '042879d5e3ac'
down_revision = '878b25c4fae7'
branch_labels = None
depends_on = None
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    mail_receive_address_table = sa.Table('mail_receive_address', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('mail_id', sa.Integer(), nullable=False),
        sa.Column('address', sa.String(length=128), nullable=False),
        sa.ForeignKeyConstraint(['mail_id'], ['mail.id'], name=op.f('fk_mail_receive_address_mail_id_mail'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_mail_receive_address'))
    )
    op.execute(mail_receive_address_table.update().values(address=sa.func.lower(mail_receive_address_table.c.address)))

def downgrade():
    pass
"""add expires attribute to ratelimit_event
Revision ID: 09d2edcaf0cc
Revises: af07cea65391
Create Date: 2022-02-15 14:16:19.318253
"""
from alembic import op
import sqlalchemy as sa
revision = '09d2edcaf0cc'
down_revision = 'af07cea65391'
branch_labels = None
depends_on = None
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    ratelimit_event = sa.Table('ratelimit_event', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('name', sa.String(length=128), nullable=True),
        sa.Column('key', sa.String(length=128), nullable=True),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_ratelimit_event'))
    )
    op.execute(ratelimit_event.delete())
    with op.batch_alter_table('ratelimit_event', copy_from=ratelimit_event) as batch_op:
        batch_op.add_column(sa.Column('expires', sa.DateTime(), nullable=False))
        batch_op.alter_column('name', existing_type=sa.VARCHAR(length=128), nullable=False)
        batch_op.alter_column('timestamp', existing_type=sa.DATETIME(), nullable=False)

def downgrade():
    meta = sa.MetaData(bind=op.get_bind())
    ratelimit_event = sa.Table('ratelimit_event', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=False),
        sa.Column('expires', sa.DateTime(), nullable=False),
        sa.Column('name', sa.String(length=128), nullable=False),
        sa.Column('key', sa.String(length=128), nullable=True),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_ratelimit_event'))
    )
    op.execute(ratelimit_event.delete())
    with op.batch_alter_table('ratelimit_event', schema=None) as batch_op:
        batch_op.alter_column('timestamp', existing_type=sa.DATETIME(), nullable=True)
        batch_op.alter_column('name', existing_type=sa.VARCHAR(length=128), nullable=True)
        batch_op.drop_column('expires')
"""MySQL compat fixes
Revision ID: 11ecc8f1ac3b
Revises: bf71799b7b9e
Create Date: 2021-09-13 04:15:07.479295
"""
from alembic import op
import sqlalchemy as sa
revision = '11ecc8f1ac3b'
down_revision = 'bf71799b7b9e'
branch_labels = None
depends_on = None
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    table = sa.Table('device_login_confirmation', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('initiation_id', sa.Integer(), nullable=False),
        sa.Column('user_dn', sa.String(length=128), nullable=False),
        sa.Column('code0', sa.String(length=32), nullable=False),
        sa.Column('code1', sa.String(length=32), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_device_login_confirmation')),
    )
    with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
        pass
    table = sa.Table('invite_signup', meta,
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('invite_id', sa.Integer(), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
    )
    with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
        pass
    meta = sa.MetaData(bind=op.get_bind())
    # Previously "fk_device_login_confirmation_initiation_id_" was named
    # "fk_device_login_confirmation_initiation_id_device_login_initiation"
    # but this was too long for MySQL.
    table = sa.Table('device_login_confirmation', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('initiation_id', sa.Integer(), nullable=False),
        sa.Column('user_dn', sa.String(length=128), nullable=False),
        sa.Column('code0', sa.String(length=32), nullable=False),
        sa.Column('code1', sa.String(length=32), nullable=False),
        sa.ForeignKeyConstraint(['initiation_id'], ['device_login_initiation.id'], name='fk_device_login_confirmation_initiation_id_'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_device_login_confirmation')),
        sa.UniqueConstraint('initiation_id', 'code0', name='uq_device_login_confirmation_initiation_id_code0'),
        sa.UniqueConstraint('initiation_id', 'code1', name='uq_device_login_confirmation_initiation_id_code1'),
        sa.UniqueConstraint('user_dn', name=op.f('uq_device_login_confirmation_user_dn'))
    )
    with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
        pass
    # Previously "fk_invite_signup_id_signup" was named
    # "fk_invite_signup_signup_id_signup" by mistake.
    table = sa.Table('invite_signup', meta,
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('invite_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['id'], ['signup.id'], name=op.f('fk_invite_signup_id_signup')),
        sa.ForeignKeyConstraint(['invite_id'], ['invite.id'], name=op.f('fk_invite_signup_invite_id_invite')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_invite_signup'))
    )
    with op.batch_alter_table(table.name, copy_from=table, recreate='always') as batch_op:
        pass

def downgrade():
    pass
"""Deactivate users
Revision ID: 23293f32b503
Revises: e249233e2a31
Create Date: 2022-11-10 02:06:27.766520
"""
from alembic import op
import sqlalchemy as sa
revision = '23293f32b503'
down_revision = 'e249233e2a31'
branch_labels = None
depends_on = None
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    with op.batch_alter_table('service', schema=None) as batch_op:
        batch_op.add_column(sa.Column('hide_deactivated_users', sa.Boolean(create_constraint=True), nullable=False, server_default=sa.false()))
    service = sa.Table('service', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('limit_access', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('access_group_id', sa.Integer(), nullable=True),
        sa.Column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode'), nullable=False),
        sa.Column('enable_email_preferences', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('hide_deactivated_users', sa.Boolean(create_constraint=True), nullable=False, server_default=sa.false()),
        sa.ForeignKeyConstraint(['access_group_id'], ['group.id'], name=op.f('fk_service_access_group_id_group'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_service')),
        sa.UniqueConstraint('name', name=op.f('uq_service_name'))
    )
    with op.batch_alter_table('service', copy_from=service) as batch_op:
        batch_op.alter_column('hide_deactivated_users', server_default=None)
    with op.batch_alter_table('user', schema=None) as batch_op:
        batch_op.add_column(sa.Column('is_deactivated', sa.Boolean(create_constraint=True), nullable=False, server_default=sa.false()))
    user = sa.Table('user', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('unix_uid', sa.Integer(), nullable=False),
        sa.Column('loginname', sa.String(length=32), nullable=False),
        sa.Column('displayname', sa.String(length=128), nullable=False),
        sa.Column('primary_email_id', sa.Integer(), nullable=False),
        sa.Column('recovery_email_id', sa.Integer(), nullable=True),
        sa.Column('pwhash', sa.Text(), nullable=True),
        sa.Column('is_service_user', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('is_deactivated', sa.Boolean(create_constraint=True), nullable=False, server_default=sa.false()),
        sa.ForeignKeyConstraint(['primary_email_id'], ['user_email.id'], name=op.f('fk_user_primary_email_id_user_email'), onupdate='CASCADE'),
        sa.ForeignKeyConstraint(['recovery_email_id'], ['user_email.id'], name=op.f('fk_user_recovery_email_id_user_email'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['unix_uid'], ['uid_allocation.id'], name=op.f('fk_user_unix_uid_uid_allocation')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_user')),
        sa.UniqueConstraint('loginname', name=op.f('uq_user_loginname')),
        sa.UniqueConstraint('unix_uid', name=op.f('uq_user_unix_uid'))
    )
    with op.batch_alter_table('user', copy_from=user) as batch_op:
        batch_op.alter_column('is_deactivated', server_default=None)

def downgrade():
    meta = sa.MetaData(bind=op.get_bind())
    user = sa.Table('user', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('unix_uid', sa.Integer(), nullable=False),
        sa.Column('loginname', sa.String(length=32), nullable=False),
        sa.Column('displayname', sa.String(length=128), nullable=False),
        sa.Column('primary_email_id', sa.Integer(), nullable=False),
        sa.Column('recovery_email_id', sa.Integer(), nullable=True),
        sa.Column('pwhash', sa.Text(), nullable=True),
        sa.Column('is_service_user', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('is_deactivated', sa.Boolean(create_constraint=True), nullable=False),
        sa.ForeignKeyConstraint(['primary_email_id'], ['user_email.id'], name=op.f('fk_user_primary_email_id_user_email'), onupdate='CASCADE'),
        sa.ForeignKeyConstraint(['recovery_email_id'], ['user_email.id'], name=op.f('fk_user_recovery_email_id_user_email'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.ForeignKeyConstraint(['unix_uid'], ['uid_allocation.id'], name=op.f('fk_user_unix_uid_uid_allocation')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_user')),
        sa.UniqueConstraint('loginname', name=op.f('uq_user_loginname')),
        sa.UniqueConstraint('unix_uid', name=op.f('uq_user_unix_uid'))
    )
    # pass the full table definition via copy_from so the CHECK constraints
    # survive the table recreation
    with op.batch_alter_table('user', copy_from=user) as batch_op:
        batch_op.drop_column('is_deactivated')
    service = sa.Table('service', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('limit_access', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('access_group_id', sa.Integer(), nullable=True),
        sa.Column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode'), nullable=False),
        sa.Column('enable_email_preferences', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('hide_deactivated_users', sa.Boolean(create_constraint=True), nullable=False),
        sa.ForeignKeyConstraint(['access_group_id'], ['group.id'], name=op.f('fk_service_access_group_id_group'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_service')),
        sa.UniqueConstraint('name', name=op.f('uq_service_name'))
    )
    with op.batch_alter_table('service', copy_from=service) as batch_op:
        batch_op.drop_column('hide_deactivated_users')
"""added missing oauth2grant.code index
Revision ID: 2a6b1fb82ce6
Revises: cbca20cf64d9
Create Date: 2021-04-13 23:03:46.280189
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a6b1fb82ce6'
down_revision = 'cbca20cf64d9'
branch_labels = None
depends_on = None
def upgrade():
    with op.batch_alter_table('oauth2grant', schema=None) as batch_op:
        batch_op.create_index(batch_op.f('ix_oauth2grant_code'), ['code'], unique=False)

def downgrade():
    with op.batch_alter_table('oauth2grant', schema=None) as batch_op:
        batch_op.drop_index(batch_op.f('ix_oauth2grant_code'))
"""Remailer v2
Revision ID: 2b68f688bec1
Revises: e13b733ec856
Create Date: 2022-10-20 03:40:11.522343
"""
from alembic import op
import sqlalchemy as sa
revision = '2b68f688bec1'
down_revision = 'e13b733ec856'
branch_labels = None
depends_on = None
def upgrade():
    with op.batch_alter_table('service', schema=None) as batch_op:
        batch_op.add_column(sa.Column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode'), nullable=False, server_default='DISABLED'))
    service = sa.table('service',
        sa.column('id', sa.Integer),
        sa.column('use_remailer', sa.Boolean(create_constraint=True)),
        sa.column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode')),
    )
    op.execute(service.update().values(remailer_mode='ENABLED_V1').where(service.c.use_remailer))
    meta = sa.MetaData(bind=op.get_bind())
    service = sa.Table('service', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('limit_access', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('access_group_id', sa.Integer(), nullable=True),
        sa.Column('use_remailer', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('enable_email_preferences', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode'), nullable=False, server_default='DISABLED'),
        sa.ForeignKeyConstraint(['access_group_id'], ['group.id'], name=op.f('fk_service_access_group_id_group'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_service')),
        sa.UniqueConstraint('name', name=op.f('uq_service_name'))
    )
    with op.batch_alter_table('service', copy_from=service) as batch_op:
        batch_op.alter_column('remailer_mode', server_default=None)
        batch_op.drop_column('use_remailer')

def downgrade():
    with op.batch_alter_table('service', schema=None) as batch_op:
        batch_op.add_column(sa.Column('use_remailer', sa.BOOLEAN(), nullable=False, server_default=sa.false()))
    service = sa.table('service',
        sa.column('id', sa.Integer),
        sa.column('use_remailer', sa.Boolean(create_constraint=True)),
        sa.column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode')),
    )
    op.execute(service.update().values(use_remailer=sa.true()).where(service.c.remailer_mode != 'DISABLED'))
    meta = sa.MetaData(bind=op.get_bind())
    service = sa.Table('service', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('limit_access', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('access_group_id', sa.Integer(), nullable=True),
        sa.Column('use_remailer', sa.Boolean(create_constraint=True), nullable=False, server_default=sa.false()),
        sa.Column('enable_email_preferences', sa.Boolean(create_constraint=True), nullable=False),
        sa.Column('remailer_mode', sa.Enum('DISABLED', 'ENABLED_V1', 'ENABLED_V2', create_constraint=True, name='remailermode'), nullable=False),
        sa.ForeignKeyConstraint(['access_group_id'], ['group.id'], name=op.f('fk_service_access_group_id_group'), onupdate='CASCADE', ondelete='SET NULL'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_service')),
        sa.UniqueConstraint('name', name=op.f('uq_service_name'))
    )
    with op.batch_alter_table('service', copy_from=service) as batch_op:
        batch_op.alter_column('use_remailer', server_default=None)
        batch_op.drop_column('remailer_mode')
"""Unique email addresses
Revision ID: 468995a9c9ee
Revises: 2b68f688bec1
Create Date: 2022-10-21 01:25:01.469670
"""
import unicodedata
from alembic import op
import sqlalchemy as sa
revision = '468995a9c9ee'
down_revision = '2b68f688bec1'
branch_labels = None
depends_on = None
def normalize_address(value):
    return unicodedata.normalize('NFKC', value).lower().strip()
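# e.g. normalize_address(' Foo@Ｅｘａｍｐｌｅ.com ') == 'foo@example.com':
# NFKC folds compatibility characters such as fullwidth letters, lower()
# case-folds and strip() removes surrounding whitespace.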
def iter_rows_paged(table, pk='id', limit=1000):
    # Iterate over all rows of `table` in chunks of `limit` rows, using keyset
    # pagination on the primary key to keep memory usage bounded
    conn = op.get_bind()
    pk_column = getattr(table.c, pk)
    last_pk = None
    while True:
        expr = table.select().order_by(pk_column).limit(limit)
        if last_pk is not None:
            expr = expr.where(pk_column > last_pk)
        result = conn.execute(expr)
        pk_index = list(result.keys()).index(pk)
        rows = result.fetchall()
        if not rows:
            break
        yield from rows
        last_pk = rows[-1][pk_index]
def upgrade():
    with op.batch_alter_table('user_email', schema=None) as batch_op:
        batch_op.add_column(sa.Column('address_normalized', sa.String(length=128), nullable=True))
        batch_op.add_column(sa.Column('enable_strict_constraints', sa.Boolean(create_constraint=True), nullable=True))
        batch_op.alter_column('verified', existing_type=sa.Boolean(create_constraint=True), nullable=True)
    meta = sa.MetaData(bind=op.get_bind())
    user_email_table = sa.Table('user_email', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
        sa.Column('address', sa.String(length=128), nullable=False),
        sa.Column('address_normalized', sa.String(length=128), nullable=True),
        sa.Column('enable_strict_constraints', sa.Boolean(create_constraint=True), nullable=True),
        sa.Column('verified', sa.Boolean(create_constraint=True), nullable=True),
        sa.Column('verification_legacy_id', sa.Integer(), nullable=True),
        sa.Column('verification_secret', sa.Text(), nullable=True),
        sa.Column('verification_expires', sa.DateTime(), nullable=True),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_user_email_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_user_email')),
        sa.UniqueConstraint('user_id', 'address', name='uq_user_email_user_id_address')
    )
    for row in iter_rows_paged(user_email_table):
        id = row[0]
        address = row[2]
        verified = row[5]
        op.execute(user_email_table.update()
            .where(user_email_table.c.id == id)
            .values(
                address_normalized=normalize_address(address),
                verified=(True if verified else None)
            )
        )
    with op.batch_alter_table('user_email', copy_from=user_email_table) as batch_op:
        batch_op.alter_column('address_normalized', existing_type=sa.String(length=128), nullable=False)
        batch_op.create_unique_constraint('uq_user_email_address_normalized_verified', ['address_normalized', 'verified', 'enable_strict_constraints'])
        batch_op.create_unique_constraint('uq_user_email_user_id_address_normalized', ['user_id', 'address_normalized', 'enable_strict_constraints'])
    op.create_table('feature_flag',
        sa.Column('name', sa.String(32), nullable=False),
        sa.PrimaryKeyConstraint('name', name=op.f('pk_feature_flag')),
    )
def downgrade():
    meta = sa.MetaData(bind=op.get_bind())
    user_email_table = sa.Table('user_email', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
        sa.Column('address', sa.String(length=128), nullable=False),
        sa.Column('address_normalized', sa.String(length=128), nullable=False),
        sa.Column('enable_strict_constraints', sa.Boolean(create_constraint=True), nullable=True),
        sa.Column('verified', sa.Boolean(create_constraint=True), nullable=True),
        sa.Column('verification_legacy_id', sa.Integer(), nullable=True),
        sa.Column('verification_secret', sa.Text(), nullable=True),
        sa.Column('verification_expires', sa.DateTime(), nullable=True),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_user_email_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_user_email')),
        sa.UniqueConstraint('user_id', 'address', name='uq_user_email_user_id_address'),
        sa.UniqueConstraint('address_normalized', 'verified', 'enable_strict_constraints', name='uq_user_email_address_normalized_verified'),
        sa.UniqueConstraint('user_id', 'address_normalized', 'enable_strict_constraints', name='uq_user_email_user_id_address_normalized')
    )
    op.execute(user_email_table.update().where(user_email_table.c.verified == None).values(verified=False))
    with op.batch_alter_table('user_email', copy_from=user_email_table) as batch_op:
        batch_op.drop_constraint('uq_user_email_user_id_address_normalized', type_='unique')
        batch_op.drop_constraint('uq_user_email_address_normalized_verified', type_='unique')
        batch_op.alter_column('verified', existing_type=sa.Boolean(create_constraint=True), nullable=False)
        batch_op.drop_column('enable_strict_constraints')
        batch_op.drop_column('address_normalized')
    op.drop_table('feature_flag')
"""Unified password hashing for recovery codes
Revision ID: 4bd316207e59
Revises: e71e29cc605a
Create Date: 2024-05-22 03:13:55.917641
"""
from alembic import op
import sqlalchemy as sa
revision = '4bd316207e59'
down_revision = 'e71e29cc605a'
branch_labels = None
depends_on = None
def upgrade():
    meta = sa.MetaData(bind=op.get_bind())
    mfa_method = sa.Table('mfa_method', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', name='mfatype', create_constraint=True), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('name', sa.String(length=128), nullable=True),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('recovery_salt', sa.String(length=64), nullable=True),
        sa.Column('recovery_hash', sa.String(length=256), nullable=True),
        sa.Column('totp_key', sa.String(length=64), nullable=True),
        sa.Column('totp_last_counter', sa.Integer(), nullable=True),
        sa.Column('webauthn_cred', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method'))
    )
    # This field was already unused before the change to unified password hashing. So this is unrelated cleanup.
    with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op:
        batch_op.drop_column('recovery_salt')
    op.execute(mfa_method.update().values(recovery_hash=('{crypt}' + mfa_method.c.recovery_hash)).where(mfa_method.c.type == 'RECOVERY_CODE'))

def downgrade():
    meta = sa.MetaData(bind=op.get_bind())
    mfa_method = sa.Table('mfa_method', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('type', sa.Enum('RECOVERY_CODE', 'TOTP', 'WEBAUTHN', name='mfatype', create_constraint=True), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('name', sa.String(length=128), nullable=True),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('recovery_hash', sa.String(length=256), nullable=True),
        sa.Column('totp_key', sa.String(length=64), nullable=True),
        sa.Column('totp_last_counter', sa.Integer(), nullable=True),
        sa.Column('webauthn_cred', sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_mfa_method_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_mfa_method'))
    )
    with op.batch_alter_table('mfa_method', copy_from=mfa_method) as batch_op:
        batch_op.add_column(sa.Column('recovery_salt', sa.VARCHAR(length=64), nullable=True))
    op.execute(
        mfa_method.delete().where(sa.and_(
            mfa_method.c.type == 'RECOVERY_CODE',
            sa.not_(mfa_method.c.recovery_hash.ilike('{crypt}%'))
        ))
    )
    op.execute(
        mfa_method.update().values(
            recovery_hash=sa.func.substr(mfa_method.c.recovery_hash, len('{crypt}') + 1)
        ).where(sa.and_(
            mfa_method.c.type == 'RECOVERY_CODE',
            mfa_method.c.recovery_hash.ilike('{crypt}%')
        ))
    )
"""invite pk change
Revision ID: 54b2413586fd
Revises: 2a6b1fb82ce6
Create Date: 2021-04-13 23:33:40.118507
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '54b2413586fd'
down_revision = '2a6b1fb82ce6'
branch_labels = None
depends_on = None
invite = sa.sql.table('invite',
    sa.sql.column('id', sa.Integer()),
    sa.sql.column('token', sa.String(length=128))
)
invite_grant = sa.sql.table('invite_grant',
    sa.sql.column('invite_id', sa.Integer()),
    sa.sql.column('invite_token', sa.String(length=128))
)
invite_roles = sa.sql.table('invite_roles',
    sa.sql.column('invite_id', sa.Integer()),
    sa.sql.column('invite_token', sa.String(length=128))
)
invite_signup = sa.sql.table('invite_signup',
    sa.sql.column('invite_id', sa.Integer()),
    sa.sql.column('invite_token', sa.String(length=128))
)
def upgrade():
    # CHECK constraints get lost when reflecting from the actual table
    meta = sa.MetaData(bind=op.get_bind())
    table = sa.Table('invite', meta,
        sa.Column('token', sa.String(length=128), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('valid_until', sa.DateTime(), nullable=False),
        sa.Column('single_use', sa.Boolean(create_constraint=True, name=op.f('ck_invite_single_use')), nullable=False),
        sa.Column('allow_signup', sa.Boolean(create_constraint=True, name=op.f('ck_invite_allow_signup')), nullable=False),
        sa.Column('used', sa.Boolean(create_constraint=True, name=op.f('ck_invite_used')), nullable=False),
        sa.Column('disabled', sa.Boolean(create_constraint=True, name=op.f('ck_invite_disabled')), nullable=False),
        sa.PrimaryKeyConstraint('token', name=op.f('pk_invite'))
    )
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.drop_constraint('fk_invite_grant_invite_token_invite', type_='foreignkey')
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.drop_constraint('fk_invite_roles_invite_token_invite', type_='foreignkey')
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.drop_constraint('fk_invite_signup_invite_token_invite', type_='foreignkey')
    with op.batch_alter_table('invite', copy_from=table, recreate='always') as batch_op:
        batch_op.drop_constraint(batch_op.f('pk_invite'), type_='primary')
        batch_op.add_column(sa.Column('id', sa.Integer(), nullable=True))
        batch_op.create_primary_key(batch_op.f('pk_invite'), ['id'])
        batch_op.alter_column('id', autoincrement=True, nullable=False, existing_type=sa.Integer())
        batch_op.create_unique_constraint(batch_op.f('uq_invite_token'), ['token'])
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True))
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True))
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.add_column(sa.Column('invite_id', sa.Integer(), nullable=True))
    op.execute(invite_grant.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_grant.c.invite_token).as_scalar()))
    op.execute(invite_roles.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_roles.c.invite_token).as_scalar()))
    op.execute(invite_signup.update().values(invite_id=sa.select([invite.c.id]).where(invite.c.token==invite_signup.c.invite_token).as_scalar()))
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.alter_column('invite_id', existing_type=sa.INTEGER(), nullable=False)
        batch_op.create_foreign_key(batch_op.f('fk_invite_grant_invite_id_invite'), 'invite', ['invite_id'], ['id'])
        batch_op.drop_column('invite_token')
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f('pk_invite_roles'), type_='primary')
        batch_op.create_primary_key(batch_op.f('pk_invite_roles'), ['invite_id', 'role_id'])
        batch_op.create_foreign_key(batch_op.f('fk_invite_roles_invite_id_invite'), 'invite', ['invite_id'], ['id'])
        batch_op.drop_column('invite_token')
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.alter_column('invite_id', existing_type=sa.INTEGER(), nullable=False)
        batch_op.create_foreign_key(batch_op.f('fk_invite_signup_invite_id_invite'), 'invite', ['invite_id'], ['id'])
        batch_op.drop_column('invite_token')
def downgrade():
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f('fk_invite_signup_invite_id_invite'), type_='foreignkey')
        batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True))
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f('fk_invite_roles_invite_id_invite'), type_='foreignkey')
        batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True))
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f('fk_invite_grant_invite_id_invite'), type_='foreignkey')
        batch_op.add_column(sa.Column('invite_token', sa.VARCHAR(length=128), nullable=True))
    op.execute(invite_grant.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_grant.c.invite_id).as_scalar()))
    op.execute(invite_roles.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_roles.c.invite_id).as_scalar()))
    op.execute(invite_signup.update().values(invite_token=sa.select([invite.c.token]).where(invite.c.id==invite_signup.c.invite_id).as_scalar()))
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False)
        batch_op.drop_column('invite_id')
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False)
        batch_op.drop_constraint(batch_op.f('pk_invite_roles'), type_='primary')
        batch_op.create_primary_key(batch_op.f('pk_invite_roles'), ['invite_token', 'role_id'])
        batch_op.drop_column('invite_id')
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.alter_column('invite_token', existing_type=sa.VARCHAR(length=128), nullable=False)
        batch_op.drop_column('invite_id')
    # CHECK constraints get lost when reflecting from the actual table
    meta = sa.MetaData(bind=op.get_bind())
    table = sa.Table('invite', meta,
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('token', sa.String(length=128), nullable=False),
        sa.Column('created', sa.DateTime(), nullable=False),
        sa.Column('valid_until', sa.DateTime(), nullable=False),
        sa.Column('single_use', sa.Boolean(create_constraint=True, name=op.f('ck_invite_single_use')), nullable=False),
        sa.Column('allow_signup', sa.Boolean(create_constraint=True, name=op.f('ck_invite_allow_signup')), nullable=False),
        sa.Column('used', sa.Boolean(create_constraint=True, name=op.f('ck_invite_used')), nullable=False),
        sa.Column('disabled', sa.Boolean(create_constraint=True, name=op.f('ck_invite_disabled')), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_invite')),
        sa.UniqueConstraint('token', name=op.f('uq_invite_token'))
    )
    with op.batch_alter_table('invite', copy_from=table, recreate='always') as batch_op:
        batch_op.drop_constraint(batch_op.f('uq_invite_token'), type_='unique')
        batch_op.alter_column('id', autoincrement=False, existing_type=sa.Integer())
        batch_op.drop_constraint(batch_op.f('pk_invite'), type_='primary')
        batch_op.drop_column('id')
        batch_op.create_primary_key(batch_op.f('pk_invite'), ['token'])
    with op.batch_alter_table('invite_signup', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_invite_signup_invite_token_invite', 'invite', ['invite_token'], ['token'])
    with op.batch_alter_table('invite_roles', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_invite_roles_invite_token_invite', 'invite', ['invite_token'], ['token'])
    with op.batch_alter_table('invite_grant', schema=None) as batch_op:
        batch_op.create_foreign_key('fk_invite_grant_invite_token_invite', 'invite', ['invite_token'], ['token'])
"""Role inclusion
Revision ID: 5a07d4a63b64
Revises: a29870f95175
Create Date: 2021-04-05 15:00:26.205433
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '5a07d4a63b64'
down_revision = 'a29870f95175'
branch_labels = None
depends_on = None
def upgrade():
    op.create_table('role-inclusion',
        sa.Column('role_id', sa.Integer(), nullable=False),
        sa.Column('included_role_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['included_role_id'], ['role.id'], ),
        sa.ForeignKeyConstraint(['role_id'], ['role.id'], ),
        sa.PrimaryKeyConstraint('role_id', 'included_role_id')
    )

def downgrade():
    op.drop_table('role-inclusion')