From 89d106f6b9e809af8c9ed98aeedc8e35303407d5 Mon Sep 17 00:00:00 2001 From: Sistason <c3infra@sistason.de> Date: Mon, 5 Apr 2021 00:51:13 +0200 Subject: [PATCH] fixed password-hash-mocking and added user tests thanks @julianr for https://git.cccv.de/infra/uffd/uffd/-/snippets/5 --- tests/test_selfservice.py | 56 +++++++++++------- tests/test_user.py | 117 +++++++++++++++++++++++++++++++++----- uffd/ldap.py | 54 +++++++++++++++--- 3 files changed, 186 insertions(+), 41 deletions(-) diff --git a/tests/test_selfservice.py b/tests/test_selfservice.py index 13b15cf0..220950fd 100644 --- a/tests/test_selfservice.py +++ b/tests/test_selfservice.py @@ -13,8 +13,10 @@ from uffd import create_app, db, ldap from utils import dump, UffdTestCase -def get_ldap_password(): - return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash + +def get_user(): + return User.query.get('uid=testuser,ou=users,dc=example,dc=com') + class TestSelfservice(UffdTestCase): def login(self): @@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(tokens), 0) def test_forgot_password(self): + if self.use_userconnection: + self.skipTest('Password Reset not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password')) dump('forgot_password', r) @@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase): self.assertIn(token.token, str(self.app.last_mail.get_content())) def test_forgot_password_wrong_user(self): + if self.use_userconnection: + self.skipTest('Password Reset not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password')) self.assertEqual(r.status_code, 200) @@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(PasswordToken.query.all()), 0) def test_forgot_password_wrong_email(self): + if self.use_userconnection: + self.skipTest('Password Reset not possible in user mode') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True) self.assertEqual(r.status_code, 200) @@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase): # Regression test for #31 def test_forgot_password_invalid_user(self): + if self.use_userconnection: + self.skipTest('Password Reset not possible in user mode') r = self.client.post(path=url_for('selfservice.forgot_password'), data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True) dump('forgot_password_submit_invalid_user', r) @@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase): self.assertEqual(len(PasswordToken.query.all()), 0) def test_token_password(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.login() + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) dump('token_password_submit', r) self.assertEqual(r.status_code, 200) - self.assertNotEqual(oldpw, get_ldap_password()) - # TODO: Verify that the new password is actually correct - self.assertEqual(len(PasswordToken.query.all()), 0) + self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword')) def test_token_password_emptydb(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.login() + user = get_user() r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) dump('token_password_emptydb', r) self.assertEqual(r.status_code, 200) @@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_emptydb_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_invalid(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.login() + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_expired(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.login() + user = get_user() token = PasswordToken(loginname=user.loginname, created=(datetime.datetime.now() - datetime.timedelta(days=10))) db.session.add(token) @@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_expired_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) def test_token_password_different_passwords(self): - user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') - oldpw = get_ldap_password() + if self.use_userconnection: + self.login() + user = get_user() token = PasswordToken(loginname=user.loginname) db.session.add(token) db.session.commit() @@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) dump('token_password_different_passwords_submit', r) self.assertEqual(r.status_code, 200) - self.assertEqual(oldpw, get_ldap_password()) + self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword')) + class TestSelfserviceOL(TestSelfservice): use_openldap = True + + +class TestSelfserviceOLUser(TestSelfserviceOL): + use_userconnection = True diff --git a/tests/test_user.py b/tests/test_user.py index 63e429f7..527188bc 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -18,9 +18,6 @@ from utils import dump, UffdTestCase def get_user(): return User.query.get('uid=testuser,ou=users,dc=example,dc=com') -def get_user_password(): - return get_user().pwhash - def get_admin(): return User.query.get('uid=testadmin,ou=users,dc=example,dc=com') @@ -52,6 +49,14 @@ class TestUserModel(UffdTestCase): class TestUserModelOL(TestUserModel): use_openldap = True +class TestUserModelOLUser(TestUserModelOL): + use_userconnection = True + + def setUp(self): + super().setUp() + self.client.post(path=url_for('session.login'), + data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True) + class TestUserViews(UffdTestCase): def setUp(self): super().setUp() @@ -87,10 +92,14 @@ class TestUserViews(UffdTestCase): self.assertEqual(user.displayname, 'New User') self.assertEqual(user.mail, 'newuser@example.com') self.assertTrue(user.uid) + r = self.client.post(path=url_for('session.login'), + data={'loginname': 'newuser', 'password': 'newpassword'}, follow_redirects=True) + dump('login_post', r) + self.assertEqual(r.status_code, 200) + role1 = Role(name='role1') print('test_new', role1.db_members, role1.members, user.roles) self.assertEqual(roles, ['base', 'role1']) - # TODO: check password hash def test_new_invalid_loginname(self): r = self.client.post(path=url_for('user.update'), @@ -134,7 +143,6 @@ class TestUserViews(UffdTestCase): role2.members.add(user) db.session.commit() role1_id = role1.id - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) dump('user_update', r) self.assertEqual(r.status_code, 200) @@ -149,12 +157,11 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'userpassword')) self.assertEqual(roles, ['base', 'role1']) def test_update_password(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -167,12 +174,12 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.loginname, user.loginname) - self.assertNotEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword')) + @unittest.skip('See #28') def test_update_invalid_password(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -181,14 +188,13 @@ class TestUserViews(UffdTestCase): dump('user_update_password', r) self.assertEqual(r.status_code, 200) _user = get_user() - self.assertEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword')) self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) def test_update_empty_email(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -200,11 +206,10 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword')) def test_update_invalid_display_name(self): user = get_user() - oldpw = get_user_password() r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) self.assertEqual(r.status_code, 200) r = self.client.post(path=url_for('user.update', uid=user.uid), @@ -216,7 +221,7 @@ class TestUserViews(UffdTestCase): self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.loginname, user.loginname) - self.assertEqual(get_user_password(), oldpw) + self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword')) def test_show(self): r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True) @@ -329,6 +334,87 @@ newuser12,newuser12@example.com,{role1.id};{role1.id} class TestUserViewsOL(TestUserViews): use_openldap = True +class TestUserViewsOLUserAsAdmin(TestUserViewsOL): + use_userconnection = True + +class TestUserViewsOLUserAsUser(UffdTestCase): + use_userconnection = True + use_openldap = True + + def setUp(self): + super().setUp() + self.client.post(path=url_for('session.login'), + data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True) + + def test_view_own(self): + user_ = get_user() + r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True) + dump('user_view_own', r) + self.assertEqual(r.status_code, 200) + + def test_view_others(self): + admin = get_admin() + r = self.client.get(path=url_for('user.show', uid=admin.uid), follow_redirects=True) + dump('user_view_others', r) + self.assertEqual(r.status_code, 200) + + def test_view_index(self): + r = self.client.get(path=url_for('user.index'), follow_redirects=True) + dump('user_index', r) + self.assertEqual(r.status_code, 200) + + def test_update_other_user(self): + user_ = get_admin() + db.session.add(Role(name='base')) + role1 = Role(name='role1') + db.session.add(role1) + role2 = Role(name='role2') + db.session.add(role2) + role2.members.add(user_) + db.session.commit() + role1_id = role1.id + r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True) + dump('user_update', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('user.update', uid=user_.uid), + data={'loginname': user_.loginname, 'mail': user_.mail, 'displayname': user_.displayname + "12345", + f'role-{role1_id}': '1', 'password': ''}, follow_redirects=True) + dump('user_update_submit', r) + self.assertEqual(r.status_code, 200) + _user = get_admin() + self.assertEqual(_user.displayname, user_.displayname) + self.assertEqual(_user.mail, user_.mail) + self.assertEqual(_user.uid, user_.uid) + self.assertEqual(_user.loginname, user_.loginname) + + def test_new(self): + db.session.add(Role(name='base')) + role1 = Role(name='role1') + db.session.add(role1) + role2 = Role(name='role2') + db.session.add(role2) + db.session.commit() + role1_id = role1.id + r = self.client.get(path=url_for('user.show'), follow_redirects=True) + dump('user_new', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(User.query.get('uid=newuser,ou=users,dc=example,dc=com')) + r = self.client.post(path=url_for('user.update'), + data={'loginname': 'newuser', 'mail': 'newuser@example.com', 'displayname': 'New User', + f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True) + dump('user_new_submit', r) + self.assertEqual(r.status_code, 200) + user = User.query.get('uid=newuser,ou=users,dc=example,dc=com') + self.assertIsNone(user) + + def test_delete(self): + user = get_admin() + r = self.client.get(path=url_for('user.delete', uid=user.uid), follow_redirects=True) + dump('user_delete', r) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(get_admin()) + + class TestGroupViews(UffdTestCase): def setUp(self): super().setUp() @@ -347,3 +433,6 @@ class TestGroupViews(UffdTestCase): class TestGroupViewsOL(TestGroupViews): use_openldap = True + +class TestGroupViewsOLUser(TestGroupViewsOL): + use_userconnection = True diff --git a/uffd/ldap.py b/uffd/ldap.py index bfa79030..28996ee8 100644 --- a/uffd/ldap.py +++ b/uffd/ldap.py @@ -1,3 +1,6 @@ +import base64 +import hashlib + from flask import current_app, request, abort, session import ldap3 @@ -8,6 +11,36 @@ from ldapalchemy.model import Query from ldapalchemy.core import encode_filter +def check_hashed(password_hash, password): + '''Return if password matches a LDAP-compatible password hash + + :param password_hash: LDAP-compatible password hash (plain password or "{ssha512}...") + :type password_hash: bytes + :param password: Plain, (ideally) utf8-encoded password + :type password: bytes''' + algorithms = { + b'md5': 'MD5', + b'sha': 'SHA1', + b'sha256': 'SHA256', + b'sha384': 'SHA384', + b'sha512': 'SHA512' + } + if not password_hash.startswith(b'{'): + return password_hash == password + algorithm, data = password_hash[1:].split(b'}', 1) + data = base64.b64decode(data) + if algorithm in algorithms: + ctx = hashlib.new(algorithms[algorithm], password) + return data == ctx.digest() + elif algorithm.startswith(b's') and algorithm[1:] in algorithms: + ctx = hashlib.new(algorithms[algorithm[1:]], password) + salt = data[ctx.digest_size:] + ctx.update(salt) + return data == ctx.digest() + salt + else: + raise NotImplementedError() + + class FlaskQuery(Query): def get_or_404(self, dn): res = self.get(dn) @@ -25,25 +58,30 @@ class FlaskQuery(Query): def test_user_bind(bind_dn, bind_pw): try: if current_app.config.get('LDAP_SERVICE_MOCK', False): - # Since we reuse the same conn for all calls to `user_conn()` we - # simulate the password check by rebinding. Note that ldap3's mocking - # implementation just compares the string in the objects's userPassword - # field with the password, no support for hashing or OpenLDAP-style - # password-prefixes ("{PLAIN}..." or "{ssha512}..."). + # Since we reuse the same conn and ldap3's mock only supports plain + # passwords for bind and rebind, we simulate the bind by retrieving + # and checking the password hash ourselves. conn = ldap.get_connection() - if not conn.rebind(bind_dn, bind_pw): + conn.search(bind_dn, search_filter='(objectclass=*)', search_scope=ldap3.BASE, + attributes=ldap3.ALL_ATTRIBUTES) + if not conn.response: + return False + if not conn.response[0]['attributes'].get('userPassword'): return False + return check_hashed(conn.response[0]['attributes']['userPassword'][0], bind_pw.encode()) else: server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"]) conn = connect_and_bind_to_ldap(server, bind_dn, bind_pw) if not conn: return False - except (LDAPBindError, LDAPPasswordIsMandatoryError): + except (LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError): return False conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"])) lazy_entries = conn.entries - conn.unbind() + # Do not end the connection when using mock, as it will be reused afterwards + if not current_app.config.get('LDAP_SERVICE_MOCK', False): + conn.unbind() return len(lazy_entries) == 1 -- GitLab