Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Commits on Source (25)
  • Julian's avatar
    Add user deactivation · 6b2ee671
    Julian authored
    6b2ee671
  • Julian's avatar
    Optimize migration from 53c06069 (New UID/GID allocation approach) · 9545981a
    Julian authored
    Alembic runs migration scripts on SQLite and MariaDB in auto-commit mode, so
    inserting many rows with individual insert statements is extremely slow.
    9545981a
  • Julian's avatar
    Remove unused json encoder customizations · 1ed39c8e
    Julian authored
    1ed39c8e
  • thies's avatar
    Updated Key & README · c668919f
    thies authored and Julian's avatar Julian committed
    c668919f
  • Julian's avatar
    PEP 440 conformance for development builds · ee8db499
    Julian authored
    Recent setuptools releases refuse to build packages with invalid version
    strings. So instead of using the bare commit hash as the version, we now
    build proper version strings like X.Y.Z.dev-git.COMMIT for CI development
    builds and X.Y.Z for release builds (same as before).
    ee8db499
  • Julian's avatar
    c2b30f17
  • Julian's avatar
    Debian Bookworm support · 0d870ee1
    Julian authored
    - Add CI tests for Bookworm
    - Disable pylint deprecation warnings for crypt
    - Mitigate Flask changes that broke a few tests
    - Set create_constraint=True for Booleans/Enums to mitigate SQLAlchemy changes
    - Mitigate new Alembic CHECK constraint behaviour in batch mode
    0d870ee1
  • Julian's avatar
    Use Debian Bookworm for CI builds · 409d7e66
    Julian authored
    - Fix apt package build on Bookworm
    - Adapt babel.cfg to jinja 3.x.x and break compatability with older versions
    409d7e66
  • Julian's avatar
    Prevent TOTP code reuse · 7a94d7de
    Julian authored
    Time-based one-time password (TOTP) codes are only valid for a short period
    of time. In addition they are meant to be single-use to make them more
    resistant against phishing and eavesdropping (e.g. keyloggers). Prior to this
    change uffd did not keep track of used codes and thus did not prevent code
    reuse.
    7a94d7de
  • Julian's avatar
    Fix OAuth2 authorization code invalidation · 4457282d
    Julian authored
    9bfd6f81 changed the format of authorization codes, but did not adapt the
    invalidation code accordingly. Because of this, authorization codes were
    not invalidated and could have been used multiple times to request access
    tokens until expiring.
    4457282d
  • byteplow's avatar
    Dark mode · 16f5ae99
    byteplow authored and Julian's avatar Julian committed
    
    Automatically enabled based on OS/browser settings (prefers-color-scheme
    CSS media query)
    
    Co-authored-by: default avatarJulian Rother <julian@cccv.de>
    16f5ae99
  • Julian's avatar
    Fix SECRET_KEY auto-generation in debug mode · a662ceb2
    Julian authored
    Compatibility fix for Flask v2 (Debian Bookworm) and newer
    a662ceb2
  • Julian's avatar
    Fix ORM relationship conflict warnings · 4736d5a3
    Julian authored
    SQLAlchemy v1.4 (Debian Bookworm) annoyingly warns about overlapping
    user/mfa_method relationships.
    
    Fixes #146
    4736d5a3
  • Julian's avatar
    Fix autocomplete behaviour in Firefox · ccc90a8f
    Julian authored
    Firefox autofills all type="password" inputs with passwords from its built-in
    password store. This breaks usability of admin pages.
    
    This change fixes that by adding autocomplete="new-password" to these inputs.
    It also adds appropriate autocomplete attributes to other forms/inputs to
    improve autocomplete behaviour across browsers:
    
    - autocomplete="off" on all non-login/signup/selfservice forms
    - autocomplete="new-password" or autocomplete="current-password" on all
      type="password" inputs to workaround Firefox's misdetection
    - autocomplete="username"/"email"/"nickname" on login/signup/selfservice inputs
      wherever appropriate
    - Avoid type="password" where possible (e.g. on readonly fields)
    ccc90a8f
  • Julian's avatar
    Fix ORM cartesian product warnings · 94ba8b9c
    Julian authored
    SQLAlchemy v1.4 (Debian Bookworm) annoyingly warns about select statements
    that result in a cartesion product of multiple tables. We actually want
    cartesion products in all affected cases, so we change "SELECT FROM a,b" to
    the equivalent "SELECT FROM a JOIN b ON TRUE".
    
    See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html
    94ba8b9c
  • Julian's avatar
    OpenID Connect Core 1.0 and Discovery 1.0 support · edd4f4ca
    Julian authored
    Limited to OpenID provider conformance profiles "Basic" and "Config":
    
    - Support for features mandatory to implement for all OpenID Providers,
      not the feature set for Dynamic OpenID Providers
    - Only Authorization Code Flow, no support for Implicit/Hybrid Flow
    - Only code response type, no support for token/id_token
    - Server metadata is served at /.well-known/openid-configuration
    
    Additional/optional features:
    
    - Support for "claims" parameter
    - Support for standard scopes "profile" and "email"
    - Support for non-standard scope/claim "groups" (in violation of RFC 9068)
    
    Compatability with existing (working) uffd client setups: Authorization
    requests without the "openid" scope behave the same as before  Prior to this
    change authorization requests with the "openid" scope were rejected by uffd.
    
    This change adds direct dependencies to pyjwt and cryptography. Prior to this
    change both were already transitive dependencies of oauthlib.
    edd4f4ca
  • Julian's avatar
    Revokable server-side sessions · bbd251f7
    Julian authored
    bbd251f7
  • Julian's avatar
    636169e5
  • Julian's avatar
    Fix 2FA selfservice permission checks · 11502833
    Julian authored
    Users with ACL_ACCESS_GROUP but without ACL_SELFSERVICE_GROUP were able to
    access the 2FA setup pages. Like all selfservice pages, these pages should
    only have been accessible to users with ACL_SELFSERVICE_GROUP.
    11502833
  • Julian's avatar
    fefac582
  • Julian's avatar
    Bind device login state to sessions instead of users · 08926d1f
    Julian authored
    Prerequisite for doing the same to OAuth2 state. This is required for
    implementing missing OIDC features later.
    08926d1f
  • Julian's avatar
    Bind OAuth2 state to sessions instead of users · 89f1ecdd
    Julian authored
    Prerequisite for implementing missing OIDC features.
    89f1ecdd
  • eNBeWe's avatar
    Fix OIDC token endpoint crash on Debian Buster/Bullseye · 23b7736a
    eNBeWe authored and Julian's avatar Julian committed
    
    The return type of jwt.encode() changed from bytes in v1.x (Buster/Bullseye)
    to str in v2.x (Bookworm). This let json.dumps crash on Buster und Bullseye
    with "TypeError: Object of type bytes is not JSON serializable".
    
    Flask v1.x (Buster/Bullseye) automatically uses simplejson.dumps instead of
    json.dumps if it is installed. simplejson.dumps auto-converts bytes to str per
    default. simplejson also happend to be installed in our CI images. This
    prevented the bug from surfacing in CI tests. We removed simplejson from our
    CI images in an external change.
    
    Co-authored-by: default avatarJulian Rother <julian@cccv.de>
    23b7736a
  • Julian's avatar
    Fix spinner style in dark mode · c0dfb38a
    Julian authored
    c0dfb38a
  • Julian's avatar
    Unified password hashing for recovery codes · 98fe5690
    Julian authored
    Closes #163
    98fe5690
