From 13c4f75fbc9d8a17fcb881abb2f0ec54cd3b3a6b Mon Sep 17 00:00:00 2001 From: sistason <c3infra@sistason.de> Date: Sat, 1 May 2021 15:34:22 +0000 Subject: [PATCH] Add user-connection and proper connection teardowns --- README.md | 10 +++ deps/ldapalchemy | 2 +- ldap_server_entries.json | 2 +- tests/test_invite.py | 2 +- tests/test_selfservice.py | 62 +++++++++----- tests/test_session.py | 3 + tests/test_user.py | 137 ++++++++++++++++++++++++------ tests/utils.py | 14 ++- uffd/__init__.py | 26 +++++- uffd/default_config.cfg | 10 ++- uffd/invite/views.py | 1 + uffd/ldap.py | 95 ++++++++++++++++++++- uffd/navbar.py | 54 +++++++----- uffd/selfservice/views.py | 7 +- uffd/session/templates/login.html | 2 + uffd/session/views.py | 49 ++++++----- uffd/signup/views.py | 2 +- uffd/user/views_user.py | 4 +- 18 files changed, 369 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 71a37bb5..e6661c0c 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,16 @@ hook-pre-app = exec:FLASK_APP=uffd flask db upgrade tabs. +## Bind with service account or as user? + +Uffd can use a dedicated service account for LDAP operations by setting `LDAP_SERVICE_BIND_DN`. +Leave that variable blank to use anonymouse bind. +Or set `LDAP_SERVICE_USER_BIND` to use the credentials of the currently logged in user. + +If you choose to run with user credentials, some features are not available, like password resets +or self signup, since in both cases, no user credentials can exist. + + ## OAuth2 Single-Sign-On Provider Other services can use uffd as an OAuth2.0-based authentication provider. diff --git a/deps/ldapalchemy b/deps/ldapalchemy index 34cdfca2..7a232d30 160000 --- a/deps/ldapalchemy +++ b/deps/ldapalchemy @@ -1 +1 @@ -Subproject commit 34cdfca289eb4bcb79c4580135ba971c7a2437d4 +Subproject commit 7a232d305fda3e261b6f8d3c0958a16f4c2e8d8b diff --git a/ldap_server_entries.json b/ldap_server_entries.json index 5d98bb39..00d2aaea 100644 --- a/ldap_server_entries.json +++ b/ldap_server_entries.json @@ -1,7 +1,7 @@ { "entries": [ { - "dn": "uid=testuser,ou=users,dc=example,dc=com", + "dn": "uid=testuser,ou=users,dc=example,dc=com", "raw": { "cn": [ "Test User" diff --git a/tests/test_invite.py b/tests/test_invite.py index d488b84f..c73130b0 100644 --- a/tests/test_invite.py +++ b/tests/test_invite.py @@ -58,7 +58,7 @@ class TestInviteModel(UffdTestCase): invite.reset() self.assertFalse(invite.active) - def test_reset_disabled(self): + def test_reset_single_use(self): invite = Invite(valid_until=datetime.datetime.now() + datetime.timedelta(seconds=60), single_use=False) invite.used = True invite.disable() diff --git a/tests/test_selfservice.py b/tests/test_selfservice.py index 13b15cf0..1d6ab94b 100644 --- a/tests/test_selfservice.py +++ b/tests/test_selfservice.py @@ -3,18 +3,20 @@ import unittest from flask import url_for -from uffd.ldap import ldap -from uffd import user +# These imports are required, because otherwise we get circular imports?! +from uffd import ldap, user from uffd.session.views import get_current_user from uffd.selfservice.models import MailToken, PasswordToken from uffd.user.models import User -from uffd import create_app, db, ldap +from uffd import create_app, db from utils import dump, UffdTestCase -def get_ldap_password(): - return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash + +def get_user(): + return User.query.get('uid=testuser,ou=users,dc=example,dc=com') + class TestSelfservice(UffdTestCase): def login(self): @@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(tokens), 0) def test_forgot_password(self): + if self.use_userconnection: + self.skipTest('Password Reset is not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password')) dump('forgot_password', r) @@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase): self.assertIn(token.token, str(self.app.last_mail.get_content())) def test_forgot_password_wrong_user(self): + if self.use_userconnection: + self.skipTest('Password Reset is not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password')) self.assertEqual(r.status_code, 200) @@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(PasswordToken.query.all()), 0) def test_forgot_password_wrong_email(self): + if self.use_userconnection: + self.skipTest('Password Reset is not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True) self.assertEqual(r.status_code, 200) @@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase): # Regression test for #31 def test_forgot_password_invalid_user(self): + if self.use_userconnection: + self.skipTest('Password Reset is not possible in user mode') r = self.client.post(path=url_for('selfservice.forgot_password'), data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True) dump('forgot_password_submit_invalid_user', r) @@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(PasswordToken.query.all()), 0) def test_token_password(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.skipTest('Password Token is not possible in user mode') + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) dump('token_password_submit', r) self.assertEqual(r.status_code, 200) - self.assertNotEqual(oldpw, get_ldap_password()) - # TODO: Verify that the new password is actually correct - self.assertEqual(len(PasswordToken.query.all()), 0) + self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword')) def test_token_password_emptydb(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.skipTest('Password Token is not possible in user mode') + user = get_user() r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) dump('token_password_emptydb', r) self.assertEqual(r.status_code, 200) @@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_emptydb_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_invalid(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.skipTest('Password Token is not possible in user mode') + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_expired(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.skipTest('Password Token is not possible in user mode') + user = get_user() token = PasswordToken(loginname=user.loginname, created=(datetime.datetime.now() - datetime.timedelta(days=10))) db.session.add(token) @@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_expired_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_different_passwords(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.skipTest('Password Token is not possible in user mode') + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) dump('token_password_different_passwords_submit', r) self.assertEqual(r.status_code, 200) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) + class TestSelfserviceOL(TestSelfservice): use_openldap = True + + +class TestSelfserviceOLUser(TestSelfserviceOL): + use_userconnection = True diff --git a/tests/test_session.py b/tests/test_session.py index f9814ba6..dae41ab5 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -135,3 +135,6 @@ class TestSession(UffdTestCase): class TestSessionOL(TestSession): use_openldap = True + +class TestSessionOLUser(TestSessionOL): + use_userconnection = True diff --git a/tests/test_user.py b/tests/test_user.py index 63e429f7..8fae44db 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -1,5 +1,4 @@ import datetime -import time import unittest from flask import url_for, session @@ -7,20 +6,17 @@ from flask import url_for, session # These imports are required, because otherwise we get circular imports?! from uffd import ldap, user +from uffd.session.views import get_current_user from uffd.user.models import User from uffd.role.models import Role -from uffd.session.views import get_current_user, is_valid_session -from uffd.mfa.models import MFAMethod, MFAType, RecoveryCodeMethod, TOTPMethod, WebauthnMethod, _hotp from uffd import create_app, db from utils import dump, UffdTestCase + def get_user(): return User.query.get('uid=testuser,ou=users,dc=example,dc=com') -def get_user_password(): - return get_user().pwhash - def get_admin(): return User.query.get('uid=testadmin,ou=users,dc=example,dc=com') @@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase): class TestUserModelOL(TestUserModel): use_openldap = True +class TestUserModelOLUser(TestUserModelOL): + use_userconnection = True + + def setUp(self): + super().setUp() + self.client.post(path=url_for('session.login'), + data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True) + class TestUserViews(UffdTestCase): def setUp(self): super().setUp() @@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase): role1 = Role(name='role1') print('test_new', role1.db_members, role1.members, user.roles) self.assertEqual(roles, ['base', 'role1']) - # TODO: check password hash + # TODO: confirm Mail is send, login not yet possible + #self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword')) def test_new_invalid_loginname(self): r = self.client.post(path=url_for('user.update'), @@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase): role2.members.add(user) db.session.commit() role1_id = role1.id - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) dump('user_update', r) self.assertEqual(r.status_code, 200) @@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) self.assertEqual(roles, ['base', 'role1']) def test_update_password(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.loginname, user.loginname) - self.assertNotEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword')) + @unittest.skip('See #28') def test_update_invalid_password(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase): dump('user_update_password', r) self.assertEqual(r.status_code, 200) _user = get_user() - self.assertEqual(get_user_password(), oldpw) + self.assertFalse(ldap.test_user_bind(_user.dn, 'A')) self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) def test_update_empty_email(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword')) def test_update_invalid_display_name(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword')) def test_show(self): r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True) @@ -258,77 +259,158 @@ newuser12,newuser12@example.com,{role1.id};{role1.id} dump('user_csvimport', r) self.assertEqual(r.status_code, 200) user = User.query.get('uid=newuser1,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser1') self.assertEqual(user.displayname, 'newuser1') self.assertEqual(user.mail, 'newuser1@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base']) user = User.query.get('uid=newuser2,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser2') self.assertEqual(user.displayname, 'newuser2') self.assertEqual(user.mail, 'newuser2@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base', 'role1']) user = User.query.get('uid=newuser3,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser3') self.assertEqual(user.displayname, 'newuser3') self.assertEqual(user.mail, 'newuser3@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base', 'role1', 'role2']) user = User.query.get('uid=newuser4,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser4') self.assertEqual(user.displayname, 'newuser4') self.assertEqual(user.mail, 'newuser4@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base']) user = User.query.get('uid=newuser5,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser5') self.assertEqual(user.displayname, 'newuser5') self.assertEqual(user.mail, 'newuser5@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base']) user = User.query.get('uid=newuser6,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser6') self.assertEqual(user.displayname, 'newuser6') self.assertEqual(user.mail, 'newuser6@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base', 'role1', 'role2']) self.assertIsNone(User.query.get('uid=newuser7,ou=users,dc=example,dc=com')) self.assertIsNone(User.query.get('uid=newuser8,ou=users,dc=example,dc=com')) self.assertIsNone(User.query.get('uid=newuser9,ou=users,dc=example,dc=com')) user = User.query.get('uid=newuser10,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser10') self.assertEqual(user.displayname, 'newuser10') self.assertEqual(user.mail, 'newuser10@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base']) user = User.query.get('uid=newuser11,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser11') self.assertEqual(user.displayname, 'newuser11') self.assertEqual(user.mail, 'newuser11@example.com') # Currently the csv import is not very robust, imho newuser11 should have role1 and role2! + roles = sorted([r.name for r in user.roles]) #self.assertEqual(roles, ['base', 'role1', 'role2']) self.assertEqual(roles, ['base', 'role2']) user = User.query.get('uid=newuser12,ou=users,dc=example,dc=com') - roles = sorted([r.name for r in user.roles]) self.assertIsNotNone(user) self.assertEqual(user.loginname, 'newuser12') self.assertEqual(user.displayname, 'newuser12') self.assertEqual(user.mail, 'newuser12@example.com') + roles = sorted([r.name for r in user.roles]) self.assertEqual(roles, ['base', 'role1']) class TestUserViewsOL(TestUserViews): use_openldap = True +class TestUserViewsOLUserAsAdmin(TestUserViewsOL): + use_userconnection = True + +class TestUserViewsOLUserAsUser(UffdTestCase): + use_userconnection = True + use_openldap = True + + def setUp(self): + super().setUp() + self.client.post(path=url_for('session.login'), + data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True) + + def test_view_own(self): + user_ = get_user() + r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True) + dump('user_view_own', r) + self.assertEqual(r.status_code, 200) + + def test_view_others(self): + admin = get_admin() + r = self.client.get(path=url_for('user.show', uid=admin.uid), follow_redirects=True) + dump('user_view_others', r) + self.assertEqual(r.status_code, 200) + + def test_view_index(self): + r = self.client.get(path=url_for('user.index'), follow_redirects=True) + dump('user_index', r) + self.assertEqual(r.status_code, 200) + + def test_update_other_user(self): + user_ = get_admin() + db.session.add(Role(name='base')) + role1 = Role(name='role1') + db.session.add(role1) + role2 = Role(name='role2') + db.session.add(role2) + role2.members.add(user_) + db.session.commit() + role1_id = role1.id + r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True) + dump('user_update', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('user.update', uid=user_.uid), + data={'loginname': user_.loginname, 'mail': user_.mail, 'displayname': user_.displayname + "12345", + f'role-{role1_id}': '1', 'password': ''}, follow_redirects=True) + dump('user_update_submit', r) + self.assertEqual(r.status_code, 200) + _user = get_admin() + self.assertEqual(_user.displayname, user_.displayname) + self.assertEqual(_user.mail, user_.mail) + self.assertEqual(_user.uid, user_.uid) + self.assertEqual(_user.loginname, user_.loginname) + + def test_new(self): + db.session.add(Role(name='base')) + role1 = Role(name='role1') + db.session.add(role1) + role2 = Role(name='role2') + db.session.add(role2) + db.session.commit() + role1_id = role1.id + r = self.client.get(path=url_for('user.show'), follow_redirects=True) + dump('user_new', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(User.query.get('uid=newuser,ou=users,dc=example,dc=com')) + r = self.client.post(path=url_for('user.update'), + data={'loginname': 'newuser', 'mail': 'newuser@example.com', 'displayname': 'New User', + f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True) + dump('user_new_submit', r) + self.assertEqual(r.status_code, 200) + user = User.query.get('uid=newuser,ou=users,dc=example,dc=com') + self.assertIsNone(user) + + def test_delete(self): + user = get_admin() + r = self.client.get(path=url_for('user.delete', uid=user.uid), follow_redirects=True) + dump('user_delete', r) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(get_admin()) + + class TestGroupViews(UffdTestCase): def setUp(self): super().setUp() @@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase): class TestGroupViewsOL(TestGroupViews): use_openldap = True + +class TestGroupViewsOLUser(TestGroupViewsOL): + use_userconnection = True diff --git a/tests/utils.py b/tests/utils.py index 8567d49d..3d198c55 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -15,7 +15,7 @@ def dump(basename, resp): return os.makedirs(root, exist_ok=True) path = os.path.join(root, basename+suffix) - with open(path, 'xb') as f: + with open(path, 'wb') as f: f.write(resp.data) def db_flush(): @@ -25,6 +25,7 @@ def db_flush(): class UffdTestCase(unittest.TestCase): use_openldap = False + use_userconnection = False def setUp(self): self.dir = tempfile.mkdtemp() @@ -37,13 +38,22 @@ class UffdTestCase(unittest.TestCase): 'SECRET_KEY': 'DEBUGKEY', 'LDAP_SERVICE_MOCK': True, 'MAIL_SKIP_SEND': True, + 'SELF_SIGNUP': True, + 'ENABLE_INVITE': True, + 'ENABLE_PASSWORDRESET': True } if self.use_openldap: if not os.environ.get('UNITTEST_OPENLDAP'): self.skipTest('OPENLDAP_TESTING not set') config['LDAP_SERVICE_MOCK'] = False config['LDAP_SERVICE_URL'] = 'ldap://localhost' - config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com' + if self.use_userconnection: + config['LDAP_SERVICE_USER_BIND'] = True + config['SELF_SIGNUP'] = False + config['ENABLE_INVITE'] = False + config['ENABLE_PASSWORDRESET'] = False + else: + config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com' config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password' os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1") os.system("ldapadd -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_add.ldif") diff --git a/uffd/__init__.py b/uffd/__init__.py index fbe6993c..557044ca 100644 --- a/uffd/__init__.py +++ b/uffd/__init__.py @@ -2,17 +2,18 @@ import os import secrets import sys -from flask import Flask, redirect, url_for +from flask import Flask, redirect, url_for, request from werkzeug.routing import IntegerConverter from werkzeug.serving import make_ssl_devcert from werkzeug.contrib.profiler import ProfilerMiddleware +from werkzeug.exceptions import InternalServerError from flask_migrate import Migrate sys.path.append('deps/ldapalchemy') # pylint: disable=wrong-import-position from uffd.database import db, SQLAlchemyJSON -from uffd.ldap import ldap +import uffd.ldap from uffd.template_helper import register_template_helper from uffd.navbar import setup_navbar # pylint: enable=wrong-import-position @@ -51,18 +52,35 @@ def create_app(test_config=None): # pylint: disable=too-many-locals from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, invite # pylint: enable=C0415 - for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp + invite.bp: + for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp: app.register_blueprint(i) + if app.config['LDAP_SERVICE_USER_BIND'] and (app.config['ENABLE_INVITE'] or + app.config['SELF_SIGNUP'] or + app.config['ENABLE_PASSWORDRESET']): + raise InternalServerError(description="You cannot use INVITES, SIGNUP or PASSWORDRESET when using a USER_BIND!") + + if app.config['ENABLE_INVITE'] or app.config['SELF_SIGNUP']: + for i in signup.bp: + app.register_blueprint(i) + if app.config['ENABLE_INVITE']: + for i in invite.bp: + app.register_blueprint(i) + @app.shell_context_processor def push_request_context(): #pylint: disable=unused-variable app.test_request_context().push() # LDAP ORM requires request context - return {'db': db, 'ldap': ldap} + return {'db': db, 'ldap': uffd.ldap.ldap} @app.route("/") def index(): #pylint: disable=unused-variable return redirect(url_for('selfservice.index')) + @app.teardown_request + def close_connection(exception): #pylint: disable=unused-variable,unused-argument + if hasattr(request, "ldap_connection"): + request.ldap_connection.unbind() + @app.cli.command("gendevcert", help='Generates a self-signed TLS certificate for development') def gendevcert(): #pylint: disable=unused-variable if os.path.exists('devcert.crt') or os.path.exists('devcert.key'): diff --git a/uffd/default_config.cfg b/uffd/default_config.cfg index dde6df5d..1c08de08 100644 --- a/uffd/default_config.cfg +++ b/uffd/default_config.cfg @@ -37,10 +37,13 @@ LDAP_MAIL_UID_ATTRIBUTE="uid" LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid" LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop" -LDAP_SERVICE_BIND_DN="" -LDAP_SERVICE_BIND_PASSWORD="" LDAP_SERVICE_URL="ldapi:///" LDAP_SERVICE_USE_STARTTLS=True +LDAP_SERVICE_BIND_DN="" +LDAP_SERVICE_BIND_PASSWORD="" +# Connections use LDAP_SERVICE_BIND_DN if LDAP_SERVICE_USER_BIND=False, otherwise they use the users credentials. +# When using a user connection, some features are not available, since they require a service connection +LDAP_SERVICE_USER_BIND=False SESSION_LIFETIME_SECONDS=3600 # CSRF protection @@ -58,6 +61,9 @@ MAIL_PASSWORD='*****' MAIL_USE_STARTTLS=True MAIL_FROM_ADDRESS='foo@bar.com' +# The following settings are not available when using a user connection +ENABLE_INVITE=True +ENABLE_PASSWORDRESET=True # Do not enable this on a public service! There is no spam protection implemented at the moment. SELF_SIGNUP=False diff --git a/uffd/invite/views.py b/uffd/invite/views.py index 79b3f7b2..a0a00e86 100644 --- a/uffd/invite/views.py +++ b/uffd/invite/views.py @@ -15,6 +15,7 @@ from uffd.navbar import register_navbar from uffd.ratelimit import host_ratelimit, format_delay from uffd.signup.views import signup_ratelimit + bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/') def invite_acl(): diff --git a/uffd/ldap.py b/uffd/ldap.py index 2d7f6f93..1d87efe8 100644 --- a/uffd/ldap.py +++ b/uffd/ldap.py @@ -1,9 +1,44 @@ -from flask import current_app, request, abort +import base64 +import hashlib + +from flask import current_app, request, abort, session import ldap3 +from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import from ldapalchemy.model import Query +from ldapalchemy.core import encode_filter + + +def check_hashed(password_hash, password): + '''Return if password matches a LDAP-compatible password hash (only used for LDAP_SERVICE_MOCK) + + :param password_hash: LDAP-compatible password hash (plain password or "{ssha512}...") + :type password_hash: bytes + :param password: Plain, (ideally) utf8-encoded password + :type password: bytes''' + algorithms = { + b'md5': 'MD5', + b'sha': 'SHA1', + b'sha256': 'SHA256', + b'sha384': 'SHA384', + b'sha512': 'SHA512' + } + if not password_hash.startswith(b'{'): + return password_hash == password + algorithm, data = password_hash[1:].split(b'}', 1) + data = base64.b64decode(data) + if algorithm in algorithms: + ctx = hashlib.new(algorithms[algorithm], password) + return data == ctx.digest() + if algorithm.startswith(b's') and algorithm[1:] in algorithms: + ctx = hashlib.new(algorithms[algorithm[1:]], password) + salt = data[ctx.digest_size:] + ctx.update(salt) + return data == ctx.digest() + salt + raise NotImplementedError() + class FlaskQuery(Query): def get_or_404(self, dn): @@ -18,9 +53,54 @@ class FlaskQuery(Query): abort(404) return res + +def test_user_bind(bind_dn, bind_pw): + try: + if current_app.config.get('LDAP_SERVICE_MOCK', False): + # Since we reuse the same conn and ldap3's mock only supports plain + # passwords for bind and rebind, we simulate the bind by retrieving + # and checking the password hash ourselves. + conn = ldap.get_connection() + conn.search(bind_dn, search_filter='(objectclass=*)', search_scope=ldap3.BASE, + attributes=ldap3.ALL_ATTRIBUTES) + if not conn.response: + return False + if not conn.response[0]['attributes'].get('userPassword'): + return False + return check_hashed(conn.response[0]['attributes']['userPassword'][0], bind_pw.encode()) + + server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"]) + conn = connect_and_bind_to_ldap(server, bind_dn, bind_pw) + if not conn: + return False + except (LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError): + return False + + conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"])) + lazy_entries = conn.entries + # Do not end the connection when using mock, as it will be reused afterwards + if not current_app.config.get('LDAP_SERVICE_MOCK', False): + conn.unbind() + return len(lazy_entries) == 1 + + +def connect_and_bind_to_ldap(server, bind_dn, bind_pw): + # Using auto_bind cannot close the connection, so define the connection with extra steps + connection = ldap3.Connection(server, bind_dn, bind_pw) + if connection.closed: + connection.open(read_server_info=False) + if current_app.config["LDAP_SERVICE_USE_STARTTLS"]: + connection.start_tls(read_server_info=False) + if not connection.bind(read_server_info=True): + connection.unbind() + raise LDAPBindError + return connection + + class FlaskLDAPMapper(LDAPMapper): def __init__(self): super().__init__() + class Model(self.Model): query_class = FlaskQuery @@ -48,9 +128,16 @@ class FlaskLDAPMapper(LDAPMapper): current_app.ldap_mock.bind() return current_app.ldap_mock server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) - auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True - request.ldap_connection = ldap3.Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], - current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=auto_bind) + + if current_app.config['LDAP_SERVICE_USER_BIND']: + bind_dn = session['user_dn'] + bind_pw = session['user_pw'] + else: + bind_dn = current_app.config["LDAP_SERVICE_BIND_DN"] + bind_pw = current_app.config["LDAP_SERVICE_BIND_PASSWORD"] + + request.ldap_connection = connect_and_bind_to_ldap(server, bind_dn, bind_pw) return request.ldap_connection + ldap = FlaskLDAPMapper() diff --git a/uffd/navbar.py b/uffd/navbar.py index dc46b763..59198cc6 100644 --- a/uffd/navbar.py +++ b/uffd/navbar.py @@ -1,9 +1,6 @@ -# pylint: disable=invalid-name -navbarList = [] -# pylint: enable=invalid-name - def setup_navbar(app): - app.jinja_env.globals['getnavbar'] = lambda: [n for n in navbarList if n['visible']()] + app.navbarList = [] + app.jinja_env.globals['getnavbar'] = lambda: [n for n in app.navbarList if n['visible']()] # iconlib can be 'bootstrap' # ( see: http://getbootstrap.com/components/#glyphicons ) @@ -12,22 +9,37 @@ def setup_navbar(app): # visible is a function that returns "True" if this icon should be visible in the calling context def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None): def wrapper(func): - urlendpoint = endpoint - if not endpoint: - # pylint: disable=protected-access - if blueprint: - urlendpoint = "{}.{}".format(blueprint.name, func.__name__) - else: - urlendpoint = func.__name_ + def deferred_call(state): + urlendpoint = endpoint + if not endpoint: + # pylint: disable=protected-access + if blueprint: + urlendpoint = "{}.{}".format(blueprint.name, func.__name__) + else: + urlendpoint = func.__name_ # pylint: enable=protected-access - item = {} - item['iconlib'] = iconlib - item['icon'] = icon - item['group'] = group - item['endpoint'] = urlendpoint - item['name'] = name - item['blueprint'] = blueprint - item['visible'] = visible or (lambda: True) - navbarList.append(item) + item = {} + item['iconlib'] = iconlib + item['icon'] = icon + item['group'] = group + item['endpoint'] = urlendpoint + item['name'] = name + item['blueprint'] = blueprint + item['visible'] = visible or (lambda: True) + + state.app.navbarList.append(item) + + if blueprint: + blueprint.record_once(deferred_call) + else: + class StateMock: + def __init__(self, app): + self.app = app + # pylint: disable=C0415 + from flask import current_app + # pylint: enable=C0415 + deferred_call(StateMock(current_app)) + return func + return wrapper diff --git a/uffd/selfservice/views.py b/uffd/selfservice/views.py index 40dc49e0..44556f36 100644 --- a/uffd/selfservice/views.py +++ b/uffd/selfservice/views.py @@ -4,7 +4,7 @@ import smtplib from email.message import EmailMessage import email.utils -from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app +from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session from uffd.navbar import register_navbar from uffd.csrf import csrf_protect @@ -29,6 +29,7 @@ def index(): @csrf_protect(blueprint=bp) @login_required() def update(): + password_changed = False user = get_current_user() if request.values['displayname'] != user.displayname: if user.set_displayname(request.values['displayname']): @@ -41,12 +42,16 @@ def update(): else: if user.set_password(request.values['password1']): flash('Password changed.') + password_changed = True else: flash('Password could not be set.') if request.values['mail'] != user.mail: send_mail_verification(user.loginname, request.values['mail']) flash('We sent you an email, please verify your mail address.') ldap.session.commit() + # When using a user_connection, update the connection on password-change + if password_changed and current_app.config['LDAP_SERVICE_USER_BIND']: + session['user_pw'] = request.values['password1'] return redirect(url_for('selfservice.index')) @bp.route("/passwordreset", methods=(['GET', 'POST'])) diff --git a/uffd/session/templates/login.html b/uffd/session/templates/login.html index e3a17c57..15c0f8f0 100644 --- a/uffd/session/templates/login.html +++ b/uffd/session/templates/login.html @@ -25,7 +25,9 @@ {% if config['SELF_SIGNUP'] %} <a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a> {% endif %} + {% if config['ENABLE_PASSWORDRESET'] %} <a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a> + {% endif %} </div> </div> </div> diff --git a/uffd/session/views.py b/uffd/session/views.py index a2ff2125..0d49a799 100644 --- a/uffd/session/views.py +++ b/uffd/session/views.py @@ -4,12 +4,8 @@ import functools from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort -import ldap3 -from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError -from ldapalchemy.core import encode_filter - from uffd.user.models import User -from uffd.ldap import ldap +from uffd.ldap import ldap, test_user_bind, LDAPInvalidDnError, LDAPBindError, LDAPPasswordIsMandatoryError from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/') @@ -18,29 +14,29 @@ login_ratelimit = Ratelimit('login', 1*60, 3) def login_get_user(loginname, password): dn = User(loginname=loginname).dn - if current_app.config.get('LDAP_SERVICE_MOCK', False): - conn = ldap.get_connection() - # Since we reuse the same conn for all calls to `user_conn()` we - # simulate the password check by rebinding. Note that ldap3's mocking - # implementation just compares the string in the objects's userPassword - # field with the password, no support for hashing or OpenLDAP-style - # password-prefixes ("{PLAIN}..." or "{ssha512}..."). - try: - if not conn.rebind(dn, password): - return None - except (LDAPBindError, LDAPPasswordIsMandatoryError): + + # If we use a service connection, test user bind seperately + if not current_app.config['LDAP_SERVICE_USER_BIND'] or current_app.config.get('LDAP_SERVICE_MOCK', False): + if not test_user_bind(dn, password): return None + # If we use a user connection, just create the connection normally else: - server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) - auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True + # ldap.get_connection gets the credentials from the session, so set it here initially + session['user_dn'] = dn + session['user_pw'] = password try: - conn = ldap3.Connection(server, dn, password, auto_bind=auto_bind) + ldap.get_connection() except (LDAPBindError, LDAPPasswordIsMandatoryError): + session.clear() return None - conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"])) - if len(conn.entries) != 1: - return None - return User.query.get(dn) + + try: + user = User.query.get(dn) + if user: + return user + except LDAPInvalidDnError: + pass + return None @bp.route("/logout") def logout(): @@ -50,9 +46,12 @@ def logout(): session.clear() return resp -def set_session(user, skip_mfa=False): +def set_session(user, password='', skip_mfa=False): session.clear() session['user_dn'] = user.dn + # only save the password if we use a user connection + if password and current_app.config['LDAP_SERVICE_USER_BIND']: + session['user_pw'] = password session['logintime'] = datetime.datetime.now().timestamp() session['_csrf_token'] = secrets.token_hex(128) if skip_mfa: @@ -82,7 +81,7 @@ def login(): if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']): flash('You do not have access to this service') return render_template('login.html', ref=request.values.get('ref')) - set_session(user) + set_session(user, password=password) return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) def get_current_user(): diff --git a/uffd/signup/views.py b/uffd/signup/views.py index ce9b139e..ba0f58c6 100644 --- a/uffd/signup/views.py +++ b/uffd/signup/views.py @@ -97,6 +97,6 @@ def signup_confirm_submit(token): return render_template('signup/confirm.html', signup=signup, error=msg) db.session.commit() ldap.session.commit() - set_session(user, skip_mfa=True) + set_session(user, password=request.form['password'], skip_mfa=True) flash('Your account was successfully created') return redirect(url_for('selfservice.index')) diff --git a/uffd/user/views_user.py b/uffd/user/views_user.py index 9a824945..e25f89e7 100644 --- a/uffd/user/views_user.py +++ b/uffd/user/views_user.py @@ -47,11 +47,11 @@ def update(uid=None): return redirect(url_for('user.show')) else: user = User.query.filter_by(uid=uid).first_or_404() - if not user.set_mail(request.form['mail']): + if user.mail != request.form['mail'] and not user.set_mail(request.form['mail']): flash('Mail is invalid') return redirect(url_for('user.show', uid=uid)) new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname'] - if not user.set_displayname(new_displayname): + if user.displayname != new_displayname and not user.set_displayname(new_displayname): flash('Display name does not meet requirements') return redirect(url_for('user.show', uid=uid)) new_password = request.form.get('password') -- GitLab