Select Git revision
schedule.py 17.46 KiB
# ruff: noqa: PLW2901
import json
import logging
import re
from collections import OrderedDict
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from uuid import UUID
from lxml import etree as ET
from django.conf import settings
from django.db.models import Prefetch
from core.models.assemblies import Assembly
from core.models.conference import Conference, ConferenceMember
from core.models.events import Event, EventParticipant
from core.models.rooms import Room
from core.models.users import PlatformUser
if TYPE_CHECKING:
from django.db.models import QuerySet
logger = logging.getLogger(__name__)
# template for optimized attribute order
event_template = {
'guid': None,
'id': None,
'date': None,
'start': None,
'duration': None,
'room': None,
'slug': None,
'url': None,
'title': None,
'subtitle': None,
'language': None,
'track': None,
'type': 'other',
'abstract': None,
'description': None,
'logo': None,
'persons': [],
'links': [],
}
class Day:
parent: 'Schedule'
def __init__(self, day, parent: 'Schedule'):
self.day = day
self.rooms = OrderedDict()
self.parent = parent
def index(self):
return self.day.index
def room_exists(self, room: Room):
return room.name in self.rooms
def add_room(self, room: Room):
if room.name not in self.rooms:
self.rooms[room.name] = RoomDay(room)
self.parent.add_room(room)
def add_event(self, event: Event):
if event.room is None:
# silently ignore events without associated rooms
# TODO create pseudo rooms for SoS where location is string field
# TODO check for time overlaps in pseudo rooms, an create a new one if necessary
return
if not self.room_exists(event.room):
self.add_room(event.room)
self.rooms[event.room.name].add_event(event)
class RoomDay:
def __init__(self, room: Room):
self.room = room
self.events = []
def add_event(self, event: Event):
self.events.append(event)
class ScheduleEncoder(json.JSONEncoder):
tz = None
def encode_duration(self, duration: timedelta | None) -> str | None:
"""converts a python `timedelta` to the schedule xml timedelta string that represents this timedelta. ([d:]HH:mm)"""
if duration is None:
return None
days = duration.days
hours = duration.seconds // 3600
minutes = (duration.seconds % 3600) // 60
if days:
return f'{days}:{hours:02d}:{minutes:02d}'
return f'{hours:02d}:{minutes:02d}'
def encode_person(self, p: EventParticipant | PlatformUser | str | dict):
if isinstance(p, str):
return {'id': None, 'name': p, 'public_name': p}
if isinstance(p, PlatformUser):
# TODO: Update after deciding oh one or more conferences in #648
member: ConferenceMember = p.conferences.first() # TODO search for correct conference
name = p.get_display_name()
return {
'guid': p.uuid,
'name': name,
'public_name': name,
'avatar': p.avatar_url,
'biography': member.description if member else None,
# 'links': p.links, # TODO
'url': p.get_absolute_url(),
}
if isinstance(p, EventParticipant):
member: ConferenceMember = p.event.conference.users.filter(pk=p.participant.id).first()
name = p.participant.get_display_name()
return {
'guid': p.participant.uuid,
'name': name,
'public_name': name,
'avatar': p.participant.avatar_url,
'biography': member.description if member else '',
# 'links': p.participant.links, # TODO
'url': p.participant.get_absolute_url(),
}
# we assume it is a dict, but we normally should never get here
return {
'guid': p.get('guid', None),
'id': p.get('id', None),
'name': p.get('name', p.get('public_name')),
'avatar': p.get('avatar', None),
'biography': p.get('biography') or p.get('description', ''),
'links': p.get('links', []),
}
def collect_persons(self, event):
persons = []
if hasattr(event, 'speakers'):
# event is a QuerySet and already fetched speakers
persons = [speaker.participant for speaker in event.speakers]
else:
# direct event lookup -> fetch persons via public_speakers
persons = event.public_speakers
return [self.encode_person(person) for person in persons]
def encode_event(self, event: Event, tz=None):
start = event.schedule_start.astimezone(tz or self.tz) if event.schedule_start is not None else None
additional_data = event.additional_data or {}
legacy_id = additional_data.get('id') or int(re.sub('[^0-9]+', '', str(event.id))[0:6])
slug = f'{event.conference.slug}-{event.slug}'
if event.streaming == Event.Streaming.NO:
additional_data['do_not_stream'] = True
if event.recording == Event.Recording.NO:
additional_data['do_not_record'] = True
return {
**event_template,
'id': legacy_id,
'logo': settings.PLAINUI_BASE_URL + event.banner_image.url if event.banner_image and event.banner_image.url else None,
**additional_data,
# ATTENTION: if the description also exists in additional_data it is overwritten
'abstract': event.abstract,
'description': event.description,
'persons': self.collect_persons(event),
'slug': slug,
'url': event.get_absolute_url(),
'guid': event.id,
'date': start.isoformat() if start is not None else None,
'start': start.strftime('%H:%M') if start is not None else None,
'duration': self.encode_duration(event.schedule_duration),
'room': event.room.name if event.room is not None else None,
'title': event.name,
'language': event.language,
'track': event.track.name if event.track else None,
}
def encode_day(self, obj: Day):
return {
'index': obj.day.index,
'date': obj.day.start.strftime('%Y-%m-%d'),
'day_start': obj.day.start.isoformat(),
'day_end': obj.day.end.isoformat(),
'rooms': obj.rooms,
}
def encode_room_day(self, obj: RoomDay):
return obj.events
def encode_room_conference(self, obj: Room):
features = {}
if obj.recording_state != Room.RecordingState.UNKNOWN:
features['recording'] = obj.recording_state
return {
'name': obj.name,
'slug': obj.slug,
'guid': obj.id,
'type': obj.room_type,
'stream_id': obj.voc_stream_id,
'capacity': obj.capacity,
'description_en': obj.description_en,
'description_de': obj.description_de,
'features': features,
'assembly': obj.assembly if obj.assembly else None,
# Future TODO 'url': obj.get_absolute_url(),
}
def encode_assembly(self, obj: Assembly):
return {
'name': obj.name,
'slug': obj.slug,
'guid': obj.id,
# Future TODO: # 'type': obj., # cluster vs. virtual vs. ...?
# 'description_en': obj.description_en,
# 'description_de': obj.description_de,
# Future TODO 'url': obj.get_absolute_url(),
}
def transform(self, obj):
if isinstance(obj, Schedule):
self.tz = obj.tz
return {'$schema': 'https://c3voc.de/schedule/schema.json', 'schedule': obj._schedule}
if isinstance(obj, Day):
return self.encode_day(obj)
if isinstance(obj, RoomDay):
return self.encode_room_day(obj)
if isinstance(obj, Event):
return self.encode_event(obj)
if isinstance(obj, Room):
return self.encode_room_conference(obj)
if isinstance(obj, Assembly):
return self.encode_assembly(obj)
if isinstance(obj, UUID):
return str(obj)
return None
def default(self, obj):
r = self.transform(obj)
return json.JSONEncoder.default(self, obj) if r is None else r
class Schedule:
"""
Schedule class with import and export methods
"""
_schedule: dict
_rooms: dict[UUID:Room]
conference: Conference
def __init__(self, conference: Conference):
self.conference = conference
self.tz = conference.timezone
self._rooms = {}
self._schedule = {
'version': datetime.now(self.tz).strftime('%Y-%m-%d %H:%M'),
'base_url': settings.PLAINUI_BASE_URL, # 'https://events.ccc.de/' + conference.slug + '/',
'conference': {
'acronym': conference.slug,
'title': conference.name,
'start': conference.start.isoformat(),
'end': conference.end.isoformat(),
'daysCount': conference.days_count,
'timeslot_duration': '00:10',
'time_zone_name': self.tz.key if self.tz.key else datetime.now(self.tz).tzname(),
# 'logo': conference.logo.url if conference.logo else None,
# Future TODO: expose colors, etc.
# ruff: noqa
# 'colors': {
# 'primary': conference.color,
# 'background': '#000000',
# },
'url': settings.PLAINUI_BASE_URL,
'tracks': [{'name': track.name, 'color': track.color, 'slug': track.slug} for track in conference.tracks.all()],
'rooms': [],
'days': [Day(day, self) for day in conference.days],
},
}
def __getitem__(self, key):
return self._schedule[key]
def schedule(self):
return self._schedule
def version(self) -> str:
return self._schedule['version']
def days(self) -> list[Day]:
return self._schedule['conference']['days']
def day(self, day: int) -> Day:
return self._schedule['conference']['days'][day - 1]
def rooms(self):
return self._rooms.values()
# this method is called by the Day class, so we only add the room to the schedule if it is used by at least one event which is part of this export
def add_room(self, room: Room):
if room.id not in self._rooms:
self._rooms[room.id] = room
self._schedule['conference']['rooms'].append(room)
def add_rooms(self, rooms):
for day in self._schedule['conference']['days']:
for room in rooms:
day.add_room(room)
def add_events(self, events: 'QuerySet[Event]'):
events = events.select_related('track', 'room', 'assembly', 'conference').prefetch_related(
Prefetch(
# TODO: we have to prefetch the conference members here, as we need the description for the speakers
'participants',
queryset=EventParticipant.objects.filter(is_public=True, role=EventParticipant.Role.SPEAKER).select_related('participant'),
to_attr='speakers',
)
)
for event in events:
try: # noqa: SIM105
self.add_event(event)
except Warning:
# TODO log event and error
pass
def add_event(self, event: Event):
if event.schedule_start is None:
# silently ignore events without a start date
return
day = self.get_day_from_time(event.schedule_start)
day.add_event(event)
def get_day_from_time(self, start_time) -> Day:
for i in range(self.conference.days_count):
day = self.day(i + 1)
if day.day.start <= start_time < day.day.end:
return day
raise Warning(' illegal start time: ' + start_time.isoformat())
def __str__(self):
return json.dumps(self, indent=2, cls=ScheduleEncoder)
def json(self):
return json.dumps(self, cls=ScheduleEncoder)
# dict_to_etree from http://stackoverflow.com/a/10076823
def xml(self):
root_node = None
encoder = ScheduleEncoder()
def _set_attrib(tag, k, v):
if isinstance(v, str):
tag.set(k, v)
elif isinstance(v, (UUID, int)):
tag.set(k, str(v))
elif v is not None:
logger.error('unknown attribute type %s=%s', k, v)
def _to_etree(d, node, parent=''):
d = encoder.transform(d) or d
if d is None:
pass
elif isinstance(d, str):
node.text = d
elif isinstance(d, int):
node.text = str(d)
elif parent == 'person':
node.text = d.get('public_name') or d.get('name')
if 'id' in d:
_set_attrib(node, 'id', d['id'])
if 'guid' in d:
_set_attrib(node, 'guid', d['guid'])
elif isinstance(d, (OrderedDict, dict)):
if parent == 'schedule' and 'base_url' in d:
d['conference']['base_url'] = d['base_url']
del d['base_url']
# count variable is used to check how many items actually end as elements
# (as they are mapped to an attribute)
count = len(d)
recording_license = ''
for k, v in d.items():
if parent == 'day':
if k[:4] == 'day_':
# remove day_ prefix from items
k = k[4:]
elif parent == 'conference' and k == 'rooms':
# we ignore room map on conference in schedul.xml as we add the guid on the day.room level
continue
if parent == 'track' or parent == 'color' or k == 'id' or k == 'guid' or (parent == 'day' and isinstance(v, (str, int))):
_set_attrib(node, k, v)
count -= 1
elif k == 'url' and parent not in ['event', 'conference']:
_set_attrib(node, 'href', v)
count -= 1
elif count == 1 and isinstance(v, str):
node.text = v
else:
node_ = node
if parent == 'room':
# create room tag for each instance of a room name
node_ = ET.SubElement(node, 'room')
node_.set('name', k)
k = 'event'
if k == 'days':
# in the xml schedule days are not a child of a conference, but directly in the document node
node_ = root_node
# special handing for collections: days, rooms etc.
if k[-1:] == 's':
# don't ask me why the pentabarf schedule xml schema is so inconsistent --Andi
# create collection tag for specific tags, e.g. persons, links etc.
if parent == 'event':
node_ = ET.SubElement(node, k)
# remove last char (which is an s)
k = k[:-1]
# different notation for conference length in days
elif parent == 'conference' and k == 'daysCount':
k = 'days'
# special handling for recoding_licence and do_not_record flag
elif k == 'recording_license':
# store value for next loop iteration
recording_license = v
# skip forward to next loop iteration
continue
elif k == 'do_not_record':
k = 'recording'
v = OrderedDict([('license', recording_license), ('optout', 'true' if v else 'false')])
if isinstance(v, RoomDay):
_set_attrib(node_, 'guid', v.room.id)
for event in v.events:
_to_etree(event, ET.SubElement(node_, k), k)
elif isinstance(v, list):
for element in v:
_to_etree(element, ET.SubElement(node_, k), k)
# don't produce single empty room tag, as we have to create one for each room, see above
elif parent == 'day' and k == 'room':
_to_etree(v, node_, k)
else:
_to_etree(v, ET.SubElement(node_, k), k)
else:
raise Exception('unknown type', d, parent)
root_node = ET.Element('schedule')
root_node.set('{http://www.w3.org/2001/XMLSchema-instance}noNamespaceSchemaLocation', 'https://c3voc.de/schedule/schema.xsd')
_to_etree(self._schedule, root_node, 'schedule')
return ET.tounicode(root_node, doctype='<?xml version="1.0"?>')