From a470ed2b6b5c694a28e5eb7081dae1e09f85350a Mon Sep 17 00:00:00 2001 From: Julian Rother <julian@jrother.eu> Date: Thu, 29 Jul 2021 16:05:36 +0200 Subject: [PATCH] Added tests for server --- tests/test_dummy.py | 8 --- tests/test_server.py | 141 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 8 deletions(-) delete mode 100644 tests/test_dummy.py create mode 100644 tests/test_server.py diff --git a/tests/test_dummy.py b/tests/test_dummy.py deleted file mode 100644 index 6308729..0000000 --- a/tests/test_dummy.py +++ /dev/null @@ -1,8 +0,0 @@ -import unittest - -# Test if imports work in tests -from ldapserver.dn import DN - -class TestDummy(unittest.TestCase): - def test_dummy(self): - DN('cn=foobar') == DN('CN=foobar') diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..ca9b8ee --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,141 @@ +import unittest + +from ldapserver import BaseLDAPRequestHandler, SimpleLDAPRequestHandler, ldap + +class MockConnection: + def __init__(self, data, chunksize): + self.data = data + self.chunksize = chunksize + self.sent = b'' + + def recv(self, length): + length = min(length, self.chunksize) + chunk = self.data[:length] + self.data = self.data[length:] + return chunk + + def sendall(self, data): + self.sent += data + + def close(self): + pass + +class TestBaseLDAPRequestHandler(unittest.TestCase): + def test_handle(self): + req = bytes(ldap.LDAPMessage(messageID=1, protocolOp=ldap.SearchRequest())) + msglen = len(req) + req += bytes(ldap.LDAPMessage(messageID=2, protocolOp=ldap.SearchRequest())) + resp1 = bytes(ldap.LDAPMessage(messageID=1, protocolOp=ldap.SearchResultDone(ldap.LDAPResultCode.success))) + resp2 = bytes(ldap.LDAPMessage(messageID=2, protocolOp=ldap.SearchResultDone(ldap.LDAPResultCode.success))) + resp = resp1 + resp2 + # Chunking + conn = MockConnection(req, 4096) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp) + conn = MockConnection(req, msglen) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp) + conn = MockConnection(req, 1) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp) + conn = MockConnection(req, 15) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp) + # No data + conn = MockConnection(b'', 4096) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, b'') + # Hangup/incomplete message + conn = MockConnection(req[:-1], 4096) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp1) + # Invalid message + req = bytes(ldap.LDAPMessage(messageID=1, protocolOp=ldap.SearchRequest())) + req = req[:7] + b'0xab' + req[8:] + resp = bytes(ldap.LDAPMessage(messageID=1, protocolOp=ldap.SearchResultDone(ldap.LDAPResultCode.protocolError))) + conn = MockConnection(req, 4096) + BaseLDAPRequestHandler(conn, '', None).handle() + self.assertEqual(conn.sent, resp) + # Unrecoverable invalid message + conn = MockConnection(b'\x00\xff', 4096) + with self.assertRaises(ValueError): + BaseLDAPRequestHandler(conn, '', None).handle() + +class TestSimpleLDAPRequestHandler(unittest.TestCase): + def test_session_python_ldap3(self): + class RequestHandler(SimpleLDAPRequestHandler): + def handle(self): + pass + def do_bind_simple_authenticated(self, dn, password): + return dn == 'cn=service,ou=system,dc=example,dc=com' and password == b'foobar' + # conn = ldap3.Connection(server, 'cn=service,ou=system,dc=example,dc=com', 'foobar') + # conn.bind() + # conn.search('ou=users,dc=example,dc=com', '(uid=testuser)', attributes=[ldap3.ALL_ATTRIBUTES]) + # conn.unbind() + handler = RequestHandler(None, None, None) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'08\x02\x01\x1e`3\x02\x01\x03\x04&cn=service,ou=system,dc=example,dc=com\x80\x06foobar')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.BindResponse) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + # ldap3 automatically fetches rootdse and subschema per default + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0;\x02\x01\x1fc6\x04\x00\n\x01\x00\n\x01\x03\x02\x01\x00\x02\x01\x00\x01\x01\x00\x87\x0bobjectClass0\x16\x04\x11subschemaSubentry\x04\x01+')[0])) + self.assertEqual(len(resps), 2) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultEntry) + self.assertIsInstance(resps[1].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[1].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x81\xe4\x02\x01 c\x81\xde\x04\x0ccn=Subschema\n\x01\x00\n\x01\x03\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa3\x18\x04\x0bobjectClass\x04\tsubschema0\x81\xa4\x04\robjectClasses\x04\x0eattributeTypes\x04\x0cldapSyntaxes\x04\rmatchingRules\x04\x0fmatchingRuleUse\x04\x0fdITContentRules\x04\x11dITStructureRules\x04\tnameForms\x04\x0fcreateTimestamp\x04\x0fmodifyTimestamp\x04\x01*\x04\x01+')[0])) + self.assertEqual(len(resps), 2) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultEntry) + self.assertIsInstance(resps[1].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[1].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0F\x02\x01!cA\x04\x1aou=users,dc=example,dc=com\n\x01\x02\n\x01\x03\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa3\x0f\x04\x03uid\x04\x08testuser0\x03\x04\x01*')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x05\x02\x01"B\x00')[0])) + self.assertEqual(len(resps), 0) + + def test_session_openldap_utils(self): + class RequestHandler(SimpleLDAPRequestHandler): + def handle(self): + pass + supports_sasl_plain = True + def do_bind_sasl_plain(self, identity, password, authzid=None): + return identity == 'service' and password == 'foobar' and (authzid is None or authzid == 'service') + # ldapsearch -x -b '' -s subtree '(&(objectClass=person)(memberof=cn=users,ou=groups,dc=example,dc=com))' + handler = RequestHandler(None, None, None) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x0c\x02\x01\x01`\x07\x02\x01\x03\x04\x00\x80\x00')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.BindResponse) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0c\x02\x01\x02c^\x04\x00\n\x01\x02\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa0I\xa3\x15\x04\x0bobjectClass\x04\x06person\xa30\x04\x08memberof\x04$cn=users,ou=groups,dc=example,dc=com0\x00')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x05\x02\x01\x03B\x00')[0])) + self.assertEqual(len(resps), 0) + # ldapsearch -x -MM -b '' -s subtree '(objectClass=*)' + handler = RequestHandler(None, None, None) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x0c\x02\x01\x01`\x07\x02\x01\x03\x04\x00\x80\x00')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.BindResponse) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0E\x02\x01\x02c \x04\x00\n\x01\x02\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\x87\x0bobjectClass0\x00\xa0\x1e0\x1c\x04\x172.16.840.1.113730.3.4.2\x01\x01\xff')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.unavailableCriticalExtension) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x05\x02\x01\x03B\x00')[0])) + self.assertEqual(len(resps), 0) + # ldapsearch -U service -X service -b '' -s base '(objectClass=*)' + handler = RequestHandler(None, None, None) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0>\x02\x01\x01c9\x04\x00\n\x01\x00\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\x87\x0bobjectclass0\x19\x04\x17supportedSASLMechanisms')[0])) + self.assertEqual(len(resps), 2) + self.assertIsInstance(resps[0].protocolOp, ldap.SearchResultEntry) + self.assertIsInstance(resps[1].protocolOp, ldap.SearchResultDone) + self.assertEqual(resps[1].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0+\x02\x01\x02`&\x02\x01\x03\x04\x00\xa3\x1f\x04\x05PLAIN\x04\x16service\x00service\x00foobar')[0])) + self.assertEqual(len(resps), 1) + self.assertIsInstance(resps[0].protocolOp, ldap.BindResponse) + self.assertEqual(resps[0].protocolOp.resultCode, ldap.LDAPResultCode.success) + resps = list(handler.handle_message(ldap.ShallowLDAPMessage.from_ber(b'0\x05\x02\x01\x03B\x00')[0])) + self.assertEqual(len(resps), 0) -- GitLab