Skip to content
Snippets Groups Projects
Select Git revision
  • 38eeb41cb1a4ce4d9440b8a441ecab7a253ee811
  • master default protected
  • decorator-interface
  • v0.1.2 protected
  • v0.1.1 protected
  • v0.1.0 protected
  • v0.0.1.dev6 protected
  • v0.0.1.dev5 protected
  • v0.0.1.dev4 protected
  • v0.0.1.dev3 protected
  • v0.0.1.dev2 protected
  • v0.0.1.dev1 protected
  • v0.0.1.dev0 protected
13 results

server.py

Blame
  • dn.py 3.14 KiB
    from string import hexdigits as HEXDIGITS
    
    # Base set of DN punctuation characters; in this module it is only used to
    # build DN_SPECIAL below.
    DN_ESCAPED = tuple('"+,;<>')
    # Characters that escape_dn_value() prefixes with a backslash and that
    # parse_assertion() accepts after a backslash (in addition to '\\' itself).
    DN_SPECIAL = DN_ESCAPED + tuple(' #=')
    
    def parse_assertion(expr, case_ignore_attrs=None):
    	"""Parse a single ``name=value`` assertion of an RDN.

    	Backslash escapes are resolved while scanning: ``\\`` followed by a
    	character from DN_SPECIAL or another backslash yields that character,
    	and ``\\XX`` (two hex digits) yields the corresponding byte.

    	:param expr: assertion string, e.g. ``'cn=J. Smith'``
    	:param case_ignore_attrs: optional collection of lower-case attribute
    		names whose values are lower-cased in the result
    	:returns: tuple ``(name, value)`` with ``name`` a lower-cased str and
    		``value`` a bytes object
    	:raises ValueError: on an invalid or incomplete escape sequence, if
    		the expression does not contain exactly one unescaped ``=``, or if
    		name or value is empty
    	"""
    	case_ignore_attrs = case_ignore_attrs or []
    	hexdigit = None  # first digit of a "\XX" hexpair still waiting for the second
    	escaped = False  # True right after an unescaped backslash
    	tokens = []
    	token = bytearray()
    	for c in expr:
    		if hexdigit is not None:
    			if c not in HEXDIGITS:
    				raise ValueError('Invalid hexpair: \\%c%c'%(hexdigit, c))
    			token += bytes.fromhex(hexdigit + c)
    			hexdigit = None
    		elif escaped:
    			escaped = False
    			# Hex digits and the escapable special characters are disjoint
    			# sets, so the order of these two checks does not matter.
    			if c in HEXDIGITS:
    				hexdigit = c
    			elif c in DN_SPECIAL or c == '\\':
    				token += c.encode()
    			else:
    				raise ValueError('Invalid escape: \\%c'%c)
    		elif c == '\\':
    			escaped = True
    		elif c == '=':
    			tokens.append(bytes(token))
    			token = bytearray()
    		else:
    			token += c.encode()
    	# A backslash or half a hexpair at the very end would otherwise be
    	# dropped silently, accepting a malformed assertion.
    	if escaped or hexdigit is not None:
    		raise ValueError('Incomplete escape sequence in RDN assertion: "%s"'%expr)
    	tokens.append(bytes(token))
    	if len(tokens) != 2:
    		raise ValueError('Invalid assertion in RDN: "%s"'%expr)
    	name = tokens[0].decode().lower()
    	value = tokens[1]
    	if not name or not value:
    		raise ValueError('Invalid assertion in RDN: "%s"'%expr)
    	# TODO: handle hex strings
    	if name in case_ignore_attrs:
    		value = value.lower()
    	return (name, value)
    
    def parse_rdn(rdn, case_ignore_attrs=None):
    	"""Split an RDN string at unescaped ``+`` and parse every part.

    	A backslash and the character following it are copied verbatim into
    	the current part, so an escaped ``+`` does not split; resolving the
    	escapes is left to parse_assertion().

    	:param rdn: RDN string, e.g. ``'OU=Sales+CN=J. Smith'``
    	:param case_ignore_attrs: passed through to parse_assertion()
    	:returns: sorted tuple of the ``(name, value)`` tuples returned by
    		parse_assertion()
    	:raises ValueError: propagated from parse_assertion() for invalid parts
    	"""
    	parsed = []
    	pending = []
    	chars = iter(rdn)
    	for ch in chars:
    		if ch == '+':
    			parsed.append(parse_assertion(''.join(pending), case_ignore_attrs=case_ignore_attrs))
    			pending = []
    			continue
    		pending.append(ch)
    		if ch == '\\':
    			# Take the escaped character along unchanged (nothing if the
    			# backslash is the last character).
    			pending.append(next(chars, ''))
    	parsed.append(parse_assertion(''.join(pending), case_ignore_attrs=case_ignore_attrs))
    	# Defensive check; the append above always adds one entry.
    	if not parsed:
    		raise ValueError('Invalid RDN "%s"'%rdn)
    	return tuple(sorted(parsed))
    
    def parse_dn(dn, case_ignore_attrs=None):
    	"""Split a DN string at unescaped ``,`` and parse every RDN.

    	A backslash and the character following it are copied verbatim into
    	the current RDN, so an escaped ``,`` does not split.

    	:param dn: DN string, e.g. ``'CN=J. Smith,DC=example,DC=net'``
    	:param case_ignore_attrs: passed through to parse_rdn()
    	:returns: tuple with one parse_rdn() result per RDN, in input order;
    		the empty tuple for a falsy ``dn``
    	:raises ValueError: propagated from parse_rdn() for invalid parts
    	"""
    	if not dn:
    		return ()
    	parsed = []
    	pending = []
    	chars = iter(dn)
    	for ch in chars:
    		if ch == ',':
    			parsed.append(parse_rdn(''.join(pending), case_ignore_attrs=case_ignore_attrs))
    			pending = []
    			continue
    		pending.append(ch)
    		if ch == '\\':
    			# Take the escaped character along unchanged (nothing if the
    			# backslash is the last character).
    			pending.append(next(chars, ''))
    	parsed.append(parse_rdn(''.join(pending), case_ignore_attrs=case_ignore_attrs))
    	return tuple(parsed)
    
    # >>> parse_dn('OU=Sales+CN=J.  Smith,DC=example,DC=net', case_ignore_attrs=['cn', 'ou', 'dc'])
    # ((('cn', b'j.  smith'), ('ou', b'sales')), (('dc', b'example'),), (('dc', b'net'),))
    
    def escape_dn_value(value):
    	"""Escape an attribute name or value for use inside a DN string.

    	Integers are converted with str() and strings are UTF-8 encoded first.
    	Each byte is then emitted as follows:

    	* NUL and bytes >= 0x80 become a ``\\XX`` hexpair (lower-case hex),
    	* a backslash or a character from DN_SPECIAL is prefixed with a
    	  backslash,
    	* everything else is copied unchanged.

    	Escaping the backslash itself is required so that the result parses
    	back to the original value with parse_assertion().

    	:param value: int, str or bytes-like value
    	:returns: escaped str
    	"""
    	if isinstance(value, int):
    		value = str(value)
    	if isinstance(value, str):
    		value = value.encode()
    	parts = []
    	for byte in value:
    		if byte == 0 or byte >= 0x80:
    			# Not representable as a plain character; use the hexpair form.
    			parts.append('\\%02x'%byte)
    			continue
    		char = chr(byte)
    		if char == '\\' or char in DN_SPECIAL:
    			parts.append('\\' + char)
    		else:
    			parts.append(char)
    	return ''.join(parts)
    
    def build_assertion(assertion):
    	"""Render a ``(name, value)`` pair as an escaped ``name=value`` string.

    	:param assertion: two-item sequence; ``name`` is a str and ``value``
    		is anything accepted by escape_dn_value()
    	:returns: str with both sides escaped by escape_dn_value()
    	"""
    	attr_name, attr_value = assertion
    	escaped_name = escape_dn_value(attr_name.encode())
    	escaped_value = escape_dn_value(attr_value)
    	return escaped_name + '=' + escaped_value
    
    def build_rdn(assertions):
    	"""Join the string form of each assertion with ``+``.

    	:param assertions: iterable of ``(name, value)`` pairs
    	:returns: RDN string; empty if ``assertions`` is empty
    	"""
    	rendered = (build_assertion(item) for item in assertions)
    	return '+'.join(rendered)
    
    def build_dn(rdns):
    	"""Join the string form of each RDN with ``,``.

    	:param rdns: iterable of RDNs, each an iterable of ``(name, value)``
    		pairs
    	:returns: DN string; empty if ``rdns`` is empty
    	"""
    	rendered = (build_rdn(item) for item in rdns)
    	return ','.join(rendered)
    
    from enum import Enum
    
    class DNScope(Enum):
    	"""Scope of an operation relative to a base object entry."""
    	baseObject = 0 # The scope is constrained to the entry named by baseObject.
    	singleLevel = 1 # The scope is constrained to the immediate subordinates of the entry named by baseObject.
    	wholeSubtree = 2 # The scope is constrained to the entry named by baseObject and to all its subordinates.

    	@classmethod
    	def from_bytes(cls, data):
    		"""Return the member whose value equals ``data``.

    		:param data: member value to look up (0, 1 or 2)
    		:returns: the matching DNScope member
    		:raises ValueError: if no member has the value ``data``
    		"""
    		# NOTE(review): despite the name, no decoding of bytes happens here;
    		# callers are presumably expected to pass the integer value - confirm.
    		return cls(data)