Select Git revision
test_api.py
Forked from
uffd / uffd
Source project has a limited visibility.
dn.py 11.06 KiB
'''LDAP Distinguished Name Utilities
Distinguished Names (DNs) identify objects in an LDAP directory. In LDAP
protocol messages and when stored in attribute values DNs are encoded with
a string representation scheme described in RFC4514.
This module provides classes to represent `DN` objects and their parts
(`RDN` and `RDNAssertion`) that correctly implement encoding, decoding and
comparing.
Limitations:
* Hexstring attribute values (`foo=#ABCDEF...`) are not supported
* Attribute values are only validated in from_str
'''
import typing
import re
from . import exceptions
__all__ = ['DN', 'RDN', 'RDNAssertion']
class DN(tuple):
    '''Distinguished Name consisting of zero or more `RDN` objects

    A `DN` is an immutable tuple of `RDN` objects that additionally stores the
    schema it was created with in its `schema` attribute. Two DNs are equal if
    they have exactly the same type and their RDNs are pairwise equal. DNs are
    hashable, so they can be used as dictionary keys and set members.
    '''
    schema: typing.Any

    def __new__(cls, schema, *args, **kwargs):
        '''Create a DN from RDN objects, another DN or a string

        Accepted positional arguments after `schema`:

        * any number of `RDN` objects
        * a single `DN` object (its RDNs are copied)
        * a single string (parsed with `from_str`)

        If keyword arguments are given, they are passed to `RDN` and the
        resulting RDN is prepended to the RDNs from the positional arguments.

        Raises `TypeError` if any resulting element is not an `RDN`.
        '''
        if len(args) == 1 and isinstance(args[0], DN):
            args = args[0]
        if len(args) == 1 and isinstance(args[0], str):
            args = cls.from_str(schema, args[0])
        for rdn in args:
            if not isinstance(rdn, RDN):
                raise TypeError(f'Argument {repr(rdn)} is of type {repr(type(rdn))}, expected ldapserver.dn.RDN object')
        rdns = tuple(args)
        if kwargs:
            rdns = (RDN(schema, **kwargs),) + rdns
        dn = super().__new__(cls, rdns)
        dn.schema = schema
        return dn

    # Syntax definition from RFC4514:
    # distinguishedName = [ relativeDistinguishedName *( COMMA relativeDistinguishedName ) ]
    @classmethod
    def from_str(cls, schema, expr):
        '''Parse the string representation `expr` and return a `cls` object

        The string is split at every comma that is not escaped with a
        backslash. Each part is parsed with `RDN.from_str`, escape sequences
        are passed on unmodified. An empty string results in an empty DN.
        '''
        rdns = []
        token = []
        escaped = False
        for char in expr:
            if escaped:
                # Character after a backslash never acts as a separator
                escaped = False
                token.append(char)
            elif char == ',':
                rdns.append(RDN.from_str(schema, ''.join(token)))
                token = []
            else:
                escaped = char == '\\'
                token.append(char)
        # An empty remainder (empty input or trailing comma) is ignored
        if token:
            rdns.append(RDN.from_str(schema, ''.join(token)))
        return cls(schema, *rdns)

    def __str__(self):
        return ','.join(map(str, self))

    def __repr__(self):
        return '<ldapserver.dn.DN %s>'%str(self)

    def __eq__(self, obj):
        return type(self) is type(obj) and super().__eq__(obj)

    def __ne__(self, obj):
        return not self == obj

    def __hash__(self):
        # Defining __eq__ disables the inherited tuple hash, so it has to be
        # provided explicitly. RDNs compare equal regardless of the order of
        # their assertions, so each RDN is hashed as a frozenset of its
        # assertions to keep the hash consistent with __eq__.
        return hash(tuple(frozenset(rdn) for rdn in self))

    def __add__(self, value):
        '''Return a new `DN` with the RDNs of `value` (DN or RDN) appended'''
        if isinstance(value, DN):
            return DN(self.schema, *(tuple(self) + tuple(value)))
        if isinstance(value, RDN):
            return self + DN(self.schema, value)
        raise TypeError(f'Can only add DN or RDN to DN, not {type(value)}')

    def __getitem__(self, key):
        # Slices are wrapped in a new object of the same type, single items
        # are returned as they are stored
        if isinstance(key, slice):
            return type(self)(self.schema, *super().__getitem__(key))
        return super().__getitem__(key)

    def __strip_common_suffix(self, value):
        '''Return `self` and `value` with their common trailing RDNs removed'''
        value = DN(self.schema, value)
        minlen = min(len(self), len(value))
        for i in range(minlen):
            if self[-1 - i] != value[-1 - i]:
                # "-0 or None" evaluates to None, so nothing is cut off if
                # already the last RDNs differ
                return self[:-i or None], value[:-i or None]
        return self[:-minlen or None], value[:-minlen or None]

    def is_direct_child_of(self, base):
        '''Return whether `self` consists of exactly one RDN followed by `base`'''
        rchild, rbase = self.__strip_common_suffix(DN(self.schema, base))
        return not rbase and len(rchild) == 1

    def in_subtree_of(self, base):
        '''Return whether `self` ends with `base` (this includes `self == base`)'''
        rchild, rbase = self.__strip_common_suffix(DN(self.schema, base)) # pylint: disable=unused-variable
        return not rbase

    @property
    def object_attribute(self):
        '''`attribute` of the first RDN or None for an empty DN'''
        if not self:
            return None
        return self[0].attribute # pylint: disable=no-member

    @property
    def object_attribute_type(self):
        '''`attribute_type` of the first RDN or None for an empty DN'''
        if not self:
            return None
        return self[0].attribute_type # pylint: disable=no-member

    @property
    def object_value(self):
        '''`value` of the first RDN or None for an empty DN'''
        if not self:
            return None
        return self[0].value # pylint: disable=no-member

    @property
    def object_value_normalized(self):
        '''`value_normalized` of the first RDN or None for an empty DN'''
        # NOTE(review): the RDN class in this module defines no
        # `value_normalized` attribute, so this raises AttributeError for
        # non-empty DNs -- confirm the intended semantics before relying on it.
        if not self:
            return None
        return self[0].value_normalized # pylint: disable=no-member
class DNWithUID(DN):
    '''Distinguished Name with an additional bit string `uid`

    The string representation is the representation of the DN followed by
    `#` and the bit string (e.g. `cn=foo,dc=example#'0101'B`).
    '''
    # pylint: disable=arguments-differ,no-member

    def __new__(cls, schema, dn, uid=None):
        '''Combine `dn` with the bit string `uid`

        If `uid` is empty or None, `dn` itself is returned unchanged instead
        of a `DNWithUID` object.

        Raises `ValueError` if `uid` is not of the form `'<0s and 1s>'B`.
        '''
        if not uid:
            return dn
        if not re.fullmatch(r"'[01]*'B", uid):
            raise ValueError('Invalid uid value')
        obj = super().__new__(cls, schema, *dn)
        obj.uid = uid
        return obj

    @classmethod
    def from_str(cls, schema, expr):
        '''Parse a DN string with optional `#'...'B` suffix

        Returns a `DNWithUID` object if `expr` ends with an unescaped `#`
        followed by a bit string, otherwise the whole string is parsed as a
        plain `DN`.
        '''
        # "#" may also occur inside the DN itself (RFC4517, Name And Optional
        # UID), so only a "#" that is followed by a complete bit string up to
        # the end of the string is treated as the separator.
        match = re.fullmatch(r"(.*)#('[01]*'B)", expr, re.DOTALL)
        if match:
            dn_part, uid_part = match.groups()
            # An odd number of backslashes in front of the "#" means that it
            # is escaped and therefore part of the last attribute value
            backslashes = len(dn_part) - len(dn_part.rstrip('\\'))
            if backslashes % 2 == 0:
                return cls(schema, DN.from_str(schema, dn_part), uid_part)
        return DN.from_str(schema, expr)

    def __str__(self):
        return super().__str__() + '#' + self.uid

    def __repr__(self):
        return '<ldapserver.dn.DNWithUID %s>'%str(self)

    def __eq__(self, obj):
        return type(self) is type(obj) and super().__eq__(obj) and self.uid == obj.uid

    def __hash__(self):
        # Defining __eq__ disables hashing unless __hash__ is defined, too.
        # RDNs compare equal regardless of the order of their assertions, so
        # they are hashed as frozensets of their assertions.
        return hash((tuple(frozenset(rdn) for rdn in self), self.uid))

    def __getitem__(self, key):
        # The inherited implementation builds slices with
        # type(self)(schema, *rdns), which does not fit the (schema, dn, uid)
        # signature of this class. A part of the DN has no meaningful uid, so
        # slices are returned as plain DN objects.
        if isinstance(key, slice):
            return DN(self.schema, *tuple.__getitem__(self, key))
        return super().__getitem__(key)

    @property
    def dn(self):
        '''The distinguished name as a plain `DN` object (without uid)'''
        return DN(self.schema, *self)
