Skip to content
Snippets Groups Projects
Commit 1d24046c authored by Andreas Hubel's avatar Andreas Hubel
Browse files

draft: new ScheduleICal class to import events from an ical feed

parent 0037fb9f
No related tags found
No related merge requests found
import re
from urllib.parse import urlparse
import icalendar
import requests
from requests_file import FileAdapter
from .base import BaseScheduleSupport, filter_additional_data, schedule_time_to_timedelta
s = requests.Session()
s.mount('file://', FileAdapter())
class ScheduleICalSupport(BaseScheduleSupport):
identifier = 'schedule-ical'
readonly = True
configuration_fields = {
# 'key': (type, default value, mandatory, translation text)
'kind': ('string', 'assembly', False, 'kind of events, either "assembly" or "official" or "sos" or "lightning"'),
'headers': ('dict', {}, False, 'HTTP headers to send with the request e.g. Authorization'),
}
def ready(self):
r = s.head(self.remote_url)
return r.ok
def fetch(self):
"""
This method is the workhorse of the schedule support module:
its job is to query upstream for the current set of data.
It shall return a dictionary with keys 'rooms' and 'events',
each containing a dictionary with entries mapping a source id
to a dictionary which can be understood by Room.from_dict()
and Event.from_dict() respectively.
The hub will update events it already knows by the source id:
all events need to have an unique but stable identifier, i.e.
if the name of the event changes the identifier must not change!
"""
headers = {}
if self.conf_value('headers'):
headers = self.conf_value('headers')
schedule = ScheduleICal.from_url(self.remote_url, headers=headers)
instance = urlparse(schedule.get('base_url', self.remote_url))
host = f'//{instance.netloc}'
kind = self.conf_value('kind')
def ensure_full_url(uri):
if not uri:
return None
if not uri.startswith('http') and not uri.startswith('//'):
return f'{host}{uri}'
return uri
return {
'version': schedule.version(),
'rooms': {r['name']: r for r in schedule.rooms()},
'events': {
e.get('id'): {
'guid': e.get('guid'),
'slug': e.get('slug').split(f"{e.get('id')}-")[1][0:150].strip('-') or e.get('slug')[0:150].strip('-'),
'name': e.get('title'),
'language': e.get('language'),
'abstract': e.get('abstract') or '',
'description': e.get('description') or '',
'track': e.get('track'),
'room': e.get('room'),
'schedule_start': e.get('date'),
'schedule_duration': str(schedule_time_to_timedelta(e.get('duration'))),
'is_public': True,
'kind': kind,
'speakers': e.get('persons', []),
'banner_image_url': ensure_full_url(e.get('logo')),
'additional_data': filter_additional_data(e, self.computed_data(e)),
}
for e in schedule.events()
},
}
def computed_data(self, event: dict):
# TODO only add feedback_url if feedback is enabled via configuraiton_fields in ScheduleSource config
if self.conf_value('feedback'):
return {'feedback_url': f"{event['url']}feedback/"}
return {}
class ScheduleICal:
"""
Schedule from iCal feed
"""
_cal = None
_events = None
def __init__(self, cal, event_map=False):
self._cal = cal
# if event_map:
# self._events = {e.get('guid'): e for e in self.events()}
@classmethod
def from_url(cls, url, client=None, headers=None, event_map=False):
r = (client if client else s).get(url=re.sub(r'^webcal', 'http', url), headers=headers)
if r.ok is False:
raise Exception(f'Request failed, HTTP {r.status_code}.')
cal = icalendar.Calendar.from_ical(r.text)
# Close the raw file handle if it's still open
if hasattr(r, 'raw') and r.raw.closed is False:
r.raw.close()
return ScheduleICal(cal, event_map=event_map)
@classmethod
def event_to_dict(e: icalendar.Event, context: ScheduleICal) -> dict:
# title, subtitle, event_type = re.match(r"^(.+?)(?:( ?[:–] .+?))?(?: \((.+?)\))?$", e.name).groups()
(track,) = [str(c) for c in e.get('categories').cats] or [None]
begin = e['dtstart'].dt
end = e['dtend'].dt
duration = end - begin
return {
k: (v if isinstance(v, list) or v is None else str(v))
for k, v in {
'guid': gen_uuid(e['uid']),
'id': e['event-id'],
'title': e.get('summary'),
'subtitle': '',
'abstract': e['description'],
'description': '', # empty description for pretalx importer (temporary workaround)
'date': begin.isoformat(),
'start': begin.strftime('%H:%M'),
'duration': format_duration(duration),
'room': track, # context['name'],
'persons': [{**p, 'id': 0} for p in extract_persons(e)],
'track': track,
'language': 'de',
'type': 'Session',
'url': e.get('url', None),
}.items()
}
def __getitem__(self, key):
return self._cal[key]
def get(self, key, default=None):
return self._cal.get(key, default)
def schedule(self):
raise 'Not implemented'
def version(self):
raise 'Not implemented'
def days(self):
raise 'Not implemented'
def rooms(self):
# try to access the room dict from schedule.json gen 2021
rooms = self._schedule.get('conference', {}).get('rooms', [])
if rooms:
return list(rooms)
# looks like we have an older schudule.json (gen 2020), without a dedicated room list
# so we have use a fallback and iterate all days adding the rooms to a set, creating uniqueness
rooms = set()
for day in self.days():
for roomname in day.get('rooms'):
rooms.add(roomname)
return [{'name': name} for name in rooms]
def events(self):
for event in self._cal.walk('vevent'):
yield self.event(event)
def event(self, guid):
if guid in self._events:
return self._events[guid]
return None
def __str__(self):
return json.dumps(self._cal, indent=2)
def extract_persons(e: icalendar.Event) -> list:
person_str = str(e.get('location', '')).replace(' und ', '; ').strip()
# persons = re.split(r'\s*[,;/]\s*', person_str)
persons = re.split(r'[,;/](?![^()]*\))', person_str)
if len(persons) == 0:
return []
pattern = r'([^()]+)(?:\((\w{2,3}\s+)?([^)]*)\))'
result = []
for p in persons:
# p is either "name (org)" or or "name (org role)" or "name (name@org.tld)"
match = re.match(pattern, p)
if match:
name, org, role = match.groups()
if role and '@' in role:
match = re.search(r'@(.+)(\.de)?$', role)
org = match.group(1)
result.append({'name': name.strip(), 'org': org.strip(), 'email': role.strip(), 'guid': gen_person_uuid(role)})
else:
if not org:
if len(role) <= 3:
org = role
role = None
else:
# try catch `Distribution Cordinator, ZER` and split org
m = re.match(r'^(.+?), (\w{2,3})$', role)
if m:
org = m.group(2)
role = m.group(1)
if name:
result.append(
{
'name': name.strip(),
'org': org.strip() if org else None,
'role': role.strip() if role else None,
}
)
elif p:
result.append(
{
'name': p.strip(),
}
)
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment