Skip to content
Snippets Groups Projects
Select Git revision
  • 4bc7ffd0ba088b3a2c24b84889fe425f8d2368bb
  • master default protected
  • claims-in-idtoke
  • jwt_encode_inconsistencies
  • recovery-code-pwhash
  • incremental-sync
  • redis-rate-limits
  • typehints
  • v1.2.x
  • v1.x.x
  • v1.1.x
  • feature_invite_validuntil_minmax
  • Dockerfile
  • v1.0.x
  • roles-recursive-cte
  • v2.3.1
  • v2.3.0
  • v2.2.0
  • v2.1.0
  • v2.0.1
  • v2.0.0
  • v1.2.0
  • v1.1.2
  • v1.1.1
  • v1.0.2
  • v1.1.0
  • v1.0.1
  • v1.0.0
  • v0.3.0
  • v0.2.0
  • v0.1.5
  • v0.1.4
  • v0.1.2
33 results

test_api.py

Blame
  • Forked from uffd / uffd
    Source project has a limited visibility.
    dn.py 11.06 KiB
    '''LDAP Distinguished Name Utilities
    
    Distinguished Names (DNs) identifiy objects in an LDAP directory. In LDAP
    protocol messages and when stored in attribute values DNs are encoded with
    a string representation scheme described in RFC4514.
    
    This module provides classes to represent `DN` objects and theirs parts
    (`RD` and `RDNAssertion`) that correctly implement encoding, decoding and
    comparing.
    
    Limitations:
    
    * Hexstring attribute values (`foo=#ABCDEF...`) are not supported
    * Attribute values are only validated in from_str
    '''
    
    import typing
    import re
    
    from . import exceptions
    
    __all__ = ['DN', 'RDN', 'RDNAssertion']
    
    class DN(tuple):
    	'''Distinguished Name consiting of zero ore more `RDN` objects'''
    	schema: typing.Any
    
    	def __new__(cls, schema, *args, **kwargs):
    		if len(args) == 1 and isinstance(args[0], DN):
    			args = args[0]
    		if len(args) == 1 and isinstance(args[0], str):
    			args = cls.from_str(schema, args[0])
    		for rdn in args:
    			if not isinstance(rdn, RDN):
    				raise TypeError(f'Argument {repr(rdn)} is of type {repr(type(rdn))}, expected ldapserver.dn.RDN object')
    		rdns = tuple(args)
    		if kwargs:
    			rdns = (RDN(schema, **kwargs),) + rdns
    		dn = super().__new__(cls, rdns)
    		dn.schema = schema
    		return dn
    
    	# Syntax definiton from RFC4514:
    	# distinguishedName = [ relativeDistinguishedName *( COMMA relativeDistinguishedName ) ]
    
    	@classmethod
    	def from_str(cls, schema, expr):
    		escaped = False
    		rdns = []
    		token = ''
    		for char in expr:
    			if escaped:
    				escaped = False
    				token += char
    			elif char == ',':
    				rdns.append(RDN.from_str(schema, token))
    				token = ''
    			else:
    				if char == '\\':
    					escaped = True
    				token += char
    		if token:
    			rdns.append(RDN.from_str(schema, token))
    		return cls(schema, *rdns)
    
    	def __str__(self):
    		return ','.join(map(str, self))
    
    	def __repr__(self):
    		return '<ldapserver.dn.DN %s>'%str(self)
    
    	def __eq__(self, obj):
    		return type(self) is type(obj) and super().__eq__(obj)
    
    	def __ne__(self, obj):
    		return not self == obj
    
    	def __add__(self, value):
    		if isinstance(value, DN):
    			return DN(self.schema, *(tuple(self) + tuple(value)))
    		elif isinstance(value, RDN):
    			return self + DN(self.schema, value)
    		else:
    			raise TypeError(f'Can only add DN or RDN to DN, not {type(value)}')
    
    	def __getitem__(self, key):
    		if isinstance(key, slice):
    			return type(self)(self.schema, *super().__getitem__(key))
    		return super().__getitem__(key)
    
    	def __strip_common_suffix(self, value):
    		value = DN(self.schema, value)
    		minlen = min(len(self), len(value))
    		for i in range(minlen):
    			if self[-1 - i] != value[-1 - i]:
    				return self[:-i or None], value[:-i or None]
    		return self[:-minlen or None], value[:-minlen or None]
    
    	def is_direct_child_of(self, base):
    		rchild, rbase = self.__strip_common_suffix(DN(self.schema, base))
    		return not rbase and len(rchild) == 1
    
    	def in_subtree_of(self, base):
    		rchild, rbase = self.__strip_common_suffix(DN(self.schema, base)) # pylint: disable=unused-variable
    		return not rbase
    
    	@property
    	def object_attribute(self):
    		if len(self) == 0:
    			return None
    		return self[0].attribute # pylint: disable=no-member
    
    	@property
    	def object_attribute_type(self):
    		if len(self) == 0:
    			return None
    		return self[0].attribute_type # pylint: disable=no-member
    
    	@property
    	def object_value(self):
    		if len(self) == 0:
    			return None
    		return self[0].value # pylint: disable=no-member
    
    	@property
    	def object_value_normalized(self):
    		if len(self) == 0:
    			return None
    		return self[0].value_normalized # pylint: disable=no-member
    
    class DNWithUID(DN):
    	# pylint: disable=arguments-differ,no-member
    	def __new__(cls, schema, dn, uid=None):
    		if not uid:
    			return dn
    		if not re.fullmatch(r"'[01]*'B", uid):
    			raise ValueError('Invalid uid value')
    		obj = super().__new__(cls, schema, *dn)
    		obj.uid = uid
    		return obj
    
    	@classmethod
    	def from_str(cls, schema, expr):
    		dn_part, uid_part = (expr.rsplit('#', 1) + [''])[:2]
    		return cls(schema, DN.from_str(schema, dn_part), uid_part or None)
    
    	def __str__(self):
    		return super().__str__() + '#' + self.uid
    
    	def __repr__(self):
    		return '<ldapserver.dn.DNWithUID %s>'%str(self)
    
    	def __eq__(self, obj):
    		return type(self) is type(obj) and super().__eq__(obj) and self.uid == obj.uid
    
    	@property
    	def dn(self):
    		return DN(self.schema, *self)
    
    class RDN(tuple):
    	'''Relative Distinguished Name consisting of one or more `RDNAssertion` objects'''
    	schema: typing.Any
    
    	def __new__(cls, schema, *assertions, **kwargs):
    		for assertion in assertions:
    			if not isinstance(assertion, RDNAssertion):
    				raise TypeError(f'Argument {repr(assertion)} is of type {repr(type(assertion))}, expected ldapserver.dn.RDNAssertion')
    			if assertion.attribute_type.schema is not schema:
    				raise ValueError('RDNAssertion has different schema')
    		assertions = list(assertions)
    		for key, value in kwargs.items():
    			assertions.append(RDNAssertion(schema, key, value))
    		if not assertions:
    			raise ValueError('RDN must have at least one assertion')
    		rdn = super().__new__(cls, assertions)
    		rdn.schema = schema
    		return rdn
    
    	# Syntax definiton from RFC4514:
    	# relativeDistinguishedName = attributeTypeAndValue *( PLUS attributeTypeAndValue )
    
    	@classmethod
    	def from_str(cls, schema, expr):
    		escaped = False
    		assertions = []
    		token = ''
    		for char in expr:
    			if escaped:
    				escaped = False
    				token += char
    			elif char == '+':
    				assertions.append(RDNAssertion.from_str(schema, token))
    				token = ''
    			else:
    				if char == '\\':
    					escaped = True
    				token += char
    		if token:
    			assertions.append(RDNAssertion.from_str(schema, token))
    		return cls(schema, *assertions)
    
    	def __str__(self):
    		return '+'.join(map(str, self))
    
    	def __repr__(self):
    		return '<ldapserver.dn.RDN %s>'%str(self)
    
    	def __eq__(self, obj):
    		return type(self) is type(obj) and set(self) == set(obj)
    
    	def __ne__(self, obj):
    		return not self == obj
    
    	def __add__(self, value):
    		if isinstance(value, RDN):
    			return DN(self.schema, self, value)
    		elif isinstance(value, DN):
    			return DN(self.schema, self) + value
    		else:
    			raise TypeError(f'Can only add DN or RDN to RDN, not {type(value)}')
    
    	@property
    	def attribute(self):
    		if len(self) != 1:
    			return None
    		return self[0].attribute
    
    	@property
    	def attribute_type(self):
    		if len(self) != 1:
    			return None
    		return self[0].attribute_type
    
    	@property
    	def value(self):
    		if len(self) != 1:
    			return None
    		return self[0].value
    
    DN_ESCAPED = (
    	0x0022, # '"'
    	0x002B, # '+'
    	0x002C, # ','
    	0x003B, # ';'
    	0x003C, # '<'
    	0x003E, # '>'
    )
    DN_SPECIAL = DN_ESCAPED + (
    	0x0020, # ' '
    	0x0023, # '#'
    	0x003D, # '='
    )
    HEXDIGITS = (
    	0x0030, # '0'
    	0x0031, # '1'
    	0x0032, # '2'
    	0x0033, # '3'
    	0x0034, # '4'
    	0x0035, # '5'
    	0x0036, # '6'
    	0x0037, # '7'
    	0x0038, # '8'
    	0x0039, # '9'
    	0x0041, # 'A'
    	0x0042, # 'B'
    	0x0043, # 'C'
    	0x0044, # 'D'
    	0x0045, # 'E'
    	0x0046, # 'F'
    	0x0061, # 'a'
    	0x0062, # 'b'
    	0x0063, # 'c'
    	0x0064, # 'd'
    	0x0065, # 'e'
    	0x0066, # 'f'
    )
    
    class RDNAssertion:
    	'''A single attribute value assertion'''
    	__slots__ = ['attribute', 'attribute_type', 'value']
    	attribute: str
    	attribute_type: typing.Any
    	value: typing.Any
    	schema: typing.Any
    
    	def __init__(self, schema, attribute, value):
    		try:
    			super().__setattr__('attribute_type', schema.attribute_types[attribute])
    		except KeyError as exc:
    			raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc
    		super().__setattr__('attribute', attribute)
    		if not self.attribute_type.equality:
    			raise ValueError('Invalid RDN attribute type: Attribute type has no EQUALITY matching rule')
    		super().__setattr__('value', value)
    
    	# Syntax definiton from RFC4514 and 4512:
    	#
    	# attributeTypeAndValue = attributeType EQUALS attributeValue
    	# attributeType = descr / numericoid
    	# attributeValue = string / hexstring
    	#
    	# descr = ALPHA *( ALPHA / DIGIT / HYPHEN )
    	# numericoid = number 1*( DOT number )
    	# number = DIGIT / ( LDIGIT 1*DIGIT )
    	#
    	# ; The following characters are to be escaped when they appear
    	# ; in the value to be encoded: ESC, one of <escaped>, leading
    	# ; SHARP or SPACE, trailing SPACE, and NULL.
    	# string = [ ( leadchar / pair ) [ *( stringchar / pair ) ( trailchar / pair ) ] ]
    	#
    	# leadchar = LUTF1 / UTFMB
    	# LUTF1 = %x01-1F / %x21 / %x24-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    	#
    	# trailchar = TUTF1 / UTFMB
    	# TUTF1 = %x01-1F / %x21 / %x23-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    	#
    	# stringchar = SUTF1 / UTFMB
    	# SUTF1 = %x01-21 / %x23-2A / %x2D-3A / %x3D / %x3F-5B / %x5D-7F
    	#
    	# pair = ESC ( ESC / special / hexpair )
    	# special = escaped / SPACE / SHARP / EQUALS
    	# escaped = DQUOTE / PLUS / COMMA / SEMI / LANGLE / RANGLE
    	#
    	# hexstring = SHARP 1*hexpair
    	# hexpair = HEX HEX
    
    	@classmethod
    	def from_str(cls, schema, expr):
    		attribute, escaped_value = expr.split('=', 1)
    		if escaped_value.startswith('#'):
    			# The "#..." form is used for unknown attribute types and those without
    			# an LDAP string encoding. Supporting it would require us to somehow
    			# handle the hex-encoded BER encoding of the data. We'll stay away from
    			# this mess for now.
    			raise ValueError('Hex-encoded RDN assertion values are not supported')
    		escaped = False
    		hexdigit = None
    		encoded_value = b''
    		for char in escaped_value:
    			if hexdigit is not None:
    				encoded_value += bytes.fromhex('%s%s'%(hexdigit, char))
    				hexdigit = None
    			elif escaped:
    				if ord(char) in DN_SPECIAL + (b'\\'[0],):
    					encoded_value += char.encode('utf8')
    				elif ord(char) in HEXDIGITS:
    					hexdigit = char
    				else:
    					raise ValueError('Invalid escape: \\%s'%char)
    				escaped = False
    			elif char == '\\':
    				escaped = True
    			else:
    				encoded_value += char.encode('utf8')
    		try:
    			attribute_type = schema.attribute_types[attribute]
    		except KeyError as exc:
    			raise ValueError('Invalid RDN attribute type: Attribute type undefined in schema') from exc
    		try:
    			value = attribute_type.syntax.decode(encoded_value)
    		except exceptions.LDAPInvalidAttributeSyntax as exc:
    			raise ValueError('Invalid RDN assertion value') from exc
    		return cls(schema, attribute, value)
    
    	def __str__(self):
    		encoded_value = self.attribute_type.syntax.encode(self.value)
    		escaped_value = ''
    		for index in range(len(encoded_value)):
    			byte = encoded_value[index:index+1]
    			if not byte.isascii() or not byte.decode().isprintable():
    				escaped_value += '\\%02x'%byte[0]
    			elif byte[0] in DN_ESCAPED + (b'\\'[0],):
    				escaped_value += '\\' + byte.decode()
    			else:
    				escaped_value += byte.decode()
    		if escaped_value.startswith(' ') or escaped_value.startswith('#'):
    			escaped_value = '\\' + escaped_value
    		if escaped_value.endswith(' '):
    			escaped_value = escaped_value[:-1] + '\\' + escaped_value[-1]
    		return '%s=%s'%(self.attribute, escaped_value)
    
    	def __hash__(self):
    		return hash(self.attribute_type.oid)
    
    	def __repr__(self):
    		return '<ldapserver.dn.RDNAssertion %s>'%str(self)
    
    	def __eq__(self, obj):
    		return type(self) is type(obj) and self.attribute_type is obj.attribute_type and \
    		       self.attribute_type.equality.match_equal([self.value], obj.value)
    
    	def __ne__(self, obj):
    		return not self == obj
    
    	def __setattr__(self, *args):
    		raise TypeError('RDNAssertion object is immutable')
    
    	def __delattr__(self, *args):
    		raise TypeError('RDNAssertion object is immutable')