class RDN(tuple):
    '''Relative Distinguished Name consisting of one or more `RDNAssertion` objects

    An `RDN` is an immutable tuple of `RDNAssertion` objects that additionally
    stores the schema it was created with in its `schema` attribute. The order
    of the assertions is irrelevant for comparison: Two RDNs are equal if
    they have the same type and the same set of assertions. RDNs are
    hashable.
    '''
    schema: typing.Any

    def __new__(cls, schema, *assertions, **kwargs):
        '''Create an RDN from `RDNAssertion` objects and/or keyword arguments

        Every keyword argument `attribute=value` is converted to an
        additional `RDNAssertion(schema, attribute, value)`.

        Raises `TypeError` if a positional argument is not an `RDNAssertion`
        and `ValueError` if an assertion belongs to a different schema or
        if there are no assertions at all.
        '''
        for assertion in assertions:
            if not isinstance(assertion, RDNAssertion):
                raise TypeError(f'Argument {repr(assertion)} is of type {repr(type(assertion))}, expected ldapserver.dn.RDNAssertion')
            if assertion.attribute_type.schema is not schema:
                raise ValueError('RDNAssertion has different schema')
        assertions = list(assertions)
        assertions.extend(RDNAssertion(schema, key, value) for key, value in kwargs.items())
        if not assertions:
            raise ValueError('RDN must have at least one assertion')
        rdn = super().__new__(cls, assertions)
        rdn.schema = schema
        return rdn

    # Syntax definition from RFC4514:
    # relativeDistinguishedName = attributeTypeAndValue *( PLUS attributeTypeAndValue )
    @classmethod
    def from_str(cls, schema, expr):
        '''Parse the string representation `expr` and return a `cls` object

        The string is split at every plus sign that is not escaped with a
        backslash. Each part is parsed with `RDNAssertion.from_str`, escape
        sequences are passed on unmodified.
        '''
        assertions = []
        token = []
        escaped = False
        for char in expr:
            if escaped:
                # Character after a backslash never acts as a separator
                escaped = False
                token.append(char)
            elif char == '+':
                assertions.append(RDNAssertion.from_str(schema, ''.join(token)))
                token = []
            else:
                escaped = char == '\\'
                token.append(char)
        # An empty remainder (empty input or trailing plus sign) is ignored
        if token:
            assertions.append(RDNAssertion.from_str(schema, ''.join(token)))
        return cls(schema, *assertions)

    def __str__(self):
        return '+'.join(map(str, self))

    def __repr__(self):
        return '<ldapserver.dn.RDN %s>'%str(self)

    def __eq__(self, obj):
        return type(self) is type(obj) and set(self) == set(obj)

    def __ne__(self, obj):
        return not self == obj

    def __hash__(self):
        # Defining __eq__ disables the inherited tuple hash, so it has to be
        # provided explicitly. Equality ignores the order of the assertions,
        # so the hash must be order-independent as well.
        return hash(frozenset(self))

    def __add__(self, value):
        '''Return a `DN` that consists of this RDN followed by `value` (RDN or DN)'''
        if isinstance(value, RDN):
            return DN(self.schema, self, value)
        if isinstance(value, DN):
            return DN(self.schema, self) + value
        raise TypeError(f'Can only add DN or RDN to RDN, not {type(value)}')

    @property
    def attribute(self):
        '''`attribute` of the assertion or None for multi-valued RDNs'''
        if len(self) != 1:
            return None
        return self[0].attribute

    @property
    def attribute_type(self):
        '''`attribute_type` of the assertion or None for multi-valued RDNs'''
        if len(self) != 1:
            return None
        return self[0].attribute_type

    @property
    def value(self):
        '''`value` of the assertion or None for multi-valued RDNs'''
        if len(self) != 1:
            return None
        return self[0].value
# Code points of the characters that must always be escaped inside an
# attribute value (<escaped> in RFC4514): '"', '+', ',', ';', '<' and '>'
DN_ESCAPED = tuple(b'"+,;<>')

# Code points of all characters that may directly follow a backslash
# (<special> in RFC4514): <escaped> plus ' ', '#' and '='
DN_SPECIAL = DN_ESCAPED + tuple(b' #=')

# Code points of the characters allowed in a hexpair: '0'-'9', 'A'-'F', 'a'-'f'
HEXDIGITS = tuple(b'0123456789ABCDEFabcdef')

class RDNAssertion:
    '''A single attribute value assertion

    Objects are immutable. `attribute` is the attribute name as passed to
    the constructor, `attribute_type` the corresponding entry of
    `schema.attribute_types` and `value` the asserted value.
    '''
    __slots__ = ['attribute', 'attribute_type', 'value']
    attribute: str
    attribute_type: typing.Any
    value: typing.Any
    schema: typing.Any

    def __init__(self, schema, attribute, value):
        '''Create an assertion for `attribute` with `value`

        Raises `ValueError` if `attribute` is not defined in
        `schema.attribute_types` or if the attribute type has no equality
        matching rule.
        '''
        # __setattr__ of this class always raises, so attributes have to be
        # set with the implementation of the base class
        try:
            super().__setattr__('attribute_type', schema.attribute_types[attribute])
        except KeyError as exc:
            raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc
        super().__setattr__('attribute', attribute)
        if not self.attribute_type.equality:
            raise ValueError('Invalid RDN attribute type: Attribute type has no EQUALITY matching rule')
        super().__setattr__('value', value)

    # Syntax definition from RFC4514 and 4512:
    #
    # attributeTypeAndValue = attributeType EQUALS attributeValue
    # attributeType = descr / numericoid
    # attributeValue = string / hexstring
    #
    # descr = ALPHA *( ALPHA / DIGIT / HYPHEN )
    # numericoid = number 1*( DOT number )
    # number = DIGIT / ( LDIGIT 1*DIGIT )
    #
    # ; The following characters are to be escaped when they appear
    # ; in the value to be encoded: ESC, one of <escaped>, leading
    # ; SHARP or SPACE, trailing SPACE, and NULL.
    # string = [ ( leadchar / pair ) [ *( stringchar / pair ) ( trailchar / pair ) ] ]
    #
    # leadchar = LUTF1 / UTFMB
    # LUTF1 = %x01-1F / %x21 / %x24-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    #
    # trailchar = TUTF1 / UTFMB
    # TUTF1 = %x01-1F / %x21 / %x23-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    #
    # stringchar = SUTF1 / UTFMB
    # SUTF1 = %x01-21 / %x23-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    #
    # pair = ESC ( ESC / special / hexpair )
    # special = escaped / SPACE / SHARP / EQUALS
    # escaped = DQUOTE / PLUS / COMMA / SEMI / LANGLE / RANGLE
    #
    # hexstring = SHARP 1*hexpair
    # hexpair = HEX HEX
    @classmethod
    def from_str(cls, schema, expr):
        '''Parse the string representation `expr` (`attribute=value`)

        Escape sequences in the value are resolved, the resulting bytes are
        decoded with the syntax of the attribute type.

        Raises `ValueError` if `expr` contains no `=`, if the value uses the
        unsupported hexstring form (`#...`), if it contains an invalid or
        incomplete escape sequence, if the attribute is not defined in the
        schema or if the value is rejected by the syntax of the attribute
        type.
        '''
        attribute, separator, escaped_value = expr.partition('=')
        if not separator:
            raise ValueError('Invalid RDN assertion: Missing "=" between attribute type and value')
        if escaped_value.startswith('#'):
            # The "#..." form is used for unknown attribute types and those without
            # an LDAP string encoding. Supporting it would require us to somehow
            # handle the hex-encoded BER encoding of the data. We'll stay away from
            # this mess for now.
            raise ValueError('Hex-encoded RDN assertion values are not supported')
        escapable = frozenset(DN_SPECIAL) | {ord('\\')}
        encoded_value = bytearray()
        escaped = False # Previous character was an unescaped backslash
        hexdigit = None # First digit of a hexpair that is not yet complete
        for char in escaped_value:
            if hexdigit is not None:
                if ord(char) not in HEXDIGITS:
                    raise ValueError('Invalid escape: \\%s%s'%(hexdigit, char))
                encoded_value += bytes.fromhex(hexdigit + char)
                hexdigit = None
            elif escaped:
                if ord(char) in escapable:
                    encoded_value += char.encode('utf8')
                elif ord(char) in HEXDIGITS:
                    hexdigit = char
                else:
                    raise ValueError('Invalid escape: \\%s'%char)
                escaped = False
            elif char == '\\':
                escaped = True
            else:
                encoded_value += char.encode('utf8')
        # A backslash or the first half of a hexpair at the very end is
        # malformed and must not be dropped silently
        if escaped or hexdigit is not None:
            raise ValueError('Invalid escape: Incomplete escape sequence at end of value')
        try:
            attribute_type = schema.attribute_types[attribute]
        except KeyError as exc:
            raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc
        try:
            value = attribute_type.syntax.decode(bytes(encoded_value))
        except exceptions.LDAPInvalidAttributeSyntax as exc:
            raise ValueError('Invalid RDN assertion value') from exc
        return cls(schema, attribute, value)

    def __str__(self):
        '''Return the escaped string representation (`attribute=value`)

        The value is encoded with the syntax of the attribute type. Bytes
        that are not printable ASCII are written as `\\xx` hexpairs.
        '''
        encoded_value = self.attribute_type.syntax.encode(self.value)
        always_escaped = frozenset(DN_ESCAPED) | {ord('\\')}
        parts = []
        for code in encoded_value:
            if code >= 0x80 or not chr(code).isprintable():
                parts.append('\\%02x'%code)
            elif code in always_escaped:
                parts.append('\\' + chr(code))
            else:
                parts.append(chr(code))
        escaped_value = ''.join(parts)
        # The trailing space is handled before the leading character: For a
        # value that consists of a single space the other order would put a
        # second backslash in front of the one that was just added, which
        # decodes as backslash and space.
        if escaped_value.endswith(' '):
            escaped_value = escaped_value[:-1] + '\\ '
        if escaped_value.startswith((' ', '#')):
            escaped_value = '\\' + escaped_value
        return '%s=%s'%(self.attribute, escaped_value)

    def __hash__(self):
        # Equal assertions always share the same attribute type, so hashing
        # only its OID is consistent with __eq__
        return hash(self.attribute_type.oid)

    def __repr__(self):
        return '<ldapserver.dn.RDNAssertion %s>'%str(self)

    def __eq__(self, obj):
        return type(self) is type(obj) and self.attribute_type is obj.attribute_type and \
               self.attribute_type.equality.match_equal([self.value], obj.value)

    def __ne__(self, obj):
        return not self == obj

    def __setattr__(self, *args):
        raise TypeError('RDNAssertion object is immutable')

    def __delattr__(self, *args):
        raise TypeError('RDNAssertion object is immutable')