Showing
with 1643 additions and 444 deletions
image: registry.git.cccv.de/uffd/docker-images/buster
image: registry.git.cccv.de/uffd/docker-images/bookworm
variables:
DEBIAN_FRONTEND: noninteractive
GIT_SUBMODULE_STRATEGY: normal
APT_API_URL: https://packages.cccv.de
APT_REPO: uffd
PYLINT_PIN: pylint~=2.10.0
PYLINT_PIN: pylint~=2.16.2
before_script:
- python3 -V
......@@ -13,7 +13,7 @@ before_script:
- uname -a
- python3 -m pylint --version
- python3 -m coverage --version
- echo "${CI_COMMIT_TAG}" | grep -qE "v[0-9]+[.][0-9]+[.][0-9]+.*" && export UFFD_PACKAGE_VERSION="${CI_COMMIT_TAG#v}" || export UFFD_PACKAGE_VERSION="${CI_COMMIT_SHA}"
- export PACKAGE_VERSION="$(git describe | sed -E -n -e 's/^v([0-9.]*)$/\1/p' -e 's/^v([0-9.]*)-([0-9]*)-g([0-9a-z]*)$/\1.dev+git.\3/p' | grep .)"
.build:
stage: build
......@@ -21,7 +21,7 @@ before_script:
build:pip:
extends: .build
script:
- PACKAGE_VERSION="${UFFD_PACKAGE_VERSION}" python3 -m build
- python3 -m build
artifacts:
paths:
- dist/*
......@@ -75,33 +75,30 @@ linter:bullseye:
reports:
codequality: codeclimate.json
linter:bookworm:
image: registry.git.cccv.de/uffd/docker-images/bookworm
stage: test
needs: []
script:
- pip3 install $PYLINT_PIN pylint-gitlab pylint-flask-sqlalchemy # this force-updates jinja2 and some other packages!
- python3 -m pylint --output-format=pylint_gitlab.GitlabCodeClimateReporter:codeclimate.json,pylint_gitlab.GitlabPagesHtmlReporter:pylint.html,colorized uffd
artifacts:
when: always
paths:
- pylint.html
reports:
codequality: codeclimate.json
tests:buster:sqlite:
image: registry.git.cccv.de/uffd/docker-images/buster
stage: test
needs: []
script:
- rm -rf pages
- mkdir -p pages
- cp -r uffd/static pages/static
- DUMP_PAGES=pages python3-coverage run --include 'uffd/*.py' -m pytest --junitxml=report.xml || touch failed
- sed -i -e 's/href="\/static\//href=".\/static\//g' -e 's/src="\/static\//src=".\/static\//g' pages/*.html || true
- python3-coverage report -m
- python3-coverage html
- python3-coverage xml
- test ! -e failed
- python3 -m pytest --junitxml=report.xml
artifacts:
when: always
paths:
- htmlcov/index.html
- htmlcov
- pages
expose_as: 'Coverage Report'
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
junit: report.xml
coverage: '/^TOTAL.*\s+(\d+\%)$/'
tests:buster:mysql:
image: registry.git.cccv.de/uffd/docker-images/buster
......@@ -138,10 +135,50 @@ tests:bullseye:mysql:
reports:
junit: report.xml
tests:bookworm:sqlite:
image: registry.git.cccv.de/uffd/docker-images/bookworm
stage: test
needs: []
script:
- rm -rf pages
- mkdir -p pages
- cp -r uffd/static pages/static
- DUMP_PAGES=pages python3-coverage run --include 'uffd/*.py' -m pytest --junitxml=report.xml || touch failed
- sed -i -e 's/href="\/static\//href=".\/static\//g' -e 's/src="\/static\//src=".\/static\//g' pages/*.html || true
- python3-coverage report -m
- python3-coverage html
- python3-coverage xml
- test ! -e failed
artifacts:
when: always
paths:
- htmlcov/index.html
- htmlcov
- pages
expose_as: 'Coverage Report'
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
junit: report.xml
coverage: '/^TOTAL.*\s+(\d+\%)$/'
tests:bookworm:mysql:
image: registry.git.cccv.de/uffd/docker-images/bookworm
stage: test
needs: []
script:
- service mariadb start
- TEST_WITH_MYSQL=1 python3 -m pytest --junitxml=report.xml
artifacts:
when: always
reports:
junit: report.xml
html5validator:
stage: test
needs:
- job: tests:buster:sqlite
- job: tests:bookworm:sqlite
script:
- html5validator --root pages 2>&1 | tee html5validator.log
artifacts:
......@@ -177,6 +214,14 @@ test:package:pip:bullseye:
script:
- pip3 install dist/*.tar.gz
test:package:pip:bookworm:
image: registry.git.cccv.de/uffd/docker-images/bookworm
stage: test
needs:
- job: build:pip
script:
- pip3 install dist/*.tar.gz
# Since we want to test if the package installs correctly on a fresh Debian
# install (has correct dependencies, etc.), we don't use uffd/docker-images
# here
......@@ -210,6 +255,21 @@ test:package:apt:bullseye:
- uffd-admin routes
- curl -Lv 127.0.0.1:5000
test:package:apt:bookworm:
image: ${CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX}/debian:bookworm
stage: test
needs:
- job: build:apt
before_script: []
script:
- apt -y update
- apt -y install curl ./*.deb
- service uwsgi start uffd || ( service uwsgi status uffd ; sleep 15; cat /var/log/uwsgi/app/uffd.log; )
- echo "server { listen 127.0.0.1:5000 default_server; include /etc/uffd/nginx.include.conf; }" > /etc/nginx/sites-enabled/uffd.ini
- service nginx start || ( service nginx status; nginx -t; exit 1; )
- uffd-admin routes
- curl -Lv 127.0.0.1:5000
.publish:
stage: deploy
rules:
......@@ -234,5 +294,6 @@ publish:apt:
- echo Update published repo for all distros
- 'curl --user "${APTLY_API_USER}:${APTLY_API_PW}" -X PUT -H "Content-Type: application/json" --data "{ }" "${APT_API_URL}/api/publish/uffd/buster"'
- 'curl --user "${APTLY_API_USER}:${APTLY_API_PW}" -X PUT -H "Content-Type: application/json" --data "{ }" "${APT_API_URL}/api/publish/uffd/bullseye"'
- 'curl --user "${APTLY_API_USER}:${APTLY_API_PW}" -X PUT -H "Content-Type: application/json" --data "{ }" "${APT_API_URL}/api/publish/uffd/bookworm"'
dependencies:
- build:apt
......@@ -68,6 +68,8 @@ disable=missing-module-docstring,
too-many-ancestors,
duplicate-code,
redefined-builtin,
superfluous-parens,
consider-using-f-string, # Temporary
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
......@@ -386,13 +388,6 @@ max-line-length=160
# Maximum number of lines in a module.
max-module-lines=1000
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,
dict-separator
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
......@@ -513,5 +508,5 @@ min-public-methods=2
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception
overgeneral-exceptions=builtin.BaseException,
builtin.Exception
......@@ -15,13 +15,15 @@ Please note that we refer to Debian packages here and **not** pip packages.
- python3-qrcode
- python3-fido2 (version 0.5.0 or 0.9.1, optional)
- python3-prometheus-client (optional, needed for metrics)
- python3-oauthlib
- python3-jwt
- python3-cryptography
- python3-flask-babel
- python3-argon2
- python3-itsdangerous (also a dependency of python3-flask)
- python3-mysqldb or python3-pymysql for MariaDB support
- python3-ua-parser (optional, better user agent parsing)
Some of the dependencies (especially fido2) changed their API in recent versions, so make sure to install the versions from Debian Buster or Bullseye.
Some of the dependencies (especially fido2) changed their API in recent versions, so make sure to install the versions from Debian Bookworm, Bullseye or Buster.
For development, you can also use virtualenv with the supplied `requirements.txt`.
## Development
......@@ -57,7 +59,7 @@ The dependencies of the pip package roughly represent the versions shipped by De
We do not keep them updated and we do not test the pip package!
The pip package only exists for local testing/development and to help build the Debian package.
We provide packages for Debian stable and oldstable (currently Bullseye and Buster).
We provide packages for Debian stable, oldstable and oldoldstable (currently Bookworm, Bullseye and Buster).
Since all dependencies are available in the official package mirrors, you will get security updates for everything but uffd itself from Debian.
To install uffd on Debian Bullseye, add our package mirror to `/etc/sources.list`:
......@@ -66,7 +68,7 @@ To install uffd on Debian Bullseye, add our package mirror to `/etc/sources.list
deb https://packages.cccv.de/uffd bullseye main
```
Then download [cccv-archive-key.gpg](cccv-archive-key.gpg) and add it to the trusted repository keys in `/etc/apt/trusted.gpg.d/`.
Then download [cccv-archive-key.gpg](https://packages.cccv.de/docs/cccv-archive-key.gpg) and add it to the trusted repository keys in `/etc/apt/trusted.gpg.d/`.
Afterwards run `apt update && apt install uffd` to install the package.
The Debian package uses uwsgi to run uffd and ships an `uffd-admin` script to execute flask commands in the correct context.
......@@ -97,6 +99,8 @@ The services need to be setup to use the following URLs with the Authorization C
* `/oauth2/token`: token request endpoint
* `/oauth2/userinfo`: endpoint that provides information about the current user
If the service supports server metadata discovery ([RFC 8414](https://www.rfc-editor.org/rfc/rfc8414)), configuring the base url of your uffd installation or `/.well-known/openid-configuration` as the discovery endpoint should be sufficient.
The only OAuth2 scope supported is `profile`. The userinfo endpoint returns json data with the following structure:
```
......@@ -114,6 +118,48 @@ The only OAuth2 scope supported is `profile`. The userinfo endpoint returns json
`id` is the numeric (Unix) user id, `name` the display name and `nickname` the loginname of the user.
## OpenID Connect Single-Sign-On Provider
In addition to plain OAuth2, uffd also has basic OpenID Connect support.
Endpoint URLs are the same as for plain OAuth2.
OpenID Connect support is enabled by requesting the `openid` scope.
ID token signing keys are served at `/oauth2/keys`.
See [OpenID Connect Core 1.0](https://openid.net/specs/openid-connect-core-1_0.html) specification for more details.
Supported flows and response types:
* Only Authorization Code Flow with `code` response type
Supported scopes:
* `openid`: Enables OpenID Connect support and returns mandatory `sub` claim
* `profile`: Returns `name` and `preferred_username` claims
* `email`: Returns `email` and `email_verified` claims
* `groups`: Returns non-standard `groups` claim
Supported claims:
* `sub` (string): Decimal encoded numeric (Unix) user id
* `name` (string): Display name
* `preferred_username`(string): Loginname
* `email` (string): Service-specific or primary email address
* `email_verified` (boolean): Verification status of `email` value (always `true`)
* `groups` (array of strings): Names of groups the user is a member of (non-standard)
uffd supports the optional `claims` authorization request parameter for requesting claims individually.
Note that there is a IANA-registered `groups` claim with a syntax borrowed from [SCIM](https://www.rfc-editor.org/rfc/rfc7643.html).
The syntax used by uffd is different and incompatible, although arguably more common for a claim named "groups" in this context.
uffd aims for complience with OpenID provider conformance profiles Basic and Config.
It is, however, not a certified OpenID provider and it has the following limitations:
* Only the `none` value for the `prompt` authorization request parameter is recognized. Other values (`login`, `consent` and `select_account`) are ignored.
* The `max_age` authorization request parameter is not supported and ignored by uffd.
* The `auth_time` claim is not supported and neither returned if the `max_age` authorization request parameter is present nor if it is requested via the `claims` parameter.
* Requesting the `sub` claim with a specific value for the ID Token (or passing the `id_token_hint` authorization request parameter) is only supported if the `prompt` authorization request parameter is set to `none`. The authorization request is rejected otherwise.
## Metrics
Uffd can export metrics in a prometheus compatible way. It needs python3-prometheus-client for this feature to work.
......
No preview for this file type
......@@ -23,7 +23,8 @@ Depends:
python3-flask-migrate,
python3-qrcode,
python3-fido2,
python3-oauthlib,
python3-jwt,
python3-cryptography,
python3-flask-babel,
python3-argon2,
python3-itsdangerous,
......@@ -33,4 +34,5 @@ Recommends:
nginx,
python3-mysqldb,
python3-prometheus-client,
python3-ua-parser,
Description: Web-based user management and single sign-on software
......@@ -36,13 +36,15 @@ setup(
'Flask-SQLAlchemy==2.1',
'qrcode==6.1',
'fido2==0.5.0',
'oauthlib==2.1.0',
'cryptography==2.6.1',
'pyjwt==1.7.0',
'Flask-Migrate==2.1.1',
'Flask-Babel==0.11.2',
'alembic==1.0.0',
'argon2-cffi==18.3.0',
'itsdangerous==0.24',
'prometheus-client==0.9',
'ua-parser==0.8.0',
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
......@@ -52,11 +54,9 @@ setup(
'cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.',
'chardet==3.0.4',
'click==7.0',
'cryptography==2.6.1',
'idna==2.6',
'Jinja2==2.10',
'MarkupSafe==1.1.0',
'oauthlib==2.1.0',
'pyasn1==0.4.2',
'pycparser==2.19',
'requests==2.21.0',
......
......@@ -57,6 +57,13 @@ class TestUserCLI(UffdTestCase):
self.assertTrue(user.password.verify('newpassword'))
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
self.assertFalse(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'create', 'newuser2', '--mail', 'newmail2@example.com', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='newuser2').first()
self.assertTrue(user.is_deactivated)
def test_update(self):
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'doesnotexist', '--displayname', 'foo'])
......@@ -106,6 +113,16 @@ class TestUserCLI(UffdTestCase):
user = User.query.filter_by(loginname='testuser').first()
self.assertEqual(user.roles, Role.query.filter_by(name='admin').all())
self.assertIn(self.get_admin_group(), user.groups)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--deactivate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertTrue(user.is_deactivated)
result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--activate'])
self.assertEqual(result.exit_code, 0)
with self.app.test_request_context():
user = User.query.filter_by(loginname='testuser').first()
self.assertFalse(user.is_deactivated)
def test_delete(self):
with self.app.test_request_context():
......
......@@ -13,6 +13,7 @@ from uffd.models import (
Service,
OAuth2Client, OAuth2LogoutURI, OAuth2Grant, OAuth2Token, OAuth2DeviceLoginInitiation,
PasswordToken,
Session,
)
from tests.utils import MigrationTestCase
......@@ -60,9 +61,17 @@ class TestFuzzy(MigrationTestCase):
service = Service(name='testservice', access_group=group)
oauth2_client = OAuth2Client(service=service, client_id='testclient', client_secret='testsecret', redirect_uris=['http://localhost:1234/callback'], logout_uris=[OAuth2LogoutURI(method='GET', uri='http://localhost:1234/callback')])
db.session.add_all([service, oauth2_client])
db.session.add(OAuth2Grant(user=user, client=oauth2_client, code='testcode', redirect_uri='http://example.com/callback', expires=datetime.datetime.now()))
db.session.add(OAuth2Token(user=user, client=oauth2_client, token_type='Bearer', access_token='testcode', refresh_token='testcode', expires=datetime.datetime.now()))
db.session.add(OAuth2DeviceLoginInitiation(client=oauth2_client, confirmations=[DeviceLoginConfirmation(user=user)]))
session = Session(
user=user,
secret='0919de9da3f7dc6c33ab849f44c20e8221b673ca701030de17488f3269fc5469f100e2ce56e5fd71305b23d8ecbb06d80d22004adcd3fefc5f5fcb80a436e31f2c2d9cc8fe8c59ae44871ae4524408d312474570280bf29d3ba145a4bd00010ca758eaa0795b180ec12978b42d13bf4c4f06f72103d44077022ce656610be855',
ip_address='127.0.0.1',
user_agent='Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0',
mfa_done=True,
)
db.session.add(session)
db.session.add(OAuth2Grant(session=session, client=oauth2_client, _code='testcode', redirect_uri='http://example.com/callback', expires=datetime.datetime.now()))
db.session.add(OAuth2Token(session=session, client=oauth2_client, token_type='Bearer', _access_token='testcode', _refresh_token='testcode', expires=datetime.datetime.now()))
db.session.add(OAuth2DeviceLoginInitiation(client=oauth2_client, confirmations=[DeviceLoginConfirmation(session=session)]))
db.session.add(PasswordToken(user=user))
db.session.commit()
revs = [s.split('_', 1)[0] for s in os.listdir('uffd/migrations/versions') if '_' in s and s.endswith('.py')]
......
......@@ -38,6 +38,9 @@ class TestInviteModel(UffdTestCase):
invite.creator = self.get_admin()
self.assertTrue(invite.permitted)
self.assertTrue(invite.active)
invite.creator.is_deactivated = True
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
invite.creator = self.get_user()
self.assertFalse(invite.permitted)
self.assertFalse(invite.active)
......
......@@ -40,10 +40,10 @@ class TestMfaMethodModels(UffdTestCase):
db.session.add(method)
db.session.commit()
method_id = method.id
method_code = method.code
method_code = method.code_value
db.session.expunge(method)
method = RecoveryCodeMethod.query.get(method_id)
self.assertFalse(hasattr(method, 'code'))
self.assertFalse(hasattr(method, 'code_value'))
self.assertFalse(method.verify(''))
self.assertFalse(method.verify('A'*8))
self.assertTrue(method.verify(method_code))
......@@ -82,6 +82,15 @@ class TestMfaMethodModels(UffdTestCase):
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter+2, method.raw_key)))
def test_totp_method_verify_reuse(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter-1, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter-1, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter, method.raw_key)))
def test_webauthn_method(self):
data = get_fido2_test_cred(self)
method = WebauthnMethod(user=self.get_user(), cred=data, name='testname')
......
import unittest
import datetime
import jwt
from uffd.database import db
from uffd.models import OAuth2Key
from tests.utils import UffdTestCase
TEST_JWK = dict(
id='HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU',
created=datetime.datetime(2023, 11, 9, 0, 21, 10),
active=True,
algorithm='RS256',
private_key_jwk='''{
"kty": "RSA",
"key_ops": ["sign"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB",
"d": "G7yoH5mLcZTA6ia-byCoN-zpofGvdga9AZnxPO0vsq6K_cY_O2gxuVZ3n6reAKKbuLNGCbb_D_Dffs4q8rprlfkgi3TCLzXX5Zv5HWTD7a4Y7xpxEzQ2sWo-iagVIqZVPh0pyjliqnTyUWnFmWiY0gBe9UHianHjFVZqe8E2HFOKgW3UUbQz0keg8JtJ3T9gzZrM38KWbqhOJO0VVSRAoANPTSnumfRsUCyWywrMtIfgAbQaKazqX3xkOsAF1L-iNfd6slzPvRyIQVflVDMdfKnsu-lHiKJ0DK_lg9f55T5FymgcXsq43EKBQ2H4v2dafIm-vtWx_TRZWj_msD32BEPBA-zTqh_oP1r6a3DZh4DBtWY3vzSiuhAC0erlRs-hRTX_e9ET5fUbJnmNxjnxQD9zZmwq4ujMK6KFnHct8t77Qxj3a-wDR_XyDJ4_EKYqHlcVHfxGNBSvIdjuZJkPJnVpVtfCtpyamQIR4u5oNV7fIwYe_tFnw0Y90rGoJMzB",
"p": "-A-FnH21HJ7GPWUm9k3mxsxSchy89QEUCZZiH6EcB4ZP8wJsxrQsUSIHCR74YmZEI3Ulsum1Ql4x50k7Q2sNh9SnwKvmctjksehGy4yCrdunAqjqyz3wFwGaKWnhn3frkiqH5ATjkOoc8qHz8saa7reeVClj47ZWyy-Nl559ycLMs0rI1N_THzO07C3jSbJhyPj0yeygAflsRqqnNvEQ6ps1VLiqf9G5jfSvUUn5DyKIpep9iGo29caGSIPIy_2h",
"q": "xNe1-QWskxOcY_GiHpFWdvzqr1o9fxg5whgpNcGi3caokw2iNHRYut4cbVvFFBlv_9B5QCl9WVfR2ADG0AtvkvUxEZqCdxEvcqjIANeRLKHDjW5kMuPS0_fcskFP-r7mCM9SBfPplfMVCF5nuNWf5LzNopWfsTChIDD1rSpPjItNYuwLXszm_3R81HHHeQLcyvoMxLCmeLy5TXX2hXOMHh2IMZCXAHopJmLJUVnQ48kr5jd2l0kLbmx3aBqdccJn",
"dp": "MLS7g1KbcRcrzXpDADGjkn0j4wwJfgHMMWW5toQnwMJ6iDh9qzZNTVDlGMFf-9IgpuWllU-WK4XbPpJ-dGpcqcLzfT1DbmFv5g65d9YLAqASVs9b6rQqpBnIb0E-79TYCEcZj4f2NsoBDRMHly-v1BdxmwzVdCylNhgMMS0Jfcgl8T5J2KJqDcJVT9piumGwGYnoZo1zjW-v9uAjHQKQU8BN5Git8ZL4YAsfMVLY-EPLmOhF5bcVO4TTcQGPN56B",
"dq": "HiiSl-G3rB0QE_v8g8Ruw_JCHrWrwGI8zzEWd0cApgv-3fDzzieZRKAtKNArpMW09DPDsAHrU5nx669KxqtJ3_EzIGhU3ttCMsYLRp3Af18VcADe1zEypwlNxf3dvCQtaGIjRgg13KSOr2aPa7FHOyt2MhfMjMBPn3gA3BQkdfsN0z8pCtBIABGf4ojAMBkxLOQcurH5_3uixGxzZcTrTd3mdPmbORZ-YYQ3JgCl0ZCL6kzLHaiyWKvDq66QOtK3",
"qi": "ySqD9cUxbq3wkCsPQId_YfQLIqb5RK_JJIMjtBOdTdo4aT5tmodYCSmjBmhrYXjDWtyJdelvPfdSfgncHJhf8VgkZ8TPvUeaQwsQFBwB5llwpdb72eEEJrmG1SVwNMoFCLXdNT3ACad16cUDMnWmklH0X07OzdxGOBnGhgLZUs4RbPjLH7OpYTyQqVy2L8vofqJR42cfePZw8WQM4k0PPbhralhybExIkSCmaQyYbACZ5k0OVQErEqnj4elglA0h"
}''',
public_key_jwk='''{
"kty": "RSA",
"key_ops": ["verify"],
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
}''',
)
class TestOAuth2Key(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(OAuth2Key(**TEST_JWK))
db.session.add(OAuth2Key(
id='1e9gdk7',
created=datetime.datetime(2014, 11, 8, 0, 0, 0),
active=True,
algorithm='RS256',
private_key_jwk='invalid',
public_key_jwk='''{
"kty":"RSA",
"n":"w7Zdfmece8iaB0kiTY8pCtiBtzbptJmP28nSWwtdjRu0f2GFpajvWE4VhfJAjEsOcwYzay7XGN0b-X84BfC8hmCTOj2b2eHT7NsZegFPKRUQzJ9wW8ipn_aDJWMGDuB1XyqT1E7DYqjUCEOD1b4FLpy_xPn6oV_TYOfQ9fZdbE5HGxJUzekuGcOKqOQ8M7wfYHhHHLxGpQVgL0apWuP2gDDOdTtpuld4D2LK1MZK99s9gaSjRHE8JDb1Z4IGhEcEyzkxswVdPndUWzfvWBBWXWxtSUvQGBRkuy1BHOa4sP6FKjWEeeF7gm7UMs2Nm2QUgNZw6xvEDGaLk4KASdIxRQ",
"e":"AQAB"
}'''
))
db.session.commit()
self.key = OAuth2Key.query.get('HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU')
self.key_oidc_spec = OAuth2Key.query.get('1e9gdk7')
def test_private_key(self):
self.key.private_key
def test_public_key(self):
self.key.private_key
def test_public_key_jwks_dict(self):
self.assertEqual(self.key.public_key_jwks_dict, {
"kid": "HvOn74G7njK1GoFNe8Dta087casdWMsm06pNhOXRgJU",
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"n": "vrznqUy8Xamph6s0Z02fFMIyjwLAMio35i9DXYjXP1ZQwSZ3SsIh3m2ablMnlu8PVlnYUzoj8rXyAWND0FSfWoQQxv1rq15pllKueddLoJsv321N_NRB8beGsLrsndw8QO0q3RWqV9O3kqhlTMjgj6bquX42wLaXrPLJyfbT3zObBsToG4UxpOyly84aklJXU5wIs0cbmjbfd8Xld38BG8Oh7Ozy5b93vPpJW6rudZRxU6QYC0r9bFFLIHJWrR4bzQMLGoJ63xjPOCl4WNpOYc9B7PNgnWTLXlFd51Hw9CaT2MRWsKNCSU77f6nZkfjWa1IsQdF0I48m46qgq7bEOOl9DbThbCnpblWrctdyg6du-OvCyVmkAo1KGtANl0027pgqUI_9HBMi33y3UPQm1ALHXIyIDBZtExH3lD6MMK3XGJfUxZuIOBndK-PXm5Fed52bgLOcf-24X6aHFn-8oyDVIj9OHkKWjy7jtKdmqZc4pBdVuCaMCYzj8iERWA3H",
"e": "AQAB"
})
def test_encode_jwt(self):
jwtdata = self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
self.assertIsInstance(jwtdata, str) # Regression check for #165
self.assertEqual(
jwt.get_unverified_header(jwtdata),
# typ is optional, x5u/x5c/jku/jwk are discoraged by OIDC Core 1.0 spec section 2
{'kid': self.key.id, 'alg': self.key.algorithm, 'typ': 'JWT'}
)
self.assertEqual(
OAuth2Key.decode_jwt(jwtdata, audience='test'),
{'aud': 'test', 'foo': 'bar'}
)
self.key.active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
self.key.encode_jwt({'aud': 'test', 'foo': 'bar'})
def test_oidc_hash(self):
# Example from OIDC Core 1.0 spec A.3
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
# Example from OIDC Core 1.0 spec A.4
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
# Example from OIDC Core 1.0 spec A.6
self.assertEqual(
self.key.oidc_hash(b'jHkWEdUXMU1BwAsC4vtUsZwnNvTIxEl0z9K3vx5KF0Y'),
'77QmUPtjPfzWtF2AnpK9RQ'
)
self.assertEqual(
self.key.oidc_hash(b'Qcb0Orv1zh30vL1MPRsbm-diHiMwcLyZvn1arpZv-Jxf_11jnpEX3Tgfvk'),
'LDktKdoQak3Pk0cnXxCltA'
)
def test_decode_jwt(self):
# Example from OIDC Core 1.0 spec A.2
jwt_data = (
'eyJraWQiOiIxZTlnZGs3IiwiYWxnIjoiUlMyNTYifQ.ewogImlz'
'cyI6ICJodHRwOi8vc2VydmVyLmV4YW1wbGUuY29tIiwKICJzdWIiOiAiMjQ4'
'Mjg5NzYxMDAxIiwKICJhdWQiOiAiczZCaGRSa3F0MyIsCiAibm9uY2UiOiAi'
'bi0wUzZfV3pBMk1qIiwKICJleHAiOiAxMzExMjgxOTcwLAogImlhdCI6IDEz'
'MTEyODA5NzAsCiAibmFtZSI6ICJKYW5lIERvZSIsCiAiZ2l2ZW5fbmFtZSI6'
'ICJKYW5lIiwKICJmYW1pbHlfbmFtZSI6ICJEb2UiLAogImdlbmRlciI6ICJm'
'ZW1hbGUiLAogImJpcnRoZGF0ZSI6ICIwMDAwLTEwLTMxIiwKICJlbWFpbCI6'
'ICJqYW5lZG9lQGV4YW1wbGUuY29tIiwKICJwaWN0dXJlIjogImh0dHA6Ly9l'
'eGFtcGxlLmNvbS9qYW5lZG9lL21lLmpwZyIKfQ.rHQjEmBqn9Jre0OLykYNn'
'spA10Qql2rvx4FsD00jwlB0Sym4NzpgvPKsDjn_wMkHxcp6CilPcoKrWHcip'
'R2iAjzLvDNAReF97zoJqq880ZD1bwY82JDauCXELVR9O6_B0w3K-E7yM2mac'
'AAgNCUwtik6SjoSUZRcf-O5lygIyLENx882p6MtmwaL1hd6qn5RZOQ0TLrOY'
'u0532g9Exxcm-ChymrB4xLykpDj3lUivJt63eEGGN6DH5K6o33TcxkIjNrCD'
'4XB1CKKumZvCedgHHF3IAK4dVEDSUoGlH9z4pP_eWYNXvqQOjGs-rDaQzUHl'
'6cQQWNiDpWOl_lxXjQEvQ'
)
self.assertEqual(
OAuth2Key.decode_jwt(jwt_data, options={'verify_exp': False, 'verify_aud': False}),
{
"iss": "http://server.example.com",
"sub": "248289761001",
"aud": "s6BhdRkqt3",
"nonce": "n-0S6_WzA2Mj",
"exp": 1311281970,
"iat": 1311280970,
"name": "Jane Doe",
"given_name": "Jane",
"family_name": "Doe",
"gender": "female",
"birthdate": "0000-10-31",
"email": "janedoe@example.com",
"picture": "http://example.com/janedoe/me.jpg"
}
)
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"alg":"RS256"} -> no key id
OAuth2Key.decode_jwt('eyJhbGciOiJSUzI1NiJ9.' + jwt_data.split('.', 1)[-1])
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# {"kid":"XXXXX","alg":"RS256"} -> unknown key id
OAuth2Key.decode_jwt('eyJraWQiOiJYWFhYWCIsImFsZyI6IlJTMjU2In0.' + jwt_data.split('.', 1)[-1])
OAuth2Key.query.get('1e9gdk7').active = False
with self.assertRaises(jwt.exceptions.InvalidKeyError):
# not active
OAuth2Key.decode_jwt(jwt_data)
def test_generate_rsa_key(self):
key = OAuth2Key.generate_rsa_key()
self.assertEqual(key.algorithm, 'RS256')
......@@ -57,6 +57,7 @@ class TestUserRoleAttributes(UffdTestCase):
role2.groups[group2].requires_mfa = True
self.assertSetEqual(user.compute_groups(), {group1})
db.session.add(TOTPMethod(user=user))
db.session.commit()
self.assertSetEqual(user.compute_groups(), {group1, group2})
def test_update_groups(self):
......
import unittest
import datetime
from uffd.database import db
from uffd.models.session import Session, USER_AGENT_PARSER_SUPPORTED
from tests.utils import UffdTestCase
class TestSession(UffdTestCase):
def test_expire(self):
self.app.config['SESSION_LIFETIME_SECONDS'] = 100
self.app.config['PERMANENT_SESSION_LIFETIME'] = 10
user = self.get_user()
def make_session(created_age, last_used_age):
return Session(
user=user,
created=datetime.datetime.utcnow() - datetime.timedelta(seconds=created_age),
last_used=datetime.datetime.utcnow() - datetime.timedelta(seconds=last_used_age),
)
session1 = Session(user=user)
self.assertFalse(session1.expired)
session2 = make_session(0, 0)
self.assertFalse(session2.expired)
session3 = make_session(50, 5)
self.assertFalse(session3.expired)
session4 = make_session(50, 15)
self.assertTrue(session4.expired)
session5 = make_session(105, 5)
self.assertTrue(session5.expired)
session6 = make_session(105, 15)
self.assertTrue(session6.expired)
db.session.add_all([session1, session2, session3, session4, session5, session6])
db.session.commit()
self.assertEqual(set(Session.query.filter_by(expired=False).all()), {session1, session2, session3})
self.assertEqual(set(Session.query.filter_by(expired=True).all()), {session4, session5, session6})
def test_useragent_ua_parser(self):
if not USER_AGENT_PARSER_SUPPORTED:
self.skipTest('ua_parser not available')
session = Session(user_agent='Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0')
self.assertEqual(session.user_agent_browser, 'Firefox')
self.assertEqual(session.user_agent_platform, 'Windows')
def test_useragent_no_ua_parser(self):
session = Session(user_agent='Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0')
session.DISABLE_USER_AGENT_PARSER = True
self.assertEqual(session.user_agent_browser, 'Firefox')
self.assertEqual(session.user_agent_platform, 'Windows')
......@@ -109,6 +109,13 @@ class TestAPICheckPassword(UffdTestCase):
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json, None)
def test_deactivated(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'userpassword'}, headers=[basic_auth('test', 'test')])
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json, None)
class TestAPIGetusers(UffdTestCase):
def setUpDB(self):
db.session.add(APIClient(service=Service(name='test'), auth_username='test', auth_password='test', perm_users=True))
......@@ -201,6 +208,23 @@ class TestAPIGetusers(UffdTestCase):
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json, [])
def test_deactivated(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.client.get(path=url_for('api.getusers'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [
{'displayname': 'Test User', 'email': 'test@example.com', 'id': 10000, 'loginname': 'testuser', 'groups': ['uffd_access', 'users']},
{'displayname': 'Test Admin', 'email': 'admin@example.com', 'id': 10001, 'loginname': 'testadmin', 'groups': ['uffd_access', 'uffd_admin', 'users']}
])
Service.query.filter_by(name='test').first().hide_deactivated_users = True
db.session.commit()
r = self.client.get(path=url_for('api.getusers'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [
{'displayname': 'Test Admin', 'email': 'admin@example.com', 'id': 10001, 'loginname': 'testadmin', 'groups': ['uffd_access', 'uffd_admin', 'users']}
])
class TestAPIGetgroups(UffdTestCase):
def setUpDB(self):
db.session.add(APIClient(service=Service(name='test'), auth_username='test', auth_password='test', perm_users=True))
......@@ -220,6 +244,26 @@ class TestAPIGetgroups(UffdTestCase):
{'id': 20003, 'members': ['testadmin'], 'name': 'uffd_admin'}
])
def test_all_deactivated_members(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.client.get(path=url_for('api.getgroups'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [
{'id': 20001, 'members': ['testadmin', 'testuser'], 'name': 'users'},
{'id': 20002, 'members': ['testadmin', 'testuser'], 'name': 'uffd_access'},
{'id': 20003, 'members': ['testadmin'], 'name': 'uffd_admin'}
])
Service.query.filter_by(name='test').first().hide_deactivated_users = True
db.session.commit()
r = self.client.get(path=url_for('api.getgroups'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [
{'id': 20001, 'members': ['testadmin'], 'name': 'users'},
{'id': 20002, 'members': ['testadmin'], 'name': 'uffd_access'},
{'id': 20003, 'members': ['testadmin'], 'name': 'uffd_admin'}
])
def test_id(self):
r = self.client.get(path=url_for('api.getgroups', id=20002), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
......@@ -257,6 +301,21 @@ class TestAPIGetgroups(UffdTestCase):
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json, [])
def test_member_deactivated(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.client.get(path=url_for('api.getgroups', member='testuser'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [
{'id': 20001, 'members': ['testadmin', 'testuser'], 'name': 'users'},
{'id': 20002, 'members': ['testadmin', 'testuser'], 'name': 'uffd_access'},
])
Service.query.filter_by(name='test').first().hide_deactivated_users = True
db.session.commit()
r = self.client.get(path=url_for('api.getgroups', member='testuser'), headers=[basic_auth('test', 'test')], follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(self.fix_result(r.json), [])
class TestAPIRemailerResolve(UffdTestCase):
def setUpDB(self):
db.session.add(APIClient(service=Service(name='test'), auth_username='test', auth_password='test', perm_remailer=True))
......
import time
from flask import url_for, session, request
from uffd.database import db
from uffd.models import Role, RoleGroup, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from uffd.models.mfa import _hotp
from tests.utils import dump, UffdTestCase, db_flush
def get_fido2_test_cred(self):
try:
from uffd.fido2_compat import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaViews(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(RecoveryCodeMethod(user=self.get_admin()))
db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone'))
# We don't want to skip all tests only because fido2 is not installed!
#db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle'))
db.session.commit()
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def add_webauthn(self):
db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle'))
db.session.commit()
def test_setup_disabled(self):
self.login_as('user')
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_disabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_recovery_codes(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_only_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_enabled(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
self.add_webauthn()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_enabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_few_recovery_codes(self):
self.login_as('user')
self.add_totp()
self.add_recovery_codes(1)
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_few_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_no_recovery_codes(self):
self.login_as('user')
self.add_totp()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_no_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_disable(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True)
dump('mfa_disable', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True)
dump('mfa_disable_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_disable_recovery_only(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
self.assertNotEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True)
dump('mfa_disable_recovery_only', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True)
dump('mfa_disable_recovery_only_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_admin_disable(self):
for method in MFAMethod.query.filter_by(user=self.get_admin()).all():
if not isinstance(method, RecoveryCodeMethod):
db.session.delete(method)
db.session.commit()
self.add_recovery_codes()
self.add_totp()
self.login_as('admin')
self.assertIsNotNone(request.user)
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
r = self.client.get(path=url_for('mfa.admin_disable', id=self.get_user().id), follow_redirects=True)
dump('mfa_admin_disable', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_user()).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_setup_recovery(self):
self.login_as('user')
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True)
dump('mfa_setup_recovery', r)
self.assertEqual(r.status_code, 200)
methods = RecoveryCodeMethod.query.filter_by(user=request.user).all()
self.assertNotEqual(len(methods), 0)
r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True)
dump('mfa_setup_recovery_reset', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0)
self.assertNotEqual(len(methods), 0)
def test_setup_totp(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp', r)
self.assertEqual(r.status_code, 200)
self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0)
def test_setup_totp_without_recovery(self):
self.login_as('user')
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp_without_recovery', r)
self.assertEqual(r.status_code, 200)
def test_setup_totp_finish(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
def test_setup_totp_finish_without_recovery(self):
self.login_as('user')
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_without_recovery', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_wrong_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_wrong_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_empty_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True)
dump('mfa_setup_totp_finish_empty_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_delete_totp(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(request.user, name='test')
db.session.add(method)
db.session.commit()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 2)
r = self.client.get(path=url_for('mfa.delete_totp', id=method.id), follow_redirects=True)
dump('mfa_delete_totp', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
# TODO: webauthn setup tests
def test_auth_integration(self):
self.add_recovery_codes()
self.add_totp()
db.session.commit()
self.assertIsNone(request.user)
r = self.login_as('user')
dump('mfa_auth_redirected', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'/mfa/auth', r.data)
self.assertIsNone(request.user)
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_disabled(self):
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_only(self):
self.add_recovery_codes()
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_code(self):
self.add_recovery_codes()
self.add_totp()
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth_recovery_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish', ref='/redirecttarget'), data={'code': method.code})
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0)
def test_auth_totp_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth_totp_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_totp_code_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
def test_auth_empty_code(self):
self.add_recovery_codes()
self.add_totp()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': ''}, follow_redirects=True)
dump('mfa_auth_empty_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_invalid_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_invalid_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_ratelimit(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
inv_code = str(int(code[0])+1)[-1] + code[1:]
for i in range(20):
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': inv_code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_ratelimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
# TODO: webauthn auth tests
This diff is collapsed.
import datetime
import re
import time
from flask import url_for, request
from flask import url_for, request, session
from uffd.database import db
from uffd.models import PasswordToken, UserEmail, Role, RoleGroup, Service, ServiceUser, FeatureFlag
from uffd.models import PasswordToken, UserEmail, Role, RoleGroup, Service, ServiceUser, FeatureFlag, MFAMethod, RecoveryCodeMethod, TOTPMethod, WebauthnMethod
from uffd.models.mfa import _hotp
from tests.utils import dump, UffdTestCase
from tests.utils import dump, UffdTestCase, db_flush
class TestSelfservice(UffdTestCase):
def test_index(self):
......@@ -21,25 +23,23 @@ class TestSelfservice(UffdTestCase):
def test_update_displayname(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.update_profile'),
data={'displayname': 'New Display Name'},
follow_redirects=True)
dump('update_displayname', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertEqual(_user.displayname, 'New Display Name')
user = self.get_user()
self.assertEqual(user.displayname, 'New Display Name')
def test_update_displayname_invalid(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.update_profile'),
data={'displayname': ''},
follow_redirects=True)
dump('update_displayname_invalid', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertNotEqual(_user.displayname, '')
user = self.get_user()
self.assertNotEqual(user.displayname, '')
def test_add_email(self):
self.login_as('user')
......@@ -52,7 +52,7 @@ class TestSelfservice(UffdTestCase):
m = re.search(r'/email/([0-9]+)/verify/(.*)', str(self.app.last_mail.get_content()))
email_id, secret = m.groups()
email = UserEmail.query.get(email_id)
self.assertEqual(email.user, request.user)
self.assertEqual(email.user.id, request.user.id)
self.assertEqual(email.address, 'new@example.com')
self.assertFalse(email.verified)
self.assertFalse(email.verification_expired)
......@@ -164,7 +164,7 @@ class TestSelfservice(UffdTestCase):
m = re.search(r'/email/([0-9]+)/verify/(.*)', str(self.app.last_mail.get_content()))
email_id, secret = m.groups()
email = UserEmail.query.get(email_id)
self.assertEqual(email.user, request.user)
self.assertEqual(email.user.id, request.user.id)
self.assertEqual(email.address, 'new@example.com')
self.assertFalse(email.verified)
self.assertFalse(email.verification_expired)
......@@ -245,17 +245,20 @@ class TestSelfservice(UffdTestCase):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(email_id), 'recovery_email': 'primary'},
follow_redirects=True)
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
with self.app.test_request_context():
self.assertEqual(self.get_user().primary_email.address, 'test@example.com')
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': str(email_id)},
follow_redirects=True)
self.assertIsNone(self.get_user().recovery_email)
with self.app.test_request_context():
self.assertIsNone(self.get_user().recovery_email)
with self.assertRaises(Exception):
r = self.client.post(path=url_for('selfservice.update_email_preferences'),
data={'primary_email': str(old_email_id), 'recovery_email': 'primary', f'service_{service_id}_email': str(email_id)},
follow_redirects=True)
self.assertIsNone(ServiceUser.query.get((service_id, user_id)).service_email)
with self.app.test_request_context():
self.assertIsNone(ServiceUser.query.get((service_id, user_id)).service_email)
def test_update_email_preferences_invalid(self):
self.login_as('user')
......@@ -284,52 +287,47 @@ class TestSelfservice(UffdTestCase):
def test_change_password(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'newpassword', 'password2': 'newpassword'},
follow_redirects=True)
dump('change_password', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertTrue(_user.password.verify('newpassword'))
self.assertTrue(self.get_user().password.verify('newpassword'))
def test_change_password_invalid(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'shortpw', 'password2': 'shortpw'},
follow_redirects=True)
dump('change_password_invalid', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertFalse(_user.password.verify('shortpw'))
self.assertTrue(_user.password.verify('userpassword'))
user = self.get_user()
self.assertFalse(user.password.verify('shortpw'))
self.assertTrue(user.password.verify('userpassword'))
# Regression test for #100 (login not possible if password contains character disallowed by SASLprep)
def test_change_password_samlprep_invalid(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'shortpw\n', 'password2': 'shortpw\n'},
follow_redirects=True)
dump('change_password_samlprep_invalid', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertFalse(_user.password.verify('shortpw\n'))
self.assertTrue(_user.password.verify('userpassword'))
user = self.get_user()
self.assertFalse(user.password.verify('shortpw\n'))
self.assertTrue(user.password.verify('userpassword'))
def test_change_password_mismatch(self):
self.login_as('user')
user = request.user
r = self.client.post(path=url_for('selfservice.change_password'),
data={'password1': 'newpassword1', 'password2': 'newpassword2'},
follow_redirects=True)
dump('change_password_mismatch', r)
self.assertEqual(r.status_code, 200)
_user = request.user
self.assertFalse(_user.password.verify('newpassword1'))
self.assertFalse(_user.password.verify('newpassword2'))
self.assertTrue(_user.password.verify('userpassword'))
user = self.get_user()
self.assertFalse(user.password.verify('newpassword1'))
self.assertFalse(user.password.verify('newpassword2'))
self.assertTrue(user.password.verify('userpassword'))
def test_leave_role(self):
baserole = Role(name='baserole', is_default=True)
......@@ -396,6 +394,19 @@ class TestSelfservice(UffdTestCase):
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_user(self):
user = self.get_user()
r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200)
user = self.get_user()
user.is_deactivated = True
db.session.commit()
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': user.loginname, 'mail': user.primary_email.address}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertFalse(hasattr(self.app, 'last_mail'))
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password(self):
user = self.get_user()
token = PasswordToken(user=user)
......@@ -469,3 +480,207 @@ class TestSelfservice(UffdTestCase):
dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200)
self.assertTrue(self.get_user().password.verify('userpassword'))
def get_fido2_test_cred(self):
try:
from uffd.fido2_compat import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaViews(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(RecoveryCodeMethod(user=self.get_admin()))
db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone'))
# We don't want to skip all tests only because fido2 is not installed!
#db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle'))
db.session.commit()
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def add_webauthn(self):
db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle'))
db.session.commit()
def test_setup_disabled(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_disabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_recovery_codes(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_only_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_enabled(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
self.add_webauthn()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_enabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_few_recovery_codes(self):
self.login_as('user')
self.add_totp()
self.add_recovery_codes(1)
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_few_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_no_recovery_codes(self):
self.login_as('user')
self.add_totp()
r = self.client.get(path=url_for('selfservice.setup_mfa'), follow_redirects=True)
dump('mfa_setup_no_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_disable(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True)
dump('mfa_disable', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True)
dump('mfa_disable_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_disable_recovery_only(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
admin_methods = len(MFAMethod.query.filter_by(user=self.get_admin()).all())
self.assertNotEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.disable_mfa'), follow_redirects=True)
dump('mfa_disable_recovery_only', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('selfservice.disable_mfa_confirm'), follow_redirects=True)
dump('mfa_disable_recovery_only_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(user=request.user).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(user=self.get_admin()).all()), admin_methods)
def test_setup_recovery(self):
self.login_as('user')
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True)
dump('mfa_setup_recovery', r)
self.assertEqual(r.status_code, 200)
methods = RecoveryCodeMethod.query.filter_by(user=request.user).all()
self.assertNotEqual(len(methods), 0)
r = self.client.post(path=url_for('selfservice.setup_mfa_recovery'), follow_redirects=True)
dump('mfa_setup_recovery_reset', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0)
self.assertNotEqual(len(methods), 0)
def test_setup_totp(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp', r)
self.assertEqual(r.status_code, 200)
self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0)
def test_setup_totp_without_recovery(self):
self.login_as('user')
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp_without_recovery', r)
self.assertEqual(r.status_code, 200)
def test_setup_totp_finish(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
def test_setup_totp_finish_without_recovery(self):
self.login_as('user')
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_without_recovery', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_wrong_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_wrong_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_setup_totp_finish_empty_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
r = self.client.get(path=url_for('selfservice.setup_mfa_totp', name='My TOTP Authenticator'), follow_redirects=True)
r = self.client.post(path=url_for('selfservice.setup_mfa_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True)
dump('mfa_setup_totp_finish_empty_code', r)
self.assertEqual(r.status_code, 200)
db_flush()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 0)
def test_delete_totp(self):
baserole = Role(name='baserole', is_default=True)
db.session.add(baserole)
baserole.groups[self.get_access_group()] = RoleGroup()
db.session.commit()
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(request.user, name='test')
db.session.add(method)
db.session.commit()
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 2)
r = self.client.get(path=url_for('selfservice.delete_mfa_totp', id=method.id), follow_redirects=True)
dump('mfa_delete_totp', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0)
self.assertEqual(len(TOTPMethod.query.filter_by(user=request.user).all()), 1)
# TODO: webauthn setup tests
......@@ -172,6 +172,7 @@ class TestServiceAdminViews(UffdTestCase):
self.assertEqual(service.access_group, None)
self.assertEqual(service.remailer_mode, RemailerMode.DISABLED)
self.assertEqual(service.enable_email_preferences, False)
self.assertEqual(service.hide_deactivated_users, False)
def test_edit_access_all(self):
self.login_as('admin')
......@@ -209,6 +210,24 @@ class TestServiceAdminViews(UffdTestCase):
self.assertEqual(service.limit_access, True)
self.assertEqual(service.access_group, self.get_users_group())
def test_edit_hide_deactivated_users(self):
self.login_as('admin')
r = self.client.post(
path=url_for('service.edit_submit', id=self.service_id),
follow_redirects=True,
data={
'name': 'test1',
'access-group': '',
'remailer-mode': 'DISABLED',
'remailer-overwrite-mode': 'ENABLED_V2',
'remailer-overwrite-users': '',
'hide_deactivated_users': '1',
},
)
self.assertEqual(r.status_code, 200)
service = Service.query.get(self.service_id)
self.assertEqual(service.hide_deactivated_users, True)
def test_edit_email_preferences(self):
self.login_as('admin')
r = self.client.post(
......
......@@ -5,7 +5,8 @@ from flask import url_for, request
from uffd.database import db
from uffd.password_hash import PlaintextPasswordHash
from uffd.models import DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation, User
from uffd.models import DeviceLoginConfirmation, Service, OAuth2Client, OAuth2DeviceLoginInitiation, User, RecoveryCodeMethod, TOTPMethod
from uffd.models.mfa import _hotp
from uffd.views.session import login_required
from tests.utils import dump, UffdTestCase, db_flush
......@@ -17,7 +18,7 @@ class TestSession(UffdTestCase):
@self.app.route('/test_login_required')
@login_required()
def test_login_required():
return 'SUCCESS', 200
return 'SUCCESS ' + request.user.loginname, 200
@self.app.route('/test_group_required1')
@login_required(lambda: request.user.is_in_group('users'))
......@@ -38,15 +39,10 @@ class TestSession(UffdTestCase):
self.assertIsNotNone(request.user)
def assertLoggedIn(self):
self.assertIsNotNone(request.user)
self.assertEqual(self.client.get(path=url_for('test_login_required'), follow_redirects=True).data, b'SUCCESS')
self.assertEqual(request.user.loginname, self.get_user().loginname)
self.assertEqual(self.client.get(path=url_for('test_login_required'), follow_redirects=True).data, b'SUCCESS testuser')
def assertLoggedOut(self):
self.assertIsNone(request.user)
self.assertNotEqual(self.client.get(path=url_for('test_login_required'),
follow_redirects=True).data, b'SUCCESS')
self.assertEqual(request.user, None)
self.assertNotIn(b'SUCCESS', self.client.get(path=url_for('test_login_required'), follow_redirects=True).data)
def test_login(self):
self.assertLoggedOut()
......@@ -78,7 +74,7 @@ class TestSession(UffdTestCase):
def test_redirect(self):
r = self.login_as('user', ref=url_for('test_login_required'))
self.assertEqual(r.status_code, 200)
self.assertEqual(r.data, b'SUCCESS')
self.assertEqual(r.data, b'SUCCESS testuser')
def test_wrong_password(self):
r = self.client.post(path=url_for('session.login'),
......@@ -125,6 +121,20 @@ class TestSession(UffdTestCase):
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_deactivated(self):
self.get_user().is_deactivated = True
db.session.commit()
r = self.login_as('user')
dump('login_deactivated', r)
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
def test_deactivated_after_login(self):
self.login_as('user')
self.get_user().is_deactivated = True
db.session.commit()
self.assertLoggedOut()
def test_group_required(self):
self.login()
self.assertEqual(self.client.get(path=url_for('test_group_required1'),
......@@ -139,7 +149,6 @@ class TestSession(UffdTestCase):
self.assertEqual(r.status_code, 200)
self.assertLoggedOut()
@unittest.skip('See #29')
def test_timeout(self):
self.login()
time.sleep(3)
......@@ -174,8 +183,162 @@ class TestSession(UffdTestCase):
self.assertEqual(r.status_code, 200)
initiation = OAuth2DeviceLoginInitiation.query.filter_by(code=code).one()
self.assertEqual(len(initiation.confirmations), 1)
self.assertEqual(initiation.confirmations[0].user.loginname, 'testuser')
self.assertEqual(initiation.confirmations[0].session.user.loginname, 'testuser')
self.assertIn(initiation.confirmations[0].code.encode(), r.data)
r = self.client.get(path=url_for('session.deviceauth_finish'), follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertEqual(DeviceLoginConfirmation.query.all(), [])
class TestMfaViews(UffdTestCase):
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def test_auth_integration(self):
self.add_recovery_codes()
self.add_totp()
db.session.commit()
self.assertIsNone(request.user)
r = self.login_as('user')
dump('mfa_auth_redirected', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'/mfa/auth', r.data)
self.assertIsNone(request.user)
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_disabled(self):
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_only(self):
self.add_recovery_codes()
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_code(self):
self.add_recovery_codes()
self.add_totp()
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth_recovery_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish', ref='/redirecttarget'), data={'code': method.code_value})
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0)
def test_auth_totp_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
dump('mfa_auth_totp_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_totp_code_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
def test_auth_totp_code_reuse(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_empty_code(self):
self.add_recovery_codes()
self.add_totp()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': ''}, follow_redirects=True)
dump('mfa_auth_empty_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_invalid_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('session.mfa_auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_invalid_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_ratelimit(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
inv_code = str(int(code[0])+1)[-1] + code[1:]
for i in range(20):
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': inv_code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('session.mfa_auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_ratelimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
# TODO: webauthn auth tests
......@@ -3,8 +3,7 @@ import datetime
from flask import url_for, request
from uffd.database import db
from uffd.models import Signup, Role, RoleGroup, FeatureFlag
from uffd.views.session import login_get_user
from uffd.models import User, Signup, Role, RoleGroup, FeatureFlag
from tests.utils import dump, UffdTestCase, db_flush
......@@ -161,13 +160,13 @@ class TestSignupViews(UffdTestCase):
signup = Signup(loginname='newuser', displayname='New User', mail='new@example.com', password='notsecret')
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
self.assertIsNone(login_get_user('newuser', 'notsecret'))
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.get(path=url_for('signup.signup_confirm', signup_id=signup.id, token=signup.token), follow_redirects=True)
dump('test_signup_confirm', r)
self.assertEqual(r.status_code, 200)
signup = refetch_signup(signup)
self.assertFalse(signup.completed)
self.assertIsNone(login_get_user('newuser', 'notsecret'))
self.assertIsNone(User.query.filter_by(loginname='newuser').one_or_none())
r = self.client.post(path=url_for('signup.signup_confirm_submit', signup_id=signup.id, token=signup.token), follow_redirects=True, data={'password': 'notsecret'})
dump('test_signup_confirm_submit', r)
self.assertEqual(r.status_code, 200)
......@@ -176,7 +175,7 @@ class TestSignupViews(UffdTestCase):
self.assertEqual(signup.user.loginname, 'newuser')
self.assertEqual(signup.user.displayname, 'New User')
self.assertEqual(signup.user.primary_email.address, 'new@example.com')
self.assertIsNotNone(login_get_user('newuser', 'notsecret'))
self.assertTrue(User.query.filter_by(loginname='newuser').one_or_none().password.verify('notsecret'))
def test_confirm_loggedin(self):
baserole = Role(name='baserole', is_default=True)
......