diff --git a/examples/passwd.py b/examples/passwd.py index 7a516eb8ae05c5ce2e549154009990e6a54fb592..8e746cf7f5523941946593261c905ce05637f49f 100644 --- a/examples/passwd.py +++ b/examples/passwd.py @@ -19,7 +19,7 @@ class RequestHandler(ldapserver.LDAPRequestHandler): user_gids = {} for user in pwd.getpwall(): user_gids[user.pw_gid] = user_gids.get(user.pw_gid, set()) | {user.pw_name} - yield self.subschema.Object(ldapserver.dn.DN('ou=users,dc=example,dc=com', uid=user.pw_name), **{ + yield self.subschema.Object(self.subschema.DN('ou=users,dc=example,dc=com', uid=user.pw_name), **{ 'objectClass': ['top', 'organizationalperson', 'person', 'posixaccount'], 'structuralObjectClass': ['organizationalperson'], 'uid': [user.pw_name], @@ -29,12 +29,12 @@ class RequestHandler(ldapserver.LDAPRequestHandler): }) for group in grp.getgrall(): members = set(group.gr_mem) | user_gids.get(group.gr_gid, set()) - yield self.subschema.Object(ldapserver.dn.DN('ou=groups,dc=example,dc=com', cn=group.gr_name), **{ + yield self.subschema.Object(self.subschema.DN('ou=groups,dc=example,dc=com', cn=group.gr_name), **{ 'objectClass': ['top', 'groupOfUniqueNames', 'posixGroup'], 'structuralObjectClass': ['groupOfUniqueNames'], 'cn': [group.gr_name], 'gidNumber': [group.gr_gid], - 'uniqueMember': [ldapserver.dn.DN('ou=user,dc=example,dc=com', uid=name) for name in members], + 'uniqueMember': [self.subschema.DN('ou=user,dc=example,dc=com', uid=name) for name in members], }) if __name__ == '__main__': diff --git a/ldapserver/dn.py b/ldapserver/dn.py index 8e4ded8abbe5c0255db3a9f477dd1104b8a5cfcb..2895537b8ce7e3c5aaa0f5d32e48a6889f1fef70 100644 --- a/ldapserver/dn.py +++ b/ldapserver/dn.py @@ -10,38 +10,41 @@ comparing. Limitations: -* Supported attribute types: `cn`, `l`, `st`, `o`, `ou`, `c`, `street`, `dc`, `uid` -* Dotted-decimal/OID attribute types are not supported * Hexstring attribute values (`foo=#ABCDEF...`) are not supported +* Attribute values are only validated in from_str ''' import typing -import unicodedata import re -from string import hexdigits as HEXDIGITS + +from . import exceptions __all__ = ['DN', 'RDN', 'RDNAssertion'] class DN(tuple): '''Distinguished Name consiting of zero ore more `RDN` objects''' - def __new__(cls, *args, **kwargs): + schema: typing.Any + + def __new__(cls, schema, *args, **kwargs): if len(args) == 1 and isinstance(args[0], DN): args = args[0] if len(args) == 1 and isinstance(args[0], str): - args = cls.from_str(args[0]) + args = cls.from_str(schema, args[0]) for rdn in args: if not isinstance(rdn, RDN): raise TypeError(f'Argument {repr(rdn)} is of type {repr(type(rdn))}, expected ldapserver.dn.RDN object') rdns = tuple(args) if kwargs: - rdns = (RDN(**kwargs),) + rdns - return super().__new__(cls, rdns) + rdns = (RDN(schema, **kwargs),) + rdns + dn = super().__new__(cls, rdns) + dn.schema = schema + return dn # Syntax definiton from RFC4514: # distinguishedName = [ relativeDistinguishedName *( COMMA relativeDistinguishedName ) ] @classmethod - def from_str(cls, expr): + def from_str(cls, schema, expr): escaped = False rdns = [] token = '' @@ -50,15 +53,15 @@ class DN(tuple): escaped = False token += char elif char == ',': - rdns.append(RDN.from_str(token)) + rdns.append(RDN.from_str(schema, token)) token = '' else: if char == '\\': escaped = True token += char if token: - rdns.append(RDN.from_str(token)) - return cls(*rdns) + rdns.append(RDN.from_str(schema, token)) + return cls(schema, *rdns) def __str__(self): return ','.join(map(str, self)) @@ -69,24 +72,24 @@ class DN(tuple): def __eq__(self, obj): return type(self) is type(obj) and super().__eq__(obj) - def __hash__(self): - return hash((type(self), tuple(self))) + def __ne__(self, obj): + return not self == obj def __add__(self, value): if isinstance(value, DN): - return DN(*(tuple(self) + tuple(value))) + return DN(self.schema, *(tuple(self) + tuple(value))) elif isinstance(value, RDN): - return self + DN(value) + return self + DN(self.schema, value) else: raise TypeError(f'Can only add DN or RDN to DN, not {type(value)}') def __getitem__(self, key): if isinstance(key, slice): - return type(self)(*super().__getitem__(key)) + return type(self)(self.schema, *super().__getitem__(key)) return super().__getitem__(key) def __strip_common_suffix(self, value): - value = DN(value) + value = DN(self.schema, value) minlen = min(len(self), len(value)) for i in range(minlen): if self[-1 - i] != value[-1 - i]: @@ -94,11 +97,11 @@ class DN(tuple): return self[:-minlen or None], value[:-minlen or None] def is_direct_child_of(self, base): - rchild, rbase = self.__strip_common_suffix(DN(base)) + rchild, rbase = self.__strip_common_suffix(DN(self.schema, base)) return not rbase and len(rchild) == 1 def in_subtree_of(self, base): - rchild, rbase = self.__strip_common_suffix(DN(base)) # pylint: disable=unused-variable + rchild, rbase = self.__strip_common_suffix(DN(self.schema, base)) # pylint: disable=unused-variable return not rbase @property @@ -107,6 +110,12 @@ class DN(tuple): return None return self[0].attribute # pylint: disable=no-member + @property + def object_attribute_type(self): + if len(self) == 0: + return None + return self[0].attribute_type # pylint: disable=no-member + @property def object_value(self): if len(self) == 0: @@ -121,19 +130,19 @@ class DN(tuple): class DNWithUID(DN): # pylint: disable=arguments-differ,no-member - def __new__(cls, dn, uid=None): + def __new__(cls, schema, dn, uid=None): if not uid: return dn if not re.fullmatch(r"'[01]*'B", uid): raise ValueError('Invalid uid value') - obj = super().__new__(cls, *dn) + obj = super().__new__(cls, schema, *dn) obj.uid = uid return obj @classmethod - def from_str(cls, expr): + def from_str(cls, schema, expr): dn_part, uid_part = (expr.rsplit('#', 1) + [''])[:2] - return cls(DN.from_str(dn_part), uid_part or None) + return cls(schema, DN.from_str(schema, dn_part), uid_part or None) def __str__(self): return super().__str__() + '#' + self.uid @@ -146,26 +155,32 @@ class DNWithUID(DN): @property def dn(self): - return DN(*self) + return DN(self.schema, *self) class RDN(tuple): '''Relative Distinguished Name consisting of one or more `RDNAssertion` objects''' - def __new__(cls, *assertions, **kwargs): + schema: typing.Any + + def __new__(cls, schema, *assertions, **kwargs): for assertion in assertions: if not isinstance(assertion, RDNAssertion): raise TypeError(f'Argument {repr(assertion)} is of type {repr(type(assertion))}, expected ldapserver.dn.RDNAssertion') - assertions = set(assertions) + if assertion.attribute_type.schema is not schema: + raise ValueError('RDNAssertion has different schema') + assertions = list(assertions) for key, value in kwargs.items(): - assertions.add(RDNAssertion(key, value)) + assertions.append(RDNAssertion(schema, key, value)) if not assertions: raise ValueError('RDN must have at least one assertion') - return super().__new__(cls, sorted(assertions, key=lambda assertion: (assertion.attribute, assertion.value_normalized))) + rdn = super().__new__(cls, assertions) + rdn.schema = schema + return rdn # Syntax definiton from RFC4514: # relativeDistinguishedName = attributeTypeAndValue *( PLUS attributeTypeAndValue ) @classmethod - def from_str(cls, expr): + def from_str(cls, schema, expr): escaped = False assertions = [] token = '' @@ -174,15 +189,15 @@ class RDN(tuple): escaped = False token += char elif char == '+': - assertions.append(RDNAssertion.from_str(token)) + assertions.append(RDNAssertion.from_str(schema, token)) token = '' else: if char == '\\': escaped = True token += char if token: - assertions.append(RDNAssertion.from_str(token)) - return cls(*assertions) + assertions.append(RDNAssertion.from_str(schema, token)) + return cls(schema, *assertions) def __str__(self): return '+'.join(map(str, self)) @@ -191,16 +206,16 @@ class RDN(tuple): return '<ldapserver.dn.RDN %s>'%str(self) def __eq__(self, obj): - return type(self) is type(obj) and super().__eq__(obj) + return type(self) is type(obj) and set(self) == set(obj) - def __hash__(self): - return hash((type(self), tuple(self))) + def __ne__(self, obj): + return not self == obj def __add__(self, value): if isinstance(value, RDN): - return DN(self, value) + return DN(self.schema, self, value) elif isinstance(value, DN): - return DN(self) + value + return DN(self.schema, self) + value else: raise TypeError(f'Can only add DN or RDN to RDN, not {type(value)}') @@ -211,46 +226,72 @@ class RDN(tuple): return self[0].attribute @property - def value(self): + def attribute_type(self): if len(self) != 1: return None - return self[0].value + return self[0].attribute_type @property - def value_normalized(self): + def value(self): if len(self) != 1: return None - return self[0].value_normalized - -# Mandatory attribute types (RFC4514) -STRI_ATTRIBUTES = ('cn', 'l', 'st', 'o', 'ou', 'c', 'street', 'dc', 'uid') + return self[0].value -DN_ESCAPED = ('"', '+', ',', ';', '<', '>') -DN_SPECIAL = DN_ESCAPED + (' ', '#', '=') +DN_ESCAPED = ( + 0x0022, # '"' + 0x002B, # '+' + 0x002C, # ',' + 0x003B, # ';' + 0x003C, # '<' + 0x003E, # '>' +) +DN_SPECIAL = DN_ESCAPED + ( + 0x0020, # ' ' + 0x0023, # '#' + 0x003D, # '=' +) +HEXDIGITS = ( + 0x0030, # '0' + 0x0031, # '1' + 0x0032, # '2' + 0x0033, # '3' + 0x0034, # '4' + 0x0035, # '5' + 0x0036, # '6' + 0x0037, # '7' + 0x0038, # '8' + 0x0039, # '9' + 0x0041, # 'A' + 0x0042, # 'B' + 0x0043, # 'C' + 0x0044, # 'D' + 0x0045, # 'E' + 0x0046, # 'F' + 0x0061, # 'a' + 0x0062, # 'b' + 0x0063, # 'c' + 0x0064, # 'd' + 0x0065, # 'e' + 0x0066, # 'f' +) class RDNAssertion: '''A single attribute value assertion''' - __slots__ = ['attribute', 'value', 'value_normalized'] + __slots__ = ['attribute', 'attribute_type', 'value'] attribute: str + attribute_type: typing.Any value: typing.Any - value_normalized: typing.Any - - def __init__(self, attribute, value): - if not isinstance(attribute, str): - raise TypeError(f'RDNAssertion attribute {repr(attribute)} must be a string but is {type(attribute)}') - attribute = attribute.lower() - value_normalized = value - if attribute in STRI_ATTRIBUTES: - if not isinstance(value, str): - raise TypeError(f'RDNAssertion value {repr(value)} for attribute "{attribute}" must be a string but is {type(value)}') - if value == '': - raise ValueError(f'RDNAssertion value for attribute "{attribute}" must not be empty') - value_normalized = unicodedata.normalize('NFC', value.lower()) - else: - raise ValueError(f'RDNAssertion attribute "{attribute}" is unsupported') - super().__setattr__('value', value) - super().__setattr__('value_normalized', value_normalized) + schema: typing.Any + + def __init__(self, schema, attribute, value): + try: + super().__setattr__('attribute_type', schema.attribute_types[attribute]) + except KeyError as exc: + raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc super().__setattr__('attribute', attribute) + if not self.attribute_type.equality: + raise ValueError('Invalid RDN attribute type: Attribute type has no EQUALITY matching rule') + super().__setattr__('value', value) # Syntax definiton from RFC4514 and 4512: # @@ -284,7 +325,7 @@ class RDNAssertion: # hexpair = HEX HEX @classmethod - def from_str(cls, expr): + def from_str(cls, schema, expr): attribute, escaped_value = expr.split('=', 1) if escaped_value.startswith('#'): # The "#..." form is used for unknown attribute types and those without @@ -294,17 +335,15 @@ class RDNAssertion: raise ValueError('Hex-encoded RDN assertion values are not supported') escaped = False hexdigit = None - # We store the unescaped value temporarily as bytes to correctly handle - # hex-escaped multi-byte UTF8 sequences encoded_value = b'' for char in escaped_value: if hexdigit is not None: encoded_value += bytes.fromhex('%s%s'%(hexdigit, char)) hexdigit = None elif escaped: - if char in DN_SPECIAL + ('\\',): + if ord(char) in DN_SPECIAL + (b'\\'[0],): encoded_value += char.encode('utf8') - elif char in HEXDIGITS: + elif ord(char) in HEXDIGITS: hexdigit = char else: raise ValueError('Invalid escape: \\%s'%char) @@ -313,36 +352,45 @@ class RDNAssertion: escaped = True else: encoded_value += char.encode('utf8') - value = encoded_value.decode('utf8') - return cls(attribute, value) + try: + attribute_type = schema.attribute_types[attribute] + except KeyError as exc: + raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc + try: + value = attribute_type.syntax.decode(encoded_value) + except exceptions.LDAPInvalidAttributeSyntax as exc: + raise ValueError('Invalid RDN assertion value') from exc + return cls(schema, attribute, value) def __str__(self): + encoded_value = self.attribute_type.syntax.encode(self.value) escaped_value = '' - for char in self.value: - if char in DN_ESCAPED + ('\\',): - escaped_value += '\\' + char - # Escape non-printable characters for readability. This goes beyond - # what the standard requires. - elif char in ('\x00',) or not char.isprintable(): - for codepoint in char.encode('utf8'): - escaped_value += '\\%02x'%codepoint + for index in range(len(encoded_value)): + byte = encoded_value[index:index+1] + if not byte.isascii() or not byte.decode().isprintable(): + escaped_value += '\\%02x'%byte[0] + elif byte[0] in DN_ESCAPED + (b'\\'[0],): + escaped_value += '\\' + byte.decode() else: - escaped_value += char + escaped_value += byte.decode() if escaped_value.startswith(' ') or escaped_value.startswith('#'): escaped_value = '\\' + escaped_value if escaped_value.endswith(' '): escaped_value = escaped_value[:-1] + '\\' + escaped_value[-1] return '%s=%s'%(self.attribute, escaped_value) + def __hash__(self): + return hash(self.attribute_type.oid) + def __repr__(self): return '<ldapserver.dn.RDNAssertion %s>'%str(self) def __eq__(self, obj): - return type(self) is type(obj) and self.attribute == obj.attribute and \ - self.value_normalized == obj.value_normalized + return type(self) is type(obj) and self.attribute_type is obj.attribute_type and \ + self.attribute_type.equality.match_equal([self.value], obj.value) - def __hash__(self): - return hash((type(self), self.attribute, self.value_normalized)) + def __ne__(self, obj): + return not self == obj def __setattr__(self, *args): raise TypeError('RDNAssertion object is immutable') diff --git a/ldapserver/objects.py b/ldapserver/objects.py index 5b930e47400753c13d134f855cd86c0b2d45cc0c..0accdceffc78f142334bbaf8de453e526e07c5db 100644 --- a/ldapserver/objects.py +++ b/ldapserver/objects.py @@ -2,7 +2,7 @@ import collections.abc import enum from . import ldap, exceptions -from .dn import DN +from .dn import DN, RDN, RDNAssertion class TypeKeysView(collections.abc.Set): def __init__(self, attributes): @@ -110,7 +110,7 @@ class FilterResult(enum.Enum): class Object(AttributeDict): def __init__(self, schema, dn, **attributes): super().__init__(schema, **attributes) - self.dn = DN(dn) + self.dn = DN(schema, dn) def __search_match_dn(self, basedn, scope): if scope == ldap.SearchScope.baseObject: @@ -212,7 +212,7 @@ class Object(AttributeDict): return FilterResult.UNDEFINED def match_search(self, base_obj, scope, filter_obj): - return self.__search_match_dn(DN.from_str(base_obj), scope) and \ + return self.__search_match_dn(DN.from_str(self.schema, base_obj), scope) and \ self.__search_match_filter(filter_obj) == FilterResult.TRUE def search(self, base_obj, scope, filter_obj, attributes, types_only): @@ -237,7 +237,7 @@ class Object(AttributeDict): def compare(self, dn, attribute, value): try: - dn = DN.from_str(dn) + dn = DN.from_str(self.schema, dn) except ValueError as exc: raise exceptions.LDAPNoSuchObject() from exc if dn != self.dn: @@ -250,11 +250,11 @@ class Object(AttributeDict): class RootDSE(Object): def __init__(self, schema, *args, **kwargs): - super().__init__(schema, DN(), *args, **kwargs) + super().__init__(schema, DN(schema), *args, **kwargs) self.setdefault('objectClass', ['top']) def match_search(self, base_obj, scope, filter_obj): - return not DN.from_str(base_obj) and scope == ldap.SearchScope.baseObject and \ + return not DN.from_str(self.schema, base_obj) and scope == ldap.SearchScope.baseObject and \ isinstance(filter_obj, ldap.FilterPresent) and \ filter_obj.attribute.lower() == 'objectclass' @@ -272,7 +272,7 @@ class TemplateFilterResult(enum.Enum): class ObjectTemplate(AttributeDict): def __init__(self, schema, parent_dn, rdn_attribute, **attributes): super().__init__(schema, **attributes) - self.parent_dn = DN(parent_dn) + self.parent_dn = DN(schema, parent_dn) self.rdn_attribute = rdn_attribute def __match_extract_dn_constraints(self, basedn, scope): @@ -428,19 +428,18 @@ class ObjectTemplate(AttributeDict): def match_search(self, base_obj, scope, filter_obj): '''Return whether objects based on this template might match the search parameters''' - print(base_obj, scope, filter_obj, self.__search_match_filter(filter_obj)) - return self.__search_match_dn(DN.from_str(base_obj), scope) and \ + return self.__search_match_dn(DN.from_str(self.schema, base_obj), scope) and \ self.__search_match_filter(filter_obj) in (TemplateFilterResult.TRUE, TemplateFilterResult.MAYBE_TRUE) def extract_search_constraints(self, base_obj, scope, filter_obj): constraints = self.__extract_filter_constraints(filter_obj) - for key, values in self.__extract_dn_constraints(DN.from_str(base_obj), scope).items(): + for key, values in self.__extract_dn_constraints(DN.from_str(self.schema, base_obj), scope).items(): constraints[key] += values return constraints def create_object(self, rdn_value, **attributes): - obj = Object(self.schema, DN(self.parent_dn, **{self.rdn_attribute: rdn_value})) + obj = Object(self.schema, DN(self.schema, self.parent_dn, **{self.rdn_attribute: rdn_value})) for key, values in attributes.items(): if WILDCARD_VALUE not in self[key]: raise ValueError(f'Cannot set attribute "{key}" that is not set to [WILDCARD_VALUE] in the template') @@ -467,9 +466,23 @@ class SubschemaSubentry(Object): self.Object = lambda *args, **attributes: Object(schema, *args, subschemaSubentry=[self.dn], **attributes) self.RootDSE = lambda **attributes: RootDSE(schema, subschemaSubentry=[self.dn], **attributes) self.ObjectTemplate = lambda *args, **kwargs: ObjectTemplate(schema, *args, subschemaSubentry=[self.dn], **kwargs) + class Wrapper: + def __init__(self, cls, schema): + self.cls = cls + self.schema = schema + + def __call__(self, *args, **kwargs): + return self.cls(self.schema, *args, **kwargs) + + def from_str(self, *args, **kwargs): + return self.cls.from_str(self.schema, *args, **kwargs) + + self.DN = Wrapper(DN, schema) + self.RDN = Wrapper(RDN, schema) + self.RDNAssertion = Wrapper(RDNAssertion, schema) def match_search(self, base_obj, scope, filter_obj): - return DN.from_str(base_obj) == self.dn and \ + return DN.from_str(self.schema, base_obj) == self.dn and \ scope == ldap.SearchScope.baseObject and \ isinstance(filter_obj, ldap.FilterEqual) and \ filter_obj.attribute.lower() == 'objectclass' and \ diff --git a/ldapserver/schema/syntaxes.py b/ldapserver/schema/syntaxes.py index a17e1a29155327f43c161d01266d0ade65c974fe..2660a03f25fbd990d7e951d691ff9ad17ed831b0 100644 --- a/ldapserver/schema/syntaxes.py +++ b/ldapserver/schema/syntaxes.py @@ -64,7 +64,7 @@ class DNSyntaxDefinition(SyntaxDefinition): def decode(self, schema, raw_value): try: - return dn.DN.from_str(raw_value.decode('utf8')) + return dn.DN.from_str(schema, raw_value.decode('utf8')) except (UnicodeDecodeError, TypeError, ValueError) as exc: raise exceptions.LDAPInvalidAttributeSyntax() from exc @@ -74,7 +74,7 @@ class NameAndOptionalUIDSyntaxDefinition(StringSyntaxDefinition): def decode(self, schema, raw_value): try: - return dn.DNWithUID.from_str(raw_value.decode('utf8')) + return dn.DNWithUID.from_str(schema, raw_value.decode('utf8')) except (UnicodeDecodeError, TypeError, ValueError) as exc: raise exceptions.LDAPInvalidAttributeSyntax() from exc @@ -250,7 +250,7 @@ Binary = BytesSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.5', desc='Binary') Fax = BytesSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.23', desc='Fax') JPEG = BytesSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.28', desc='JPEG') OctetString = BytesSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.40', desc='Octet String') -DirectoryString = StringSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.15', desc='Directory String') +DirectoryString = StringSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.15', desc='Directory String', re_pattern='.+') IA5String = StringSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.26', desc='IA5 String', encoding='ascii', extra_compatability_tags=DirectoryString.compatability_tags) PrintableString = StringSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.44', desc='Printable String', encoding='ascii', re_pattern='[A-Za-z0-9\'()+,.=/:? -]*', extra_compatability_tags=IA5String.compatability_tags) CountryString = StringSyntaxDefinition('1.3.6.1.4.1.1466.115.121.1.11', desc='Country String', encoding='ascii', re_pattern='[A-Za-z0-9\'()+,.=/:? -]{2}', extra_compatability_tags=PrintableString.compatability_tags) diff --git a/tests/test_dn.py b/tests/test_dn.py index cfcb2c8aa9e6a3da2ae92aa8ce4fa323fa5c43b4..31c05357605aa63b53795fb5ff09485164025d31 100644 --- a/tests/test_dn.py +++ b/tests/test_dn.py @@ -2,6 +2,22 @@ import unittest import enum from ldapserver.dn import DN, RDN, RDNAssertion +from ldapserver.schema import RFC4519_SCHEMA as schema + +class Wrapper: + def __init__(self, cls, schema): + self.cls = cls + self.schema = schema + + def __call__(self, *args, **kwargs): + return self.cls(self.schema, *args, **kwargs) + + def from_str(self, *args, **kwargs): + return self.cls.from_str(self.schema, *args, **kwargs) + +DN = Wrapper(DN, schema) +RDN = Wrapper(RDN, schema) +RDNAssertion = Wrapper(RDNAssertion, schema) class TestDN(unittest.TestCase): def test_equal(self): @@ -9,10 +25,6 @@ class TestDN(unittest.TestCase): self.assertEqual(DN(RDN(uid='jsmith'), RDN(dc='example'), RDN(dc='net')), DN(RDN(uid='jsmith'), RDN(dc='example'), RDN(dc='net'))) self.assertNotEqual(DN(RDN(dc='example'), RDN(dc='net')), DN(RDN(dc='net'), RDN(dc='example'))) - def test_hash(self): - self.assertEqual(hash(DN()), hash(DN())) - self.assertEqual(hash(DN(RDN(uid='jsmith'), RDN(dc='example'), RDN(dc='net'))), hash(DN(RDN(uid='jsmith'), RDN(dc='example'), RDN(dc='net')))) - def test_repr(self): repr(DN(RDN(uid='jsmith'), RDN(dc='example'), RDN(dc='net'))) repr(DN(RDN(cn='James "Jim" Smith, III'), RDN(dc='example'), RDN(dc='net'))) @@ -92,12 +104,6 @@ class TestRDN(unittest.TestCase): self.assertEqual(RDN(RDNAssertion('ou', 'Sales'), RDNAssertion('cn', 'J. Smith'), RDNAssertion('ou', 'HR')), RDN(RDNAssertion('cn', 'J. Smith'), RDNAssertion('ou', 'HR'), RDNAssertion('ou', 'Sales'))) - def test_hash(self): - self.assertEqual(hash(RDN(RDNAssertion('uid', 'jsmith'))), hash(RDN(RDNAssertion('uid', 'Jsmith')))) - self.assertEqual(hash(RDN(RDNAssertion('uid', 'jsmith'))), hash(RDN(RDNAssertion('UID', 'jsmith')))) - self.assertEqual(hash(RDN(RDNAssertion('ou', 'Sales'), RDNAssertion('cn', 'J. Smith'), RDNAssertion('ou', 'HR'))), - hash(RDN(RDNAssertion('cn', 'J. Smith'), RDNAssertion('ou', 'HR'), RDNAssertion('ou', 'Sales')))) - def test_repr(self): repr(RDN(cn='J. Smith', ou='Sales')) repr(RDN(cn='James "Jim" Smith, III')) @@ -143,8 +149,9 @@ class TestRDNAssertion(unittest.TestCase): def test_init(self): with self.assertRaises(ValueError): RDNAssertion('invalidAttributeType', 'foobar') - with self.assertRaises(ValueError): - RDNAssertion('cn', '') + # We currently don't validate values + #with self.assertRaises(ValueError): + # RDNAssertion('cn', '') def test_equal(self): # NFD vs. NFC of string value diff --git a/tests/test_objects.py b/tests/test_objects.py index dae7801c0c7ebdd0cb13449e16e45f0200e2af3d..cf29d6e25cad5a64e19152666498f13a47d84199 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -98,7 +98,7 @@ class TestAttributeDict(unittest.TestCase): class TestObject(unittest.TestCase): def test_init(self): obj = Object(schema, 'cn=foo,dc=example,dc=com', cn=['foo', 'bar'], uid=[]) - self.assertEqual(obj.dn, DN.from_str('cn=foo,dc=example,dc=com')) + self.assertEqual(obj.dn, DN.from_str(schema, 'cn=foo,dc=example,dc=com')) self.assertEqual(obj['cn'], ['foo', 'bar']) self.assertEqual(obj['uid'], []) @@ -346,9 +346,9 @@ class TestObject(unittest.TestCase): def match_search(self, base_obj, scope, filter_obj): return False - obj = FalseObject(schema, 'cn=foo,dc=example,dc=com', cn=['foo', 'bar'], uid=[], objectclass=['top'], subschemaSubentry=[DN('cn=subschema')]) + obj = FalseObject(schema, 'cn=foo,dc=example,dc=com', cn=['foo', 'bar'], uid=[], objectclass=['top'], subschemaSubentry=[DN(schema, 'cn=subschema')]) self.assertIsNone(obj.search('cn=foo,dc=example,dc=com', ldap.SearchScope.baseObject, ldap.FilterPresent('objectclass'), [], False)) - obj = TrueObject(schema, 'cn=foo,dc=example,dc=com', cn=['foo', 'bar'], uid=[], objectclass=['top'], subschemaSubentry=[DN('cn=subschema')]) + obj = TrueObject(schema, 'cn=foo,dc=example,dc=com', cn=['foo', 'bar'], uid=[], objectclass=['top'], subschemaSubentry=[DN(schema, 'cn=subschema')]) result = obj.search('cn=foo,dc=example,dc=com', ldap.SearchScope.baseObject, ldap.FilterPresent('objectclass'), [], False) self.assertEqual(result.objectName, 'cn=foo,dc=example,dc=com') self.assertEqual(len(result.attributes), 2) @@ -387,9 +387,9 @@ class TestObject(unittest.TestCase): class TestRootDSE(unittest.TestCase): def test_init(self): obj = RootDSE(schema) - self.assertEqual(obj.dn, DN()) + self.assertEqual(obj.dn, DN(schema)) obj = RootDSE(schema, cn=['foo', 'bar']) - self.assertEqual(obj.dn, DN()) + self.assertEqual(obj.dn, DN(schema)) self.assertEqual(obj['cn'], ['foo', 'bar']) def test_match_search(self): @@ -730,7 +730,7 @@ class TestObjectTemplate(unittest.TestCase): def test_create_object(self): template = ObjectTemplate(schema, 'dc=example,dc=com', 'cn', objectclass=['top'], cn=[WILDCARD_VALUE], c=[WILDCARD_VALUE], uid=['foobar']) obj = template.create_object('foo', cn=['foo', 'bar'], c=['DE']) - self.assertEqual(obj.dn, DN('cn=foo,dc=example,dc=com')) + self.assertEqual(obj.dn, DN(schema, 'cn=foo,dc=example,dc=com')) self.assertEqual(dict(obj.items()), {'cn': ['foo', 'bar'], 'uid': ['foobar'], 'c': ['DE'], 'objectClass': ['top']}) obj = template.create_object('foo', cn=['foo', 'bar']) self.assertEqual(dict(obj.items()), {'cn': ['foo', 'bar'], 'uid': ['foobar'], 'objectClass': ['top']}) @@ -759,9 +759,9 @@ class TestSubschemaSubentry(unittest.TestCase): obj = subschema.Object('cn=foo,dc=example,dc=com', cn=['foo']) self.assertIsInstance(obj, Object) self.assertIs(obj.schema, subschema.schema) - self.assertEqual(obj.dn, DN('cn=foo,dc=example,dc=com')) + self.assertEqual(obj.dn, DN(schema, 'cn=foo,dc=example,dc=com')) self.assertEqual(obj['cn'], ['foo']) - self.assertEqual(obj['subschemaSubentry'], [DN('cn=Subschema')]) + self.assertEqual(obj['subschemaSubentry'], [DN(schema, 'cn=Subschema')]) rootdse = subschema.RootDSE(cn=['foo']) self.assertIsInstance(rootdse, RootDSE) self.assertIs(rootdse.schema, subschema.schema) @@ -769,4 +769,4 @@ class TestSubschemaSubentry(unittest.TestCase): template = subschema.ObjectTemplate('dc=example,dc=com', 'cn', objectclass=['top'], cn=[WILDCARD_VALUE]) self.assertIsInstance(template, ObjectTemplate) self.assertIs(template.schema, subschema.schema) - self.assertEqual(template['subschemaSubentry'], [DN('cn=Subschema')]) + self.assertEqual(template['subschemaSubentry'], [DN(schema, 'cn=Subschema')]) diff --git a/tests/test_schema_syntaxes.py b/tests/test_schema_syntaxes.py index 955481f7046d9bdb81d22bd7159260fbededb0ee..2995c79575f7a80bc5a1988b1ec71f63db3c4187 100644 --- a/tests/test_schema_syntaxes.py +++ b/tests/test_schema_syntaxes.py @@ -4,60 +4,62 @@ import datetime import ldapserver from ldapserver.schema import syntaxes +schema = ldapserver.schema.RFC4519_SCHEMA + class TestBytesSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.OctetString - self.assertEqual(syntax.encode(None, b'Foo'), b'Foo') + self.assertEqual(syntax.encode(schema, b'Foo'), b'Foo') def test_decode(self): syntax = syntaxes.OctetString - self.assertEqual(syntax.decode(None, b'Foo'), b'Foo') + self.assertEqual(syntax.decode(schema, b'Foo'), b'Foo') class TestStringSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.DirectoryString - self.assertEqual(syntax.encode(None, 'Foo'), b'Foo') - self.assertEqual(syntax.encode(None, 'äöü'), b'\xc3\xa4\xc3\xb6\xc3\xbc') + self.assertEqual(syntax.encode(schema, 'Foo'), b'Foo') + self.assertEqual(syntax.encode(schema, 'äöü'), b'\xc3\xa4\xc3\xb6\xc3\xbc') def test_decode(self): syntax = syntaxes.DirectoryString - self.assertEqual(syntax.decode(None, b'Foo'), 'Foo') - self.assertEqual(syntax.decode(None, b'\xc3\xa4\xc3\xb6\xc3\xbc'), 'äöü') + self.assertEqual(syntax.decode(schema, b'Foo'), 'Foo') + self.assertEqual(syntax.decode(schema, b'\xc3\xa4\xc3\xb6\xc3\xbc'), 'äöü') syntax = syntaxes.IA5String with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'\xc3\xa4\xc3\xb6\xc3\xbc') + syntax.decode(schema, b'\xc3\xa4\xc3\xb6\xc3\xbc') # Test regex matching syntax = syntaxes.BitString - self.assertEqual(syntax.decode(None, b"''B"), "''B") - self.assertEqual(syntax.decode(None, b"'0'B"), "'0'B") - self.assertEqual(syntax.decode(None, b"'010101'B"), "'010101'B") + self.assertEqual(syntax.decode(schema, b"''B"), "''B") + self.assertEqual(syntax.decode(schema, b"'0'B"), "'0'B") + self.assertEqual(syntax.decode(schema, b"'010101'B"), "'010101'B") with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"") + syntax.decode(schema, b"") with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"'0'") + syntax.decode(schema, b"'0'") with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"'0'b") + syntax.decode(schema, b"'0'b") with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"'0123'B") + syntax.decode(schema, b"'0123'B") class TestIntegerSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.INTEGER - self.assertEqual(syntax.encode(None, 0), b'0') - self.assertEqual(syntax.encode(None, 1234), b'1234') - self.assertEqual(syntax.encode(None, -1234), b'-1234') + self.assertEqual(syntax.encode(schema, 0), b'0') + self.assertEqual(syntax.encode(schema, 1234), b'1234') + self.assertEqual(syntax.encode(schema, -1234), b'-1234') def test_decode(self): syntax = syntaxes.INTEGER - self.assertEqual(syntax.decode(None, b'0'), 0) - self.assertEqual(syntax.decode(None, b'1234'), 1234) - self.assertEqual(syntax.decode(None, b'-1234'), -1234) + self.assertEqual(syntax.decode(schema, b'0'), 0) + self.assertEqual(syntax.decode(schema, b'1234'), 1234) + self.assertEqual(syntax.decode(schema, b'-1234'), -1234) with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'-0') + syntax.decode(schema, b'-0') with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'+1') + syntax.decode(schema, b'+1') with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'0123') + syntax.decode(schema, b'0123') class TestSchemaElementSyntaxDefinition(unittest.TestCase): def test_encode(self): @@ -65,107 +67,107 @@ class TestSchemaElementSyntaxDefinition(unittest.TestCase): def __str__(self): return '( SCHEMA ELEMENT )' syntax = syntaxes.SchemaElementSyntaxDefinition('1.2.3.4') - self.assertEqual(syntax.encode(None, SchemaElement()), b'( SCHEMA ELEMENT )') + self.assertEqual(syntax.encode(schema, SchemaElement()), b'( SCHEMA ELEMENT )') class TestBooleanSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.Boolean - self.assertEqual(syntax.encode(None, True), b'TRUE') - self.assertEqual(syntax.encode(None, False), b'FALSE') + self.assertEqual(syntax.encode(schema, True), b'TRUE') + self.assertEqual(syntax.encode(schema, False), b'FALSE') def test_decode(self): syntax = syntaxes.Boolean - self.assertEqual(syntax.decode(None, b'TRUE'), True) - self.assertEqual(syntax.decode(None, b'FALSE'), False) + self.assertEqual(syntax.decode(schema, b'TRUE'), True) + self.assertEqual(syntax.decode(schema, b'FALSE'), False) with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'true') + syntax.decode(schema, b'true') with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'') + syntax.decode(schema, b'') class TestDNSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.DN - self.assertEqual(syntax.encode(None, ldapserver.dn.DN(cn='foobar')), b'cn=foobar') + self.assertEqual(syntax.encode(schema, ldapserver.dn.DN(schema, cn='foobar')), b'cn=foobar') def test_decode(self): syntax = syntaxes.DN - self.assertEqual(syntax.decode(None, b'cn=foobar'), ldapserver.dn.DN(cn='foobar')) + self.assertEqual(syntax.decode(schema, b'cn=foobar'), ldapserver.dn.DN(schema, cn='foobar')) with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'cn=foobar,,,') + syntax.decode(schema, b'cn=foobar,,,') class TestNameAndOptionalUIDSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.NameAndOptionalUID - self.assertEqual(syntax.encode(None, ldapserver.dn.DN(cn='foobar')), b'cn=foobar') - self.assertEqual(syntax.encode(None, ldapserver.dn.DNWithUID(ldapserver.dn.DN(cn='foobar'), "'0101'B")), b"cn=foobar#'0101'B") + self.assertEqual(syntax.encode(schema, ldapserver.dn.DN(schema, cn='foobar')), b'cn=foobar') + self.assertEqual(syntax.encode(schema, ldapserver.dn.DNWithUID(schema, ldapserver.dn.DN(schema, cn='foobar'), "'0101'B")), b"cn=foobar#'0101'B") def test_decode(self): syntax = syntaxes.NameAndOptionalUID - self.assertEqual(syntax.decode(None, b'cn=foobar'), ldapserver.dn.DN(cn='foobar')) - self.assertEqual(syntax.decode(None, b"cn=foobar#'0101'B"), ldapserver.dn.DNWithUID(ldapserver.dn.DN(cn='foobar'), "'0101'B")) + self.assertEqual(syntax.decode(schema, b'cn=foobar'), ldapserver.dn.DN(schema, cn='foobar')) + self.assertEqual(syntax.decode(schema, b"cn=foobar#'0101'B"), ldapserver.dn.DNWithUID(schema, ldapserver.dn.DN(schema, cn='foobar'), "'0101'B")) with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'cn=foobar,,,') + syntax.decode(schema, b'cn=foobar,,,') with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"cn=foobar,,,#'0101'B") + syntax.decode(schema, b"cn=foobar,,,#'0101'B") with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b"cn=foobar#'0102'B") + syntax.decode(schema, b"cn=foobar#'0102'B") class TestGeneralizedTimeSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.GeneralizedTime - self.assertEqual(syntax.encode(None, datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)), + self.assertEqual(syntax.encode(schema, datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)), b'199412161032Z') - self.assertEqual(syntax.encode(None, datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))), + self.assertEqual(syntax.encode(schema, datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))), b'199412160532-0500') def test_decode(self): syntax = syntaxes.GeneralizedTime - self.assertEqual(syntax.decode(None, b'199412161032Z'), + self.assertEqual(syntax.decode(schema, b'199412161032Z'), datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)) - self.assertEqual(syntax.decode(None, b'199412160532-0500'), + self.assertEqual(syntax.decode(schema, b'199412160532-0500'), datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))) class TestPostalAddressSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.PostalAddress - self.assertEqual(syntax.encode(None, ['1234 Main St.', 'Anytown, CA 12345', 'USA']), + self.assertEqual(syntax.encode(schema, ['1234 Main St.', 'Anytown, CA 12345', 'USA']), b'1234 Main St.$Anytown, CA 12345$USA') - self.assertEqual(syntax.encode(None, ['$1,000,000 Sweepstakes', 'PO Box 1000000', 'Anytown, CA 12345', 'USA']), + self.assertEqual(syntax.encode(schema, ['$1,000,000 Sweepstakes', 'PO Box 1000000', 'Anytown, CA 12345', 'USA']), b'\\241,000,000 Sweepstakes$PO Box 1000000$Anytown, CA 12345$USA') def test_decode(self): syntax = syntaxes.PostalAddress - self.assertEqual(syntax.decode(None, b'1234 Main St.$Anytown, CA 12345$USA'), + self.assertEqual(syntax.decode(schema, b'1234 Main St.$Anytown, CA 12345$USA'), ['1234 Main St.', 'Anytown, CA 12345', 'USA']) - self.assertEqual(syntax.decode(None, b'\\241,000,000 Sweepstakes$PO Box 1000000$Anytown, CA 12345$USA'), + self.assertEqual(syntax.decode(schema, b'\\241,000,000 Sweepstakes$PO Box 1000000$Anytown, CA 12345$USA'), ['$1,000,000 Sweepstakes', 'PO Box 1000000', 'Anytown, CA 12345', 'USA']) class TestSubstringAssertionSyntaxDefinition(unittest.TestCase): def test_decode(self): syntax = syntaxes.SubstringAssertion - self.assertEqual(syntax.decode(None, b'*foo*'), (None, ['foo'], None)) - self.assertEqual(syntax.decode(None, b'*foo*bar*'), (None, ['foo', 'bar'], None)) - self.assertEqual(syntax.decode(None, b'a*foo*bar*b'), ('a', ['foo', 'bar'], 'b')) - self.assertEqual(syntax.decode(None, b'a*b'), ('a', [], 'b')) - self.assertEqual(syntax.decode(None, b' a\\2A*\\2Afoo*\\5Cbar*\\2Ab'), (' a*', ['*foo', '\\bar'], '*b')) + self.assertEqual(syntax.decode(schema, b'*foo*'), (None, ['foo'], None)) + self.assertEqual(syntax.decode(schema, b'*foo*bar*'), (None, ['foo', 'bar'], None)) + self.assertEqual(syntax.decode(schema, b'a*foo*bar*b'), ('a', ['foo', 'bar'], 'b')) + self.assertEqual(syntax.decode(schema, b'a*b'), ('a', [], 'b')) + self.assertEqual(syntax.decode(schema, b' a\\2A*\\2Afoo*\\5Cbar*\\2Ab'), (' a*', ['*foo', '\\bar'], '*b')) with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'') + syntax.decode(schema, b'') with self.assertRaises(ldapserver.exceptions.LDAPInvalidAttributeSyntax): - syntax.decode(None, b'foo') + syntax.decode(schema, b'foo') class TestUTCTimeSyntaxDefinition(unittest.TestCase): def test_encode(self): syntax = syntaxes.UTCTime - self.assertEqual(syntax.encode(None, datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)), + self.assertEqual(syntax.encode(schema, datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)), b'9412161032Z') - self.assertEqual(syntax.encode(None, datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))), + self.assertEqual(syntax.encode(schema, datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))), b'9412160532-0500') def test_decode(self): syntax = syntaxes.UTCTime - self.assertEqual(syntax.decode(None, b'9412161032Z'), + self.assertEqual(syntax.decode(schema, b'9412161032Z'), datetime.datetime(1994, 12, 16, 10, 32, tzinfo=datetime.timezone.utc)) - self.assertEqual(syntax.decode(None, b'9412160532-0500'), + self.assertEqual(syntax.decode(schema, b'9412160532-0500'), datetime.datetime(1994, 12, 16, 5, 32, tzinfo=datetime.timezone(datetime.timedelta(hours=-5))))