diff --git a/deps/ldapalchemy b/deps/ldapalchemy index 2358086f5b8184fe89bbb69334a5c80e9b20cfbf..40ee661e418dd7866b9dc539fa6544cb12f9cd70 160000 --- a/deps/ldapalchemy +++ b/deps/ldapalchemy @@ -1 +1 @@ -Subproject commit 2358086f5b8184fe89bbb69334a5c80e9b20cfbf +Subproject commit 40ee661e418dd7866b9dc539fa6544cb12f9cd70 diff --git a/tests/test_selfservice.py b/tests/test_selfservice.py index 851f2d7ba2a40dede9140c146337bf174d5ff588..13b15cf02f732b4118c711ef5460273339c5cf2d 100644 --- a/tests/test_selfservice.py +++ b/tests/test_selfservice.py @@ -17,9 +17,6 @@ def get_ldap_password(): return User.query.get('uid=testuser,ou=users,dc=example,dc=com').pwhash class TestSelfservice(UffdTestCase): - def setUpApp(self): - self.app.config['MAIL_SKIP_SEND'] = True - def login(self): self.client.post(path=url_for('session.login'), data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True) diff --git a/tests/test_signup.py b/tests/test_signup.py new file mode 100644 index 0000000000000000000000000000000000000000..501e8896db3f63cc05c80bae0be5270f1f5e6378 --- /dev/null +++ b/tests/test_signup.py @@ -0,0 +1,444 @@ +import unittest +import datetime +import time + +from flask import url_for, session + +# These imports are required, because otherwise we get circular imports?! +from uffd import user +from uffd.ldap import ldap + +from uffd import create_app, db +from uffd.signup.models import Signup +from uffd.user.models import User +from uffd.session.views import get_current_user, is_valid_session, login_get_user + +from utils import dump, UffdTestCase, db_flush + +def refetch_signup(signup): + db.session.add(signup) + db.session.commit() + token = signup.token + db_flush() + return Signup.query.get(token) + +# We assume in all tests that Signup.validate and Signup.check_password do +# not alter any state + +class TestSignupModel(UffdTestCase): + def assert_validate_valid(self, signup): + valid, msg = signup.validate() + self.assertTrue(valid) + self.assertIsInstance(msg, str) + + def assert_validate_invalid(self, signup): + valid, msg = signup.validate() + self.assertFalse(valid) + self.assertIsInstance(msg, str) + self.assertNotEqual(msg, '') + + def assert_finish_success(self, signup, password): + self.assertIsNone(signup.user) + user, msg = signup.finish(password) + ldap.session.commit() + self.assertIsNotNone(user) + self.assertIsInstance(msg, str) + self.assertIsNotNone(signup.user) + + def assert_finish_failure(self, signup, password): + prev_dn = signup.user_dn + user, msg = signup.finish(password) + self.assertIsNone(user) + self.assertIsInstance(msg, str) + self.assertNotEqual(msg, '') + self.assertEqual(signup.user_dn, prev_dn) + + def test_password(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com') + self.assertFalse(signup.check_password('notsecret')) + self.assertFalse(signup.check_password('')) + self.assertFalse(signup.check_password('wrongpassword')) + signup.password = 'notsecret' + self.assertTrue(signup.check_password('notsecret')) + self.assertFalse(signup.check_password('wrongpassword')) + + def test_expired(self): + # TODO: Find a better way to test this! + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assertFalse(signup.expired) + signup.created = created=datetime.datetime.now() - datetime.timedelta(hours=49) + self.assertTrue(signup.expired) + + def test_completed(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assertFalse(signup.completed) + signup.finish('notsecret') + ldap.session.commit() + self.assertTrue(signup.completed) + signup = refetch_signup(signup) + self.assertTrue(signup.completed) + + def test_validate(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_validate_valid(signup) + self.assert_validate_valid(refetch_signup(signup)) + + def test_validate_completed(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_finish_success(signup, 'notsecret') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_expired(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', + password='notsecret', created=datetime.datetime.now()-datetime.timedelta(hours=49)) + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_loginname(self): + signup = Signup(loginname='', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_displayname(self): + signup = Signup(loginname='newuser', displayname='', mail='test@example.com', password='notsecret') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_mail(self): + signup = Signup(loginname='newuser', displayname='New User', mail='', password='notsecret') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_password(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_validate_exists(self): + signup = Signup(loginname='testuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_validate_invalid(signup) + self.assert_validate_invalid(refetch_signup(signup)) + + def test_finish(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + if self.use_openldap: + self.assertIsNone(login_get_user('newuser', 'notsecret')) + self.assert_finish_success(signup, 'notsecret') + user = User.query.get('uid=newuser,ou=users,dc=example,dc=com') + self.assertEqual(user.loginname, 'newuser') + self.assertEqual(user.displayname, 'New User') + self.assertEqual(user.mail, 'test@example.com') + if self.use_openldap: + self.assertIsNotNone(login_get_user('newuser', 'notsecret')) + + def test_finish_completed(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_finish_success(signup, 'notsecret') + self.assert_finish_failure(refetch_signup(signup), 'notsecret') + + def test_finish_expired(self): + # TODO: Find a better way to test this! + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', + password='notsecret', created=datetime.datetime.now()-datetime.timedelta(hours=49)) + self.assert_finish_failure(signup, 'notsecret') + self.assert_finish_failure(refetch_signup(signup), 'notsecret') + + def test_finish_wrongpassword(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com') + self.assert_finish_failure(signup, '') + self.assert_finish_failure(signup, 'wrongpassword') + signup = refetch_signup(signup) + self.assert_finish_failure(signup, '') + self.assert_finish_failure(signup, 'wrongpassword') + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_finish_failure(signup, 'wrongpassword') + self.assert_finish_failure(refetch_signup(signup), 'wrongpassword') + + def test_finish_ldaperror(self): + signup = Signup(loginname='testuser', displayname='New User', mail='test@example.com', password='notsecret') + self.assert_finish_failure(signup, 'notsecret') + self.assert_finish_failure(refetch_signup(signup), 'notsecret') + + def test_duplicate(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test1@example.com', password='notsecret') + self.assert_validate_valid(signup) + db.session.add(signup) + db.session.commit() + signup1_token = signup.token + signup = Signup(loginname='newuser', displayname='New User', mail='test2@example.com', password='notsecret') + self.assert_validate_valid(signup) + db.session.add(signup) + db.session.commit() + signup2_token = signup.token + db_flush() + signup = Signup.query.get(signup2_token) + self.assert_finish_success(signup, 'notsecret') + db.session.commit() + db_flush() + signup = Signup.query.get(signup1_token) + self.assert_finish_failure(signup, 'notsecret') + user = User.query.get('uid=newuser,ou=users,dc=example,dc=com') + self.assertEqual(user.mail, 'test2@example.com') + +class TestSignupModelOL(TestSignupModel): + use_openldap = True + +class TestSignupViews(UffdTestCase): + def setUpApp(self): + self.app.config['SELF_SIGNUP'] = True + self.app.last_mail = None + + def test_signup(self): + r = self.client.get(path=url_for('signup.signup_start'), follow_redirects=True) + dump('test_signup', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), []) + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_submit', r) + self.assertEqual(r.status_code, 200) + db_flush() + signups = Signup.query.filter_by(loginname='newuser').all() + self.assertEqual(len(signups), 1) + signup = signups[0] + self.assertEqual(signup.loginname, 'newuser') + self.assertEqual(signup.displayname, 'New User') + self.assertEqual(signup.mail, 'test@example.com') + self.assertIn(signup.token, str(self.app.last_mail.get_content())) + self.assertTrue(signup.check_password('notsecret')) + self.assertTrue(signup.validate()[0]) + + def test_signup_disabled(self): + self.app.config['SELF_SIGNUP'] = False + r = self.client.get(path=url_for('signup.signup_start'), follow_redirects=True) + dump('test_signup_disabled', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), []) + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_submit_disabled', r) + self.assertEqual(r.status_code, 200) + db_flush() + self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), []) + + def test_signup_wrongpassword(self): + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notthesame'}) + dump('test_signup_wrongpassword', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(self.app.last_mail) + + def test_signup_invalid(self): + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': '', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_invalid', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(self.app.last_mail) + + def test_signup_mailerror(self): + self.app.config['MAIL_SKIP_SEND'] = 'fail' + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_mailerror', r) + self.assertEqual(r.status_code, 200) + + def test_signup_hostlimit(self): + # Each signup_submit request leaks information about the existance of a + # user with a specific loginname. A host/network-based ratelimit should + # make enumerations of all user accounts difficult/next to impossible. + # Additionally each successful requests sends a mail to an + # attacker-controlled address. The ratelimit limits the applicability for + # spamming. + for i in range(20): + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test%d@example.com'%i, + 'password1': 'notsecret', 'password2': 'notsecret'}) + self.assertEqual(r.status_code, 200) + self.app.last_mail = None + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_hostlimit', r) + self.assertEqual(r.status_code, 200) + self.assertEqual(Signup.query.filter_by(loginname='newuser').all(), []) + self.assertIsNone(self.app.last_mail) + + def test_signup_maillimit(self): + for i in range(3): + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser%d'%i, 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + self.assertEqual(r.status_code, 200) + self.app.last_mail = None + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'newuser', 'displayname': 'New User', 'mail': 'test@example.com', + 'password1': 'notsecret', 'password2': 'notsecret'}) + dump('test_signup_maillimit', r) + self.assertEqual(r.status_code, 200) + self.assertIsNone(self.app.last_mail) + # Check that we did not hit the host limit + r = self.client.post(path=url_for('signup.signup_submit'), follow_redirects=True, + data={'loginname': 'differentuser', 'displayname': 'New User', + 'mail': 'different@mailaddress.com', 'password1': 'notsecret', + 'password2': 'notsecret'}) + self.assertIsNotNone(self.app.last_mail) + + def test_signup_check(self): + r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True, + data={'loginname': 'newuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'ok') + + def test_signup_check_invalid(self): + r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True, + data={'loginname': ''}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'invalid') + + def test_signup_check_exists(self): + r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True, + data={'loginname': 'testuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'exists') + + def test_signup_check_ratelimited(self): + for i in range(20): + r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True, + data={'loginname': 'newuser%d'%i}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + r = self.client.post(path=url_for('signup.signup_check'), follow_redirects=True, + data={'loginname': 'newuser'}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.content_type, 'application/json') + self.assertEqual(r.json['status'], 'ratelimited') + + def test_confirm(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + self.assertFalse(signup.completed) + if self.use_openldap: + self.assertIsNone(login_get_user('newuser', 'notsecret')) + r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + dump('test_signup_confirm', r) + self.assertEqual(r.status_code, 200) + signup = refetch_signup(signup) + self.assertFalse(signup.completed) + if self.use_openldap: + self.assertIsNone(login_get_user('newuser', 'notsecret')) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_submit', r) + self.assertEqual(r.status_code, 200) + signup = refetch_signup(signup) + self.assertTrue(signup.completed) + self.assertEqual(signup.user.loginname, 'newuser') + self.assertEqual(signup.user.displayname, 'New User') + self.assertEqual(signup.user.mail, 'test@example.com') + if self.use_openldap: + self.assertIsNotNone(login_get_user('newuser', 'notsecret')) + self.assertTrue(is_valid_session()) + self.assertEqual(get_current_user().loginname, 'newuser') + + def test_confirm_loggedin(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + self.client.post(path=url_for('session.login'), + data={'loginname': 'testuser', 'password': 'userpassword'}, follow_redirects=True) + self.assertFalse(signup.completed) + self.assertTrue(is_valid_session()) + self.assertEqual(get_current_user().loginname, 'testuser') + r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + self.assertEqual(r.status_code, 200) + signup = refetch_signup(signup) + self.assertTrue(signup.completed) + self.assertTrue(is_valid_session()) + self.assertEqual(get_current_user().loginname, 'newuser') + + def test_confirm_notfound(self): + r = self.client.get(path=url_for('signup.signup_confirm', token='notasignuptoken'), follow_redirects=True) + dump('test_signup_confirm_notfound', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token='notasignuptoken'), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_submit_notfound', r) + self.assertEqual(r.status_code, 200) + + def test_confirm_expired(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup.created = datetime.datetime.now() - datetime.timedelta(hours=49) + signup = refetch_signup(signup) + r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + dump('test_signup_confirm_expired', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_submit_expired', r) + self.assertEqual(r.status_code, 200) + + def test_confirm_completed(self): + signup = Signup(loginname='testuser', displayname='New User', mail='test@example.com', password='notsecret') + signup.user = User.query.get('uid=testuser,ou=users,dc=example,dc=com') + signup = refetch_signup(signup) + self.assertTrue(signup.completed) + r = self.client.get(path=url_for('signup.signup_confirm', token=signup.token), follow_redirects=True) + dump('test_signup_confirm_completed', r) + self.assertEqual(r.status_code, 200) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_submit_completed', r) + self.assertEqual(r.status_code, 200) + + def test_confirm_wrongpassword(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword'}) + dump('test_signup_confirm_wrongpassword', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(signup.completed) + + def test_confirm_error(self): + # finish returns None and error message (here: because the user already exists) + signup = Signup(loginname='testuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_error', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(signup.completed) + + def test_confirm_hostlimit(self): + for i in range(20): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) + self.assertEqual(r.status_code, 200) + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + self.assertFalse(signup.completed) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_hostlimit', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(signup.completed) + + def test_confirm_confirmlimit(self): + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='notsecret') + signup = refetch_signup(signup) + self.assertFalse(signup.completed) + for i in range(5): + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'wrongpassword%d'%i}) + self.assertEqual(r.status_code, 200) + self.assertFalse(signup.completed) + r = self.client.post(path=url_for('signup.signup_confirm_submit', token=signup.token), follow_redirects=True, data={'password': 'notsecret'}) + dump('test_signup_confirm_confirmlimit', r) + self.assertEqual(r.status_code, 200) + self.assertFalse(signup.completed) + +class TestSignupViewsOL(TestSignupViews): + use_openldap = True diff --git a/tests/utils.py b/tests/utils.py index c1456ef9dacdc9df2fb82aaec741375f93ad601b..8567d49d4f1e8b3bf3a44af4e4f5d91ff078dde3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,6 +3,8 @@ import tempfile import shutil import unittest +from flask import request + from uffd import create_app, db def dump(basename, resp): @@ -16,6 +18,11 @@ def dump(basename, resp): with open(path, 'xb') as f: f.write(resp.data) +def db_flush(): + db.session = db.create_scoped_session() + if hasattr(request, 'ldap_connection'): + del request.ldap_session + class UffdTestCase(unittest.TestCase): use_openldap = False @@ -29,6 +36,7 @@ class UffdTestCase(unittest.TestCase): 'SQLALCHEMY_DATABASE_URI': 'sqlite:///%s/db.sqlite'%self.dir, 'SECRET_KEY': 'DEBUGKEY', 'LDAP_SERVICE_MOCK': True, + 'MAIL_SKIP_SEND': True, } if self.use_openldap: if not os.environ.get('UNITTEST_OPENLDAP'): diff --git a/uffd/__init__.py b/uffd/__init__.py index 417b982001f55777c08a946e3fb893c4c1708687..e124a3ce1209e0fd10d280aa32fa065a1d2f67e3 100644 --- a/uffd/__init__.py +++ b/uffd/__init__.py @@ -13,7 +13,7 @@ from uffd.template_helper import register_template_helper from uffd.navbar import setup_navbar # pylint: enable=wrong-import-position -def create_app(test_config=None): +def create_app(test_config=None): # pylint: disable=too-many-locals # create and configure the app app = Flask(__name__, instance_relative_config=False) app.json_encoder = SQLAlchemyJSON @@ -43,10 +43,10 @@ def create_app(test_config=None): db.init_app(app) # pylint: disable=C0415 - from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services + from uffd import user, selfservice, role, mail, session, csrf, mfa, oauth2, services, signup # pylint: enable=C0415 - for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp: + for i in user.bp + selfservice.bp + role.bp + mail.bp + session.bp + csrf.bp + mfa.bp + oauth2.bp + services.bp + signup.bp: app.register_blueprint(i) @app.route("/") diff --git a/uffd/default_config.cfg b/uffd/default_config.cfg index 619b5fe8ca0ae8b220eb3f1d51abc7c6ab00503f..727cb849ae1e847f2f89309c1cc8dd3c03f46da9 100644 --- a/uffd/default_config.cfg +++ b/uffd/default_config.cfg @@ -58,6 +58,9 @@ MAIL_PASSWORD='*****' MAIL_USE_STARTTLS=True MAIL_FROM_ADDRESS='foo@bar.com' +# Do not enable this on a public service! There is no spam protection implemented at the moment. +SELF_SIGNUP=False + #MFA_ICON_URL = 'https://example.com/logo.png' #MFA_RP_ID = 'example.com' # If unset, hostname from current request is used MFA_RP_NAME = 'Uffd Test Service' # Service name passed to U2F/FIDO2 authenticators diff --git a/uffd/sendmail.py b/uffd/sendmail.py new file mode 100644 index 0000000000000000000000000000000000000000..bf36ea8624253c35e9a7a7527b708de252b0b3ef --- /dev/null +++ b/uffd/sendmail.py @@ -0,0 +1,34 @@ +import smtplib +from email.message import EmailMessage +import email.utils + +from flask import render_template, current_app + +def sendmail(addr, subject, template_name, **kwargs): + msg = EmailMessage() + msg.set_content(render_template(template_name, **kwargs)) + msg['Subject'] = subject + msg['From'] = current_app.config['MAIL_FROM_ADDRESS'] + msg['To'] = addr + msg['Date'] = email.utils.formatdate(localtime=1) + msg['Message-ID'] = email.utils.make_msgid() + try: + if current_app.debug: + current_app.last_mail = None + current_app.logger.debug('Trying to send email to %s:\n'%(addr)+str(msg)) + if current_app.debug and current_app.config.get('MAIL_SKIP_SEND', False): + if current_app.config['MAIL_SKIP_SEND'] == 'fail': + raise smtplib.SMTPException() + current_app.last_mail = msg + return True + server = smtplib.SMTP(host=current_app.config['MAIL_SERVER'], port=current_app.config['MAIL_PORT']) + if current_app.config['MAIL_USE_STARTTLS']: + server.starttls() + server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD']) + server.send_message(msg) + server.quit() + if current_app.debug: + current_app.last_mail = msg + return True + except smtplib.SMTPException: + return False diff --git a/uffd/session/__init__.py b/uffd/session/__init__.py index 97d96e52ee61b4d273cefdc10f46a6f69fa2278d..5cddcdc6892a9c2e0a85e1047733432499b6d171 100644 --- a/uffd/session/__init__.py +++ b/uffd/session/__init__.py @@ -1,3 +1,3 @@ -from .views import bp as bp_ui, get_current_user, login_required, is_valid_session +from .views import bp as bp_ui, get_current_user, login_required, is_valid_session, set_session bp = [bp_ui] diff --git a/uffd/session/templates/login.html b/uffd/session/templates/login.html index e91bc833c1f257e5f828f650af672c3945c82310..e3a17c5716c9b1f4a90f4f86de72a4b9c69db7bd 100644 --- a/uffd/session/templates/login.html +++ b/uffd/session/templates/login.html @@ -22,7 +22,9 @@ <button type="submit" class="btn btn-primary btn-block" tabindex = "3">Login</button> </div> <div class="clearfix col-12"> - <a href="#" class="float-left">Register</a> + {% if config['SELF_SIGNUP'] %} + <a href="{{ url_for("signup.signup_start") }}" class="float-left">Register</a> + {% endif %} <a href="{{ url_for("selfservice.forgot_password") }}" class="float-right">Forgot Password?</a> </div> </div> diff --git a/uffd/session/views.py b/uffd/session/views.py index a235e3fcf295d0474fdab0c0272185dbe7409a42..0331f2f103f2f2bc9c0459164fa911acc4209755 100644 --- a/uffd/session/views.py +++ b/uffd/session/views.py @@ -50,6 +50,14 @@ def logout(): session.clear() return resp +def set_session(user, skip_mfa=False): + session.clear() + session['user_dn'] = user.dn + session['logintime'] = datetime.datetime.now().timestamp() + session['_csrf_token'] = secrets.token_hex(128) + if skip_mfa: + session['user_mfa'] = True + @bp.route("/login", methods=('GET', 'POST')) def login(): if request.method == 'GET': @@ -74,10 +82,7 @@ def login(): if not user.is_in_group(current_app.config['ACL_SELFSERVICE_GROUP']): flash('You do not have access to this service') return render_template('login.html', ref=request.values.get('ref')) - session.clear() - session['user_dn'] = user.dn - session['logintime'] = datetime.datetime.now().timestamp() - session['_csrf_token'] = secrets.token_hex(128) + set_session(user) return redirect(url_for('mfa.auth', ref=request.values.get('ref', url_for('index')))) def get_current_user(): diff --git a/uffd/signup/__init__.py b/uffd/signup/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..656390049779db11f3fbd9a16498218b18982068 --- /dev/null +++ b/uffd/signup/__init__.py @@ -0,0 +1,3 @@ +from .views import bp as _bp + +bp = [_bp] diff --git a/uffd/signup/models.py b/uffd/signup/models.py new file mode 100644 index 0000000000000000000000000000000000000000..82e89ced572c3b1ff2a9f6027559de4f83598a50 --- /dev/null +++ b/uffd/signup/models.py @@ -0,0 +1,112 @@ +import secrets +import datetime +from crypt import crypt + +from flask import current_app +from sqlalchemy import Column, String, Text, DateTime +from ldapalchemy.dbutils import DBRelationship + +from uffd.database import db +from uffd.ldap import ldap +from uffd.user.models import User +from uffd.role.models import Role + +class Signup(db.Model): + '''Model that represents a self-signup request + + When a person tries to sign up, an instance with user-provided loginname, + displayname, mail and password is created. Signup.validate is called to + validate the request. To ensure that person is in control of the provided + mail address, a mail with Signup.token is sent to that address. To complete + the signup, Signup.finish is called with a user-provided password that must + be equal to the initial password. + + Signup.token requires the password again so that a mistyped-but-valid mail + address does not allow a third party to complete the signup procedure and + set a new password with the (also mail-based) password reset functionality. + + As long as they are not completed, signup requests have no effect each other + or different parts of the application.''' + __tablename__ = 'signup' + token = Column(String(128), primary_key=True, default=lambda: secrets.token_hex(20)) + created = Column(DateTime, default=datetime.datetime.now, nullable=False) + loginname = Column(Text) + displayname = Column(Text) + mail = Column(Text) + pwhash = Column(Text) + user_dn = Column(String(128)) # Set after successful confirmation + user = DBRelationship('user_dn', User) + + type = Column(String(50)) + __mapper_args__ = { + 'polymorphic_identity': 'Signup', + 'polymorphic_on': type + } + + # Write-only property + def password(self, value): + if not User().set_password(value): + return + self.pwhash = crypt(value) + password = property(fset=password) + + def check_password(self, value): + return self.pwhash is not None and crypt(value, self.pwhash) == self.pwhash + + @property + def expired(self): + return self.created is not None and datetime.datetime.now() >= self.created + datetime.timedelta(hours=48) + + @property + def completed(self): + return self.user_dn is not None + + def validate(self): # pylint: disable=too-many-return-statements + '''Return whether the signup request is valid and Signup.finish is likely to succeed + + :returns: Tuple (valid, errmsg), if the signup request is invalid, `valid` + is False and `errmsg` contains a string describing why. Otherwise + `valid` is True.''' + if self.completed or self.expired: + return False, 'Invalid signup request' + if not User().set_loginname(self.loginname): + return False, 'Login name is invalid' + if not User().set_displayname(self.displayname): + return False, 'Display name is invalid' + if not User().set_mail(self.mail): + return False, 'Mail address is invalid' + if self.pwhash is None: + return False, 'Invalid password' + if User.query.filter_by(loginname=self.loginname).all(): + return False, 'A user with this login name already exists' + return True, 'Valid' + + def finish(self, password): + '''Complete the signup procedure and return the new user + + Signup.finish should only be called on an object that was (at some point) + successfully validated with Signup.validate! + + :param password: User password + + :returns: Tuple (user, errmsg), if the operation fails, `user` is None and + `errmsg` contains a string describing why. Otherwise `user` is a + User object.''' + if self.completed or self.expired: + return None, 'Invalid signup request' + if not self.check_password(password): + return None, 'Wrong password' + if User.query.filter_by(loginname=self.loginname).all(): + return None, 'A user with this login name already exists' + user = User(loginname=self.loginname, displayname=self.displayname, mail=self.mail, password=password) + ldap.session.add(user) + for name in current_app.config['ROLES_BASEROLES']: + for role in Role.query.filter_by(name=name).all(): + user.roles.add(role) + user.update_groups() + self.user = user + self.loginname = None + self.displayname = None + self.mail = None + self.pwhash = None + return user, 'Success' diff --git a/uffd/signup/templates/signup/confirm.html b/uffd/signup/templates/signup/confirm.html new file mode 100644 index 0000000000000000000000000000000000000000..9e235523a577d5e716cab5aa72df925a0a770c44 --- /dev/null +++ b/uffd/signup/templates/signup/confirm.html @@ -0,0 +1,26 @@ +{% extends 'base.html' %} + +{% block body %} +<form action="{{ url_for(".signup_confirm_submit", token=signup.token) }}" method="POST"> +<div class="row mt-2 justify-content-center"> + <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> + <div class="text-center"> + <img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" > + </div> + <div class="col-12"> + <h2 class="text-center">Complete Registration</h2> + </div> + {% if error %} + <div class="alert alert-danger" role="alert">{{ error }}</div> + {% endif %} + <div class="form-group col-12"> + <label for="user-password1">Please enter your password to complete the account registration</label> + <input type="password" class="form-control" id="user-password1" name="password" required="required"> + </div> + <div class="form-group col-12"> + <button type="submit" class="btn btn-primary btn-block">Complete Account Registration</button> + </div> + </div> +</div> +</form> +{% endblock %} diff --git a/uffd/signup/templates/signup/mail.txt b/uffd/signup/templates/signup/mail.txt new file mode 100644 index 0000000000000000000000000000000000000000..38cf3a846f9820fc6c7d2614a302844fa3d7192d --- /dev/null +++ b/uffd/signup/templates/signup/mail.txt @@ -0,0 +1,13 @@ +Hi {{ signup.displayname }}, + +an account was created on the CCCV infrastructure with this mail address. +Please visit the following url to complete the account registration: + +{{ url_for('signup.signup_confirm', token=signup.token, _external=True) }} + +**The link is valid for 48h** + +You can find more information at https://docs.cccv.de/. + +If you have not requested an account on the CCCV infrastructure, you can +ignore this mail. diff --git a/uffd/signup/templates/signup/start.html b/uffd/signup/templates/signup/start.html new file mode 100644 index 0000000000000000000000000000000000000000..abb225e96de93307b411f4119551b46fa0d50fbf --- /dev/null +++ b/uffd/signup/templates/signup/start.html @@ -0,0 +1,108 @@ +{% extends 'base.html' %} + +{% block body %} +<form action="{{ url_for('.signup_submit') }}" method="POST" onInput="password2.setCustomValidity(password1.value != password2.value ? 'Passwords do not match.' : '') "> +<div class="row mt-2 justify-content-center"> + <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> + <div class="text-center"> + <img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" > + </div> + <div class="col-12"> + <h2 class="text-center">Account Registration</h2> + </div> + {% if error %} + <div class="form-group col-12"> + <div class="alert alert-danger" role="alert">{{ error }}</div> + </div> + {% endif %} + <div class="form-group col-12"> + <label for="user-loginname">Login Name</label> + <div class="js-only-input-group"> + <input type="text" class="form-control" id="user-loginname" name="loginname" aria-describedby="loginname-feedback" value="{{ request.form.loginname }}" minlength=1 maxlength=32 pattern="[a-z0-9_-]*" required> + <div class="js-only-input-group-append d-none"> + <button class="btn btn-outline-secondary rounded-right" type="button" id="check-loginname">Check</button> + </div> + <div id="loginname-feedback" class="invalid-feedback">foobar</div> + </div> + <small class="form-text text-muted"> + At least one and at most 32 lower-case characters, digits, dashes ("-") or underscores ("_"). <b>Cannot be changed later!</b> + </small> + </div> + <div class="form-group col-12"> + <label for="user-displayname">Display Name</label> + <input type="text" class="form-control" id="user-displayname" name="displayname" value="{{ request.form.displayname }}" minlength=1 maxlength=128 required> + <small class="form-text text-muted"> + At least one and at most 128 characters, no other special requirements. + </small> + </div> + <div class="form-group col-12"> + <label for="user-mail">E-Mail Address</label> + <input type="email" class="form-control" id="user-mail" name="mail" value="{{ request.form.mail }}" required> + <small class="form-text text-muted"> + We will send a confirmation mail to this address that you need to complete the registration. + </small> + </div> + <div class="form-group col-12"> + <label for="user-password1">Password</label> + <input type="password" class="form-control" id="user-password1" name="password1" minlength=8 maxlength=256 required> + <small class="form-text text-muted"> + At least 8 and at most 256 characters, no other special requirements. But please don't be stupid, do use a password manager. + </small> + </div> + <div class="form-group col-12"> + <label for="user-password2">Repeat Password</label> + <input type="password" class="form-control" id="user-password2" name="password2" required> + </div> + <div class="form-group col-12"> + <button type="submit" class="btn btn-primary btn-block">Create Account</button> + </div> + </div> +</div> +</form> + +<script> +$(".js-only-input-group").addClass("input-group"); +$(".js-only-input-group-append").removeClass("d-none"); +$(".js-only-input-group-append").addClass("input-group-append"); + +let checkreq; +$("#check-loginname").on("click", function () { + if (checkreq) + checkreq.abort(); + $("#user-loginname").removeClass("is-valid"); + $("#user-loginname").removeClass("is-invalid"); + $("#check-loginname").prop("disabled", true); + checkreq = $.ajax({ + url: {{ url_for('.signup_check')|tojson }}, + method: "POST", + data: {"loginname": $("#user-loginname").val()}, + success: function (resp) { + checkreq = null; + $("#check-loginname").prop("disabled", false); + if (resp.status == "ok") { + $("#user-loginname").addClass("is-valid"); + $("#loginname-feedback").text(""); + } else if (resp.status == 'exists') { + $("#loginname-feedback").text("The name is already taken"); + $("#user-loginname").addClass("is-invalid"); + } else if (resp.status == 'ratelimited') { + $("#loginname-feedback").text("Too many requests! Please wait a bit before trying again!"); + $("#user-loginname").addClass("is-invalid"); + } else { + $("#loginname-feedback").text("The name is invalid"); + $("#user-loginname").addClass("is-invalid"); + } + } + }); +}); +$("#user-loginname").on("input", function () { + if (checkreq) + checkreq.abort(); + checkreq = null; + $("#user-loginname").removeClass("is-valid"); + $("#user-loginname").removeClass("is-invalid"); + $("#check-loginname").prop("disabled", false); +}); + +</script> +{% endblock %} diff --git a/uffd/signup/templates/signup/submitted.html b/uffd/signup/templates/signup/submitted.html new file mode 100644 index 0000000000000000000000000000000000000000..685c1a01b3a89f86039882e486eba30538ccbdf9 --- /dev/null +++ b/uffd/signup/templates/signup/submitted.html @@ -0,0 +1,17 @@ +{% extends 'base.html' %} + +{% block body %} + +<div class="row mt-2 justify-content-center"> + <div class="col-lg-6 col-md-10" style="background: #f7f7f7; box-shadow: 0px 2px 2px rgba(0, 0, 0, 0.3); padding: 30px;"> + <div class="text-center"> + <img alt="branding logo" src="{{ config.get("BRANDING_LOGO_URL") }}" class="col-lg-8 col-md-12" > + </div> + <div class="col-12 mb-3"> + <h2 class="text-center">Confirm your E-Mail Address</h2> + </div> + <p>We sent a confirmation mail to <b>{{ signup.mail }}</b>. You need to confirm your mail address within 48 hours to complete the account registration.</p> + <p>If you mistyped your mail address or don't receive the confirmation mail for another reason, retry the registration procedure from the beginning.</p> + </div> +</div> +{% endblock %} diff --git a/uffd/signup/views.py b/uffd/signup/views.py new file mode 100644 index 0000000000000000000000000000000000000000..ce9b139e83f729dffb93be72b83a6b633bb41f49 --- /dev/null +++ b/uffd/signup/views.py @@ -0,0 +1,102 @@ +import functools + +from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, jsonify + +from uffd.database import db +from uffd.ldap import ldap +from uffd.session import set_session +from uffd.user.models import User +from uffd.sendmail import sendmail +from uffd.signup.models import Signup +from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay + +bp = Blueprint('signup', __name__, template_folder='templates', url_prefix='/signup/') + +signup_ratelimit = Ratelimit('signup', 24*60, 3) +confirm_ratelimit = Ratelimit('signup_confirm', 10*60, 3) + +def signup_enabled(func): + @functools.wraps(func) + def decorator(*args, **kwargs): + if not current_app.config['SELF_SIGNUP']: + flash('Singup not enabled') + return redirect(url_for('index')) + return func(*args, **kwargs) + return decorator + +@bp.route('/') +@signup_enabled +def signup_start(): + return render_template('signup/start.html') + +@bp.route('/check', methods=['POST']) +@signup_enabled +def signup_check(): + if host_ratelimit.get_delay(): + return jsonify({'status': 'ratelimited'}) + host_ratelimit.log() + if not User().set_loginname(request.form['loginname']): + return jsonify({'status': 'invalid'}) + if User.query.filter_by(loginname=request.form['loginname']).all(): + return jsonify({'status': 'exists'}) + return jsonify({'status': 'ok'}) + +@bp.route('/', methods=['POST']) +@signup_enabled +def signup_submit(): + if request.form['password1'] != request.form['password2']: + return render_template('signup/start.html', error='Passwords do not match') + signup_delay = signup_ratelimit.get_delay(request.form['mail']) + host_delay = host_ratelimit.get_delay() + if signup_delay and signup_delay > host_delay: + return render_template('signup/start.html', error='Too many signup requests with this mail address! Please wait %s.'%format_delay(signup_delay)) + if host_delay: + return render_template('signup/start.html', error='Too many requests! Please wait %s.'%format_delay(host_delay)) + host_ratelimit.log() + signup = Signup(loginname=request.form['loginname'], + displayname=request.form['displayname'], + mail=request.form['mail'], password=request.form['password1']) + valid, msg = signup.validate() + if not valid: + return render_template('signup/start.html', error=msg) + db.session.add(signup) + db.session.commit() + sent = sendmail(signup.mail, 'Confirm your mail address', 'signup/mail.txt', signup=signup) + if not sent: + return render_template('signup/start.html', error='Cound not send mail') + signup_ratelimit.log(request.form['mail']) + return render_template('signup/submitted.html', signup=signup) + +# signup_confirm* views are always accessible so other modules (e.g. invite) can reuse them +@bp.route('/confirm/<token>') +def signup_confirm(token): + signup = Signup.query.get(token) + if not signup or signup.expired or signup.completed: + flash('Invalid signup link') + return redirect(url_for('index')) + return render_template('signup/confirm.html', signup=signup) + +@bp.route('/confirm/<token>', methods=['POST']) +def signup_confirm_submit(token): + signup = Signup.query.get(token) + if not signup or signup.expired or signup.completed: + flash('Invalid signup link') + return redirect(url_for('index')) + confirm_delay = confirm_ratelimit.get_delay(token) + host_delay = host_ratelimit.get_delay() + if confirm_delay and confirm_delay > host_delay: + return render_template('signup/confirm.html', signup=signup, error='Too many failed attempts! Please wait %s.'%format_delay(confirm_delay)) + if host_delay: + return render_template('signup/confirm.html', signup=signup, error='Too many requests! Please wait %s.'%format_delay(host_delay)) + if not signup.check_password(request.form['password']): + host_ratelimit.log() + confirm_ratelimit.log(token) + return render_template('signup/confirm.html', signup=signup, error='Wrong password') + user, msg = signup.finish(request.form['password']) + if user is None: + return render_template('signup/confirm.html', signup=signup, error=msg) + db.session.commit() + ldap.session.commit() + set_session(user, skip_mfa=True) + flash('Your account was successfully created') + return redirect(url_for('selfservice.index')) diff --git a/uffd/user/models.py b/uffd/user/models.py index ed8232f017294690891f5ac707350ba7e2ebbb7f..e9f0ea7a64afcd4ca5d7769aa58c2f70d578dd3e 100644 --- a/uffd/user/models.py +++ b/uffd/user/models.py @@ -42,8 +42,8 @@ class BaseUser(ldap.Model): mail = ldap.Attribute(lazyconfig_str('LDAP_USER_MAIL_ATTRIBUTE'), aliases=lazyconfig_list('LDAP_USER_MAIL_ALIASES')) pwhash = ldap.Attribute('userPassword', default=lambda: hashed(HASHED_SALTED_SHA512, secrets.token_hex(128))) - groups = [] # Shuts up pylint, overwritten by back-reference - roles = [] # Shuts up pylint, overwritten by back-reference + groups = set() # Shuts up pylint, overwritten by back-reference + roles = set() # Shuts up pylint, overwritten by back-reference def add_default_attributes(self): for name, values in current_app.config['LDAP_USER_DEFAULT_ATTRIBUTES'].items():