Skip to content
Snippets Groups Projects
Select Git revision
  • b9198be2a42f7f30fac960b02d84e2dffe753dc7
  • develop default protected
  • ical-export
  • feature/audit_log
  • fix/index
  • badge-redeem-404
  • 720-schedule_source
  • room-docu
  • chore/event-views
  • 511-schedule-foo-fixed
  • 607-schedule-versions
  • deploy/curl-verbose
  • fix/public-badge-access-rights
  • 445-schedule-redirects
  • 623-wiki-im-baustellenmodus-sollte-mal-als-wiki-admin-trotzdem-seiten-anlegen-bearbeiten-konnen
  • fix/registration_mail_subject
  • feature/conference-query-set
  • feature/568-habitatmanagement
  • feat/unit-integration-tests
  • camp23-prod
  • production
  • prod-2024-12-27_20-15
  • prod-2024-12-27_16-37
  • prod-2024-12-27_16-01
  • prod-2024-12-27_13-29
  • prod-2024-12-27_00-34
  • prod-2024-12-26_21-45
  • prod-2024-12-26_13-12
  • prod-2024-12-26_00-21
  • prod-2024-12-25_21-04
  • prod-2024-12-25_15-54
  • prod-2024-12-25_01-29
  • prod-2024-12-24_14-48
  • prod-2024-12-23_23-39
  • prod-2024-12-22_21-12
  • prod-2024-12-22_17-25
  • prod-2024-12-22_01-34
  • prod-2024-12-22_00-55
  • prod-2024-12-21_13-42
  • prod-2024-12-21_10-44
  • prod-2024-12-20_12-25
41 results

schedulejson.py

