Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • ldap_user_conn_test
  • master
2 results

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
  • Dockerfile
  • feature_invite_validuntil_minmax
  • incremental-sync
  • jwt_encode_inconsistencies
  • master
  • redis-rate-limits
  • roles-recursive-cte
  • typehints
  • v1.0.x
  • v1.1.x
  • v1.2.x
  • v1.x.x
  • v0.1.2
  • v0.1.4
  • v0.1.5
  • v0.2.0
  • v0.3.0
  • v1.0.0
  • v1.0.1
  • v1.0.2
  • v1.1.0
  • v1.1.1
  • v1.1.2
  • v1.2.0
  • v2.0.0
  • v2.0.1
  • v2.1.0
  • v2.2.0
  • v2.3.0
  • v2.3.1
30 results
Show changes
Commits on Source (26)
......@@ -61,6 +61,15 @@ hook-pre-app = exec:FLASK_APP=uffd flask db upgrade
tabs.
## Bind with service account or as user?
Uffd can use a dedicated service account for LDAP operations by setting `LDAP_SERVICE_BIND_DN`.
Or it uses the credentials of the currently logged in user, by not setting `LDAP_SERVICE_BIND_DN`.
If you choose to run with user credentials, some features are not available, like password resets
or self signup, since in both cases, no user credentials can exist.
## OAuth2 Single-Sign-On Provider
Other services can use uffd as an OAuth2.0-based authentication provider.
......
Subproject commit 34cdfca289eb4bcb79c4580135ba971c7a2437d4
Subproject commit 7a232d305fda3e261b6f8d3c0958a16f4c2e8d8b
{
"entries": [
{
"dn": "uid=testuser,ou=users,dc=example,dc=com",
"dn": "uid=testuser,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test User"
......
......@@ -3,18 +3,20 @@ import unittest
from flask import url_for
from uffd.ldap import ldap
from uffd import user
# These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user
from uffd.session.views import get_current_user
from uffd.selfservice.models import MailToken, PasswordToken
from uffd.user.models import User
from uffd import create_app, db, ldap
from uffd import create_app, db
from utils import dump, UffdTestCase
def get_ldap_password():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash
def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
class TestSelfservice(UffdTestCase):
def login(self):
......@@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(tokens), 0)
def test_forgot_password(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password'))
dump('forgot_password', r)
......@@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase):
self.assertIn(token.token, str(self.app.last_mail.get_content()))
def test_forgot_password_wrong_user(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200)
......@@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_email(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True)
self.assertEqual(r.status_code, 200)
......@@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase):
# Regression test for #31
def test_forgot_password_invalid_user(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True)
dump('forgot_password_submit_invalid_user', r)
......@@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
oldpw = get_ldap_password()
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
......@@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r)
self.assertEqual(r.status_code, 200)
self.assertNotEqual(oldpw, get_ldap_password())
# TODO: Verify that the new password is actually correct
self.assertEqual(len(PasswordToken.query.all()), 0)
self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
def test_token_password_emptydb(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
oldpw = get_ldap_password()
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200)
......@@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password())
self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_invalid(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
oldpw = get_ldap_password()
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
......@@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password())
self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_expired(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
oldpw = get_ldap_password()
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname,
created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token)
......@@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password())
self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_different_passwords(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
oldpw = get_ldap_password()
if self.use_userconnection:
self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname)
db.session.add(token)
db.session.commit()
......@@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(oldpw, get_ldap_password())
self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
class TestSelfserviceOL(TestSelfservice):
use_openldap = True
class TestSelfserviceOLUser(TestSelfserviceOL):
use_userconnection = True
......@@ -135,3 +135,6 @@ class TestSession(UffdTestCase):
class TestSessionOL(TestSession):
use_openldap = True
class TestSessionOLUser(TestSessionOL):
use_userconnection = True
import datetime
import time
import unittest
from flask import url_for, session
......@@ -7,20 +6,17 @@ from flask import url_for, session
# These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user
from uffd.session.views import get_current_user
from uffd.user.models import User
from uffd.role.models import Role
from uffd.session.views import get_current_user, is_valid_session
from uffd.mfa.models import MFAMethod, MFAType, RecoveryCodeMethod, TOTPMethod, WebauthnMethod, _hotp
from uffd import create_app, db
from utils import dump, UffdTestCase
def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
def get_user_password():
return get_user().pwhash
def get_admin():
return User.query.get('uid=testadmin,ou=users,dc=example,dc=com')
......@@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase):
class TestUserModelOL(TestUserModel):
use_openldap = True
class TestUserModelOLUser(TestUserModelOL):
use_userconnection = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True)
class TestUserViews(UffdTestCase):
def setUp(self):
super().setUp()
......@@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase):
role1 = Role(name='role1')
print('test_new', role1.db_members, role1.members, user.roles)
self.assertEqual(roles, ['base', 'role1'])
# TODO: check password hash
# TODO: confirm Mail is send, login not yet possible
#self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
def test_new_invalid_loginname(self):
r = self.client.post(path=url_for('user.update'),
......@@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase):
role2.members.add(user)
db.session.commit()
role1_id = role1.id
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
dump('user_update', r)
self.assertEqual(r.status_code, 200)
......@@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw)
self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
self.assertEqual(roles, ['base', 'role1'])
def test_update_password(self):
user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid),
......@@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname)
self.assertNotEqual(get_user_password(), oldpw)
self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword'))
@unittest.skip('See #28')
def test_update_invalid_password(self):
user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid),
......@@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase):
dump('user_update_password', r)
self.assertEqual(r.status_code, 200)
_user = get_user()
self.assertEqual(get_user_password(), oldpw)
self.assertFalse(ldap.test_user_bind(_user.dn, 'A'))
self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname)
def test_update_empty_email(self):
user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid),
......@@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw)
self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_update_invalid_display_name(self):
user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid),
......@@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw)
self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_show(self):
r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True)
......@@ -258,77 +259,158 @@ newuser12,newuser12@example.com,{role1.id};{role1.id}
dump('user_csvimport', r)
self.assertEqual(r.status_code, 200)
user = User.query.get('uid=newuser1,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser1')
self.assertEqual(user.displayname, 'newuser1')
self.assertEqual(user.mail, 'newuser1@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser2,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser2')
self.assertEqual(user.displayname, 'newuser2')
self.assertEqual(user.mail, 'newuser2@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1'])
user = User.query.get('uid=newuser3,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser3')
self.assertEqual(user.displayname, 'newuser3')
self.assertEqual(user.mail, 'newuser3@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1', 'role2'])
user = User.query.get('uid=newuser4,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser4')
self.assertEqual(user.displayname, 'newuser4')
self.assertEqual(user.mail, 'newuser4@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser5,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser5')
self.assertEqual(user.displayname, 'newuser5')
self.assertEqual(user.mail, 'newuser5@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser6,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser6')
self.assertEqual(user.displayname, 'newuser6')
self.assertEqual(user.mail, 'newuser6@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1', 'role2'])
self.assertIsNone(User.query.get('uid=newuser7,ou=users,dc=example,dc=com'))
self.assertIsNone(User.query.get('uid=newuser8,ou=users,dc=example,dc=com'))
self.assertIsNone(User.query.get('uid=newuser9,ou=users,dc=example,dc=com'))
user = User.query.get('uid=newuser10,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser10')
self.assertEqual(user.displayname, 'newuser10')
self.assertEqual(user.mail, 'newuser10@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser11,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser11')
self.assertEqual(user.displayname, 'newuser11')
self.assertEqual(user.mail, 'newuser11@example.com')
# Currently the csv import is not very robust, imho newuser11 should have role1 and role2!
roles = sorted([r.name for r in user.roles])
#self.assertEqual(roles, ['base', 'role1', 'role2'])
self.assertEqual(roles, ['base', 'role2'])
user = User.query.get('uid=newuser12,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser12')
self.assertEqual(user.displayname, 'newuser12')
self.assertEqual(user.mail, 'newuser12@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1'])
class TestUserViewsOL(TestUserViews):
use_openldap = True
class TestUserViewsOLUserAsAdmin(TestUserViewsOL):
use_userconnection = True
class TestUserViewsOLUserAsUser(UffdTestCase):
use_userconnection = True
use_openldap = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True)
def test_view_own(self):
user_ = get_user()
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_view_own', r)
self.assertEqual(r.status_code, 200)
def test_view_others(self):
admin = get_admin()
r = self.client.get(path=url_for('user.show', uid=admin.uid), follow_redirects=True)
dump('user_view_others', r)
self.assertEqual(r.status_code, 200)
def test_view_index(self):
r = self.client.get(path=url_for('user.index'), follow_redirects=True)
dump('user_index', r)
self.assertEqual(r.status_code, 200)
def test_update_other_user(self):
user_ = get_admin()
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
role2.members.add(user_)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_update', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user_.uid),
data={'loginname': user_.loginname, 'mail': user_.mail, 'displayname': user_.displayname + "12345",
f'role-{role1_id}': '1', 'password': ''}, follow_redirects=True)
dump('user_update_submit', r)
self.assertEqual(r.status_code, 200)
_user = get_admin()
self.assertEqual(_user.displayname, user_.displayname)
self.assertEqual(_user.mail, user_.mail)
self.assertEqual(_user.uid, user_.uid)
self.assertEqual(_user.loginname, user_.loginname)
def test_new(self):
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show'), follow_redirects=True)
dump('user_new', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.get('uid=newuser,ou=users,dc=example,dc=com'))
r = self.client.post(path=url_for('user.update'),
data={'loginname': 'newuser', 'mail': 'newuser@example.com', 'displayname': 'New User',
f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True)
dump('user_new_submit', r)
self.assertEqual(r.status_code, 200)
user = User.query.get('uid=newuser,ou=users,dc=example,dc=com')
self.assertIsNone(user)
def test_delete(self):
user = get_admin()
r = self.client.get(path=url_for('user.delete', uid=user.uid), follow_redirects=True)
dump('user_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(get_admin())
class TestGroupViews(UffdTestCase):
def setUp(self):
super().setUp()
......@@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase):
class TestGroupViewsOL(TestGroupViews):
use_openldap = True
class TestGroupViewsOLUser(TestGroupViewsOL):
use_userconnection = True
......@@ -15,7 +15,7 @@ def dump(basename, resp):
return
os.makedirs(root, exist_ok=True)
path = os.path.join(root, basename+suffix)
with open(path, 'xb') as f:
with open(path, 'wb') as f:
f.write(resp.data)
def db_flush():
......@@ -25,6 +25,7 @@ def db_flush():
class UffdTestCase(unittest.TestCase):
use_openldap = False
use_userconnection = False
def setUp(self):
self.dir = tempfile.mkdtemp()
......@@ -43,7 +44,10 @@ class UffdTestCase(unittest.TestCase):
self.skipTest('OPENLDAP_TESTING not set')
config['LDAP_SERVICE_MOCK'] = False
config['LDAP_SERVICE_URL'] = 'ldap://localhost'
config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com'
if self.use_userconnection:
config['LDAP_SERVICE_BIND_DN'] = None
else:
config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com'
config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password'
os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1")
os.system("ldapadd -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_add.ldif")
......
......@@ -2,7 +2,7 @@ import os
import secrets
import sys
from flask import Flask, redirect, url_for
from flask import Flask, redirect, url_for, request
from werkzeug.routing import IntegerConverter
from werkzeug.serving import make_ssl_devcert
from werkzeug.contrib.profiler import ProfilerMiddleware
......@@ -12,7 +12,7 @@ sys.path.append('deps/ldapalchemy')
# pylint: disable=wrong-import-position
from uffd.database import db, SQLAlchemyJSON
from uffd.ldap import ldap
import uffd.ldap
from uffd.template_helper import register_template_helper
from uffd.navbar import setup_navbar
# pylint: enable=wrong-import-position
......@@ -48,21 +48,36 @@ def create_app(test_config=None): # pylint: disable=too-many-locals
db.init_app(app)
Migrate(app, db, render_as_batch=True)
# pylint: disable=C0415
from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, invite
from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services
# pylint: enable=C0415
for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp + invite.bp:
for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp:
app.register_blueprint(i)
if app.config['LDAP_SERVICE_BIND_DN'] or app.config.get('LDAP_SERVICE_MOCK', False):
# pylint: disable=C0415
from uffd import signup, invite
# pylint: enable=C0415
for i in signup.bp + invite.bp:
app.register_blueprint(i)
else:
app.config['ENABLE_PASSWORDRESET'] = False
@app.shell_context_processor
def push_request_context(): #pylint: disable=unused-variable
app.test_request_context().push() # LDAP ORM requires request context
return {'db': db, 'ldap': ldap}
return {'db': db, 'ldap': uffd.ldap.ldap}
@app.route("/")
def index(): #pylint: disable=unused-variable
return redirect(url_for('selfservice.index'))
@app.teardown_request
def close_connection(exception): #pylint: disable=unused-variable,unused-argument
if hasattr(request, "ldap_connection"):
request.ldap_connection.unbind()
@app.cli.command("gendevcert", help='Generates a self-signed TLS certificate for development')
def gendevcert(): #pylint: disable=unused-variable
if os.path.exists('devcert.crt') or os.path.exists('devcert.key'):
......
......@@ -37,6 +37,8 @@ LDAP_MAIL_UID_ATTRIBUTE="uid"
LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid"
LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop"
# If you do not set the service bind_dn, connections use the user credentials.
# When using a user connection, some features are not available, since they require a service connection
LDAP_SERVICE_BIND_DN=""
LDAP_SERVICE_BIND_PASSWORD=""
LDAP_SERVICE_URL="ldapi:///"
......@@ -60,6 +62,8 @@ MAIL_FROM_ADDRESS='foo@bar.com'
# Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False
# PASSWORDRESET is not available when not using a service connection
ENABLE_PASSWORDRESET=True
LOGINNAME_BLACKLIST=['^admin$', '^root$']
......
......@@ -15,6 +15,7 @@ from uffd.navbar import register_navbar
from uffd.ratelimit import host_ratelimit, format_delay
from uffd.signup.views import signup_ratelimit
bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/')
def invite_acl():
......
from flask import current_app, request, abort
import base64
import hashlib
from flask import current_app, request, abort, session
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError
from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import
from ldapalchemy.model import Query
from ldapalchemy.core import encode_filter
def check_hashed(password_hash, password):
'''Return if password matches a LDAP-compatible password hash
:param password_hash: LDAP-compatible password hash (plain password or "{ssha512}...")
:type password_hash: bytes
:param password: Plain, (ideally) utf8-encoded password
:type password: bytes'''
algorithms = {
b'md5': 'MD5',
b'sha': 'SHA1',
b'sha256': 'SHA256',
b'sha384': 'SHA384',
b'sha512': 'SHA512'
}
if not password_hash.startswith(b'{'):
return password_hash == password
algorithm, data = password_hash[1:].split(b'}', 1)
data = base64.b64decode(data)
if algorithm in algorithms:
ctx = hashlib.new(algorithms[algorithm], password)
return data == ctx.digest()
if algorithm.startswith(b's') and algorithm[1:] in algorithms:
ctx = hashlib.new(algorithms[algorithm[1:]], password)
salt = data[ctx.digest_size:]
ctx.update(salt)
return data == ctx.digest() + salt
raise NotImplementedError()
class FlaskQuery(Query):
def get_or_404(self, dn):
......@@ -18,9 +53,54 @@ class FlaskQuery(Query):
abort(404)
return res
def test_user_bind(bind_dn, bind_pw):
try:
if current_app.config.get('LDAP_SERVICE_MOCK', False):
# Since we reuse the same conn and ldap3's mock only supports plain
# passwords for bind and rebind, we simulate the bind by retrieving
# and checking the password hash ourselves.
conn = ldap.get_connection()
conn.search(bind_dn, search_filter='(objectclass=*)', search_scope=ldap3.BASE,
attributes=ldap3.ALL_ATTRIBUTES)
if not conn.response:
return False
if not conn.response[0]['attributes'].get('userPassword'):
return False
return check_hashed(conn.response[0]['attributes']['userPassword'][0], bind_pw.encode())
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"])
conn = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
if not conn:
return False
except (LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError):
return False
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
lazy_entries = conn.entries
# Do not end the connection when using mock, as it will be reused afterwards
if not current_app.config.get('LDAP_SERVICE_MOCK', False):
conn.unbind()
return len(lazy_entries) == 1
def connect_and_bind_to_ldap(server, bind_dn, bind_pw):
# Using auto_bind cannot close the connection, so define the connection with extra steps
_connection = ldap3.Connection(server, bind_dn, bind_pw)
if _connection.closed:
_connection.open(read_server_info=False)
if current_app.config["LDAP_SERVICE_USE_STARTTLS"]:
_connection.start_tls(read_server_info=False)
if not _connection.bind(read_server_info=True):
_connection.unbind()
raise LDAPBindError
return _connection
class FlaskLDAPMapper(LDAPMapper):
def __init__(self):
super().__init__()
class Model(self.Model):
query_class = FlaskQuery
......@@ -48,9 +128,20 @@ class FlaskLDAPMapper(LDAPMapper):
current_app.ldap_mock.bind()
return current_app.ldap_mock
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL)
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True
request.ldap_connection = ldap3.Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"],
current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=auto_bind)
# If the configured LDAP service bind_dn is empty, connect to LDAP as a user
if current_app.config['LDAP_SERVICE_BIND_DN']:
bind_dn = current_app.config["LDAP_SERVICE_BIND_DN"]
bind_pw = current_app.config["LDAP_SERVICE_BIND_PASSWORD"]
else:
bind_dn = session['user_dn']
bind_pw = session['user_pw']
try:
request.ldap_connection = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return None
return request.ldap_connection
ldap = FlaskLDAPMapper()
# pylint: disable=invalid-name
navbarList = []
# pylint: enable=invalid-name
def setup_navbar(app):
app.jinja_env.globals['getnavbar'] = lambda: [n for n in navbarList if n['visible']()]
app.navbarList = []
app.jinja_env.globals['getnavbar'] = lambda: [n for n in app.navbarList if n['visible']()]
# iconlib can be 'bootstrap'
# ( see: http://getbootstrap.com/components/#glyphicons )
......@@ -12,22 +9,37 @@ def setup_navbar(app):
# visible is a function that returns "True" if this icon should be visible in the calling context
def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None):
def wrapper(func):
urlendpoint = endpoint
if not endpoint:
# pylint: disable=protected-access
if blueprint:
urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
else:
urlendpoint = func.__name_
def deferred_call(state):
urlendpoint = endpoint
if not endpoint:
# pylint: disable=protected-access
if blueprint:
urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
else:
urlendpoint = func.__name_
# pylint: enable=protected-access
item = {}
item['iconlib'] = iconlib
item['icon'] = icon
item['group'] = group
item['endpoint'] = urlendpoint
item['name'] = name
item['blueprint'] = blueprint
item['visible'] = visible or (lambda: True)
navbarList.append(item)
item = {}
item['iconlib'] = iconlib
item['icon'] = icon
item['group'] = group
item['endpoint'] = urlendpoint
item['name'] = name
item['blueprint'] = blueprint
item['visible'] = visible or (lambda: True)
state.app.navbarList.append(item)
if blueprint:
blueprint.record_once(deferred_call)
else:
class StateMock:
def __init__(self, app):
self.app = app
# pylint: disable=C0415
from flask import current_app
# pylint: enable=C0415
deferred_call(StateMock(current_app))
return func
return wrapper
......@@ -4,7 +4,7 @@ import smtplib
from email.message import EmailMessage
import email.utils
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect
......@@ -29,6 +29,7 @@ def index():
@csrf_protect(blueprint=bp)
@login_required()
def update():
password_changed = False
user = get_current_user()
if request.values['displayname'] != user.displayname:
if user.set_displayname(request.values['displayname']):
......@@ -41,12 +42,16 @@ def update():
else:
if user.set_password(request.values['password1']):
flash('Password changed.')
password_changed = True
else:
flash('Password could not be set.')
if request.values['mail'] != user.mail:
send_mail_verification(user.loginname, request.values['mail'])
flash('We sent you an email, please verify your mail address.')
ldap.session.commit()
# When using a user_connection, update the connection on password-change
if password_changed and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = request.values['password1']
return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST']))
......
......@@ -25,7 +25,9 @@
{% if config['SELF_SIGNUP'] %}
<a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a>
{% endif %}
{% if config['ENABLE_PASSWORDRESET'] %}
<a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a>
{% endif %}
</div>
</div>
</div>
......
......@@ -4,12 +4,8 @@ import functools
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError
from ldapalchemy.core import encode_filter
from uffd.user.models import User
from uffd.ldap import ldap
from uffd.ldap import ldap, test_user_bind, LDAPInvalidDnError
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
......@@ -18,29 +14,27 @@ login_ratelimit = Ratelimit('login', 1*60, 3)
def login_get_user(loginname, password):
dn = User(loginname=loginname).dn
if current_app.config.get('LDAP_SERVICE_MOCK', False):
conn = ldap.get_connection()
# Since we reuse the same conn for all calls to `user_conn()` we
# simulate the password check by rebinding. Note that ldap3's mocking
# implementation just compares the string in the objects's userPassword
# field with the password, no support for hashing or OpenLDAP-style
# password-prefixes ("{PLAIN}..." or "{ssha512}...").
try:
if not conn.rebind(dn, password):
return None
except (LDAPBindError, LDAPPasswordIsMandatoryError):
# If we use a service connection, test user bind seperately
if current_app.config['LDAP_SERVICE_BIND_DN'] or current_app.config.get('LDAP_SERVICE_MOCK', False):
if not test_user_bind(dn, password):
return None
# If we use a user connection, just create the connection normally
else:
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL)
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True
try:
conn = ldap3.Connection(server, dn, password, auto_bind=auto_bind)
except (LDAPBindError, LDAPPasswordIsMandatoryError):
# ldap.get_connection gets the credentials from the session, so set it here initially
session['user_dn'] = dn
session['user_pw'] = password
if not ldap.get_connection():
session.clear()
return None
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
if len(conn.entries) != 1:
return None
return User.query.get(dn)
try:
user = User.query.get(dn)
if user:
return user
except LDAPInvalidDnError:
pass
return None
@bp.route("/logout")
def logout():
......@@ -50,9 +44,12 @@ def logout():
session.clear()
return resp
def set_session(user, skip_mfa=False):
def set_session(user, password='', skip_mfa=False):
session.clear()
session['user_dn'] = user.dn
# only save the password if we use a user connection
if password and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = password
session['logintime'] = datetime.datetime.now().timestamp()
session['_csrf_token'] = secrets.token_hex(128)
if skip_mfa:
......@@ -82,7 +79,7 @@ def login():
if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
flash('You do not have access to this service')
return render_template('login.html', ref=request.values.get('ref'))
set_session(user)
set_session(user, password=password)
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
def get_current_user():
......
......@@ -97,6 +97,6 @@ def signup_confirm_submit(token):
return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.commit()
ldap.session.commit()
set_session(user, skip_mfa=True)
set_session(user, password=request.form['password'], skip_mfa=True)
flash('Your account was successfully created')
return redirect(url_for('selfservice.index'))
......@@ -47,11 +47,11 @@ def update(uid=None):
return redirect(url_for('user.show'))
else:
user = User.query.filter_by(uid=uid).first_or_404()
if not user.set_mail(request.form['mail']):
if user.mail != request.form['mail'] and not user.set_mail(request.form['mail']):
flash('Mail is invalid')
return redirect(url_for('user.show', uid=uid))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if not user.set_displayname(new_displayname):
if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash('Display name does not meet requirements')
return redirect(url_for('user.show', uid=uid))
new_password = request.form.get('password')
......