Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Commits on Source (27)
...@@ -16,7 +16,7 @@ db_migrations_updated: ...@@ -16,7 +16,7 @@ db_migrations_updated:
stage: test stage: test
script: script:
- FLASK_APP=uffd flask db upgrade - FLASK_APP=uffd flask db upgrade
- FLASK_APP=uffd flask db migrate 2>&1 | grep -q 'No changes in schema detected' - FLASK_APP=uffd flask db migrate
linter: linter:
stage: test stage: test
......
...@@ -61,6 +61,15 @@ hook-pre-app = exec:FLASK_APP=uffd flask db upgrade ...@@ -61,6 +61,15 @@ hook-pre-app = exec:FLASK_APP=uffd flask db upgrade
tabs. tabs.
## Bind with service account or as user?
Uffd can use a dedicated service account for LDAP operations by setting `LDAP_SERVICE_BIND_DN`.
Or it uses the credentials of the currently logged in user, by not setting `LDAP_SERVICE_BIND_DN`.
If you choose to run with user credentials, some features are not available, like password resets
or self signup, since in both cases, no user credentials can exist.
## OAuth2 Single-Sign-On Provider ## OAuth2 Single-Sign-On Provider
Other services can use uffd as an OAuth2.0-based authentication provider. Other services can use uffd as an OAuth2.0-based authentication provider.
......
Subproject commit 34cdfca289eb4bcb79c4580135ba971c7a2437d4 Subproject commit 7a232d305fda3e261b6f8d3c0958a16f4c2e8d8b
{ {
"entries": [ "entries": [
{ {
"dn": "uid=testuser,ou=users,dc=example,dc=com", "dn": "uid=testuser,ou=users,dc=example,dc=com",
"raw": { "raw": {
"cn": [ "cn": [
"Test User" "Test User"
......
...@@ -3,18 +3,20 @@ import unittest ...@@ -3,18 +3,20 @@ import unittest
from flask import url_for from flask import url_for
from uffd.ldap import ldap # These imports are required, because otherwise we get circular imports?!
from uffd import user from uffd import ldap, user
from uffd.session.views import get_current_user from uffd.session.views import get_current_user
from uffd.selfservice.models import MailToken, PasswordToken from uffd.selfservice.models import MailToken, PasswordToken
from uffd.user.models import User from uffd.user.models import User
from uffd import create_app, db, ldap from uffd import create_app, db
from utils import dump, UffdTestCase from utils import dump, UffdTestCase
def get_ldap_password():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
class TestSelfservice(UffdTestCase): class TestSelfservice(UffdTestCase):
def login(self): def login(self):
...@@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase): ...@@ -137,6 +139,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(tokens), 0) self.assertEqual(len(tokens), 0)
def test_forgot_password(self): def test_forgot_password(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password')) r = self.client.get(path=url_for('selfservice.forgot_password'))
dump('forgot_password', r) dump('forgot_password', r)
...@@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase): ...@@ -150,6 +154,8 @@ class TestSelfservice(UffdTestCase):
self.assertIn(token.token, str(self.app.last_mail.get_content())) self.assertIn(token.token, str(self.app.last_mail.get_content()))
def test_forgot_password_wrong_user(self): def test_forgot_password_wrong_user(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password')) r = self.client.get(path=url_for('selfservice.forgot_password'))
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase): ...@@ -161,6 +167,8 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0) self.assertEqual(len(PasswordToken.query.all()), 0)
def test_forgot_password_wrong_email(self): def test_forgot_password_wrong_email(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') user = User.query.get('uid=testuser,ou=users,dc=example,dc=com')
r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True) r = self.client.get(path=url_for('selfservice.forgot_password'), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase): ...@@ -173,6 +181,8 @@ class TestSelfservice(UffdTestCase):
# Regression test for #31 # Regression test for #31
def test_forgot_password_invalid_user(self): def test_forgot_password_invalid_user(self):
if self.use_userconnection:
self.skipTest('Password Reset is not possible in user mode')
r = self.client.post(path=url_for('selfservice.forgot_password'), r = self.client.post(path=url_for('selfservice.forgot_password'),
data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True) data={'loginname': '=', 'mail': 'test@example.com'}, follow_redirects=True)
dump('forgot_password_submit_invalid_user', r) dump('forgot_password_submit_invalid_user', r)
...@@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase): ...@@ -181,8 +191,9 @@ class TestSelfservice(UffdTestCase):
self.assertEqual(len(PasswordToken.query.all()), 0) self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password(self): def test_token_password(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase): ...@@ -193,13 +204,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True)
dump('token_password_submit', r) dump('token_password_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertNotEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
# TODO: Verify that the new password is actually correct
self.assertEqual(len(PasswordToken.query.all()), 0)
def test_token_password_emptydb(self): def test_token_password_emptydb(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.skipTest('Password Token is not possible in user mode')
user = get_user()
r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True) r = self.client.get(path=url_for('selfservice.token_password', token='A'*128), follow_redirects=True)
dump('token_password_emptydb', r) dump('token_password_emptydb', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase): ...@@ -209,11 +219,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_emptydb_submit', r) dump('token_password_emptydb_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_invalid(self): def test_token_password_invalid(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase): ...@@ -226,11 +237,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_submit', r) dump('token_password_invalid_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_expired(self): def test_token_password_expired(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname, token = PasswordToken(loginname=user.loginname,
created=(datetime.datetime.now() - datetime.timedelta(days=10))) created=(datetime.datetime.now() - datetime.timedelta(days=10)))
db.session.add(token) db.session.add(token)
...@@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase): ...@@ -244,11 +256,12 @@ class TestSelfservice(UffdTestCase):
dump('token_password_invalid_expired_submit', r) dump('token_password_invalid_expired_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertIn(b'Token expired, please try again', r.data) self.assertIn(b'Token expired, please try again', r.data)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
def test_token_password_different_passwords(self): def test_token_password_different_passwords(self):
user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') if self.use_userconnection:
oldpw = get_ldap_password() self.skipTest('Password Token is not possible in user mode')
user = get_user()
token = PasswordToken(loginname=user.loginname) token = PasswordToken(loginname=user.loginname)
db.session.add(token) db.session.add(token)
db.session.commit() db.session.commit()
...@@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase): ...@@ -258,7 +271,12 @@ class TestSelfservice(UffdTestCase):
data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True)
dump('token_password_different_passwords_submit', r) dump('token_password_different_passwords_submit', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(oldpw, get_ldap_password()) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
class TestSelfserviceOL(TestSelfservice): class TestSelfserviceOL(TestSelfservice):
use_openldap = True use_openldap = True
class TestSelfserviceOLUser(TestSelfserviceOL):
use_userconnection = True
...@@ -135,3 +135,6 @@ class TestSession(UffdTestCase): ...@@ -135,3 +135,6 @@ class TestSession(UffdTestCase):
class TestSessionOL(TestSession): class TestSessionOL(TestSession):
use_openldap = True use_openldap = True
class TestSessionOLUser(TestSessionOL):
use_userconnection = True
import datetime import datetime
import time
import unittest import unittest
from flask import url_for, session from flask import url_for, session
...@@ -7,20 +6,17 @@ from flask import url_for, session ...@@ -7,20 +6,17 @@ from flask import url_for, session
# These imports are required, because otherwise we get circular imports?! # These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user from uffd import ldap, user
from uffd.session.views import get_current_user
from uffd.user.models import User from uffd.user.models import User
from uffd.role.models import Role from uffd.role.models import Role
from uffd.session.views import get_current_user, is_valid_session
from uffd.mfa.models import MFAMethod, MFAType, RecoveryCodeMethod, TOTPMethod, WebauthnMethod, _hotp
from uffd import create_app, db from uffd import create_app, db
from utils import dump, UffdTestCase from utils import dump, UffdTestCase
def get_user(): def get_user():
return User.query.get('uid=testuser,ou=users,dc=example,dc=com') return User.query.get('uid=testuser,ou=users,dc=example,dc=com')
def get_user_password():
return get_user().pwhash
def get_admin(): def get_admin():
return User.query.get('uid=testadmin,ou=users,dc=example,dc=com') return User.query.get('uid=testadmin,ou=users,dc=example,dc=com')
...@@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase): ...@@ -52,6 +48,14 @@ class TestUserModel(UffdTestCase):
class TestUserModelOL(TestUserModel): class TestUserModelOL(TestUserModel):
use_openldap = True use_openldap = True
class TestUserModelOLUser(TestUserModelOL):
use_userconnection = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testadmin', 'password': 'adminpassword'}, follow_redirects=True)
class TestUserViews(UffdTestCase): class TestUserViews(UffdTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
...@@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase): ...@@ -90,7 +94,8 @@ class TestUserViews(UffdTestCase):
role1 = Role(name='role1') role1 = Role(name='role1')
print('test_new', role1.db_members, role1.members, user.roles) print('test_new', role1.db_members, role1.members, user.roles)
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
# TODO: check password hash # TODO: confirm Mail is send, login not yet possible
#self.assertTrue(ldap.test_user_bind(user.dn, 'newpassword'))
def test_new_invalid_loginname(self): def test_new_invalid_loginname(self):
r = self.client.post(path=url_for('user.update'), r = self.client.post(path=url_for('user.update'),
...@@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase): ...@@ -134,7 +139,6 @@ class TestUserViews(UffdTestCase):
role2.members.add(user) role2.members.add(user)
db.session.commit() db.session.commit()
role1_id = role1.id role1_id = role1.id
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
dump('user_update', r) dump('user_update', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
...@@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase): ...@@ -149,12 +153,11 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertTrue(ldap.test_user_bind(user.dn, 'userpassword'))
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
def test_update_password(self): def test_update_password(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase): ...@@ -167,12 +170,12 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.mail, 'newuser@example.com') self.assertEqual(_user.mail, 'newuser@example.com')
self.assertEqual(_user.uid, user.uid) self.assertEqual(_user.uid, user.uid)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertNotEqual(get_user_password(), oldpw) self.assertTrue(ldap.test_user_bind(_user.dn, 'newpassword'))
@unittest.skip('See #28') @unittest.skip('See #28')
def test_update_invalid_password(self): def test_update_invalid_password(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase): ...@@ -181,14 +184,13 @@ class TestUserViews(UffdTestCase):
dump('user_update_password', r) dump('user_update_password', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
_user = get_user() _user = get_user()
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'A'))
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
def test_update_empty_email(self): def test_update_empty_email(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase): ...@@ -200,11 +202,10 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_update_invalid_display_name(self): def test_update_invalid_display_name(self):
user = get_user() user = get_user()
oldpw = get_user_password()
r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=user.uid), follow_redirects=True)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user.uid), r = self.client.post(path=url_for('user.update', uid=user.uid),
...@@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase): ...@@ -216,7 +217,7 @@ class TestUserViews(UffdTestCase):
self.assertEqual(_user.displayname, user.displayname) self.assertEqual(_user.displayname, user.displayname)
self.assertEqual(_user.mail, user.mail) self.assertEqual(_user.mail, user.mail)
self.assertEqual(_user.loginname, user.loginname) self.assertEqual(_user.loginname, user.loginname)
self.assertEqual(get_user_password(), oldpw) self.assertFalse(ldap.test_user_bind(_user.dn, 'newpassword'))
def test_show(self): def test_show(self):
r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True) r = self.client.get(path=url_for('user.show', uid=get_user().uid), follow_redirects=True)
...@@ -258,77 +259,158 @@ newuser12,newuser12@example.com,{role1.id};{role1.id} ...@@ -258,77 +259,158 @@ newuser12,newuser12@example.com,{role1.id};{role1.id}
dump('user_csvimport', r) dump('user_csvimport', r)
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
user = User.query.get('uid=newuser1,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser1,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser1') self.assertEqual(user.loginname, 'newuser1')
self.assertEqual(user.displayname, 'newuser1') self.assertEqual(user.displayname, 'newuser1')
self.assertEqual(user.mail, 'newuser1@example.com') self.assertEqual(user.mail, 'newuser1@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base']) self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser2,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser2,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser2') self.assertEqual(user.loginname, 'newuser2')
self.assertEqual(user.displayname, 'newuser2') self.assertEqual(user.displayname, 'newuser2')
self.assertEqual(user.mail, 'newuser2@example.com') self.assertEqual(user.mail, 'newuser2@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
user = User.query.get('uid=newuser3,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser3,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser3') self.assertEqual(user.loginname, 'newuser3')
self.assertEqual(user.displayname, 'newuser3') self.assertEqual(user.displayname, 'newuser3')
self.assertEqual(user.mail, 'newuser3@example.com') self.assertEqual(user.mail, 'newuser3@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1', 'role2']) self.assertEqual(roles, ['base', 'role1', 'role2'])
user = User.query.get('uid=newuser4,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser4,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser4') self.assertEqual(user.loginname, 'newuser4')
self.assertEqual(user.displayname, 'newuser4') self.assertEqual(user.displayname, 'newuser4')
self.assertEqual(user.mail, 'newuser4@example.com') self.assertEqual(user.mail, 'newuser4@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base']) self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser5,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser5,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser5') self.assertEqual(user.loginname, 'newuser5')
self.assertEqual(user.displayname, 'newuser5') self.assertEqual(user.displayname, 'newuser5')
self.assertEqual(user.mail, 'newuser5@example.com') self.assertEqual(user.mail, 'newuser5@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base']) self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser6,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser6,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser6') self.assertEqual(user.loginname, 'newuser6')
self.assertEqual(user.displayname, 'newuser6') self.assertEqual(user.displayname, 'newuser6')
self.assertEqual(user.mail, 'newuser6@example.com') self.assertEqual(user.mail, 'newuser6@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1', 'role2']) self.assertEqual(roles, ['base', 'role1', 'role2'])
self.assertIsNone(User.query.get('uid=newuser7,ou=users,dc=example,dc=com')) self.assertIsNone(User.query.get('uid=newuser7,ou=users,dc=example,dc=com'))
self.assertIsNone(User.query.get('uid=newuser8,ou=users,dc=example,dc=com')) self.assertIsNone(User.query.get('uid=newuser8,ou=users,dc=example,dc=com'))
self.assertIsNone(User.query.get('uid=newuser9,ou=users,dc=example,dc=com')) self.assertIsNone(User.query.get('uid=newuser9,ou=users,dc=example,dc=com'))
user = User.query.get('uid=newuser10,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser10,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser10') self.assertEqual(user.loginname, 'newuser10')
self.assertEqual(user.displayname, 'newuser10') self.assertEqual(user.displayname, 'newuser10')
self.assertEqual(user.mail, 'newuser10@example.com') self.assertEqual(user.mail, 'newuser10@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base']) self.assertEqual(roles, ['base'])
user = User.query.get('uid=newuser11,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser11,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser11') self.assertEqual(user.loginname, 'newuser11')
self.assertEqual(user.displayname, 'newuser11') self.assertEqual(user.displayname, 'newuser11')
self.assertEqual(user.mail, 'newuser11@example.com') self.assertEqual(user.mail, 'newuser11@example.com')
# Currently the csv import is not very robust, imho newuser11 should have role1 and role2! # Currently the csv import is not very robust, imho newuser11 should have role1 and role2!
roles = sorted([r.name for r in user.roles])
#self.assertEqual(roles, ['base', 'role1', 'role2']) #self.assertEqual(roles, ['base', 'role1', 'role2'])
self.assertEqual(roles, ['base', 'role2']) self.assertEqual(roles, ['base', 'role2'])
user = User.query.get('uid=newuser12,ou=users,dc=example,dc=com') user = User.query.get('uid=newuser12,ou=users,dc=example,dc=com')
roles = sorted([r.name for r in user.roles])
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(user.loginname, 'newuser12') self.assertEqual(user.loginname, 'newuser12')
self.assertEqual(user.displayname, 'newuser12') self.assertEqual(user.displayname, 'newuser12')
self.assertEqual(user.mail, 'newuser12@example.com') self.assertEqual(user.mail, 'newuser12@example.com')
roles = sorted([r.name for r in user.roles])
self.assertEqual(roles, ['base', 'role1']) self.assertEqual(roles, ['base', 'role1'])
class TestUserViewsOL(TestUserViews): class TestUserViewsOL(TestUserViews):
use_openldap = True use_openldap = True
class TestUserViewsOLUserAsAdmin(TestUserViewsOL):
use_userconnection = True
class TestUserViewsOLUserAsUser(UffdTestCase):
use_userconnection = True
use_openldap = True
def setUp(self):
super().setUp()
self.client.post(path=url_for('session.login'),
data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True)
def test_view_own(self):
user_ = get_user()
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_view_own', r)
self.assertEqual(r.status_code, 200)
def test_view_others(self):
admin = get_admin()
r = self.client.get(path=url_for('user.show', uid=admin.uid), follow_redirects=True)
dump('user_view_others', r)
self.assertEqual(r.status_code, 200)
def test_view_index(self):
r = self.client.get(path=url_for('user.index'), follow_redirects=True)
dump('user_index', r)
self.assertEqual(r.status_code, 200)
def test_update_other_user(self):
user_ = get_admin()
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
role2.members.add(user_)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show', uid=user_.uid), follow_redirects=True)
dump('user_update', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('user.update', uid=user_.uid),
data={'loginname': user_.loginname, 'mail': user_.mail, 'displayname': user_.displayname + "12345",
f'role-{role1_id}': '1', 'password': ''}, follow_redirects=True)
dump('user_update_submit', r)
self.assertEqual(r.status_code, 200)
_user = get_admin()
self.assertEqual(_user.displayname, user_.displayname)
self.assertEqual(_user.mail, user_.mail)
self.assertEqual(_user.uid, user_.uid)
self.assertEqual(_user.loginname, user_.loginname)
def test_new(self):
db.session.add(Role(name='base'))
role1 = Role(name='role1')
db.session.add(role1)
role2 = Role(name='role2')
db.session.add(role2)
db.session.commit()
role1_id = role1.id
r = self.client.get(path=url_for('user.show'), follow_redirects=True)
dump('user_new', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(User.query.get('uid=newuser,ou=users,dc=example,dc=com'))
r = self.client.post(path=url_for('user.update'),
data={'loginname': 'newuser', 'mail': 'newuser@example.com', 'displayname': 'New User',
f'role-{role1_id}': '1', 'password': 'newpassword'}, follow_redirects=True)
dump('user_new_submit', r)
self.assertEqual(r.status_code, 200)
user = User.query.get('uid=newuser,ou=users,dc=example,dc=com')
self.assertIsNone(user)
def test_delete(self):
user = get_admin()
r = self.client.get(path=url_for('user.delete', uid=user.uid), follow_redirects=True)
dump('user_delete', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(get_admin())
class TestGroupViews(UffdTestCase): class TestGroupViews(UffdTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
...@@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase): ...@@ -347,3 +429,6 @@ class TestGroupViews(UffdTestCase):
class TestGroupViewsOL(TestGroupViews): class TestGroupViewsOL(TestGroupViews):
use_openldap = True use_openldap = True
class TestGroupViewsOLUser(TestGroupViewsOL):
use_userconnection = True
...@@ -15,7 +15,7 @@ def dump(basename, resp): ...@@ -15,7 +15,7 @@ def dump(basename, resp):
return return
os.makedirs(root, exist_ok=True) os.makedirs(root, exist_ok=True)
path = os.path.join(root, basename+suffix) path = os.path.join(root, basename+suffix)
with open(path, 'xb') as f: with open(path, 'wb') as f:
f.write(resp.data) f.write(resp.data)
def db_flush(): def db_flush():
...@@ -25,6 +25,7 @@ def db_flush(): ...@@ -25,6 +25,7 @@ def db_flush():
class UffdTestCase(unittest.TestCase): class UffdTestCase(unittest.TestCase):
use_openldap = False use_openldap = False
use_userconnection = False
def setUp(self): def setUp(self):
self.dir = tempfile.mkdtemp() self.dir = tempfile.mkdtemp()
...@@ -43,7 +44,10 @@ class UffdTestCase(unittest.TestCase): ...@@ -43,7 +44,10 @@ class UffdTestCase(unittest.TestCase):
self.skipTest('OPENLDAP_TESTING not set') self.skipTest('OPENLDAP_TESTING not set')
config['LDAP_SERVICE_MOCK'] = False config['LDAP_SERVICE_MOCK'] = False
config['LDAP_SERVICE_URL'] = 'ldap://localhost' config['LDAP_SERVICE_URL'] = 'ldap://localhost'
config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com' if self.use_userconnection:
config['LDAP_SERVICE_BIND_DN'] = None
else:
config['LDAP_SERVICE_BIND_DN'] = 'cn=uffd,ou=system,dc=example,dc=com'
config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password' config['LDAP_SERVICE_BIND_PASSWORD'] = 'uffd-ldap-password'
os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1") os.system("ldapdelete -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_cleanup.ldif > /dev/null 2>&1")
os.system("ldapadd -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_add.ldif") os.system("ldapadd -c -D 'cn=uffd,ou=system,dc=example,dc=com' -w 'uffd-ldap-password' -H 'ldap://localhost' -f ldap_server_entries_add.ldif")
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
import secrets import secrets
import sys import sys
from flask import Flask, redirect, url_for from flask import Flask, redirect, url_for, request
from werkzeug.routing import IntegerConverter from werkzeug.routing import IntegerConverter
from werkzeug.serving import make_ssl_devcert from werkzeug.serving import make_ssl_devcert
from werkzeug.contrib.profiler import ProfilerMiddleware from werkzeug.contrib.profiler import ProfilerMiddleware
...@@ -12,7 +12,7 @@ sys.path.append('deps/ldapalchemy') ...@@ -12,7 +12,7 @@ sys.path.append('deps/ldapalchemy')
# pylint: disable=wrong-import-position # pylint: disable=wrong-import-position
from uffd.database import db, SQLAlchemyJSON from uffd.database import db, SQLAlchemyJSON
from uffd.ldap import ldap import uffd.ldap
from uffd.template_helper import register_template_helper from uffd.template_helper import register_template_helper
from uffd.navbar import setup_navbar from uffd.navbar import setup_navbar
# pylint: enable=wrong-import-position # pylint: enable=wrong-import-position
...@@ -48,21 +48,36 @@ def create_app(test_config=None): # pylint: disable=too-many-locals ...@@ -48,21 +48,36 @@ def create_app(test_config=None): # pylint: disable=too-many-locals
db.init_app(app) db.init_app(app)
Migrate(app, db, render_as_batch=True) Migrate(app, db, render_as_batch=True)
# pylint: disable=C0415 # pylint: disable=C0415
from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup, invite from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services
# pylint: enable=C0415 # pylint: enable=C0415
for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp + invite.bp: for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp:
app.register_blueprint(i) app.register_blueprint(i)
if app.config['LDAP_SERVICE_BIND_DN'] or app.config.get('LDAP_SERVICE_MOCK', False):
# pylint: disable=C0415
from uffd import signup, invite
# pylint: enable=C0415
for i in signup.bp + invite.bp:
app.register_blueprint(i)
else:
app.config['ENABLE_PASSWORDRESET'] = False
@app.shell_context_processor @app.shell_context_processor
def push_request_context(): #pylint: disable=unused-variable def push_request_context(): #pylint: disable=unused-variable
app.test_request_context().push() # LDAP ORM requires request context app.test_request_context().push() # LDAP ORM requires request context
return {'db': db, 'ldap': ldap} return {'db': db, 'ldap': uffd.ldap.ldap}
@app.route("/") @app.route("/")
def index(): #pylint: disable=unused-variable def index(): #pylint: disable=unused-variable
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
@app.teardown_request
def close_connection(exception): #pylint: disable=unused-variable,unused-argument
if hasattr(request, "ldap_connection"):
request.ldap_connection.unbind()
@app.cli.command("gendevcert", help='Generates a self-signed TLS certificate for development') @app.cli.command("gendevcert", help='Generates a self-signed TLS certificate for development')
def gendevcert(): #pylint: disable=unused-variable def gendevcert(): #pylint: disable=unused-variable
if os.path.exists('devcert.crt') or os.path.exists('devcert.key'): if os.path.exists('devcert.crt') or os.path.exists('devcert.key'):
......
...@@ -37,6 +37,8 @@ LDAP_MAIL_UID_ATTRIBUTE="uid" ...@@ -37,6 +37,8 @@ LDAP_MAIL_UID_ATTRIBUTE="uid"
LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid" LDAP_MAIL_RECEIVERS_ATTRIBUTE="mailacceptinggeneralid"
LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop" LDAP_MAIL_DESTINATIONS_ATTRIBUTE="maildrop"
# If you do not set the service bind_dn, connections use the user credentials.
# When using a user connection, some features are not available, since they require a service connection
LDAP_SERVICE_BIND_DN="" LDAP_SERVICE_BIND_DN=""
LDAP_SERVICE_BIND_PASSWORD="" LDAP_SERVICE_BIND_PASSWORD=""
LDAP_SERVICE_URL="ldapi:///" LDAP_SERVICE_URL="ldapi:///"
...@@ -60,6 +62,8 @@ MAIL_FROM_ADDRESS='foo@bar.com' ...@@ -60,6 +62,8 @@ MAIL_FROM_ADDRESS='foo@bar.com'
# Do not enable this on a public service! There is no spam protection implemented at the moment. # Do not enable this on a public service! There is no spam protection implemented at the moment.
SELF_SIGNUP=False SELF_SIGNUP=False
# PASSWORDRESET is not available when not using a service connection
ENABLE_PASSWORDRESET=True
LOGINNAME_BLACKLIST=['^admin$', '^root$'] LOGINNAME_BLACKLIST=['^admin$', '^root$']
......
...@@ -15,6 +15,7 @@ from uffd.navbar import register_navbar ...@@ -15,6 +15,7 @@ from uffd.navbar import register_navbar
from uffd.ratelimit import host_ratelimit, format_delay from uffd.ratelimit import host_ratelimit, format_delay
from uffd.signup.views import signup_ratelimit from uffd.signup.views import signup_ratelimit
bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/') bp = Blueprint('invite', __name__, template_folder='templates', url_prefix='/invite/')
def invite_acl(): def invite_acl():
......
from flask import current_app, request, abort import base64
import hashlib
from flask import current_app, request, abort, session
import ldap3 import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError
from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import from ldapalchemy import LDAPMapper, LDAPCommitError # pylint: disable=unused-import
from ldapalchemy.model import Query from ldapalchemy.model import Query
from ldapalchemy.core import encode_filter
def check_hashed(password_hash, password):
'''Return if password matches a LDAP-compatible password hash
:param password_hash: LDAP-compatible password hash (plain password or "{ssha512}...")
:type password_hash: bytes
:param password: Plain, (ideally) utf8-encoded password
:type password: bytes'''
algorithms = {
b'md5': 'MD5',
b'sha': 'SHA1',
b'sha256': 'SHA256',
b'sha384': 'SHA384',
b'sha512': 'SHA512'
}
if not password_hash.startswith(b'{'):
return password_hash == password
algorithm, data = password_hash[1:].split(b'}', 1)
data = base64.b64decode(data)
if algorithm in algorithms:
ctx = hashlib.new(algorithms[algorithm], password)
return data == ctx.digest()
if algorithm.startswith(b's') and algorithm[1:] in algorithms:
ctx = hashlib.new(algorithms[algorithm[1:]], password)
salt = data[ctx.digest_size:]
ctx.update(salt)
return data == ctx.digest() + salt
raise NotImplementedError()
class FlaskQuery(Query): class FlaskQuery(Query):
def get_or_404(self, dn): def get_or_404(self, dn):
...@@ -18,9 +53,54 @@ class FlaskQuery(Query): ...@@ -18,9 +53,54 @@ class FlaskQuery(Query):
abort(404) abort(404)
return res return res
def test_user_bind(bind_dn, bind_pw):
try:
if current_app.config.get('LDAP_SERVICE_MOCK', False):
# Since we reuse the same conn and ldap3's mock only supports plain
# passwords for bind and rebind, we simulate the bind by retrieving
# and checking the password hash ourselves.
conn = ldap.get_connection()
conn.search(bind_dn, search_filter='(objectclass=*)', search_scope=ldap3.BASE,
attributes=ldap3.ALL_ATTRIBUTES)
if not conn.response:
return False
if not conn.response[0]['attributes'].get('userPassword'):
return False
return check_hashed(conn.response[0]['attributes']['userPassword'][0], bind_pw.encode())
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"])
conn = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
if not conn:
return False
except (LDAPBindError, LDAPPasswordIsMandatoryError, LDAPInvalidDnError):
return False
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
lazy_entries = conn.entries
# Do not end the connection when using mock, as it will be reused afterwards
if not current_app.config.get('LDAP_SERVICE_MOCK', False):
conn.unbind()
return len(lazy_entries) == 1
def connect_and_bind_to_ldap(server, bind_dn, bind_pw):
# Using auto_bind cannot close the connection, so define the connection with extra steps
_connection = ldap3.Connection(server, bind_dn, bind_pw)
if _connection.closed:
_connection.open(read_server_info=False)
if current_app.config["LDAP_SERVICE_USE_STARTTLS"]:
_connection.start_tls(read_server_info=False)
if not _connection.bind(read_server_info=True):
_connection.unbind()
raise LDAPBindError
return _connection
class FlaskLDAPMapper(LDAPMapper): class FlaskLDAPMapper(LDAPMapper):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
class Model(self.Model): class Model(self.Model):
query_class = FlaskQuery query_class = FlaskQuery
...@@ -48,9 +128,20 @@ class FlaskLDAPMapper(LDAPMapper): ...@@ -48,9 +128,20 @@ class FlaskLDAPMapper(LDAPMapper):
current_app.ldap_mock.bind() current_app.ldap_mock.bind()
return current_app.ldap_mock return current_app.ldap_mock
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL)
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True
request.ldap_connection = ldap3.Connection(server, current_app.config["LDAP_SERVICE_BIND_DN"], # If the configured LDAP service bind_dn is empty, connect to LDAP as a user
current_app.config["LDAP_SERVICE_BIND_PASSWORD"], auto_bind=auto_bind) if current_app.config['LDAP_SERVICE_BIND_DN']:
bind_dn = current_app.config["LDAP_SERVICE_BIND_DN"]
bind_pw = current_app.config["LDAP_SERVICE_BIND_PASSWORD"]
else:
bind_dn = session['user_dn']
bind_pw = session['user_pw']
try:
request.ldap_connection = connect_and_bind_to_ldap(server, bind_dn, bind_pw)
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return None
return request.ldap_connection return request.ldap_connection
ldap = FlaskLDAPMapper() ldap = FlaskLDAPMapper()
# pylint: disable=invalid-name
navbarList = []
# pylint: enable=invalid-name
def setup_navbar(app): def setup_navbar(app):
app.jinja_env.globals['getnavbar'] = lambda: [n for n in navbarList if n['visible']()] app.navbarList = []
app.jinja_env.globals['getnavbar'] = lambda: [n for n in app.navbarList if n['visible']()]
# iconlib can be 'bootstrap' # iconlib can be 'bootstrap'
# ( see: http://getbootstrap.com/components/#glyphicons ) # ( see: http://getbootstrap.com/components/#glyphicons )
...@@ -12,22 +9,37 @@ def setup_navbar(app): ...@@ -12,22 +9,37 @@ def setup_navbar(app):
# visible is a function that returns "True" if this icon should be visible in the calling context # visible is a function that returns "True" if this icon should be visible in the calling context
def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None): def register_navbar(name, iconlib='fa', icon=None, group=None, endpoint=None, blueprint=None, visible=None):
def wrapper(func): def wrapper(func):
urlendpoint = endpoint def deferred_call(state):
if not endpoint: urlendpoint = endpoint
# pylint: disable=protected-access if not endpoint:
if blueprint: # pylint: disable=protected-access
urlendpoint = "{}.{}".format(blueprint.name, func.__name__) if blueprint:
else: urlendpoint = "{}.{}".format(blueprint.name, func.__name__)
urlendpoint = func.__name_ else:
urlendpoint = func.__name_
# pylint: enable=protected-access # pylint: enable=protected-access
item = {} item = {}
item['iconlib'] = iconlib item['iconlib'] = iconlib
item['icon'] = icon item['icon'] = icon
item['group'] = group item['group'] = group
item['endpoint'] = urlendpoint item['endpoint'] = urlendpoint
item['name'] = name item['name'] = name
item['blueprint'] = blueprint item['blueprint'] = blueprint
item['visible'] = visible or (lambda: True) item['visible'] = visible or (lambda: True)
navbarList.append(item)
state.app.navbarList.append(item)
if blueprint:
blueprint.record_once(deferred_call)
else:
class StateMock:
def __init__(self, app):
self.app = app
# pylint: disable=C0415
from flask import current_app
# pylint: enable=C0415
deferred_call(StateMock(current_app))
return func return func
return wrapper return wrapper
...@@ -4,7 +4,7 @@ import smtplib ...@@ -4,7 +4,7 @@ import smtplib
from email.message import EmailMessage from email.message import EmailMessage
import email.utils import email.utils
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session
from uffd.navbar import register_navbar from uffd.navbar import register_navbar
from uffd.csrf import csrf_protect from uffd.csrf import csrf_protect
...@@ -29,6 +29,7 @@ def index(): ...@@ -29,6 +29,7 @@ def index():
@csrf_protect(blueprint=bp) @csrf_protect(blueprint=bp)
@login_required() @login_required()
def update(): def update():
password_changed = False
user = get_current_user() user = get_current_user()
if request.values['displayname'] != user.displayname: if request.values['displayname'] != user.displayname:
if user.set_displayname(request.values['displayname']): if user.set_displayname(request.values['displayname']):
...@@ -41,12 +42,16 @@ def update(): ...@@ -41,12 +42,16 @@ def update():
else: else:
if user.set_password(request.values['password1']): if user.set_password(request.values['password1']):
flash('Password changed.') flash('Password changed.')
password_changed = True
else: else:
flash('Password could not be set.') flash('Password could not be set.')
if request.values['mail'] != user.mail: if request.values['mail'] != user.mail:
send_mail_verification(user.loginname, request.values['mail']) send_mail_verification(user.loginname, request.values['mail'])
flash('We sent you an email, please verify your mail address.') flash('We sent you an email, please verify your mail address.')
ldap.session.commit() ldap.session.commit()
# When using a user_connection, update the connection on password-change
if password_changed and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = request.values['password1']
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
@bp.route("/passwordreset", methods=(['GET', 'POST'])) @bp.route("/passwordreset", methods=(['GET', 'POST']))
......
...@@ -25,7 +25,9 @@ ...@@ -25,7 +25,9 @@
{% if config['SELF_SIGNUP'] %} {% if config['SELF_SIGNUP'] %}
<a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a> <a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a>
{% endif %} {% endif %}
{% if config['ENABLE_PASSWORDRESET'] %}
<a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a> <a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a>
{% endif %}
</div> </div>
</div> </div>
</div> </div>
......
...@@ -4,12 +4,8 @@ import functools ...@@ -4,12 +4,8 @@ import functools
from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort from flask import Blueprint, render_template, request, url_for, redirect, flash, current_app, session, abort
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError
from ldapalchemy.core import encode_filter
from uffd.user.models import User from uffd.user.models import User
from uffd.ldap import ldap from uffd.ldap import ldap, test_user_bind, LDAPInvalidDnError
from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay
bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/') bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/')
...@@ -18,29 +14,27 @@ login_ratelimit = Ratelimit('login', 1*60, 3) ...@@ -18,29 +14,27 @@ login_ratelimit = Ratelimit('login', 1*60, 3)
def login_get_user(loginname, password): def login_get_user(loginname, password):
dn = User(loginname=loginname).dn dn = User(loginname=loginname).dn
if current_app.config.get('LDAP_SERVICE_MOCK', False):
conn = ldap.get_connection() # If we use a service connection, test user bind seperately
# Since we reuse the same conn for all calls to `user_conn()` we if current_app.config['LDAP_SERVICE_BIND_DN'] or current_app.config.get('LDAP_SERVICE_MOCK', False):
# simulate the password check by rebinding. Note that ldap3's mocking if not test_user_bind(dn, password):
# implementation just compares the string in the objects's userPassword
# field with the password, no support for hashing or OpenLDAP-style
# password-prefixes ("{PLAIN}..." or "{ssha512}...").
try:
if not conn.rebind(dn, password):
return None
except (LDAPBindError, LDAPPasswordIsMandatoryError):
return None return None
# If we use a user connection, just create the connection normally
else: else:
server = ldap3.Server(current_app.config["LDAP_SERVICE_URL"], get_info=ldap3.ALL) # ldap.get_connection gets the credentials from the session, so set it here initially
auto_bind = ldap3.AUTO_BIND_TLS_BEFORE_BIND if current_app.config["LDAP_SERVICE_USE_STARTTLS"] else True session['user_dn'] = dn
try: session['user_pw'] = password
conn = ldap3.Connection(server, dn, password, auto_bind=auto_bind) if not ldap.get_connection():
except (LDAPBindError, LDAPPasswordIsMandatoryError): session.clear()
return None return None
conn.search(conn.user, encode_filter(current_app.config["LDAP_USER_SEARCH_FILTER"]))
if len(conn.entries) != 1: try:
return None user = User.query.get(dn)
return User.query.get(dn) if user:
return user
except LDAPInvalidDnError:
pass
return None
@bp.route("/logout") @bp.route("/logout")
def logout(): def logout():
...@@ -50,9 +44,12 @@ def logout(): ...@@ -50,9 +44,12 @@ def logout():
session.clear() session.clear()
return resp return resp
def set_session(user, skip_mfa=False): def set_session(user, password='', skip_mfa=False):
session.clear() session.clear()
session['user_dn'] = user.dn session['user_dn'] = user.dn
# only save the password if we use a user connection
if password and not current_app.config['LDAP_SERVICE_BIND_DN']:
session['user_pw'] = password
session['logintime'] = datetime.datetime.now().timestamp() session['logintime'] = datetime.datetime.now().timestamp()
session['_csrf_token'] = secrets.token_hex(128) session['_csrf_token'] = secrets.token_hex(128)
if skip_mfa: if skip_mfa:
...@@ -82,7 +79,7 @@ def login(): ...@@ -82,7 +79,7 @@ def login():
if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']): if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']):
flash('You do not have access to this service') flash('You do not have access to this service')
return render_template('login.html', ref=request.values.get('ref')) return render_template('login.html', ref=request.values.get('ref'))
set_session(user) set_session(user, password=password)
return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index'))))
def get_current_user(): def get_current_user():
......
...@@ -97,6 +97,6 @@ def signup_confirm_submit(token): ...@@ -97,6 +97,6 @@ def signup_confirm_submit(token):
return render_template('signup/confirm.html', signup=signup, error=msg) return render_template('signup/confirm.html', signup=signup, error=msg)
db.session.commit() db.session.commit()
ldap.session.commit() ldap.session.commit()
set_session(user, skip_mfa=True) set_session(user, password=request.form['password'], skip_mfa=True)
flash('Your account was successfully created') flash('Your account was successfully created')
return redirect(url_for('selfservice.index')) return redirect(url_for('selfservice.index'))
...@@ -47,11 +47,11 @@ def update(uid=None): ...@@ -47,11 +47,11 @@ def update(uid=None):
return redirect(url_for('user.show')) return redirect(url_for('user.show'))
else: else:
user = User.query.filter_by(uid=uid).first_or_404() user = User.query.filter_by(uid=uid).first_or_404()
if not user.set_mail(request.form['mail']): if user.mail != request.form['mail'] and not user.set_mail(request.form['mail']):
flash('Mail is invalid') flash('Mail is invalid')
return redirect(url_for('user.show', uid=uid)) return redirect(url_for('user.show', uid=uid))
new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname'] new_displayname = request.form['displayname'] if request.form['displayname'] else request.form['loginname']
if not user.set_displayname(new_displayname): if user.displayname != new_displayname and not user.set_displayname(new_displayname):
flash('Display name does not meet requirements') flash('Display name does not meet requirements')
return redirect(url_for('user.show', uid=uid)) return redirect(url_for('user.show', uid=uid))
new_password = request.form.get('password') new_password = request.form.get('password')
......