import logging from datetime import timedelta from rest_framework import authentication from rest_framework.response import Response from rest_framework.views import APIView from django.db.models import F from django.http import HttpResponse, JsonResponse from django.utils.dateparse import parse_datetime from django.views.generic import View from core.models.assemblies import Assembly from core.models.conference import ConferenceExportCache, ConferenceTrack from core.models.events import Event from core.models.rooms import Room from core.schedules.base import filter_additional_data, schedule_time_to_timedelta from api.permissions import IsApiUserOrReadOnly from api.schedule import Schedule, ScheduleEncoder from api.views.mixins import ConferenceSlugMixin logger = logging.getLogger(__name__) class BaseScheduleView(ConferenceSlugMixin, View): filters = { 'name': ('name__icontains', 'str'), 'tags': ('tags__tag__slug__in', 'list'), 'room.tags': ('room__tags__tag__slug__in', 'list'), } def get(self, *args, **kwargs): filters = {} get_parameters = getattr(self.request, 'GET', {}) for parameter, (query_parameter, parameter_type) in self.filters.items(): value: str | bool | list[str] if value := get_parameters.get(parameter, self.kwargs.get(parameter)): match parameter_type: case 'bool': value = value == 'true' case 'list': value = value.split(',') value = [x.strip() for x in value] filters[query_parameter] = value req_format = 'json' if self.request.resolver_match.kwargs.get('format') == 'json' else 'xml' cache_id = f'{self.get_cache_id()}.{req_format}.{hash(str(filters))}' def gen_data(): schedule = Schedule(self.conference) events = self.get_events(filters) schedule.add_events(events) if req_format == 'json': return schedule.json() else: return schedule.xml() return ConferenceExportCache.handle_http_request( request=self.request, conference=self.conference, entry_type=ConferenceExportCache.Type.SCHEDULE, ident=cache_id, content_type=lambda: 'application/json' if req_format == 'json' else 'text/xml', result_func=gen_data, ) def get_cache_id(self): raise NotImplementedError('Overwrite .get_cache_id() in a descendant class!') def get_events(self): raise NotImplementedError('Overwrite .get_events() in a descendant class!') class ConferenceSchedule(BaseScheduleView): def get_cache_id(self, **kwargs): return '' def get_events(self, filter=None, **kwargs): # pylint: disable=redefined-builtin queryset = ( Event.objects.conference_accessible(conference=self.conference) .exclude(schedule_duration=None) .exclude(schedule_duration__lte=timedelta(minutes=5)) .order_by(F('assembly__is_official').desc(nulls_last=True), 'room__official_room_order', F('room__capacity').desc(nulls_last=True), 'name') ) if filter: queryset = queryset.filter(**filter) return queryset class AssemblySchedule(BaseScheduleView): def get_cache_id(self): assembly_id = self.request.resolver_match.kwargs.get('assembly') return f'assembly_{assembly_id}' def get_events(self, filter=None): # pylint: disable=redefined-builtin assembly_id = self.request.resolver_match.kwargs.get('assembly') queryset = ( Event.objects.conference_accessible(conference=self.conference) .filter(assembly_id=assembly_id) .order_by('room__room_type', F('room__capacity').desc(nulls_last=True), 'room__name', 'schedule_start') ) if filter: queryset = queryset.filter(**filter) return queryset class RoomSchedule(BaseScheduleView): def get_cache_id(self): room_id = self.request.resolver_match.kwargs.get('pk') return f'room-{room_id}' def get_events(self, filter=None): # pylint: disable=redefined-builtin room_id = self.request.resolver_match.kwargs.get('pk') queryset = Event.objects.conference_accessible(conference=self.conference).filter(room_id=room_id).order_by('schedule_start') if filter: queryset = queryset.filter(**filter) return queryset class EventSchedule(ConferenceSlugMixin, APIView): authentication_classes = [authentication.TokenAuthentication] permission_classes = [IsApiUserOrReadOnly] def get(self, request, pk, format=None, **kwargs): # pylint: disable=redefined-builtin event = Event.objects.associated_with_user(conference=self.conference, user=self.request.user, show_public=True).get(pk=pk) return Response(ScheduleEncoder().encode_event(event, self.conference.timezone)) def post(self, request, pk, format=None, **kwargs): # pylint: disable=redefined-builtin event = request.data if len(event) == 0: return Response({'error': 'No data.'}, status=400) try: obj = Event.objects.get(conference=self.conference, pk=pk) except Event.DoesNotExist: obj = Event(conference=self.conference, pk=pk, is_public=True) logger.warning('Event schedule POST: id %s did not exist yet, creating.', pk) try: if (event_guid := event.get('guid')) is not None: if event_guid != str(obj.pk): logger.warning('Attempted update of event %s with guid "%s".', obj.pk, event_guid) return JsonResponse({'error': 'GUID mismatch.'}, status=400) if (event_slug := event.get('slug')) is not None: obj.slug = event_slug if (event_roomid := event.get('room_id')) is not None: obj.room = Room.objects.get(conference=self.conference, pk=event_roomid) elif (event_room := event.get('room')) is not None: try: obj.room = Room.objects.get(conference=self.conference, name__iexact=event_room) except Room.MultipleObjectsReturned: return JsonResponse({'error': 'Room name is not unique, please provide room_id!'}, status=400) if (event_assemblyid := event.get('assembly_id')) is not None: obj.assembly = Assembly.objects.get(conference=self.conference, pk=event_assemblyid) elif (event_assemblyslug := event.get('assembly_slug')) is not None: obj.assembly = Assembly.objects.get(conference=self.conference, slug__iexact=event_assemblyslug) if obj.assembly_id is None: if obj.room_id is not None: obj.assembly = obj.room.assembly else: return JsonResponse({'error': 'Assembly association missing, please provide assembly_id or a valid room.'}, status=400) obj.kind = 'assembly' if not obj.room.assembly.is_official else 'official' if (event_public := event.get('public')) is not None: obj.is_public = event_public is True if (event_title := event.get('title')) is not None: obj.name = event_title if (event_language := event.get('language')) is not None: obj.language = event_language if (event_abstract := event.get('abstract')) is not None and (event_description := event.get('description')) is not None: obj.description = str(event_abstract) + '\n\n' + str(event_description) if (event_date := event.get('date')) is not None: obj.schedule_start = parse_datetime(event_date) if (event_duration := event.get('duration')) is not None: obj.schedule_duration = schedule_time_to_timedelta(event_duration) if (event_track := event.get('track')) is not None: obj.track = ConferenceTrack.objects.get(conference=self.conference, name__iexact=event_track) obj.additional_data = filter_additional_data(event) except Assembly.DoesNotExist: return Response({'error': f'Assembly {event.get("assembly_id")}/{event.get("assembly_slug")} does not exist.'}, status=400) except Room.DoesNotExist: return Response({'error': f'Room {event.get("room_id")}/{event.get("room")} does not exist.'}, status=400) except ConferenceTrack.DoesNotExist: return Response({'error': f'Track {event.get("track_id")}/{event.get("track")} does not exist.'}, status=400) obj.save() logger.info('Event %s updated via POST by %s', obj, request.user) return HttpResponse(status=201)