diff --git a/tests/test_api.py b/tests/test_api.py index 2cefc16ee4778a7e7e242d5c0cf18fbbcb148c20..b01a6ff3eef261a1148a98d01742fa2ca007fe89 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,7 +3,10 @@ import base64 from flask import url_for from uffd.api.views import apikey_required -from utils import UffdTestCase +from uffd.user.models import User +from uffd.password_hash import PlaintextPasswordHash +from uffd.database import db +from utils import UffdTestCase, db_flush def basic_auth(username, password): return ('Authorization', 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()) @@ -56,7 +59,7 @@ class TestAPIAuth(UffdTestCase): r = self.client.get(path=url_for('testendpoint1'), follow_redirects=True) self.assertEqual(r.status_code, 401) -class TestAPIViews(UffdTestCase): +class TestAPIGetmails(UffdTestCase): def setUpApp(self): self.app.config['API_CLIENTS_2'] = { 'test': {'client_secret': 'test', 'scopes': ['getmails']}, @@ -80,3 +83,29 @@ class TestAPIViews(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertEqual(r.json, [{'name': 'test', 'receive_addresses': ['test1@example.com', 'test2@example.com'], 'destination_addresses': ['testuser@mail.example.com']}]) +class TestAPICheckPassword(UffdTestCase): + def setUpApp(self): + self.app.config['API_CLIENTS_2'] = { + 'test': {'client_secret': 'test', 'scopes': ['checkpassword']}, + } + + def test(self): + r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'userpassword'}, headers=[basic_auth('test', 'test')]) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json['loginname'], 'testuser') + + def test_password_rehash(self): + self.get_user().password = PlaintextPasswordHash.from_password('userpassword') + db.session.commit() + self.assertIsInstance(self.get_user().password, PlaintextPasswordHash) + db_flush() + r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'userpassword'}, headers=[basic_auth('test', 'test')]) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json['loginname'], 'testuser') + self.assertIsInstance(self.get_user().password, User.password.method_cls) + self.assertTrue(self.get_user().password.verify('userpassword')) + + def test_wrong_password(self): + r = self.client.post(path=url_for('api.checkpassword'), data={'loginname': 'testuser', 'password': 'wrongpassword'}, headers=[basic_auth('test', 'test')]) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json, None) diff --git a/tests/test_invite.py b/tests/test_invite.py index 9a7304ee137bc422f21344b2b714f824d9f23995..dee382d03988b7046a8ada3a8ac224d846957f38 100644 --- a/tests/test_invite.py +++ b/tests/test_invite.py @@ -646,7 +646,7 @@ class TestInviteUseViews(UffdTestCase): self.assertEqual(signup.displayname, 'New User') self.assertEqual(signup.mail, 'test@example.com') self.assertIn(signup.token, str(self.app.last_mail.get_content())) - self.assertTrue(signup.check_password('notsecret')) + self.assertTrue(signup.password.verify('notsecret')) self.assertTrue(signup.validate()[0]) def test_signup_invalid_invite(self): diff --git a/tests/test_password_hash.py b/tests/test_password_hash.py new file mode 100644 index 0000000000000000000000000000000000000000..ed17b895457b450ef643e39b1d5d255c12c9061d --- /dev/null +++ b/tests/test_password_hash.py @@ -0,0 +1,181 @@ +import unittest + +from uffd.password_hash import * + +class TestPasswordHashRegistry(unittest.TestCase): + def test(self): + registry = PasswordHashRegistry() + + @registry.register + class TestPasswordHash: + METHOD_NAME = 'test' + def __init__(self, value, **kwargs): + self.value = value + self.kwargs = kwargs + + @registry.register + class Test2PasswordHash: + METHOD_NAME = 'test2' + + result = registry.parse('{test}data', key='value') + self.assertIsInstance(result, TestPasswordHash) + self.assertEqual(result.value, '{test}data') + self.assertEqual(result.kwargs, {'key': 'value'}) + with self.assertRaises(ValueError): + registry.parse('{invalid}data') + with self.assertRaises(ValueError): + registry.parse('invalid') + with self.assertRaises(ValueError): + registry.parse('{invalid') + +class TestPasswordHash(unittest.TestCase): + def setUp(self): + class TestPasswordHash(PasswordHash): + @classmethod + def from_password(cls, password): + cls(build_value(cls.METHOD_NAME, password)) + + def verify(self, password): + return self.data == password + + class TestPasswordHash1(TestPasswordHash): + METHOD_NAME = 'test1' + + class TestPasswordHash2(TestPasswordHash): + METHOD_NAME = 'test2' + + self.TestPasswordHash1 = TestPasswordHash1 + self.TestPasswordHash2 = TestPasswordHash2 + + def test(self): + obj = self.TestPasswordHash1('{test1}data') + self.assertEqual(obj.value, '{test1}data') + self.assertEqual(obj.data, 'data') + self.assertIs(obj.target_cls, self.TestPasswordHash1) + self.assertFalse(obj.needs_rehash) + + def test_invalid(self): + with self.assertRaises(ValueError): + self.TestPasswordHash1('invalid') + with self.assertRaises(ValueError): + self.TestPasswordHash1('{invalid}data') + with self.assertRaises(ValueError): + self.TestPasswordHash1('{test2}data') + + def test_target_cls(self): + obj = self.TestPasswordHash1('{test1}data', target_cls=self.TestPasswordHash1) + self.assertEqual(obj.value, '{test1}data') + self.assertEqual(obj.data, 'data') + self.assertIs(obj.target_cls, self.TestPasswordHash1) + self.assertFalse(obj.needs_rehash) + obj = self.TestPasswordHash1('{test1}data', target_cls=self.TestPasswordHash2) + self.assertEqual(obj.value, '{test1}data') + self.assertEqual(obj.data, 'data') + self.assertIs(obj.target_cls, self.TestPasswordHash2) + self.assertTrue(obj.needs_rehash) + obj = self.TestPasswordHash1('{test1}data', target_cls=PasswordHash) + self.assertEqual(obj.value, '{test1}data') + self.assertEqual(obj.data, 'data') + self.assertIs(obj.target_cls, PasswordHash) + self.assertFalse(obj.needs_rehash) + +class TestPlaintextPasswordHash(unittest.TestCase): + def test_verify(self): + obj = PlaintextPasswordHash('{plain}password') + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + + def test_from_password(self): + obj = PlaintextPasswordHash.from_password('password') + self.assertEqual(obj.value, '{plain}password') + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + +class TestHashlibPasswordHash(unittest.TestCase): + def test_verify(self): + obj = SHA512PasswordHash('{sha512}sQnzu7wkTrgkQZF+0G1hi5AI3Qmzvv0bXgc5THBqi7mAsdd4Xll27ASbRt9fEyavWi6m0QP9B8lThf+rDKy8hg==') + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + + def test_from_password(self): + obj = SHA512PasswordHash.from_password('password') + self.assertIsNotNone(obj.value) + self.assertTrue(obj.value.startswith('{sha512}')) + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + +class TestSaltedHashlibPasswordHash(unittest.TestCase): + def test_verify(self): + obj = SaltedSHA512PasswordHash('{ssha512}dOeDLmVpHJThhHeag10Hm2g4T7s3SBE6rGHcXUolXJHVufY4qT782rwZ/0XE6cuLcBZ0KpnwmUzRpAEtZBdv+JYEEtZQs/uC') + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + + def test_from_password(self): + obj = SaltedSHA512PasswordHash.from_password('password') + self.assertIsNotNone(obj.value) + self.assertTrue(obj.value.startswith('{ssha512}')) + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + +class TestCryptPasswordHash(unittest.TestCase): + def test_verify(self): + obj = CryptPasswordHash('{crypt}$5$UbTTMBH9NRurlQcX$bUiUTyedvmArlVt.62ZLRV80e2v3DjcBp/tSDkP2imD') + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + + def test_from_password(self): + obj = CryptPasswordHash.from_password('password') + self.assertIsNotNone(obj.value) + self.assertTrue(obj.value.startswith('{crypt}')) + self.assertTrue(obj.verify('password')) + self.assertFalse(obj.verify('notpassword')) + +class TestInvalidPasswordHash(unittest.TestCase): + def test(self): + obj = InvalidPasswordHash('test') + self.assertEqual(obj.value, 'test') + self.assertFalse(obj.verify('test')) + self.assertTrue(obj.needs_rehash) + self.assertFalse(obj) + obj = InvalidPasswordHash(None) + self.assertIsNone(obj.value) + self.assertFalse(obj.verify('test')) + self.assertTrue(obj.needs_rehash) + self.assertFalse(obj) + +class TestPasswordWrapper(unittest.TestCase): + def setUp(self): + class Test: + password_hash = None + password = PasswordHashAttribute('password_hash', PlaintextPasswordHash) + + self.test = Test() + + def test_get_none(self): + self.test.password_hash = None + obj = self.test.password + self.assertIsInstance(obj, InvalidPasswordHash) + self.assertEqual(obj.value, None) + self.assertTrue(obj.needs_rehash) + + def test_get_valid(self): + self.test.password_hash = '{plain}password' + obj = self.test.password + self.assertIsInstance(obj, PlaintextPasswordHash) + self.assertEqual(obj.value, '{plain}password') + self.assertFalse(obj.needs_rehash) + + def test_get_needs_rehash(self): + self.test.password_hash = '{ssha512}dOeDLmVpHJThhHeag10Hm2g4T7s3SBE6rGHcXUolXJHVufY4qT782rwZ/0XE6cuLcBZ0KpnwmUzRpAEtZBdv+JYEEtZQs/uC' + obj = self.test.password + self.assertIsInstance(obj, SaltedSHA512PasswordHash) + self.assertEqual(obj.value, '{ssha512}dOeDLmVpHJThhHeag10Hm2g4T7s3SBE6rGHcXUolXJHVufY4qT782rwZ/0XE6cuLcBZ0KpnwmUzRpAEtZBdv+JYEEtZQs/uC') + self.assertTrue(obj.needs_rehash) + + def test_set(self): + self.test.password = 'password' + self.assertEqual(self.test.password_hash, '{plain}password') + + def test_set_none(self): + self.test.password = None + self.assertIsNone(self.test.password_hash) diff --git a/tests/test_selfservice.py b/tests/test_selfservice.py index 2365d7ccd46489f3f76f96244dd205f3b3bab44f..cd9988be88d3e582d500cdd7fa24454674decef1 100644 --- a/tests/test_selfservice.py +++ b/tests/test_selfservice.py @@ -87,7 +87,7 @@ class TestSelfservice(UffdTestCase): dump('change_password', r) self.assertEqual(r.status_code, 200) _user = request.user - self.assertTrue(_user.check_password('newpassword')) + self.assertTrue(_user.password.verify('newpassword')) def test_change_password_invalid(self): self.login_as('user') @@ -98,8 +98,8 @@ class TestSelfservice(UffdTestCase): dump('change_password_invalid', r) self.assertEqual(r.status_code, 200) _user = request.user - self.assertFalse(_user.check_password('shortpw')) - self.assertTrue(_user.check_password('userpassword')) + self.assertFalse(_user.password.verify('shortpw')) + self.assertTrue(_user.password.verify('userpassword')) # Regression test for #100 (login not possible if password contains character disallowed by SASLprep) def test_change_password_samlprep_invalid(self): @@ -111,8 +111,8 @@ class TestSelfservice(UffdTestCase): dump('change_password_samlprep_invalid', r) self.assertEqual(r.status_code, 200) _user = request.user - self.assertFalse(_user.check_password('shortpw\n')) - self.assertTrue(_user.check_password('userpassword')) + self.assertFalse(_user.password.verify('shortpw\n')) + self.assertTrue(_user.password.verify('userpassword')) def test_change_password_mismatch(self): self.login_as('user') @@ -123,9 +123,9 @@ class TestSelfservice(UffdTestCase): dump('change_password_mismatch', r) self.assertEqual(r.status_code, 200) _user = request.user - self.assertFalse(_user.check_password('newpassword1')) - self.assertFalse(_user.check_password('newpassword2')) - self.assertTrue(_user.check_password('userpassword')) + self.assertFalse(_user.password.verify('newpassword1')) + self.assertFalse(_user.password.verify('newpassword2')) + self.assertTrue(_user.password.verify('userpassword')) def test_leave_role(self): baserole = Role(name='baserole', is_default=True) @@ -258,7 +258,7 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'newpassword'}, follow_redirects=True) dump('token_password_submit', r) self.assertEqual(r.status_code, 200) - self.assertTrue(self.get_user().check_password('newpassword')) + self.assertTrue(self.get_user().password.verify('newpassword')) def test_token_password_emptydb(self): user = self.get_user() @@ -271,7 +271,7 @@ class TestSelfservice(UffdTestCase): dump('token_password_emptydb_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertTrue(self.get_user().check_password('userpassword')) + self.assertTrue(self.get_user().password.verify('userpassword')) def test_token_password_invalid(self): user = self.get_user() @@ -287,7 +287,7 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertTrue(self.get_user().check_password('userpassword')) + self.assertTrue(self.get_user().password.verify('userpassword')) def test_token_password_expired(self): user = self.get_user() @@ -303,7 +303,7 @@ class TestSelfservice(UffdTestCase): dump('token_password_invalid_expired_submit', r) self.assertEqual(r.status_code, 200) self.assertIn(b'Token expired, please try again', r.data) - self.assertTrue(self.get_user().check_password('userpassword')) + self.assertTrue(self.get_user().password.verify('userpassword')) def test_token_password_different_passwords(self): user = self.get_user() @@ -316,4 +316,4 @@ class TestSelfservice(UffdTestCase): data={'password1': 'newpassword', 'password2': 'differentpassword'}, follow_redirects=True) dump('token_password_different_passwords_submit', r) self.assertEqual(r.status_code, 200) - self.assertTrue(self.get_user().check_password('userpassword')) + self.assertTrue(self.get_user().password.verify('userpassword')) diff --git a/tests/test_session.py b/tests/test_session.py index cf7a96974f8581af0faba72cfc6da7dfbf746035..711c76b9482bb943c9512053119438a50a9ec145 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -9,9 +9,11 @@ from uffd import user from uffd.session.views import login_required from uffd.session.models import DeviceLoginConfirmation from uffd.oauth2.models import OAuth2DeviceLoginInitiation +from uffd.user.models import User +from uffd.password_hash import PlaintextPasswordHash from uffd import create_app, db -from utils import dump, UffdTestCase +from utils import dump, UffdTestCase, db_flush class TestSession(UffdTestCase): def setUpApp(self): @@ -61,6 +63,17 @@ class TestSession(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertLoggedIn() + def test_login_password_rehash(self): + self.get_user().password = PlaintextPasswordHash.from_password('userpassword') + db.session.commit() + self.assertIsInstance(self.get_user().password, PlaintextPasswordHash) + db_flush() + r = self.login_as('user') + self.assertEqual(r.status_code, 200) + self.assertLoggedIn() + self.assertIsInstance(self.get_user().password, User.password.method_cls) + self.assertTrue(self.get_user().password.verify('userpassword')) + def test_titlecase_password(self): r = self.client.post(path=url_for('session.login'), data={'loginname': self.get_user().loginname.title(), 'password': 'userpassword'}, follow_redirects=True) diff --git a/tests/test_signup.py b/tests/test_signup.py index 063635a2c94a1cc9d421a320b6f7c0a670c1e57d..e64d01234d37a0eb550f4092ff879f2ed4f1eeb9 100644 --- a/tests/test_signup.py +++ b/tests/test_signup.py @@ -22,7 +22,7 @@ def refetch_signup(signup): db_flush() return Signup.query.get(id) -# We assume in all tests that Signup.validate and Signup.check_password do +# We assume in all tests that Signup.validate and Signup.password.verify do # not alter any state class TestSignupModel(UffdTestCase): @@ -55,12 +55,12 @@ class TestSignupModel(UffdTestCase): def test_password(self): signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com') - self.assertFalse(signup.check_password('notsecret')) - self.assertFalse(signup.check_password('')) - self.assertFalse(signup.check_password('wrongpassword')) - signup.password = 'notsecret' - self.assertTrue(signup.check_password('notsecret')) - self.assertFalse(signup.check_password('wrongpassword')) + self.assertFalse(signup.password.verify('notsecret')) + self.assertFalse(signup.password.verify('')) + self.assertFalse(signup.password.verify('wrongpassword')) + self.assertTrue(signup.set_password('notsecret')) + self.assertTrue(signup.password.verify('notsecret')) + self.assertFalse(signup.password.verify('wrongpassword')) def test_expired(self): # TODO: Find a better way to test this! @@ -111,7 +111,8 @@ class TestSignupModel(UffdTestCase): self.assert_validate_invalid(refetch_signup(signup)) def test_validate_password(self): - signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com', password='') + signup = Signup(loginname='newuser', displayname='New User', mail='test@example.com') + self.assertFalse(signup.set_password('')) self.assert_validate_invalid(signup) self.assert_validate_invalid(refetch_signup(signup)) @@ -200,7 +201,7 @@ class TestSignupViews(UffdTestCase): self.assertEqual(signup.displayname, 'New User') self.assertEqual(signup.mail, 'test@example.com') self.assertIn(signup.token, str(self.app.last_mail.get_content())) - self.assertTrue(signup.check_password('notsecret')) + self.assertTrue(signup.password.verify('notsecret')) self.assertTrue(signup.validate()[0]) def test_signup_disabled(self): diff --git a/tests/test_user.py b/tests/test_user.py index 358c2f2fe28d2a18d6aee568e9c68141eae63f8a..8973d16c6928d67d8b097a062a1f7253661ea3ce 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -162,7 +162,7 @@ class TestUserViews(UffdTestCase): self.assertEqual(user_updated.mail, 'newuser@example.com') self.assertEqual(user_updated.unix_uid, user_unupdated.unix_uid) self.assertEqual(user_updated.loginname, user_unupdated.loginname) - self.assertTrue(user_updated.check_password('userpassword')) + self.assertTrue(user_updated.password.verify('userpassword')) self.assertEqual(roles, ['base', 'role1']) def test_update_password(self): @@ -179,8 +179,8 @@ class TestUserViews(UffdTestCase): self.assertEqual(user_updated.mail, 'newuser@example.com') self.assertEqual(user_updated.unix_uid, user_unupdated.unix_uid) self.assertEqual(user_updated.loginname, user_unupdated.loginname) - self.assertTrue(user_updated.check_password('newpassword')) - self.assertFalse(user_updated.check_password('userpassword')) + self.assertTrue(user_updated.password.verify('newpassword')) + self.assertFalse(user_updated.password.verify('userpassword')) def test_update_invalid_password(self): user_unupdated = self.get_user() @@ -192,8 +192,8 @@ class TestUserViews(UffdTestCase): dump('user_update_invalid_password', r) self.assertEqual(r.status_code, 200) user_updated = self.get_user() - self.assertFalse(user_updated.check_password('A')) - self.assertTrue(user_updated.check_password('userpassword')) + self.assertFalse(user_updated.password.verify('A')) + self.assertTrue(user_updated.password.verify('userpassword')) self.assertEqual(user_updated.displayname, user_unupdated.displayname) self.assertEqual(user_updated.mail, user_unupdated.mail) self.assertEqual(user_updated.loginname, user_unupdated.loginname) @@ -209,8 +209,8 @@ class TestUserViews(UffdTestCase): dump('user_update_invalid_password', r) self.assertEqual(r.status_code, 200) user_updated = self.get_user() - self.assertFalse(user_updated.check_password('newpassword\n')) - self.assertTrue(user_updated.check_password('userpassword')) + self.assertFalse(user_updated.password.verify('newpassword\n')) + self.assertTrue(user_updated.password.verify('userpassword')) self.assertEqual(user_updated.displayname, user_unupdated.displayname) self.assertEqual(user_updated.mail, user_unupdated.mail) self.assertEqual(user_updated.loginname, user_unupdated.loginname) @@ -228,8 +228,8 @@ class TestUserViews(UffdTestCase): self.assertEqual(user_updated.displayname, user_unupdated.displayname) self.assertEqual(user_updated.mail, user_unupdated.mail) self.assertEqual(user_updated.loginname, user_unupdated.loginname) - self.assertFalse(user_updated.check_password('newpassword')) - self.assertTrue(user_updated.check_password('userpassword')) + self.assertFalse(user_updated.password.verify('newpassword')) + self.assertTrue(user_updated.password.verify('userpassword')) def test_update_invalid_display_name(self): user_unupdated = self.get_user() @@ -244,8 +244,8 @@ class TestUserViews(UffdTestCase): self.assertEqual(user_updated.displayname, user_unupdated.displayname) self.assertEqual(user_updated.mail, user_unupdated.mail) self.assertEqual(user_updated.loginname, user_unupdated.loginname) - self.assertFalse(user_updated.check_password('newpassword')) - self.assertTrue(user_updated.check_password('userpassword')) + self.assertFalse(user_updated.password.verify('newpassword')) + self.assertTrue(user_updated.password.verify('userpassword')) def test_show(self): r = self.client.get(path=url_for('user.show', id=self.get_user().id), follow_redirects=True) @@ -393,7 +393,7 @@ class TestUserCLI(UffdTestCase): self.assertIsNotNone(user) self.assertEqual(user.mail, 'newmail@example.com') self.assertEqual(user.displayname, 'New Display Name') - self.assertTrue(user.check_password('newpassword')) + self.assertTrue(user.password.verify('newpassword')) self.assertEqual(user.roles, Role.query.filter_by(name='admin').all()) self.assertIn(self.get_admin_group(), user.groups) @@ -416,7 +416,7 @@ class TestUserCLI(UffdTestCase): self.assertIsNotNone(user) self.assertEqual(user.mail, 'newmail@example.com') self.assertEqual(user.displayname, 'New Display Name') - self.assertTrue(user.check_password('newpassword')) + self.assertTrue(user.password.verify('newpassword')) result = self.app.test_cli_runner().invoke(args=['user', 'update', 'testuser', '--add-role', 'admin', '--add-role', 'test']) self.assertEqual(result.exit_code, 0) with self.app.test_request_context(): diff --git a/uffd/api/views.py b/uffd/api/views.py index b086987848cecdd9c1d048a088e3bc083a36f907..16446fe79b76c8e9f8f81cc432daef6edaea2653 100644 --- a/uffd/api/views.py +++ b/uffd/api/views.py @@ -6,6 +6,7 @@ from flask import Blueprint, jsonify, current_app, request, abort from uffd.user.models import User, Group from uffd.mail.models import Mail, MailReceiveAddress, MailDestinationAddress from uffd.session.views import login_get_user, login_ratelimit +from uffd.database import db bp = Blueprint('api', __name__, template_folder='templates', url_prefix='/api/v1/') @@ -97,6 +98,9 @@ def checkpassword(): if user is None: login_ratelimit.log(username) return jsonify(None) + if user.password.needs_rehash: + user.password = password + db.session.commit() return jsonify(generate_user_dict(user)) def generate_mail_dict(mail): diff --git a/uffd/migrations/versions/af07cea65391_unified_password_hashing_for_user_and_signup.py b/uffd/migrations/versions/af07cea65391_unified_password_hashing_for_user_and_signup.py new file mode 100644 index 0000000000000000000000000000000000000000..87e6a48afacc79f6de76194f90efce06a69aace8 --- /dev/null +++ b/uffd/migrations/versions/af07cea65391_unified_password_hashing_for_user_and_signup.py @@ -0,0 +1,67 @@ +"""Unified password hashing for User and Signup + +Revision ID: af07cea65391 +Revises: 042879d5e3ac +Create Date: 2022-02-11 23:55:35.502529 + +""" +from alembic import op +import sqlalchemy as sa + +revision = 'af07cea65391' +down_revision = '042879d5e3ac' +branch_labels = None +depends_on = None + +meta = sa.MetaData(bind=op.get_bind()) +signup = sa.Table('signup', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('token', sa.String(length=128), nullable=False), + sa.Column('created', sa.DateTime(), nullable=False), + sa.Column('loginname', sa.Text(), nullable=True), + sa.Column('displayname', sa.Text(), nullable=True), + sa.Column('mail', sa.Text(), nullable=True), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('type', sa.String(length=50), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], name=op.f('fk_signup_user_id_user'), onupdate='CASCADE', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_signup')), + sa.UniqueConstraint('user_id', name=op.f('uq_signup_user_id')) +) + +def upgrade(): + user = sa.Table('user', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('unix_uid', sa.Integer(), nullable=False), + sa.Column('loginname', sa.String(length=32), nullable=False), + sa.Column('displayname', sa.String(length=128), nullable=False), + sa.Column('mail', sa.String(length=128), nullable=False), + sa.Column('pwhash', sa.String(length=256), nullable=True), + sa.Column('is_service_user', sa.Boolean(), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('pk_user')), + sa.UniqueConstraint('loginname', name=op.f('uq_user_loginname')), + sa.UniqueConstraint('unix_uid', name=op.f('uq_user_unix_uid')) + ) + with op.batch_alter_table('user', copy_from=user) as batch_op: + batch_op.alter_column('pwhash', existing_type=sa.String(length=256), type_=sa.Text()) + + op.execute(signup.update().values(pwhash=('{crypt}' + signup.c.pwhash))) + +def downgrade(): + user = sa.Table('user', meta, + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('unix_uid', sa.Integer(), nullable=False), + sa.Column('loginname', sa.String(length=32), nullable=False), + sa.Column('displayname', sa.String(length=128), nullable=False), + sa.Column('mail', sa.String(length=128), nullable=False), + sa.Column('pwhash', sa.Text(), nullable=True), + sa.Column('is_service_user', sa.Boolean(), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('pk_user')), + sa.UniqueConstraint('loginname', name=op.f('uq_user_loginname')), + sa.UniqueConstraint('unix_uid', name=op.f('uq_user_unix_uid')) + ) + with op.batch_alter_table('user', copy_from=user) as batch_op: + batch_op.alter_column('pwhash', existing_type=sa.Text(), type_=sa.String(length=256)) + + op.execute(signup.update().values(pwhash=None).where(sa.not_(signup.c.pwhash.ilike('{crypt}%')))) + op.execute(signup.update().values(pwhash=sa.func.substr(signup.c.pwhash, len('{crypt}') + 1)).where(signup.c.pwhash.ilike('{crypt}%'))) diff --git a/uffd/password_hash.py b/uffd/password_hash.py new file mode 100644 index 0000000000000000000000000000000000000000..4862cc36562612604aaf35d5f9ff7d61309e7faa --- /dev/null +++ b/uffd/password_hash.py @@ -0,0 +1,273 @@ +import secrets +import hashlib +import base64 +from crypt import crypt + +def build_value(method_name, data): + return '{' + method_name + '}' + data + +def parse_value(value): + if value is not None and value.startswith('{') and '}' in value: + method_name, data = value[1:].split('}', 1) + return method_name.lower(), data + raise ValueError('Invalid password hash') + +class PasswordHashRegistry: + '''Factory for creating objects of the correct PasswordHash subclass for a + given password hash value''' + + def __init__(self): + self.methods = {} + + def register(self, cls): + assert cls.METHOD_NAME not in self.methods + self.methods[cls.METHOD_NAME] = cls + return cls + + def parse(self, value, **kwargs): + method_name, _ = parse_value(value) + method_cls = self.methods.get(method_name) + if method_cls is None: + raise ValueError(f'Unknown password hash method {method_name}') + return method_cls(value, **kwargs) + +registry = PasswordHashRegistry() + +class PasswordHash: + '''OpenLDAP-/NIS-style password hash + + Instances wrap password hash strings in the form "{METHOD_NAME}DATA". + + Allows gradual migration of password hashing methods by checking + PasswordHash.needs_rehash every time the password is processed and rehashing + it with PasswordHash.from_password if needed. For PasswordHash.needs_rehash + to work, the PasswordHash subclass for the current password hashing method + is instantiated with target_cls set to the PasswordHash subclass of the + intended hashing method. + + Instances should be created with PasswordHashRegistry.parse to get the + appropriate subclass based on the method name in a value.''' + + METHOD_NAME = None + + def __init__(self, value, target_cls=None): + method_name, data = parse_value(value) + if method_name != self.METHOD_NAME: + raise ValueError('Invalid password hash') + self.value = value + self.data = data + self.target_cls = target_cls or type(self) + + @classmethod + def from_password(cls, password): + raise NotImplementedError() + + def verify(self, password): + raise NotImplementedError() + + @property + def needs_rehash(self): + return not isinstance(self, self.target_cls) + +@registry.register +class PlaintextPasswordHash(PasswordHash): + '''Pseudo password hash for passwords stored without hashing + + Should only be used for migration of existing plaintext passwords. Add the + prefix "{plain}" for this.''' + + METHOD_NAME = 'plain' + + @classmethod + def from_password(cls, password): + return cls(build_value(cls.METHOD_NAME, password)) + + def verify(self, password): + return secrets.compare_digest(self.data, password) + +class HashlibPasswordHash(PasswordHash): + HASHLIB_ALGORITHM = None + + @classmethod + def from_password(cls, password): + ctx = hashlib.new(cls.HASHLIB_ALGORITHM, password.encode()) + return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()).decode())) + + def verify(self, password): + digest = base64.b64decode(self.data.encode()) + ctx = hashlib.new(self.HASHLIB_ALGORITHM, password.encode()) + return secrets.compare_digest(digest, ctx.digest()) + +class SaltedHashlibPasswordHash(PasswordHash): + HASHLIB_ALGORITHM = None + + @classmethod + def from_password(cls, password): + salt = secrets.token_bytes(8) + ctx = hashlib.new(cls.HASHLIB_ALGORITHM) + ctx.update(password.encode()) + ctx.update(salt) + return cls(build_value(cls.METHOD_NAME, base64.b64encode(ctx.digest()+salt).decode())) + + def verify(self, password): + data = base64.b64decode(self.data.encode()) + ctx = hashlib.new(self.HASHLIB_ALGORITHM) + digest = data[:ctx.digest_size] + salt = data[ctx.digest_size:] + ctx.update(password.encode()) + ctx.update(salt) + return secrets.compare_digest(digest, ctx.digest()) + +@registry.register +class MD5PasswordHash(HashlibPasswordHash): + METHOD_NAME = 'md5' + HASHLIB_ALGORITHM = 'md5' + +@registry.register +class SaltedMD5PasswordHash(SaltedHashlibPasswordHash): + METHOD_NAME = 'smd5' + HASHLIB_ALGORITHM = 'md5' + +@registry.register +class SHA1PasswordHash(HashlibPasswordHash): + METHOD_NAME = 'sha' + HASHLIB_ALGORITHM = 'sha1' + +@registry.register +class SaltedSHA1PasswordHash(SaltedHashlibPasswordHash): + METHOD_NAME = 'ssha' + HASHLIB_ALGORITHM = 'sha1' + +@registry.register +class SHA256PasswordHash(HashlibPasswordHash): + METHOD_NAME = 'sha256' + HASHLIB_ALGORITHM = 'sha256' + +@registry.register +class SaltedSHA256PasswordHash(SaltedHashlibPasswordHash): + METHOD_NAME = 'ssha256' + HASHLIB_ALGORITHM = 'sha256' + +@registry.register +class SHA384PasswordHash(HashlibPasswordHash): + METHOD_NAME = 'sha384' + HASHLIB_ALGORITHM = 'sha384' + +@registry.register +class SaltedSHA384PasswordHash(SaltedHashlibPasswordHash): + METHOD_NAME = 'ssha384' + HASHLIB_ALGORITHM = 'sha384' + +@registry.register +class SHA512PasswordHash(HashlibPasswordHash): + METHOD_NAME = 'sha512' + HASHLIB_ALGORITHM = 'sha512' + +@registry.register +class SaltedSHA512PasswordHash(SaltedHashlibPasswordHash): + METHOD_NAME = 'ssha512' + HASHLIB_ALGORITHM = 'sha512' + +@registry.register +class CryptPasswordHash(PasswordHash): + METHOD_NAME = 'crypt' + + @classmethod + def from_password(cls, password): + return cls(build_value(cls.METHOD_NAME, crypt(password))) + + def verify(self, password): + return secrets.compare_digest(crypt(password, self.data), self.data) + +class InvalidPasswordHash: + def __init__(self, value=None): + self.value = value + + # pylint: disable=no-self-use,unused-argument + def verify(self, password): + return False + + @property + def needs_rehash(self): + return True + + def __bool__(self): + return False + +# An alternative approach for the behaviour of PasswordHashAttribute would be +# to use sqlalchemy.TypeDecorator. A type decorator allows custom encoding and +# decoding of values coming from the database (when query results are loaded) +# and going into the database (when statements are executed). +# +# This has one downside: After setting e.g. user.password to a string value it +# remains a string value until the change is flushed. It is not possible to +# coerce values to PasswordHash objects as soon as they are set. +# +# This is too inconsistent. Code should be able to rely on user.password to +# always behave like a PasswordHash object. + +class PasswordHashAttribute: + '''Descriptor for wrapping an attribute storing a password hash string + + Usage example: + + >>> class User: + ... # Could e.g. be an SQLAlchemy.Column or just a simple attribute + ... _passord_hash = None + ... password = PasswordHashAttribute('_passord_hash', SHA512PasswordHash) + ... + >>> user = User() + >>> type(user.password) + <class 'uffd.password_hash.InvalidPasswordHash'> + >>> + >>> user._password_hash = '{plain}my_password' + >>> type(user.password) + <class 'uffd.password_hash.InvalidPasswordHash'> + >>> user.password.needs_rehash + True + >>> + >>> user.password = 'my_password' + >>> user._passord_hash + '{sha512}3ajDRohg3LJOIoq47kQgjUPrL1/So6U4uvvTnbT/EUyYKaZL0aRxDgwCH4pBNLai+LF+zMh//nnYRZ4t8pT7AQ==' + >>> type(user.password) + <class 'uffd.password_hash.SHA512PasswordHash'> + >>> + >>> user.password = None + >>> user._passord_hash is None + True + >>> type(user.password) + <class 'uffd.password_hash.InvalidPasswordHash'> + + When set to a (plaintext) password the underlying attribute is set to a hash + value for the password. When set to None the underlying attribute is also set + to None.''' + def __init__(self, attribute_name, method_cls): + self.attribute_name = attribute_name + self.method_cls = method_cls + + def __get__(self, obj, objtype=None): + if obj is None: + return self + value = getattr(obj, self.attribute_name) + try: + return registry.parse(value, target_cls=self.method_cls) + except ValueError: + return InvalidPasswordHash(value) + + def __set__(self, obj, value): + if value is None: + value = InvalidPasswordHash() + elif isinstance(value, str): + value = self.method_cls.from_password(value) + setattr(obj, self.attribute_name, value.value) + +# Hashing method for (potentially) low entropy secrets like user passwords. Is +# usually slow and uses salting to make dictionary attacks difficult. Note +# that SSHA512 is not slow and should be replaced with a modern alternative. +LowEntropyPasswordHash = SaltedSHA512PasswordHash + +# Hashing method for high entropy secrets like API keys. The secrets are +# generated instead of user-selected to ensure a high level of entropy. Is +# fast and does not need salting, since dictionary attacks are not feasable +# due to high entropy. +HighEntropyPasswordHash = SHA512PasswordHash diff --git a/uffd/session/views.py b/uffd/session/views.py index e09fac3fa12b1672948e70d976f675f4f4ba8e64..4d578fc29f3d28599eb78b82d3aa1f7f6f985ae5 100644 --- a/uffd/session/views.py +++ b/uffd/session/views.py @@ -35,7 +35,7 @@ def set_request_user(): def login_get_user(loginname, password): user = User.query.filter_by(loginname=loginname).one_or_none() - if user is None or not user.check_password(password): + if user is None or not user.password.verify(password): return None return user @@ -78,6 +78,9 @@ def login(): host_ratelimit.log() flash(_('Login name or password is wrong')) return render_template('session/login.html', ref=request.values.get('ref')) + if user.password.needs_rehash: + user.password = password + db.session.commit() if not user.is_in_group(current_app.config['ACL_ACCESS_GROUP']): flash(_('You do not have access to this service')) return render_template('session/login.html', ref=request.values.get('ref')) diff --git a/uffd/signup/models.py b/uffd/signup/models.py index e089b73fdc6231c871b003b48eb69c2a14514422..af19bd5d537a2f5de83d85a13dbf4394b7a66d0a 100644 --- a/uffd/signup/models.py +++ b/uffd/signup/models.py @@ -1,5 +1,4 @@ import datetime -from crypt import crypt from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey from sqlalchemy.orm import relationship, backref @@ -7,6 +6,7 @@ from sqlalchemy.orm import relationship, backref from uffd.database import db from uffd.user.models import User from uffd.utils import token_urlfriendly +from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash class Signup(db.Model): '''Model that represents a self-signup request @@ -31,7 +31,8 @@ class Signup(db.Model): loginname = Column(Text) displayname = Column(Text) mail = Column(Text) - pwhash = Column(Text) + _password = Column('pwhash', Text) + password = PasswordHashAttribute('_password', LowEntropyPasswordHash) user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=True, unique=True) user = relationship('User', backref=backref('signups', cascade='all, delete-orphan')) @@ -41,15 +42,11 @@ class Signup(db.Model): 'polymorphic_on': type } - # Write-only property - def password(self, value): + def set_password(self, value): if not User().set_password(value): - return - self.pwhash = crypt(value) - password = property(fset=password) - - def check_password(self, value): - return self.pwhash is not None and crypt(value, self.pwhash) == self.pwhash + return False + self.password = value + return True @property def expired(self): @@ -73,7 +70,7 @@ class Signup(db.Model): return False, 'Display name is invalid' if not User().set_mail(self.mail): return False, 'Mail address is invalid' - if self.pwhash is None: + if not self.password: return False, 'Invalid password' if User.query.filter_by(loginname=self.loginname).all(): return False, 'A user with this login name already exists' @@ -92,16 +89,16 @@ class Signup(db.Model): User object.''' if self.completed or self.expired: return None, 'Invalid signup request' - if not self.check_password(password): + if not self.password.verify(password): return None, 'Wrong password' if User.query.filter_by(loginname=self.loginname).all(): return None, 'A user with this login name already exists' - user = User(loginname=self.loginname, displayname=self.displayname, mail=self.mail, password=password) + user = User(loginname=self.loginname, displayname=self.displayname, mail=self.mail, password=self.password) db.session.add(user) user.update_groups() # pylint: disable=no-member self.user = user self.loginname = None self.displayname = None self.mail = None - self.pwhash = None + self.password = None return user, 'Success' diff --git a/uffd/signup/views.py b/uffd/signup/views.py index 88cda5f991b12dc52ccafc9e506922007c44cb6c..45d38b921819e4835ac3af24498e1315a75830bf 100644 --- a/uffd/signup/views.py +++ b/uffd/signup/views.py @@ -57,7 +57,11 @@ def signup_submit(): host_ratelimit.log() signup = Signup(loginname=request.form['loginname'], displayname=request.form['displayname'], - mail=request.form['mail'], password=request.form['password1']) + mail=request.form['mail']) + # If the password is invalid, signup.set_password returns False and does not + # set signup.password. We don't need to check the return value here, because + # we call signup.verify next and that checks if signup.password is set. + signup.set_password(request.form['password1']) valid, msg = signup.validate() if not valid: return render_template('signup/start.html', error=msg) @@ -90,7 +94,7 @@ def signup_confirm_submit(signup_id, token): return render_template('signup/confirm.html', signup=signup, error=_('Too many failed attempts! Please wait %(delay)s.', delay=format_delay(confirm_delay))) if host_delay: return render_template('signup/confirm.html', signup=signup, error=_('Too many requests! Please wait %(delay)s.', delay=format_delay(host_delay))) - if not signup.check_password(request.form['password']): + if not signup.password.verify(request.form['password']): host_ratelimit.log() confirm_ratelimit.log(token) return render_template('signup/confirm.html', signup=signup, error=_('Wrong password')) diff --git a/uffd/user/models.py b/uffd/user/models.py index ad6f3a7a7470323a1044acc692b4339c9ac4a905..eee54cd0405f1d4f43d555e91237b774487a8cff 100644 --- a/uffd/user/models.py +++ b/uffd/user/models.py @@ -1,43 +1,14 @@ -import secrets import string import re -import hashlib -import base64 from flask import current_app, escape from flask_babel import lazy_gettext -from sqlalchemy import Column, Integer, String, ForeignKey, Boolean +from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Text from sqlalchemy.orm import relationship from sqlalchemy.sql.expression import func from uffd.database import db - -# Interface inspired by argon2-cffi -class PasswordHasher: - # pylint: disable=no-self-use - def hash(self, password): - salt = secrets.token_bytes(8) - ctx = hashlib.sha512() - ctx.update(password.encode()) - ctx.update(salt) - return '{ssha512}'+base64.b64encode(ctx.digest()+salt).decode() - - def verify(self, hash, password): - if hash is None: - return False - if hash.startswith('{ssha512}'): - data = base64.b64decode(hash[len('{ssha512}'):].encode()) - ctx = hashlib.sha512() - digest = data[:ctx.digest_size] - salt = data[ctx.digest_size:] - ctx.update(password.encode()) - ctx.update(salt) - return secrets.compare_digest(digest, ctx.digest()) - return False - - # pylint: disable=unused-argument - def check_needs_rehash(self, hash): - return False +from uffd.password_hash import PasswordHashAttribute, LowEntropyPasswordHash def get_next_unix_uid(context): is_service_user = bool(context.get_current_parameters().get('is_service_user', False)) @@ -82,7 +53,8 @@ class User(db.Model): loginname = Column(String(32), unique=True, nullable=False) displayname = Column(String(128), nullable=False) mail = Column(String(128), nullable=False) - pwhash = Column(String(256), nullable=True) + _password = Column('pwhash', Text(), nullable=True) + password = PasswordHashAttribute('_password', LowEntropyPasswordHash) is_service_user = Column(Boolean(), default=False, nullable=False) groups = relationship('Group', secondary='user_groups', back_populates='members') roles = relationship('Role', secondary='role_members', back_populates='members') @@ -91,14 +63,6 @@ class User(db.Model): def unix_gid(self): return current_app.config['USER_GID'] - # Write-only property - def password(self, value): - self.pwhash = PasswordHasher().hash(value) - password = property(fset=password) - - def check_password(self, value): - return PasswordHasher().verify(self.pwhash, value) - def is_in_group(self, name): if not name: return True