Select Git revision
dn.py 3.14 KiB
from string import hexdigits as HEXDIGITS
DN_ESCAPED = ('"', '+', ',', ';', '<', '>')
DN_SPECIAL = DN_ESCAPED + (' ', '#', '=')
def parse_assertion(expr, case_ignore_attrs=None):
case_ignore_attrs = case_ignore_attrs or []
hexdigit = None
escaped = False
tokens = []
token = b''
for c in expr:
if hexdigit is not None:
if c not in HEXDIGITS:
raise ValueError('Invalid hexpair: \\%c%c'%(hexdigit, c))
token += bytes.fromhex('%c%c'%(hexdigit, c))
hexdigit = None
elif escaped:
escaped = False
if c in DN_SPECIAL or c == '\\':
token += c.encode()
elif c in HEXDIGITS:
hexdigit = c
else:
raise ValueError('Invalid escape: \\%c'%c)
elif c == '\\':
escaped = True
elif c == '=':
tokens.append(token)
token = b''
else:
token += c.encode()
tokens.append(token)
if len(tokens) != 2:
raise ValueError('Invalid assertion in RDN: "%s"'%expr)
name = tokens[0].decode().lower()
value = tokens[1]
if not name or not value:
raise ValueError('Invalid assertion in RDN: "%s"'%expr)
# TODO: handle hex strings
if name in case_ignore_attrs:
value = value.lower()
return (name, value)
def parse_rdn(rdn, case_ignore_attrs=None):
escaped = False
assertions = []
token = ''
for c in rdn:
if escaped:
escaped = False
token += c
elif c == '+':
assertions.append(parse_assertion(token, case_ignore_attrs=case_ignore_attrs))
token = ''
else:
if c == '\\':
escaped = True
token += c
assertions.append(parse_assertion(token, case_ignore_attrs=case_ignore_attrs))
if not assertions:
raise ValueError('Invalid RDN "%s"'%rdn)
return tuple(sorted(assertions))
def parse_dn(dn, case_ignore_attrs=None):
if not dn:
return tuple()
escaped = False
rdns = []
rdn = ''
for c in dn:
if escaped:
escaped = False
rdn += c
elif c == ',':
rdns.append(parse_rdn(rdn, case_ignore_attrs=case_ignore_attrs))
rdn = ''
else:
if c == '\\':
escaped = True
rdn += c
rdns.append(parse_rdn(rdn, case_ignore_attrs=case_ignore_attrs))
return tuple(rdns)
# >>> parse_dn('OU=Sales+CN=J. Smith,DC=example,DC=net', case_ignore_attrs=['cn', 'ou', 'dc'])
# ((('cn', b'j. smith'), ('ou', b'sales')), (('dc', b'example'),), (('dc', b'net'),))
def escape_dn_value(value):
if isinstance(value, int):
value = str(value)
if isinstance(value, str):
value = value.encode()
res = ''
for c in value:
c = bytes((c,))
try:
s = c.decode()
except UnicodeDecodeError:
s = '\\'+c.hex()
if s in DN_SPECIAL:
s = '\\'+s
res += s
return res
def build_assertion(assertion):
name, value = assertion
return '%s=%s'%(escape_dn_value(name.encode()), escape_dn_value(value))
def build_rdn(assertions):
return '+'.join(map(build_assertion, assertions))
def build_dn(rdns):
return ','.join(map(build_rdn, rdns))
from enum import Enum
class DNScope(Enum):
baseObject = 0 # The scope is constrained to the entry named by baseObject.
singleLevel = 1 # The scope is constrained to the immediate subordinates of the entry named by baseObject.
wholeSubtree = 2 # The scope is constrained to the entry named by baseObject and to all its subordinates.
@classmethod
def from_bytes(cls, data):
return self(data)