Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Commits on Source (18)
...@@ -31,6 +31,15 @@ Use uwsgi. ...@@ -31,6 +31,15 @@ Use uwsgi.
tabs. tabs.
## Bind with service account or as user?
Uffd can use a dedicated service account for LDAP operations by setting `LDAP_SERVICE_BIND_DN`.
Or it uses the credentials of the currently logged in user, by not setting `LDAP_SERVICE_BIND_DN`.
If you choose to run with user credentials, some features are not available, like password resets
or self signup, since in both cases, no user credentials can exist.
## OAuth2 Single-Sign-On Provider ## OAuth2 Single-Sign-On Provider
Other services can use uffd as an OAuth2.0-based authentication provider. Other services can use uffd as an OAuth2.0-based authentication provider.
......
...@@ -3,18 +3,20 @@ import unittest ...@@ -3,18 +3,20 @@ import unittest
from flask import url_for from flask import url_for
from uffd.ldap import ldap # These imports are required, because otherwise we get circular imports?!
from uffd import user from uffd import ldap, user
from uffd.session.views import get_current_user from uffd.session.views import get_current_user
from uffd.selfservice.models import MailToken, PasswordToken from uffd.selfservice.models import MailToken, PasswordToken
from uffd.user.models import User from uffd.user.models import User
from uffd import create_app, db, ldap from uffd import create_app, db
from utils import dump, UffdTestCase from utils import dump, UffdTestCase
def get_ldap_password():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
class TestSelfservice(UffdTestCase): class TestSelfservice(UffdTestCase):
def login(self): def login(self):
...@@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase): ...@@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(tokens), 0) self.assertEqual(len(tokens), 0)
def test_forgot_password(self): def test_forgot_password(self):
if self.use_userconnection:
self.skipTest('Password Reset not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password')) r = self.client.get(path=url_for('selfservice.forgot_password'))
dump('forgot_password', r) dump('forgot_password', r)
...@@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase): ...@@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase):
self.assertIn(token.token, str(self.app.last_mail.get_content())) self.assertIn(token.token, str(self.app.last_mail.get_content()))
def test_forgot_password_wrong_user(self): def test_forgot_password_wrong_user(self):
if self.use_userconnection:
self.skipTest('Password Reset not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password')) r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase): ...@@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0) self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_email(self): def test_forgot_password_wrong_email(self):
if self.use_userconnection:
self.skipTest('Password Reset not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True) r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase): ...@@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase):
# Regression test for #31 # Regression test for #31
def test_forgot_password_invalid_user(self): def test_forgot_password_invalid_user(self):
if self.use_userconnection:
self.skipTest('Password Reset not possible in user mode')
r = self.client.post(path=url_for('selfservice.forgot_password'), r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True) data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True)
dump('forgot_password_submit_invalid_user', r) dump('forgot_password_submit_invalid_user', r)
...@@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase): ...@@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0) self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password(self): def test_token_password(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.login()
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase): ...@@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r) dump('token_password_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertNotEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
# TODO: Verify that the new password is actually correct
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password_emptydb(self): def test_token_password_emptydb(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.login()
user = get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r) dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase): ...@@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_emptydb_submit', r) dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_invalid(self): def test_token_password_invalid(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.login()
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase): ...@@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_submit', r) dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_expired(self): def test_token_password_expired(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.login()
user = get_user()
token = PasswordToken(loginname=user.loginname, token = PasswordToken(loginname=user.loginname,
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
...@@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase): ...@@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_expired_submit', r) dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_different_passwords(self): def test_token_password_different_passwords(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.login()
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase): ...@@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r) dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
class TestSelfserviceOL(TestSelfservice): class TestSelfserviceOL(TestSelfservice):
use_openldap = True use_openldap = True
class TestSelfserviceOLUser(TestSelfserviceOL):
use_userconnection = True
...@@ -135,3 +135,6 @@ class TestSession(UffdTestCase): ...@@ -135,3 +135,6 @@ class TestSession(UffdTestCase):
class TestSessionOL(TestSession): class TestSessionOL(TestSession):
use_openldap = True use_openldap = True
class TestSessionOLUser(TestSessionOL):
use_userconnection = True
import datetime import datetime
import time
import unittest import unittest
from flask import url_for, session from flask import url_for, session
...@@ -7,20 +6,17 @@ from flask import url_for, session ...@@ -7,20 +6,17 @@ from flask import url_for, session
# These imports are required, because otherwise we get circular imports?! # These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user from uffd import ldap, user
from uffd.session.views import get_current_user
from uffd.user.models import User from uffd.user.models import User
from uffd.role.models import Role from uffd.role.models import Role
from uffd.session.views import get_current_user, is_valid_session
from uffd.mfa.models import MFAMethod, MFAType, RecoveryCodeMethod, TOTPMethod, WebauthnMethod, _hotp
from uffd import create_app, db from uffd import create_app, db
from utils import dump, UffdTestCase from utils import dump, UffdTestCase
def get_user(): def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com') return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
def get_user_password():
return get_user().pwhash
def get_admin(): def get_admin():
return User.query.get('uid=testadmin,ou=users,dc=example,dc=com') return User.query.get('uid=testadmin,ou=users,dc=example,dc=com')
...@@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase): ...@@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase):
class TestUserModelOL(TestUserModel): class TestUserModelOL(TestUserModel):
use_openldap = True use_openldap = True
class TestUserModelOLUser(TestUserModelOL):
use_userconnection = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True)
class TestUserViews(UffdTestCase): class TestUserViews(UffdTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
...@@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase): ...@@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase):
role1 = Role(name='role1') role1 = Role(name='role1')
print('test_new', role1.db_members, role1.members, user.roles) print('test_new', role1.db_members, role1.members, user.roles)
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
# TODO: check password hash # TODO: confirm Mail is send, login not yet possible
#self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
def test_new_invalid_loginname(self): def test_new_invalid_loginname(self):
r = self.client.post(path=url_for('user.update'), r = self.client.post(path=url_for('user.update'),
...@@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase): ...@@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase):
role2.members.add(user) role2.members.add(user)
db.session.commit() db.session.commit()
role1_id = role1.id role1_id = role1.id
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
dump('user_update', r) dump('user_update', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase): ...@@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertTrue(ldap.test_user_bind(_user.dn, 'userpassword'))
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
def test_update_password(self): def test_update_password(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase): ...@@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertNotEqual(get_user_password(), oldpw) self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword'))
@unittest.skip('See #28') @unittest.skip('See #28')
def test_update_invalid_password(self): def test_update_invalid_password(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase): ...@@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase):
dump('user_update_password', r) dump('user_update_password', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = get_user() _user = get_user()
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'A'))
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
def test_update_empty_email(self): def test_update_empty_email(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase): ...@@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_update_invalid_display_name(self): def test_update_invalid_display_name(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase): ...@@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_show(self): def test_show(self):
r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True)
...@@ -329,6 +330,87 @@ newuser12,newuser12@example.com,{role1.id};{role1.id} ...@@ -329,6 +330,87 @@ newuser12,newuser12@example.com,{role1.id};{role1.id}
class TestUserViewsOL(TestUserViews): class TestUserViewsOL(TestUserViews):
use_openldap = True use_openldap = True
class TestUserViewsOLUserAsAdmin(TestUserViewsOL):
use_userconnection = True
class TestUserViewsOLUserAsUser(UffdTestCase):
use_userconnection = True
use_openldap = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True)
def test_view_own(self):
user_ = get_user()
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_view_own', r)
self.assertEqual(r.status_code, 200)
def test_view_others(self):
admin = get_admin()
r = self.client.get(path=url_for('user.show', uid=admin.uid), follow_redirects=True)
dump('user_view_others', r)
self.assertEqual(r.status_code, 200)
def test_view_index(self):
r = self.client.get(path=url_for('user.index'), follow_redirects=True)
dump('user_index', r)
self.assertEqual(r.status_code, 200)
def test_update_other_user(self):
user_ = get_admin()
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
role2.members.add(user_)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_update', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user_.uid),
data={'loginname': user_.loginname, 'mail': user_.mail, 'displayname': user_.displayname + "12345",
f'role-{role1_id}': '1', 'password': ''}, follow_redirects=True)
dump('user_update_submit', r)
self.assertEqual(r.status_code, 200)
_user = get_admin()
self.assertEqual(_user.displayname, user_.displayname)
self.assertEqual(_user.mail, user_.mail)
self.assertEqual(_user.uid, user_.uid)
self.assertEqual(_user.loginname, user_.loginname)
def test_new(self):
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show'), follow_redirects=True)
dump('user_new', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.get('uid=newuser,ou=users,dc=example,dc=com'))
r = self.client.post(path=url_for('user.update'),
data={'loginname': 'newuser', 'mail': 'newuser@example.com', 'displayname': 'New User',
f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True)
dump('user_new_submit', r)
self.assertEqual(r.status_code, 200)
user = User.query.get('uid=newuser,ou=users,dc=example,dc=com')
self.assertIsNone(user)
def test_delete(self):
user = get_admin()
r = self.client.get(path=url_for('user.delete', uid=user.uid), follow_redirects=True)
dump('user_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(get_admin())
class TestGroupViews(UffdTestCase): class TestGroupViews(UffdTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
...@@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase): ...@@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase):
class TestGroupViewsOL(TestGroupViews): class TestGroupViewsOL(TestGroupViews):
use_openldap = True use_openldap = True
class TestGroupViewsOLUser(TestGroupViewsOL):
use_userconnection = True
...@@ -25,6 +25,7 @@ def db_flush(): ...@@ -25,6 +25,7 @@ def db_flush():
class UffdTestCase(unittest.TestCase): class UffdTestCase(unittest.TestCase):
use_openldap = False use_openldap = False
use_userconnection = False
def setUp(self): def setUp(self):
self.dir = tempfile.mkdtemp() self.dir = tempfile.mkdtemp()
...@@ -43,6 +44,9 @@ class UffdTestCase(unittest.TestCase): ...@@ -43,6 +44,9 @@ class UffdTestCase(unittest.TestCase):
self.skipTest('OPENLDAP_TESTING not set') self.skipTest('OPENLDAP_TESTING not set')
config['LDAP_SERVICE_MOCK'] = False config['LDAP_SERVICE_MOCK'] = False
config['LDAP_SERVICE_URL'] = 'ldap://localhost' config['LDAP_SERVICE_URL'] = 'ldap://localhost'
if self.use_userconnection:
config['LDAP_SERVICE_BIND_DN'] = None
else:
config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com' config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com'
config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password' config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password'
os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1") os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1")
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
import secrets import secrets
import sys import sys
from flask import Flask, redirect, url_for from flask import Flask, redirect, url_for, request
from werkzeug.routing import IntegerConverter from werkzeug.routing import IntegerConverter
sys.path.append('deps/ldapalchemy') sys.path.append('deps/ldapalchemy')
...@@ -42,17 +42,33 @@ def create_app(test_config=None): # pylint: disable=too-many-locals ...@@ -42,17 +42,33 @@ def create_app(test_config=None): # pylint: disable=too-many-locals
pass pass
db.init_app(app) db.init_app(app)
# pylint: disable=C0415 # pylint: disable=C0415
from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, invite from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services
# pylint: enable=C0415 # pylint: enable=C0415
for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp + invite.bp: for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp:
app.register_blueprint(i)
if app.config['LDAP_SERVICE_BIND_DN'] or app.config.get('LDAP_SERVICE_MOCK', False):
# pylint: disable=C0415
from uffd import signup, invite
# pylint: enable=C0415
for i in signup.bp + invite.bp:
app.register_blueprint(i) app.register_blueprint(i)
else:
app.config['ENABLE_PASSWORDRESET'] = False
@app.route("/") @app.route("/")
def index(): #pylint: disable=unused-variable def index(): #pylint: disable=unused-variable
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
@app.teardown_request
def close_connection(exception): #pylint: disable=unused-variable,unused-argument
if hasattr(request, "ldap_connection"):
request.ldap_connection.unbind()
return app return app
def init_db(app): def init_db(app):
......
...@@ -37,6 +37,8 @@ LDAP_MAIL_UID_ATTRIBUTE="uid" ...@@ -37,6 +37,8 @@ LDAP_MAIL_UID_ATTRIBUTE="uid"
LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid" LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid"
LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop" LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop"
# If you do not set the service bind_dn, connections use the user credentials.
# When using a user connection, some features are not available, since they require a service connection
LDAP_SERVICE_BIND_DN="" LDAP_SERVICE_BIND_DN=""
LDAP_SERVICE_BIND_PASSWORD="" LDAP_SERVICE_BIND_PASSWORD=""
LDAP_SERVICE_URL="ldapi:///" LDAP_SERVICE_URL="ldapi:///"
...@@ -60,6 +62,8 @@ MAIL_FROM_ADDRESS='foo@bar.com' ...@@ -60,6 +62,8 @@ MAIL_FROM_ADDRESS='foo@bar.com'
# Do not enable this on a public service! There is no spam protection implemented at the moment. # Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False SELF_SIGNUP=False
# PASSWORDRESET is not available when not using a service connection
ENABLE_PASSWORDRESET=True
#MFA_ICON_URL = 'https://example.com/logo.png' #MFA_ICON_URL = 'https://example.com/logo.png'
#MFA_RP_ID = 'example.com' # If unset, hostname from current request is used #MFA_RP_ID = 'example.com' # If unset, hostname from current request is used
......
from flask import current_app, request, abort import base64
import hashlib
from flask import current_app, request, abort, session
import ldap3 import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError
from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import
from ldapalchemy.model import Query from ldapalchemy.model import Query
from ldapalchemy.core import encode_filter
def check_hashed(password_hash, password):
'''Return if password matches a LDAP-compatible password hash
:param password_hash: LDAP-compatible password hash (plain password or "{ssha512}...")
:type password_hash: bytes
:param password: Plain, (ideally) utf8-encoded password
:type password: bytes'''
algorithms = {
b'md5': 'MD5',
b'sha': 'SHA1',
b'sha256': 'SHA256',
b'sha384': 'SHA384',
b'sha512': 'SHA512'
}
if not password_hash.startswith(b'{'):
return password_hash == password
algorithm, data = password_hash[1:].split(b'}', 1)
data = base64.b64decode(data)
if algorithm in algorithms:
ctx = hashlib.new(algorithms[algorithm], password)
return data == ctx.digest()
elif algorithm.startswith(b's') and algorithm[1:] in algorithms:
ctx = hashlib.new(algorithms[algorithm[1:]], password)
salt = data[ctx.digest_size:]
ctx.update(salt)
return data == ctx.digest() + salt
else:
raise NotImplementedError()
class FlaskQuery(Query): class FlaskQuery(Query):
def get_or_404(self, dn): def get_or_404(self, dn):
...@@ -18,9 +54,54 @@ class FlaskQuery(Query): ...@@ -18,9 +54,54 @@ class FlaskQuery(Query):
abort(404) abort(404)
return res return res
def test_user_bind(bind_dn, bind_pw):
try:
if current_app.config.get('LDAP_SERVICE_MOCK', False):
# Since we reuse the same conn and ldap3's mock only supports plain
# passwords for bind and rebind, we simulate the bind by retrieving
# and checking the password hash ourselves.
conn = ldap.get_connection()
conn.search(bind_dn, search_filter='(objectclass=*)', search_scope=ldap3.BASE,
attributes=ldap3.ALL_ATTRIBUTES)
if not conn.response:
return False
if not conn.response[0]['attributes'].get('userPassword'):
return False
return check_hashed(conn.response[0]['attributes']['userPassword'][0], bind_pw.encode())
else:
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"])
conn = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
if not conn:
return False
except (LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError):
return False
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
lazy_entries = conn.entries
# Do not end the connection when using mock, as it will be reused afterwards
if not current_app.config.get('LDAP_SERVICE_MOCK', False):
conn.unbind()
return len(lazy_entries) == 1
def connect_and_bind_to_ldap(server, bind_dn, bind_pw):
# Using auto_bind cannot close the connection, so define the connection with extra steps
_connection = ldap3.Connection(server, bind_dn, bind_pw)
if _connection.closed:
_connection.open(read_server_info=False)
if current_app.config["LDAP_SERVICE_USE_STARTTLS"]:
_connection.start_tls(read_server_info=False)
if not _connection.bind(read_server_info=True):
_connection.unbind()
raise LDAPBindError
return _connection
class FlaskLDAPMapper(LDAPMapper): class FlaskLDAPMapper(LDAPMapper):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
class Model(self.Model): class Model(self.Model):
query_class = FlaskQuery query_class = FlaskQuery
...@@ -48,9 +129,20 @@ class FlaskLDAPMapper(LDAPMapper): ...@@ -48,9 +129,20 @@ class FlaskLDAPMapper(LDAPMapper):
current_app.ldap_mock.bind() current_app.ldap_mock.bind()
return current_app.ldap_mock return current_app.ldap_mock
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL)
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True
request.ldap_connection = ldap3.Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], # If the configured LDAP service bind_dn is empty, connect to LDAP as a user
current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=auto_bind) if current_app.config['LDAP_SERVICE_BIND_DN']:
bind_dn = current_app.config["LDAP_SERVICE_BIND_DN"]
bind_pw = current_app.config["LDAP_SERVICE_BIND_PASSWORD"]
else:
bind_dn = session['user_dn']
bind_pw = session['user_pw']
try:
request.ldap_connection = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return None
return request.ldap_connection return request.ldap_connection
ldap = FlaskLDAPMapper() ldap = FlaskLDAPMapper()
...@@ -4,7 +4,7 @@ import smtplib ...@@ -4,7 +4,7 @@ import smtplib
from email.message import EmailMessage from email.message import EmailMessage
import email.utils import email.utils
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from uffd.navbar import register_navbar from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect from uffd.csrf import csrf_protect
...@@ -29,6 +29,7 @@ def index(): ...@@ -29,6 +29,7 @@ def index():
@csrf_protect(blueprint=bp) @csrf_protect(blueprint=bp)
@login_required() @login_required()
def update(): def update():
password_changed = False
user = get_current_user() user = get_current_user()
if request.values['displayname'] != user.displayname: if request.values['displayname'] != user.displayname:
if user.set_displayname(request.values['displayname']): if user.set_displayname(request.values['displayname']):
...@@ -41,12 +42,16 @@ def update(): ...@@ -41,12 +42,16 @@ def update():
else: else:
if user.set_password(request.values['password1']): if user.set_password(request.values['password1']):
flash('Password changed.') flash('Password changed.')
password_changed = True
else: else:
flash('Password could not be set.') flash('Password could not be set.')
if request.values['mail'] != user.mail: if request.values['mail'] != user.mail:
send_mail_verification(user.loginname, request.values['mail']) send_mail_verification(user.loginname, request.values['mail'])
flash('We sent you an email, please verify your mail address.') flash('We sent you an email, please verify your mail address.')
ldap.session.commit() ldap.session.commit()
# When using a user_connection, update the connection on password-change
if password_changed and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = request.values['password1']
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST'])) @bp.route("/passwordreset", methods=(['GET', 'POST']))
......
...@@ -25,7 +25,9 @@ ...@@ -25,7 +25,9 @@
{% if config['SELF_SIGNUP'] %} {% if config['SELF_SIGNUP'] %}
<a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a> <a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a>
{% endif %} {% endif %}
{% if config['ENABLE_PASSWORDRESET'] %}
<a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a> <a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a>
{% endif %}
</div> </div>
</div> </div>
</div> </div>
......
...@@ -4,12 +4,8 @@ import functools ...@@ -4,12 +4,8 @@ import functools
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError
from ldapalchemy.core import encode_filter
from uffd.user.models import User from uffd.user.models import User
from uffd.ldap import ldap from uffd.ldap import ldap, test_user_bind, LDAPInvalidDnError
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/') bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
...@@ -18,29 +14,26 @@ login_ratelimit = Ratelimit('login', 1*60, 3) ...@@ -18,29 +14,26 @@ login_ratelimit = Ratelimit('login', 1*60, 3)
def login_get_user(loginname, password): def login_get_user(loginname, password):
dn = User(loginname=loginname).dn dn = User(loginname=loginname).dn
if current_app.config.get('LDAP_SERVICE_MOCK', False):
conn = ldap.get_connection() # If we use a service connection, test user bind seperately
# Since we reuse the same conn for all calls to `user_conn()` we if current_app.config['LDAP_SERVICE_BIND_DN'] or current_app.config.get('LDAP_SERVICE_MOCK', False):
# simulate the password check by rebinding. Note that ldap3's mocking if not test_user_bind(dn, password):
# implementation just compares the string in the objects's userPassword
# field with the password, no support for hashing or OpenLDAP-style
# password-prefixes ("{PLAIN}..." or "{ssha512}...").
try:
if not conn.rebind(dn, password):
return None
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return None return None
# If we use a user connection, just create the connection normally
else: else:
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) # ldap.get_connection gets the credentials from the session, so set it here initially
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True session['user_dn'] = dn
try: session['user_pw'] = password
conn = ldap3.Connection(server, dn, password, auto_bind=auto_bind) if not ldap.get_connection():
except (LDAPBindError, LDAPPasswordIsMandatoryError): session.clear()
return None return None
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
if len(conn.entries) != 1: try:
user = User.query.get(dn)
if user:
return user
except LDAPInvalidDnError:
return None return None
return User.query.get(dn)
@bp.route("/logout") @bp.route("/logout")
def logout(): def logout():
...@@ -50,9 +43,12 @@ def logout(): ...@@ -50,9 +43,12 @@ def logout():
session.clear() session.clear()
return resp return resp
def set_session(user, skip_mfa=False): def set_session(user, password='', skip_mfa=False):
session.clear() session.clear()
session['user_dn'] = user.dn session['user_dn'] = user.dn
# only save the password if we use a user connection
if password and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = password
session['logintime'] = datetime.datetime.now().timestamp() session['logintime'] = datetime.datetime.now().timestamp()
session['_csrf_token'] = secrets.token_hex(128) session['_csrf_token'] = secrets.token_hex(128)
if skip_mfa: if skip_mfa:
...@@ -82,7 +78,7 @@ def login(): ...@@ -82,7 +78,7 @@ def login():
if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']): if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
flash('You do not have access to this service') flash('You do not have access to this service')
return render_template('login.html', ref=request.values.get('ref')) return render_template('login.html', ref=request.values.get('ref'))
set_session(user) set_session(user, password=password)
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
def get_current_user(): def get_current_user():
......
...@@ -97,6 +97,6 @@ def signup_confirm_submit(token): ...@@ -97,6 +97,6 @@ def signup_confirm_submit(token):
return render_template('signup/confirm.html', signup=signup, error=msg) return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.commit() db.session.commit()
ldap.session.commit() ldap.session.commit()
set_session(user, skip_mfa=True) set_session(user, password=request.form['password'], skip_mfa=True)
flash('Your account was successfully created') flash('Your account was successfully created')
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))