Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • uffd/uffd
  • rixx/uffd
  • thies/uffd
  • leona/uffd
  • enbewe/uffd
  • strifel/uffd
  • thies/uffd-2
7 results
Show changes
Showing
with 14 additions and 2235 deletions
import ldap3
from .types import LDAPCommitError
from . import base
class LDAP3Mapper:
def __init__(self, server=None, bind_dn=None, bind_password=None):
if not hasattr(type(self), 'server'):
self.server = server
if not hasattr(type(self), 'bind_dn'):
self.bind_dn = bind_dn
if not hasattr(type(self), 'bind_password'):
self.bind_password = bind_password
if not hasattr(type(self), 'session'):
self.session = base.Session()
class Model(base.Model):
ldap_mapper = self
class Attribute(base.Attribute):
ldap_mapper = self
class Relation(base.Relation):
ldap_mapper = self
class Backref(base.Backref):
ldap_mapper = self
self.Model = Model # pylint: disable=invalid-name
self.Attribute = Attribute # pylint: disable=invalid-name
self.Relation = Relation # pylint: disable=invalid-name
self.Backref = Backref # pylint: disable=invalid-name
def connect(self):
return ldap3.Connection(self.server, self.bind_dn, self.bind_password, auto_bind=True)
from copy import deepcopy
from ldap3.utils.conv import escape_filter_chars
from ldap3 import MODIFY_REPLACE, MODIFY_DELETE, MODIFY_ADD, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
from .types import LDAPSet, LDAPCommitError
class Session:
def __init__(self):
self.__objects = {} # dn -> instance
self.__to_delete = []
self.__relations = {} # (srccls, srcattr, dn) -> {srcobj, ...}
def lookup(self, dn):
return self.__objects.get(dn)
def register(self, obj):
if obj.dn in self.__objects and self.__objects[obj.dn] != obj:
raise Exception()
self.__objects[obj.dn] = obj
return obj
def lookup_relations(self, srccls, srcattr, dn):
key = (srccls, srcattr, dn)
return self.__relations.get(key, set())
def update_relations(self, srcobj, srcattr, delete_dns=None, add_dns=None):
for dn in (delete_dns or []):
key = (type(srcobj), srcattr, dn)
self.__relations[key] = self.__relations.get(key, set())
self.__relations[key].discard(srcobj)
for dn in (add_dns or []):
key = (type(srcobj), srcattr, dn)
self.__relations[key] = self.__relations.get(key, set())
self.__relations[key].add(srcobj)
def add(self, obj):
self.register(obj)
def delete(self, obj):
if obj.dn in self.__objects:
del self.__objects[obj.dn]
self.__to_delete.append(obj)
def commit(self):
while self.__to_delete:
self.__to_delete.pop(0).ldap_delete()
for obj in list(self.__objects.values()):
if not obj.ldap_created:
obj.ldap_create()
elif obj.ldap_dirty:
obj.ldap_modify()
def rollback(self):
self.__to_delete.clear()
self.__objects = {dn: obj for dn, obj in self.__objects.items() if obj.ldap_created}
for obj in self.__objects.values():
if obj.ldap_dirty:
obj.ldap_reset()
class Model:
ldap_mapper = None # Overwritten by LDAP3Mapper
ldap_dn_attribute = None
ldap_dn_base = None
ldap_base = None
ldap_object_classes = None
ldap_filter = None
# Caution: Never mutate ldap_pre_create_hooks and ldap_relations, always reassign!
ldap_pre_create_hooks = []
ldap_relations = []
def __init__(self, _ldap_response=None, **kwargs):
self.ldap_session = self.ldap_mapper.session
self.ldap_relation_data = set()
self.__ldap_dn = None if _ldap_response is None else _ldap_response['dn']
self.__ldap_attributes = {}
for key, values in (_ldap_response or {}).get('attributes', {}).items():
if isinstance(values, list):
self.__ldap_attributes[key] = values
else:
self.__ldap_attributes[key] = [values]
self.__attributes = deepcopy(self.__ldap_attributes)
self.__changes = {}
for key, value, in kwargs.items():
if not hasattr(self, key):
raise Exception()
setattr(self, key, value)
for name in self.ldap_relations:
self.__update_relations(name, add_dns=self.__attributes.get(name, []))
def __update_relations(self, name, delete_dns=None, add_dns=None):
if name in self.ldap_relations:
self.ldap_session.update_relations(self, name, delete_dns, add_dns)
def ldap_getattr(self, name):
return self.__attributes.get(name, [])
def ldap_setattr(self, name, values):
self.__update_relations(name, delete_dns=self.__attributes.get(name, []))
self.__changes[name] = [(MODIFY_REPLACE, values)]
self.__attributes[name] = values
self.__update_relations(name, add_dns=values)
def ldap_attradd(self, name, value):
self.__changes[name] = self.__changes.get(name, []) + [(MODIFY_ADD, [value])]
self.__attributes[name].append(value)
self.__update_relations(name, add_dns=[value])
def ldap_attrdel(self, name, value):
self.__changes[name] = self.__changes.get(name, []) + [(MODIFY_DELETE, [value])]
if value in self.__attributes.get(name, []):
self.__attributes[name].remove(value)
self.__update_relations(name, delete_dns=[value])
def __repr__(self):
name = '%s.%s'%(type(self).__module__, type(self).__name__)
if self.__ldap_dn is None:
return '<%s>'%name
return '<%s %s>'%(name, self.__ldap_dn)
def build_dn(self):
if self.ldap_dn_attribute is None:
return None
if self.ldap_dn_base is None:
return None
if self.__attributes.get(self.ldap_dn_attribute) is None:
return None
return '%s=%s,%s'%(self.ldap_dn_attribute, escape_filter_chars(self.__attributes[self.ldap_dn_attribute][0]), self.ldap_dn_base)
@property
def dn(self):
if self.__ldap_dn is not None:
return self.__ldap_dn
return self.build_dn()
@classmethod
def ldap_get(cls, dn):
obj = cls.ldap_mapper.session.lookup(dn)
if obj is None:
conn = cls.ldap_mapper.connect()
conn.search(dn, cls.ldap_filter, attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
if not conn.response:
return None
if len(conn.response) != 1:
raise Exception()
obj = cls.ldap_mapper.session.register(cls(_ldap_response=conn.response[0]))
return obj
@classmethod
def ldap_all(cls):
conn = cls.ldap_mapper.connect()
conn.search(cls.ldap_base, cls.ldap_filter, attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
res = []
for entry in conn.response:
obj = cls.ldap_mapper.session.lookup(entry['dn'])
if obj is None:
obj = cls.ldap_mapper.session.register(cls(_ldap_response=entry))
res.append(obj)
return res
@classmethod
def ldap_filter_by_raw(cls, **kwargs):
filters = [cls.ldap_filter]
for key, value in kwargs.items():
filters.append('(%s=%s)'%(key, escape_filter_chars(value)))
conn = cls.ldap_mapper.connect()
conn.search(cls.ldap_base, '(&%s)'%(''.join(filters)), attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
res = []
for entry in conn.response:
obj = cls.ldap_mapper.session.lookup(entry['dn'])
if obj is None:
obj = cls.ldap_mapper.session.register(cls(_ldap_response=entry))
res.append(obj)
return res
@classmethod
def ldap_filter_by(cls, **kwargs):
_kwargs = {}
for key, value in kwargs.items():
attr = getattr(cls, key)
_kwargs[attr.name] = attr.encode(value)
return cls.ldap_filter_by_raw(**_kwargs)
def ldap_reset(self):
for name in self.ldap_relations:
self.__update_relations(name, delete_dns=self.__attributes.get(name, []))
self.__changes = {}
self.__attributes = deepcopy(self.__ldap_attributes)
for name in self.ldap_relations:
self.__update_relations(name, add_dns=self.__attributes.get(name, []))
@property
def ldap_dirty(self):
return bool(self.__changes)
@property
def ldap_created(self):
return bool(self.__ldap_attributes)
def ldap_modify(self):
if not self.ldap_created:
raise Exception()
if not self.ldap_dirty:
return
conn = self.ldap_mapper.connect()
success = conn.modify(self.dn, self.__changes)
if not success:
raise Exception()
self.__changes = {}
self.__ldap_attributes = deepcopy(self.__attributes)
def ldap_create(self):
if self.ldap_created:
raise Exception()
conn = self.ldap_mapper.connect()
for func in self.ldap_pre_create_hooks:
func(self)
success = conn.add(self.dn, self.ldap_object_classes, self.__attributes)
if not success:
raise LDAPCommitError()
self.__changes = {}
self.__ldap_attributes = deepcopy(self.__attributes)
def ldap_delete(self):
conn = self.ldap_mapper.connect()
success = conn.delete(self.dn)
if not success:
raise Exception()
self.__ldap_attributes = {}
class Attribute:
ldap_mapper = None # Overwritten by LDAP3Mapper
def __init__(self, name, multi=False, default=None, encode=None, decode=None, aliases=None):
self.name = name
self.multi = multi
self.encode = encode or (lambda x: x)
self.decode = decode or (lambda x: x)
self.default_values = default
self.aliases = aliases or []
def default(self, obj):
if obj.ldap_getattr(self.name) == []:
values = self.default_values
if callable(values):
values = values()
self.__set__(obj, values)
def additem(self, obj, value):
obj.ldap_attradd(self.name, value)
for name in self.aliases:
obj.ldap_attradd(name, value)
def delitem(self, obj, value):
obj.ldap_attradd(self.name, value)
for name in self.aliases:
obj.ldap_attradd(name, value)
def __set_name__(self, cls, name):
if self.default_values is not None:
cls.ldap_pre_create_hooks = cls.ldap_pre_create_hooks + [self.default]
def __get__(self, obj, objtype=None):
if obj is None:
return self
if self.multi:
return LDAPSet(getitems=lambda: obj.ldap_getattr(self.name),
additem=lambda value: self.additem(obj, value),
delitem=lambda value: self.delitem(obj, value),
encode=self.encode, decode=self.decode)
return self.decode((obj.ldap_getattr(self.name) or [None])[0])
def __set__(self, obj, values):
if not self.multi:
values = [values]
obj.ldap_setattr(self.name, [self.encode(value) for value in values])
for name in self.aliases:
obj.ldap_setattr(name, [self.encode(value) for value in values])
class Backref:
ldap_mapper = None # Overwritten by LDAP3Mapper
def __init__(self, srccls, srcattr):
self.srccls = srccls
self.srcattr = srcattr
srccls.ldap_relations = srccls.ldap_relations + [srcattr]
def init(self, obj):
if self.srcattr not in obj.ldap_relation_data and obj.ldap_created:
# The query instanciates all related objects that in turn add their relations to session
self.srccls.ldap_filter_by_raw(**{self.srcattr: obj.dn})
obj.ldap_relation_data.add(self.srcattr)
def __get__(self, obj, objtype=None):
if obj is None:
return self
self.init(obj)
return LDAPSet(getitems=lambda: obj.ldap_session.lookup_relations(self.srccls, self.srcattr, obj.dn),
additem=lambda value: value.ldap_attradd(self.srcattr, obj.dn),
delitem=lambda value: value.ldap_attrdel(self.srcattr, obj.dn))
def __set__(self, obj, values):
current = self.__get__(obj)
current.clear()
for value in values:
current.add(value)
class Relation(Attribute):
ldap_mapper = None # Overwritten by LDAP3Mapper
def __init__(self, name, dest, backref=None):
super().__init__(name, multi=True, encode=lambda value: value.dn, decode=dest.ldap_get)
self.name = name
self.dest = dest
self.backref = backref
def __set_name__(self, cls, name):
if self.backref is not None:
setattr(self.dest, self.backref, Backref(cls, self.name))
from .types import LDAPSet
class DB2LDAPBackref:
def __init__(self, baseattr, mapcls, backattr):
self.baseattr = baseattr
self.mapcls = mapcls
self.backattr = backattr
def getitems(self, ldapobj):
return {getattr(mapobj, self.backattr) for mapobj in self.mapcls.query.filter_by(dn=ldapobj.dn)}
def additem(self, ldapobj, dbobj):
if dbobj not in self.getitems(ldapobj):
getattr(dbobj, self.baseattr).append(self.mapcls(dn=ldapobj.dn))
def delitem(self, ldapobj, dbobj):
for mapobj in list(getattr(dbobj, self.baseattr)):
if mapobj.dn == ldapobj.dn:
getattr(dbobj, self.baseattr).remove(mapobj)
def __get__(self, ldapobj, objtype=None):
if ldapobj is None:
return self
return LDAPSet(getitems=lambda: self.getitems(ldapobj),
additem=lambda dbobj: self.additem(ldapobj, dbobj),
delitem=lambda dbobj: self.delitem(ldapobj, dbobj))
def __set__(self, ldapobj, dbobjs):
rel = self.__get__(ldapobj)
rel.clear()
for dbobj in dbobjs:
rel.add(dbobj)
class DB2LDAPRelation:
def __init__(self, baseattr, mapcls, ldapcls, backattr=None, backref=None):
self.baseattr = baseattr
self.mapcls = mapcls
self.ldapcls = ldapcls
if backref is not None:
setattr(ldapcls, backref, DB2LDAPBackref(baseattr, mapcls, backattr))
def getitems(self, dbobj):
return {mapobj.dn for mapobj in getattr(dbobj, self.baseattr)}
def additem(self, dbobj, dn):
if dn not in self.getitems(dbobj):
getattr(dbobj, self.baseattr).append(self.mapcls(dn=dn))
def delitem(self, dbobj, dn):
for mapobj in list(getattr(dbobj, self.baseattr)):
if mapobj.dn == dn:
getattr(dbobj, self.baseattr).remove(mapobj)
def __get__(self, dbobj, objtype=None):
if dbobj is None:
return self
return LDAPSet(getitems=lambda: self.getitems(dbobj),
additem=lambda dn: self.additem(dbobj, dn),
delitem=lambda dn: self.delitem(dbobj, dn),
encode=lambda ldapobj: ldapobj.dn,
decode=self.ldapcls.ldap_get)
def __set__(self, dbobj, ldapobjs):
getattr(dbobj, self.baseattr).clear()
for ldapobj in ldapobjs:
getattr(dbobj, self.baseattr).append(self.mapcls(dn=ldapobj.dn))
from collections.abc import MutableSet
class LDAPCommitError(Exception):
pass
class LDAPSet(MutableSet):
def __init__(self, getitems, additem, delitem, encode=None, decode=None):
self.__getitems = getitems
self.__additem = additem
self.__delitem = delitem
self.__encode = encode or (lambda x: x)
self.__decode = decode or (lambda x: x)
def __repr__(self):
return repr(set(self))
def __contains__(self, value):
return value is not None and self.__encode(value) in self.__getitems()
def __iter__(self):
return iter(filter(lambda obj: obj is not None, map(self.__decode, self.__getitems())))
def __len__(self):
return len(set(self))
def add(self, value):
if value not in self:
self.__additem(self.__encode(value))
def discard(self, value):
self.__delitem(self.__encode(value))
def update(self, values):
for value in values:
self.add(value)
import ldap3
from . import model, attribute, relationship
__all__ = ['LDAPMapper']
class LDAPMapper:
def __init__(self, server=None, bind_dn=None, bind_password=None):
class Model(model.Model):
ldap_mapper = self
self.Model = Model # pylint: disable=invalid-name
self.Session = model.Session # pylint: disable=invalid-name
self.Attribute = attribute.Attribute # pylint: disable=invalid-name
self.Relationship = relationship.Relationship # pylint: disable=invalid-name
self.Backreference = relationship.Backreference # pylint: disable=invalid-name
if not hasattr(type(self), 'server'):
self.server = server
if not hasattr(type(self), 'bind_dn'):
self.bind_dn = bind_dn
if not hasattr(type(self), 'bind_password'):
self.bind_password = bind_password
if not hasattr(type(self), 'session'):
self.session = self.Session(self.get_connection)
def get_connection(self):
return ldap3.Connection(self.server, self.bind_dn, self.bind_password, auto_bind=True)
from collections.abc import MutableSequence
class AttributeList(MutableSequence):
def __init__(self, ldap_object, name, aliases):
self.__ldap_object = ldap_object
self.__name = name
self.__aliases = [name] + aliases
def __get(self):
return list(self.__ldap_object.getattr(self.__name))
def __set(self, values):
for name in self.__aliases:
self.__ldap_object.setattr(name, values)
def __repr__(self):
return repr(self.__get())
def __setitem__(self, key, value):
tmp = self.__get()
tmp[key] = value
self.__set(tmp)
def __delitem__(self, key):
tmp = self.__get()
del tmp[key]
self.__set(tmp)
def __len__(self):
return len(self.__get())
def __getitem__(self, key):
return self.__get()[key]
def insert(self, index, value):
tmp = self.__get()
tmp.insert(index, value)
self.__set(tmp)
class Attribute:
def __init__(self, name, aliases=None, multi=False, default=None):
self.name = name
self.aliases = aliases or []
self.multi = multi
self.default = default
def add_hook(self, obj):
if obj.ldap_object.getattr(self.name) == []:
self.__set__(self.name, self.default() if callable(self.default) else self.default)
def __set_name__(self, cls, name):
if self.default is not None:
cls.ldap_add_hooks = cls.ldap_add_hooks + (self.add_hook,)
def __get__(self, obj, objtype=None):
if obj is None:
return self
if self.multi:
return AttributeList(obj.ldap_object, self.name, self.aliases)
return obj.ldap_object.getattr(self.name)
def __set__(self, obj, values):
if not self.multi:
values = [values]
for name in self.aliases:
obj.ldap_object.setattr(name, values)
from copy import deepcopy
from ldap3 import MODIFY_REPLACE, MODIFY_DELETE, MODIFY_ADD, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
from ldap3.utils.conv import escape_filter_chars
def encode_filter(filter_params):
return '(&%s)'%(''.join(['(%s=%s)'%(attr, escape_filter_chars(value)) for attr, value in filter_params]))
def match_dn(dn, base):
return dn.endswith(base) # Probably good enougth for all valid dns
def make_cache_key(search_base, filter_params):
res = [search_base]
for attr, value in sorted(filter_params):
res.append((attr, value))
return res
class LDAPCommitError(Exception):
pass
class SessionState:
def __init__(self, objects=None, deleted_objects=None, references=None):
self.objects = objects or {}
self.deleted_objects = deleted_objects or {}
self.references = references or {} # {(attr_name, value): {srcobj, ...}, ...}
def copy(self):
return SessionState(deepcopy(self.objects), deepcopy(self.deleted_objects), deepcopy(self.references))
def ref(self, obj, attr, values):
for value in values:
key = (attr, value)
if key not in self.references:
self.references[key] = {obj}
else:
self.references[key].add(obj)
def unref(self, obj, attr, values):
for value in values:
self.references.get((attr, value), set()).discard(obj)
class ObjectState:
def __init__(self, session=None, attributes=None, dn=None):
self.session = session
self.attributes = attributes or {}
self.dn = dn
def copy(self):
return ObjectState(attributes=deepcopy(self.attributes), dn=self.dn, session=self.session)
class AddOperation:
def __init__(self, obj, dn, object_classes):
self.obj = obj
self.dn = dn
self.object_classes = object_classes
self.attributes = deepcopy(obj.state.attributes)
def apply_object(self, obj_state):
obj_state.dn = self.dn
obj_state.attributes = deepcopy(self.attributes)
def apply_session(self, session_state):
assert self.dn not in session_state.objects
session_state.objects[self.dn] = self.obj
for name, values in self.attributes.items():
session_state.ref(self.obj, name, values)
def apply_ldap(self, conn):
success = conn.add(self.dn, self.object_classes, self.attributes)
if not success:
raise LDAPCommitError()
class DeleteOperation:
def __init__(self, obj):
self.dn = obj.state.dn
self.obj = obj
self.attributes = deepcopy(obj.state.attributes)
def apply_object(self, obj_state):
obj_state.dn = None
def apply_session(self, session_state):
assert self.dn in session_state.objects
del session_state.objects[self.dn]
session_state.deleted_objects[self.dn] = self.obj
for name, values in self.attributes.items():
session_state.unref(self.obj, name, values)
def apply_ldap(self, conn):
success = conn.delete(self.dn)
if not success:
raise LDAPCommitError()
class ModifyOperation:
def __init__(self, obj, changes):
self.obj = obj
self.attributes = deepcopy(obj.state.attributes)
self.changes = deepcopy(changes)
def apply_object(self, obj_state):
for attr, changes in self.changes.items():
for action, values in changes:
if action == MODIFY_REPLACE:
obj_state.attributes[attr] = values
elif action == MODIFY_ADD:
obj_state.attributes[attr] += values
elif action == MODIFY_DELETE:
for value in values:
if value in obj_state.attributes[attr]:
obj_state.attributes[attr].remove(value)
def apply_session(self, session_state):
for attr, changes in self.changes.items():
for action, values in changes:
if action == MODIFY_REPLACE:
session_state.unref(self.obj, attr, self.attributes.get(attr, []))
session_state.ref(self.obj, attr, values)
elif action == MODIFY_ADD:
session_state.ref(self.obj, attr, values)
elif action == MODIFY_DELETE:
session_state.unref(self.obj, attr, values)
def apply_ldap(self, conn):
success = conn.modify(self.obj.state.dn, self.changes)
if not success:
raise LDAPCommitError()
class Session:
def __init__(self, get_connection):
self.get_connection = get_connection
self.committed_state = SessionState()
self.state = SessionState()
self.changes = []
self.cached_searches = set()
def add(self, obj, dn, object_classes):
if self.state.objects.get(dn) == obj:
return
assert obj.state.session is None
oper = AddOperation(obj, dn, object_classes)
oper.apply_object(obj.state)
obj.state.session = self
oper.apply_session(self.state)
self.changes.append(oper)
def delete(self, obj):
if obj.state.dn not in self.state.objects:
return
assert obj.state.session == self
oper = DeleteOperation(obj)
oper.apply_object(obj.state)
obj.state.session = None
oper.apply_session(self.state)
self.changes.append(oper)
def record(self, oper):
assert oper.obj.state.session == self
self.changes.append(oper)
def commit(self):
conn = self.get_connection()
while self.changes:
oper = self.changes.pop(0)
try:
oper.apply_ldap(conn)
except Exception as err:
self.changes.insert(0, oper)
raise err
oper.apply_object(oper.obj.committed_state)
oper.apply_session(self.committed_state)
self.committed_state = self.state.copy()
def rollback(self):
for obj in self.state.objects.values():
obj.state = obj.committed_state.copy()
for obj in self.state.deleted_objects.values():
obj.state = obj.committed_state.copy()
self.state = self.committed_state.copy()
self.changes.clear()
def get(self, dn, filter_params):
if dn in self.state.objects:
obj = self.state.objects[dn]
return obj if obj.matches(filter_params) else None
if dn in self.state.deleted_objects:
return None
conn = self.get_connection()
conn.search(dn, encode_filter(filter_params), attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
if not conn.response:
return None
assert len(conn.response) == 1
assert conn.response[0]['dn'] == dn
obj = Object(self, conn.response[0])
self.state.objects[dn] = obj
self.committed_state.objects[dn] = obj
for attr, values in obj.state.attributes.items():
self.state.ref(obj, attr, values)
return obj
def filter(self, search_base, filter_params):
if not filter_params:
matches = self.state.objects.values()
else:
submatches = [self.state.references.get((attr, value), set()) for attr, value in filter_params]
matches = submatches.pop(0)
while submatches:
matches = matches.intersection(submatches.pop(0))
res = [obj for obj in matches if match_dn(obj.state.dn, search_base)]
cache_key = make_cache_key(search_base, filter_params)
if cache_key in self.cached_searches:
return res
conn = self.get_connection()
conn.search(search_base, encode_filter(filter_params), attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
for response in conn.response:
dn = response['dn']
if dn in self.state.objects or dn in self.state.deleted_objects:
continue
obj = Object(self, response)
self.state.objects[dn] = obj
self.committed_state.objects[dn] = obj
for attr, values in obj.state.attributes.items():
self.state.ref(obj, attr, values)
res.append(obj)
self.cached_searches.add(cache_key)
return res
class Object:
def __init__(self, session=None, response=None):
if response is None:
self.committed_state = ObjectState()
else:
assert session is not None
attrs = {attr: value if isinstance(value, list) else [value] for attr, value in response['attributes'].items()}
self.committed_state = ObjectState(session, attrs, response['dn'])
self.state = self.committed_state.copy()
@property
def dn(self):
return self.state.dn
@property
def session(self):
return self.state.session
def getattr(self, name):
return self.state.attributes.get(name, [])
def setattr(self, name, values):
oper = ModifyOperation(self, {name: [(MODIFY_REPLACE, values)]})
oper.apply_object(self.state)
if self.state.session:
oper.apply_session(self.state.session.state)
self.state.session.changes.append(oper)
def attr_append(self, name, value):
oper = ModifyOperation(self, {name: [(MODIFY_ADD, [value])]})
oper.apply_object(self.state)
if self.state.session:
oper.apply_session(self.state.session.state)
self.state.session.changes.append(oper)
def attr_remove(self, name, value):
oper = ModifyOperation(self, {name: [(MODIFY_DELETE, [value])]})
oper.apply_object(self.state)
if self.state.session:
oper.apply_session(self.state.session.state)
self.state.session.changes.append(oper)
def match(self, filter_params):
for attr, value in filter_params:
if value not in self.getattr(attr):
return False
return True
from collections.abc import MutableSet
from .model import add_to_session
class DBRelationshipSet(MutableSet):
def __init__(self, dbobj, relattr, ldapcls):
self.__dbobj = dbobj
self.__relattr = relattr
self.__ldapcls = ldapcls
def __get_dns(self):
return [mapobj.dn for mapobj in getattr(self.__dbobj, self.__relattr)]
def __repr__(self):
return repr(set(self))
def __contains__(self, value):
if value is None or not isinstance(value, self.__ldapcls):
return False
return value.ldap_object.dn in self.__get_dns()
def __iter__(self):
return iter(filter(lambda obj: obj is not None, [self.__ldapcls.query.get(dn) for dn in self.__get_dns()]))
def __len__(self):
return len(set(self))
def add(self, value):
if not isinstance(value, self.__ldapcls):
raise TypeError()
if value.ldap_object.session is not None:
add_to_session(value, self.__ldapcls.ldap_mapper.session)
if value.ldap_object.dn not in self.__get_dns():
getattr(self.__dbobj, self.__relattr).append(self.__ldapcls(dn=value.ldap_object.dn))
def discard(self, value):
if not isinstance(value, self.__ldapcls):
raise TypeError()
rel = getattr(self.__dbobj, self.__relattr)
for mapobj in list(rel):
if mapobj.dn == value.ldap_object.dn:
rel.remove(mapobj)
class DBRelationship:
def __init__(self, relattr, ldapcls, mapcls, backref=None, backattr=None):
self.relattr = relattr
self.ldapcls = ldapcls
self.mapcls = mapcls
self.backref = backref
self.backattr = backattr
def __set_name__(self, cls, name):
if self.backref:
assert self.backattr
setattr(self.ldapcls, self.backref, DBBackreference(cls, self.relattr, self.mapcls, self.backattr))
def __get__(self, obj, objtype=None):
if obj is None:
return self
return DBRelationshipSet(obj, self.relattr, self.ldapcls)
def __set__(self, obj, values):
tmp = self.__get__(obj)
tmp.clear()
for value in values:
tmp.add(value)
class DBBackreferenceSet(MutableSet):
def __init__(self, ldapobj, dbcls, relattr, mapcls, backattr):
self.__ldapobj = ldapobj
self.__dbcls = dbcls
self.__relattr, = relattr
self.__mapcls = mapcls
self.__backattr = backattr
@property
def __dn(self):
return self.__ldapobj.ldap_object.dn
def __get(self):
return {getattr(mapobj, self.__backattr) for mapobj in self.__mapcls.query.filter_by(dn=self.__dn)}
def __repr__(self):
return repr(self.__get())
def __contains__(self, value):
return value in self.__get()
def __iter__(self):
return iter(self.__get())
def __len__(self):
return len(self.__get())
def add(self, value):
# TODO: add value to db session if necessary
if not isinstance(value, self.__dbcls):
raise TypeError()
rel = getattr(value, self.__relattr)
if self.__dn not in {mapobj.dn for mapobj in rel}:
rel.append(self.__mapcls(dn=self.__dn))
def discard(self, value):
if not isinstance(value, self.__dbcls):
raise TypeError()
rel = getattr(value, self.__relattr)
for mapobj in list(rel):
if mapobj.dn == self.__dn:
rel.remove(mapobj)
class DBBackreference:
def __init__(self, dbcls, relattr, mapcls, backattr):
self.dbcls = dbcls
self.relattr = relattr
self.mapcls = mapcls
self.backattr = backattr
def __get__(self, obj, objtype=None):
if obj is None:
return self
return DBBackreferenceSet(obj, self.dbcls, self.relattr, self.mapcls, self.backattr)
def __set__(self, obj, values):
tmp = self.__get__(obj)
tmp.clear()
for value in values:
tmp.add(value)
try:
# Added in v2.5
from ldap3.utils.dn import escape_rdn
except ImportError:
# From ldap3 source code, Copyright Giovanni Cannata, LGPL v3 license
def escape_rdn(rdn):
# '/' must be handled first or the escape slashes will be escaped!
for char in ['\\', ',', '+', '"', '<', '>', ';', '=', '\x00']:
rdn = rdn.replace(char, '\\' + char)
if rdn[0] == '#' or rdn[0] == ' ':
rdn = ''.join(('\\', rdn))
if rdn[-1] == ' ':
rdn = ''.join((rdn[:-1], '\\ '))
return rdn
from . import core
def add_to_session(obj, session):
for func in obj.ldap_add_hooks:
func(obj)
session.add(obj.ldap_object, obj.dn, obj.ldap_object_classes)
class Session:
def __init__(self, get_connection):
self.ldap_session = core.Session(get_connection)
def add(self, obj):
add_to_session(obj, self.ldap_session)
def delete(self, obj):
self.ldap_session.delete(obj.ldap_object)
def commit(self):
self.ldap_session.commit()
def rollback(self):
self.ldap_session.rollback()
def make_modelobj(obj, model):
if obj is None:
return None
if not hasattr(obj, 'model'):
obj.model = model()
obj.model.ldap_object = obj
if not isinstance(obj.model, model):
return None
return obj.model
def make_modelobjs(objs, model):
modelobjs = []
for obj in objs:
modelobj = make_modelobj(obj, model)
if modelobj is not None:
modelobjs.append(modelobj)
return modelobjs
class ModelQuery:
def __init__(self, model):
self.model = model
def get(self, dn):
session = self.model.ldap_mapper.session.ldap_session
return make_modelobj(session.get(dn, self.model.ldap_filter_params), self.model)
def all(self):
session = self.model.ldap_mapper.session.ldap_session
objs = session.filter(self.model.ldap_search_base, self.model.ldap_filter_params)
return make_modelobjs(objs, self.model)
def filter_by(self, **kwargs):
filter_params = self.model.ldap_filter_params + list(kwargs.items())
session = self.model.ldap_mapper.session.ldap_session
objs = session.filter(self.model.ldap_search_base, filter_params)
return make_modelobjs(objs, self.model)
class ModelQueryWrapper:
def __get__(self, obj, objtype=None):
return ModelQuery(objtype)
class Model:
# Overwritten by mapper
ldap_mapper = None
query = ModelQueryWrapper()
ldap_add_hooks = tuple()
# Overwritten by models
ldap_search_base = None
ldap_filter_params = None
ldap_object_classes = None
ldap_dn_base = None
ldap_dn_attribute = None
def __init__(self, **kwargs):
self.ldap_object = core.Object()
for key, value, in kwargs.items():
if not hasattr(self, key):
raise Exception()
setattr(self, key, value)
@property
def dn(self):
if self.ldap_object.dn is not None:
return self.ldap_object.dn
if self.ldap_dn_base is None or self.ldap_dn_attribute is None:
return None
values = self.ldap_object.getattr(self.ldap_dn_attribute)
if not values:
return None
return '%s=%s,%s'%(self.ldap_dn_attribute, escape_rdn(values[0]), self.ldap_dn_base)
def __repr__(self):
cls_name = '%s.%s'%(type(self).__module__, type(self).__name__)
if self.dn is not None:
return '<%s %s>'%(cls_name, self.dn)
return '<%s>'%cls_name
from collections.abc import MutableSet
from .model import make_modelobj, make_modelobjs, add_to_session
class UnboundObjectError(Exception):
pass
class RelationshipSet(MutableSet):
def __init__(self, ldap_object, name, model, destmodel):
self.__ldap_object = ldap_object
self.__name = name
self.__model = model
self.__destmodel = destmodel
def __modify_check(self, value):
if self.__ldap_object.session is None:
raise UnboundObjectError()
if not isinstance(value, self.__destmodel):
raise TypeError()
def __repr__(self):
return repr(set(self))
def __contains__(self, value):
if value is None or not isinstance(value, self.__destmodel):
return False
return value.ldap_object.dn in self.__ldap_object.getattr(self.__name)
def __iter__(self):
def get(dn):
return make_modelobj(self.__ldap_object.session.get(dn, self.__model.ldap_filter_params), self.__destmodel)
dns = set(self.__ldap_object.getattr(self.__name))
return iter(filter(lambda obj: obj is not None, map(get, dns)))
def __len__(self):
return len(set(self))
def add(self, value):
self.__modify_check(value)
if value.ldap_object.session is None:
add_to_session(value, self.__ldap_object.session)
assert value.ldap_object.session == self.__ldap_object.session
self.__ldap_object.attradd(self.__name, value.dn)
def discard(self, value):
self.__modify_check(value)
self.__ldap_object.attrdel(self.__name, value.dn)
class Relationship:
def __init__(self, name, destmodel, backref=None):
self.name = name
self.destmodel = destmodel
self.backref = backref
def __set_name__(self, cls, name):
if self.backref is not None:
setattr(self.destmodel, self.backref, Backreference(self.name, cls))
def __get__(self, obj, objtype=None):
if obj is None:
return self
return RelationshipSet(obj, self.name, type(obj), self.destmodel)
def __set__(self, obj, values):
tmp = self.__get__(obj)
tmp.clear()
for value in values:
tmp.add(value)
class BackreferenceSet(MutableSet):
def __init__(self, ldap_object, name, model, srcmodel):
self.__ldap_object = ldap_object
self.__name = name
self.__model = model
self.__srcmodel = srcmodel
def __modify_check(self, value):
if self.__ldap_object.session is None:
raise UnboundObjectError()
if not isinstance(value, self.__srcmodel):
raise TypeError()
def __get(self):
if self.__ldap_object.session is None:
return set()
filter_params = self.__srcmodel.filter_params + [(self.__name, self.__ldap_object.dn)]
objs = self.__ldap_object.session.filter(self.__srcmodel.ldap_search_base, filter_params)
return set(make_modelobjs(objs, self.__srcmodel))
def __repr__(self):
return repr(self.__get())
def __contains__(self, value):
return value in self.__get()
def __iter__(self):
return iter(self.__get())
def __len__(self):
return len(self.__get())
def add(self, value):
self.__modify_check(value)
if value.ldap_object.session is None:
add_to_session(value, self.__ldap_object.session)
assert value.ldap_object.session == self.__ldap_object.session
if self.__ldap_object.dn not in value.ldap_object.getattr(self.__name):
value.ldap_object.attradd(self.__name, self.__ldap_object.dn)
def discard(self, value):
self.__modify_check(value)
value.ldap_object.attrdel(self.__name, self.__ldap_object.dn)
class Backreference:
def __init__(self, name, srcmodel):
self.name = name
self.srcmodel = srcmodel
def __get__(self, obj, objtype=None):
if obj is None:
return self
return BackreferenceSet(obj, self.name, type(obj), self.srcmodel)
def __set__(self, obj, values):
tmp = self.__get__(obj)
tmp.clear()
for value in values:
tmp.add(value)
{
"entries": [
{
"dn": "uid=testuser,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test User"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test User"
],
"entryDN": [
"uid=testuser,ou=users,dc=example,dc=com"
],
"entryUUID": [
"75e62c6a-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test User"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testuser"
],
"mail": [
"testuser@example.com"
],
"memberOf": [
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=users,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testuser"
],
"uidNumber": [
"10000"
],
"userPassword": [
"userpassword"
]
}
},
{
"dn": "uid=testadmin,ou=users,dc=example,dc=com",
"raw": {
"cn": [
"Test Admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"displayName": [
"Test Admin"
],
"entryDN": [
"uid=testadmin,ou=users,dc=example,dc=com"
],
"entryUUID": [
"678c8470-03c2-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"givenName": [
"Test Admin"
],
"hasSubordinates": [
"FALSE"
],
"homeDirectory": [
"/home/testadmin"
],
"mail": [
"testadmin@example.com"
],
"memberOf": [
"cn=users,ou=groups,dc=example,dc=com",
"cn=uffd_access,ou=groups,dc=example,dc=com",
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"inetOrgPerson",
"organizationalPerson",
"person",
"posixAccount"
],
"sn": [
" "
],
"structuralObjectClass": [
"inetOrgPerson"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"testadmin"
],
"uidNumber": [
"10001"
],
"userPassword": [
"adminpassword"
]
}
},
{
"dn": "cn=users,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"users"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Base group for all users"
],
"entryDN": [
"cn=users,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"1aec0e8c-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20001"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_access,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_access"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"User access to uffd selfservice"
],
"entryDN": [
"cn=uffd_access,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"4fc8dd60-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20002"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testuser,ou=users,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "cn=uffd_admin,ou=groups,dc=example,dc=com",
"raw": {
"cn": [
"uffd_admin"
],
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"description": [
"Admin access to uffd selfservice"
],
"entryDN": [
"cn=uffd_admin,ou=groups,dc=example,dc=com"
],
"entryUUID": [
"b5d869d6-03c3-11eb-adc1-0242ac120002"
],
"gidNumber": [
"20003"
],
"hasSubordinates": [
"FALSE"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"posixGroup",
"groupOfUniqueNames",
"top"
],
"structuralObjectClass": [
"groupOfUniqueNames"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uniqueMember": [
"cn=dummy,ou=system,dc=example,dc=com",
"uid=testadmin,ou=users,dc=example,dc=com"
]
}
},
{
"dn": "uid=test,ou=postfix,dc=example,dc=com",
"raw": {
"createTimestamp": [
"20200101000000Z"
],
"creatorsName": [
"cn=admin,dc=example,dc=com"
],
"entryDN": [
"uid=test,ou=postfix,dc=example,dc=com"
],
"entryUUID": [
"926e5273-a545-4dfe-8f20-d1eeaf41d796"
],
"hasSubordinates": [
"FALSE"
],
"mailacceptinggeneralid": [
"test1@example.com",
"test2@example.com"
],
"maildrop": [
"testuser@mail.example.com"
],
"modifiersName": [
"cn=admin,dc=example,dc=com"
],
"modifyTimestamp": [
"20200101000000Z"
],
"objectClass": [
"top",
"postfixVirtual"
],
"structuralObjectClass": [
"postfixVirtual"
],
"subschemaSubentry": [
"cn=Subschema"
],
"uid": [
"test"
]
}
}
]
}
version: 1
dn: uid=testuser,ou=users,dc=example,dc=com
objectClass: top
objectClass: organizationalPerson
objectClass: inetOrgPerson
objectClass: person
objectClass: posixAccount
cn: Test User
displayName: Test User
gidNumber: 20001
givenName: Test User
homeDirectory: /home/testuser
mail: testuser@example.com
sn:: IA==
uid: testuser
uidNumber: 10000
userPassword: {ssha512}P6mPgcE974bMZkYHnowsXheE74lqtR0HemVUjZxZT7cgPlEhE7fSU1DYEhOx1ZYhOTuE7Ei3EaMFSSoi9Jqf5MHHcjG9oVWL
dn: uid=testadmin,ou=users,dc=example,dc=com
objectClass: top
objectClass: organizationalPerson
objectClass: inetOrgPerson
objectClass: person
objectClass: posixAccount
cn: Test Admin
displayName: Test Admin
gidNumber: 20001
givenName: Test Admin
homeDirectory: /home/testadmin
mail: testadmin@example.com
sn:: IA==
uid: testadmin
uidNumber: 10001
userPassword: {ssha512}SGARsM9lNP9PQ4S+M/pmA7MIDvdyF9WZ8Ki2JvjvxIlMLene5+s+M+Qfi0lfJHOSqucd6CR0F7vDl32rEJNd1ZPCLbCO20pB
dn: uid=test,ou=postfix,dc=example,dc=com
objectClass: top
objectClass: postfixVirtual
uid: test
mailacceptinggeneralid: test1@example.com
mailacceptinggeneralid: test2@example.com
maildrop: testuser@mail.example.com
uid=testuser,ou=users,dc=example,dc=com
uid=testadmin,ou=users,dc=example,dc=com
uid=newuser,ou=users,dc=example,dc=com
uid=newuser1,ou=users,dc=example,dc=com
uid=newuser2,ou=users,dc=example,dc=com
uid=newuser3,ou=users,dc=example,dc=com
uid=newuser4,ou=users,dc=example,dc=com
uid=newuser5,ou=users,dc=example,dc=com
uid=newuser6,ou=users,dc=example,dc=com
uid=newuser7,ou=users,dc=example,dc=com
uid=newuser8,ou=users,dc=example,dc=com
uid=newuser9,ou=users,dc=example,dc=com
uid=newuser10,ou=users,dc=example,dc=com
uid=newuser11,ou=users,dc=example,dc=com
uid=newuser12,ou=users,dc=example,dc=com
uid=test,ou=postfix,dc=example,dc=com
uid=test1,ou=postfix,dc=example,dc=com
version: 1
dn: cn=users,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testuser,ou=users,dc=example,dc=com
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
dn: cn=uffd_access,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testuser,ou=users,dc=example,dc=com
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
dn: cn=uffd_admin,ou=groups,dc=example,dc=com
changetype: modify
add: uniqueMember
uniqueMember: uid=testadmin,ou=users,dc=example,dc=com
{
"raw": {
"altServer": [],
"configContext": [
"cn=config"
],
"entryDN": [
""
],
"namingContexts": [
"dc=example,dc=com"
],
"objectClass": [
"top",
"OpenLDAProotDSE"
],
"structuralObjectClass": [
"OpenLDAProotDSE"
],
"subschemaSubentry": [
"cn=Subschema"
],
"supportedCapabilities": [],
"supportedControl": [
"2.16.840.1.113730.3.4.18",
"2.16.840.1.113730.3.4.2",
"1.3.6.1.4.1.4203.1.10.1",
"1.3.6.1.1.22",
"1.2.840.113556.1.4.319",
"1.2.826.0.1.3344810.2.3",
"1.3.6.1.1.13.2",
"1.3.6.1.1.13.1",
"1.3.6.1.1.12"
],
"supportedExtension": [
"1.3.6.1.4.1.1466.20037",
"1.3.6.1.4.1.4203.1.11.1",
"1.3.6.1.4.1.4203.1.11.3",
"1.3.6.1.1.8"
],
"supportedFeatures": [
"1.3.6.1.1.14",
"1.3.6.1.4.1.4203.1.5.1",
"1.3.6.1.4.1.4203.1.5.2",
"1.3.6.1.4.1.4203.1.5.3",
"1.3.6.1.4.1.4203.1.5.4",
"1.3.6.1.4.1.4203.1.5.5"
],
"supportedLDAPVersion": [
"3"
],
"supportedSASLMechanisms": [
"DIGEST-MD5",
"CRAM-MD5",
"NTLM"
],
"vendorName": [],
"vendorVersion": []
},
"type": "DsaInfo"
}
This diff is collapsed.
location / {
uwsgi_pass unix:///run/uwsgi/app/uffd/socket;
include uwsgi_params;
}
location /static {
alias /usr/share/uffd/uffd/static;
}
#!/usr/bin/python3
from werkzeug.contrib.profiler import ProfilerMiddleware
from uffd import create_app
app = create_app()
app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[30])
app.run(debug=True)
[pytest]
filterwarnings =
# DeprecationWarning from dependencies that we use
ignore:`formatargspec` is deprecated since Python 3.5. Use `signature` and the `Signature` object directly:DeprecationWarning
ignore:Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working:DeprecationWarning
# Versions from Debian Buster
ldap3==2.4.1
flask==1.0.2
Flask-SQLAlchemy==2.1
qrcode==6.1
fido2==0.5.0
Flask-OAuthlib==0.9.5
# The main dependencies on their own lead to version collisions and pip is
# not very good at resolving them, so we pin the versions from Debian Buster
# for all dependencies.
certifi==2018.8.24
#cffi==1.12.2
cffi # v1.12.2 no longer works with python3.9. Newer versions seem to work fine.
chardet==3.0.4
click==7.0
cryptography==2.6.1
idna==2.6
itsdangerous==0.24
Jinja2==2.10
MarkupSafe==1.1.0
oauthlib==2.1.0
pyasn1==0.4.2
pycparser==2.19
requests==2.21.0
requests-oauthlib==1.0.0
six==1.12.0
SQLAlchemy==1.2.18
urllib3==1.24.1
Werkzeug==0.14.1
.
# Testing
pytest==3.10.1
......@@ -36,3 +7,4 @@ attrs==18.2.0
more-itertools==4.2.0
pluggy==0.8.0
py==1.7.0
pyOpenSSL==19.0.0