Blame
  • Forked from hub / hub
    Source project has a limited visibility.
    core.py 8.71 KiB
    from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, BASE, MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
    from ldap3.utils.conv import escape_filter_chars
    
    def encode_filter(filter_params):
    	"""Build an LDAP AND-filter string from (attribute, value) pairs.

    	Each value is passed through escape_filter_chars(); attribute names are
    	inserted verbatim. An empty parameter list yields '(&)'.
    	"""
    	clauses = []
    	for attr, value in filter_params:
    		clauses.append('(%s=%s)'%(attr, escape_filter_chars(value)))
    	return '(&' + ''.join(clauses) + ')'
    
    def match_dn(dn, base):
    	"""Return True if dn equals base or lies below it.

    	This is a purely textual check (no DN normalisation or case folding).
    	Unlike a bare endswith(), the suffix must start at an RDN boundary, so
    	'cn=a,xou=users,dc=x' is not treated as being below 'ou=users,dc=x'.
    	An empty base matches every dn.
    	"""
    	if not base:
    		return True
    	if not dn.endswith(base):
    		return False
    	# Whatever precedes the base must be empty (dn == base) or end with the
    	# RDN separator; whitespace after the separator is tolerated.
    	prefix = dn[:len(dn) - len(base)].rstrip()
    	return not prefix or prefix.endswith(',')
    
    def make_cache_key(search_base, filter_params):
    	"""Return a hashable, order-independent key for a search.

    	The key is a tuple starting with search_base followed by the
    	(attribute, value) pairs in sorted order.
    	"""
    	pairs = tuple((attr, value) for attr, value in sorted(filter_params))
    	return (search_base,) + pairs
    
    class LDAPCommitError(Exception):
    	"""Raised when the connection reports failure for an add, delete or modify."""
    
    class SessionState:
    	"""Snapshot of the objects known to a session plus a value index.

    	objects         -- {dn: obj} for live objects
    	deleted_objects -- {dn: obj} for objects that were deleted
    	references      -- {(attr_name, value): {obj, ...}} reverse index
    	"""

    	def __init__(self, objects=None, deleted_objects=None, references=None):
    		# Falsy arguments (None or empty) are replaced by fresh dicts.
    		self.objects = objects if objects else {}
    		self.deleted_objects = deleted_objects if deleted_objects else {}
    		self.references = references if references else {}

    	def copy(self):
    		"""Return a new state; dicts and index sets are copied, objects are shared."""
    		index = {}
    		for key, members in self.references.items():
    			index[key] = members.copy()
    		return SessionState(self.objects.copy(), self.deleted_objects.copy(), index)

    	def ref(self, obj, attr, values):
    		"""Index obj under every (attr, value) pair for the given values."""
    		for value in values:
    			self.references.setdefault((attr, value), set()).add(obj)

    	def unref(self, obj, attr, values):
    		"""Remove obj from the index for every (attr, value) pair; unknown pairs are ignored."""
    		for value in values:
    			members = self.references.get((attr, value))
    			if members is not None:
    				members.discard(obj)
    
    class ObjectState:
    	"""State of a single object: owning session, attribute value lists and DN."""

    	def __init__(self, session=None, attributes=None, dn=None):
    		self.dn = dn
    		self.session = session
    		# A falsy attributes argument is replaced by a fresh dict.
    		self.attributes = attributes or {}

    	def copy(self):
    		"""Return a new state with per-attribute copies of the value lists."""
    		cloned = {}
    		for name, values in self.attributes.items():
    			cloned[name] = values.copy()
    		return ObjectState(session=self.session, attributes=cloned, dn=self.dn)
    
    class AddOperation:
    	"""Queued creation of an entry, with a snapshot of the object's attributes."""

    	def __init__(self, obj, dn, object_classes):
    		self.obj = obj
    		self.dn = dn
    		self.object_classes = object_classes
    		# Snapshot the attribute lists as they are when the operation is created.
    		snapshot = {}
    		for name, values in obj.state.attributes.items():
    			snapshot[name] = values.copy()
    		self.attributes = snapshot

    	def apply_object(self, obj_state):
    		"""Give obj_state the new DN and a fresh copy of the snapshotted attributes."""
    		fresh = {}
    		for name, values in self.attributes.items():
    			fresh[name] = values.copy()
    		obj_state.dn = self.dn
    		obj_state.attributes = fresh

    	def apply_session(self, session_state):
    		"""Register the object under its DN and index all of its attribute values."""
    		assert self.dn not in session_state.objects
    		session_state.objects[self.dn] = self.obj
    		for name in self.attributes:
    			session_state.ref(self.obj, name, self.attributes[name])

    	def apply_ldap(self, conn):
    		"""Send the add through conn; raise LDAPCommitError if it reports failure."""
    		if not conn.add(self.dn, self.object_classes, self.attributes):
    			raise LDAPCommitError()
    
    class DeleteOperation:
    	"""Queued removal of an entry; remembers the DN and attributes at creation time."""

    	def __init__(self, obj):
    		self.obj = obj
    		self.dn = obj.state.dn
    		# Snapshot of the attribute lists, needed later to clean up the index.
    		snapshot = {}
    		for name, values in obj.state.attributes.items():
    			snapshot[name] = values.copy()
    		self.attributes = snapshot

    	def apply_object(self, obj_state):
    		"""Clear the DN of obj_state; its attributes are left untouched."""
    		obj_state.dn = None

    	def apply_session(self, session_state):
    		"""Move the object from objects to deleted_objects and drop its index entries."""
    		assert self.dn in session_state.objects
    		session_state.objects.pop(self.dn)
    		session_state.deleted_objects[self.dn] = self.obj
    		for name in self.attributes:
    			session_state.unref(self.obj, name, self.attributes[name])

    	def apply_ldap(self, conn):
    		"""Send the delete through conn; raise LDAPCommitError if it reports failure."""
    		if not conn.delete(self.dn):
    			raise LDAPCommitError()
    
    class ModifyOperation:
    	"""Queued attribute modification of an entry.

    	changes has the shape {attr: [(action, values), ...]} where action is one
    	of MODIFY_REPLACE, MODIFY_ADD or MODIFY_DELETE; the same dict is handed to
    	conn.modify() on commit.
    	"""

    	def __init__(self, obj, changes):
    		self.obj = obj
    		# Remember the DN now: if the object is deleted later in the same
    		# transaction its state DN becomes None before this operation is sent.
    		self.dn = obj.state.dn
    		# Attribute values before the change; used to drop stale index entries.
    		self.attributes = {name: values.copy() for name, values in obj.state.attributes.items()}
    		self.changes = changes

    	def apply_object(self, obj_state):
    		"""Apply the changes to obj_state.attributes in order."""
    		for attr, changes in self.changes.items():
    			for action, values in changes:
    				if action == MODIFY_REPLACE:
    					# Store a copy so that later in-place edits of the state do
    					# not alter the queued change, the caller's list or any
    					# other state this operation is applied to.
    					obj_state.attributes[attr] = list(values)
    				elif action == MODIFY_ADD:
    					# The attribute may not exist yet.
    					obj_state.attributes.setdefault(attr, []).extend(values)
    				elif action == MODIFY_DELETE:
    					current = obj_state.attributes.get(attr, [])
    					for value in values:
    						if value in current:
    							current.remove(value)

    	def apply_session(self, session_state):
    		"""Update the session's (attr, value) index to reflect the changes."""
    		for attr, changes in self.changes.items():
    			for action, values in changes:
    				if action == MODIFY_REPLACE:
    					# Drop the index entries of the values held before the change.
    					session_state.unref(self.obj, attr, self.attributes.get(attr, []))
    					session_state.ref(self.obj, attr, values)
    				elif action == MODIFY_ADD:
    					session_state.ref(self.obj, attr, values)
    				elif action == MODIFY_DELETE:
    					session_state.unref(self.obj, attr, values)

    	def apply_ldap(self, conn):
    		"""Send the modification through conn; raise LDAPCommitError on failure."""
    		# Fall back to the current DN for operations created before the object
    		# had one.
    		dn = self.dn if self.dn is not None else self.obj.state.dn
    		success = conn.modify(dn, self.changes)
    		if not success:
    			raise LDAPCommitError()
    
    class Session:
    	"""Unit of work that tracks LDAP objects and queues changes until commit().

    	state holds the pending view including uncommitted changes,
    	committed_state the view after the operations already sent to the server,
    	and changes the ordered queue of operations not yet sent.
    	cached_searches remembers which filter() queries were already run.
    	"""

    	def __init__(self, get_connection):
    		"""get_connection is a callable returning the connection to use."""
    		self.get_connection = get_connection
    		self.committed_state = SessionState()
    		self.state = SessionState()
    		self.changes = []
    		self.cached_searches = set()

    	def add(self, obj, dn, object_classes):
    		"""Queue creation of obj under dn; a no-op if obj is already stored there."""
    		if self.state.objects.get(dn) == obj:
    			return
    		assert obj.state.session is None
    		oper = AddOperation(obj, dn, object_classes)
    		oper.apply_object(obj.state)
    		obj.state.session = self
    		oper.apply_session(self.state)
    		self.changes.append(oper)

    	def delete(self, obj):
    		"""Queue deletion of obj; a no-op if its DN is not tracked by this session."""
    		if obj.state.dn not in self.state.objects:
    			return
    		assert obj.state.session == self
    		oper = DeleteOperation(obj)
    		oper.apply_object(obj.state)
    		obj.state.session = None
    		oper.apply_session(self.state)
    		self.changes.append(oper)

    	def record(self, oper):
    		"""Queue an operation whose object already belongs to this session."""
    		assert oper.obj.state.session == self
    		self.changes.append(oper)

    	def commit(self):
    		"""Send all queued operations in order.

    		If an operation fails, the exception propagates and that operation
    		and everything after it stay queued.
    		"""
    		conn = self.get_connection()
    		while self.changes:
    			oper = self.changes[0]
    			# Only dequeue once the server accepted the operation.
    			oper.apply_ldap(conn)
    			del self.changes[0]
    			committed = oper.obj.committed_state
    			oper.apply_object(committed)
    			# Mirror what add()/delete() do to obj.state.session: a committed
    			# object belongs to this session exactly while it has a DN.
    			# Without this, rollback() after a committed add would detach the
    			# object (and re-attach it after a committed delete).
    			committed.session = self if committed.dn is not None else None
    			oper.apply_session(self.committed_state)
    		self.committed_state = self.state.copy()

    	def rollback(self):
    		"""Discard all uncommitted changes and reset objects to their committed state."""
    		for obj in self.state.objects.values():
    			obj.state = obj.committed_state.copy()
    		for obj in self.state.deleted_objects.values():
    			obj.state = obj.committed_state.copy()
    		self.state = self.committed_state.copy()
    		self.changes.clear()

    	def _load(self, response):
    		"""Create an Object from a search response entry and track it in both states."""
    		obj = Object(self, response)
    		dn = response['dn']
    		self.state.objects[dn] = obj
    		self.committed_state.objects[dn] = obj
    		for attr, values in obj.state.attributes.items():
    			self.state.ref(obj, attr, values)
    			# A freshly loaded object is committed by definition; index it
    			# there too so that it is still found after rollback().
    			self.committed_state.ref(obj, attr, values)
    		return obj

    	def get(self, dn, filter_params):
    		"""Return the object at dn if it matches filter_params, else None.

    		Tracked objects are answered from the session; deleted ones yield
    		None. Otherwise the entry is looked up on the server.
    		"""
    		if dn in self.state.objects:
    			obj = self.state.objects[dn]
    			return obj if obj.match(filter_params) else None
    		if dn in self.state.deleted_objects:
    			return None
    		conn = self.get_connection()
    		# BASE scope: only the entry itself, never its descendants (the
    		# default scope is the whole subtree).
    		conn.search(dn, encode_filter(filter_params), search_scope=BASE, attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
    		if not conn.response:
    			return None
    		assert len(conn.response) == 1
    		assert conn.response[0]['dn'] == dn
    		return self._load(conn.response[0])

    	def filter(self, search_base, filter_params):
    		"""Return all objects below search_base matching every (attr, value) pair.

    		Tracked objects are matched via the session index. The server is
    		queried only the first time a given search is made; entries that are
    		already tracked or deleted in this session are not loaded again.
    		"""
    		if not filter_params:
    			matches = self.state.objects.values()
    		else:
    			submatches = [self.state.references.get((attr, value), set()) for attr, value in filter_params]
    			matches = submatches.pop(0)
    			while submatches:
    				matches = matches.intersection(submatches.pop(0))
    		res = [obj for obj in matches if match_dn(obj.state.dn, search_base)]
    		cache_key = make_cache_key(search_base, filter_params)
    		if cache_key in self.cached_searches:
    			return res
    		conn = self.get_connection()
    		conn.search(search_base, encode_filter(filter_params), attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES])
    		for response in conn.response:
    			dn = response['dn']
    			if dn in self.state.objects or dn in self.state.deleted_objects:
    				continue
    			res.append(self._load(response))
    		self.cached_searches.add(cache_key)
    		return res
    
    class Object:
    	"""An LDAP entry with a pending state and a committed state.

    	Created either empty (not yet attached to a session) or from a search
    	response entry belonging to a session.
    	"""

    	def __init__(self, session=None, response=None):
    		if response is not None:
    			assert session is not None
    			# Store every attribute as a list of values.
    			attrs = {}
    			for attr, value in response['attributes'].items():
    				attrs[attr] = value if isinstance(value, list) else [value]
    			self.committed_state = ObjectState(session, attrs, response['dn'])
    		else:
    			self.committed_state = ObjectState()
    		self.state = self.committed_state.copy()

    	@property
    	def dn(self):
    		"""DN in the pending state (None if the object has none)."""
    		return self.state.dn

    	@property
    	def session(self):
    		"""Session recorded in the pending state, or None."""
    		return self.state.session

    	def _apply_change(self, name, action, values):
    		"""Apply one modification locally and queue it if a session is attached."""
    		oper = ModifyOperation(self, {name: [(action, values)]})
    		oper.apply_object(self.state)
    		owner = self.state.session
    		if owner:
    			oper.apply_session(owner.state)
    			owner.changes.append(oper)

    	def getattr(self, name):
    		"""Return the list of values of attribute name ([] if it is not set)."""
    		return self.state.attributes.get(name, [])

    	def setattr(self, name, values):
    		"""Replace all values of attribute name with values."""
    		self._apply_change(name, MODIFY_REPLACE, values)

    	def attr_append(self, name, value):
    		"""Add a single value to attribute name."""
    		self._apply_change(name, MODIFY_ADD, [value])

    	def attr_remove(self, name, value):
    		"""Remove a single value from attribute name."""
    		self._apply_change(name, MODIFY_DELETE, [value])

    	def match(self, filter_params):
    		"""Return True if every (attr, value) pair is among the object's values."""
    		return all(value in self.getattr(attr) for attr, value in filter_params)