From f9f1eb802a49e0c9584fb374e55645672aaebb29 Mon Sep 17 00:00:00 2001 From: Julian Rother <julian@jrother.eu> Date: Wed, 28 Jul 2021 20:41:53 +0200 Subject: [PATCH] Refactored types and naming in asn1/ldap code --- ldapserver/asn1.py | 110 ++++++++++++------------ ldapserver/ldap.py | 204 +++++++++++++++++++++++++-------------------- 2 files changed, 171 insertions(+), 143 deletions(-) diff --git a/ldapserver/asn1.py b/ldapserver/asn1.py index c8b1e91..57ae0e7 100644 --- a/ldapserver/asn1.py +++ b/ldapserver/asn1.py @@ -1,3 +1,6 @@ +import typing +import enum +from abc import ABC, abstractmethod from collections import namedtuple BERObject = namedtuple('BERObject', ['tag', 'content']) @@ -62,12 +65,14 @@ def encode_ber_integer(value): raise NotImplementedError('Encoding of integers greater than 255 is not implemented') return bytes([value]) -class BERType: +class BERType(ABC): @classmethod + @abstractmethod def from_ber(cls, data): raise NotImplementedError() @classmethod + @abstractmethod def to_ber(cls, obj): raise NotImplementedError() @@ -75,28 +80,28 @@ class BERType: return type(self).to_ber(self) class OctetString(BERType): - ber_tag = (0, False, 4) + BER_TAG = (0, False, 4) @classmethod def from_ber(cls, data): obj, rest = decode_ber(data) - if obj.tag != cls.ber_tag: - raise ValueError('Expected tag %s but found %s'%(cls.ber_tag, obj.tag)) + if obj.tag != cls.BER_TAG: + raise ValueError('Expected tag %s but found %s'%(cls.BER_TAG, obj.tag)) return obj.content, rest @classmethod def to_ber(cls, obj): if not isinstance(obj, bytes): raise TypeError() - return encode_ber(BERObject(cls.ber_tag, obj)) + return encode_ber(BERObject(cls.BER_TAG, obj)) class Integer(BERType): - ber_tag = (0, False, 2) + BER_TAG = (0, False, 2) @classmethod def from_ber(cls, data): obj, rest = decode_ber(data) - if obj.tag != cls.ber_tag: + if obj.tag != cls.BER_TAG: raise ValueError() return decode_ber_integer(obj.content), rest @@ -104,15 +109,15 @@ class Integer(BERType): def to_ber(cls, obj): if not isinstance(obj, int): raise TypeError() - return encode_ber(BERObject(cls.ber_tag, encode_ber_integer(obj))) + return encode_ber(BERObject(cls.BER_TAG, encode_ber_integer(obj))) class Boolean(BERType): - ber_tag = (0, False, 1) + BER_TAG = (0, False, 1) @classmethod def from_ber(cls, data): obj, rest = decode_ber(data) - if obj.tag != cls.ber_tag: + if obj.tag != cls.BER_TAG: raise ValueError() return bool(decode_ber_integer(obj.content)), rest @@ -121,21 +126,21 @@ class Boolean(BERType): if not isinstance(obj, bool): raise TypeError() content = b'\xff' if obj else b'\x00' - return encode_ber(BERObject(cls.ber_tag, content)) + return encode_ber(BERObject(cls.BER_TAG, content)) class Set(BERType): - ber_tag = (0, True, 17) - set_type = OctetString + BER_TAG = (0, True, 17) + SET_TYPE: BERType @classmethod def from_ber(cls, data): setobj, rest = decode_ber(data) - if setobj.tag != cls.ber_tag: + if setobj.tag != cls.BER_TAG: raise ValueError() objs = [] data = setobj.content while data: - obj, data = cls.set_type.from_ber(data) + obj, data = cls.SET_TYPE.from_ber(data) objs.append(obj) return list(objs), rest @@ -143,20 +148,19 @@ class Set(BERType): def to_ber(cls, obj): content = b'' for item in obj: - content += cls.set_type.to_ber(item) - return encode_ber(BERObject(cls.ber_tag, content)) + content += cls.SET_TYPE.to_ber(item) + return encode_ber(BERObject(cls.BER_TAG, content)) class SequenceOf(Set): - ber_tag = (0, True, 16) + BER_TAG = (0, True, 16) class Sequence(BERType): - ber_tag = (0, True, 16) - sequence_fields = [ - #(Type, attr_name, default_value, optional?), - ] + BER_TAG = (0, True, 16) + # (Type, attr_name, default_value, optional?) + SEQUENCE_FIELDS: typing.ClassVar[typing.List[typing.Tuple[BERType, str, typing.Any, bool]]] = [] def __init__(self, *args, **kwargs): - for index, spec in enumerate(type(self).sequence_fields): + for index, spec in enumerate(type(self).SEQUENCE_FIELDS): # pylint: disable=consider-using-get,unused-variable field_type, name, default, optional = spec if index < len(args): @@ -170,19 +174,19 @@ class Sequence(BERType): def __repr__(self): args = [] # pylint: disable=unused-variable - for field_type, name, default, optional in type(self).sequence_fields: + for field_type, name, default, optional in type(self).SEQUENCE_FIELDS: args.append('%s=%s'%(name, repr(getattr(self, name)))) return '<%s(%s)>'%(type(self).__name__, ', '.join(args)) @classmethod def from_ber(cls, data): seqobj, rest = decode_ber(data) - if seqobj.tag != cls.ber_tag: + if seqobj.tag != cls.BER_TAG: raise ValueError() args = [] data = seqobj.content # pylint: disable=unused-variable - for field_type, name, default, optional in cls.sequence_fields: + for field_type, name, default, optional in cls.SEQUENCE_FIELDS: try: obj, data = field_type.from_ber(data) args.append(obj) @@ -198,19 +202,19 @@ class Sequence(BERType): raise TypeError() content = b'' # pylint: disable=unused-variable - for field_type, name, default, optional in cls.sequence_fields: + for field_type, name, default, optional in cls.SEQUENCE_FIELDS: if not optional or getattr(obj, name) is not None: content += field_type.to_ber(getattr(obj, name)) - return encode_ber(BERObject(cls.ber_tag, content)) + return encode_ber(BERObject(cls.BER_TAG, content)) class Choice(BERType): - ber_tag = None + BER_TAG: typing.ClassVar[typing.Tuple[int, bool, int]] @classmethod def from_ber(cls, data): obj, rest = decode_ber(data) for subcls in cls.__subclasses__(): - if subcls.ber_tag == obj.tag: + if subcls.BER_TAG == obj.tag: return subcls.from_ber(data) return None, rest @@ -222,68 +226,68 @@ class Choice(BERType): raise TypeError() class Wrapper(BERType): - ber_tag = None - wrapped_attribute = None - wrapped_type = None - wrapped_default = None - wrapped_clsattrs = {} + BER_TAG: typing.ClassVar[typing.Tuple[int, bool, int]] + WRAPPED_ATTRIBUTE: typing.ClassVar[str] + WRAPPED_TYPE: typing.ClassVar[BERType] + WRAPPED_DEFAULT: typing.ClassVar[typing.Any] + WRAPPED_CLSATTRS: typing.ClassVar[typing.Dict[str, typing.Any]] = {} def __init__(self, *args, **kwargs): cls = type(self) - attribute = cls.wrapped_attribute + attribute = cls.WRAPPED_ATTRIBUTE if args: setattr(self, attribute, args[0]) elif kwargs: setattr(self, attribute, kwargs[attribute]) else: - setattr(self, attribute, cls.wrapped_default() if callable(cls.wrapped_default) else cls.wrapped_default) + setattr(self, attribute, cls.WRAPPED_DEFAULT() if callable(cls.WRAPPED_DEFAULT) else cls.WRAPPED_DEFAULT) def __repr__(self): - return '<%s(%s)>'%(type(self).__name__, repr(getattr(self, type(self).wrapped_attribute))) + return '<%s(%s)>'%(type(self).__name__, repr(getattr(self, type(self).WRAPPED_ATTRIBUTE))) @classmethod def from_ber(cls, data): - class WrappedType(cls.wrapped_type): - ber_tag = cls.ber_tag - for key, value in cls.wrapped_clsattrs.items(): + class WrappedType(cls.WRAPPED_TYPE): + BER_TAG = cls.BER_TAG + for key, value in cls.WRAPPED_CLSATTRS.items(): setattr(WrappedType, key, value) value, rest = WrappedType.from_ber(data) return cls(value), rest @classmethod def to_ber(cls, obj): - class WrappedType(cls.wrapped_type): - ber_tag = cls.ber_tag - for key, value in cls.wrapped_clsattrs.items(): + class WrappedType(cls.WRAPPED_TYPE): + BER_TAG = cls.BER_TAG + for key, value in cls.WRAPPED_CLSATTRS.items(): setattr(WrappedType, key, value) if not isinstance(obj, cls): raise TypeError() - return WrappedType.to_ber(getattr(obj, cls.wrapped_attribute)) + return WrappedType.to_ber(getattr(obj, cls.WRAPPED_ATTRIBUTE)) def retag(cls, tag): class Overwritten(cls): - ber_tag = tag + BER_TAG = tag return Overwritten class Enum(BERType): - ber_tag = (0, False, 10) - enum_type = None + BER_TAG = (0, False, 10) + ENUM_TYPE = typing.ClassVar[enum.Enum] @classmethod def from_ber(cls, data): obj, rest = decode_ber(data) - if obj.tag != cls.ber_tag: + if obj.tag != cls.BER_TAG: raise ValueError() value = decode_ber_integer(obj.content) - return cls.enum_type(value), rest + return cls.ENUM_TYPE(value), rest @classmethod def to_ber(cls, obj): - if not isinstance(obj, cls.enum_type): + if not isinstance(obj, cls.ENUM_TYPE): raise TypeError() - return encode_ber(BERObject(cls.ber_tag, encode_ber_integer(obj.value))) + return encode_ber(BERObject(cls.BER_TAG, encode_ber_integer(obj.value))) def wrapenum(enumtype): class WrappedEnum(Enum): - enum_type = enumtype + ENUM_TYPE = enumtype return WrappedEnum diff --git a/ldapserver/ldap.py b/ldapserver/ldap.py index 2888d86..a1c9f51 100644 --- a/ldapserver/ldap.py +++ b/ldapserver/ldap.py @@ -1,6 +1,8 @@ # Enums/constants/attributes are mostly named after RFCs # No-members is raised for all the dynamically populated attributes -# pylint: disable=invalid-name,no-member +# pylint: disable=invalid-name +import typing +from abc import ABC, abstractmethod import enum from . import asn1 @@ -21,11 +23,14 @@ class LDAPOID(LDAPString): pass class AttributeValueAssertion(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (LDAPString, 'attributeDesc', None, False), (asn1.OctetString, 'assertionValue', None, False), ] + attributeDesc: str + assertionValue: bytes + def escape_filter_assertionvalue(s): allowed_bytes = b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'+,-./:;<=>?@[\\]^_`{|}~ ' res = [] @@ -36,9 +41,10 @@ def escape_filter_assertionvalue(s): res.append(byte) return bytes(res).decode() -class Filter(asn1.Choice): +class Filter(asn1.Choice, ABC): '''Base class for filters in SEARCH operations''' + @abstractmethod def get_filter_string(self): raise NotImplementedError() @@ -51,10 +57,12 @@ class FilterAnd(asn1.Wrapper, Filter): List of :any:`Filter` objects ''' - ber_tag = (2, True, 0) - wrapped_attribute = 'filters' - wrapped_type = asn1.Set - wrapped_clsattrs = {'set_type': Filter} + BER_TAG = (2, True, 0) + WRAPPED_ATTRIBUTE = 'filters' + WRAPPED_TYPE = asn1.Set + WRAPPED_CLSATTRS = {'SET_TYPE': Filter} + + filters: typing.List[Filter] def __init__(self, filters=None): super().__init__(filters=filters) @@ -71,10 +79,12 @@ class FilterOr(asn1.Wrapper, Filter): List of :any:`Filter` objects ''' - ber_tag = (2, True, 1) - wrapped_attribute = 'filters' - wrapped_type = asn1.Set - wrapped_clsattrs = {'set_type': Filter} + BER_TAG = (2, True, 1) + WRAPPED_ATTRIBUTE = 'filters' + WRAPPED_TYPE = asn1.Set + WRAPPED_CLSATTRS = {'SET_TYPE': Filter} + + filters: typing.List[Filter] def __init__(self, filters=None): super().__init__(filters=filters) @@ -89,11 +99,13 @@ class FilterNot(asn1.Sequence, Filter): :type: Filter ''' - ber_tag = (2, True, 2) - sequence_fields = [ + BER_TAG = (2, True, 2) + SEQUENCE_FIELDS = [ (Filter, 'filter', None, False) ] + filter: Filter + # pylint: disable=redefined-builtin def __init__(self, filter=None): super().__init__(filter=filter) @@ -109,26 +121,29 @@ class FilterEqual(asn1.Sequence, Filter): .. py:attribute:: value :type: bytes ''' - ber_tag = (2, True, 3) - sequence_fields = [ + BER_TAG = (2, True, 3) + SEQUENCE_FIELDS = [ (LDAPString, 'attribute', None, False), (asn1.OctetString, 'value', None, False) ] + attribute: str + value: bytes + def __init__(self, attribute=None, value=None): super().__init__(attribute=attribute, value=value) def get_filter_string(self): return '(%s=%s)'%(self.attribute, escape_filter_assertionvalue(self.value)) -class FilterGreaterOrEqual(AttributeValueAssertion, Filter): - ber_tag = (2, True, 5) +class FilterGreaterOrEqual(FilterEqual): + BER_TAG = (2, True, 5) def get_filter_string(self): return '(%s>=%s)'%(self.attribute, escape_filter_assertionvalue(self.value)) -class FilterLessOrEqual(AttributeValueAssertion, Filter): - ber_tag = (2, True, 6) +class FilterLessOrEqual(FilterEqual): + BER_TAG = (2, True, 6) def get_filter_string(self): return '(%s<=%s)'%(self.attribute, escape_filter_assertionvalue(self.value)) @@ -139,10 +154,12 @@ class FilterPresent(asn1.Wrapper, Filter): .. py:attribute:: attribute :type: str ''' - ber_tag = (2, False, 7) - wrapped_attribute = 'attribute' - wrapped_type = LDAPString - wrapped_default = None + BER_TAG = (2, False, 7) + WRAPPED_ATTRIBUTE = 'attribute' + WRAPPED_TYPE = LDAPString + WRAPPED_DEFAULT = None + + attribute: str def __init__(self, attribute=None): super().__init__(attribute=attribute) @@ -150,23 +167,28 @@ class FilterPresent(asn1.Wrapper, Filter): def get_filter_string(self): return '(%s=*)'%(self.attribute) -class FilterApproxMatch(AttributeValueAssertion, Filter): - ber_tag = (2, True, 8) +class FilterApproxMatch(FilterEqual): + BER_TAG = (2, True, 8) def get_filter_string(self): return '(%s~=%s)'%(self.attribute, escape_filter_assertionvalue(self.value)) class FilterExtensibleMatch(asn1.Sequence, Filter): - ber_tag = (2, True, 9) - sequence_fields = [ + BER_TAG = (2, True, 9) + SEQUENCE_FIELDS = [ (asn1.retag(LDAPString, (2, False, 1)), 'matchingRule', None, True), (asn1.retag(LDAPString, (2, False, 2)), 'type', None, True), (asn1.retag(asn1.OctetString, (2, False, 3)), 'matchValue', None, False), (asn1.retag(asn1.Boolean, (2, False, 4)), 'dnAttributes', None, True), ] + matchingRule: str + type: str + matchValue: bytes + dnAttributes: bool + def get_filter_string(self): - return '(%s:=%s)'%(self.attribute, escape_filter_assertionvalue(self.value)) + return '(%s:%s:=%s)'%(self.matchingRule, self.type, escape_filter_assertionvalue(self.matchValue)) class SearchScope(enum.Enum): ''':any:`enum.Enum` of `scope` values in SEARCH operations''' @@ -232,24 +254,26 @@ class LDAPResultCode(enum.Enum): other = 80 class LDAPResult(asn1.Sequence): - ber_tag = (5, True, 1) - sequence_fields = [ + BER_TAG = (5, True, 1) + SEQUENCE_FIELDS = [ (asn1.wrapenum(LDAPResultCode), 'resultCode', None, False), (LDAPString, 'matchedDN', '', False), (LDAPString, 'diagnosticMessage', '', False), ] class AttributeSelection(asn1.SequenceOf): - set_type = LDAPString + SET_TYPE = LDAPString class AuthenticationChoice(asn1.Choice): pass class SimpleAuthentication(asn1.Wrapper, AuthenticationChoice): - ber_tag = (2, False, 0) - wrapped_attribute = 'password' - wrapped_type = asn1.OctetString - wrapped_default = b'' + BER_TAG = (2, False, 0) + WRAPPED_ATTRIBUTE = 'password' + WRAPPED_TYPE = asn1.OctetString + WRAPPED_DEFAULT = b'' + + password: bytes def __repr__(self): if not self.password: @@ -257,48 +281,48 @@ class SimpleAuthentication(asn1.Wrapper, AuthenticationChoice): return '<%s(PASSWORD HIDDEN)>'%(type(self).__name__) class SaslCredentials(asn1.Sequence, AuthenticationChoice): - ber_tag = (2, True, 3) - sequence_fields = [ + BER_TAG = (2, True, 3) + SEQUENCE_FIELDS = [ (LDAPString, 'mechanism', None, False), (asn1.OctetString, 'credentials', None, True), ] class AttributeValueSet(asn1.Set): - set_type = asn1.OctetString + SET_TYPE = asn1.OctetString class PartialAttribute(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (LDAPString, 'type', None, False), (AttributeValueSet, 'vals', lambda: [], False), ] class PartialAttributeList(asn1.SequenceOf): - set_type = PartialAttribute + SET_TYPE = PartialAttribute class Attribute(asn1.Sequence): # Constrain: vals must not be empty - sequence_fields = [ + SEQUENCE_FIELDS = [ (LDAPString, 'type', None, False), (AttributeValueSet, 'vals', lambda: [], False), ] class AttributeList(asn1.SequenceOf): - set_type = Attribute + SET_TYPE = Attribute class ProtocolOp(asn1.Choice): pass class BindRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 0) - sequence_fields = [ + BER_TAG = (1, True, 0) + SEQUENCE_FIELDS = [ (asn1.Integer, 'version', 3, False), (LDAPString, 'name', '', False), (AuthenticationChoice, 'authentication', lambda: SimpleAuthentication(), False) # pylint: disable=unnecessary-lambda ] class BindResponse(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 1) - sequence_fields = [ + BER_TAG = (1, True, 1) + SEQUENCE_FIELDS = [ (asn1.wrapenum(LDAPResultCode), 'resultCode', None, False), (LDAPString, 'matchedDN', '', False), (LDAPString, 'diagnosticMessage', '', False), @@ -306,11 +330,11 @@ class BindResponse(asn1.Sequence, ProtocolOp): ] class UnbindRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, False, 2) + BER_TAG = (1, False, 2) class SearchRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 3) - sequence_fields = [ + BER_TAG = (1, True, 3) + SEQUENCE_FIELDS = [ (LDAPString, 'baseObject', '', False), (asn1.wrapenum(SearchScope), 'scope', SearchScope.wholeSubtree, False), (asn1.wrapenum(DerefAliases), 'derefAliases', DerefAliases.neverDerefAliases, False), @@ -326,14 +350,14 @@ class SearchRequest(asn1.Sequence, ProtocolOp): return super().from_ber(data) class SearchResultEntry(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 4) - sequence_fields = [ + BER_TAG = (1, True, 4) + SEQUENCE_FIELDS = [ (LDAPString, 'objectName', '', False), (PartialAttributeList, 'attributes', lambda: [], False), ] class SearchResultDone(LDAPResult, ProtocolOp): - ber_tag = (1, True, 5) + BER_TAG = (1, True, 5) class ModifyOperation(enum.Enum): add = 0 @@ -341,46 +365,46 @@ class ModifyOperation(enum.Enum): replace = 2 class ModifyChange(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (asn1.wrapenum(ModifyOperation), 'operation', None, False), (PartialAttribute, 'modification', None, False), ] class ModifyChanges(asn1.SequenceOf): - set_type = ModifyChange + SET_TYPE = ModifyChange class ModifyRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 6) - sequence_fields = [ + BER_TAG = (1, True, 6) + SEQUENCE_FIELDS = [ (LDAPString, 'object', None, False), (ModifyChanges, 'changes', None, False), ] class ModifyResponse(LDAPResult, ProtocolOp): - ber_tag = (1, True, 7) + BER_TAG = (1, True, 7) class AddRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 8) - sequence_fields = [ + BER_TAG = (1, True, 8) + SEQUENCE_FIELDS = [ (LDAPString, 'entry', None, False), (AttributeList, 'attributes', None, False), ] class AddResponse(LDAPResult, ProtocolOp): - ber_tag = (1, True, 9) + BER_TAG = (1, True, 9) class DelRequest(asn1.Wrapper, ProtocolOp): - ber_tag = (1, False, 10) - wrapped_attribute = 'dn' - wrapped_type = LDAPString - wrapped_default = None + BER_TAG = (1, False, 10) + WRAPPED_ATTRIBUTE = 'dn' + WRAPPED_TYPE = LDAPString + WRAPPED_DEFAULT = None class DelResponse(LDAPResult, ProtocolOp): - ber_tag = (1, True, 11) + BER_TAG = (1, True, 11) class ModifyDNRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 12) - sequence_fields = [ + BER_TAG = (1, True, 12) + SEQUENCE_FIELDS = [ (LDAPString, 'entry', None, False), (LDAPString, 'newrdn', None, False), (asn1.Boolean, 'deleteoldrdn', None, False), @@ -388,34 +412,34 @@ class ModifyDNRequest(asn1.Sequence, ProtocolOp): ] class ModifyDNResponse(LDAPResult, ProtocolOp): - ber_tag = (1, True, 13) + BER_TAG = (1, True, 13) class CompareRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 14) - sequence_fields = [ + BER_TAG = (1, True, 14) + SEQUENCE_FIELDS = [ (LDAPString, 'entry', None, False), (AttributeValueAssertion, 'ava', None, False), ] class CompareResponse(LDAPResult, ProtocolOp): - ber_tag = (1, True, 15) + BER_TAG = (1, True, 15) class AbandonRequest(asn1.Wrapper, ProtocolOp): - ber_tag = (1, False, 16) - wrapped_attribute = 'messageID' - wrapped_type = asn1.Integer - wrapped_default = None + BER_TAG = (1, False, 16) + WRAPPED_ATTRIBUTE = 'messageID' + WRAPPED_TYPE = asn1.Integer + WRAPPED_DEFAULT = None class ExtendedRequest(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 23) - sequence_fields = [ + BER_TAG = (1, True, 23) + SEQUENCE_FIELDS = [ (asn1.retag(LDAPOID, (2, False, 0)), 'requestName', None, True), (asn1.retag(asn1.OctetString, (2, False, 1)), 'requestValue', None, True), ] class ExtendedResponse(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 24) - sequence_fields = [ + BER_TAG = (1, True, 24) + SEQUENCE_FIELDS = [ (asn1.wrapenum(LDAPResultCode), 'resultCode', None, False), (LDAPString, 'matchedDN', '', False), (LDAPString, 'diagnosticMessage', '', False), @@ -424,32 +448,32 @@ class ExtendedResponse(asn1.Sequence, ProtocolOp): ] class IntermediateResponse(asn1.Sequence, ProtocolOp): - ber_tag = (1, True, 25) - sequence_fields = [ + BER_TAG = (1, True, 25) + SEQUENCE_FIELDS = [ (asn1.retag(LDAPOID, (2, False, 0)), 'responseName', None, True), (asn1.retag(asn1.OctetString, (2, False, 1)), 'responseValue', None, True), ] class Control(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (LDAPOID, 'controlType', None, False), (asn1.Boolean, 'criticality', None, True), (asn1.OctetString, 'controlValue', None, True), ] class Controls(asn1.SequenceOf): - ber_tag = (2, True, 0) - set_type = Control + BER_TAG = (2, True, 0) + SET_TYPE = Control class LDAPMessage(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (asn1.Integer, 'messageID', None, False), (ProtocolOp, 'protocolOp', None, False), (Controls, 'controls', None, True) ] class ShallowLDAPMessage(asn1.BERType): - ber_tag = (0, True, 16) + BER_TAG = (0, True, 16) def __init__(self, messageID=None, protocolOpType=None, data=None): self.messageID = messageID @@ -463,13 +487,13 @@ class ShallowLDAPMessage(asn1.BERType): def from_ber(cls, data): seq, rest = asn1.decode_ber(data) data = data[:len(data)-len(rest)] - if seq.tag != cls.ber_tag: + if seq.tag != cls.BER_TAG: raise ValueError() content = seq.content messageID, content = asn1.Integer.from_ber(content) op, content = asn1.decode_ber(content) for subcls in ProtocolOp.__subclasses__(): - if subcls.ber_tag == op.tag: + if subcls.BER_TAG == op.tag: return cls(messageID, subcls, data), rest return cls(messageID, None, data), rest @@ -486,13 +510,13 @@ EXT_WHOAMI_OID = '1.3.6.1.4.1.4203.1.11.3' EXT_PASSWORD_MODIFY_OID = '1.3.6.1.4.1.4203.1.11.1' class PasswdModifyRequestValue(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (asn1.retag(LDAPString, (2, False, 0)), 'userIdentity', None, True), (asn1.retag(asn1.OctetString, (2, False, 1)), 'oldPasswd', None, True), (asn1.retag(asn1.OctetString, (2, False, 2)), 'newPasswd', None, True), ] class PasswdModifyResponseValue(asn1.Sequence): - sequence_fields = [ + SEQUENCE_FIELDS = [ (asn1.retag(asn1.OctetString, (2, False, 0)), 'genPasswd', None, True), ] -- GitLab