From c3af2db29719e2c56eb1c36a1262d1e11d4d815c Mon Sep 17 00:00:00 2001 From: Julian Rother <julian@jrother.eu> Date: Wed, 28 Jul 2021 18:53:37 +0200 Subject: [PATCH] Fixed (and disabled) many more linter errors --- .pylintrc | 11 +++---- ldapserver/asn1.py | 4 +++ ldapserver/directory.py | 26 ++++++++--------- ldapserver/dn.py | 63 ++++++++++++++++++++--------------------- ldapserver/ldap.py | 6 ++-- ldapserver/server.py | 28 +++++++++--------- 6 files changed, 69 insertions(+), 69 deletions(-) diff --git a/.pylintrc b/.pylintrc index af17b43..c5388c2 100644 --- a/.pylintrc +++ b/.pylintrc @@ -64,6 +64,7 @@ disable=unused-argument, # Too many false-positives, we're implem too-few-public-methods, # Too many false-positives no-else-return, # Personal stylistic preference attribute-defined-outside-init, # False-positives with socketserver + no-self-use, # Too many false-positives missing-module-docstring, # Temporarily disabled missing-class-docstring, # Temporarily disabled missing-function-docstring, # Temporarily disabled @@ -245,14 +246,10 @@ good-names=i, k, a, b, - ex, - Run, + e, _, - db, - bp, - id, dn, - cn, + op, # Include a hint for the correct naming format with invalid-name. include-naming-hint=no @@ -500,7 +497,7 @@ max-parents=7 max-public-methods=30 # Maximum number of return / yield for function / method body. -max-returns=6 +max-returns=12 # Maximum number of statements in function / method body. max-statements=50 diff --git a/ldapserver/asn1.py b/ldapserver/asn1.py index fb6b2a4..c8b1e91 100644 --- a/ldapserver/asn1.py +++ b/ldapserver/asn1.py @@ -157,6 +157,7 @@ class Sequence(BERType): def __init__(self, *args, **kwargs): for index, spec in enumerate(type(self).sequence_fields): + # pylint: disable=consider-using-get,unused-variable field_type, name, default, optional = spec if index < len(args): value = args[index] @@ -168,6 +169,7 @@ class Sequence(BERType): def __repr__(self): args = [] + # pylint: disable=unused-variable for field_type, name, default, optional in type(self).sequence_fields: args.append('%s=%s'%(name, repr(getattr(self, name)))) return '<%s(%s)>'%(type(self).__name__, ', '.join(args)) @@ -179,6 +181,7 @@ class Sequence(BERType): raise ValueError() args = [] data = seqobj.content + # pylint: disable=unused-variable for field_type, name, default, optional in cls.sequence_fields: try: obj, data = field_type.from_ber(data) @@ -194,6 +197,7 @@ class Sequence(BERType): if not isinstance(obj, cls): raise TypeError() content = b'' + # pylint: disable=unused-variable for field_type, name, default, optional in cls.sequence_fields: if not optional or getattr(obj, name) is not None: content += field_type.to_ber(getattr(obj, name)) diff --git a/ldapserver/directory.py b/ldapserver/directory.py index a64df5c..5dc0411 100644 --- a/ldapserver/directory.py +++ b/ldapserver/directory.py @@ -5,7 +5,7 @@ from . import ldap class BaseDirectory: '''Base class for LDAP directories''' - def search(self, baseobj, scope, filter): + def search(self, baseobj, scope, filterobj): '''Perform search :param baseobj: Distinguished name of the LDAP entry relative to which the search is @@ -13,17 +13,17 @@ class BaseDirectory: :type baseobj: str :param scope: Search scope :type scope: SearchScope - :param filter: Filter object - :type filter: Filter + :param filterobj: Filter object + :type filterobj: Filter :returns: Iterable of dn, attributes tuples''' return [] class FilterMixin: '''Mixin for :any:`BaseDirectory` that implements :any:`BaseDirectory.search` by calling appropirate `filter_*` methods''' - def search(self, baseobj, scope, filter): + def search(self, baseobj, scope, filterobj): dn_res = self.filter_dn(baseobj, scope) - filter_res = self.search_filter(filter) + filter_res = self.search_filter(filterobj) return self.search_fetch(self.filter_and(dn_res, filter_res)) def search_fetch(self, result): @@ -113,7 +113,7 @@ class FilterMixin: ''' return False - def filter_dn(self, baseobj, scope): + def filter_dn(self, base, scope): ''' ''' return False @@ -158,10 +158,10 @@ class SimpleFilterMixin(FilterMixin): return False class RootDSE(BaseDirectory, AttributeDict): - def search(self, baseobj, scope, filter): + def search(self, baseobj, scope, filterobj): if baseobj or scope != ldap.SearchScope.baseObject: return [] - if not isinstance(filter, ldap.FilterPresent) or filter.attribute.lower() != 'objectclass': + if not isinstance(filterobj, ldap.FilterPresent) or filterobj.attribute.lower() != 'objectclass': return [] attrs = {} for name, values in self.items(): @@ -194,12 +194,12 @@ class Subschema(BaseDirectory, AttributeDict): self['objectClasses'] = list(objectclasses) self['attributeTypes'] = list(attributetypes) - def search(self, baseobj, scope, filter): + def search(self, baseobj, scope, filterobj): if DN(baseobj) != self.dn or scope != ldap.SearchScope.baseObject: return [] - if not isinstance(filter, ldap.FilterEqual): + if not isinstance(filterobj, ldap.FilterEqual): return [] - if filter.attribute.lower() != 'objectclass' or filter.value.lower() != b'subschema': + if filterobj.attribute.lower() != 'objectclass' or filterobj.value.lower() != b'subschema': return [] return [(str(self.dn), {key: [encode_attribute(value) for value in values] for key, values in self.items()})] @@ -238,7 +238,7 @@ class StaticDirectory(BaseDirectory): tmp[key] = [encode_attribute(value) for value in values] self.objects[DN(dn)] = tmp - def search(self, baseobj, scope, filter): + def search(self, baseobj, scope, filterobj): baseobj = DN(baseobj) for dn, attributes in self.objects.items(): if scope == ldap.SearchScope.baseObject: @@ -252,6 +252,6 @@ class StaticDirectory(BaseDirectory): continue else: continue - if not eval_ldap_filter(attributes, filter): + if not eval_ldap_filter(attributes, filterobj): continue yield str(dn), attributes diff --git a/ldapserver/dn.py b/ldapserver/dn.py index 709aa42..690fa32 100644 --- a/ldapserver/dn.py +++ b/ldapserver/dn.py @@ -11,27 +11,27 @@ def parse_assertion(expr, case_ignore_attrs=None): escaped = False tokens = [] token = b'' - for c in expr: + for char in expr: if hexdigit is not None: - if c not in HEXDIGITS: - raise ValueError('Invalid hexpair: \\%c%c'%(hexdigit, c)) - token += bytes.fromhex('%c%c'%(hexdigit, c)) + if char not in HEXDIGITS: + raise ValueError('Invalid hexpair: \\%s%s'%(hexdigit, char)) + token += bytes.fromhex('%s%s'%(hexdigit, char)) hexdigit = None elif escaped: escaped = False - if c in DN_SPECIAL or c == '\\': - token += c.encode() - elif c in HEXDIGITS: - hexdigit = c + if char in DN_SPECIAL or char == '\\': + token += char.encode() + elif char in HEXDIGITS: + hexdigit = char else: - raise ValueError('Invalid escape: \\%c'%c) - elif c == '\\': + raise ValueError('Invalid escape: \\%s'%char) + elif char == '\\': escaped = True - elif c == '=': + elif char == '=': tokens.append(token) token = b'' else: - token += c.encode() + token += char.encode() tokens.append(token) if len(tokens) != 2: raise ValueError('Invalid assertion in RDN: "%s"'%expr) @@ -48,17 +48,17 @@ def parse_rdn(rdn, case_ignore_attrs=None): escaped = False assertions = [] token = '' - for c in rdn: + for char in rdn: if escaped: escaped = False - token += c - elif c == '+': + token += char + elif char == '+': assertions.append(parse_assertion(token, case_ignore_attrs=case_ignore_attrs)) token = '' else: - if c == '\\': + if char == '\\': escaped = True - token += c + token += char assertions.append(parse_assertion(token, case_ignore_attrs=case_ignore_attrs)) if not assertions: raise ValueError('Invalid RDN "%s"'%rdn) @@ -70,38 +70,35 @@ def parse_dn(dn, case_ignore_attrs=None): escaped = False rdns = [] rdn = '' - for c in dn: + for char in dn: if escaped: escaped = False - rdn += c - elif c == ',': + rdn += char + elif char == ',': rdns.append(parse_rdn(rdn, case_ignore_attrs=case_ignore_attrs)) rdn = '' else: - if c == '\\': + if char == '\\': escaped = True - rdn += c + rdn += char rdns.append(parse_rdn(rdn, case_ignore_attrs=case_ignore_attrs)) return tuple(rdns) -# >>> parse_dn('OU=Sales+CN=J. Smith,DC=example,DC=net', case_ignore_attrs=['cn', 'ou', 'dc']) -# ((('cn', b'j. smith'), ('ou', b'sales')), (('dc', b'example'),), (('dc', b'net'),)) - def escape_dn_value(value): if isinstance(value, int): value = str(value) if isinstance(value, str): value = value.encode() res = '' - for c in value: - c = bytes((c,)) + for byte in value: + byte = bytes((byte,)) try: - s = c.decode() + chars = byte.decode() except UnicodeDecodeError: - s = '\\'+c.hex() - if s in DN_SPECIAL: - s = '\\'+s - res += s + chars = '\\'+byte.hex() + if chars in DN_SPECIAL: + chars = '\\'+chars + res += chars return res def build_assertion(assertion): @@ -156,7 +153,7 @@ class DN(tuple): return not rbase and len(rchild) == 1 def in_subtree_of(self, base): - rchild, rbase = self.strip_common_suffix(DN(base)) + rchild, rbase = self.strip_common_suffix(DN(base)) # pylint: disable=unused-variable return not rbase class RDN(tuple): diff --git a/ldapserver/ldap.py b/ldapserver/ldap.py index 9a39b79..2888d86 100644 --- a/ldapserver/ldap.py +++ b/ldapserver/ldap.py @@ -1,5 +1,6 @@ # Enums/constants/attributes are mostly named after RFCs -# pylint: disable=invalid-name +# No-members is raised for all the dynamically populated attributes +# pylint: disable=invalid-name,no-member import enum from . import asn1 @@ -93,6 +94,7 @@ class FilterNot(asn1.Sequence, Filter): (Filter, 'filter', None, False) ] + # pylint: disable=redefined-builtin def __init__(self, filter=None): super().__init__(filter=filter) @@ -291,7 +293,7 @@ class BindRequest(asn1.Sequence, ProtocolOp): sequence_fields = [ (asn1.Integer, 'version', 3, False), (LDAPString, 'name', '', False), - (AuthenticationChoice, 'authentication', lambda: SimpleAuthentication(), False) + (AuthenticationChoice, 'authentication', lambda: SimpleAuthentication(), False) # pylint: disable=unnecessary-lambda ] class BindResponse(asn1.Sequence, ProtocolOp): diff --git a/ldapserver/server.py b/ldapserver/server.py index a8b7607..afe2687 100644 --- a/ldapserver/server.py +++ b/ldapserver/server.py @@ -7,11 +7,11 @@ from . import asn1, exceptions, ldap, schema, directory def decode_msg(shallowmsg): try: return shallowmsg.decode()[0] - except: + except Exception as e: traceback.print_exc() - raise exceptions.LDAPProtocolError() + raise exceptions.LDAPProtocolError() from e -def reject_critical_controls(controls=None, supported_oids=[]): +def reject_critical_controls(controls=None, supported_oids=tuple()): for control in controls or []: if not control.criticality: continue @@ -68,9 +68,9 @@ class BaseLDAPRequestHandler(socketserver.BaseRequestHandler): raise exceptions.LDAPProtocolError() try: msg = decode_msg(shallowmsg) - except ValueError: + except ValueError as e: self.on_recv_invalid(shallowmsg) - raise exceptions.LDAPProtocolError() + raise exceptions.LDAPProtocolError() from e self.on_recv(msg) for args in handler(msg.protocolOp, msg.controls): response, controls = args if isinstance(args, tuple) else (args, None) @@ -80,7 +80,7 @@ class BaseLDAPRequestHandler(socketserver.BaseRequestHandler): respmsg = ldap.LDAPMessage(shallowmsg.messageID, response_type(e.code, diagnosticMessage=e.message)) self.on_send(respmsg) yield respmsg - except Exception as e: + except Exception as e: # pylint: disable=broad-except if response_type is not None: respmsg = ldap.LDAPMessage(shallowmsg.messageID, response_type(ldap.LDAPResultCode.other)) self.on_send(respmsg) @@ -430,7 +430,7 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): yield ldap.SearchResultEntry(dn, pattributes) yield ldap.SearchResultDone(ldap.LDAPResultCode.success) - def do_search(self, baseobj, scope, filter): + def do_search(self, baseobj, scope, filterobj): '''Do LDAP SEARCH operation :param baseobj: Distinguished name of the LDAP entry relative to which the @@ -438,8 +438,8 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): :type baseobj: str :param scope: Search scope :type scope: SearchScope - :param filter: Filter object - :type filter: Filter + :param filterobj: Filter object + :type filterobj: Filter :raises LDAPError: on error @@ -447,8 +447,8 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): The default implementation returns matching objects from the root dse and the subschema.''' - yield from self.rootdse.search(baseobj, scope, filter) - yield from self.subschema.search(baseobj, scope, filter) + yield from self.rootdse.search(baseobj, scope, filterobj) + yield from self.subschema.search(baseobj, scope, filterobj) def handle_unbind(self, op, controls=None): reject_critical_controls(controls) @@ -462,7 +462,7 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): yield ldap.ExtendedResponse(ldap.LDAPResultCode.success, responseName=ldap.EXT_STARTTLS_OID) try: self.do_starttls() - except Exception as e: + except Exception: # pylint: disable=broad-except traceback.print_exc() self.keep_running = False elif op.requestName == ldap.EXT_WHOAMI_OID and self.supports_whoami: @@ -473,10 +473,10 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): # Password Modify Extended Operation (RFC 3062) newpw = None if op.requestValue is None: - newpw = self.do_passwd() + newpw = self.do_password_modify() else: decoded, _ = ldap.PasswdModifyRequestValue.from_ber(op.requestValue) - newpw = self.do_passwd(decoded.userIdentity, decoded.oldPasswd, decoded.newPasswd) + newpw = self.do_password_modify(decoded.userIdentity, decoded.oldPasswd, decoded.newPasswd) if newpw is None: yield ldap.ExtendedResponse(ldap.LDAPResultCode.success) else: -- GitLab