From ae80b02c52dffd3c9aa114e06d2646817f8bfa90 Mon Sep 17 00:00:00 2001 From: Julian Rother <julian@jrother.eu> Date: Thu, 21 Oct 2021 22:58:46 +0200 Subject: [PATCH] Separated Object-related code from schema code --- ldapserver/__init__.py | 1 + ldapserver/objects.py | 409 ++++++++++++++++++ ldapserver/schema/__init__.py | 8 +- ldapserver/schema/rfc4517/matching_rules.py | 60 +-- ldapserver/schema/types.py | 446 +------------------- ldapserver/server.py | 7 +- 6 files changed, 453 insertions(+), 478 deletions(-) create mode 100644 ldapserver/objects.py diff --git a/ldapserver/__init__.py b/ldapserver/__init__.py index f39804c..4435599 100644 --- a/ldapserver/__init__.py +++ b/ldapserver/__init__.py @@ -3,3 +3,4 @@ from . import dn from . import exceptions from .server import BaseLDAPRequestHandler, LDAPRequestHandler +from .objects import SubschemaSubentry diff --git a/ldapserver/objects.py b/ldapserver/objects.py new file mode 100644 index 0000000..dc87113 --- /dev/null +++ b/ldapserver/objects.py @@ -0,0 +1,409 @@ +import enum + +from . import ldap +from .dn import DN + +class FilterResult(enum.Enum): + TRUE = enum.auto() + FALSE = enum.auto() + UNDEFINED = enum.auto() + MAYBE_TRUE = enum.auto() # used by ObjectTemplate + +def match_to_filter_result(match_result): + if match_result is True: + return FilterResult.TRUE + if match_result is False: + return FilterResult.FALSE + return FilterResult.UNDEFINED + +def any_3value(iterable): + '''Extended three-valued logic equivalent of any builtin + + If all items are TRUE, return TRUE. Otherwise if any item is MAYBE_TRUE, + return MAYBE_TRUE. If neither TRUE nor MAYBE_TRUE are in items, but any + item is UNDEFINED, return UNDEFINED. Otherwise (all items are FALSE), + return FALSE.''' + result = FilterResult.FALSE + for item in iterable: + if item == FilterResult.TRUE: + return FilterResult.TRUE + elif item == FilterResult.MAYBE_TRUE: + result = FilterResult.MAYBE_TRUE + elif item == FilterResult.UNDEFINED and result == FilterResult.FALSE: + result = FilterResult.UNDEFINED + return result + +def all_3value(iterable): + '''Extended three-valued logic equivalent of all builtin + + If all items are TRUE, return TRUE. If any item is FALSE, return FALSE. + If no item is FALSE and any item is UNDEFINED, return UNDEFINED. + Otherwise (not item is FALSE or UNDEFINED and not all items are TRUE, + so at least one item is MAYBE_TRUE), return MAYBE_TRUE.''' + result = FilterResult.TRUE + for item in iterable: + if item == FilterResult.FALSE: + return FilterResult.FALSE + elif item == FilterResult.UNDEFINED: + result = FilterResult.UNDEFINED + elif item == FilterResult.MAYBE_TRUE and result == FilterResult.TRUE: + result = FilterResult.MAYBE_TRUE + return result + +class AttributeDict(dict): + def __init__(self, schema, **attributes): + super().__init__() + self.schema = schema + for key, value in attributes.items(): + self[key] = value + + def __contains__(self, key): + return super().__contains__(self.schema.lookup_attribute(key)) + + def __setitem__(self, key, value): + super().__setitem__(self.schema.lookup_attribute(key, fail_if_not_found=True), value) + + def __getitem__(self, key): + key = self.schema.lookup_attribute(key, fail_if_not_found=True) + if key not in self: + super().__setitem__(key, []) + result = super().__getitem__(key) + if callable(result): + return result() + return result + + def setdefault(self, key, default=None): + key = self.schema.lookup_attribute(key, fail_if_not_found=True) + return super().setdefault(key, default) + + def get(self, key, default=None): + key = self.schema.lookup_attribute(key, fail_if_not_found=True) + if key in self: + return self[key] + return default + + def get_all(self, key): + result = [] + for attr in self.schema.lookup_attribute_list(key): + result += self[attr] + return result + + def match_present(self, key): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None: + return FilterResult.UNDEFINED + if self[attribute_type] != []: + return FilterResult.TRUE + else: + return FilterResult.FALSE + + def match_equal(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.equality.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.equality.match_equal(attrval, assertion_value)), self.get_all(key))) + + def match_substr(self, key, inital_substring, any_substrings, final_substring): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None or attribute_type.substr is None: + return FilterResult.UNDEFINED + if inital_substring: + inital_substring = attribute_type.equality.syntax.decode(inital_substring) + if inital_substring is None: + return FilterResult.UNDEFINED + any_substrings = [attribute_type.equality.syntax.decode(substring) for substring in any_substrings] + if None in any_substrings: + return FilterResult.UNDEFINED + if final_substring: + final_substring = attribute_type.equality.syntax.decode(final_substring) + if final_substring is None: + return FilterResult.UNDEFINED + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.substr.match_substr(attrval, inital_substring, any_substrings, final_substring)), self.get_all(key))) + + def match_approx(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.equality.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.equality.match_approx(attrval, assertion_value)), self.get_all(key))) + + def match_greater_or_equal(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.ordering is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.ordering.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.ordering.match_greater_or_equal(attrval, assertion_value)), self.get_all(key))) + + def match_less(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.ordering is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.ordering.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.ordering.match_less(attrval, assertion_value)), self.get_all(key))) + + def match_less_or_equal(self, key, assertion_value): + return any_3value((self.match_equal(key, assertion_value), + self.match_less(key, assertion_value))) + + def match_filter(self, filter_obj): + if isinstance(filter_obj, ldap.FilterAnd): + return all_3value(map(self.match_filter, filter_obj.filters)) + elif isinstance(filter_obj, ldap.FilterOr): + return any_3value(map(self.match_filter, filter_obj.filters)) + elif isinstance(filter_obj, ldap.FilterNot): + subresult = self.match_filter(filter_obj.filter) + if subresult == FilterResult.TRUE: + return FilterResult.FALSE + elif subresult == FilterResult.FALSE: + return FilterResult.TRUE + else: + return subresult + elif isinstance(filter_obj, ldap.FilterPresent): + return self.match_present(filter_obj.attribute) + elif isinstance(filter_obj, ldap.FilterEqual): + return self.match_equal(filter_obj.attribute, filter_obj.value) + elif isinstance(filter_obj, ldap.FilterSubstrings): + return self.match_substr(filter_obj.attribute, filter_obj.initial_substring, + filter_obj.any_substrings, filter_obj.final_substring) + elif isinstance(filter_obj, ldap.FilterApproxMatch): + return self.match_approx(filter_obj.attribute, filter_obj.value) + elif isinstance(filter_obj, ldap.FilterGreaterOrEqual): + return self.match_greater_or_equal(filter_obj.attribute, filter_obj.value) + elif isinstance(filter_obj, ldap.FilterLessOrEqual): + return self.match_less_or_equal(filter_obj.attribute, filter_obj.value) + else: + return FilterResult.UNDEFINED + +class Object(AttributeDict): + def __init__(self, schema, dn, **attributes): + super().__init__(schema, **attributes) + self.dn = DN(dn) + + def match_dn(self, basedn, scope): + if scope == ldap.SearchScope.baseObject: + return self.dn == basedn + elif scope == ldap.SearchScope.singleLevel: + return self.dn.is_direct_child_of(basedn) + elif scope == ldap.SearchScope.wholeSubtree: + return self.dn.in_subtree_of(basedn) + else: + return False + + def match_search(self, base_obj, scope, filter_obj): + return self.match_dn(DN.from_str(base_obj), scope) and self.match_filter(filter_obj) == FilterResult.TRUE + + def get_search_result_entry(self, attributes=None, types_only=False): + selected_attributes = set() + for selector in attributes or ['*']: + if selector == '*': + selected_attributes |= self.schema.user_attribute_types + elif selector == '1.1': + continue + else: + attribute = self.schema.lookup_attribute(selector) + if attribute is not None: + selected_attributes.add(attribute) + partial_attributes = [] + for attribute in self: + if attribute not in selected_attributes: + continue + values = self[attribute] + if values != []: + if types_only: + values = [] + partial_attributes.append(ldap.PartialAttribute(attribute.name, [attribute.syntax.encode(value) for value in values])) + return ldap.SearchResultEntry(str(self.dn), partial_attributes) + +class RootDSE(Object): + def __init__(self, schema, *args, **kwargs): + super().__init__(schema, DN(), *args, **kwargs) + + def match_search(self, base_obj, scope, filter_obj): + return not base_obj and scope == ldap.SearchScope.baseObject and \ + isinstance(filter_obj, ldap.FilterPresent) and \ + filter_obj.attribute.lower() == 'objectclass' + +class WildcardValue: + pass + +WILDCARD_VALUE = WildcardValue() + +class ObjectTemplate(AttributeDict): + def __init__(self, schema, parent_dn, rdn_attribute, **attributes): + super().__init__(schema, **attributes) + self.parent_dn = parent_dn + self.rdn_attribute = rdn_attribute + + def match_present(self, key): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None: + return FilterResult.UNDEFINED + values = self[attribute_type] + if values == []: + return FilterResult.FALSE + elif WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + else: + return FilterResult.TRUE + + def match_equal(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.equality.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + values = self.get_all(key) + if WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.equality.match_equal(attrval, assertion_value)), values)) + + def match_substr(self, key, inital_substring, any_substrings, final_substring): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None or attribute_type.substr is None: + return FilterResult.UNDEFINED + if inital_substring: + inital_substring = attribute_type.equality.syntax.decode(inital_substring) + if inital_substring is None: + return FilterResult.UNDEFINED + any_substrings = [attribute_type.equality.syntax.decode(substring) for substring in any_substrings] + if None in any_substrings: + return FilterResult.UNDEFINED + if final_substring: + final_substring = attribute_type.equality.syntax.decode(final_substring) + if final_substring is None: + return FilterResult.UNDEFINED + values = self.get_all(key) + if WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.substr.match_substr(attrval, inital_substring, any_substrings, final_substring)), values)) + + def match_approx(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.equality is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.equality.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + values = self.get_all(key) + if WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.equality.match_approx(attrval, assertion_value)), values)) + + def match_greater_or_equal(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.ordering is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.ordering.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + values = self.get_all(key) + if WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.ordering.match_greater_or_equal(attrval, assertion_value)), values)) + + def match_less(self, key, assertion_value): + attribute_type = self.schema.lookup_attribute(key) + if attribute_type is None or attribute_type.ordering is None: + return FilterResult.UNDEFINED + assertion_value = attribute_type.ordering.syntax.decode(assertion_value) + if assertion_value is None: + return FilterResult.UNDEFINED + values = self.get_all(key) + if WILDCARD_VALUE in values: + return FilterResult.MAYBE_TRUE + return any_3value(map(lambda attrval: match_to_filter_result(attribute_type.ordering.match_less(attrval, assertion_value)), values)) + + def __extract_dn_constraints(self, basedn, scope): + if scope == ldap.SearchScope.baseObject: + if basedn[1:] != self.parent_dn or basedn.object_attribute != self.rdn_attribute: + return False, AttributeDict(self.schema) + return True, AttributeDict(self.schema, **{self.rdn_attribute: [basedn.object_value]}) + elif scope == ldap.SearchScope.singleLevel: + return basedn == self.parent_dn, AttributeDict(self.schema) + elif scope == ldap.SearchScope.wholeSubtree: + if self.parent_dn.in_subtree_of(basedn): + return True, AttributeDict(self.schema) + if basedn[1:] != self.parent_dn or basedn.object_attribute != self.rdn_attribute: + return False, AttributeDict(self.schema) + return True, AttributeDict(self.schema, **{self.rdn_attribute: [basedn.object_value]}) + else: + return False, AttributeDict(self.schema) + + def match_dn(self, basedn, scope): + '''Return whether objects from this template might match the provided parameters''' + return self.__extract_dn_constraints(basedn, scope)[0] + + def extract_dn_constraints(self, basedn, scope): + return self.__extract_dn_constraints(basedn, scope)[1] + + def extract_filter_constraints(self, filter_obj): + if isinstance(filter_obj, ldap.FilterEqual): + attribute_type = self.schema.lookup_attribute(filter_obj.attribute) + if attribute_type is None or attribute_type.equality is None: + return AttributeDict(self.schema) + assertion_value = attribute_type.equality.syntax.decode(filter_obj.value) + if assertion_value is None: + return AttributeDict(self.schema) + return AttributeDict(self.schema, **{filter_obj.attribute: [assertion_value]}) + if isinstance(filter_obj, ldap.FilterAnd): + result = AttributeDict(self.schema) + for subfilter in filter_obj.filters: + for name, values in self.extract_filter_constraints(subfilter).items(): + result[name] += values + return result + return AttributeDict(self.schema) + + def match_search(self, base_obj, scope, filter_obj): + '''Return whether objects based on this template might match the search parameters''' + return self.match_dn(DN.from_str(base_obj), scope) and self.match_filter(filter_obj) in (FilterResult.TRUE, FilterResult.MAYBE_TRUE) + + def extract_search_constraints(self, base_obj, scope, filter_obj): + constraints = self.extract_filter_constraints(filter_obj) + for key, values in self.extract_dn_constraints(DN.from_str(base_obj), scope).items(): + constraints[key] += values + return constraints + + def create_object(self, rdn_value, **attributes): + obj = Object(self.schema, DN(self.parent_dn, **{self.rdn_attribute: rdn_value})) + for key, values in attributes.items(): + if WILDCARD_VALUE not in self[key]: + raise ValueError(f'Cannot set attribute "{key}" that is not set to [WILDCARD_VALUE] in the template') + obj[key] = values + for attribute_type, values in self.items(): + if WILDCARD_VALUE not in values: + obj[attribute_type] = values + return obj + +class SubschemaSubentry(Object): + '''Special :any:`Object` providing information on a Schema''' + def __init__(self, schema, dn, **attributes): + super().__init__(schema, dn, **attributes) + self['subschemaSubentry'] = [self.dn] + self['structuralObjectClass'] = ['subtree'] + self['objectClass'] = ['top', 'subtree', 'subschema'] + self['objectClasses'] = schema.object_class_definitions + self['ldapSyntaxes'] = schema.syntax_definitions + self['matchingRules'] = schema.matching_rule_definitions + self['attributeTypes'] = schema.attribute_type_definitions + # pylint: disable=invalid-name + self.AttributeDict = lambda **attributes: AttributeDict(schema, **attributes) + self.Object = lambda *args, **attributes: Object(schema, dn, subschemaSubentry=[dn], **attributes) + self.RootDSE = lambda **attributes: RootDSE(schema, subschemaSubentry=[dn], **attributes) + self.ObjectTemplate = lambda *args, **kwargs: ObjectTemplate(schema, *args, subschemaSubentry=[dn], **kwargs) + + def match_search(self, base_obj, scope, filter_obj): + return DN.from_str(base_obj) == self.dn and \ + scope == ldap.SearchScope.baseObject and \ + isinstance(filter_obj, ldap.FilterEqual) and \ + filter_obj.attribute.lower() == 'objectclass' and \ + filter_obj.value.lower() == b'subschema' diff --git a/ldapserver/schema/__init__.py b/ldapserver/schema/__init__.py index 483d876..11c6765 100644 --- a/ldapserver/schema/__init__.py +++ b/ldapserver/schema/__init__.py @@ -2,13 +2,13 @@ from .types import * from . import rfc4517, rfc4512, rfc4519, rfc4524, rfc3112, rfc2307bis, rfc2079, rfc2252, rfc2798, rfc4523, rfc1274 # Core LDAP Schema -RFC4519_SUBSCHEMA = Subschema('cn=Subschema', rfc4519.object_classes.ALL, rfc4519.attribute_types.ALL, rfc4519.matching_rules.ALL, rfc4519.matching_rules.ALL) +RFC4519_SUBSCHEMA = Schema( rfc4519.object_classes.ALL, rfc4519.attribute_types.ALL, rfc4519.matching_rules.ALL, rfc4519.matching_rules.ALL) # COSINE LDAP/X.500 Schema -RFC4524_SUBSCHEMA = Subschema('cn=Subschema', rfc4524.object_classes.ALL, rfc4524.attribute_types.ALL, rfc4524.matching_rules.ALL, rfc4524.matching_rules.ALL) +RFC4524_SUBSCHEMA = Schema(rfc4524.object_classes.ALL, rfc4524.attribute_types.ALL, rfc4524.matching_rules.ALL, rfc4524.matching_rules.ALL) # inetOrgPerson Schema -RFC2798_SUBSCHEMA = Subschema('cn=Subschema', rfc2798.object_classes.ALL, rfc2798.attribute_types.ALL, rfc2798.matching_rules.ALL, rfc2798.matching_rules.ALL) +RFC2798_SUBSCHEMA = Schema(rfc2798.object_classes.ALL, rfc2798.attribute_types.ALL, rfc2798.matching_rules.ALL, rfc2798.matching_rules.ALL) # Extended RFC2307 (NIS) Schema -RFC2307BIS_SUBSCHEMA = Subschema('cn=Subschema', rfc2307bis.object_classes.ALL, rfc2307bis.attribute_types.ALL, rfc2307bis.matching_rules.ALL, rfc2307bis.matching_rules.ALL) +RFC2307BIS_SUBSCHEMA = Schema(rfc2307bis.object_classes.ALL, rfc2307bis.attribute_types.ALL, rfc2307bis.matching_rules.ALL, rfc2307bis.matching_rules.ALL) diff --git a/ldapserver/schema/rfc4517/matching_rules.py b/ldapserver/schema/rfc4517/matching_rules.py index bd43cb6..ea0d989 100644 --- a/ldapserver/schema/rfc4517/matching_rules.py +++ b/ldapserver/schema/rfc4517/matching_rules.py @@ -1,25 +1,16 @@ -from ..types import MatchingRule, FilterResult +from ..types import MatchingRule from ... import rfc4518_stringprep from . import syntaxes class GenericMatchingRule(MatchingRule): def match_equal(self, attribute_value, assertion_value): - if attribute_value == assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return attribute_value == assertion_value def match_less(self, attribute_value, assertion_value): - if attribute_value < assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return attribute_value < assertion_value def match_greater_or_equal(self, attribute_value, assertion_value): - if attribute_value >= assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return attribute_value >= assertion_value class StringMatchingRule(MatchingRule): def __init__(self, oid, name, syntax, matching_type=rfc4518_stringprep.MatchingType.EXACT_STRING): @@ -31,33 +22,24 @@ class StringMatchingRule(MatchingRule): attribute_value = rfc4518_stringprep.prepare(attribute_value, self.matching_type) assertion_value = rfc4518_stringprep.prepare(assertion_value, self.matching_type) except ValueError: - return FilterResult.UNDEFINED - if attribute_value == assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return None + return attribute_value == assertion_value def match_less(self, attribute_value, assertion_value): try: attribute_value = rfc4518_stringprep.prepare(attribute_value, self.matching_type) assertion_value = rfc4518_stringprep.prepare(assertion_value, self.matching_type) except ValueError: - return FilterResult.UNDEFINED - if attribute_value < assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return None + return attribute_value < assertion_value def match_greater_or_equal(self, attribute_value, assertion_value): try: attribute_value = rfc4518_stringprep.prepare(attribute_value, self.matching_type) assertion_value = rfc4518_stringprep.prepare(assertion_value, self.matching_type) except ValueError: - return FilterResult.UNDEFINED - if attribute_value >= assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return None + return attribute_value >= assertion_value def match_substr(self, attribute_value, inital_substring, any_substrings, final_substring): try: @@ -68,21 +50,21 @@ class StringMatchingRule(MatchingRule): if final_substring: final_substring = rfc4518_stringprep.prepare(final_substring, self.matching_type, rfc4518_stringprep.SubstringType.FINAL) except ValueError: - return FilterResult.UNDEFINED + return None if inital_substring: if not attribute_value.startswith(inital_substring): - return FilterResult.FALSE + return False attribute_value = attribute_value[len(inital_substring):] if final_substring: if not attribute_value.endswith(final_substring): - return FilterResult.FALSE + return False attribute_value = attribute_value[:-len(final_substring)] for substring in any_substrings: index = attribute_value.find(substring) if index == -1: - return FilterResult.FALSE + return False attribute_value = attribute_value[index+len(substring):] - return FilterResult.TRUE + return True class StringListMatchingRule(MatchingRule): def __init__(self, oid, name, syntax, matching_type=rfc4518_stringprep.MatchingType.EXACT_STRING): @@ -95,11 +77,8 @@ class StringListMatchingRule(MatchingRule): attribute_value = [rfc4518_stringprep.prepare(line, self.matching_type) for line in attribute_value] assertion_value = [rfc4518_stringprep.prepare(line, self.matching_type) for line in assertion_value] except ValueError: - return FilterResult.UNDEFINED - if attribute_value == assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return None + return attribute_value == assertion_value class FirstComponentMatchingRule(MatchingRule): def __init__(self, oid, name, syntax, attribute_name): @@ -109,10 +88,7 @@ class FirstComponentMatchingRule(MatchingRule): def match_equal(self, attribute_value, assertion_value): if not hasattr(attribute_value, self.attribute_name): return None - if getattr(attribute_value, self.attribute_name)() == assertion_value: - return FilterResult.TRUE - else: - return FilterResult.FALSE + return getattr(attribute_value, self.attribute_name)() == assertion_value bitStringMatch = GenericMatchingRule('2.5.13.16', name='bitStringMatch', syntax=syntaxes.BitString()) booleanMatch = GenericMatchingRule('2.5.13.13', name='booleanMatch', syntax=syntaxes.Boolean()) diff --git a/ldapserver/schema/types.py b/ldapserver/schema/types.py index 718c578..628f1a8 100644 --- a/ldapserver/schema/types.py +++ b/ldapserver/schema/types.py @@ -1,29 +1,5 @@ import enum -from .. import ldap -from ..dn import DN - -__all__ = [ - 'FilterResult', - 'Syntax', - 'MatchingRule', - 'AttributeTypeUsage', - 'AttributeType', - 'ObjectClassKind', - 'ObjectClass', - 'Object', - 'RootDSE', - 'Subschema', - 'WILDCARD_VALUE', - 'ObjectTemplate', -] - -class FilterResult(enum.Enum): - TRUE = enum.auto() - FALSE = enum.auto() - UNDEFINED = enum.auto() - MAYBE_TRUE = enum.auto() # used by ObjectTemplate - def escape(string): result = '' for char in string: @@ -91,19 +67,19 @@ class MatchingRule: return f'<ldapserver.schema.MatchingRule {self.encode_syntax_definition()}>' def match_equal(self, attribute_value, assertion_value): - return FilterResult.UNDEFINED + return None def match_approx(self, attribute_value, assertion_value): return self.match_equal(attribute_value, assertion_value) def match_less(self, attribute_value, assertion_value): - return FilterResult.UNDEFINED + return None def match_greater_or_equal(self, attribute_value, assertion_value): - return FilterResult.UNDEFINED + return None def match_substr(self, attribute_value, inital_substring, any_substrings, final_substring): - return FilterResult.UNDEFINED + return None class AttributeTypeUsage(enum.Enum): # pylint: disable=invalid-name @@ -238,378 +214,10 @@ class ObjectClass: def __repr__(self): return f'<ldapserver.schema.ObjectClass {self.schema_encoding}>' -def any_3value(iterable): - '''Extended three-valued logic equivalent of any builtin - - If all items are TRUE, return TRUE. Otherwise if any item is MAYBE_TRUE, - return MAYBE_TRUE. If neither TRUE nor MAYBE_TRUE are in items, but any - item is UNDEFINED, return UNDEFINED. Otherwise (all items are FALSE), - return FALSE.''' - result = FilterResult.FALSE - for item in iterable: - if item == FilterResult.TRUE: - return FilterResult.TRUE - elif item == FilterResult.MAYBE_TRUE: - result = FilterResult.MAYBE_TRUE - elif item == FilterResult.UNDEFINED and result == FilterResult.FALSE: - result = FilterResult.UNDEFINED - return result - -def all_3value(iterable): - '''Extended three-valued logic equivalent of all builtin - - If all items are TRUE, return TRUE. If any item is FALSE, return FALSE. - If no item is FALSE and any item is UNDEFINED, return UNDEFINED. - Otherwise (not item is FALSE or UNDEFINED and not all items are TRUE, - so at least one item is MAYBE_TRUE), return MAYBE_TRUE.''' - result = FilterResult.TRUE - for item in iterable: - if item == FilterResult.FALSE: - return FilterResult.FALSE - elif item == FilterResult.UNDEFINED: - result = FilterResult.UNDEFINED - elif item == FilterResult.MAYBE_TRUE and result == FilterResult.TRUE: - result = FilterResult.MAYBE_TRUE - return result - -class AttributeDict(dict): - def __init__(self, subschema, **attributes): - super().__init__() - self.subschema = subschema - for key, value in attributes.items(): - self[key] = value - - def __contains__(self, key): - return super().__contains__(self.subschema.lookup_attribute(key)) - - def __setitem__(self, key, value): - super().__setitem__(self.subschema.lookup_attribute(key, fail_if_not_found=True), value) - - def __getitem__(self, key): - key = self.subschema.lookup_attribute(key, fail_if_not_found=True) - if key not in self: - super().__setitem__(key, []) - result = super().__getitem__(key) - if callable(result): - return result() - return result - - def setdefault(self, key, default=None): - key = self.subschema.lookup_attribute(key, fail_if_not_found=True) - return super().setdefault(key, default) - - def get(self, key, default=None): - key = self.subschema.lookup_attribute(key, fail_if_not_found=True) - if key in self: - return self[key] - return default - - def get_all(self, key): - result = [] - for attr in self.subschema.lookup_attribute_list(key): - result += self[attr] - return result - - def match_present(self, key): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None: - return FilterResult.UNDEFINED - if self[attribute_type] != []: - return FilterResult.TRUE - else: - return FilterResult.FALSE - - def match_equal(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.equality.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - return any_3value(map(lambda attrval: attribute_type.equality.match_equal(attrval, assertion_value), self.get_all(key))) - - def match_substr(self, key, inital_substring, any_substrings, final_substring): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None or attribute_type.substr is None: - return FilterResult.UNDEFINED - if inital_substring: - inital_substring = attribute_type.equality.syntax.decode(inital_substring) - if inital_substring is None: - return FilterResult.UNDEFINED - any_substrings = [attribute_type.equality.syntax.decode(substring) for substring in any_substrings] - if None in any_substrings: - return FilterResult.UNDEFINED - if final_substring: - final_substring = attribute_type.equality.syntax.decode(final_substring) - if final_substring is None: - return FilterResult.UNDEFINED - return any_3value(map(lambda attrval: attribute_type.substr.match_substr(attrval, inital_substring, any_substrings, final_substring), self.get_all(key))) - - def match_approx(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.equality.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - return any_3value(map(lambda attrval: attribute_type.equality.match_approx(attrval, assertion_value), self.get_all(key))) - - def match_greater_or_equal(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.ordering is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.ordering.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - return any_3value(map(lambda attrval: attribute_type.ordering.match_greater_or_equal(attrval, assertion_value), self.get_all(key))) - - def match_less(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.ordering is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.ordering.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - return any_3value(map(lambda attrval: attribute_type.ordering.match_less(attrval, assertion_value), self.get_all(key))) - - def match_less_or_equal(self, key, assertion_value): - return any_3value((self.match_equal(key, assertion_value), - self.match_less(key, assertion_value))) - - def match_filter(self, filter_obj): - if isinstance(filter_obj, ldap.FilterAnd): - return all_3value(map(self.match_filter, filter_obj.filters)) - elif isinstance(filter_obj, ldap.FilterOr): - return any_3value(map(self.match_filter, filter_obj.filters)) - elif isinstance(filter_obj, ldap.FilterNot): - subresult = self.match_filter(filter_obj.filter) - if subresult == FilterResult.TRUE: - return FilterResult.FALSE - elif subresult == FilterResult.FALSE: - return FilterResult.TRUE - else: - return subresult - elif isinstance(filter_obj, ldap.FilterPresent): - return self.match_present(filter_obj.attribute) - elif isinstance(filter_obj, ldap.FilterEqual): - return self.match_equal(filter_obj.attribute, filter_obj.value) - elif isinstance(filter_obj, ldap.FilterSubstrings): - return self.match_substr(filter_obj.attribute, filter_obj.initial_substring, - filter_obj.any_substrings, filter_obj.final_substring) - elif isinstance(filter_obj, ldap.FilterApproxMatch): - return self.match_approx(filter_obj.attribute, filter_obj.value) - elif isinstance(filter_obj, ldap.FilterGreaterOrEqual): - return self.match_greater_or_equal(filter_obj.attribute, filter_obj.value) - elif isinstance(filter_obj, ldap.FilterLessOrEqual): - return self.match_less_or_equal(filter_obj.attribute, filter_obj.value) - else: - return FilterResult.UNDEFINED - -class Object(AttributeDict): - def __init__(self, subschema, dn, **attributes): - super().__init__(subschema, **attributes) - self.dn = DN(dn) - self.setdefault('subschemaSubentry', [self.subschema.dn]) - - def match_dn(self, basedn, scope): - if scope == ldap.SearchScope.baseObject: - return self.dn == basedn - elif scope == ldap.SearchScope.singleLevel: - return self.dn.is_direct_child_of(basedn) - elif scope == ldap.SearchScope.wholeSubtree: - return self.dn.in_subtree_of(basedn) - else: - return False - - def match_search(self, base_obj, scope, filter_obj): - return self.match_dn(DN.from_str(base_obj), scope) and self.match_filter(filter_obj) == FilterResult.TRUE - - def get_search_result_entry(self, attributes=None, types_only=False): - selected_attributes = set() - for selector in attributes or ['*']: - if selector == '*': - selected_attributes |= self.subschema.user_attribute_types - elif selector == '1.1': - continue - else: - attribute = self.subschema.lookup_attribute(selector) - if attribute is not None: - selected_attributes.add(attribute) - partial_attributes = [] - for attribute in self: - if attribute not in selected_attributes: - continue - values = self[attribute] - if values != []: - if types_only: - values = [] - partial_attributes.append(ldap.PartialAttribute(attribute.name, [attribute.syntax.encode(value) for value in values])) - return ldap.SearchResultEntry(str(self.dn), partial_attributes) - -class RootDSE(Object): - def __init__(self, subschema, *args, **kwargs): - super().__init__(subschema, DN(), *args, **kwargs) - - def match_search(self, base_obj, scope, filter_obj): - return not base_obj and scope == ldap.SearchScope.baseObject and \ - isinstance(filter_obj, ldap.FilterPresent) and \ - filter_obj.attribute.lower() == 'objectclass' - -class WildcardValue: - pass - -WILDCARD_VALUE = WildcardValue() - -class ObjectTemplate(AttributeDict): - def __init__(self, subschema, parent_dn, rdn_attribute, **attributes): - super().__init__(subschema, **attributes) - self.parent_dn = parent_dn - self.rdn_attribute = rdn_attribute - - def match_present(self, key): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None: - return FilterResult.UNDEFINED - values = self[attribute_type] - if values == []: - return FilterResult.FALSE - elif WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - else: - return FilterResult.TRUE - - def match_equal(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.equality.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - values = self.get_all(key) - if WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - return any_3value(map(lambda attrval: attribute_type.equality.match_equal(attrval, assertion_value), values)) - - def match_substr(self, key, inital_substring, any_substrings, final_substring): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None or attribute_type.substr is None: - return FilterResult.UNDEFINED - if inital_substring: - inital_substring = attribute_type.equality.syntax.decode(inital_substring) - if inital_substring is None: - return FilterResult.UNDEFINED - any_substrings = [attribute_type.equality.syntax.decode(substring) for substring in any_substrings] - if None in any_substrings: - return FilterResult.UNDEFINED - if final_substring: - final_substring = attribute_type.equality.syntax.decode(final_substring) - if final_substring is None: - return FilterResult.UNDEFINED - values = self.get_all(key) - if WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - return any_3value(map(lambda attrval: attribute_type.substr.match_substr(attrval, inital_substring, any_substrings, final_substring), values)) - - def match_approx(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.equality is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.equality.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - values = self.get_all(key) - if WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - return any_3value(map(lambda attrval: attribute_type.equality.match_approx(attrval, assertion_value), values)) - - def match_greater_or_equal(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.ordering is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.ordering.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - values = self.get_all(key) - if WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - return any_3value(map(lambda attrval: attribute_type.ordering.match_greater_or_equal(attrval, assertion_value), values)) - - def match_less(self, key, assertion_value): - attribute_type = self.subschema.lookup_attribute(key) - if attribute_type is None or attribute_type.ordering is None: - return FilterResult.UNDEFINED - assertion_value = attribute_type.ordering.syntax.decode(assertion_value) - if assertion_value is None: - return FilterResult.UNDEFINED - values = self.get_all(key) - if WILDCARD_VALUE in values: - return FilterResult.MAYBE_TRUE - return any_3value(map(lambda attrval: attribute_type.ordering.match_less(attrval, assertion_value), values)) - - def __extract_dn_constraints(self, basedn, scope): - if scope == ldap.SearchScope.baseObject: - if basedn[1:] != self.parent_dn or basedn.object_attribute != self.rdn_attribute: - return False, AttributeDict(self.subschema) - return True, AttributeDict(self.subschema, **{self.rdn_attribute: [basedn.object_value]}) - elif scope == ldap.SearchScope.singleLevel: - return basedn == self.parent_dn, AttributeDict(self.subschema) - elif scope == ldap.SearchScope.wholeSubtree: - if self.parent_dn.in_subtree_of(basedn): - return True, AttributeDict(self.subschema) - if basedn[1:] != self.parent_dn or basedn.object_attribute != self.rdn_attribute: - return False, AttributeDict(self.subschema) - return True, AttributeDict(self.subschema, **{self.rdn_attribute: [basedn.object_value]}) - else: - return False, AttributeDict(self.subschema) - - def match_dn(self, basedn, scope): - '''Return whether objects from this template might match the provided parameters''' - return self.__extract_dn_constraints(basedn, scope)[0] - - def extract_dn_constraints(self, basedn, scope): - return self.__extract_dn_constraints(basedn, scope)[1] - - def extract_filter_constraints(self, filter_obj): - if isinstance(filter_obj, ldap.FilterEqual): - attribute_type = self.subschema.lookup_attribute(filter_obj.attribute) - if attribute_type is None or attribute_type.equality is None: - return AttributeDict(self.subschema) - assertion_value = attribute_type.equality.syntax.decode(filter_obj.value) - if assertion_value is None: - return AttributeDict(self.subschema) - return AttributeDict(self.subschema, **{filter_obj.attribute: [assertion_value]}) - if isinstance(filter_obj, ldap.FilterAnd): - result = AttributeDict(self.subschema) - for subfilter in filter_obj.filters: - for name, values in self.extract_filter_constraints(subfilter).items(): - result[name] += values - return result - return AttributeDict(self.subschema) - - def match_search(self, base_obj, scope, filter_obj): - '''Return whether objects based on this template might match the search parameters''' - return self.match_dn(DN.from_str(base_obj), scope) and self.match_filter(filter_obj) in (FilterResult.TRUE, FilterResult.MAYBE_TRUE) - - def extract_search_constraints(self, base_obj, scope, filter_obj): - constraints = self.extract_filter_constraints(filter_obj) - for key, values in self.extract_dn_constraints(DN.from_str(base_obj), scope).items(): - constraints[key] += values - return constraints - - def create_object(self, rdn_value, **attributes): - obj = Object(self.subschema, DN(self.parent_dn, **{self.rdn_attribute: rdn_value})) - for key, values in attributes.items(): - if WILDCARD_VALUE not in self[key]: - raise ValueError(f'Cannot set attribute "{key}" that is not set to [WILDCARD_VALUE] in the template') - obj[key] = values - for attribute_type, values in self.items(): - if WILDCARD_VALUE not in values: - obj[attribute_type] = values - return obj - -class Subschema(Object): - def __init__(self, dn, object_classes=None, attribute_types=None, matching_rules=None, syntaxes=None): - # Setup schema data before calling super().__init__(), because we are our own schema +class Schema: + '''Collection of LDAP syntaxes, matching rules, attribute types and object + classes forming an LDAP schema.''' + def __init__(self, object_classes=None, attribute_types=None, matching_rules=None, syntaxes=None): attribute_types = list(attribute_types or []) matching_rules = list(matching_rules or []) syntaxes = list(syntaxes or []) @@ -644,31 +252,22 @@ class Subschema(Object): self.syntaxes = {} for syntax in syntaxes: self.syntaxes[syntax.oid] = syntax + self.object_class_definitions = [objectclass.schema_encoding for objectclass in self.object_classes.values()] + self.syntax_definitions = [syntax.encode_syntax_definition() for syntax in self.syntaxes.values()] + self.matching_rule_definitions = [matching_rule.encode_syntax_definition() for matching_rule in self.matching_rules.values()] + self.attribute_type_definitions = [attribute_type.schema_encoding for attribute_type in self.attribute_types.values()] - super().__init__(subschema=self, dn=dn) - # pylint: disable=invalid-name - self.AttributeDict = lambda **attributes: AttributeDict(self, **attributes) - self.Object = lambda dn, **attributes: Object(self, dn, **attributes) - self.RootDSE = lambda **attributes: RootDSE(self, **attributes) - self.ObjectTemplate = lambda *args, **kwargs: ObjectTemplate(self, *args, **kwargs) - self['objectClass'] = [objectclass.schema_encoding for objectclass in self.object_classes.values()] - self['ldapSyntaxes'] = [syntax.encode_syntax_definition() for syntax in self.syntaxes.values()] - self['matchingRules'] = [matching_rule.encode_syntax_definition() for matching_rule in self.matching_rules.values()] - self['attributeTypes'] = [attribute_type.schema_encoding for attribute_type in self.attribute_types.values()] - - def extend(self, *subschemas, dn=None, object_classes=None, attribute_types=None, matching_rules=None, syntaxes=None): - if dn is None: - dn = self.dn + def extend(self, *schemas, object_classes=None, attribute_types=None, matching_rules=None, syntaxes=None): object_classes = list(self.object_classes.values()) + list(object_classes or []) attribute_types = list(self.attribute_types.values()) + list(attribute_types or []) matching_rules = list(self.matching_rules.values()) + list(matching_rules or []) syntaxes = list(self.syntaxes.values()) + list(syntaxes or []) - for subschema in subschemas: - object_classes += list(subschema.object_classes.values()) - attribute_types += list(subschema.attribute_types.values()) - matching_rules += list(subschema.matching_rules.values()) - syntaxes += list(subschema.syntaxes.values()) - return Subschema(dn, object_classes, attribute_types, matching_rules, syntaxes) + for schema in schemas: + object_classes += list(schema.object_classes.values()) + attribute_types += list(schema.attribute_types.values()) + matching_rules += list(schema.matching_rules.values()) + syntaxes += list(schema.syntaxes.values()) + return Schema(object_classes, attribute_types, matching_rules, syntaxes) def lookup_attribute(self, oid_or_name, fail_if_not_found=False): if isinstance(oid_or_name, AttributeType): @@ -689,10 +288,3 @@ class Subschema(Object): if result is None: return [] return [result] - - def match_search(self, base_obj, scope, filter_obj): - return DN.from_str(base_obj) == self.dn and \ - scope == ldap.SearchScope.baseObject and \ - isinstance(filter_obj, ldap.FilterEqual) and \ - filter_obj.attribute.lower() == 'objectclass' and \ - filter_obj.value.lower() == b'subschema' diff --git a/ldapserver/server.py b/ldapserver/server.py index 76f0437..d554fb2 100644 --- a/ldapserver/server.py +++ b/ldapserver/server.py @@ -3,7 +3,7 @@ import ssl import socketserver import typing -from . import asn1, exceptions, ldap, schema +from . import asn1, exceptions, ldap, schema, objects def reject_critical_controls(controls=None): for control in controls or []: @@ -125,7 +125,7 @@ class LDAPRequestHandler(BaseLDAPRequestHandler): mechansims. Attributes can be accessed in a dict-like fashion. ''' - subschema = schema.RFC4519_SUBSCHEMA + subschema = objects.SubschemaSubentry(schema.RFC4519_SUBSCHEMA, 'cn=Subschema') ''' .. py:attribute:: subschema @@ -134,9 +134,6 @@ class LDAPRequestHandler(BaseLDAPRequestHandler): rules and objectclasses/attributetypes for the rootdse and subschema. It does not include objectclasses/attributetypes for actual data (e.g. users and groups). See :any:`Subschema` for details. - - If `subschema` is not `None`, the subschemaSubentry attribute is - automatically added to all results returned by :any:`do_search`. ''' static_objects = tuple() -- GitLab