Skip to content
Snippets Groups Projects
Commit 8f14ee7a authored by Julian Rother's avatar Julian Rother
Browse files

Removed parts of the SASL support code (esp. MD5-DIGEST)

parent 6685f5c2
No related branches found
No related tags found
No related merge requests found
from . import digest_md5
import hashlib
import secrets
from ..exceptions import *
def _parse_token(s):
SEP = [b'(', b')', b'<', b'>', b'@', b',', b';', b':', b'\\', b'\'', b'/',
b'[', b']', b'?', b'=', b'{', b'}', b' ', b'\t']
CTL = [bytes([c]) for c in range(0, 31)] + [b'127']
for index in range(len(s)):
c = bytes([s[index]])
if c in SEP + CTL:
return bytes(s[:index]), bytes(s[index:])
return s, b''
def _parse_qstr(s):
if s[0] != b'"'[0]:
raise ValueError()
res = b''
escaped = False
for index in range(1, len(s)):
c = bytes([s[index]])
if escaped:
res += c
escaped = False
elif c == b'\\':
escaped = True
elif c == b'"':
return res, bytes(s[index+1:])
else:
res += c
raise ValueError()
def _parse_token_qstr(s):
if s[0] == b'"'[0]:
return _parse_qstr(s)
return _parse_token(s)
def _parse_kwargs(s):
res = []
while True:
key, s = _parse_token(s)
if s[0] != b'='[0]:
raise ValueError()
value, s = _parse_token_qstr(bytes(s[1:]))
res.append((key, value))
if not s:
return res
if s[0] != b','[0]:
raise ValueError()
s = bytes(s[1:])
return res
def _generate_nonce():
return secrets.token_urlsafe(1024).encode()
def _hexdigest(data):
ctx = hashlib.md5()
ctx.update(data)
return ctx.hexdigest().lower().encode()
def _handle_ldap_bind(get_credentials, get_nonce=_generate_nonce, initial_response=None):
# Defined by RFC2831 and RFC2829, obsoleted by RFC6331
nonce = get_nonce()
challenge = b'nonce="%s",charset="utf-8",algorithm="md5-sess"'%(nonce)
resp = yield challenge
args = {key: value for key, value in _parse_kwargs(resp)}
if args[b'nonce'] != nonce:
raise LDAPProtocolError()
try:
charset = 'utf-8' if args.get(b'charset', b'utf-8') == b'utf-8' else 'latin_1'
username = args[b'username']
realm = args.get(b'realm', b'')
cnonce = args[b'cnonce']
nc = args.get(b'nc', b'00000001')
qop = args.get(b'qop', b'auth')
digest_uri = args[b'digest-uri']
parts = digest_uri.decode(charset).split('/', 2)
serv_type = parts[0]
host = parts[1]
serv_name = parts[2] if len(parts) == 3 else None
authzid = args[b'authzid'].decode(charset) if b'authzid' in args else None
response = args[b'response']
except (KeyError, IndexError):
raise LDAPProtocolError()
except UnicodeError:
raise LDAPProtocolError()
if serv_type != 'ldap':
raise LDAPInvalidCredentials()
valid_credentials = get_credentials(username, realm, host, serv_name=None, authzid=None, charset=charset)
a2 = b'AUTHENTICATE:' + digest_uri
data = nonce + b':' + nc + b':' + cnonce + b':' + qop + b':' + _hexdigest(a2)
for bind_obj, pwdigest in valid_credentials:
a1 = pwdigest + b':' + nonce + b':' + cnonce
key = _hexdigest(a1)
expected_response = _hexdigest(key + b':' + data)
if expected_response != response:
continue
# We don't support subsequent authentication so according to RFC 2829 the
# serverSaslCreds field in our response should be absent and we should
# return (bind_obj, None). But this seems to confuse some clients (e.g.
# openldap's ldapsearch) so we return serverSaslCreds with rspauth instead.
a2 = b':' + digest_uri
data = nonce + b':' + nc + b':' + cnonce + b':' + qop + b':' + _hexdigest(a2)
response = b'rspauth=%s'%_hexdigest(key + b':' + data)
return bind_obj, response
raise LDAPInvalidCredentials()
def _encode_latin1_or_utf8(s):
try:
return s.encode('latin_1')
except UnicodeEncodeError:
pass
return s.encode('utf-8')
def credential_digest(username, realm, password):
'''Compute DIGEST-MD5-specific credential digest
:param username: Name of the user account
:type username: bytes or str
:param realm: Realm containing the user account
:type realm: bytes or str
:param password: Password for the user account
:type password: bytes or str
:returns: DIGEST-MD5-specific credential digest (16 bytes)
:rtype: bytes
Parameters passed as strings are encoded according to the special DIGEST-MD5
encoding rules (latin_1 whenever all characters can be encoded wit it, utf-8
otherwise).'''
if isinstance(username, str):
username = _encode_latin1_or_utf8(username)
if isinstance(realm, str):
realm = _encode_latin1_or_utf8(realm)
if isinstance(password, str):
password = _encode_latin1_or_utf8(password)
ctx = hashlib.md5()
ctx.update(username + b':' + realm + b':' + password)
return ctx.digest()
import struct
def get_peercred(sock):
'''Get pid and effective uid/gid of the process connected to an UNIX socket
:param sock: Socket to use
:type sock: socket.socket
:returns: pid, uid, gid as returned by SO_PEERCRED sockopt
:rtype: tuple (int, int, int)
This is commonly used for SASL "EXTERNAL" authentication on UNIX domain
sockets.'''
ucred = struct.Struct('III')
data = sock.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, ucred.size)
pid, uid, gid = ucred.unpack(data)
return pid, uid, gid
...@@ -202,8 +202,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): ...@@ -202,8 +202,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler):
res.append(b'PLAIN') res.append(b'PLAIN')
if self.supports_sasl_external: if self.supports_sasl_external:
res.append(b'EXTERNAL') res.append(b'EXTERNAL')
if self.supports_sasl_digest_md5:
res.append(b'DIGEST-MD5')
return res return res
def handle_bind(self, op, controls=None): def handle_bind(self, op, controls=None):
...@@ -356,8 +354,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): ...@@ -356,8 +354,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler):
if credentials is not None: if credentials is not None:
credentials = credentials.decode() credentials = credentials.decode()
return self.do_bind_sasl_external(authzid=credentials), None return self.do_bind_sasl_external(authzid=credentials), None
if mechanism == 'DIGEST-MD5' and self.supports_sasl_digest_md5:
return sasl.digest_md5._handle_ldap_bind(self.do_bind_sasl_digest_md5, initial_response=credentials)
raise LDAPAuthMethodNotSupported() raise LDAPAuthMethodNotSupported()
supports_sasl_anonymous = False supports_sasl_anonymous = False
...@@ -419,44 +415,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler): ...@@ -419,44 +415,6 @@ class SimpleLDAPRequestHandler(BaseLDAPRequestHandler):
:any:`LDAPAuthMethodNotSupported` exception.''' :any:`LDAPAuthMethodNotSupported` exception.'''
raise LDAPAuthMethodNotSupported() raise LDAPAuthMethodNotSupported()
#:
supports_sasl_digest_md5 = False
def do_bind_sasl_digest_md5(self, username, realm, host, serv_name=None, authzid=None, charset='utf-8'):
'''Do LDAP BIND with SASL "DIGEST-MD5" mechanism (RFC 2829)
:param username: Name of the user account
:type username: bytes
:param realm: Realm containing the user account
:type realm: bytes
:param host: DNS host name or IP address for the requested service, should be verified
:type host: str
:param serv_name: Name of the service if it is replicated, see RFC 2829 for details
:type serv_name: str, optional
:param authzid: Authorization identity
:type authzid: str, optional
:param charset: Charset ("utf-8" or "latin_1") that username and realm are encoded with
:type charset: str
:returns: Pairs of bind objects and credential digests that are acceptable for username and realm
:rtype: [(obj, bytes), ...]
WARNING: "DIGEST-MD5" is insecure and was obsoleted by RFC 6331. It is only
implemented for completeness and widespread client support.
To implement this mechanism, passwords must be stored either unencrypted or
as mechanism-specific credential digests. Use :any:`sasl.digest_md5.credential_digest` to
generate the digest from username, realm and password.
Note that username and realm are passed as bytes instead of strings to
reduce the risk of encoding/normalization-related problems. Decode them with
`username.decode(charset)`. Make sure to pass the values as bytes to
`sasl.md5_digest.credential_digest` if possible.
Called by :any:`do_bind_sasl`. The default implementation raises an
:any:`LDAPInvalidCredentials` exception.'''
raise LDAPInvalidCredentials()
def handle_search(self, op, controls=None): def handle_search(self, op, controls=None):
for dn, attributes in self.do_search(op.baseObject, op.scope, op.filter): for dn, attributes in self.do_search(op.baseObject, op.scope, op.filter):
attributes = [PartialAttribute(name, values) for name, values in attributes.items()] attributes = [PartialAttribute(name, values) for name, values in attributes.items()]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment