Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Select Git revision
Show changes
Showing
with 1389 additions and 1995 deletions
uid=testuser,ou=users,dc=example,dc=com
uid=testadmin,ou=users,dc=example,dc=com
uid=newuser,ou=users,dc=example,dc=com
uid=newuser1,ou=users,dc=example,dc=com
uid=newuser2,ou=users,dc=example,dc=com
uid=newuser3,ou=users,dc=example,dc=com
uid=newuser4,ou=users,dc=example,dc=com
uid=newuser5,ou=users,dc=example,dc=com
uid=newuser6,ou=users,dc=example,dc=com
uid=newuser7,ou=users,dc=example,dc=com
uid=newuser8,ou=users,dc=example,dc=com
uid=newuser9,ou=users,dc=example,dc=com
uid=newuser10,ou=users,dc=example,dc=com
uid=newuser11,ou=users,dc=example,dc=com
uid=newuser12,ou=users,dc=example,dc=com
uid=test,ou=postfix,dc=example,dc=com
uid=test1,ou=postfix,dc=example,dc=com
version: 1
dn: cn=users,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testuser,ou=users,dc=example,dc=com
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
dn: cn=uffd_access,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testuser,ou=users,dc=example,dc=com
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
dn: cn=uffd_admin,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
{
"entries": [
{
"dn": "uid=testuser,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test User"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test User"
],
"entryDN": [
"uid=testuser,ou=users,dc=example,dc=com"
],
"entryUUID": [
"75e62c6a-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test User"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testuser"
],
"mail": [
"testuser@example.com"
],
"memberOf": [
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=users,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testuser"
],
"uidNumber": [
"10000"
],
"userPassword": [
"userpassword"
]
}
},
{
"dn": "uid=testadmin,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test Admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test Admin"
],
"entryDN": [
"uid=testadmin,ou=users,dc=example,dc=com"
],
"entryUUID": [
"678c8470-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test Admin"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testadmin"
],
"mail": [
"testadmin@example.com"
],
"memberOf": [
"cn=users,ou=groups,dc=example,dc=com",
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testadmin"
],
"uidNumber": [
"10001"
],
"userPassword": [
"adminpassword"
]
}
},
{
"dn": "cn=users,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"users"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Base group for all users"
],
"entryDN": [
"cn=users,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"1aec0e8c-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_access,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_access"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"User access to uffd selfservice"
],
"entryDN": [
"cn=uffd_access,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"4fc8dd60-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20002"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_admin,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Admin access to uffd selfservice"
],
"entryDN": [
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"b5d869d6-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20003"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "uid=test,ou=postfix,dc=example,dc=com",
"raw": {
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"entryDN": [
"uid=test,ou=postfix,dc=example,dc=com"
],
"entryUUID": [
"926e5273-a545-4dfe-8f20-d1eeaf41d796"
],
"hasSubordinates": [
"FALSE"
],
"mailacceptinggeneralid": [
"test1@example.com",
"test2@example.com"
],
"maildrop": [
"testuser@mail.example.com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"postfixVirtual"
],
"structuralObjectClass": [
"postfixVirtual"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"test"
]
}
}
]
}
{
"raw": {
"altServer": [],
"configContext": [
"cn=config"
],
"entryDN": [
""
],
"namingContexts": [
"dc=example,dc=com"
],
"objectClass": [
"top",
"OpenLDAProotDSE"
],
"structuralObjectClass": [
"OpenLDAProotDSE"
],
"subschemaSubentry": [
"cn=Subschema"
],
"supportedCapabilities": [],
"supportedControl": [
"2.16.840.1.113730.3.4.18",
"2.16.840.1.113730.3.4.2",
"1.3.6.1.4.1.4203.1.10.1",
"1.3.6.1.1.22",
"1.2.840.113556.1.4.319",
"1.2.826.0.1.3344810.2.3",
"1.3.6.1.1.13.2",
"1.3.6.1.1.13.1",
"1.3.6.1.1.12"
],
"supportedExtension": [
"1.3.6.1.4.1.1466.20037",
"1.3.6.1.4.1.4203.1.11.1",
"1.3.6.1.4.1.4203.1.11.3",
"1.3.6.1.1.8"
],
"supportedFeatures": [
"1.3.6.1.1.14",
"1.3.6.1.4.1.4203.1.5.1",
"1.3.6.1.4.1.4203.1.5.2",
"1.3.6.1.4.1.4203.1.5.3",
"1.3.6.1.4.1.4203.1.5.4",
"1.3.6.1.4.1.4203.1.5.5"
],
"supportedLDAPVersion": [
"3"
],
"supportedSASLMechanisms": [
"DIGEST-MD5",
"CRAM-MD5",
"NTLM"
],
"vendorName": [],
"vendorVersion": []
},
"type": "DsaInfo"
}
This diff is collapsed.
......@@ -2,7 +2,7 @@ import unittest
from flask import Flask, Blueprint, session, url_for
from uffd.csrf import csrf_bp, csrf_protect
from uffd.csrf import bp as csrf_bp, csrf_protect
uid_counter = 0
......
import unittest
import datetime
import time
from flask import url_for, session, request
# These imports are required, because otherwise we get circular imports?!
from uffd import ldap, user
from uffd.user.models import User
from uffd.mfa.models import MFAMethod, MFAType, RecoveryCodeMethod, TOTPMethod, WebauthnMethod, _hotp
from uffd import create_app, db
from utils import dump, UffdTestCase
class TestMfaPrimitives(unittest.TestCase):
def test_hotp(self):
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q'), '458290')
self.assertEqual(_hotp(5555555, b'\xae\xa3T\x05\x89\xd6\xb76\xf61r\x92\xcc\xb5WZ\xe6)\x05q', digits=8), '20458290')
for digits in range(1, 10):
self.assertEqual(len(_hotp(1, b'abcd', digits=digits)), digits)
self.assertEqual(_hotp(1234, b''), '161024')
self.assertEqual(_hotp(0, b'\x04\x8fM\xcc\x7f\x82\x9c$a\x1b\xb3'), '279354')
self.assertEqual(_hotp(2**64-1, b'abcde'), '899292')
def get_fido2_test_cred(self):
try:
from fido2.ctap2 import AttestedCredentialData
except ImportError:
self.skipTest('fido2 could not be imported')
# Example public key from webauthn spec 6.5.1.1
return AttestedCredentialData(bytes.fromhex('00000000000000000000000000000000'+'0040'+'053cbcc9d37a61d3bac87cdcc77ee326256def08ab15775d3a720332e4101d14fae95aeee3bc9698781812e143c0597dc6e180595683d501891e9dd030454c0a'+'A501020326200121582065eda5a12577c2bae829437fe338701a10aaa375e1bb5b5de108de439c08551d2258201e52ed75701163f7f9e40ddf9f341b3dc9ba860af7e0ca7ca7e9eecd0084d19c'))
class TestMfaMethodModels(UffdTestCase):
def test_common_attributes(self):
method = MFAMethod(user=self.get_user(), name='testname')
self.assertTrue(method.created <= datetime.datetime.now())
self.assertEqual(method.name, 'testname')
self.assertEqual(method.user.loginname, 'testuser')
method.user = self.get_admin()
self.assertEqual(method.user.loginname, 'testadmin')
def test_recovery_code_method(self):
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
db.session = db.create_scoped_session() # Ensure the next query does not return the cached method object
_method = RecoveryCodeMethod.query.get(method.id)
self.assertFalse(hasattr(_method, 'code'))
self.assertFalse(_method.verify(''))
self.assertFalse(_method.verify('A'*8))
self.assertTrue(_method.verify(method.code))
def test_totp_method_attributes(self):
method = TOTPMethod(user=self.get_user(), name='testname')
self.assertEqual(method.name, 'testname')
# Restore method with key parameter
_method = TOTPMethod(user=self.get_user(), key=method.key, name='testname')
self.assertEqual(_method.name, 'testname')
self.assertEqual(method.raw_key, _method.raw_key)
self.assertEqual(method.issuer, _method.issuer)
self.assertEqual(method.accountname, _method.accountname)
self.assertEqual(method.key_uri, _method.key_uri)
db.session.add(method)
db.session.commit()
db.session = db.create_scoped_session() # Ensure the next query does not return the cached method object
# Restore method from db
_method = TOTPMethod.query.get(method.id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(method.raw_key, _method.raw_key)
self.assertEqual(method.issuer, _method.issuer)
self.assertEqual(method.accountname, _method.accountname)
self.assertEqual(method.key_uri, _method.key_uri)
def test_totp_method_verify(self):
method = TOTPMethod(user=self.get_user())
counter = int(time.time()/30)
self.assertFalse(method.verify(''))
self.assertFalse(method.verify(_hotp(counter-2, method.raw_key)))
self.assertTrue(method.verify(_hotp(counter, method.raw_key)))
self.assertFalse(method.verify(_hotp(counter+2, method.raw_key)))
def test_webauthn_method(self):
data = get_fido2_test_cred(self)
method = WebauthnMethod(user=self.get_user(), cred=data, name='testname')
self.assertEqual(method.name, 'testname')
db.session.add(method)
db.session.commit()
db.session = db.create_scoped_session() # Ensure the next query does not return the cached method object
_method = WebauthnMethod.query.get(method.id)
self.assertEqual(_method.name, 'testname')
self.assertEqual(bytes(method.cred), bytes(_method.cred))
self.assertEqual(data.credential_id, _method.cred.credential_id)
self.assertEqual(data.public_key, _method.cred.public_key)
# We only test (de-)serialization here, as everything else is currently implemented in the views
class TestMfaViews(UffdTestCase):
def setUp(self):
super().setUp()
db.session.add(RecoveryCodeMethod(user=self.get_admin()))
db.session.add(TOTPMethod(user=self.get_admin(), name='Admin Phone'))
# We don't want to skip all tests only because fido2 is not installed!
#db.session.add(WebauthnMethod(user=get_testadmin(), cred=get_fido2_test_cred(self), name='Admin FIDO2 dongle'))
db.session.commit()
def add_recovery_codes(self, count=10):
user = self.get_user()
for _ in range(count):
db.session.add(RecoveryCodeMethod(user=user))
db.session.commit()
def add_totp(self):
db.session.add(TOTPMethod(user=self.get_user(), name='My phone'))
db.session.commit()
def add_webauthn(self):
db.session.add(WebauthnMethod(user=self.get_user(), cred=get_fido2_test_cred(self), name='My FIDO2 dongle'))
db.session.commit()
def test_setup_disabled(self):
self.login_as('user')
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_disabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_recovery_codes(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_only_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_enabled(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
self.add_webauthn()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_enabled', r)
self.assertEqual(r.status_code, 200)
def test_setup_few_recovery_codes(self):
self.login_as('user')
self.add_totp()
self.add_recovery_codes(1)
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_few_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_setup_no_recovery_codes(self):
self.login_as('user')
self.add_totp()
r = self.client.get(path=url_for('mfa.setup'), follow_redirects=True)
dump('mfa_setup_no_recovery_codes', r)
self.assertEqual(r.status_code, 200)
def test_disable(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
admin_methods = len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all())
r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True)
dump('mfa_disable', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True)
dump('mfa_disable_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(dn=request.user.dn).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all()), admin_methods)
def test_disable_recovery_only(self):
self.login_as('user')
self.add_recovery_codes()
admin_methods = len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all())
self.assertNotEqual(len(MFAMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.get(path=url_for('mfa.disable'), follow_redirects=True)
dump('mfa_disable_recovery_only', r)
self.assertEqual(r.status_code, 200)
r = self.client.post(path=url_for('mfa.disable_confirm'), follow_redirects=True)
dump('mfa_disable_recovery_only_submit', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(dn=request.user.dn).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all()), admin_methods)
def test_admin_disable(self):
for method in MFAMethod.query.filter_by(dn=self.get_admin().dn).all():
if not isinstance(method, RecoveryCodeMethod):
db.session.delete(method)
db.session.commit()
self.add_recovery_codes()
self.add_totp()
self.login_as('admin')
self.assertIsNotNone(request.user)
admin_methods = len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all())
r = self.client.get(path=url_for('mfa.admin_disable', uid=self.get_user().uid), follow_redirects=True)
dump('mfa_admin_disable', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(MFAMethod.query.filter_by(dn=self.get_user().dn).all()), 0)
self.assertEqual(len(MFAMethod.query.filter_by(dn=self.get_admin().dn).all()), admin_methods)
def test_setup_recovery(self):
self.login_as('user')
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True)
dump('mfa_setup_recovery', r)
self.assertEqual(r.status_code, 200)
methods = RecoveryCodeMethod.query.filter_by(dn=request.user.dn).all()
self.assertNotEqual(len(methods), 0)
r = self.client.post(path=url_for('mfa.setup_recovery'), follow_redirects=True)
dump('mfa_setup_recovery_reset', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=methods[0].id).all()), 0)
self.assertNotEqual(len(methods), 0)
def test_setup_totp(self):
self.login_as('user')
self.add_recovery_codes()
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp', r)
self.assertEqual(r.status_code, 200)
self.assertNotEqual(len(session.get('mfa_totp_key', '')), 0)
def test_setup_totp_without_recovery(self):
self.login_as('user')
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
dump('mfa_setup_totp_without_recovery', r)
self.assertEqual(r.status_code, 200)
def test_setup_totp_finish(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 1)
def test_setup_totp_finish_without_recovery(self):
self.login_as('user')
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_without_recovery', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
def test_setup_totp_finish_wrong_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
method = TOTPMethod(request.user, key=session.get('mfa_totp_key', ''))
code = _hotp(int(time.time()/30), method.raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': code}, follow_redirects=True)
dump('mfa_setup_totp_finish_wrong_code', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
def test_setup_totp_finish_empty_code(self):
self.login_as('user')
self.add_recovery_codes()
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
r = self.client.get(path=url_for('mfa.setup_totp', name='My TOTP Authenticator'), follow_redirects=True)
r = self.client.post(path=url_for('mfa.setup_totp_finish', name='My TOTP Authenticator'), data={'code': ''}, follow_redirects=True)
dump('mfa_setup_totp_finish_empty_code', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 0)
def test_delete_totp(self):
self.login_as('user')
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(request.user, name='test')
db.session.add(method)
db.session.commit()
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 2)
r = self.client.get(path=url_for('mfa.delete_totp', id=method.id), follow_redirects=True)
dump('mfa_delete_totp', r)
self.assertEqual(r.status_code, 200)
self.assertEqual(len(TOTPMethod.query.filter_by(id=method.id).all()), 0)
self.assertEqual(len(TOTPMethod.query.filter_by(dn=request.user.dn).all()), 1)
# TODO: webauthn setup tests
def test_auth_integration(self):
self.add_recovery_codes()
self.add_totp()
db.session.commit()
self.assertIsNone(request.user)
r = self.login_as('user')
dump('mfa_auth_redirected', r)
self.assertEqual(r.status_code, 200)
self.assertIn(b'/mfa/auth', r.data)
self.assertIsNone(request.user)
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_disabled(self):
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_only(self):
self.add_recovery_codes()
self.assertIsNone(request.user)
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth', ref='/redirecttarget'), follow_redirects=False)
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
def test_auth_recovery_code(self):
self.add_recovery_codes()
self.add_totp()
method = RecoveryCodeMethod(user=self.get_user())
db.session.add(method)
db.session.commit()
method_id = method.id
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth_recovery_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish', ref='/redirecttarget'), data={'code': method.code})
self.assertEqual(r.status_code, 302)
self.assertTrue(r.location.endswith('/redirecttarget'))
self.assertIsNotNone(request.user)
self.assertEqual(len(RecoveryCodeMethod.query.filter_by(id=method_id).all()), 0)
def test_auth_totp_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
dump('mfa_auth_totp_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_totp_code_submit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(request.user)
def test_auth_empty_code(self):
self.add_recovery_codes()
self.add_totp()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': ''}, follow_redirects=True)
dump('mfa_auth_empty_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_invalid_code(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
r = self.client.get(path=url_for('mfa.auth'), follow_redirects=False)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
code = str(int(code[0])+1)[-1] + code[1:]
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_invalid_code', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
def test_auth_ratelimit(self):
self.add_recovery_codes()
self.add_totp()
method = TOTPMethod(user=self.get_user(), name='testname')
raw_key = method.raw_key
db.session.add(method)
db.session.commit()
self.login_as('user')
self.assertIsNone(request.user)
code = _hotp(int(time.time()/30), raw_key)
inv_code = str(int(code[0])+1)[-1] + code[1:]
for i in range(20):
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': inv_code}, follow_redirects=True)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
r = self.client.post(path=url_for('mfa.auth_finish'), data={'code': code}, follow_redirects=True)
dump('mfa_auth_ratelimit', r)
self.assertEqual(r.status_code, 200)
self.assertIsNone(request.user)
# TODO: webauthn auth tests
class TestMfaViewsOL(TestMfaViews):
use_openldap = True
This diff is collapsed.
This diff is collapsed.
import time
from uffd.models.ratelimit import get_addrkey, format_delay, Ratelimit
from flask import Flask, Blueprint, session, url_for
from uffd.ratelimit import get_addrkey, format_delay, Ratelimit, RatelimitEvent
from utils import UffdTestCase
from tests.utils import UffdTestCase
class TestRatelimit(UffdTestCase):
def test_limiting(self):
......@@ -48,19 +44,3 @@ class TestRatelimit(UffdTestCase):
self.assertIsInstance(format_delay(120), str)
self.assertIsInstance(format_delay(3600), str)
self.assertIsInstance(format_delay(4000), str)
def test_cleanup(self):
ratelimit = Ratelimit('test', 1, 1)
ratelimit.log('')
ratelimit.log('1')
ratelimit.log('2')
ratelimit.log('3')
ratelimit.log('4')
time.sleep(1)
ratelimit.log('5')
self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 6)
ratelimit.cleanup()
self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 1)
time.sleep(1)
ratelimit.cleanup()
self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 0)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import unittest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from uffd.tasks import CleanupTask
class TestCleanupTask(unittest.TestCase):
def test(self):
app = Flask(__name__)
app.testing = True
app.debug = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
db = SQLAlchemy(app)
cleanup_task = CleanupTask()
@cleanup_task.delete_by_attribute('delete_me')
class TestModel(db.Model):
id = db.Column(db.Integer(), primary_key=True, autoincrement=True)
delete_me = db.Column(db.Boolean(), default=False, nullable=False)
with app.test_request_context():
db.create_all()
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=True))
db.session.add(TestModel(delete_me=False))
db.session.add(TestModel(delete_me=False))
db.session.commit()
db.session.expire_all()
self.assertEqual(TestModel.query.count(), 5)
with app.test_request_context():
cleanup_task.run()
db.session.commit()
db.session.expire_all()
with app.test_request_context():
self.assertEqual(TestModel.query.count(), 2)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.